mandeepzemo commented on issue #54332: URL: https://github.com/apache/airflow/issues/54332#issuecomment-3199407809
I tried reproducing this issue using the plugin code below in my local Airflow setup:

**plugins/debug_listener.py**

```
import logging
from typing import Optional

from airflow.listeners import hookimpl
from airflow.utils.state import TaskInstanceState

logger = logging.getLogger(__name__)


class DebugTaskListener:
    @hookimpl
    def on_task_instance_running(
        self, previous_state: Optional[TaskInstanceState], task_instance
    ) -> None:
        try:
            task_id = getattr(task_instance, 'task_id', 'unknown')
            dag_id = getattr(getattr(task_instance, 'dag', None), 'dag_id', 'unknown')
            print(f"\n\n[DEBUG LISTENER] Task {task_id} in DAG {dag_id} is now RUNNING (was: {previous_state or 'N/A'})\n")
            logger.info(
                "[DEBUG LISTENER] Task %s in DAG %s is now RUNNING (was: %s)",
                task_id, dag_id, previous_state or "N/A"
            )
        except Exception as e:
            print(f"\n\n[DEBUG LISTENER ERROR] {str(e)}\n")
            logger.exception("Error in debug listener (running): %s", str(e))

    @hookimpl
    def on_task_instance_success(
        self, previous_state: Optional[TaskInstanceState], task_instance
    ) -> None:
        try:
            task_id = getattr(task_instance, 'task_id', 'unknown')
            dag_id = getattr(getattr(task_instance, 'dag', None), 'dag_id', 'unknown')
            print(f"\n\n[DEBUG LISTENER] Task {task_id} in DAG {dag_id} SUCCEEDED (was: {previous_state or 'N/A'})\n")
            logger.info(
                "[DEBUG LISTENER] MAIN Task %s in DAG %s SUCCEEDED (was: %s)",
                task_id, dag_id, previous_state or "N/A"
            )
        except Exception as e:
            print(f"\n\n[DEBUG LISTENER ERROR] {str(e)}\n")
            logger.exception("Error in debug listener (success): %s", str(e))


debug_task_listener = DebugTaskListener()
```

**plugins/debug_plugin.py**

```
from __future__ import annotations

import logging

from airflow.plugins_manager import AirflowPlugin
from debug_listener import debug_task_listener

logger = logging.getLogger(__name__)


class DebugPlugin(AirflowPlugin):
    name = "debug_plugin"

    def __init__(self):
        logger.info("Initializing DebugPlugin")
        try:
            from airflow.listeners.listener import get_listener_manager

            logger.info("Getting listener manager...")
            listener_manager = get_listener_manager()
            logger.info("Registering debug listener...")
            listener_manager.add_listener(debug_task_listener)
            logger.info("Debug listener registered successfully")
        except Exception as e:
            logger.exception("Failed to initialize DebugPlugin: %s", str(e))
            raise
        super().__init__()
        print("\n\n[DEBUG PLUGIN] DebugPlugin has been loaded!\n\n")
```

**dags/debug_listener_test.py**

```
import logging
import sys
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.listeners.listener import get_listener_manager

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stdout
)
logger = logging.getLogger(__name__)


def log_listeners():
    try:
        lm = get_listener_manager()
        logger.info("=== LISTENER MANAGER INFO ===")
        logger.info("Has listeners: %s", lm.has_listeners)
        if hasattr(lm, 'pm') and hasattr(lm.pm, 'get_plugins'):
            plugins = lm.pm.get_plugins()
            logger.info("Registered plugins: %s", [type(p).__name__ for p in plugins])
            for plugin in plugins:
                if hasattr(plugin, 'on_task_instance_running'):
                    logger.info("Found task instance running listener: %s", type(plugin).__name__)
    except Exception as e:
        logger.exception("Error checking listeners: %s", str(e))


def test_task():
    """Test function that verifies the listener is working."""
    logger.info("\n=== TASK IS EXECUTING ===\n")
    log_listeners()
    return "Task completed successfully"


logger.info("\n=== DAG IS BEING PARSED ===\n")
log_listeners()

with DAG(
    'debug_listener_test',
    default_args={
        'owner': 'airflow',
        'start_date': datetime(2023, 1, 1),
    },
    description='Test DAG for debugging task state change listeners',
    catchup=False,
    tags=['debug'],
) as dag:
    test_task = PythonOperator(
        task_id='debug_test_task',
        python_callable=test_task,
    )

    test_task
```

Output:

<img width="1465" height="621" alt="Image"
src="https://github.com/user-attachments/assets/95aa4642-4648-4232-807b-2db2fae4746c" />

The listener was working properly, and I was able to see the success logs being printed. Based on my testing, it looks like this issue has already been resolved in the latest Airflow release.

@plovegro, please confirm.

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org