charleschang0531 opened a new issue, #34448:
URL: https://github.com/apache/airflow/issues/34448

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We are running Airflow 2.6.2 with the `KubernetesExecutor`, and we have two DAGs:
   1. a DAG with dbt (dag_test.py)
   2. a DAG with a backfill command to rerun `dag_test.py` (dag_rerun.py)
   
   I'm trying to simulate a situation where the job still fails when running `airflow dags backfill`, and there is a problem I can't solve: when I try to backfill, the `dag_test.py` `Run` shows `running` forever while some tasks in `dag_test.py` are in the `failed` state.
   
   1. dag_test.py
   ```python
   from datetime import datetime, timedelta
   from airflow.configuration import conf
   from airflow import DAG, AirflowException
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.bash import BashOperator
   from textwrap import dedent
   from airflow.decorators import task
   from airflow.kubernetes.secret import Secret
   from airflow.models import Variable
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
       KubernetesPodOperator,
   )
   from kubernetes.client import models as k8s
   import os
   import random
   
   from common.airflow import create_dag_by_common
   from common.kubernetes import (
       CONTAINER_RESOURCES,
       NODE_AFFINITY,
       get_config
   )
   
   def create_dag(dag_id: str) -> DAG:
   
       # Define DAG args
       default_args = {
           "retries": 0,
           "retry_delay": timedelta(minutes=5),
       }
   
       dag = DAG(
           dag_id=dag_id,
           default_args=default_args,
           description="basic dag",
           doc_md="",
           schedule_interval="06 * * * *",
           start_date=datetime(2023, 9, 16),
           catchup=True,
           max_active_runs=4,
           max_active_tasks=10
       )
       with dag:
           dag_start = DummyOperator(
               task_id="dag_start",
               dag=dag,
           )
   
           dag_end = DummyOperator(
               task_id="dag_end",
               dag=dag,
           )
   
           @task()
           def random_failed():            
               number = random.randrange(1, 3)  # yields 1 or 2; 3 is never drawn
               print(number)
               if number == 2 or number == 3:  # the number == 3 branch is unreachable
                   # DagRun.set_state = "failed"?
                   raise AirflowException("Unexpected number 2 or 3")
   
           task_dbt_run = KubernetesPodOperator(
               task_id="task_dbt_run",
               image=image_url,  # image_url and secret_volume are defined in our common setup (omitted here)
               secrets=[secret_volume],
               cmds=["sh", "-c", "dbt run -s path:models/test --target pixel"],
               env_vars={
                   "Airflow_Job_StartTime": "{{ ts }}"
               },
               name="task_dbt_run",
               is_delete_operator_pod=True,
               image_pull_policy="Always",
               in_cluster=True,
               get_logs=True,
               do_xcom_push=False,
               affinity=NODE_AFFINITY,
               dag=dag
           )
   
           dag_start >> random_failed() >> task_dbt_run >> dag_end
   
       return dag
   
   
   dag_id = "dag_test"
   
   create_dag_by_common(
       dag_id=dag_id, create_dag_func=create_dag
   )
   
   ```
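
   (Side note on the reproduction: `random.randrange(1, 3)` samples from the half-open range [1, 3), so it only ever returns 1 or 2; the `number == 3` branch can never fire, and the task fails roughly half the time. A quick check:)

   ```python
import random

# randrange(1, 3) samples from [1, 3), i.e. only 1 and 2
draws = {random.randrange(1, 3) for _ in range(10_000)}
print(sorted(draws))  # [1, 2] -- 3 never appears
   ```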
   
   2. dag_rerun.py
   ```python
   from datetime import datetime, timedelta
   from airflow.configuration import conf
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.bash import BashOperator
   from textwrap import dedent
   from airflow.decorators import task
   from airflow.kubernetes.secret import Secret
   from airflow.models import Variable
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
       KubernetesPodOperator,
   )
   from kubernetes.client import models as k8s
   import os
   
   from common.airflow import create_dag_by_common
   from common.kubernetes import (
       get_config
   )
   
   
   def create_dag(dag_id: str) -> DAG:
   
       # Define DAG args
       default_args = {
           "retries": 0,
           "retry_delay": timedelta(minutes=5),
       }
   
       dag = DAG(
           dag_id=dag_id,
           default_args=default_args,
           description="basic dag",
           doc_md="",
           schedule_interval="45 * * * *",
           start_date=datetime(2023, 9, 14),
           catchup=False,
           max_active_runs=2,
           max_active_tasks=10
       )
       with dag:
           dag_start = DummyOperator(
               task_id="dag_start",
               dag=dag,
           )
   
           dag_end = DummyOperator(
               task_id="dag_end",
               dag=dag,
           )
   
           templated_command = dedent(
               """
               airflow dags backfill dag_test -s 2023-09-15T08:00:00 -e 2023-09-16T15:00:00 --rerun-failed-tasks --continue-on-failures --disable-retry
               """
           )

           rerun_task = BashOperator(
               task_id="rerun_task",
               depends_on_past=False,
               bash_command=templated_command,
           )
   
           dag_start >> rerun_task >> dag_end
   
       return dag
   
   
   dag_id = "dag_rerun"
   
   create_dag_by_common(
       dag_id=dag_id, create_dag_func=create_dag
   )
   ```
   
   3. Result
   ![image (9)](https://github.com/apache/airflow/assets/15166657/c8563b67-90e2-44c5-baa5-82fb09ab9437)
   
   
   
   
   ### What you think should happen instead
   
   The backfill job should rerun the `failed` `Runs`. If a rerun still fails, we would like the `Run` to simply be marked `failed` and the backfill to keep going with the next job.
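
   To make the expectation concrete, here is a toy sketch in plain Python (not Airflow internals; the run dicts and the `rerun_failed_tasks` callback are made up for illustration): rerun each failed run, and if it still fails, record a final `failed` state instead of leaving the run in `running`, then move on to the next one:

   ```python
# Toy model of the expected backfill semantics; runs are plain dicts,
# and rerun_failed_tasks is a made-up callback, not an Airflow API.
def backfill(runs, rerun_failed_tasks):
    for run in runs:
        if run["state"] != "failed":
            continue
        # Rerun the failed run; either way it must end in a final
        # state instead of hanging in "running".
        run["state"] = "success" if rerun_failed_tasks(run["id"]) else "failed"
    return [run["state"] for run in runs]

runs = [
    {"id": 1, "state": "failed"},
    {"id": 2, "state": "success"},
    {"id": 3, "state": "failed"},
]
# Pretend only run 1's tasks pass on the second attempt
print(backfill(runs, rerun_failed_tasks=lambda run_id: run_id == 1))
# ['success', 'success', 'failed']
   ```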
   
   ### How to reproduce
   
   No response
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow                           2.6.2
   apache-airflow-providers-amazon          8.1.0
   apache-airflow-providers-celery          3.2.0
   apache-airflow-providers-cncf-kubernetes 7.0.0
   apache-airflow-providers-common-sql      1.5.1
   apache-airflow-providers-docker          3.7.0
   apache-airflow-providers-elasticsearch   4.5.0
   apache-airflow-providers-ftp             3.4.1
   apache-airflow-providers-google          10.1.1
   apache-airflow-providers-grpc            3.2.0
   apache-airflow-providers-hashicorp       3.4.0
   apache-airflow-providers-http            4.4.1
   apache-airflow-providers-imap            3.2.1
   apache-airflow-providers-microsoft-azure 6.1.1
   apache-airflow-providers-mysql           5.1.0
   apache-airflow-providers-odbc            3.3.0
   apache-airflow-providers-postgres        5.5.0
   apache-airflow-providers-redis           3.2.0
   apache-airflow-providers-sendgrid        3.2.0
   apache-airflow-providers-sftp            4.3.0
   apache-airflow-providers-slack           7.3.0
   apache-airflow-providers-snowflake       4.1.0
   apache-airflow-providers-sqlite          3.4.1
   apache-airflow-providers-ssh             3.7.0
   google-cloud-orchestration-airflow       1.9.0
   pytest-airflow                           0.0.3
   
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

