Athena with AWS

Create implementation athena AWS

CODE:

      
        export class AthenaAWSImpl {
            private readonly logger: any;
            private readonly athena: AWS.Athena;
            constructor(logger: any, session: {
                accessKeyId: string;
                secretAccessKey: string;
                sessionToken: string;
            } | undefined) {
                this.logger = logger;
                this.athena = new AWS.Athena({
                    apiVersion: '2017-05-18',
                    region: Constants.AWS_REGION,
                    credentials: session,
                });
            }

            /**
             * https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Athena.html
             */
            async getTableMetadata(input: { catalogName: string, databaseName: string, tableName: string }): Promise {
                try {
                    const params = {
                        CatalogName: input.catalogName,
                        DatabaseName: input.databaseName,
                        TableName: input.tableName,
                    };
                    const result = await this.athena.getTableMetadata(params).promise();
                    return result;
                } catch (error) {
                    throw error;
                }
            }

            /**
             * https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Athena.html
             */
            async startQueryExecution(input: AWS.Athena.StartQueryExecutionInput): Promise {
                try {
                    const result = await this.athena.startQueryExecution(input).promise()
                    return result;
                } catch (error) {
                    throw error;
                }
            }

            async getQueryExecution(input: AWS.Athena.GetQueryExecutionInput): Promise {
                try {
                    const result = await this.athena.getQueryExecution(input).promise();
                    return result;
                } catch (error) {
                    throw error;
                }
            }

            async getQueryResults(input: AWS.Athena.GetQueryResultsInput): Promise {
                try {
                    const result = await this.athena.getQueryResults(input).promise();
                    return result;
                } catch (error) {
                    throw error;
                }
            }

            private sleep(ms: number) {
                return new Promise((resolve) => setTimeout(() => resolve("OK"), ms));
            }

            async getResultsAll(QueryExecutionId: string, maxResults: number): Promise {
                //, waitForResult: boolean
                try {
                    while (true) {
                        const queryExecution = await this.getQueryExecution({
                            QueryExecutionId: QueryExecutionId
                        });
                        const status: AWS.Athena.Types.QueryExecutionState | undefined = queryExecution.QueryExecution?.Status?.State;
                        if (status === "FAILED") {
                            throw new Error(queryExecution.QueryExecution?.Status?.StateChangeReason);
                        }
                        if (status === "CANCELLED") {
                            throw new Error('Query has been cancelled');
                        }
                        if (status === "SUCCEEDED") {
                            const allRows: AWS.Athena.Types.Row[] = [];
                            let nextToken: any = undefined;
                            let columnInfo: AWS.Athena.Types.ColumnInfoList = [];
                            do {
                                const results: AWS.Athena.Types.GetQueryResultsOutput | undefined = await this.getQueryResults({
                                    QueryExecutionId: QueryExecutionId,
                                    MaxResults: maxResults,
                                    NextToken: nextToken,
                                });
                                nextToken = results.NextToken;
                                const rows: AWS.Athena.Types.RowList = results.ResultSet?.Rows || [];
                                //columnInfo = results.ResultSet?.ResultSetMetadata?.ColumnInfo || [];
                                //Option 1
                                /** const header = columnInfo?.map((item: AWS.Athena.Types.ColumnInfo) => { return item.Name });
                                * const firstItem = lodash.head(rows)?.Data || [];
                                * const topRow = firstItem!.map((data) => { return data.VarCharValue });
                                * const resultSet = lodash.difference(header, topRow).length > 0 ? rows : lodash.drop(rows);
                                * resultSet.forEach((item) => allRows.push(lodash.zipObject(header, lodash.map(item.Data, (n) => n.VarCharValue))));
                                */
                                //Option 2
                                //const [_resultSetMetadata, ...rows] = results.ResultSet?.Rows!;
                                //allRows.push(...(allRows.length ? results.ResultSet?.Rows! : rows));
                                if (Utils.isEmpty(columnInfo)) {
                                    //columnInfo = /SHOW COLUMNS/.test("query") ? [{ Name: 'column' }] : results.ResultSet?.ResultSetMetadata?.ColumnInfo!;
                                    columnInfo = results.ResultSet?.ResultSetMetadata?.ColumnInfo || [];
                                }
                                allRows.push(...rows);
                            } while (!Utils.isEmpty(nextToken));
                            return allRows.map(r => columnInfo.map((c, i) => ({ [c.Name]: r.Data![i].VarCharValue })).reduce((a, b) => ({ ...a, ...b }), {}));
                        }
                        await this.sleep(500);
                    }
                } catch (error) {
                    throw error;
                }
            }

            async getResultsByLimit(QueryExecutionId: string, maxResults: number, nextToken: string | undefined): Promise<{ nextToken: string | undefined; results: any[]; }> {
                //, waitForResult: boolean
                try {
                    while (true) {
                        const queryExecution = await this.getQueryExecution({
                            QueryExecutionId: QueryExecutionId
                        });
                        const status: AWS.Athena.Types.QueryExecutionState | undefined = queryExecution.QueryExecution?.Status?.State;
                        if (status === "FAILED") {
                            throw new Error(queryExecution.QueryExecution?.Status?.StateChangeReason);
                        }
                        if (status === "CANCELLED") {
                            throw new Error('Query has been cancelled');
                        }
                        if (status === "SUCCEEDED") {
                            let nextToken1: string | undefined = nextToken;
                            let columnInfo: AWS.Athena.Types.ColumnInfoList = [];
                            const results: AWS.Athena.Types.GetQueryResultsOutput | undefined = await this.getQueryResults({
                                QueryExecutionId: QueryExecutionId,
                                MaxResults: maxResults,
                                NextToken: nextToken1,
                            });
                            nextToken1 = results.NextToken;
                            const rows: AWS.Athena.Types.RowList = results.ResultSet?.Rows || [];
                            columnInfo = results.ResultSet?.ResultSetMetadata?.ColumnInfo || [];
                            return {
                                nextToken: nextToken1,
                                results: rows.map(r => columnInfo.map((c, i) => ({ [c.Name]: r.Data![i].VarCharValue })).reduce((a, b) => ({ ...a, ...b }), {})),
                            }
                        }
                        await this.sleep(500);
                    }
                } catch (error) {
                    throw error;
                }
            }
        }
	
   

Comentarios

Entradas más populares de este blog

Ejemplo Log4j 2 en JAVA | Log4j 2 en Springboot | Configuración Log4j 2 | Log4j 2 in SpringBoot| Example Log4j 2 in SpringBoot | Configuring Log4j 2

Python: Inyección de dependencias

GOlang con Docker | GOlang with Docker | GO con Docker | GO with Docker