This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 195abf8f71 SparkSubmit: Adding propertyfiles option (#36164)
195abf8f71 is described below

commit 195abf8f7116c9e37fd3dc69bfee8cbf546c5a3f
Author: Ashish Patel <ashishpatel0...@gmail.com>
AuthorDate: Mon Dec 11 22:02:32 2023 +0530

    SparkSubmit: Adding propertyfiles option (#36164)
    
    Introduce a new parameter, 'properties_file', in SparkSubmitHook and
    SparkSubmitOperator. It allows extra properties to be loaded from a
    specified file; if none is given, spark-submit looks for
    conf/spark-defaults.conf. The goal is to provide greater flexibility
    for Spark configuration. The changes are covered by tests for the
    affected files.
    
    
    ---------
    
    Co-authored-by: ghostp13409 <gajjarpart...@gmail.com>
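    
    For reference, a minimal usage sketch of the new option (illustrative
    only: the task_id, application, and properties file path below are
    made up for this example):
    
        from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    
        # Hypothetical values; any readable properties file works for
        # properties_file. When it is omitted, spark-submit falls back to
        # conf/spark-defaults.conf.
        submit_job = SparkSubmitOperator(
            task_id="submit_job",
            application="test_application.py",
            conn_id="spark_default",
            properties_file="conf/spark-custom.conf",
        )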
---
 airflow/providers/apache/spark/hooks/spark_submit.py        | 6 ++++++
 airflow/providers/apache/spark/operators/spark_submit.py    | 6 ++++++
 tests/providers/apache/spark/operators/test_spark_submit.py | 3 +++
 3 files changed, 15 insertions(+)

diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index be087dada9..b64f3a0c22 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -78,6 +78,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
     :param verbose: Whether to pass the verbose flag to spark-submit process for debugging
     :param spark_binary: The command to use for spark submit.
                          Some distros may use spark2-submit or spark3-submit.
+    :param properties_file: Path to a file from which to load extra properties. If not
+                              specified, this will look for conf/spark-defaults.conf.
     :param use_krb5ccache: if True, configure spark to use ticket cache instead of relying
         on keytab for Kerberos login
     """
@@ -122,6 +124,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         env_vars: dict[str, Any] | None = None,
         verbose: bool = False,
         spark_binary: str | None = None,
+        properties_file: str | None = None,
         *,
         use_krb5ccache: bool = False,
     ) -> None:
@@ -155,6 +158,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         self._yarn_application_id: str | None = None
         self._kubernetes_driver_pod: str | None = None
         self.spark_binary = spark_binary
+        self._properties_file = properties_file
         self._connection = self._resolve_connection()
         self._is_yarn = "yarn" in self._connection["master"]
         self._is_kubernetes = "k8s" in self._connection["master"]
@@ -292,6 +296,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
                 "--conf",
                 f"spark.kubernetes.namespace={self._connection['namespace']}",
             ]
+        if self._properties_file:
+            connection_cmd += ["--properties-file", self._properties_file]
         if self._files:
             connection_cmd += ["--files", self._files]
         if self._py_files:
diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py
index 903ff9b720..3f4c539536 100644
--- a/airflow/providers/apache/spark/operators/spark_submit.py
+++ b/airflow/providers/apache/spark/operators/spark_submit.py
@@ -69,6 +69,8 @@ class SparkSubmitOperator(BaseOperator):
     :param verbose: Whether to pass the verbose flag to spark-submit process for debugging
     :param spark_binary: The command to use for spark submit.
                          Some distros may use spark2-submit or spark3-submit.
+    :param properties_file: Path to a file from which to load extra properties. If not
+                              specified, this will look for conf/spark-defaults.conf.
     :param use_krb5ccache: if True, configure spark to use ticket cache instead of relying
                            on keytab for Kerberos login
     """
@@ -88,6 +90,7 @@ class SparkSubmitOperator(BaseOperator):
         "_name",
         "_application_args",
         "_env_vars",
+        "_properties_file",
     )
     ui_color = WEB_COLORS["LIGHTORANGE"]
 
@@ -120,6 +123,7 @@ class SparkSubmitOperator(BaseOperator):
         env_vars: dict[str, Any] | None = None,
         verbose: bool = False,
         spark_binary: str | None = None,
+        properties_file: str | None = None,
         use_krb5ccache: bool = False,
         **kwargs: Any,
     ) -> None:
@@ -149,6 +153,7 @@ class SparkSubmitOperator(BaseOperator):
         self._env_vars = env_vars
         self._verbose = verbose
         self._spark_binary = spark_binary
+        self._properties_file = properties_file
         self._hook: SparkSubmitHook | None = None
         self._conn_id = conn_id
         self._use_krb5ccache = use_krb5ccache
@@ -191,5 +196,6 @@ class SparkSubmitOperator(BaseOperator):
             env_vars=self._env_vars,
             verbose=self._verbose,
             spark_binary=self._spark_binary,
+            properties_file=self._properties_file,
             use_krb5ccache=self._use_krb5ccache,
         )
diff --git a/tests/providers/apache/spark/operators/test_spark_submit.py b/tests/providers/apache/spark/operators/test_spark_submit.py
index 3c6aa78336..884036467f 100644
--- a/tests/providers/apache/spark/operators/test_spark_submit.py
+++ b/tests/providers/apache/spark/operators/test_spark_submit.py
@@ -53,6 +53,7 @@ class TestSparkSubmitOperator:
         "application": "test_application.py",
         "driver_memory": "3g",
         "java_class": "com.foo.bar.AppMain",
+        "properties_file": "conf/spark-custom.conf",
         "application_args": [
             "-f",
             "foo",
@@ -120,6 +121,7 @@ class TestSparkSubmitOperator:
             ],
             "spark_binary": "sparky",
             "use_krb5ccache": True,
+            "properties_file": "conf/spark-custom.conf",
         }
 
         assert conn_id == operator._conn_id
@@ -147,6 +149,7 @@ class TestSparkSubmitOperator:
         assert expected_dict["driver_memory"] == operator._driver_memory
         assert expected_dict["application_args"] == operator._application_args
         assert expected_dict["spark_binary"] == operator._spark_binary
+        assert expected_dict["properties_file"] == operator._properties_file
         assert expected_dict["use_krb5ccache"] == operator._use_krb5ccache
 
     @pytest.mark.db_test

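For context, the same option is available when using the hook directly; a
minimal sketch (the connection id and paths are illustrative, and this
assumes spark-submit is available on the PATH):

    from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

    # Illustrative: the hook appends "--properties-file conf/spark-custom.conf"
    # to the spark-submit command it builds.
    hook = SparkSubmitHook(
        conn_id="spark_default",
        properties_file="conf/spark-custom.conf",
    )
    hook.submit(application="test_application.py")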