patgarz opened a new issue, #32158:
URL: https://github.com/apache/airflow/issues/32158

   ### Apache Airflow version
   
   2.6.2
   
   ### What happened
   
   Defining `on_*_callback`s in Airflow 2.6.0+ using `functools.partial` (for example, to bind arguments to a callable) causes unexpected behavior. Most notably, an `on_success_callback` that raises an exception appears to crash the task process, which leads to a failing heartbeat and the task being flagged as a zombie: `ERROR - Detected zombie job`.
   
   For `on_success_callback`, the most visible effect is that the task is marked as failed even though it was the callback, not the task, that failed. The problem also manifests in other erratic ways (likely because of the zombie handling):
   
   1. The task log shows a strange message, `'functools.partial' object has no attribute '__module__'`, rather than the actual callback exception (see the sketch after this list)
   2. Configuring an `on_failure_callback` alongside a partial-defined, failing `on_success_callback` causes the `on_failure_callback` to fire when the `on_success_callback` fails
   3. Configuring `on_failure_callback` as a `partial` together with a failing `email_on_failure` causes the failure callback to trigger in a loop, and the DAG run never exits
   4. Task logs cannot be retrieved over HTTP (served logs time out)
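   
   A bare `functools.partial` object exposes neither `__module__` nor `__name__`, so any code that introspects the callback for logging hits exactly the `AttributeError` shown in the task log. The snippet below is only a minimal sketch of that behavior outside Airflow (not Airflow's actual code path), plus a hypothetical mitigation using `functools.update_wrapper`:
   
   ```py
   from functools import partial, update_wrapper
   
   
   def my_callback(context, severity="INFO"):
       print(severity, context)
   
   
   cb = partial(my_callback, severity="WARN")
   
   # On Python 3.9 (used in the logs below) both lookups raise
   # AttributeError: 'functools.partial' object has no attribute '__module__'
   # (or '__name__') -- the same message seen in the task log.
   for attr in ("__module__", "__name__"):
       try:
           getattr(cb, attr)
       except AttributeError as err:
           print(err)
   
   # Hypothetical mitigation: copy the wrapped function's metadata onto the
   # partial so attribute lookup succeeds again (not verified against Airflow).
   update_wrapper(cb, my_callback)
   print(cb.__module__, cb.__name__)
   ```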
   
   ### What you think should happen instead
   
   When using Airflow <=2.5.3, a callback defined with or without `partial` behaves identically. Likewise, in 2.6.0+ a callback defined as a plain function (no `partial`) behaves as expected:
   
   - If the callback fails, the task state reflects the task's own execution, not the callback's outcome
   - Callbacks do not trigger retries
   - The logged exception text matches the callback's exception
   
   ### How to reproduce
   
   Sample DAG below. Remove or comment out individual pieces (the partials, the callbacks, `email_on_failure`, etc.) to see how they interact.
   
   ```py
   from functools import partial
   import logging
   import time
   
   import pendulum
   
   from airflow.decorators import dag, task
   
   
   def my_callback(context):
       logging.info("success callback")
       # Raising here is what triggers the buggy behavior in 2.6.0+.
       raise Exception("exception text")
   
   
   default_args = {
       # A partial with no bound arguments is enough to reproduce the issue.
       "on_success_callback": partial(my_callback),
       "on_failure_callback": partial(my_callback),
       "email_on_failure": True,
       "email": "whatever@whatever.whatever",
   }
   
   
   @dag(
       dag_id="Start_and_Wait",
       schedule=None,
       start_date=pendulum.datetime(2022, 11, 15, tz="UTC"),
       catchup=False,
       default_args=default_args,
   )
   def wait_flow():
       @task
       def wait():
           time.sleep(1)
   
       @task
       def log():
           logging.info("done!")
   
       wait() >> log()
   
   
   wait_flow()
   ```
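   
   For completeness, and consistent with the observation above that plain functions behave correctly, the same callback can be expressed without a bare `partial` by calling the partial from a named wrapper function. Names below are illustrative; this is a workaround sketch, not a fix for the underlying bug:
   
   ```py
   import logging
   from functools import partial
   
   
   def my_callback(context, message="success callback"):
       logging.info(message)
       raise Exception("exception text")
   
   
   def my_callback_wrapped(context):
       # A plain function exposes __module__/__name__, unlike a bare partial,
       # which is presumably what the callback introspection in 2.6.0+ expects.
       return partial(my_callback, message="success callback")(context)
   
   
   default_args = {
       "on_success_callback": my_callback_wrapped,
       "on_failure_callback": my_callback_wrapped,
       "email_on_failure": True,
       "email": "whatever@whatever.whatever",
   }
   ```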
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   The issue appears in apache-airflow 2.6.0+.
   The issue does not appear in apache-airflow <=2.5.3.
   
   The recommended upstream constraints file was used in all cases.
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   Reproduced with both LocalExecutor and CeleryExecutor.
   
   ### Anything else
   
   Expected execution in 2.5.3:
   ```txt
   c336145d4ae9
   *** Reading local file: 
/usr/local/airflow/shared/logs/dag_id=Start_and_Wait/run_id=manual__2023-06-26T20:07:31.314971+00:00/task_id=wait/attempt=1.log
   [2023-06-26, 20:07:31 UTC] {taskinstance.py:1090} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: Start_and_Wait.wait 
manual__2023-06-26T20:07:31.314971+00:00 [queued]>
   [2023-06-26, 20:07:31 UTC] {taskinstance.py:1090} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: Start_and_Wait.wait 
manual__2023-06-26T20:07:31.314971+00:00 [queued]>
   [2023-06-26, 20:07:31 UTC] {taskinstance.py:1288} INFO - 
   
--------------------------------------------------------------------------------
   [2023-06-26, 20:07:31 UTC] {taskinstance.py:1289} INFO - Starting attempt 1 
of 1
   [2023-06-26, 20:07:31 UTC] {taskinstance.py:1290} INFO - 
   
--------------------------------------------------------------------------------
   [2023-06-26, 20:07:31 UTC] {taskinstance.py:1309} INFO - Executing 
<Task(_PythonDecoratedOperator): wait> on 2023-06-26 20:07:31.314971+00:00
   [2023-06-26, 20:07:31 UTC] {standard_task_runner.py:55} INFO - Started 
process 901 to run task
   [2023-06-26, 20:07:31 UTC] {standard_task_runner.py:82} INFO - Running: 
['***', 'tasks', 'run', 'Start_and_Wait', 'wait', 
'manual__2023-06-26T20:07:31.314971+00:00', '--job-id', '12', '--raw', 
'--subdir', 'DAGS_FOLDER/_local/wait.py', '--cfg-path', '/tmp/tmpmm0tra8e']
   [2023-06-26, 20:07:31 UTC] {standard_task_runner.py:83} INFO - Job 12: 
Subtask wait
   [2023-06-26, 20:07:32 UTC] {task_command.py:389} INFO - Running 
<TaskInstance: Start_and_Wait.wait manual__2023-06-26T20:07:31.314971+00:00 
[running]> on host c336145d4ae9
   [2023-06-26, 20:07:32 UTC] {taskinstance.py:1516} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_EMAIL=***
   AIRFLOW_CTX_DAG_OWNER=***
   AIRFLOW_CTX_DAG_ID=Start_and_Wait
   AIRFLOW_CTX_TASK_ID=wait
   AIRFLOW_CTX_EXECUTION_DATE=2023-06-26T20:07:31.314971+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2023-06-26T20:07:31.314971+00:00
   [2023-06-26, 20:07:33 UTC] {python.py:177} INFO - Done. Returned value was: 
None
   [2023-06-26, 20:07:33 UTC] {taskinstance.py:1327} INFO - Marking task as 
SUCCESS. dag_id=Start_and_Wait, task_id=wait, execution_date=20230626T200731, 
start_date=20230626T200731, end_date=20230626T200733
   [2023-06-26, 20:07:33 UTC] {wait.py:9} INFO - success callback
   [2023-06-26, 20:07:33 UTC] {taskinstance.py:1544} ERROR - Error when 
executing on_success callback
   Traceback (most recent call last):
     File 
"/app/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 
1542, in _run_finished_callback
       callback(context)
     File "/usr/local/airflow/dags/_local/wait.py", line 10, in my_callback
       raise Exception("exception text")
   Exception: exception text
   [2023-06-26, 20:07:33 UTC] {local_task_job.py:212} INFO - Task exited with 
return code 0
   [2023-06-26, 20:07:33 UTC] {taskinstance.py:2596} INFO - 1 downstream tasks 
scheduled from follow-on schedule check
   ```
   
   In 2.6.0+, the task ends up marked as failed (apparently by the scheduler), even though the log below shows it being marked SUCCESS:
   ```txt
   5c688d484e65
   *** Found local files:
   ***   * 
/usr/local/airflow/shared/logs/dag_id=Start_and_Wait/run_id=manual__2023-06-26T20:04:44.289200+00:00/task_id=wait/attempt=1.log
   *** Could not read served logs: timed out
   [2023-06-26, 20:04:45 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: Start_and_Wait.wait 
manual__2023-06-26T20:04:44.289200+00:00 [queued]>
   [2023-06-26, 20:04:45 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: Start_and_Wait.wait 
manual__2023-06-26T20:04:44.289200+00:00 [queued]>
   [2023-06-26, 20:04:45 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 
of 1
   [2023-06-26, 20:04:45 UTC] {taskinstance.py:1327} INFO - Executing 
<Task(_PythonDecoratedOperator): wait> on 2023-06-26 20:04:44.289200+00:00
   [2023-06-26, 20:04:45 UTC] {standard_task_runner.py:57} INFO - Started 
process 786 to run task
   [2023-06-26, 20:04:45 UTC] {standard_task_runner.py:84} INFO - Running: 
['***', 'tasks', 'run', 'Start_and_Wait', 'wait', 
'manual__2023-06-26T20:04:44.289200+00:00', '--job-id', '14', '--raw', 
'--subdir', 'DAGS_FOLDER/_local/wait.py', '--cfg-path', '/tmp/tmpx798o7bj']
   [2023-06-26, 20:04:45 UTC] {standard_task_runner.py:85} INFO - Job 14: 
Subtask wait
   [2023-06-26, 20:04:45 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: Start_and_Wait.wait manual__2023-06-26T20:04:44.289200+00:00 
[running]> on host 5c688d484e65
   [2023-06-26, 20:04:45 UTC] {taskinstance.py:1545} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_EMAIL='pri...@capitalone.com' AIRFLOW_CTX_DAG_OWNER='***' 
AIRFLOW_CTX_DAG_ID='Start_and_Wait' AIRFLOW_CTX_TASK_ID='wait' 
AIRFLOW_CTX_EXECUTION_DATE='2023-06-26T20:04:44.289200+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-26T20:04:44.289200+00:00'
   [2023-06-26, 20:04:46 UTC] {python.py:183} INFO - Done. Returned value was: 
None
   [2023-06-26, 20:04:46 UTC] {taskinstance.py:1345} INFO - Marking task as 
SUCCESS. dag_id=Start_and_Wait, task_id=wait, execution_date=20230626T200444, 
start_date=20230626T200445, end_date=20230626T200446
   [2023-06-26, 20:04:46 UTC] {wait.py:9} INFO - success callback
   [2023-06-26, 20:04:46 UTC] {standard_task_runner.py:104} ERROR - Failed to 
execute job 14 for task wait ('functools.partial' object has no attribute 
'__module__'; 786)
   [2023-06-26, 20:04:46 UTC] {local_task_job_runner.py:225} INFO - Task exited 
with return code 1
   [2023-06-26, 20:04:46 UTC] {taskinstance.py:2651} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   
   Server logs:
   ```txt
   ***-local-webserver-local-1  | [2023-06-26T20:29:28.371+0000] 
{scheduler_job_runner.py:1688} WARNING - Failing (1) jobs without heartbeat 
after 2023-06-26 20:24:28.364681+00:00
   ***-local-webserver-local-1  | [2023-06-26T20:29:28.371+0000] 
{scheduler_job_runner.py:1698} ERROR - Detected zombie job: {'full_filepath': 
'/usr/local/airflow/dags/_local/wait.py', 'processor_subdir': 
'/usr/local/airflow/dags', 'msg': "{'DAG Id': 'Start_and_Wait', 'Task Id': 
'wait', 'Run Id': 'manual__2023-06-26T20:29:06.100360+00:00', 'Hostname': 
'ee1eef189157'}", 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0x7fa1e59eaac0>, 
'is_failure_callback': True}
   ```
   
   Failure to fetch the log file via HTTP (the served-logs request times out):
   ```txt
   ***-local-webserver-local-1  | 172.23.0.1 - - [26/Jun/2023:20:15:14 +0000] 
"GET 
/api/v1/dags/Start_and_Wait/dagRuns/manual__2023-06-26T20:14:45.191590+00:00/taskInstances/wait/logs/1?full_content=false
 HTTP/1.1" 200 2618 
"http://localhost:8080/dags/Start_and_Wait/grid?tab=logs&dag_run_id=manual__2023-06-26T20%3A14%3A45.191590%2B00%3A00&task_id=wait";
 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, 
like Gecko) Chrome/114.0.0.0 Safari/537.36"
   ***-local-webserver-local-1  | [2023-06-26T20:15:14.800+0000] 
{file_task_handler.py:522} ERROR - Could not read served logs
   ***-local-webserver-local-1  | Traceback (most recent call last):
   ***-local-webserver-local-1  |   File 
"/app/.local/lib/python3.9/site-packages/httpcore/_exceptions.py", line 10, in 
map_exceptions
   ***-local-webserver-local-1  |     yield
   ***-local-webserver-local-1  |   File 
"/app/.local/lib/python3.9/site-packages/httpcore/backends/sync.py", line 28, 
in read
   ***-local-webserver-local-1  |     return self._sock.recv(max_bytes)
   ***-local-webserver-local-1  | socket.timeout: timed out
   ```
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

