SameerMesiah97 commented on code in PR #63035:
URL: https://github.com/apache/airflow/pull/63035#discussion_r2928032513


##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -205,55 +213,97 @@ def sync(self):
             self.log.exception("An error occurred while syncing tasks")
 
     def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
-        from airflow.executors import workloads
 
-        if not isinstance(workload, workloads.ExecuteTask):
-        raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
-        ti = workload.ti
-        self.queued_tasks[ti.key] = workload
+        if isinstance(workload, workloads.ExecuteTask):
+            ti = workload.ti
+            self.queued_tasks[ti.key] = workload
+            return
+
+        if AIRFLOW_V_3_2_PLUS and isinstance(workload, workloads.ExecuteCallback):
+            self.queued_callbacks[workload.callback.id] = workload
+            return
+
+        raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
+
+    def _process_workloads(self, workload_items: Sequence[workloads.All]) -> None:
+
+        for w in workload_items:
+            key: TaskInstanceKey | str
+            command: list[workloads.All]
+            queue: str | None
+            if isinstance(w, workloads.ExecuteTask):
+                command = [w]
+                key = w.ti.key
+                queue = w.ti.queue
+                executor_config = w.ti.executor_config or {}
+
+                del self.queued_tasks[key]
 
-    def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
-        from airflow.executors.workloads import ExecuteTask
+                self.execute_async(
+                    key=key,
+                    command=command,
+                    queue=queue,
+                    executor_config=executor_config,
+                )
+
+                self.running.add(key)
+                continue
 
-        for w in workloads:
-            if not isinstance(w, ExecuteTask):
-                raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(w)}")
+            if AIRFLOW_V_3_2_PLUS and isinstance(w, workloads.ExecuteCallback):
+                command = [w]
+                key = w.callback.id
+                queue = None
 
-            command = [w]
-            key = w.ti.key
-            queue = w.ti.queue
-            executor_config = w.ti.executor_config or {}
+                if isinstance(w.callback.data, dict) and "queue" in w.callback.data:
+                    queue = w.callback.data["queue"]
 
-            del self.queued_tasks[key]
-            self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)  # type: ignore[arg-type]
-            self.running.add(key)
+                del self.queued_callbacks[key]
 
-    def execute_async(self, key: TaskInstanceKey, command: CommandType, queue=None, executor_config=None):
+                self.execute_async(
+                    key=key,
+                    command=command,
+                    queue=queue,
+                )
+
+                self.running.add(key)
+                continue
+
+            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(w)}")
+
+    def execute_async(
+        self,
+        key: TaskInstanceKey | str,

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to