captify-mkambur opened a new issue, #42132:
URL: https://github.com/apache/airflow/issues/42132

   ### Apache Airflow version
   
   2.10.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I'm trying to run a Spark application with a custom image using Airflow 2.10.0 and a Kubernetes cluster connection (EKS). The apache-airflow-providers-cncf-kubernetes version is 8.3.4. When I deploy the SparkApplication resource to the cluster manually (via kubectl), the application runs fine and completes without errors. When I schedule the DAG, it starts fine and runs without interruption almost to the end, but then fails with a 404 error. It seems like it cannot communicate with the driver; however, when I check the cluster, the driver pod is still there and looks fine.
   The error text:
   ```
   [2024-09-10, 10:16:22 UTC] {taskinstance.py:3301} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
       return ExecutionCallableRunner(
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
       return self.func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 293, in execute
       return super().execute(context=context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 592, in execute
       return self.execute_sync(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 634, in execute_sync
       self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 727, in await_xcom_sidecar_container_start
       if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 645, in container_is_running
       remote_pod = self.read_pod(pod)
                    ^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 336, in wrapped_f
       return copy(f, *args, **kw)
              ^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 475, in __call__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 376, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 418, in exc_check
       raise retry_exc.reraise()
             ^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 185, in reraise
       raise self.last_attempt.result()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 478, in __call__
       result = fn(*args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 720, in read_pod
       return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23693, in read_namespaced_pod
       return self.read_namespaced_pod_with_http_info(name, namespace, **kwargs)  # noqa: E501
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23780, in read_namespaced_pod_with_http_info
       return self.api_client.call_api(
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 348, in call_api
       return self.__call_api(resource_path, method,
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
       response_data = self.request(
                       ^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 373, in request
       return self.rest_client.GET(url,
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 244, in GET
       return self.request("GET", url,
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 238, in request
       raise ApiException(http_resp=r)
   kubernetes.client.exceptions.ApiException: (404)
   Reason: Not Found
   HTTP response headers: HTTPHeaderDict({'Audit-Id': 'b75ef5bb-a810-46c8-a052-6b42307c5229', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '034ee985-68e4-4ac9-ba26-c2b9e6b4cd1d', 'X-Kubernetes-Pf-Prioritylevel-Uid': '2e7745b6-a7a8-493c-99ec-ce7f9c416cff', 'Date': 'Tue, 10 Sep 2024 10:16:22 GMT', 'Content-Length': '270'})
   HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"<my-spark-app>-task-qyzuffwi-driver\" not found","reason":"NotFound","details":{"name":"<my-spark-app>-task-qyzuffwi-driver","kind":"pods"},"code":404}
   [2024-09-10, 10:16:22 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=<my-spark-app>, task_id=<my-spark-app>-task, run_id=scheduled__2024-09-10T09:15:00+00:00, execution_date=20240910T091500, start_date=20240910T091612, end_date=20240910T101622
   ```
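   
   For reference, the final frame in the trace (`PodManager.read_pod`) boils down to a single `read_namespaced_pod` call against the driver pod. Below is a minimal sketch of that call using the Kubernetes Python client; the pod name is the redacted one from the 404 body above, and the kubeconfig loading is only an illustration, not how the provider actually builds its client:
   ```
   from kubernetes import client, config
   from kubernetes.client.exceptions import ApiException
   
   # Illustration only: any valid client configuration for the EKS cluster.
   config.load_kube_config()
   v1 = client.CoreV1Api()
   
   try:
       # Same call as pod_manager.read_pod(); the name is the redacted driver pod
       # taken from the error body above.
       pod = v1.read_namespaced_pod("<my-spark-app>-task-qyzuffwi-driver", "default")
       print(pod.status.phase)
   except ApiException as exc:
       # Reproduces the 404 when no pod with that exact name exists in the namespace.
       print(exc.status, exc.reason)
   ```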
   
   ### What you think should happen instead?
   
   The DAG run should finish with the "success" status.
   
   ### How to reproduce
   
   Please keep in mind that all values in angle brackets are not real; they were changed for security reasons. If you need any other information, e.g. any Airflow config values, please let me know.
   The DAG file looks like this:
   ```
   from airflow import DAG
   from datetime import datetime, timedelta
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
   from airflow.utils.dates import days_ago
   
   default_args = {
       'owner': '<me>',
       'depends_on_past': False,
       'start_date': '2024-09-06',
       'email': ['<my@mail>'],
       'email_on_failure': True,
       'email_on_retry': False,
       'max_active_runs': 1,
       'retries': 0,
       'max_active_tis_per_dag': 1  # Controls the number of concurrent running task instances across dag_runs per task.
       #'catchup': False
   }
   
   with DAG(
       '<my-spark-app>',
       default_args=default_args,
       schedule_interval="*/10 * * * *",
       tags=['<my-spark-app>']
   ) as dag:
   
       spark_task = SparkKubernetesOperator(
           task_id="<my-spark-app>-task",
           application_file='<my-spark-app>.yaml',
           dag=dag,
           namespace='default',
           kubernetes_conn_id='<EKS_CONN>',
           do_xcom_push=True,
           params={"app_name": "<my-spark-app>"}
       )
   
       sensor = SparkKubernetesSensor(
           task_id='<my-spark-app>-monitor',
           namespace="default",
           application_name="{{ task_instance.xcom_pull(task_ids='<my-spark-app>-task')['metadata']['name'] }}",
           kubernetes_conn_id="<EKS_CONN>",
           dag=dag,
           api_group="sparkoperator.k8s.io",
           api_version='v1beta2',
           attach_log=True
       )
   
       spark_task >> sensor
   ```
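   
   Note that the traceback fails inside `await_xcom_sidecar_container_start`, a code path the operator only takes because `do_xcom_push=True` is set on the task above. As a purely diagnostic variant (a sketch, not a confirmed fix; the `-no-xcom` task_id is hypothetical, and the sensor's XCom-based `application_name` template would no longer work against it), the same task can be declared without the XCom sidecar:
   ```
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   
   # Same operator as above, declared inside the same DAG, but with the XCom
   # sidecar disabled so the await_xcom_sidecar_container_start path from the
   # traceback is never reached. All angle-bracket values are placeholders.
   spark_task_no_xcom = SparkKubernetesOperator(
       task_id="<my-spark-app>-task-no-xcom",
       application_file="<my-spark-app>.yaml",
       namespace="default",
       kubernetes_conn_id="<EKS_CONN>",
       do_xcom_push=False,
   )
   ```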
   
   The SparkApplication resource YAML looks like this:
   ```
   apiVersion: sparkoperator.k8s.io/v1beta2
   kind: SparkApplication
   metadata:
     labels: &Labels
       app:  <my-spark-app>
     name:  <my-spark-app>
   spec:
     type: Python
     restartPolicy:
       type: Never
     pythonVersion: "3"
     sparkVersion: "3.5.0"
     mode: cluster
     image: <my-ecr-repo>:<my-spark-app>
     mainApplicationFile: local:///<my-spark-app>/main.py
     sparkConf:
       spark.ui.port: "4040"
       spark.ui.showConsoleProgress: "true"
       spark.sql.broadcastTimeout: "6000"
       spark.hadoop.fs.s3a.multiobjectdelete.enable: "false"
       spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled: "true"
       spark.metrics.namespace: '<my-spark-app>-qa'
       spark.metrics.conf.*.sink.graphite.host: <graphite-exporter-address>
       spark.metrics.conf.*.sink.graphite.port: "<graphite-port>"
       spark.metrics.conf.*.sink.graphite.class: "org.apache.spark.metrics.sink.GraphiteSink"
       spark.metrics.conf.*.sink.graphite.period: "10"
       spark.metrics.conf.*.sink.graphite.unit: "seconds"
       spark.metrics.appStatusSource.enabled: "true"
       spark.driver.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false"
       spark.executor.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false"
       spark.metrics.conf.driver.source.jvm.class: org.apache.spark.metrics.source.JvmSource
       spark.metrics.conf.executor.source.jvm.class: org.apache.spark.metrics.source.JvmSource
       spark.metrics.conf.worker.source.jvm.class: org.apache.spark.metrics.source.JvmSource
       spark.metrics.conf.master.source.jvm.class: org.apache.spark.metrics.source.JvmSource
     hadoopConf:
       spark.hadoop.fs.s3a.user.agent.prefix: '<my-spark-app>-qa'
       fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
       fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
       fs.s3a.bucket.all.committer.magic.enabled: "true"
       fs.s3a.endpoint: http://s3.eu-west-1.amazonaws.com
       fs.s3a.connection.ssl.enabled: "false"
     executor:
       nodeSelector:
         karpenter.sh/nodepool: default
       labels: *Labels
       serviceAccount: spark-operator-spark
       cores: 8
       coreRequest: "7500m"
       instances: 1
       memory: "40g"
     driver:
       nodeSelector:
         karpenter.sh/nodepool: ondemand
       labels: *Labels
       serviceAccount: spark-operator-spark
       cores: 4
       memory: "16g"
       env:
         - name: DEPLOY_ENV
           value: qa
   ```
   
   
   ### Operating System
   
   Airflow runs on an EKS cluster and was deployed using the Helm chart (version 1.16.0-dev).
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes  -- 8.3.4
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

