ashb commented on code in PR #45627:
URL: https://github.com/apache/airflow/pull/45627#discussion_r1914677553
##########
task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py:
##########
@@ -171,6 +182,66 @@ def label(self) -> str | None:
return self.task_id[len(tg.node_id) + 1 :]
return self.task_id
+ @property
+ def priority_weight_total(self) -> int:
+ """
+ Total priority weight for the task. It might include all upstream or
downstream tasks.
+
+ Depending on the weight rule:
+
+ - WeightRule.ABSOLUTE - only own weight
+ - WeightRule.DOWNSTREAM - adds priority weight of all downstream tasks
+ - WeightRule.UPSTREAM - adds priority weight of all upstream tasks
+ """
+ # TODO: This should live in the WeightStragies themselves, not in here
+ from airflow.task.priority_strategy import (
+ _AbsolutePriorityWeightStrategy,
+ _DownstreamPriorityWeightStrategy,
+ _UpstreamPriorityWeightStrategy,
+ )
+
+ if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy):
+ return db_safe_priority(self.priority_weight)
+ elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy):
+ upstream = False
+ elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy):
+ upstream = True
+ else:
+ upstream = False
+ dag = self.get_dag()
+ if dag is None:
+ return db_safe_priority(self.priority_weight)
Review Comment:
:shrug: I just moved the function
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]