Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]
mobuchowski merged PR #44477: URL: https://github.com/apache/airflow/pull/44477 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
kacpermuda commented on PR #44477: URL: https://github.com/apache/airflow/pull/44477#issuecomment-2544816446

> To make sure I understand the target state:
>
> With Kacper's changes, some additional metadata about the parent job (the Airflow DAG / task in this case) will be passed to the Spark job, and emitted in an OpenLineage event to an OpenLineage-supporting catalog by the Spark job itself. Regardless of that, an OpenLineage event at the Airflow level can/will be emitted.

Correct, that is the target state in my opinion; Airflow events will still be emitted without any changes. For now we are simply automating the transfer of some additional information to the Spark integration (but people have been doing this manually until now with the OL-provided macros).
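For context, the manual approach being automated here looked roughly like the following sketch. The `spark.openlineage.parent*` property keys and the macro names (`lineage_job_namespace`, `lineage_job_name`, `lineage_run_id`) come from the OpenLineage provider; the job dict shape and the helper name are illustrative, not taken from the PR.

```python
# Sketch of the manual setup this PR automates: users built the
# OpenLineage parent-job Spark properties by hand (usually from the
# OpenLineage provider macros) and attached them to the Dataproc job.
# The helper name and the example values below are hypothetical.

def manual_parent_job_properties(namespace: str, job_name: str, run_id: str) -> dict:
    """Build the Spark properties a user would previously add by hand."""
    return {
        "spark.openlineage.parentJobNamespace": namespace,
        "spark.openlineage.parentJobName": job_name,
        "spark.openlineage.parentRunId": run_id,
    }

# Illustrative Dataproc job definition carrying the properties.
job = {
    "pyspark_job": {
        "main_python_file_uri": "gs://bucket/job.py",
        "properties": manual_parent_job_properties(
            "my-namespace", "dag_id.task_id", "some-run-id"
        ),
    }
}
```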
michalmodras commented on PR #44477: URL: https://github.com/apache/airflow/pull/44477#issuecomment-2541737829

To make sure I understand the target state:

- With Kacper's changes, some additional metadata about the parent job (the Airflow DAG / task in this case) will be passed to the Spark job, and emitted in an OpenLineage event to an OpenLineage-supporting catalog by the Spark job itself.
- Regardless of that, an OpenLineage event at the Airflow level can/will be emitted.

I think it's fair for each layer of orchestration to emit the metadata it has access to: for example, depending on the Spark job type/implementation, the low-level information about Spark execution, or, in the case of Airflow, information about the DAG/task/Airflow deployment. For Airflow itself to construct such a lineage event, it needs to be aware of the input/output assets (as long as we cannot link lineage events by process identifier alone). SQL parsing can be a way to get this information for SQL-like jobs; for other types of jobs (not necessarily Spark jobs) we can, for example, query the service the operator is integrated with (e.g. for BigQuery jobs we could query the BigQuery API to get that information and emit an event linking the input/output assets with the BigQuery job id and the DAG/task/Airflow deployment id).
kacpermuda commented on PR #44477: URL: https://github.com/apache/airflow/pull/44477#issuecomment-2532232417 The failing test comes from changes made in #44717. Waiting for the fix PR, which the author already has on the way.
mobuchowski commented on PR #44477: URL: https://github.com/apache/airflow/pull/44477#issuecomment-2519920387 @ahidalgob I don't think that's right, since you can submit a JAR with arbitrary code rather than just SQL. Also, even for SQL jobs, rather than using a parser (which is a best-effort solution), we can use the Spark integration, which actually understands the submitted jobs. Airflow events here can contribute the proper hierarchy.
ahidalgob commented on PR #44477: URL: https://github.com/apache/airflow/pull/44477#issuecomment-2519832092 Thanks @kacpermuda, we would like to contribute the logic we used in Composer to generate the events from the SQL queries of other DataprocSubmitJob types. I think this PR and what we want to contribute are not incompatible; does that sound good to you? (also @mobuchowski)
ahidalgob commented on PR #44477: URL: https://github.com/apache/airflow/pull/44477#issuecomment-2517269215 Hi, are we planning on also emitting the lineage events from Airflow itself? We have other services that emit lineage (for example, BigQuery) for which we also still emit lineage from Airflow. For example, in Composer we generate the events based on the SQL query of Hive, SparkSQL, Presto and Trino jobs.
kacpermuda commented on PR #44477: URL: https://github.com/apache/airflow/pull/44477#issuecomment-2517639918 Correct, this feature is only about automatically passing some OpenLineage information from Airflow to Spark to automate the process of configuring the OpenLineage/Spark integration.
ahidalgob commented on PR #44477: URL: https://github.com/apache/airflow/pull/44477#issuecomment-2517594409 Hi @kacpermuda, what I meant was exactly what you describe: parsing the SQL query on the Airflow side and generating the inputs/outputs. Right now, as you confirmed, this PR only configures how Spark generates the lineage events but doesn't generate any from the Airflow side, right?
kacpermuda commented on PR #44477: URL: https://github.com/apache/airflow/pull/44477#issuecomment-2517427061 Hey @ahidalgob, just to confirm I understand you correctly: are you asking whether we plan to emit the lineage of the child job (in this case, Spark) directly from Airflow? As of now, there aren't any plans for that that I'm aware of. In my opinion, it's a bit more complex to implement compared to a SQL-based approach, where we can parse the SQL on the Airflow side and occasionally patch it with API calls to BigQuery or similar services. Extracting lineage from a Spark jar, which can do virtually anything, is more challenging. For now, I'm focusing on making it easier for users to configure the Spark integration, without changing the entity responsible for emitting the events.
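To illustrate why the SQL-based approach is tractable while the jar-based one is not, here is a deliberately naive sketch of pulling candidate input/output table names out of a SQL string. The helper is hypothetical and not from the PR or any provider; real integrations use a proper SQL parser, and nothing comparable exists for arbitrary jar code.

```python
import re

def naive_sql_io(sql: str) -> tuple[set, set]:
    """Best-effort (regex-based, intentionally naive) extraction of
    input/output table names from a single SQL statement."""
    # Outputs: targets of INSERT INTO / CREATE TABLE.
    outputs = set(re.findall(r"(?:INSERT\s+INTO|CREATE\s+TABLE)\s+([\w.]+)", sql, re.IGNORECASE))
    # Inputs: anything referenced after FROM / JOIN, minus the outputs.
    inputs = set(re.findall(r"(?:FROM|JOIN)\s+([\w.]+)", sql, re.IGNORECASE))
    return inputs - outputs, outputs

inputs, outputs = naive_sql_io(
    "INSERT INTO proj.out_t SELECT * FROM proj.in_t JOIN proj.dim_t ON 1=1"
)
```

This works only for simple single-statement SQL (no CTEs, subqueries, or quoting), which is exactly why the comment above distinguishes SQL jobs from arbitrary jars.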
github-advanced-security[bot] commented on code in PR #44477: URL: https://github.com/apache/airflow/pull/44477#discussion_r1866227874

## providers/src/airflow/providers/openlineage/utils/spark.py:

@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from airflow.providers.openlineage.plugins.macros import (
+    lineage_job_name,
+    lineage_job_namespace,
+    lineage_run_id,
+)
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+log = logging.getLogger(__name__)
+
+
+def _get_parent_job_information_as_spark_properties(context: Context) -> dict:
+    """
+    Retrieve parent job information as Spark properties.
+
+    Args:
+        context: The context containing task instance information.
+
+    Returns:
+        Spark properties with the parent job information.
+    """
+    ti = context["ti"]
+    return {
+        "spark.openlineage.parentJobNamespace": lineage_job_namespace(),
+        "spark.openlineage.parentJobName": lineage_job_name(ti),  # type: ignore[arg-type]
+        "spark.openlineage.parentRunId": lineage_run_id(ti),  # type: ignore[arg-type]
+    }
+
+
+def _is_parent_job_information_present_in_spark_properties(properties: dict) -> bool:
+    """
+    Check if any parent job information is present in Spark properties.
+
+    Args:
+        properties: Spark properties.
+
+    Returns:
+        True if parent job information is present, False otherwise.
+    """
+    return any(str(key).startswith("spark.openlineage.parent") for key in properties)
+
+
+def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict:
+    """
+    Inject parent job information into Spark properties if not already present.
+
+    Args:
+        properties: Spark properties.
+        context: The context containing task instance information.
+
+    Returns:
+        Modified Spark properties with OpenLineage parent job information properties injected, if applicable.
+    """
+    if _is_parent_job_information_present_in_spark_properties(properties):
+        log.info(
+            "Some OpenLineage properties with parent job information are already present "
+            "in the Spark job configuration. Skipping the automatic injection of OpenLineage "
+            "parent job information into Spark job configuration."
+        )
+        return properties
+
+    ol_parent_job_properties = _get_parent_job_information_as_spark_properties(context)
+    properties = {**properties, **ol_parent_job_properties}
+    log.debug(
+        "Successfully injected OpenLineage parent job information into Spark job configuration: %s",
+        ol_parent_job_properties,

Review Comment:
## Clear-text logging of sensitive information
This expression logs [sensitive data (secret)](1) as clear text.
This expression logs [sensitive data (secret)](2) as clear text.
[Show more details](https://github.com/apache/airflow/security/code-scanning/278)

## providers/src/airflow/providers/common/compat/openlineage/utils/spark.py:

@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+log = logging.getLogger(__name__)
+
+if TYPE_CHECKING:
+    from airflow.providers.openlineage.utils.spark import inject_parent_job_information
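The inject-if-absent behavior of `inject_parent_job_information_into_spark_properties` can be sketched standalone, with the Airflow context and macro calls replaced by a plain dict of parent properties so it runs anywhere. The function and variable names below are hypothetical simplifications of the provider code shown above.

```python
# Minimal sketch of the merge logic in
# providers/openlineage/utils/spark.py: parent-job properties are injected
# only when the user has not already set any spark.openlineage.parent* key.

def inject_if_absent(properties: dict, parent_properties: dict) -> dict:
    """Merge parent-job properties in, unless any are already present."""
    if any(str(key).startswith("spark.openlineage.parent") for key in properties):
        return properties  # user-provided settings win; skip injection
    return {**properties, **parent_properties}

parent = {
    "spark.openlineage.parentJobNamespace": "my-namespace",
    "spark.openlineage.parentJobName": "dag_id.task_id",
    "spark.openlineage.parentRunId": "some-run-id",
}

merged = inject_if_absent({"spark.executor.memory": "4g"}, parent)
untouched = inject_if_absent({"spark.openlineage.parentRunId": "custom"}, parent)
```

Note that the presence of even one `spark.openlineage.parent*` key disables injection entirely, so a partially hand-configured job is never silently completed.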
kacpermuda commented on code in PR #44477: URL: https://github.com/apache/airflow/pull/44477#discussion_r1863608223

## providers/src/airflow/providers/google/cloud/operators/dataproc.py:

@@ -2060,6 +2066,36 @@ def on_kill(self):
         if self.job_id and self.cancel_on_kill:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, region=self.region)
+
+    def _inject_openlineage_properties_into_spark_job_config(self, job: dict, context: Context) -> dict:
+        """
+        Inject OpenLineage properties into the Spark job configuration.
+
+        Note:
+            This function will modify the job configuration ONLY
+            when the automatic injection of OpenLineage properties is enabled.
+            If You are not using OpenLineage integration, you can safely ignore this function.
+            # TODO Add more information on what this function does and when it's not doing anything
+
+        Read more about this feature at: # TODO: Add link to the documentation
+
+        Args:
+            job: The original Dataproc job definition.
+            context: The Airflow context in which the job is running.
+
+        Returns:
+            The modified job configuration with OpenLineage properties injected, if applicable.
+        """
+        from airflow.providers.google.cloud.openlineage.utils import (

Review Comment:
That makes sense. The local import was an oversight, but thinking it through again, I realize I can eliminate the entire method and directly call the utility function instead.
mobuchowski commented on code in PR #44477: URL: https://github.com/apache/airflow/pull/44477#discussion_r1863591255

## providers/src/airflow/providers/google/cloud/operators/dataproc.py:

@@ -2060,6 +2066,36 @@ def on_kill(self):
         if self.job_id and self.cancel_on_kill:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, region=self.region)
+
+    def _inject_openlineage_properties_into_spark_job_config(self, job: dict, context: Context) -> dict:
+        """
+        Inject OpenLineage properties into the Spark job configuration.
+
+        Note:
+            This function will modify the job configuration ONLY
+            when the automatic injection of OpenLineage properties is enabled.
+            If You are not using OpenLineage integration, you can safely ignore this function.
+            # TODO Add more information on what this function does and when it's not doing anything
+
+        Read more about this feature at: # TODO: Add link to the documentation
+
+        Args:
+            job: The original Dataproc job definition.
+            context: The Airflow context in which the job is running.
+
+        Returns:
+            The modified job configuration with OpenLineage properties injected, if applicable.
+        """
+        from airflow.providers.google.cloud.openlineage.utils import (

Review Comment:
why not top-level import this and just call the method in `execute`? This is the same provider, I don't think the indirection provides anything here.
mobuchowski commented on code in PR #44477: URL: https://github.com/apache/airflow/pull/44477#discussion_r1863589688

## providers/src/airflow/providers/google/cloud/operators/dataproc.py:

@@ -1962,6 +1962,9 @@ def __init__(
         polling_interval_seconds: int = 10,
         cancel_on_kill: bool = True,
         wait_timeout: int | None = None,
+        ol_inject_parent_job_info: bool = conf.getboolean(

Review Comment:
If we're already using rather long option names, we can expand the `ol` abbreviation where the user would set it.