avyuktsoni0731 commented on code in PR #45158:
URL: https://github.com/apache/airflow/pull/45158#discussion_r1895326078


##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -397,6 +397,44 @@ In the above example, the trigger will end the task instance directly if ``end_f
 .. note::
     Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases.
 
+Handling XComs for Deferred Tasks
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+When working with deferred tasks that exit directly from triggers, you may need to push XCom values for subsequent tasks in the pipeline. The method ``_push_xcoms_if_necessary`` is responsible for pushing these values. Below is an example of how this can be implemented:
+
+.. code-block:: python
+
+    def _push_xcoms_if_necessary(self, *, task_instance: TaskInstance) -> None:
+        """
+        Push XComs if required based on the task's state and the provided events.
+        """
+        if task_instance.state == TaskInstanceState.SUCCESS:
+            task_instance.xcom_push(
+                key="result", value={"status": "success", "message": "Task completed successfully"}
+            )
+        elif task_instance.state == TaskInstanceState.FAILED:
+            task_instance.xcom_push(key="result", value={"status": "failure", "message": "Task failed"})
+
+You can call this method within your trigger to manage XComs when the task instance ends directly from the trigger. Here's an example:
+
+.. code-block:: python
+
+    class WaitFiveHourTrigger(BaseTrigger):
+        def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
+            super().__init__()
+            self.duration = duration
+            self.end_from_trigger = end_from_trigger
+
+        async def run(self) -> AsyncIterator[TriggerEvent]:
+            await asyncio.sleep(self.duration.total_seconds())
+            if self.end_from_trigger:
+                task_instance = ...  # Get the relevant TaskInstance

Review Comment:
   Right, let me look into this again, and try to make the necessary changes 
that @tirkarthi suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
