Hi Ash,
Thanks for the quick reply.
While trying to copy-paste the DAG here I found the problem. I had
accidentally set the dagrun timeout too low (the timeout was 4 hours
while a normal run took around 6 hours). So it was a silly mistake on
my side, and the problem is solved by increasing the dagrun timeout to
16 hours.
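Concretely, the fix was just bumping that one value in the DAG definition,
roughly like this (the DAG id below is a placeholder, the real DAG has the
20-30 tasks I mentioned):

from datetime import timedelta
from airflow.models import DAG
from airflow.utils import timezone

dag = DAG(
    dag_id="my_daily_dag",  # placeholder id, not the real DAG
    schedule_interval="@daily",
    start_date=timezone.datetime(2020, 5, 3),
    dagrun_timeout=timedelta(hours=16),  # was timedelta(hours=4), a normal run takes ~6 hours
)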
I would have noticed the DagRuns as failed, but the older DagRuns were
marked as success because I had a special operator that was backfilling
tasks (not the full DAG) for previous days, to make sure that older
intermediate data is available.
For future reference, here's a more minimal example DAG:
import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils import timezone
from airflow.models import DAG
from datetime import timedelta
import time

operator_default_args = dict(
    owner="Gabor Hermann",
    start_date=timezone.datetime(2020, 5, 3),
)

with DAG(
    dag_id="test_timeout",
    default_args=operator_default_args,
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=4),
    start_date=timezone.datetime(2020, 5, 3),
    end_date=timezone.datetime(2020, 5, 5),
) as dag:
    # Stand-in for the real daily incremental load.
    daily_incremental = DummyOperator(task_id="daily_incremental")

    # Backfill only the daily_incremental task for the previous 3 days,
    # so that older intermediate data is available before aggregating.
    check_daily_incrementals = BashOperator(
        task_id="check_daily_incrementals",
        bash_command="""
            airflow backfill test_timeout -s {{ macros.ds_add(ds, -3) }} -e {{ ds }} \
                --ignore_dependencies --rerun_failed_tasks -t ^daily_incremental$
        """,
    )

    # Each aggregation sleeps 90s, so the three together exceed the 4-minute dagrun_timeout.
    aggregate1 = PythonOperator(task_id="aggregate1",
                                python_callable=lambda: time.sleep(90))
    aggregate2 = PythonOperator(task_id="aggregate2",
                                python_callable=lambda: time.sleep(90))
    aggregate3 = PythonOperator(task_id="aggregate3",
                                python_callable=lambda: time.sleep(90))

    daily_incremental >> aggregate1
    check_daily_incrementals >> aggregate1 >> aggregate2 >> aggregate3
In this example we would like to make sure that the last 3 days of
`daily_incremental` are completed before we start aggregating them
(`aggregate1`). For this purpose I used a BashOperator that executes an
Airflow backfill command, backfilling specifically the
`daily_incremental` task.
The DAG always times out, so we never get to the `aggregate3` task. But
only the latest DagRun is in state `failed`; the previous ones are in
`success` even though they did not get to run `aggregate3` because of
the timeout. (See it visually here: https://i.imgur.com/ll0Rg19.png.)
A separate question: is there a typical way to solve this "daily
incremental" processing?
I can think of other ways, e.g. using `depends_on_past=True` and
handling catch up with a sensor that checks if it's the latest dagrun
and only runs aggregates for the latest dagrun.
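A minimal, untested sketch of that idea, assuming LatestOnlyOperator can
play the role of the "is this the latest DagRun" check (the DAG id and the
dummy tasks here are placeholders, not the real DAG):

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils import timezone

with DAG(
    dag_id="test_timeout_alternative",  # hypothetical id, just for this sketch
    schedule_interval="@daily",
    start_date=timezone.datetime(2020, 5, 3),
) as dag:
    # depends_on_past makes each day's incremental wait for the previous day's,
    # so catch-up fills the older days one by one instead of a nested backfill.
    daily_incremental = DummyOperator(
        task_id="daily_incremental",
        depends_on_past=True,
    )

    # LatestOnlyOperator skips its downstream tasks for all but the most recent
    # scheduled DagRun, so the aggregation only runs once catch-up is done.
    latest_only = LatestOnlyOperator(task_id="latest_only")

    # Stand-in for aggregate1 >> aggregate2 >> aggregate3 from the example above.
    aggregate = DummyOperator(task_id="aggregate")

    daily_incremental >> latest_only >> aggregate

The trade-off compared to the backfill BashOperator would be that the
aggregates only ever get produced for the latest day, so this only fits if
older aggregate outputs are not needed.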
Cheers,
Gabor
On 5/7/20 1:13 PM, Ash Berlin-Taylor wrote:
That does sound very odd, I've not heard of that happening before.
Are you able to share your DAG file (you can remove any queries etc) -
it may help us debug it.
Thanks,
Ash
On May 7 2020, at 12:11 pm, Gabor Hermann <[email protected]> wrote:
Hello fellow Airflowers,

I'm relatively new to Airflow and I'm really grateful, as it has already
saved us some pain in production. So thanks for all the work! 🙏

Now I'm trying to set up a DAG with around 20-30 tasks (BigQuery queries,
PySpark Dataproc jobs) and I've seen a weird behavior where a DAG run
stops running: the DAG is marked as success but some tasks are still
clear (no state). The annoying part is that there's not even a sign of
failure. Do you know why this might be happening? I couldn't find a
related issue on GitHub. One thing I'm suspecting is the DAG import
timing out, could that cause such behavior? (I'm using version 1.10.3.)

Thanks in advance for any pointers.

Cheers,
Gabor