captify-mkambur opened a new issue, #42132: URL: https://github.com/apache/airflow/issues/42132
### Apache Airflow version 2.10.0 ### If "Other Airflow 2 version" selected, which one? _No response_ ### What happened? I'm trying to run the Spark application with the custom image using Airflow 2.10.0. Also, I use Kubernetes cluster connection (EKS). apache-airflow-providers-cncf-kubernetes version is 8.3.4. When I manually deploy SparkApplication resources to the cluster (via kubectl),the application works fine and is complete without errors. When I schedule DAG, it starts fine, and works without interruptions almost till the end, but then fails with error 404. It seems like it cannot communicate with the driver. However, when I check the cluster, the driver pod is still there and looks fine. The error text: ``` [2024-09-10, 10:16:22 UTC] {taskinstance.py:3301} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable return ExecutionCallableRunner( ^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run return self.func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper return func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 293, in execute return super().execute(context=context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper return func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 592, in execute return self.execute_sync(context) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 634, in execute_sync self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod) File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 727, in await_xcom_sidecar_container_start if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 645, in container_is_running remote_pod = self.read_pod(pod) ^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 336, in wrapped_f return copy(f, *args, **kw) ^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 475, in __call__ do = self.iter(retry_state=retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 376, in iter result = action(retry_state) ^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 418, in exc_check raise retry_exc.reraise() ^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 185, in reraise raise self.last_attempt.result() ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result return self.__get_result() ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result raise self._exception File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 478, in __call__ result = fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 720, in read_pod return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23693, in read_namespaced_pod return self.read_namespaced_pod_with_http_info(name, namespace, **kwargs) # noqa: E501 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23780, in read_namespaced_pod_with_http_info return self.api_client.call_api( ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 348, in call_api return self.__call_api(resource_path, method, ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 180, in __call_api response_data = self.request( ^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 373, in request return self.rest_client.GET(url, ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 244, in GET return self.request("GET", url, ^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 238, in request raise ApiException(http_resp=r) kubernetes.client.exceptions.ApiException: (404) Reason: Not Found HTTP response headers: HTTPHeaderDict({'Audit-Id': 'b75ef5bb-a810-46c8-a052-6b42307c5229', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '034ee985-68e4-4ac9-ba26-c2b9e6b4cd1d', 'X-Kubernetes-Pf-Prioritylevel-Uid': '2e7745b6-a7a8-493c-99ec-ce7f9c416cff', 'Date': 'Tue, 10 Sep 2024 10:16:22 GMT', 'Content-Length': '270'}) HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"<my-spark-app>-task-qyzuffwi-driver\" not found","reason":"NotFound","details":{"name":"<my-spark-app>-task-qyzuffwi-driver","kind":"pods"},"code":404} [2024-09-10, 10:16:22 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=<my-spark-app>, task_id=<my-spark-app>-task, run_id=scheduled__2024-09-10T09:15:00+00:00, execution_date=20240910T091500, start_date=20240910T091612, end_date=20240910T101622 ``` ### What you think should happen instead? DAG run should finish with the "success" status. ### How to reproduce Please keep in mind, that all the values in the angle brackets are not real, they were changed for security reasons. If you need any other information, e.g. any Airflow config values, please let me know. DAG file looks like this: ``` from airflow import DAG from datetime import datetime, timedelta from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor from airflow.utils.dates import days_ago default_args = { 'owner': '<me>', 'depends_on_past': False, 'start_date': '2024-09-06', 'email': ['<my@mail>'], 'email_on_failure': True, 'email_on_retry': False, 'max_active_runs': 1, 'retries': 0, 'max_active_tis_per_dag': 1 #This parameter controls the number of concurrent running task instances across dag_runs per task. #'catchup': False } with DAG( '<my-spark-app>', default_args=default_args, schedule_interval="*/10 * * * *", tags=['<my-spark-app>'] ) as dag: spark_task = SparkKubernetesOperator( task_id="c<my-spark-app>-task", application_file='<my-spark-app>.yaml', dag=dag, namespace='default', kubernetes_conn_id='<EKS_CONN>', do_xcom_push=True, params={"app_name": f"<my-spark-app>"} ) sensor = SparkKubernetesSensor( task_id='<my-spark-app>-monitor', namespace="default", application_name="{{ task_instance.xcom_pull(task_ids='<my-spark-app>-task')['metadata']['name'] }}", kubernetes_conn_id="<EKS_CONN>", dag=dag, api_group="sparkoperator.k8s.io", api_version='v1beta2', attach_log=True ) spark_task >> sensor ``` ``` SparkApplication resource YAML looks like this: apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: labels: &Labels app: <my-spark-app> name: <my-spark-app> spec: type: Python restartPolicy: type: Never pythonVersion: "3" sparkVersion: "3.5.0" mode: cluster image: <my-ecr-repo>:<my-spark-app> mainApplicationFile: local:///<my-spark-app>/main.py sparkConf: spark.ui.port: "4040" spark.ui.showConsoleProgress: "true" spark.sql.broadcastTimeout: "6000" spark.hadoop.fs.s3a.multiobjectdelete.enable: "false" spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled: "true" spark.metrics.namespace: '<my-spark-app>-qa' spark.metrics.conf.*.sink.graphite.host: <graphite-exporter-address> spark.metrics.conf.*.sink.graphite.port: "<graphite-port>" spark.metrics.conf.*.sink.graphite.class: "org.apache.spark.metrics.sink.GraphiteSink" spark.metrics.conf.*.sink.graphite.period: "10" spark.metrics.conf.*.sink.graphite.unit": "seconds" spark.metrics.appStatusSource.enabled: "true" spark.driver.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false" spark.executor.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false" spark.metrics.conf.driver.source.jvm.class: org.apache.spark.metrics.source.JvmSource spark.metrics.conf.executor.source.jvm.class: org.apache.spark.metrics.source.JvmSource spark.metrics.conf.worker.source.jvm.class: org.apache.spark.metrics.source.JvmSource spark.metrics.conf.master.source.jvm.class: org.apache.spark.metrics.source.JvmSource hadoopConf: spark.hadoop.fs.s3a.user.agent.prefix: '<my-spark-app>-qa' fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem fs.s3a.bucket.all.committer.magic.enabled: "true" fs.s3a.endpoint: http://s3.eu-west-1.amazonaws.com fs.s3a.connection.ssl.enabled: "false" executor: nodeSelector: karpenter.sh/nodepool: default labels: *Labels serviceAccount: spark-operator-spark cores: 8 coreRequest: "7500m" instances: 1 memory: "40g" driver: nodeSelector: karpenter.sh/nodepool: ondemand labels: *Labels serviceAccount: spark-operator-spark cores: 4 memory: "16g" env: - name: DEPLOY_ENV value: qa ``` ### Operating System Airflow runs on EKS cluster and was deployed using Helm chart (version 1.16.0-dev) ### Versions of Apache Airflow Providers apache-airflow-providers-cncf-kubernetes -- 8.3.4 ### Deployment Official Apache Airflow Helm Chart ### Deployment details _No response_ ### Anything else? _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org