Re: [D] Real Sequential Dynamic Task [airflow]

2024-12-11 Thread via GitHub


GitHub user ricardoalexanderh edited a comment on the discussion: Real 
Sequential Dynamic Task

To close this discussion, here is my workaround for now. At least it prevents 
a subsequent mapped task (next index) from running when the previous mapped 
task (previous index) has failed or is retrying, because in my case running the 
subsequent task before the previous one succeeds will break the data.

Basically, in my run_dbt_snapshot method, which is the one being expanded, I 
raise an AirflowSkipException('Some error messages') if some condition is not 
met. In my case, the condition is checked against my log table of the dates 
that have run successfully.
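
A minimal sketch of the guard described above, assuming the log table can be 
read into a set of ISO date strings that have already succeeded. The helper 
name ensure_previous_date_succeeded, its parameters, and the one-day step 
between dates are hypothetical and not from the original post; the fallback 
class only exists so the sketch also runs where Airflow is not installed.

```python
from datetime import date, timedelta

try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    class AirflowSkipException(Exception):
        """Stand-in so the sketch runs without Airflow installed."""


def ensure_previous_date_succeeded(
    snapshot_date: str,
    succeeded_dates: set,
    first_date: str,
) -> None:
    """Raise AirflowSkipException unless the previous day already succeeded.

    succeeded_dates is assumed to come from the poster's log table
    (hypothetical shape: a set of 'YYYY-MM-DD' strings).
    """
    # The first date in the range has no predecessor to wait for.
    if snapshot_date == first_date:
        return

    previous = (date.fromisoformat(snapshot_date) - timedelta(days=1)).isoformat()
    if previous not in succeeded_dates:
        raise AirflowSkipException(
            f"Skipping {snapshot_date}: {previous} has not succeeded yet"
        )
```

Inside the expanded task, a guard like this would be called first, before the 
dbt snapshot runs. A skipped task instance is not retried, so a skipped date 
would presumably have to be picked up by a later run or by clearing the task.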

GitHub link: 
https://github.com/apache/airflow/discussions/44789#discussioncomment-11538902


This is an automatically sent email for commits@airflow.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@airflow.apache.org



Re: [D] Real Sequential Dynamic Task [airflow]

2024-12-11 Thread via GitHub


GitHub user ricardoalexanderh closed a discussion: Real Sequential Dynamic Task

I wanted to generate dynamic tasks using expand based on a range of dates. I 
am able to do that, but it seems the tasks are running in parallel. I wanted 
them to run sequentially, each waiting for the previous task to succeed.

I am able to limit the tasks to run only 1 at a time by setting 
max_active_tasks=1, max_active_runs=1, and max_active_tis_per_dag=1. But the 
issue is that when one of the tasks fails or is retrying, the next task is 
still triggered. This is not the behaviour that I wanted. Let me give an illustration:

**Current behaviour:**
Run 8 Nov (Success) -> Run 9 Nov (Retry) -> Run 10 Nov (In Progress)
Run 8 Nov (Success) -> Run 9 Nov (Failed) -> Run 10 Nov (In Progress)

**Expected behaviour:**
Run 8 Nov (Success) -> Run 9 Nov (Retry) -> Run 10 Nov (Scheduled)
Run 8 Nov (Success) -> Run 9 Nov (Failed) -> Run 10 Nov (Failed)
  The error occurred on 9 Nov

**This is a part of my code:**

```python
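from airflow.decorators import task
from airflow.models.param import ParamsDict

# Assumed: the function is decorated with @task, since .expand() needs it.
@task
def run_dbt_snapshot(snapshot_date: str, params: ParamsDict, **context):
    # scd_run_dbt_snapshot is the poster's own helper (not shown).
    scd_run_dbt_snapshot(snapshot_date, params, context)

# get_date_range is the poster's own upstream task (not shown).
date_range = get_date_range()

# Creates one mapped task instance per date in the range.
run_snapshots = run_dbt_snapshot.expand(
    snapshot_date=date_range,
)

date_range >> run_snapshots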
``` 

GitHub link: https://github.com/apache/airflow/discussions/44789





Re: [D] Real Sequential Dynamic Task [airflow]

2024-12-11 Thread via GitHub


GitHub user ricardoalexanderh added a comment to the discussion: Real 
Sequential Dynamic Task

Hi, yes, I think what I needed is a Dynamic DAG, and it seems to break the 
rules of a DAG. Thanks for the clarification.

GitHub link: 
https://github.com/apache/airflow/discussions/44789#discussioncomment-11538871





Re: [D] Real Sequential Dynamic Task [airflow]

2024-12-10 Thread via GitHub


GitHub user potiuk added a comment to the discussion: Real Sequential Dynamic 
Task

Some more explanation:

If I understand correctly, what you are essentially asking for is a 
"Dynamic DAG", not a "Dynamic Task", i.e. you want a DAG with a variable 
number n of tasks that depend on one another:

```
t1 -> t2 -> t3 -> t4 -> 
```

Unfortunately, that is not really possible if you want to expand it at 
runtime, because:

```
t1->t2->t3
```

has a different DAG structure than

```
t1->t2->t3->t4 
```
Essentially, the Airflow DAG structure should not change between runs, or at 
least not often, and the structure is established at parsing time, not at the 
moment the DAG is run. So if your case is pretty much static, i.e. the number 
of tasks does not normally change between runs, you can use Dynamic DAG 
generation.


On the other hand, dynamic task mapping (essentially) does not change the DAG 
structure. It takes one of the "nodes" of the DAG (say t2) and replaces it with 
multiple task instances, basically copies of t2 in this case. The DAG structure 
remains the same; only `t2` is replaced with multiple copies of the same task. 
That's why this expansion of a task is dynamic and can change between runs: 
every run can have a different number of expanded tasks.
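
To make the distinction above concrete, here is a toy model in plain Python. 
It is not Airflow code and does not use any Airflow API; the function names 
chained_dag_edges, mapped_dag_edges and expand_t2 are made up for illustration. 
It only shows that a chain of n tasks has a different edge list for every n, 
while a mapped `t2` leaves the edge list fixed and varies only the number of 
copies per run.

```python
from typing import List, Tuple

Edge = Tuple[str, str]


def chained_dag_edges(n_tasks: int) -> List[Edge]:
    """Edges of t1 -> t2 -> ... -> tn; the edge list depends on n."""
    names = [f"t{i}" for i in range(1, n_tasks + 1)]
    return list(zip(names, names[1:]))


def mapped_dag_edges() -> List[Edge]:
    """Edges of t1 -> t2 -> t3 where t2 is mapped; the same for every run."""
    return [("t1", "t2"), ("t2", "t3")]


def expand_t2(inputs: List[str]) -> List[Tuple[str, int, str]]:
    """Run-time copies of t2: one (task_id, map_index, value) per input."""
    return [("t2", index, value) for index, value in enumerate(inputs)]


# A chain of 3 and a chain of 4 are different structures.
print(chained_dag_edges(3) == chained_dag_edges(4))  # prints False

# Two runs of the mapped DAG share one structure and differ only in
# how many copies of t2 they hold.
run_a = expand_t2(["8 Nov", "9 Nov"])
run_b = expand_t2(["8 Nov", "9 Nov", "10 Nov"])
print(len(run_a), len(run_b))  # prints 2 3
```

In this model the copies of `t2` have no edges between them, which is why 
nothing forces one mapped index to wait for the previous one.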



GitHub link: 
https://github.com/apache/airflow/discussions/44789#discussioncomment-11517347





Re: [D] Real Sequential Dynamic Task [airflow]

2024-12-10 Thread via GitHub


GitHub user potiuk added a comment to the discussion: Real Sequential Dynamic 
Task

Not that I am aware of. You can have some other workarounds in place, though. 
For example, you could have a pool with size 1 and have all your tasks belong 
to the same pool. You will not be able to control the sequence of execution, 
but it will give you the "one task runs at a time" feature.

GitHub link: 
https://github.com/apache/airflow/discussions/44789#discussioncomment-11517286





Re: [D] Real Sequential Dynamic Task [airflow]

2024-12-09 Thread via GitHub


GitHub user ricardoalexanderh added a comment to the discussion: Real 
Sequential Dynamic Task

Hi all, thanks for the quick answer. To sum it up, it means that it is not 
possible at the moment?

Please correct me if I'm wrong.

Thanks

GitHub link: 
https://github.com/apache/airflow/discussions/44789#discussioncomment-11516955





Re: [D] Real Sequential Dynamic Task [airflow]

2024-12-09 Thread via GitHub


GitHub user potiuk added a comment to the discussion: Real Sequential Dynamic 
Task

This is something I really like: coming up with a more generic case that 
solves several similar but not identical expectations. This is what 
differentiates a `product` from a `solution`: we spend a bit more time 
figuring out what to do and do it in a more generic way that serves a number 
of cases, so that even if it is more costly to agree on and develop, 
eventually we can kill many birds with the same stone.

GitHub link: 
https://github.com/apache/airflow/discussions/44789#discussioncomment-1152





Re: [D] Real Sequential Dynamic Task [airflow]

2024-12-09 Thread via GitHub


GitHub user dabla added a comment to the discussion: Real Sequential Dynamic 
Task

Yes indeed, the use case I was trying to solve is a bit different, but the one 
above could also be solved with the same solution. I am still working on the 
draft documentation regarding this; I hope to get something out this week.

GitHub link: 
https://github.com/apache/airflow/discussions/44789#discussioncomment-11510920





Re: [D] Real Sequential Dynamic Task [airflow]

2024-12-09 Thread via GitHub


GitHub user potiuk added a comment to the discussion: Real Sequential Dynamic 
Task

This is similar to the recent discussion by @dabla: 
https://lists.apache.org/thread/bx2w6dpr3ynvnq06n4v62n7g8dodt2rt 
While it is yet another different use case, it can (again) be solved via 
similar means. I think @dabla was going to propose an Airflow Improvement 
Proposal, so it may be a good idea to discuss it there.

GitHub link: 
https://github.com/apache/airflow/discussions/44789#discussioncomment-11510640




