tirkarthi opened a new issue, #47733:
URL: https://github.com/apache/airflow/issues/47733

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   The triggerer crashes when a trigger completes and the triggerer tries to upload the logs. Caused by commit 43deb993a0abbce87332792c29ae706e355d0b14.
   
   ```
   [2025-03-13T20:39:41.425+0530] {cli_action_loggers.py:97} DEBUG - Calling callbacks: []
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", line 148, in _execute
       self.trigger_runner.run()
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", line 325, in run
       self._service_subprocess(1)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 599, in _service_subprocess
   [2025-03-13 20:39:41 +0530] [5571] [INFO] Worker exiting (pid: 5571)
   [2025-03-13 20:39:41 +0530] [5570] [INFO] Worker exiting (pid: 5570)
       need_more = socket_handler(key.fileobj)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 971, in cb
       gen.send(line)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 488, in handle_requests
       self._handle_request(msg, log)
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", line 312, in _handle_request
       factory.upload_to_remote()
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", line 244, in upload_to_remote
       upload_to_remote(self.bound_logger)
   TypeError: upload_to_remote() missing 1 required positional argument: 'log_meta_dict'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, in <module>
       sys.exit(main())
                ^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 58, in main
       args.func(args)
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/cli.py", line 111, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/triggerer_command.py", line 65, in triggerer
       run_command_with_daemon_option(
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
       callback()
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/triggerer_command.py", line 68, in <lambda>
       callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate),
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/triggerer_command.py", line 54, in triggerer_run
       run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 101, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 342, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 371, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", line 158, in _execute
       self.trigger_runner.kill(escalation_delay=10, force=True)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 542, in kill
       exit_code := self._service_subprocess(max_wait_time=end - now, raise_on_timeout=False)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 599, in _service_subprocess
       need_more = socket_handler(key.fileobj)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 961, in cb
       gen.send(buffer)
   StopIteration
   [2025-03-13 20:39:41 +0530] [5569] [INFO] Shutting down: Master
   
   ```
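
The two chained tracebacks above look consistent with general Python generator behaviour: once an exception escapes a generator through `send()`, the generator is finished, and any later `send()` raises `StopIteration`. The following is a minimal sketch of that pattern only. The function bodies are hypothetical stand-ins that reuse names from the traceback; they are not the Airflow implementation.

```python
def upload_to_remote(logger, log_meta_dict):
    """Hypothetical stand-in: a helper whose signature requires two arguments."""
    return f"uploaded logs for {logger} with {log_meta_dict}"


def handle_requests():
    """Hypothetical stand-in for a generator-based request handler."""
    while True:
        line = yield
        # The call site passes only one argument, so this raises TypeError
        # and the exception propagates out of the generator.
        upload_to_remote(line)


def demo():
    gen = handle_requests()
    next(gen)  # prime the generator so it is suspended at `yield`

    errors = []
    first_message = None
    try:
        gen.send("trigger finished")
    except TypeError as exc:
        errors.append("TypeError")
        first_message = str(exc)

    # The generator was terminated by the TypeError above, so a second
    # send() (for example during shutdown) raises StopIteration.
    try:
        gen.send("shutdown")
    except StopIteration:
        errors.append("StopIteration")

    return errors, first_message
```

Calling `demo()` yields both exception types in order, which mirrors the `TypeError` followed by `StopIteration` in the log. Under this reading, the `StopIteration` is a follow-on effect and the missing `log_meta_dict` argument is the error to fix.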
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Run the DAG below and start the triggerer in a different tab.
   2. Run `touch /tmp/a` so that the trigger returns an event.
   
   ```python
   from __future__ import annotations
   
   from datetime import datetime
   
   from airflow import DAG
   from airflow.models.baseoperator import BaseOperator
   
   # from airflow.operators.bash import BashOperator
   from airflow.triggers.file import FileTrigger
   
   
   class FileCheckOperator(BaseOperator):
       def __init__(self, filepath, **kwargs):
           self.filepath = filepath
           super().__init__(**kwargs)
   
       def execute(self, context):
           self.defer(
               trigger=FileTrigger(filepath=self.filepath),
               method_name="execute_complete",
           )
   
       def execute_complete(self, context, event=None):
           pass
   
   
   with DAG(
       dag_id="file_trigger",
       start_date=datetime(2021, 1, 1),
       catchup=False,
       schedule=None,
   ) as dag:
       t1 = FileCheckOperator(task_id="t1", filepath="/tmp/a")
   ```
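
For readers unfamiliar with file-based triggers, the reproduction relies on a trigger that waits until a path exists and then emits an event. The sketch below illustrates that idea with plain `asyncio`. The function `wait_for_file`, its `poke_interval` parameter, and the event payload are assumptions made for illustration; this is not the `FileTrigger` implementation.

```python
import asyncio
import os
import tempfile


async def wait_for_file(filepath, poke_interval=0.01):
    """Hypothetical stand-in for a file trigger: poll until the path exists."""
    while not os.path.exists(filepath):
        await asyncio.sleep(poke_interval)
    # Illustrative event payload; the real trigger's event may differ.
    return {"status": "success", "filepath": filepath}


async def demo():
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "a")
        waiter = asyncio.create_task(wait_for_file(target))

        await asyncio.sleep(0.05)  # the file does not exist yet
        still_waiting = not waiter.done()

        open(target, "w").close()  # equivalent of `touch`
        event = await asyncio.wait_for(waiter, timeout=5)
        return still_waiting, event["status"]
```

Running `asyncio.run(demo())` shows the waiter staying pending until the file is created. In the reproduction, the crash occurs at the corresponding point, when the trigger returns its event.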
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

