This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new ed9080ae6a  change spark connection form and add spark connections docs (#36419)
ed9080ae6a is described below

commit ed9080ae6a17d7b2478652b676579f162462bb70
Author: shohamy7 <46799583+shoha...@users.noreply.github.com>
AuthorDate: Fri Dec 29 14:53:44 2023 +0200

    change spark connection form and add spark connections docs (#36419)

    * change spark connection form and add spark connections docs

    * make SQL letter upercase in spark-sql connection header

    * rename from spark to spark-submit and add default values in connection form
---
 airflow/providers/apache/spark/hooks/spark_jdbc.py    |  2 +-
 airflow/providers/apache/spark/hooks/spark_sql.py     | 25 +++++++++++
 .../providers/apache/spark/hooks/spark_submit.py      | 45 +++++++++++++++++---
 .../providers/apache/spark/operators/spark_jdbc.py    |  2 +-
 .../apache/spark/operators/spark_submit.py            |  2 +-
 .../connections/index.rst                             | 28 +++++++++++
 .../connections/{spark.rst => spark-connect.rst}      | 32 +++------------
 .../connections/spark-sql.rst                         | 48 ++++++++++++++++++++
 .../connections/{spark.rst => spark-submit.rst}       | 31 ++++++--------
 .../index.rst                                         |  2 +-
 .../operators.rst                                     |  4 +-
 11 files changed, 166 insertions(+), 55 deletions(-)

diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index 8c8d02f1ec..b904ca4260 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -29,7 +29,7 @@ class SparkJDBCHook(SparkSubmitHook):
     Extends the SparkSubmitHook for performing data transfers to/from JDBC-based databases with Apache Spark.
 
     :param spark_app_name: Name of the job (default airflow-spark-jdbc)
-    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>`
+    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>`
         as configured in Airflow administration
     :param spark_conf: Any additional Spark configuration properties
     :param spark_py_files: Additional python files used (.zip, .egg, or .py)
diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py
index 41dc741ccd..46eec49f30 100644
--- a/airflow/providers/apache/spark/hooks/spark_sql.py
+++ b/airflow/providers/apache/spark/hooks/spark_sql.py
@@ -54,6 +54,31 @@ class SparkSqlHook(BaseHook):
     conn_type = "spark_sql"
     hook_name = "Spark SQL"
 
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Return custom field behaviour."""
+        return {
+            "hidden_fields": ["schema", "login", "password", "extra"],
+            "relabeling": {},
+        }
+
+    @classmethod
+    def get_connection_form_widgets(cls) -> dict[str, Any]:
+        """Returns connection widgets to add to connection form."""
+        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import StringField
+        from wtforms.validators import Optional
+
+        return {
+            "queue": StringField(
+                lazy_gettext("YARN queue"),
+                widget=BS3TextFieldWidget(),
+                description="Default YARN queue to use",
+                validators=[Optional()],
+            )
+        }
+
     def __init__(
         self,
         sql: str,
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index b495cafeb6..b96d992bba 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -33,7 +33,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 with contextlib.suppress(ImportError, NameError):
     from airflow.providers.cncf.kubernetes import kube_client
 
-ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit", "spark3-submit"]
+DEFAULT_SPARK_BINARY = "spark-submit"
+ALLOWED_SPARK_BINARIES = [DEFAULT_SPARK_BINARY, "spark2-submit", "spark3-submit"]
 
 
 class SparkSubmitHook(BaseHook, LoggingMixin):
@@ -41,7 +42,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
     Wrap the spark-submit binary to kick off a spark-submit job; requires "spark-submit" binary in the PATH.
 
     :param conf: Arbitrary Spark configuration properties
-    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
+    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured
         in Airflow administration. When an invalid connection_id is supplied, it will default
         to yarn.
     :param files: Upload additional files to the executor running the job, separated by a
@@ -98,10 +99,44 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
     def get_ui_field_behaviour(cls) -> dict[str, Any]:
         """Return custom field behaviour."""
         return {
-            "hidden_fields": ["schema", "login", "password"],
+            "hidden_fields": ["schema", "login", "password", "extra"],
             "relabeling": {},
         }
 
+    @classmethod
+    def get_connection_form_widgets(cls) -> dict[str, Any]:
+        """Returns connection widgets to add to connection form."""
+        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import StringField
+        from wtforms.validators import Optional, any_of
+
+        return {
+            "queue": StringField(
+                lazy_gettext("YARN queue"),
+                widget=BS3TextFieldWidget(),
+                description="Default YARN queue to use",
+                validators=[Optional()],
+            ),
+            "deploy-mode": StringField(
+                lazy_gettext("Deploy mode"),
+                widget=BS3TextFieldWidget(),
+                description="Must be client or cluster",
+                validators=[any_of(["client", "cluster"])],
+                default="client",
+            ),
+            "spark-binary": StringField(
+                lazy_gettext("Spark binary"),
+                widget=BS3TextFieldWidget(),
+                description=f"Must be one of: {', '.join(ALLOWED_SPARK_BINARIES)}",
+                validators=[any_of(ALLOWED_SPARK_BINARIES)],
+                default=DEFAULT_SPARK_BINARY,
+            ),
+            "namespace": StringField(
+                lazy_gettext("Kubernetes namespace"), widget=BS3TextFieldWidget(), validators=[Optional()]
+            ),
+        }
+
     def __init__(
         self,
         conf: dict[str, Any] | None = None,
@@ -198,7 +233,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             "master": "yarn",
             "queue": None,
             "deploy_mode": None,
-            "spark_binary": self.spark_binary or "spark-submit",
+            "spark_binary": self.spark_binary or DEFAULT_SPARK_BINARY,
             "namespace": None,
         }
 
@@ -216,7 +251,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             conn_data["queue"] = self._queue if self._queue else extra.get("queue")
             conn_data["deploy_mode"] = self._deploy_mode if self._deploy_mode else extra.get("deploy-mode")
             if not self.spark_binary:
-                self.spark_binary = extra.get("spark-binary", "spark-submit")
+                self.spark_binary = extra.get("spark-binary", DEFAULT_SPARK_BINARY)
             if self.spark_binary is not None and self.spark_binary not in ALLOWED_SPARK_BINARIES:
                 raise RuntimeError(
                     f"The spark-binary extra can be on of {ALLOWED_SPARK_BINARIES} and it"
diff --git a/airflow/providers/apache/spark/operators/spark_jdbc.py b/airflow/providers/apache/spark/operators/spark_jdbc.py
index 4b4dd648a6..e5ff5f9c65 100644
--- a/airflow/providers/apache/spark/operators/spark_jdbc.py
+++ b/airflow/providers/apache/spark/operators/spark_jdbc.py
@@ -37,7 +37,7 @@ class SparkJDBCOperator(SparkSubmitOperator):
         :ref:`howto/operator:SparkJDBCOperator`
 
     :param spark_app_name: Name of the job (default airflow-spark-jdbc)
-    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>`
+    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>`
         as configured in Airflow administration
     :param spark_conf: Any additional Spark configuration properties
     :param spark_py_files: Additional python files used (.zip, .egg, or .py)
diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py
index be2f2d0ac5..bd8480b815 100644
--- a/airflow/providers/apache/spark/operators/spark_submit.py
+++ b/airflow/providers/apache/spark/operators/spark_submit.py
@@ -37,7 +37,7 @@ class SparkSubmitOperator(BaseOperator):
     :param application: The application that submitted as a job, either jar or py file. (templated)
     :param conf: Arbitrary Spark configuration properties (templated)
-    :param conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
+    :param conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured
         in Airflow administration. When an invalid connection_id is supplied, it will default
         to yarn.
     :param files: Upload additional files to the executor running the job, separated by a
         comma. Files will be placed in the working directory of each executor.
diff --git a/docs/apache-airflow-providers-apache-spark/connections/index.rst b/docs/apache-airflow-providers-apache-spark/connections/index.rst
new file mode 100644
index 0000000000..71716ec9d6
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-spark/connections/index.rst
@@ -0,0 +1,28 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Apache Spark Connections
+========================
+
+
+.. toctree::
+    :maxdepth: 1
+    :glob:
+
+    *
diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst
similarity index 51%
copy from docs/apache-airflow-providers-apache-spark/connections/spark.rst
copy to docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst
index 05b92ce75c..aa5ef07157 100644
--- a/docs/apache-airflow-providers-apache-spark/connections/spark.rst
+++ b/docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst
@@ -17,34 +17,26 @@
 
 
-.. _howto/connection:spark:
+.. _howto/connection:spark-connect:
 
-Apache Spark Connection
-=======================
+Apache Spark Connect Connection
+===============================
 
-The Apache Spark connection type enables connection to Apache Spark.
+The Apache Spark Connect connection type enables connection to Apache Spark via the Spark connect interface.
 
 Default Connection IDs
 ----------------------
 
-Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default. Spark SQL hooks and operators point to ``spark_sql_default`` by default. The Spark Connect hook uses ``spark_connect_default`` by default.
+The Spark Connect hook uses ``spark_connect_default`` by default.
 
 Configuring the Connection
 --------------------------
 Host (required)
-    The host to connect to, it can be ``local``, ``yarn`` or an URL.
+    The host to connect to, should be a valid hostname.
 
 Port (optional)
     Specify the port in case of host be an URL.
 
-Extra (optional)
-    Specify the extra parameters (as json dictionary) that can be used in spark connection. The following parameters out of the standard python parameters are supported:
-
-    * ``queue`` - The name of the YARN queue to which the application is submitted.
-    * ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
-    * ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as value.
-    * ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
-
 User ID (optional, only applies to Spark Connect)
     The user ID to authenticate with the proxy.
@@ -54,18 +46,6 @@ Token (optional, only applies to Spark Connect)
 Use SSL (optional, only applies to Spark Connect)
     Whether to use SSL when connecting.
 
-When specifying the connection in environment variable you should specify
-it using URI syntax.
-
-Note that all components of the URI should be URL-encoded. The URI and the mongo
-connection string are not the same.
-
-For example:
-
-.. code-block:: bash
-
-    export AIRFLOW_CONN_SPARK_DEFAULT='spark://mysparkcluster.com:80?deploy-mode=cluster&spark_binary=command&namespace=kube+namespace'
-
 .. warning::
 
     Make sure you trust your users with the ability to configure the host settings as it may enable the connection to
diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst
new file mode 100644
index 0000000000..c4e4c606de
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst
@@ -0,0 +1,48 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+.. _howto/connection:spark-sql:
+
+Apache Spark SQL Connection
+===========================
+
+The Apache Spark SQL connection type enables connection to Apache Spark via the ``spark-sql`` command.
+
+Default Connection IDs
+----------------------
+
+SparkSqlHook uses ``spark_sql_default`` by default.
+
+Configuring the Connection
+--------------------------
+Host (required)
+    The host to connect to, it can be ``local``, ``yarn`` or an URL.
+
+Port (optional)
+    Specify the port in case of host be an URL.
+
+YARN Queue
+    The name of the YARN queue to which the application is submitted.
+
+.. warning::
+
+    Make sure you trust your users with the ability to configure the host settings as it may enable the connection to
+    establish communication with external servers. It's crucial to understand that directing the connection towards a
+    malicious server can lead to significant security vulnerabilities, including the risk of encountering
+    Remote Code Execution (RCE) attacks.
diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst
similarity index 58%
rename from docs/apache-airflow-providers-apache-spark/connections/spark.rst
rename to docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst
index 05b92ce75c..f38a2908ba 100644
--- a/docs/apache-airflow-providers-apache-spark/connections/spark.rst
+++ b/docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst
@@ -17,17 +17,17 @@
 
 
-.. _howto/connection:spark:
+.. _howto/connection:spark-submit:
 
-Apache Spark Connection
-=======================
+Apache Spark Submit Connection
+==============================
 
-The Apache Spark connection type enables connection to Apache Spark.
+The Apache Spark Submit connection type enables connection to Apache Spark via the ``spark-submit`` command.
 
 Default Connection IDs
 ----------------------
 
-Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default. Spark SQL hooks and operators point to ``spark_sql_default`` by default. The Spark Connect hook uses ``spark_connect_default`` by default.
+Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default.
 
 Configuring the Connection
 --------------------------
@@ -37,22 +37,17 @@ Host (required)
 Port (optional)
     Specify the port in case of host be an URL.
 
-Extra (optional)
-    Specify the extra parameters (as json dictionary) that can be used in spark connection. The following parameters out of the standard python parameters are supported:
+YARN Queue (optional, only applies to spark on YARN applications)
+    The name of the YARN queue to which the application is submitted.
 
-    * ``queue`` - The name of the YARN queue to which the application is submitted.
-    * ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
-    * ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as value.
-    * ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
+Deploy mode (optional)
+    Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
 
-User ID (optional, only applies to Spark Connect)
-    The user ID to authenticate with the proxy.
+Spark binary (optional)
+    The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as value.
 
-Token (optional, only applies to Spark Connect)
-    The token to authenticate with the proxy.
-
-Use SSL (optional, only applies to Spark Connect)
-    Whether to use SSL when connecting.
+Kubernetes namespace (optional, only applies to spark on kubernetes applications)
+    Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
 
 When specifying the connection in environment variable you should specify
 it using URI syntax.
diff --git a/docs/apache-airflow-providers-apache-spark/index.rst b/docs/apache-airflow-providers-apache-spark/index.rst
index 7bc8959b63..fa5698d61d 100644
--- a/docs/apache-airflow-providers-apache-spark/index.rst
+++ b/docs/apache-airflow-providers-apache-spark/index.rst
@@ -33,7 +33,7 @@
     :maxdepth: 1
     :caption: Guides
 
-    Connection types <connections/spark>
+    Connection types <connections/index>
     Decorators <decorators/pyspark>
     Operators <operators>
diff --git a/docs/apache-airflow-providers-apache-spark/operators.rst b/docs/apache-airflow-providers-apache-spark/operators.rst
index 30d23f47cd..f6c20985f2 100644
--- a/docs/apache-airflow-providers-apache-spark/operators.rst
+++ b/docs/apache-airflow-providers-apache-spark/operators.rst
@@ -23,9 +23,9 @@ Prerequisite
 ------------
 
 * To use :class:`~airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator`
-  you must configure :doc:`Spark Connection <connections/spark>`.
+  you must configure :doc:`Spark Connection <connections/spark-submit>`.
 * To use :class:`~airflow.providers.apache.spark.operators.spark_jdbc.SparkJDBCOperator`
-  you must configure both :doc:`Spark Connection <connections/spark>`
+  you must configure both :doc:`Spark Connection <connections/spark-submit>`
   and :doc:`JDBC connection <apache-airflow-providers-jdbc:connections/jdbc>`.
 * :class:`~airflow.providers.apache.spark.operators.spark_sql.SparkSqlOperator` gets all the configurations from operator parameters.
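The text kept in ``connections/spark-submit.rst`` still says the connection can be supplied through an environment variable in URI syntax, with all components URL-encoded. A minimal sketch of that, assuming a placeholder cluster host and query keys matching the extra-field names read by ``SparkSubmitHook`` (``queue``, ``deploy-mode``, ``spark-binary``, ``namespace``); none of these values come from the commit itself:

.. code-block:: python

    import os

    # Hypothetical values for illustration only; every URI component should be URL-encoded.
    os.environ["AIRFLOW_CONN_SPARK_DEFAULT"] = (
        "spark://mysparkcluster.com:80"
        "?queue=root.default"          # YARN queue extra
        "&deploy-mode=cluster"         # client or cluster
        "&spark-binary=spark3-submit"  # one of the ALLOWED_SPARK_BINARIES values
        "&namespace=spark-jobs"        # spark.kubernetes.namespace
    )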
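The updated prerequisite in ``operators.rst`` still ties ``SparkSubmitOperator`` to the Spark Submit connection. A rough usage sketch under that assumption: the DAG id, schedule, and application path below are placeholders, and ``spark_default`` is simply the documented default connection id.

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

    # Placeholder DAG wiring; conn_id/application/conf mirror the documented operator arguments.
    with DAG(
        dag_id="spark_submit_example",
        start_date=datetime(2023, 12, 1),
        schedule=None,
        catchup=False,
    ):
        submit_job = SparkSubmitOperator(
            task_id="submit_job",
            conn_id="spark_default",  # default Spark Submit connection id
            application="/opt/spark/examples/src/main/python/pi.py",  # placeholder path
            conf={"spark.executor.memory": "1g"},
        )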