charleschang0531 opened a new issue, #34448:
URL: https://github.com/apache/airflow/issues/34448
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
We are running Airflow 2.6.2 with `KubernetesExecutor`, and we have two DAGs:
1. a DAG with dbt (dag_test.py)
2. a DAG with a backfill command to rerun `dag_test.py` (dag_rerun.py)
I'm trying to simulate a situation where the job still fails when running
`airflow dags backfill`, and there is a problem I can't solve:
when I try to backfill, the `dag_test.py` `Run` shows `running` forever
while some tasks in `dag_test.py` are in the `failed` state.
1. dag_test.py
```python
from datetime import datetime, timedelta
from airflow.configuration import conf
from airflow import DAG, AirflowException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from textwrap import dedent
from airflow.decorators import task
from airflow.kubernetes.secret import Secret
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s
import os
import random

from common.airflow import create_dag_by_common
from common.kubernetes import (
    CONTAINER_RESOURCES,
    NODE_AFFINITY,
    get_config,
)


def create_dag(dag_id: str) -> DAG:
    # Define DAG args
    default_args = {
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description="basic dag",
        doc_md="",
        schedule_interval="06 * * * *",
        start_date=datetime(2023, 9, 16),
        catchup=True,
        max_active_runs=4,
        max_active_tasks=10,
    )
    with dag:
        dag_start = DummyOperator(
            task_id="dag_start",
            dag=dag,
        )
        dag_end = DummyOperator(
            task_id="dag_end",
            dag=dag,
        )

        @task()
        def random_failed():
            # randrange(1, 3) only ever returns 1 or 2, so this fails
            # roughly half the time
            number = random.randrange(1, 3)
            print(number)
            if number == 2 or number == 3:
                # DagRun.set_state = "failed"?
                raise AirflowException("Unexpected number 2 or 3")

        # image_url and secret_volume are defined elsewhere (not shown in the issue)
        task_dbt_run = KubernetesPodOperator(
            task_id="task_dbt_run",
            image=f"{image_url}",
            secrets=[secret_volume],
            cmds=["sh", "-c", "dbt run -s path:models/test --target pixel"],
            env_vars={
                "Airflow_Job_StartTime": "{{ ts }}"
            },
            name="task_dbt_run",
            is_delete_operator_pod=True,
            image_pull_policy="Always",
            in_cluster=True,
            get_logs=True,
            do_xcom_push=False,
            affinity=NODE_AFFINITY,
            dag=dag,
        )

        dag_start >> random_failed() >> task_dbt_run >> dag_end
    return dag


dag_id = "dag_test"
create_dag_by_common(
    dag_id=dag_id, create_dag_func=create_dag
)
```
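A side note on the failure trigger in `random_failed()`: `random.randrange(1, 3)` excludes the upper bound, so the `number == 3` case can never occur and the task fails with roughly 50% probability. A quick check:

```python
import random

# random.randrange(1, 3) samples from the half-open range [1, 3),
# i.e. it only ever returns 1 or 2 -- the "number == 3" branch in
# random_failed() above is unreachable.
samples = {random.randrange(1, 3) for _ in range(1000)}
print(sorted(samples))  # [1, 2]
```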
2. dag_rerun.py
```python
from datetime import datetime, timedelta
from airflow.configuration import conf
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from textwrap import dedent
from airflow.decorators import task
from airflow.kubernetes.secret import Secret
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s
import os

from common.airflow import create_dag_by_common
from common.kubernetes import (
    get_config,
)


def create_dag(dag_id: str) -> DAG:
    # Define DAG args
    default_args = {
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description="basic dag",
        doc_md="",
        schedule_interval="45 * * * *",
        start_date=datetime(2023, 9, 14),
        catchup=False,
        max_active_runs=2,
        max_active_tasks=10,
    )
    with dag:
        dag_start = DummyOperator(
            task_id="dag_start",
            dag=dag,
        )
        dag_end = DummyOperator(
            task_id="dag_end",
            dag=dag,
        )
        templated_command = dedent(
            """
            airflow dags backfill dag_test -s 2023-09-15T08:00:00 -e 2023-09-16T15:00:00 --rerun-failed-tasks --continue-on-failures --disable-retry
            """
        )
        rerun_task = BashOperator(
            task_id="rerun_task",
            depends_on_past=False,
            bash_command=templated_command,
        )
        dag_start >> rerun_task >> dag_end
    return dag


dag_id = "dag_rerun"
create_dag_by_common(
    dag_id=dag_id, create_dag_func=create_dag
)
```
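One thing worth double-checking in `dag_rerun.py`: if the backfill command inside the triple-quoted string ever wraps onto a second source line, `dedent()` preserves the embedded newline and bash will execute the continuation as a separate command. A minimal illustration:

```python
from textwrap import dedent

# A command wrapped across two source lines keeps its embedded newline
# after dedent(), so bash would see two separate commands: the first
# ending with a dangling "-e", the second starting with a bare timestamp.
wrapped = dedent(
    """
    airflow dags backfill dag_test -s 2023-09-15T08:00:00 -e
    2023-09-16T15:00:00 --rerun-failed-tasks
    """
).strip()
print(wrapped.count("\n"))  # 1 -> two shell lines

# Keeping the whole command on one source line avoids this.
single = dedent(
    """
    airflow dags backfill dag_test -s 2023-09-15T08:00:00 -e 2023-09-16T15:00:00 --rerun-failed-tasks
    """
).strip()
print(single.count("\n"))  # 0 -> one shell line
```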
3. Result

### What you think should happen instead
The backfill job should rerun the `failed` `Runs`, and if a rerun still
fails, we would like the `Run` to simply be marked `failed` so the backfill
can move on to the next job.
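As a possible manual workaround while a run is stuck (an assumption on our side, not something the issue confirms): Airflow 2.x's stable REST API supports updating a DagRun's state via `PATCH /dags/{dag_id}/dagRuns/{dag_run_id}`, so a stuck run could be marked `failed` by hand. A sketch, with hypothetical host, credentials, and run_id:

```python
import json

# Hypothetical values -- adjust host, credentials, and run_id
# to your deployment.
AIRFLOW_API = "http://localhost:8080/api/v1"
dag_id = "dag_test"
run_id = "backfill__2023-09-15T08:00:00+00:00"  # example backfill run_id format

url = f"{AIRFLOW_API}/dags/{dag_id}/dagRuns/{run_id}"
payload = json.dumps({"state": "failed"})
print(url)
print(payload)
# e.g.: requests.patch(url, data=payload,
#                      headers={"Content-Type": "application/json"},
#                      auth=("admin", "admin"))
```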
### How to reproduce
No response
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
apache-airflow 2.6.2
apache-airflow-providers-amazon 8.1.0
apache-airflow-providers-celery 3.2.0
apache-airflow-providers-cncf-kubernetes 7.0.0
apache-airflow-providers-common-sql 1.5.1
apache-airflow-providers-docker 3.7.0
apache-airflow-providers-elasticsearch 4.5.0
apache-airflow-providers-ftp 3.4.1
apache-airflow-providers-google 10.1.1
apache-airflow-providers-grpc 3.2.0
apache-airflow-providers-hashicorp 3.4.0
apache-airflow-providers-http 4.4.1
apache-airflow-providers-imap 3.2.1
apache-airflow-providers-microsoft-azure 6.1.1
apache-airflow-providers-mysql 5.1.0
apache-airflow-providers-odbc 3.3.0
apache-airflow-providers-postgres 5.5.0
apache-airflow-providers-redis 3.2.0
apache-airflow-providers-sendgrid 3.2.0
apache-airflow-providers-sftp 4.3.0
apache-airflow-providers-slack 7.3.0
apache-airflow-providers-snowflake 4.1.0
apache-airflow-providers-sqlite 3.4.1
apache-airflow-providers-ssh 3.7.0
google-cloud-orchestration-airflow 1.9.0
pytest-airflow 0.0.3
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)