Re: [D] Real Sequential Dynamic Task [airflow]
GitHub user ricardoalexanderh edited a comment on the discussion: Real Sequential Dynamic Task

To close this discussion, I will post my workaround for now. At least it prevents a subsequent mapped task (next index) from running when the previous mapped task (previous index) has failed or is retrying, because in my case running the subsequent task before the previous one succeeds will break the data.

Basically, in my run_dbt_snapshot method that will be expanded, I raise an AirflowSkipException('Some error messages') if some condition is not met. In my case, the condition is checked against my log table of the dates that have run successfully.

GitHub link: https://github.com/apache/airflow/discussions/44789#discussioncomment-11538902

This is an automatically sent email for commits@airflow.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@airflow.apache.org
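The workaround described in this comment could look roughly like the following. This is only a sketch under stated assumptions: the helper name `ensure_previous_date_succeeded`, the in-memory `succeeded_dates` set (standing in for the log table) and the ISO date strings are hypothetical and not the poster's actual code. The `try`/`except` around the import is there only so the snippet also runs where Airflow is not installed.

```python
from datetime import date, timedelta
from typing import Set

try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    # Fallback so the sketch can be run without Airflow installed.
    class AirflowSkipException(Exception):
        pass


def ensure_previous_date_succeeded(
    snapshot_date: str, succeeded_dates: Set[str], first_date: str
) -> None:
    """Raise AirflowSkipException unless the day before snapshot_date has succeeded.

    succeeded_dates is a hypothetical stand-in for the poster's log table.
    """
    if snapshot_date == first_date:
        return  # the first date in the range has no predecessor to wait for
    previous = (date.fromisoformat(snapshot_date) - timedelta(days=1)).isoformat()
    if previous not in succeeded_dates:
        raise AirflowSkipException(
            f"{previous} has not completed successfully; skipping {snapshot_date}"
        )
```

Inside the mapped task, a check like this would run before the real work, so that a mapped index whose predecessor has not succeeded ends up skipped instead of running against incomplete data.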
Re: [D] Real Sequential Dynamic Task [airflow]
GitHub user ricardoalexanderh closed a discussion: Real Sequential Dynamic Task

I wanted to generate dynamic tasks using expand based on a range of dates. I am able to do that, but it seems the tasks are running in parallel. I want them to run sequentially, each one waiting for the previous task to succeed.

I am able to limit the tasks to run only 1 at a time by setting max_active_tasks=1, max_active_runs=1, and max_active_tis_per_dag=1. But the issue is that when one of the tasks fails or is retrying, the next task is still triggered. This is not the behaviour that I want. Let me give an illustration:

**Current behaviour:**
Run 8 Nov (Success) -> Run 9 Nov (Retry) -> Run 10 Nov (In Progress)
Run 8 Nov (Success) -> Run 9 Nov (Failed) -> Run 10 Nov (In Progress)

**Expected behaviour:**
Run 8 Nov (Success) -> Run 9 Nov (Retry) -> Run 10 Nov (Scheduled)
Run 8 Nov (Success) -> Run 9 Nov (Failed) -> Run 10 Nov (Failed)

The error occurred on 9 Nov.

**This is a part of my code:**
```python
def run_dbt_snapshot(snapshot_date: str, params: ParamsDict, **context):
    scd_run_dbt_snapshot(snapshot_date, params, context)

date_range = get_date_range()
run_snapshots = run_dbt_snapshot.expand(
    snapshot_date=date_range,
)
date_range >> run_snapshots
```

GitHub link: https://github.com/apache/airflow/discussions/44789
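For reference, the "expected behaviour" in the illustration can be written down in plain Python, independent of Airflow. This is a minimal sketch of the failure case only (retries are not modelled); `run_in_order`, `fake_snapshot` and the dates with a year attached are hypothetical names and values chosen for illustration.

```python
from typing import Callable, Dict, List


def run_in_order(dates: List[str], run_one: Callable[[str], None]) -> Dict[str, str]:
    """Run one date at a time; after the first failure, nothing later is run."""
    states: Dict[str, str] = {}
    blocked = False
    for snapshot_date in dates:
        if blocked:
            # Mirrors "Run 10 Nov (Failed)" in the expected behaviour.
            states[snapshot_date] = "failed"
            continue
        try:
            run_one(snapshot_date)
            states[snapshot_date] = "success"
        except Exception:
            states[snapshot_date] = "failed"
            blocked = True
    return states


def fake_snapshot(snapshot_date: str) -> None:
    """Hypothetical stand-in for the real snapshot job; fails on 9 Nov."""
    if snapshot_date == "2024-11-09":
        raise RuntimeError("snapshot failed")


result = run_in_order(["2024-11-08", "2024-11-09", "2024-11-10"], fake_snapshot)
print(result)
```

The point of the sketch is the `blocked` flag: once one date fails, every later date is marked as failed without being executed, which is the ordering guarantee the question asks for.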
Re: [D] Real Sequential Dynamic Task [airflow]
GitHub user ricardoalexanderh added a comment to the discussion: Real Sequential Dynamic Task

Hi, yes, I think what I needed is a Dynamic DAG, and it seems that would break the rules of a DAG. Thanks for the clarification.

GitHub link: https://github.com/apache/airflow/discussions/44789#discussioncomment-11538871
Re: [D] Real Sequential Dynamic Task [airflow]
GitHub user potiuk added a comment to the discussion: Real Sequential Dynamic Task

Some more explanation:

If I am understanding correctly - what you are essentially asking for is "Dynamic DAG" - not "Dynamic Task" -> i.e. you want to have a DAG that has a variable number (n) of tasks that depend on one another:

```
t1 -> t2 -> t3 -> t4 ->
```

Unfortunately that is not really possible if you want to expand it at runtime, because:

```
t1->t2->t3
```

has a different DAG structure than

```
t1->t2->t3->t4
```

Essentially, the Airflow DAG structure should not change between runs - at least not often - and the structure is established at parsing time, not at the moment when the DAG is run. So if your case is pretty much static - i.e. the number of tasks does not normally change between runs - you can use Dynamic DAG generation.

On the other hand, dynamic task mapping works (essentially) in a way that does not change the DAG structure: it takes one of the "nodes" of the DAG (say t2) and replaces it with multiple task instances - basically copies of t2 in this case. The DAG structure remains the same; only `t2` is replaced with multiple copies of the same task. That's why this expansion of tasks is dynamic and can change between runs - every run can have a different number of expanded tasks.

GitHub link: https://github.com/apache/airflow/discussions/44789#discussioncomment-11517347
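The structural difference described in this comment can be made concrete with a toy model that represents a DAG as a list of edges. This is a plain-Python illustration and not Airflow code; `chained_edges`, `mapped_edges` and the `copies` parameter are hypothetical names introduced for the sketch.

```python
from typing import List, Tuple

Edge = Tuple[str, str]


def chained_edges(task_ids: List[str]) -> List[Edge]:
    """Edges of a DAG in which each task depends on the previous one."""
    return list(zip(task_ids, task_ids[1:]))


def mapped_edges(upstream_id: str, mapped_id: str, copies: int) -> List[Edge]:
    """Edges of a DAG that uses task mapping.

    The number of copies affects how many task instances run,
    but it does not add or remove any node or edge.
    """
    return [(upstream_id, mapped_id)]


# A chain of n tasks: the edge list differs for n = 3 and n = 4.
three_tasks = chained_edges(["t1", "t2", "t3"])
four_tasks = chained_edges(["t1", "t2", "t3", "t4"])
structure_changes = three_tasks != four_tasks

# A mapped task: the edge list is the same for 3 or 4 copies of t2.
structure_stable = mapped_edges("t1", "t2", copies=3) == mapped_edges("t1", "t2", copies=4)
```

In this model a chain of dependent tasks changes shape whenever the number of tasks changes, while a mapped task keeps a single node no matter how many copies are expanded, which is why only the latter can vary from run to run.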
Re: [D] Real Sequential Dynamic Task [airflow]
GitHub user potiuk added a comment to the discussion: Real Sequential Dynamic Task

Not that I am aware of. You can have some other workarounds in place though. For example, you could have a pool with size 1 and have all your tasks belong to the same pool. You will not be able to control the sequence of execution, but it will give you the "one task runs at a time" feature.

GitHub link: https://github.com/apache/airflow/discussions/44789#discussioncomment-11517286
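In Airflow, a task is assigned to a pool through its `pool` argument. What a pool of size 1 does and does not guarantee can be illustrated with a small analogy in plain Python, where a `threading.Semaphore(1)` plays the role of the single slot. This is only an analogy under stated assumptions, not Airflow's scheduler; the function name, the dates and the simulated failure are hypothetical.

```python
import threading
import time
from typing import Dict

pool = threading.Semaphore(1)  # one slot, like a pool of size 1
state_lock = threading.Lock()
running = 0
max_running = 0
states: Dict[str, str] = {}


def run_snapshot(snapshot_date: str, should_fail: bool) -> None:
    """Occupy the single slot, 'work' briefly, then record the outcome."""
    global running, max_running
    with pool:  # wait until the single slot is free
        with state_lock:
            running += 1
            max_running = max(max_running, running)
        time.sleep(0.01)  # pretend to do some work
        with state_lock:
            running -= 1
            states[snapshot_date] = "failed" if should_fail else "success"
    # The slot is released here whether the task failed or not,
    # so a later date can still start after a failure.


threads = [
    threading.Thread(target=run_snapshot, args=(d, d == "2024-11-09"))
    for d in ("2024-11-08", "2024-11-09", "2024-11-10")
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Only one task holds the slot at any time, but the order in which the waiting tasks obtain it is not controlled, and a failed task frees the slot like any other. That matches the limitation stated in the comment: a single-slot pool gives mutual exclusion, not ordering.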
Re: [D] Real Sequential Dynamic Task [airflow]
GitHub user ricardoalexanderh added a comment to the discussion: Real Sequential Dynamic Task

Hi all, thanks for the quick answer. To sum it up, does this mean that it is not possible at the moment? Please correct me if I'm wrong. Thanks.

GitHub link: https://github.com/apache/airflow/discussions/44789#discussioncomment-11516955
Re: [D] Real Sequential Dynamic Task [airflow]
GitHub user potiuk added a comment to the discussion: Real Sequential Dynamic Task

This is something I really like - if we can come up with a more generic case that solves some similar but not exactly the same expectations. This is what differentiates `product` from `solution`: we spend a bit more time on figuring out what to do and on doing it in a more generic way while we have a number of cases it serves, so that even if it is more costly to agree on and develop, eventually we can kill many birds with the same stone.

GitHub link: https://github.com/apache/airflow/discussions/44789#discussioncomment-1152
Re: [D] Real Sequential Dynamic Task [airflow]
GitHub user dabla added a comment to the discussion: Real Sequential Dynamic Task

Yes indeed, the use case I was trying to solve is a bit different, but the one above could also be solved with it. I am still working on the draft document for this, and I hope to get something out this week.

GitHub link: https://github.com/apache/airflow/discussions/44789#discussioncomment-11510920
Re: [D] Real Sequential Dynamic Task [airflow]
GitHub user potiuk added a comment to the discussion: Real Sequential Dynamic Task

This is similar to a recent discussion by @dabla https://lists.apache.org/thread/bx2w6dpr3ynvnq06n4v62n7g8dodt2rt -> and while it is yet another use case, it can (again) be solved via similar means. I think @dabla was going to propose an Airflow Improvement Proposal, so it may be a good idea to discuss it there.

GitHub link: https://github.com/apache/airflow/discussions/44789#discussioncomment-11510640