Athena with AWS
Create an Athena client implementation using the AWS SDK for JavaScript
CODE:
/**
 * Promise-based wrapper around the AWS SDK Athena client.
 *
 * Besides thin pass-through calls it offers two helpers that poll a query
 * until it reaches a terminal state and then convert the result rows into
 * plain objects keyed by column name.
 *
 * https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Athena.html
 */
export class AthenaAWSImpl {
  /** Pause between two status polls while a query has not reached a terminal state. */
  private static readonly POLL_INTERVAL_MS = 500;

  // Stored for future use; nothing in this class currently logs.
  private readonly logger: unknown;
  private readonly athena: AWS.Athena;

  /**
   * @param logger  Logger instance; kept on the instance but not used yet.
   * @param session Temporary credentials handed to the SDK client. When
   *                `undefined` the value is passed through unchanged as `credentials`.
   */
  constructor(logger: unknown, session: {
    accessKeyId: string;
    secretAccessKey: string;
    sessionToken: string;
  } | undefined) {
    this.logger = logger;
    this.athena = new AWS.Athena({
      apiVersion: '2017-05-18',
      region: Constants.AWS_REGION,
      credentials: session,
    });
  }

  /**
   * Fetches the metadata of a single table.
   *
   * @param input Catalog, database and table name identifying the table.
   * @returns The raw SDK response.
   * @throws Whatever the SDK call rejects with.
   */
  async getTableMetadata(input: { catalogName: string, databaseName: string, tableName: string }): Promise<AWS.Athena.GetTableMetadataOutput> {
    return this.athena
      .getTableMetadata({
        CatalogName: input.catalogName,
        DatabaseName: input.databaseName,
        TableName: input.tableName,
      })
      .promise();
  }

  /**
   * Starts a query execution.
   *
   * @returns The raw SDK response.
   * @throws Whatever the SDK call rejects with.
   */
  async startQueryExecution(input: AWS.Athena.StartQueryExecutionInput): Promise<AWS.Athena.StartQueryExecutionOutput> {
    return this.athena.startQueryExecution(input).promise();
  }

  /**
   * Reads the current state of a query execution.
   *
   * @returns The raw SDK response.
   * @throws Whatever the SDK call rejects with.
   */
  async getQueryExecution(input: AWS.Athena.GetQueryExecutionInput): Promise<AWS.Athena.GetQueryExecutionOutput> {
    return this.athena.getQueryExecution(input).promise();
  }

