Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-19 Thread via GitHub


mobuchowski merged PR #44477:
URL: https://github.com/apache/airflow/pull/44477


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-15 Thread via GitHub


kacpermuda commented on PR #44477:
URL: https://github.com/apache/airflow/pull/44477#issuecomment-2544816446

   > To make sure I understand the target state:
   > 
   > With Kacper's changes, some additional metadata about parent job (Airflow DAG / task in this case) will be passed to the Spark job, and emitted in an OpenLineage event to OpenLineage-supporting-catalog by the Spark job itself.
   > Regardless of that, OpenLineage event at the Airflow level can/will be emitted.
   
   Correct, that is the target state in my opinion: Airflow events will still 
be emitted without any changes. For now we are simply automating the transfer 
of some additional information to the Spark integration (people have been doing 
this manually until now with the OL-provided macros).
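   Concretely, the information being automated is the set of `spark.openlineage.parent*` properties built from the OpenLineage provider macros (`lineage_job_namespace`, `lineage_job_name`, `lineage_run_id`). A standalone sketch of what ends up in the Spark job configuration; the values below are illustrative placeholders, not real macro output:

```python
# Sketch of the spark.openlineage.parent* properties that users previously set
# by hand with the OL macros and that this PR now injects automatically.
# The values are placeholders standing in for lineage_job_namespace(),
# lineage_job_name(ti) and lineage_run_id(ti).

def parent_job_spark_properties(namespace: str, job_name: str, run_id: str) -> dict:
    return {
        "spark.openlineage.parentJobNamespace": namespace,
        "spark.openlineage.parentJobName": job_name,
        "spark.openlineage.parentRunId": run_id,
    }

props = parent_job_spark_properties(
    "default",                  # lineage_job_namespace()
    "my_dag.submit_spark_job",  # lineage_job_name(ti)
    "01939e9d-0000-7000-8000-000000000000",  # lineage_run_id(ti)
)
```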





Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-13 Thread via GitHub


michalmodras commented on PR #44477:
URL: https://github.com/apache/airflow/pull/44477#issuecomment-2541737829

   To make sure I understand the target state:
   - With Kacper's changes, some additional metadata about parent job (Airflow 
DAG / task in this case) will be passed to the Spark job, and emitted in an 
OpenLineage event to OpenLineage-supporting-catalog by the Spark job itself.
   - Regardless of that, OpenLineage event at the Airflow level can/will be 
emitted.
   
   I think it's fair for each layer of orchestration to emit the metadata it 
has access to: for example, the low-level information about Spark execution 
(depending on the Spark job type/implementation), or, in the case of Airflow, 
information about the DAG/task/Airflow deployment. 
   
   For Airflow itself to construct such a lineage event, Airflow needs to be 
aware of the input/output assets (as long as we cannot link lineage events only 
by process identifier). SQL parsing can be a way to get this information for 
SQL-like jobs; for other types of jobs (not necessarily Spark jobs) we can, for 
example, query the service the operator is integrated with (e.g. with BigQuery 
jobs we could query the BigQuery API to get that information and emit an event 
linking input/output assets with the BigQuery job id and the DAG/task/Airflow 
deployment id).
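   The BigQuery idea in the last paragraph can be sketched as follows. The attribute names mirror the `google-cloud-bigquery` `QueryJob` API (`referenced_tables`, `destination`); the stub classes stand in for a real job object so the sketch is self-contained:

```python
# Hedged sketch: derive input/output assets from a finished BigQuery query job.

def assets_from_query_job(job):
    """Return (inputs, outputs) as fully qualified table ids."""
    inputs = [f"{t.project}.{t.dataset_id}.{t.table_id}" for t in job.referenced_tables]
    dest = job.destination
    outputs = [f"{dest.project}.{dest.dataset_id}.{dest.table_id}"] if dest else []
    return inputs, outputs

# With the real client this would be roughly:
#   job = bigquery.Client().get_job(job_id, location=location)
class _TableRef:
    def __init__(self, project, dataset_id, table_id):
        self.project, self.dataset_id, self.table_id = project, dataset_id, table_id

class _StubJob:
    referenced_tables = [_TableRef("my-project", "raw", "events")]
    destination = _TableRef("my-project", "mart", "daily_events")

inputs, outputs = assets_from_query_job(_StubJob())
```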





Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-10 Thread via GitHub


kacpermuda commented on PR #44477:
URL: https://github.com/apache/airflow/pull/44477#issuecomment-2532232417

   The failing test comes from changes made in #44717. Waiting for the fix, as 
a PR from the author is on the way.





Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-05 Thread via GitHub


mobuchowski commented on PR #44477:
URL: https://github.com/apache/airflow/pull/44477#issuecomment-2519920387

   @ahidalgob I don't think that's right, since you can submit a JAR with 
arbitrary code rather than just SQL. Also, even for SQL jobs, rather than using 
a parser (which is a best-effort solution) we can use the Spark integration, 
which actually understands the uploaded jobs. Airflow events here can contribute 
the proper hierarchy.





Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-05 Thread via GitHub


ahidalgob commented on PR #44477:
URL: https://github.com/apache/airflow/pull/44477#issuecomment-2519832092

   Thanks @kacpermuda, we would like to contribute the logic we used in 
Composer to generate the events from the SQL queries for other DataprocSubmitJob 
job types. I think this PR and what we want to contribute are not incompatible, 
does that sound good to you? (also @mobuchowski )





Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-04 Thread via GitHub


ahidalgob commented on PR #44477:
URL: https://github.com/apache/airflow/pull/44477#issuecomment-2517269215

   Hi, are we planning on also emitting the lineage events from Airflow itself? 
I think we have other services that emit lineage (for example, BigQuery) where 
we also still emit this lineage from Airflow. For example, in Composer, we 
generate the events based on the SQL query of Hive, SparkSQL, Presto and Trino 
jobs.





Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-04 Thread via GitHub


kacpermuda commented on PR #44477:
URL: https://github.com/apache/airflow/pull/44477#issuecomment-2517639918

   Correct, this feature is only about automatically passing some OpenLineage 
information from Airflow to Spark to automate the process of configuring the 
OpenLineage/Spark integration.





Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-04 Thread via GitHub


ahidalgob commented on PR #44477:
URL: https://github.com/apache/airflow/pull/44477#issuecomment-2517594409

   Hi @kacpermuda , what I meant was exactly what you describe: parsing the SQL 
query on the Airflow side and generating the inputs/outputs. Right now, as you 
confirmed, this PR only configures how Spark generates the lineage events but 
doesn't generate them from the Airflow side, right? 





Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-04 Thread via GitHub


kacpermuda commented on PR #44477:
URL: https://github.com/apache/airflow/pull/44477#issuecomment-2517427061

   Hey @ahidalgob, just to confirm I understand you correctly: are you asking 
if we plan to emit the lineage from the child job (in this case, Spark) 
directly from Airflow? As of now, there aren’t any plans for that that I'm 
aware of. In my opinion, it’s a bit more complex to implement compared to a 
SQL-based approach, where we can parse the SQL on the Airflow side and 
occasionally patch it with API calls to BigQuery or similar solutions. 
Extracting lineage from a Spark jar, which can do virtually anything, is more 
challenging. For now, I’m focusing on making it easier for users to configure 
Spark integration, without changing the entity responsible for emitting the 
events.





Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-12-02 Thread via GitHub


github-advanced-security[bot] commented on code in PR #44477:
URL: https://github.com/apache/airflow/pull/44477#discussion_r1866227874


##
providers/src/airflow/providers/openlineage/utils/spark.py:
##
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from airflow.providers.openlineage.plugins.macros import (
+    lineage_job_name,
+    lineage_job_namespace,
+    lineage_run_id,
+)
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+log = logging.getLogger(__name__)
+
+
+def _get_parent_job_information_as_spark_properties(context: Context) -> dict:
+    """
+    Retrieve parent job information as Spark properties.
+
+    Args:
+        context: The context containing task instance information.
+
+    Returns:
+        Spark properties with the parent job information.
+    """
+    ti = context["ti"]
+    return {
+        "spark.openlineage.parentJobNamespace": lineage_job_namespace(),
+        "spark.openlineage.parentJobName": lineage_job_name(ti),  # type: ignore[arg-type]
+        "spark.openlineage.parentRunId": lineage_run_id(ti),  # type: ignore[arg-type]
+    }
+
+
+def _is_parent_job_information_present_in_spark_properties(properties: dict) -> bool:
+    """
+    Check if any parent job information is present in Spark properties.
+
+    Args:
+        properties: Spark properties.
+
+    Returns:
+        True if parent job information is present, False otherwise.
+    """
+    return any(str(key).startswith("spark.openlineage.parent") for key in properties)
+
+
+def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict:
+    """
+    Inject parent job information into Spark properties if not already present.
+
+    Args:
+        properties: Spark properties.
+        context: The context containing task instance information.
+
+    Returns:
+        Modified Spark properties with OpenLineage parent job information properties injected, if applicable.
+    """
+    if _is_parent_job_information_present_in_spark_properties(properties):
+        log.info(
+            "Some OpenLineage properties with parent job information are already present "
+            "in the Spark job configuration. Skipping the automatic injection of OpenLineage "
+            "parent job information into Spark job configuration."
+        )
+        return properties
+
+    ol_parent_job_properties = _get_parent_job_information_as_spark_properties(context)
+    properties = {**properties, **ol_parent_job_properties}
+    log.debug(
+        "Successfully injected OpenLineage parent job information into Spark job configuration: %s",
+        ol_parent_job_properties,

Review Comment:
   ## Clear-text logging of sensitive information
   
   This expression logs [sensitive data (secret)](1) as clear text.
   This expression logs [sensitive data (secret)](2) as clear text.
   
   [Show more 
details](https://github.com/apache/airflow/security/code-scanning/278)
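   To make the skip-if-present behavior of the hunk above concrete, here is a standalone, dependency-free sketch of the same merge logic (the real function also pulls its values from Airflow macros; fixed placeholders are used here):

```python
# Standalone sketch of the merge performed by
# inject_parent_job_information_into_spark_properties: if any
# spark.openlineage.parent* key is already set by the user, the properties are
# returned untouched; otherwise the parent job info is merged in.

def inject_parent_info(properties: dict, parent_props: dict) -> dict:
    if any(str(key).startswith("spark.openlineage.parent") for key in properties):
        return properties  # user-provided config wins; nothing is overridden
    return {**properties, **parent_props}

parent = {
    "spark.openlineage.parentJobNamespace": "default",
    "spark.openlineage.parentJobName": "my_dag.submit_spark_job",
    "spark.openlineage.parentRunId": "01939e9d-0000-7000-8000-000000000000",
}

merged = inject_parent_info({"spark.executor.memory": "4g"}, parent)
untouched = inject_parent_info({"spark.openlineage.parentRunId": "manual-id"}, parent)
```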



##
providers/src/airflow/providers/common/compat/openlineage/utils/spark.py:
##
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+log = logging.getLogger(__name__)
+
+if TYPE_CHECKING:
+    from airflow.providers.openlineage.utils.spark import inject_parent_job_information

Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-11-29 Thread via GitHub


kacpermuda commented on code in PR #44477:
URL: https://github.com/apache/airflow/pull/44477#discussion_r1863608223


##
providers/src/airflow/providers/google/cloud/operators/dataproc.py:
##
@@ -2060,6 +2066,36 @@ def on_kill(self):
         if self.job_id and self.cancel_on_kill:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, region=self.region)
 
+    def _inject_openlineage_properties_into_spark_job_config(self, job: dict, context: Context) -> dict:
+        """
+        Inject OpenLineage properties into the Spark job configuration.
+
+        Note:
+            This function will modify the job configuration ONLY
+            when the automatic injection of OpenLineage properties is enabled.
+            If you are not using the OpenLineage integration, you can safely ignore this function.
+            # TODO Add more information on what this function does and when it's not doing anything
+
+        Read more about this feature at: # TODO: Add link to the documentation
+
+        Args:
+            job: The original Dataproc job definition.
+            context: The Airflow context in which the job is running.
+
+        Returns:
+            The modified job configuration with OpenLineage properties injected, if applicable.
+        """
+        from airflow.providers.google.cloud.openlineage.utils import (

Review Comment:
   That makes sense. The local import was an oversight, but thinking it through 
again, I realize I can eliminate the entire method and directly call the 
utility function instead.









Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-11-29 Thread via GitHub


mobuchowski commented on code in PR #44477:
URL: https://github.com/apache/airflow/pull/44477#discussion_r1863591255


##
providers/src/airflow/providers/google/cloud/operators/dataproc.py:
##
@@ -2060,6 +2066,36 @@ def on_kill(self):
         if self.job_id and self.cancel_on_kill:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, region=self.region)
 
+    def _inject_openlineage_properties_into_spark_job_config(self, job: dict, context: Context) -> dict:
+        """
+        Inject OpenLineage properties into the Spark job configuration.
+
+        Note:
+            This function will modify the job configuration ONLY
+            when the automatic injection of OpenLineage properties is enabled.
+            If you are not using the OpenLineage integration, you can safely ignore this function.
+            # TODO Add more information on what this function does and when it's not doing anything
+
+        Read more about this feature at: # TODO: Add link to the documentation
+
+        Args:
+            job: The original Dataproc job definition.
+            context: The Airflow context in which the job is running.
+
+        Returns:
+            The modified job configuration with OpenLineage properties injected, if applicable.
+        """
+        from airflow.providers.google.cloud.openlineage.utils import (

Review Comment:
   why not top-level import this and just call the method in `execute`? This is 
the same provider, I don't think the indirection provides anything here.
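   The shape being suggested can be sketched with stand-in classes (the helper name and job structure below are illustrative, not the PR's final API):

```python
# Sketch of the suggested structure: a top-level import of the utility and a
# direct call inside execute(), instead of a private wrapper method on the
# operator. All names below are illustrative stand-ins.

def inject_openlineage_properties_into_spark_job(job: dict, context: dict) -> dict:
    # stand-in for the provider utility; sets a parent property if absent
    props = job.setdefault("pyspark_job", {}).setdefault("properties", {})
    props.setdefault("spark.openlineage.parentJobName", context["job_name"])
    return job

class DataprocSubmitJobOperatorSketch:
    def __init__(self, job: dict):
        self.job = job

    def execute(self, context: dict) -> dict:
        # direct call to the utility; no extra indirection in the operator
        self.job = inject_openlineage_properties_into_spark_job(self.job, context)
        return self.job

result = DataprocSubmitJobOperatorSketch({"pyspark_job": {}}).execute({"job_name": "my_dag.my_task"})
```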






Re: [PR] feat: automatically inject OL info into spark job in DataprocSubmitJobOperator [airflow]

2024-11-29 Thread via GitHub


mobuchowski commented on code in PR #44477:
URL: https://github.com/apache/airflow/pull/44477#discussion_r1863589688


##
providers/src/airflow/providers/google/cloud/operators/dataproc.py:
##
@@ -1962,6 +1962,9 @@ def __init__(
         polling_interval_seconds: int = 10,
         cancel_on_kill: bool = True,
         wait_timeout: int | None = None,
+        ol_inject_parent_job_info: bool = conf.getboolean(

Review Comment:
   If we're already using rather long option names, we can expand the `ol` 
abbreviation



##
providers/src/airflow/providers/google/cloud/operators/dataproc.py:
##
@@ -1962,6 +1962,9 @@ def __init__(
         polling_interval_seconds: int = 10,
         cancel_on_kill: bool = True,
         wait_timeout: int | None = None,
+        ol_inject_parent_job_info: bool = conf.getboolean(

Review Comment:
   If we're already using rather long option names, we can expand the `ol` 
abbreviation where the user would set it
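   For illustration, the expanded spelling could surface in the operator argument and in airflow.cfg; the names below only sketch the idea and are not necessarily the final merged spelling:

```ini
[openlineage]
; hypothetical expanded option, spelling out the "ol" abbreviation
spark_inject_parent_job_info = True
```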


