ferruzzi commented on code in PR #63035:
URL: https://github.com/apache/airflow/pull/63035#discussion_r2914876060


##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/utils.py:
##########
@@ -66,5 +65,5 @@ class AllLambdaConfigKeys(InvokeLambdaKwargsConfigKeys):
     END_WAIT_TIMEOUT = "end_wait_timeout"
 
 
-CommandType = Sequence[str]
+CommandType = Sequence[Any]

Review Comment:
   Can we narrow this down any?
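   One way it could be narrowed, sketched with stand-in classes (the real `ExecuteTask`/`ExecuteCallback` live in `airflow.executors.workloads`; this is an assumption-laden sketch, not the provider's actual code):

   ```python
   from dataclasses import dataclass
   from typing import Sequence, Union

   # Stand-ins for airflow.executors.workloads.ExecuteTask / ExecuteCallback,
   # the only things this executor actually places in `command`.
   @dataclass
   class ExecuteTask:
       ti: object = None

   @dataclass
   class ExecuteCallback:
       callback: object = None

   # Narrower than Sequence[Any]: a sequence of the known workload types.
   CommandType = Sequence[Union[ExecuteTask, ExecuteCallback]]

   cmd: CommandType = [ExecuteTask()]
   print(isinstance(cmd[0], (ExecuteTask, ExecuteCallback)))  # True
   ```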



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/utils.py:
##########
@@ -37,9 +36,9 @@
 
 @dataclass
 class LambdaQueuedTask:
-    """Represents a Lambda task that is queued. The task will be run in the next heartbeat."""
+    """Represents a Lambda workload that is queued. The task will be run in the next heartbeat."""
 
-    key: TaskInstanceKey
+    key: TaskInstanceKey | str

Review Comment:
   Please use `WorkloadKey` here from `airflow-core/src/airflow/executors/workloads/types.py`
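   For context, a minimal sketch of what such an alias looks like in use, with a stand-in `TaskInstanceKey` (the real alias lives in the file above; names here are assumptions):

   ```python
   from typing import NamedTuple, Union

   # Stand-in for airflow.models.taskinstancekey.TaskInstanceKey
   class TaskInstanceKey(NamedTuple):
       dag_id: str
       task_id: str
       run_id: str
       try_number: int
       map_index: int

   # Hypothetical mirror of WorkloadKey: a task-instance key or a callback id
   # string, replacing ad-hoc `TaskInstanceKey | str` annotations.
   WorkloadKey = Union[TaskInstanceKey, str]

   def kind(key: WorkloadKey) -> str:
       return "callback" if isinstance(key, str) else "task"

   print(kind("cb-123"))                                # callback
   print(kind(TaskInstanceKey("d", "t", "r", 1, -1)))   # task
   ```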



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -461,6 +517,8 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
         """
         Adopt task instances which have an external_executor_id (the serialized task key).

+        The external_executor_id may contain either a serialized TaskInstanceKey or a callback identifier string.

Review Comment:
   WorkloadKey?



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -71,7 +76,10 @@ class AwsLambdaExecutor(BaseExecutor):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.pending_tasks: deque = deque()
-        self.running_tasks: dict[str, TaskInstanceKey] = {}
+        self.running_tasks: dict[str, TaskInstanceKey | str] = {}

Review Comment:
   Same as below, `TaskInstanceKey | str` should be `executors.workloads.types.WorkloadKey` instead



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -179,9 +187,9 @@ def load_connections(self, check_connection: bool = True):
 
     def sync(self):
         """
-        Sync the executor with the current state of tasks.
+        Sync the executor with the current state of workloads.
 
-        Check in on currently running tasks and attempt to run any new tasks that have been queued.
+        Check in on currently running workloads and attempt to run any new workloads that have been queued.

Review Comment:
   Here and elsewhere: As Niko pointed out in one of my recent PRs, the user may not be familiar with the term `workload` yet, so it would be beneficial to expand it a little so we can teach them. Consider:

   ```suggestion
           Check in on currently running tasks and callbacks and attempt to run any new workloads that have been queued.
   ```

   That way we're kind of teaching them what a `workload` is without making them go look it up.



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -205,55 +213,97 @@ def sync(self):
             self.log.exception("An error occurred while syncing tasks")
 
     def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
-        from airflow.executors import workloads

-        if not isinstance(workload, workloads.ExecuteTask):
-            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
-        ti = workload.ti
-        self.queued_tasks[ti.key] = workload
+        if isinstance(workload, workloads.ExecuteTask):
+            ti = workload.ti
+            self.queued_tasks[ti.key] = workload
+            return
+
+        if AIRFLOW_V_3_2_PLUS and isinstance(workload, workloads.ExecuteCallback):
+            self.queued_callbacks[workload.callback.id] = workload
+            return
+
+        raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
+
+    def _process_workloads(self, workload_items: Sequence[workloads.All]) -> None:
+
+        for w in workload_items:
+            key: TaskInstanceKey | str
+            command: list[workloads.All]
+            queue: str | None
+            if isinstance(w, workloads.ExecuteTask):
+                command = [w]
+                key = w.ti.key
+                queue = w.ti.queue
+                executor_config = w.ti.executor_config or {}
+
+                del self.queued_tasks[key]

-    def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
-        from airflow.executors.workloads import ExecuteTask
+                self.execute_async(
+                    key=key,
+                    command=command,
+                    queue=queue,
+                    executor_config=executor_config,
+                )
+
+                self.running.add(key)
+                continue

-        for w in workloads:
-            if not isinstance(w, ExecuteTask):
-                raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(w)}")
+            if AIRFLOW_V_3_2_PLUS and isinstance(w, workloads.ExecuteCallback):
+                command = [w]
+                key = w.callback.id
+                queue = None

-            command = [w]
-            key = w.ti.key
-            queue = w.ti.queue
-            executor_config = w.ti.executor_config or {}
+                if isinstance(w.callback.data, dict) and "queue" in w.callback.data:
+                    queue = w.callback.data["queue"]

-            del self.queued_tasks[key]
-            self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)  # type: ignore[arg-type]
-            self.running.add(key)
+                del self.queued_callbacks[key]

-    def execute_async(self, key: TaskInstanceKey, command: CommandType, queue=None, executor_config=None):
+                self.execute_async(
+                    key=key,
+                    command=command,
+                    queue=queue,
+                )
+
+                self.running.add(key)
+                continue
+
+            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(w)}")
+
+    def execute_async(
+        self,
+        key: TaskInstanceKey | str,
+        command: CommandType | Sequence[workloads.All],
+        queue=None,
+        executor_config=None,
+    ):
         """
-        Save the task to be executed in the next sync by inserting the commands into a queue.
+        Save the workload to be executed in the next sync by inserting the commands into a queue.

-        :param key: A unique task key (typically a tuple identifying the task instance).
+        :param key: Unique workload key. Task workloads use TaskInstanceKey, callback workloads use a string id.
         :param command: The shell command string to execute.
         :param executor_config:  (Unused) to keep the same signature as the base.
         :param queue: (Unused) to keep the same signature as the base.
         """
         if len(command) == 1:
-            from airflow.executors.workloads import ExecuteTask
-
-            if isinstance(command[0], ExecuteTask):
-                workload = command[0]
-                ser_input = workload.model_dump_json()
-                command = [
-                    "python",
-                    "-m",
-                    "airflow.sdk.execution_time.execute_workload",
-                    "--json-string",
-                    ser_input,
-                ]
+            if AIRFLOW_V_3_2_PLUS:
+                if not isinstance(command[0], (workloads.ExecuteTask, workloads.ExecuteCallback)):
+                    raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(command[0])}")
            else:
-                raise RuntimeError(
-                    f"LambdaExecutor doesn't know how to handle workload of type: {type(command[0])}"
-                )
+                if not isinstance(command[0], workloads.ExecuteTask):
+                    raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(command[0])}")
+
+            workload = command[0]

Review Comment:
   Consider moving this `workload = command[0]` to the top of the method and using `workload` throughout; it would clean things up a bit.
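   A rough sketch of that refactor, using stand-in classes and a module-level helper in place of the executor method (`AIRFLOW_V_3_2_PLUS` and the class names are assumptions taken from the diff):

   ```python
   from dataclasses import dataclass

   # Stand-ins for the workload classes in airflow.executors.workloads.
   @dataclass
   class ExecuteTask:
       pass

   @dataclass
   class ExecuteCallback:
       pass

   AIRFLOW_V_3_2_PLUS = True  # assumed: the provider's version-gate flag

   def unpack_workload(command):
       """Bind command[0] once up front and use `workload` throughout."""
       workload = command[0]
       allowed = (ExecuteTask, ExecuteCallback) if AIRFLOW_V_3_2_PLUS else (ExecuteTask,)
       if not isinstance(workload, allowed):
           raise RuntimeError(f"cannot handle workloads of type {type(workload)}")
       return workload

   print(type(unpack_workload([ExecuteCallback()])).__name__)  # ExecuteCallback
   ```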



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -205,55 +213,97 @@ def sync(self):
             self.log.exception("An error occurred while syncing tasks")
 
     def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
-        from airflow.executors import workloads

-        if not isinstance(workload, workloads.ExecuteTask):
-            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
-        ti = workload.ti
-        self.queued_tasks[ti.key] = workload
+        if isinstance(workload, workloads.ExecuteTask):
+            ti = workload.ti
+            self.queued_tasks[ti.key] = workload
+            return
+
+        if AIRFLOW_V_3_2_PLUS and isinstance(workload, workloads.ExecuteCallback):
+            self.queued_callbacks[workload.callback.id] = workload
+            return
+
+        raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
+
+    def _process_workloads(self, workload_items: Sequence[workloads.All]) -> None:
+
+        for w in workload_items:
+            key: TaskInstanceKey | str
+            command: list[workloads.All]
+            queue: str | None
+            if isinstance(w, workloads.ExecuteTask):
+                command = [w]
+                key = w.ti.key
+                queue = w.ti.queue
+                executor_config = w.ti.executor_config or {}
+
+                del self.queued_tasks[key]

-    def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
-        from airflow.executors.workloads import ExecuteTask
+                self.execute_async(
+                    key=key,
+                    command=command,
+                    queue=queue,
+                    executor_config=executor_config,
+                )
+
+                self.running.add(key)
+                continue

-        for w in workloads:
-            if not isinstance(w, ExecuteTask):
-                raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(w)}")
+            if AIRFLOW_V_3_2_PLUS and isinstance(w, workloads.ExecuteCallback):
+                command = [w]
+                key = w.callback.id
+                queue = None

-            command = [w]
-            key = w.ti.key
-            queue = w.ti.queue
-            executor_config = w.ti.executor_config or {}
+                if isinstance(w.callback.data, dict) and "queue" in w.callback.data:
+                    queue = w.callback.data["queue"]

-            del self.queued_tasks[key]
-            self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)  # type: ignore[arg-type]
-            self.running.add(key)
+                del self.queued_callbacks[key]

-    def execute_async(self, key: TaskInstanceKey, command: CommandType, queue=None, executor_config=None):
+                self.execute_async(
+                    key=key,
+                    command=command,
+                    queue=queue,
+                )
+
+                self.running.add(key)
+                continue
+
+            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(w)}")
+
+    def execute_async(
+        self,
+        key: TaskInstanceKey | str,
+        command: CommandType | Sequence[workloads.All],
+        queue=None,
+        executor_config=None,
+    ):
         """
-        Save the task to be executed in the next sync by inserting the commands into a queue.
+        Save the workload to be executed in the next sync by inserting the commands into a queue.

-        :param key: A unique task key (typically a tuple identifying the task instance).
+        :param key: Unique workload key. Task workloads use TaskInstanceKey, callback workloads use a string id.
         :param command: The shell command string to execute.
         :param executor_config:  (Unused) to keep the same signature as the base.
         :param queue: (Unused) to keep the same signature as the base.
         """
         if len(command) == 1:
-            from airflow.executors.workloads import ExecuteTask
-
-            if isinstance(command[0], ExecuteTask):
-                workload = command[0]
-                ser_input = workload.model_dump_json()
-                command = [
-                    "python",
-                    "-m",
-                    "airflow.sdk.execution_time.execute_workload",
-                    "--json-string",
-                    ser_input,
-                ]
+            if AIRFLOW_V_3_2_PLUS:
+                if not isinstance(command[0], (workloads.ExecuteTask, workloads.ExecuteCallback)):

Review Comment:
   Can this be `if not isinstance(command[0], SchedulerWorkload)`, using the `SchedulerWorkload` from `executors/workloads/types.py`? If not, then leave this for now; @anishgirianish has a really cool PR in which he works to simplify this.
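   If `SchedulerWorkload` is (or could be made) a plain class union, the check collapses to one `isinstance` call, since Python 3.10+ accepts `X | Y` unions in `isinstance`. A sketch with stand-ins (the alias here is hypothetical, mirroring what `types.py` might define):

   ```python
   from dataclasses import dataclass

   @dataclass
   class ExecuteTask:
       pass

   @dataclass
   class ExecuteCallback:
       pass

   # Hypothetical alias mirroring executors/workloads/types.py.
   SchedulerWorkload = ExecuteTask | ExecuteCallback

   # One branch instead of two version-gated isinstance checks:
   print(isinstance(ExecuteCallback(), SchedulerWorkload))  # True
   print(isinstance("not a workload", SchedulerWorkload))   # False
   ```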



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to