eladkal commented on code in PR #35090: URL: https://github.com/apache/airflow/pull/35090#discussion_r1367768708
########## airflow/providers/amazon/aws/operators/athena.py: ########## @@ -163,3 +167,132 @@ def on_kill(self) -> None: "Polling Athena for query with id %s to reach final state", self.query_execution_id ) self.hook.poll_query_status(self.query_execution_id, sleep_time=self.sleep_time) + + def get_openlineage_facets_on_start(self): + from openlineage.client.facet import ExtractionError, ExtractionErrorRunFacet, SqlJobFacet + from openlineage.client.run import Dataset + + from airflow.providers.openlineage.extractors.base import OperatorLineage + from airflow.providers.openlineage.sqlparser import SQLParser + + sql_parser = SQLParser(dialect="generic") + + job_facets: dict[str, BaseFacet] = {"sql": SqlJobFacet(query=sql_parser.normalize_sql(self.query))} + parse_result = sql_parser.parse(sql=self.query) + + if not parse_result: + return OperatorLineage(job_facets=job_facets) + + run_facets: dict[str, BaseFacet] = {} + if parse_result.errors: + run_facets["extractionError"] = ExtractionErrorRunFacet( + totalTasks=len(self.query) if isinstance(self.query, list) else 1, + failedTasks=len(parse_result.errors), + errors=[ + ExtractionError( + errorMessage=error.message, + stackTrace=None, + task=error.origin_statement, + taskNumber=error.index, + ) + for error in parse_result.errors + ], + ) + + inputs: list[Dataset] = list( + filter( + None, + [ + self.get_openlineage_dataset(table.schema or self.database, table.name) + for table in parse_result.in_tables + ], + ) + ) + + # Athena can output query result to a new table with CTAS query. + # cf. https://docs.aws.amazon.com/athena/latest/ug/ctas.html + outputs: list[Dataset] = list( + filter( + None, + [ + self.get_openlineage_dataset(table.schema or self.database, table.name) + for table in parse_result.out_tables + ], + ) + ) + + # In addition to CTAS query, it's also possible to specify output location on S3 + # with a mandatory parameter, OutputLocation in ResultConfiguration. + # cf. 
https://docs.aws.amazon.com/athena/latest/APIReference/API_ResultConfiguration.html#athena-Type-ResultConfiguration-OutputLocation # noqa: E501 + # + # Depending on the query type and the external_location property in the CTAS query, + # its behavior changes as follows: + # + # * Normal SELECT statement + # -> The result is put into output_location as files rather than a table. + # + # * CTAS statement without external_location (`CREATE TABLE ... AS SELECT ...`) + # -> The result is put into output_location as a table, + # that is, both metadata files and data files are in there. + # + # * CTAS statement with external_location + # (`CREATE TABLE ... WITH (external_location='s3://bucket/key') AS SELECT ...`) + # -> The result is output as a table, but metadata and data files are + # separated into output_location and external_location respectively. + # + # For the last case, output_location may be unnecessary as OL's output information, + # but we keep it as of now since it may be useful for some purpose. + output_location = self.output_location + parsed = urlparse(output_location) + outputs.append(Dataset(namespace=f"{parsed.scheme}://{parsed.netloc}", name=parsed.path)) + + return OperatorLineage(job_facets=job_facets, run_facets=run_facets, inputs=inputs, outputs=outputs) + + def get_openlineage_dataset(self, database, table) -> Dataset | None: + from openlineage.client.facet import ( + SchemaDatasetFacet, + SchemaField, + SymlinksDatasetFacet, + SymlinksDatasetFacetIdentifiers, + ) + from openlineage.client.run import Dataset + + # Currently, AthenaOperator and AthenaHook don't have a functionality to specify catalog, + # and it seems to implicitly assume that the default catalog (AwsDataCatalog) is target. + CATALOG_NAME = "AwsDataCatalog" Review Comment: I believe this is a wrong assumption. 
`AthenaOperator` has a `query_execution_context` parameter, which is passed to `AthenaHook` https://github.com/apache/airflow/blob/659d94f0ae89f47a7d4b95d6c19ab7f87bd3a60f/airflow/providers/amazon/aws/operators/athena.py#L107 and the hook sets the value of `query_execution_context` in `AthenaHook.run_query` as `query_context` https://github.com/apache/airflow/blob/667ab8c6eaceb689f6a1afbd909d88bdf0584342/airflow/providers/amazon/aws/hooks/athena.py#L111 which is passed as `QueryExecutionContext`. In the boto3 docs: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/batch_get_query_execution.html you have: ``` 'QueryExecutionContext': { 'Database': 'string', 'Catalog': 'string' }, ``` which means you can set the catalog from the `AthenaOperator`. It may be a bit confusing because in the operator we expose a `database` parameter but not a `catalog` parameter https://github.com/apache/airflow/blob/659d94f0ae89f47a7d4b95d6c19ab7f87bd3a60f/airflow/providers/amazon/aws/operators/athena.py#L68 and later set it in `query_execution_context` https://github.com/apache/airflow/blob/659d94f0ae89f47a7d4b95d6c19ab7f87bd3a60f/airflow/providers/amazon/aws/operators/athena.py#L103 but that is just for ease of use. We can also expose the catalog parameter and set it in a similar way. In any case, both of these should work: `AthenaOperator(..., query_execution_context={"Database": "MyDataBase", "Catalog": "MyCatalog"})` `AthenaOperator(..., database="MyDataBase", query_execution_context={"Catalog": "MyCatalog"})` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org