  /**
   * Reads one page of results of a query execution.
   *
   * @returns The raw SDK response.
   * @throws Whatever the SDK call rejects with.
   */
  async getQueryResults(input: AWS.Athena.GetQueryResultsInput): Promise<AWS.Athena.GetQueryResultsOutput> {
    return this.athena.getQueryResults(input).promise();
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Polls the query state until it is `SUCCEEDED`.
   *
   * Any state other than `SUCCEEDED`, `FAILED` or `CANCELLED` (including a
   * missing state) is treated as "still in progress" and polled again after
   * `POLL_INTERVAL_MS`. There is no upper bound on the number of polls.
   *
   * @throws Error when the query ends as `FAILED` or `CANCELLED`.
   */
  private async waitForSuccess(QueryExecutionId: string): Promise<void> {
    for (;;) {
      const execution = await this.getQueryExecution({ QueryExecutionId });
      const status = execution.QueryExecution?.Status;
      switch (status?.State) {
        case 'SUCCEEDED':
          return;
        case 'FAILED':
          // `||` on purpose: an empty reason is as useless as a missing one.
          throw new Error(
            status?.StateChangeReason || `Query ${QueryExecutionId} failed without a reported reason`,
          );
        case 'CANCELLED':
          throw new Error('Query has been cancelled');
        default:
          await this.sleep(AthenaAWSImpl.POLL_INTERVAL_MS);
      }
    }
  }

  /**
   * Builds `{ columnName: cellValue }` for one result row.
   * Cells missing from the row (no `Data`, or fewer cells than columns) map to `undefined`.
   */
  private static toRecord(
    columns: AWS.Athena.ColumnInfo[],
    row: AWS.Athena.Row,
  ): Record<string, string | undefined> {
    const record: Record<string, string | undefined> = {};
    columns.forEach((column, index) => {
      record[column.Name] = row.Data?.[index]?.VarCharValue;
    });
    return record;
  }

  /** True when every cell of `row` equals the name of the column at the same position. */
  private static isHeaderRow(columns: AWS.Athena.ColumnInfo[], row: AWS.Athena.Row | undefined): boolean {
    const cells = row?.Data ?? [];
    return (
      columns.length > 0 &&
      cells.length === columns.length &&
      columns.every((column, index) => cells[index]?.VarCharValue === column.Name)
    );
  }

  /**
   * Waits for the query to succeed, then reads every result page and returns
   * all rows as objects keyed by column name.
   *
   * @param QueryExecutionId Id of the query execution to read.
   * @param maxResults       Page size forwarded as `MaxResults` on each page request.
   * @param skipHeaderRow    When `true`, drops the very first row if its cells are
   *                         exactly the column names. Defaults to `false`, which
   *                         returns every row the service sent.
   *                         NOTE(review): Athena is known to echo the column names
   *                         as the first row for SELECT queries - confirm for the
   *                         query types used before enabling.
   * @returns One object per row; column values are strings or `undefined`.
   * @throws Error when the query failed or was cancelled, or whatever the SDK rejects with.
   */
  async getResultsAll(QueryExecutionId: string, maxResults: number, skipHeaderRow: boolean = false): Promise<any[]> {
    await this.waitForSuccess(QueryExecutionId);

    const rows: AWS.Athena.Row[] = [];
    let columns: AWS.Athena.ColumnInfo[] = [];
    let nextToken: string | undefined;
    do {
      const page = await this.getQueryResults({
        QueryExecutionId,
        MaxResults: maxResults,
        NextToken: nextToken,
      });
      nextToken = page.NextToken;
      // Column metadata is taken from the first page that provides any.
      if (columns.length === 0) {
        columns = page.ResultSet?.ResultSetMetadata?.ColumnInfo ?? [];
      }
      rows.push(...(page.ResultSet?.Rows ?? []));
    } while (nextToken);

    const dataRows = skipHeaderRow && AthenaAWSImpl.isHeaderRow(columns, rows[0]) ? rows.slice(1) : rows;
    return dataRows.map((row) => AthenaAWSImpl.toRecord(columns, row));
  }

  /**
   * Waits for the query to succeed, then reads a single result page.
   *
   * @param QueryExecutionId Id of the query execution to read.
   * @param maxResults       Page size forwarded as `MaxResults`.
   * @param nextToken        Token of the page to read; `undefined` reads the first page.
   * @param skipHeaderRow    When `true` and no `nextToken` was given, drops the first
   *                         row if its cells are exactly the column names. Defaults to
   *                         `false`, which returns every row the service sent.
   * @returns The token of the following page (if any) and the rows of this page as
   *          objects keyed by column name.
   * @throws Error when the query failed or was cancelled, or whatever the SDK rejects with.
   */
  async getResultsByLimit(QueryExecutionId: string, maxResults: number, nextToken: string | undefined, skipHeaderRow: boolean = false): Promise<{ nextToken: string | undefined; results: any[]; }> {
    await this.waitForSuccess(QueryExecutionId);

    const page = await this.getQueryResults({
      QueryExecutionId,
      MaxResults: maxResults,
      NextToken: nextToken,
    });
    const columns: AWS.Athena.ColumnInfo[] = page.ResultSet?.ResultSetMetadata?.ColumnInfo ?? [];
    const rows: AWS.Athena.Row[] = page.ResultSet?.Rows ?? [];
    // A header row can only appear on the first page, i.e. when no token was supplied.
    const dropHeader = skipHeaderRow && !nextToken && AthenaAWSImpl.isHeaderRow(columns, rows[0]);
    const dataRows = dropHeader ? rows.slice(1) : rows;

    return {
      nextToken: page.NextToken,
      results: dataRows.map((row) => AthenaAWSImpl.toRecord(columns, row)),
    };
  }
}
Comentarios
Publicar un comentario