ashb commented on code in PR #67217:
URL: https://github.com/apache/airflow/pull/67217#discussion_r3290948971


##########
airflow-core/src/airflow/executors/workloads/task.py:
##########
@@ -118,9 +118,13 @@ def make(
 
         ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True)
         if not bundle_info:
+            version_data = None
+            if ti.dag_version is not None:
+                version_data = ti.dag_version.version_data

Review Comment:
   Why of this read off ti, but other things just below are ti.dag_model.bundle*



##########
airflow-core/src/airflow/executors/workloads/types.py:
##########
@@ -41,6 +41,4 @@
 def state_class_for_key(key: WorkloadKey) -> type[TaskInstanceState] | 
type[CallbackState]:
     if isinstance(key, TaskInstanceKey):
         return TaskInstanceState
-    if isinstance(key, CallbackKey):
-        return CallbackState
-    raise TypeError(f"Unknown workload key type: {type(key)!r}")
+    return CallbackState

Review Comment:
   Unrelated change? Bad rebase?



##########
providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml:
##########
@@ -959,6 +959,12 @@ components:
           - type: string
           - type: 'null'
           title: Version
+        version_data:
+          anyOf:
+          - additionalProperties: true
+            type: object
+          - type: 'null'

Review Comment:
   If the field is present it should be an object. 
   
   (I.e. its worker not sent, or its an `{...}` object )



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -648,6 +648,7 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
                     ranked_query.c.map_index_for_ordering,
                 )
                 .options(selectinload(TI.dag_model))
+                .options(selectinload(TI.dag_version))

Review Comment:
   Hmm, joins aren't free, and this isn't used for most places. 
   
   I'm wondering if this needs to be based on what the bundle backend needs 
somehow?



##########
providers/celery/tests/unit/celery/executors/test_celery_executor.py:
##########
@@ -885,7 +885,7 @@ def test_celery_tasks_registered_on_import():
         )
 
 
[email protected](not AIRFLOW_V_3_2_PLUS, reason="ExecuteCallback requires 
Airflow 3.2+")
[email protected](not AIRFLOW_V_3_3_PLUS, reason="CallbackKey dataclass 
requires Airflow 3.3+")

Review Comment:
   Unrelated change to this pr



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to