kaxil commented on code in PR #45627:
URL: https://github.com/apache/airflow/pull/45627#discussion_r1914774050


##########
task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py:
##########
@@ -171,6 +182,66 @@ def label(self) -> str | None:
             return self.task_id[len(tg.node_id) + 1 :]
         return self.task_id
 
+    @property
+    def priority_weight_total(self) -> int:
+        """
+        Total priority weight for the task. It might include all upstream or 
downstream tasks.
+
+        Depending on the weight rule:
+
+        - WeightRule.ABSOLUTE - only own weight
+        - WeightRule.DOWNSTREAM - adds priority weight of all downstream tasks
+        - WeightRule.UPSTREAM - adds priority weight of all upstream tasks
+        """
+        # TODO: This should live in the WeightStragies themselves, not in here
+        from airflow.task.priority_strategy import (
+            _AbsolutePriorityWeightStrategy,
+            _DownstreamPriorityWeightStrategy,
+            _UpstreamPriorityWeightStrategy,
+        )
+
+        if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy):
+            return db_safe_priority(self.priority_weight)
+        elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy):
+            upstream = False
+        elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy):
+            upstream = True
+        else:
+            upstream = False
+        dag = self.get_dag()
+        if dag is None:
+            return db_safe_priority(self.priority_weight)

Review Comment:
   :D yeah keep them as-is for now, we can change/optimize it separately once 
this is merged



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to