hussein-awala commented on PR #35210: URL: https://github.com/apache/airflow/pull/35210#issuecomment-1784249764
I had to make some changes so that the `get_weight` method receives a task instance instead of a task.

I tested it in Breeze (CeleryExecutor + Postgres) with this DAG:

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id="test_weight_rule",
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    tags=["test"],
)
def test_weight_rule():
    # Built-in rule with an explicit weight
    @task(weight_rule="absolute", priority_weight=2)
    def task1():
        print("task1")

    # Built-in rule with the default weight
    @task(weight_rule="downstream")
    def task2():
        print("task2")

    # Custom strategy referenced by its import path
    @task(weight_rule="airflow.custom_priority_strategy.CustomPriorityStrategy")
    def task3():
        print("task3")

    # No weight_rule given, so the default applies
    @task
    def task4():
        print("task4")

    [task1(), task2(), task3()] >> task4()


test_weight_rule()
```

For the `CustomPriorityStrategy` class, I used:

```python
from airflow.models.taskinstance import TaskInstance
from airflow.task.priority_strategy import PriorityWeightStrategy


class CustomPriorityStrategy(PriorityWeightStrategy):
    def get_weight(self, ti: TaskInstance):
        # The weight drops by one on each try and never goes below 1
        return max(3 - ti._try_number + 1, 1)
```

I created a DAG run, then cleared the task a few times:
- first try (when I triggered the DAG run) -> priority_weight=3
- on the first clear (try 1) -> priority_weight=2
- after the second clear (try 2+) -> priority_weight=1

I will test with more complex examples, multiple concurrent DAGs, and different situations (failure then retry, return from the deferred state, backfill, etc.). If all looks good, I'll add the documentation with a few examples and mark the PR as ready for review.

cc: @eladkal
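
For reference, the arithmetic in `get_weight` can be checked without Airflow. This is only a minimal sketch: `weight_for_try` is a hypothetical helper (not part of the PR or of Airflow), and it assumes the try number is 1 on the first run, which is what the observed weights above suggest.

```python
def weight_for_try(try_number: int, start: int = 3, floor: int = 1) -> int:
    """Mirror max(3 - try_number + 1, 1) from CustomPriorityStrategy.

    `start` and `floor` are illustrative parameters; the strategy above
    hard-codes them as 3 and 1.
    """
    return max(start - try_number + 1, floor)


# Assumption: try numbers begin at 1 for the first run.
weights = [weight_for_try(n) for n in range(1, 6)]
print(weights)
```

The weight falls by one per try until it reaches the floor, then stays there, which matches the 3, 2, 1 sequence seen after each clear.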