Hi Ash,

Thanks for the quick reply.

While trying to copy-paste the DAG here I found the problem. I had accidentally set the dagrun timeout too low (the timeout was 4 hours while a normal run takes around 6 hours). So it was a silly mistake on my side, and the problem is solved by increasing the dagrun timeout to 16 hours.
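
(For completeness, a rough sketch of the fix, in case it helps anyone searching later; the DAG id and schedule here are just placeholders, only the dagrun_timeout line is the actual change:)

from datetime import timedelta

from airflow.models import DAG
from airflow.utils import timezone

dag = DAG(
    dag_id="my_daily_dag",  # placeholder name
    schedule_interval="@daily",
    start_date=timezone.datetime(2020, 5, 3),
    # give the run comfortably more headroom than the ~6 hours a normal run takes
    dagrun_timeout=timedelta(hours=16),
)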

I would normally have seen the DagRuns as failed, but the older DagRuns were marked as success because I had a special operator that backfilled individual tasks (not the full DAG) for previous days, to make sure that older intermediate data is available.

For future reference, here's a more minimal example DAG:

import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils import timezone
from airflow.models import DAG

from datetime import timedelta
import time

operator_default_args = dict(
    owner="Gabor Hermann",
    start_date=timezone.datetime(2020, 5, 3),
)

with DAG(
        dag_id="test_timeout",
        default_args=operator_default_args,
        schedule_interval="@daily",
        dagrun_timeout=timedelta(minutes=4),  # deliberately too short, to reproduce the timeout
        start_date=timezone.datetime(2020, 5, 3),
        end_date=timezone.datetime(2020, 5, 5),
) as dag:
    daily_incremental = DummyOperator(task_id="daily_incremental")
    # backfill only the daily_incremental task for the last 3 days, so that
    # older intermediate data is available before aggregating
    check_daily_incrementals = BashOperator(
        task_id="check_daily_incrementals",
        bash_command="""
            airflow backfill test_timeout -s {{ macros.ds_add(ds, -3) }} -e {{ ds }} --ignore_dependencies --rerun_failed_tasks -t ^daily_incremental$
        """,
    )
    # Three aggregates of 90 seconds each: together they take longer than the
    # 4-minute dagrun_timeout, so the run always times out.
    aggregate1 = PythonOperator(task_id="aggregate1", python_callable=lambda: time.sleep(90))
    aggregate2 = PythonOperator(task_id="aggregate2", python_callable=lambda: time.sleep(90))
    aggregate3 = PythonOperator(task_id="aggregate3", python_callable=lambda: time.sleep(90))

    daily_incremental >> aggregate1
    check_daily_incrementals >> aggregate1 >> aggregate2 >> aggregate3


In this example we would like to make sure that the last 3 days of `daily_incremental` are completed before we start aggregating them (`aggregate1`). For this purpose I used a BashOperator that executes an `airflow backfill` command backfilling only the `daily_incremental` task.

The DAG run always times out, so we never get to the `aggregate3` task. But only the latest DagRun ends up in state `failed`; previous ones are in `success` even though they never got to run `aggregate3` because of the timeout. (See it visually here: https://i.imgur.com/ll0Rg19.png).


A separate question: is there a typical way to handle this kind of "daily incremental" processing?

I can think of other ways, e.g. using `depends_on_past=True` and handling catch-up with a sensor that checks whether the current run is the latest DagRun, so that the aggregates only run for the latest DagRun.
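
Something like this is what I have in mind (a rough, untested sketch; it uses the built-in LatestOnlyOperator rather than a custom sensor, and the DAG/task names are only illustrative):

import time

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils import timezone

with DAG(
        dag_id="test_latest_only_idea",  # illustrative name
        schedule_interval="@daily",
        start_date=timezone.datetime(2020, 5, 3),
) as dag:
    daily_incremental = DummyOperator(
        task_id="daily_incremental",
        depends_on_past=True,  # each day waits for the previous day's incremental to succeed
    )
    # skips all downstream tasks for any run that is not the latest DagRun
    latest_only = LatestOnlyOperator(task_id="latest_only")
    aggregate1 = PythonOperator(task_id="aggregate1", python_callable=lambda: time.sleep(90))

    daily_incremental >> latest_only >> aggregate1

With this, a catch-up would still run the daily incrementals in order (because of `depends_on_past`), but the aggregates would only run for the most recent DagRun.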

Cheers,
Gabor

On 5/7/20 1:13 PM, Ash Berlin-Taylor wrote:

That does sound very odd, I've not heard of that happening before.

Are you able to share your DAG file (you can remove any queries etc) - it may help us debug it.

Thanks,
Ash

On May 7 2020, at 12:11 pm, Gabor Hermann <[email protected]> wrote:

    Hello fellow Airflowers, I'm relatively new to Airflow and I'm
    really grateful as it already saved us some pain in production. So
    thanks for all the work! 🙏 Now I'm trying to set up DAG with
    around 20-30 tasks (BigQuery queries, Pyspark Dataproc jobs) and
    I've seen a weird behavior where a DAG run stops running, the DAG
    is marked as success but some tasks are clear. The annoying is
    that there's not even a sign of failure. Do you know why this
    might be happening? I couldn't find a related issue on GitHub. One
    thing I'm suspecting is DAG importing timing out, could that cause
    such behavior? (I'm using version 1.10.3.) Thanks in advance for
    any pointers. Cheers, Gabor
