mandeepzemo commented on issue #54332:
URL: https://github.com/apache/airflow/issues/54332#issuecomment-3199407809

   I tried to reproduce this issue in my local Airflow setup using the plugin 
code below:
   
   **plugins/debug_listener.py**
   ```
   import logging
   from typing import Optional
   from airflow.listeners import hookimpl
   from airflow.utils.state import TaskInstanceState
   
   logger = logging.getLogger(__name__)
   
   class DebugTaskListener:
       """Debug listener that prints and logs task-instance state changes.

       Each hook reports the task id, the DAG id and the previous state twice:
       once on stdout via ``print`` and once through the module ``logger``.
       Any exception raised inside a hook is reported and swallowed, so this
       debugging aid does not propagate its own failures to the caller.
       """

       @staticmethod
       def _identify(task_instance):
           """Return ``(task_id, dag_id)`` for *task_instance*.

           Attributes that cannot be found are reported as ``'unknown'``.
           """
           task_id = getattr(task_instance, 'task_id', 'unknown')
           # Prefer the task instance's own ``dag_id``. Looking only at
           # ``task_instance.dag`` yields 'unknown' whenever that attribute
           # is absent, even though the DAG id itself is available.
           dag_id = getattr(task_instance, 'dag_id', None)
           if dag_id is None:
               dag_id = getattr(
                   getattr(task_instance, 'dag', None), 'dag_id', 'unknown'
               )
           return task_id, dag_id

       @hookimpl
       def on_task_instance_running(
           self,
           previous_state: Optional[TaskInstanceState],
           task_instance
       ) -> None:
           """Report that *task_instance* has entered the RUNNING state.

           :param previous_state: state before the transition; shown as
               ``N/A`` when falsy.
           :param task_instance: the task instance that changed state.
           """
           try:
               task_id, dag_id = self._identify(task_instance)
               # Adjacent literals keep each source line short so the message
               # is never split in the middle of a string.
               print(
                   f"\n\n[DEBUG LISTENER] Task {task_id} in DAG {dag_id} is "
                   f"now RUNNING (was: {previous_state or 'N/A'})\n"
               )
               logger.info(
                   "[DEBUG LISTENER] Task %s in DAG %s is now RUNNING "
                   "(was: %s)",
                   task_id,
                   dag_id,
                   previous_state or "N/A"
               )
           except Exception as e:
               # Best effort: report the failure instead of raising it.
               print(f"\n\n[DEBUG LISTENER ERROR] {str(e)}\n")
               logger.exception("Error in debug listener (running): %s", str(e))

       @hookimpl
       def on_task_instance_success(
           self,
           previous_state: Optional[TaskInstanceState],
           task_instance
       ) -> None:
           """Report that *task_instance* has finished successfully.

           :param previous_state: state before the transition; shown as
               ``N/A`` when falsy.
           :param task_instance: the task instance that changed state.
           """
           try:
               task_id, dag_id = self._identify(task_instance)
               print(
                   f"\n\n[DEBUG LISTENER] Task {task_id} in DAG {dag_id} "
                   f"SUCCEEDED (was: {previous_state or 'N/A'})\n"
               )
               logger.info(
                   "[DEBUG LISTENER] MAIN Task %s in DAG %s SUCCEEDED "
                   "(was: %s)",
                   task_id,
                   dag_id,
                   previous_state or "N/A"
               )
           except Exception as e:
               # Best effort: report the failure instead of raising it.
               print(f"\n\n[DEBUG LISTENER ERROR] {str(e)}\n")
               logger.exception("Error in debug listener (success): %s", str(e))
   
   # Module-level listener instance; plugins/debug_plugin.py imports it by this
   # name and hands it to the listener manager.
   debug_task_listener = DebugTaskListener()
   ```
   
   
   **plugins/debug_plugin.py**
   ```
   from __future__ import annotations
   import logging
   from airflow.plugins_manager import AirflowPlugin
   
   from debug_listener import debug_task_listener
   
   logger = logging.getLogger(__name__)
   
   class DebugPlugin(AirflowPlugin):
       """Airflow plugin that registers ``debug_task_listener`` on instantiation."""

       name = "debug_plugin"

       def __init__(self):
           """Register the debug listener, then run the base-class initializer.

           Every step is logged at INFO level. If anything in the registration
           sequence fails, the error is logged with its traceback and re-raised,
           so the base initializer is not reached.
           """
           logger.info("Initializing DebugPlugin")
           try:
               # Imported inside the guarded section so that an import failure
               # is logged by the handler below as well.
               from airflow.listeners.listener import get_listener_manager

               logger.info("Getting listener manager...")
               manager = get_listener_manager()
               logger.info("Registering debug listener...")
               manager.add_listener(debug_task_listener)
               logger.info("Debug listener registered successfully")
           except Exception as exc:
               logger.exception("Failed to initialize DebugPlugin: %s", str(exc))
               raise

           super().__init__()
   # Module-level statement (outside the class): executes when this module is
   # imported, independently of whether DebugPlugin is ever instantiated.
   print("\n\n[DEBUG PLUGIN] DebugPlugin has been loaded!\n\n")
   ```
   
   
   **dags/debug_listener_test.py**
   ```
   import logging
   import sys
   from datetime import datetime
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.listeners.listener import get_listener_manager
   
   # Ask the root logger to emit INFO-and-above records to stdout in a
   # "timestamp - logger name - level - message" layout.
   # NOTE(review): logging.basicConfig() does nothing when the root logger
   # already has handlers; presumably Airflow has configured logging before
   # this file is parsed, in which case this call has no effect. Confirm.
   logging.basicConfig(
       level=logging.INFO,
       format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
       stream=sys.stdout
   )
   logger = logging.getLogger(__name__)
   
   def log_listeners():
       """Log what the Airflow listener manager currently has registered.

       Logs the manager's ``has_listeners`` value. When the manager exposes
       ``pm.get_plugins()``, it also logs the class name of every registered
       plugin and of each plugin that provides ``on_task_instance_running``.
       Any exception is logged with its traceback and not propagated.
       """
       try:
           manager = get_listener_manager()
           logger.info("=== LISTENER MANAGER INFO ===")
           logger.info("Has listeners: %s", manager.has_listeners)

           # Nothing more to report if the plugin manager cannot be inspected.
           if not (hasattr(manager, 'pm') and hasattr(manager.pm, 'get_plugins')):
               return

           registered = manager.pm.get_plugins()
           class_names = []
           for entry in registered:
               class_names.append(type(entry).__name__)
           logger.info("Registered plugins: %s", class_names)

           for entry in registered:
               if not hasattr(entry, 'on_task_instance_running'):
                   continue
               logger.info(
                   "Found task instance running listener: %s",
                   type(entry).__name__,
               )
       except Exception as exc:
           logger.exception("Error checking listeners: %s", str(exc))
   
   def test_task():
       """Log a banner, dump the listener-manager state, and report success.

       Returns the fixed string ``"Task completed successfully"``.
       """
       logger.info("\n=== TASK IS EXECUTING ===\n")
       log_listeners()
       outcome = "Task completed successfully"
       return outcome
   
   # Module-level statements: these run every time this file is executed or
   # imported, so the listener state is logged at parse time as well as from
   # inside the task.
   logger.info("\n=== DAG IS BEING PARSED ===\n")
   log_listeners()
   
   # DAG with a single PythonOperator task that calls test_task().
   with DAG(
       'debug_listener_test',
       default_args={
           'owner': 'airflow',
           'start_date': datetime(2023, 1, 1),
       },
       description='Test DAG for debugging task state change listeners',
       catchup=False,
       tags=['debug'],
   ) as dag:
   
       # NOTE: this assignment rebinds the module-level name ``test_task`` from
       # the function defined above to the operator. The operator still receives
       # the function, because the call's arguments are evaluated before the
       # name is rebound.
       test_task = PythonOperator(
           task_id='debug_test_task',
           python_callable=test_task,
       )
   
       # Bare expression statement; evaluating the name has no effect.
       test_task
   ```
   
   Output: 
   
   <img width="1465" height="621" alt="Image" src="https://github.com/user-attachments/assets/95aa4642-4648-4232-807b-2db2fae4746c" />
   
   The listener was working properly, and I was able to see the success logs 
being printed.
   
   Based on my testing, it looks like this issue has already been resolved in 
the latest Airflow release. @plovegro, could you please confirm?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to