This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 195abf8f71 SparkSubmit: Adding propertyfiles option (#36164) 195abf8f71 is described below commit 195abf8f7116c9e37fd3dc69bfee8cbf546c5a3f Author: Ashish Patel <ashishpatel0...@gmail.com> AuthorDate: Mon Dec 11 22:02:32 2023 +0530 SparkSubmit: Adding propertyfiles option (#36164) Introduce a new parameter 'properties_file' in SparkSubmitHook and SparkSubmitOperator. This allows loading of extra properties from a specified file, defaulting to conf/spark-defaults.conf if nothing is specified. The goal is to provide greater flexibility for Spark configuration. The changes have been tested for the affected files. --------- Co-authored-by: ghostp13409 <gajjarpart...@gmail.com> --- airflow/providers/apache/spark/hooks/spark_submit.py | 6 ++++++ airflow/providers/apache/spark/operators/spark_submit.py | 6 ++++++ tests/providers/apache/spark/operators/test_spark_submit.py | 3 +++ 3 files changed, 15 insertions(+) diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py index be087dada9..b64f3a0c22 100644 --- a/airflow/providers/apache/spark/hooks/spark_submit.py +++ b/airflow/providers/apache/spark/hooks/spark_submit.py @@ -78,6 +78,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin): :param verbose: Whether to pass the verbose flag to spark-submit process for debugging :param spark_binary: The command to use for spark submit. Some distros may use spark2-submit or spark3-submit. + :param properties_file: Path to a file from which to load extra properties. If not + specified, this will look for conf/spark-defaults.conf.
:param use_krb5ccache: if True, configure spark to use ticket cache instead of relying on keytab for Kerberos login """ @@ -122,6 +124,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): env_vars: dict[str, Any] | None = None, verbose: bool = False, spark_binary: str | None = None, + properties_file: str | None = None, *, use_krb5ccache: bool = False, ) -> None: @@ -155,6 +158,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): self._yarn_application_id: str | None = None self._kubernetes_driver_pod: str | None = None self.spark_binary = spark_binary + self._properties_file = properties_file self._connection = self._resolve_connection() self._is_yarn = "yarn" in self._connection["master"] self._is_kubernetes = "k8s" in self._connection["master"] @@ -292,6 +296,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin): "--conf", f"spark.kubernetes.namespace={self._connection['namespace']}", ] + if self._properties_file: + connection_cmd += ["--properties-file", self._properties_file] if self._files: connection_cmd += ["--files", self._files] if self._py_files: diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py index 903ff9b720..3f4c539536 100644 --- a/airflow/providers/apache/spark/operators/spark_submit.py +++ b/airflow/providers/apache/spark/operators/spark_submit.py @@ -69,6 +69,8 @@ class SparkSubmitOperator(BaseOperator): :param verbose: Whether to pass the verbose flag to spark-submit process for debugging :param spark_binary: The command to use for spark submit. Some distros may use spark2-submit or spark3-submit. + :param properties_file: Path to a file from which to load extra properties. If not + specified, this will look for conf/spark-defaults.conf.
:param use_krb5ccache: if True, configure spark to use ticket cache instead of relying on keytab for Kerberos login """ @@ -88,6 +90,7 @@ class SparkSubmitOperator(BaseOperator): "_name", "_application_args", "_env_vars", + "_properties_file", ) ui_color = WEB_COLORS["LIGHTORANGE"] @@ -120,6 +123,7 @@ class SparkSubmitOperator(BaseOperator): env_vars: dict[str, Any] | None = None, verbose: bool = False, spark_binary: str | None = None, + properties_file: str | None = None, use_krb5ccache: bool = False, **kwargs: Any, ) -> None: @@ -149,6 +153,7 @@ class SparkSubmitOperator(BaseOperator): self._env_vars = env_vars self._verbose = verbose self._spark_binary = spark_binary + self._properties_file = properties_file self._hook: SparkSubmitHook | None = None self._conn_id = conn_id self._use_krb5ccache = use_krb5ccache @@ -191,5 +196,6 @@ class SparkSubmitOperator(BaseOperator): env_vars=self._env_vars, verbose=self._verbose, spark_binary=self._spark_binary, + properties_file=self._properties_file, use_krb5ccache=self._use_krb5ccache, ) diff --git a/tests/providers/apache/spark/operators/test_spark_submit.py b/tests/providers/apache/spark/operators/test_spark_submit.py index 3c6aa78336..884036467f 100644 --- a/tests/providers/apache/spark/operators/test_spark_submit.py +++ b/tests/providers/apache/spark/operators/test_spark_submit.py @@ -53,6 +53,7 @@ class TestSparkSubmitOperator: "application": "test_application.py", "driver_memory": "3g", "java_class": "com.foo.bar.AppMain", + "properties_file": "conf/spark-custom.conf", "application_args": [ "-f", "foo", @@ -120,6 +121,7 @@ class TestSparkSubmitOperator: ], "spark_binary": "sparky", "use_krb5ccache": True, + "properties_file": "conf/spark-custom.conf", } assert conn_id == operator._conn_id @@ -147,6 +149,7 @@ class TestSparkSubmitOperator: assert expected_dict["driver_memory"] == operator._driver_memory assert expected_dict["application_args"] == operator._application_args assert expected_dict["spark_binary"] == operator._spark_binary + assert expected_dict["properties_file"] == operator._properties_file assert expected_dict["use_krb5ccache"] == operator._use_krb5ccache @pytest.mark.db_test