GitHub user rcrchawla added a comment to the discussion: Airflow task failed
but spark kube app is running
Hi @shaealh
1. Full MySQL error text
2026-03-10 02:45:23 [debug ] Processing heartbeat
hostname=airflow-worker-1.airflow-worker.de-services.svc.cluster.local
pid=155023 ti_id=019cd518-d7c9-7e7e-bde2-efc6322e36a3
[2026-03-10T02:45:23.575+0000] {exceptions.py:77} ERROR - Error with id 9zBmdizJ
File
"/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py",
line 42, in wrapped_app
await app(scope, receive, sender)
File
"/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line
75, in app
response = await f(request)
^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/fastapi/routing.py",
line 302, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/fastapi/routing.py",
line 213, in run_endpoint_function
return await dependant.call(**values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/cadwyn/structure/versions.py",
line 474, in decorator
response = await self._convert_endpoint_response_to_version(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/cadwyn/structure/versions.py",
line 520, in _convert_endpoint_response_to_version
response_or_response_body: Union[FastapiResponse, object] = await
run_in_threadpool(
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/starlette/concurrency.py",
line 38, in run_in_threadpool
return await anyio.to_thread.run_sync(func)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/anyio/to_thread.py",
line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py",
line 2476, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py",
line 967, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/cadwyn/schema_generation.py",
line 515, in __call__
return self._original_callable(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/api_fastapi/execution_api/routes/xcoms.py",
line 419, in set_xcom
session.flush()
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3449, in flush
self._flush(objects)
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3588, in _flush
with util.safe_reraise():
^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
compat.raise_(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3549, in _flush
flush_context.execute()
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
line 456, in execute
rec.execute(self)
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
line 630, in execute
util.preloaded.orm_persistence.save_obj(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
line 245, in save_obj
_emit_insert_statements(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
line 1097, in _emit_insert_statements
c = connection._execute_20(
^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py",
line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1577, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1953, in _execute_context
self._handle_dbapi_exception(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 2134, in _handle_dbapi_exception
util.raise_(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1910, in _execute_context
self.dialect.do_execute(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
line 736, in do_execute
cursor.execute(statement, parameters)
File "/home/airflow/.local/lib/python3.12/site-packages/MySQLdb/cursors.py",
line 179, in execute
res = self._query(mogrified_query)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/MySQLdb/cursors.py",
line 330, in _query
db.query(q)
File
"/home/airflow/.local/lib/python3.12/site-packages/MySQLdb/connections.py",
line 280, in query
_mysql.connection.query(self, query)
2026-03-10 02:45:23 [debug ] Heartbeat updated state=running
ti_id=019cd518-d7c9-7e7e-bde2-efc6322e36a3
2026-03-10 02:45:23 [debug ] Heartbeat updated state=running
ti_id=019cd54c-28ad-7db0-b0f8-d64ed0916d78
2026-03-10 02:45:23 [debug ] Retrieved current task state
2. The exact Airflow failure mode is unclear, but tasks transition from running
to failed. This happens for all task types, whether BashOperator,
SparkKubernetesOperator, etc. We see this issue only during a specific time
window, roughly 2:30 am – 3:45 am UTC.
3. Spark Kube operator task config
load_track =
SparkKubernetesOperator(random_name_suffix=False,get_logs=False,delete_on_termination=False,
task_id=f'load_track_{group_value}',
namespace="dataengineering",
kubernetes_conn_id = 'kubernetes_default',
application_file=open(f"{ODS_LOAD_PATH}/load_to_snowflake/yamls/segment_sf_load_{group_value}.yaml").read(),
#application_file=open(f"{ETL_PATH}/segment_sf_load_{group_value}.yaml").read(),
do_xcom_push=False,
startup_timeout_seconds=2400,
on_failure_callback=custom_failure_function,
trigger_rule='none_failed_min_one_success',
dag=dag
)
**this is the XCom-related task**
load_track =
SparkKubernetesOperator(random_name_suffix=False,get_logs=False,delete_on_termination=False,
task_id=f'load_track_{group_value}',
namespace="dataengineering",
kubernetes_conn_id = 'kubernetes_default',
application_file=open(f"{ODS_LOAD_PATH}/load_to_snowflake/yamls/segment_sf_load_{group_value}.yaml").read(),
#application_file=open(f"{ETL_PATH}/segment_sf_load_{group_value}.yaml").read(),
do_xcom_push=False,
startup_timeout_seconds=2400,
on_failure_callback=custom_failure_function,
trigger_rule='none_failed_min_one_success',
dag=dag
)
GitHub link:
https://github.com/apache/airflow/discussions/63298#discussioncomment-16091804
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]