ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2791325362


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -352,32 +353,37 @@ def _get_team_names_for_dag_ids(
             # Return dict with all None values to ensure graceful degradation
             return {}
 
-    def _get_task_team_name(self, task_instance: TaskInstance, session: 
Session) -> str | None:
+    def _get_workload_team_name(self, workload: SchedulerWorkload, session: 
Session) -> str | None:
         """
-        Resolve team name for a task instance using the DAG > Bundle > Team 
relationship chain.
+        Resolve team name for a workload using the DAG > Bundle > Team 
relationship chain.
 
-        TaskInstance > DagModel (via dag_id) > DagBundleModel (via 
bundle_name) > Team
+        SchedulerWorkload > DagModel (via dag_id) > DagBundleModel (via 
bundle_name) > Team
 
-        :param task_instance: The TaskInstance to resolve team name for
+        :param workload: The SchedulerWorkload to resolve team name for
         :param session: Database session for queries
         :return: Team name if found or None
         """
+        dag_id = workload.get_dag_id()
+        if dag_id is None:
+            self.log.debug("Workload %s has no dag_id, cannot resolve team 
name", workload)

Review Comment:
   It'll get executed fine, but it falls back on the global executor.  Maybe it 
warrants a `warning`?  An `error` feels heavy-handed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to