henriquemeloo opened a new issue, #37449: URL: https://github.com/apache/airflow/issues/37449
### Apache Airflow Provider(s)

airbyte

### Versions of Apache Airflow Providers

apache-airflow-providers-airbyte==3.6.0
apache-airflow-providers-http==4.5.1

### Apache Airflow version

2.7.1

### Operating System

Debian GNU/Linux 11 (bullseye)

### Deployment

Docker-Compose

### Deployment details

_No response_

### What happened

`AirbyteTriggerSyncOperator` does not work properly in deferrable mode.

### What you think should happen instead

_No response_

### How to reproduce

Create the following DAG, replacing `"id_of_airbyte_connection_to_be_synced"` with the corresponding value.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import \
    AirbyteTriggerSyncOperator

AIRFLOW_AIRBYTE_CONN_ID = "airbyte_default"
AIRBYTE_CONNECTION_ID = "id_of_airbyte_connection_to_be_synced"

with DAG("test_dag", start_date=datetime.min, catchup=False) as dag:
    not_deferrable = AirbyteTriggerSyncOperator(
        task_id="not_deferrable",
        airbyte_conn_id=AIRFLOW_AIRBYTE_CONN_ID,
        connection_id=AIRBYTE_CONNECTION_ID,
        deferrable=False
    )
    deferrable = AirbyteTriggerSyncOperator(
        task_id="deferrable",
        airbyte_conn_id=AIRFLOW_AIRBYTE_CONN_ID,
        connection_id=AIRBYTE_CONNECTION_ID,
        deferrable=True
    )
```

The `not_deferrable` task works, while the `deferrable` task fails.
The `"airbyte_default"` connection is set in all containers via the environment variable

```
AIRFLOW_CONN_AIRBYTE_DEFAULT='{
    "conn_type": "airbyte",
    "host": "airbyte-proxy",
    "port": 8000
}'
```

### Anything else

The deferrable task fails with the following log:

<details><summary>deferrable.log</summary>

```
78f82177fe4f
*** Found local files:
***   * /opt/airflow/logs/dag_id=test_dag/run_id=scheduled__2024-02-14T15:14:37.364888+00:00/task_id=deferrable/attempt=1.log
***   * /opt/airflow/logs/dag_id=test_dag/run_id=scheduled__2024-02-14T15:14:37.364888+00:00/task_id=deferrable/attempt=1.log.trigger.1205.log
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1380} INFO - Executing <Task(AirbyteTriggerSyncOperator): deferrable> on 2024-02-14 15:14:37.364888+00:00
[2024-02-15, 15:14:38 UTC] {standard_task_runner.py:57} INFO - Started process 7210 to run task
[2024-02-15, 15:14:38 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'deferrable', 'scheduled__2024-02-14T15:14:37.364888+00:00', '--job-id', '1213', '--raw', '--subdir', 'DAGS_FOLDER/test_dag.py', '--cfg-path', '/var/tmp/tmp7v9zx5oh']
[2024-02-15, 15:14:38 UTC] {standard_task_runner.py:85} INFO - Job 1213: Subtask deferrable
[2024-02-15, 15:14:38 UTC] {task_command.py:415} INFO - Running <TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [running]> on host 78f82177fe4f
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***'
AIRFLOW_CTX_DAG_ID='test_dag'
AIRFLOW_CTX_TASK_ID='deferrable'
AIRFLOW_CTX_EXECUTION_DATE='2024-02-14T15:14:37.364888+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-14T15:14:37.364888+00:00'
[2024-02-15, 15:14:38 UTC] {base.py:73} INFO - Using connection ID 'airbyte_default' for task execution.
[2024-02-15, 15:14:43 UTC] {airbyte.py:86} INFO - Job 224 was submitted to Airbyte Server
[2024-02-15, 15:14:43 UTC] {airbyte.py:88} INFO - Waiting for job 224 to complete
[2024-02-15, 15:14:43 UTC] {taskinstance.py:1524} INFO - Pausing task as DEFERRED. dag_id=test_dag, task_id=deferrable, execution_date=20240214T151437, start_date=20240215T151438
[2024-02-15, 15:14:44 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 100 (task deferral)
[2024-02-15, 15:14:44 UTC] {airbyte.py:104} INFO - Getting the status of job run 224.
[2024-02-15, 15:14:44 UTC] {base.py:73} INFO - Using connection ID 'airbyte_default' for task execution.
[2024-02-15, 15:14:44 UTC] {airbyte.py:88} INFO - URL for api request: airbyte-proxy/api/v1/jobs/get
[2024-02-15, 15:14:44 UTC] {triggerer_job_runner.py:599} INFO - Trigger test_dag/scheduled__2024-02-14T15:14:37.364888+00:00/deferrable/-1/1 (ID 17) fired: TriggerEvent<{'status': 'error', 'message': 'airbyte-proxy/api/v1/jobs/get', 'job_id': 224}>
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1357} INFO - Resuming after deferral
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1380} INFO - Executing <Task(AirbyteTriggerSyncOperator): deferrable> on 2024-02-14 15:14:37.364888+00:00
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:57} INFO - Started process 7238 to run task
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'deferrable', 'scheduled__2024-02-14T15:14:37.364888+00:00', '--job-id', '1214', '--raw', '--subdir', 'DAGS_FOLDER/test_dag.py', '--cfg-path', '/var/tmp/tmp7eh43099']
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:85} INFO - Job 1214: Subtask deferrable
[2024-02-15, 15:14:46 UTC] {task_command.py:415} INFO - Running <TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [running]> on host 78f82177fe4f
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1608, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/airbyte/operators/airbyte.py", line 124, in execute_complete
    raise AirflowException(event["message"])
airflow.exceptions.AirflowException: airbyte-proxy/api/v1/jobs/get
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=test_dag, task_id=deferrable, execution_date=20240214T151437, start_date=20240215T151438, end_date=20240215T151446
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 1214 for task deferrable (airbyte-proxy/api/v1/jobs/get; 7238)
[2024-02-15, 15:14:46 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-02-15, 15:14:46 UTC] {taskinstance.py:2776} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

</details>

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
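
A note on the trigger log above: the URL it prints, `airbyte-proxy/api/v1/jobs/get`, carries neither a scheme nor the port `8000` from the connection, and that same string comes back as the error message. Whether this is the actual cause has not been confirmed. The snippet below is only a minimal standard-library sketch of how such a string parses; `has_scheme_and_host` and `candidate_url` are hypothetical names and values introduced for illustration, not part of the provider.

```python
from urllib.parse import urlsplit


def has_scheme_and_host(url: str) -> bool:
    """Return True only if the URL parses with both a scheme and a network location."""
    parts = urlsplit(url)
    return bool(parts.scheme) and bool(parts.netloc)


# URL exactly as it appears in the trigger log.
logged_url = "airbyte-proxy/api/v1/jobs/get"

# Assumption: what a fully qualified URL might look like for the
# connection above (host "airbyte-proxy", port 8000, plain HTTP).
candidate_url = "http://airbyte-proxy:8000/api/v1/jobs/get"

for url in (logged_url, candidate_url):
    parts = urlsplit(url)
    print(f"{url!r}: scheme={parts.scheme!r}, netloc={parts.netloc!r}, "
          f"usable={has_scheme_and_host(url)}")
```

With the logged string, the whole value is parsed as a path, so an HTTP client has no host to connect to. If that is what happens in the trigger, a possible (untested) thing to try is adding a scheme to the connection definition.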
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org