henriquemeloo opened a new issue, #37449:
URL: https://github.com/apache/airflow/issues/37449

   ### Apache Airflow Provider(s)
   
   airbyte
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-airbyte==3.6.0
   apache-airflow-providers-http==4.5.1
   
   ### Apache Airflow version
   
   2.7.1
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
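   `AirbyteTriggerSyncOperator` fails in deferrable mode. The job is submitted to Airbyte and the task defers, but the trigger then fires an error event whose message is only the request URL, without scheme or port (`airbyte-proxy/api/v1/jobs/get`), and the task is marked as failed.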
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Create the following DAG, replacing `"id_of_airbyte_connection_to_be_synced"` with the ID of the Airbyte connection to sync.
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.airbyte.operators.airbyte import (
       AirbyteTriggerSyncOperator,
   )
   
   
   AIRFLOW_AIRBYTE_CONN_ID = "airbyte_default"
   AIRBYTE_CONNECTION_ID = "id_of_airbyte_connection_to_be_synced"
   
   
   with DAG("test_dag", start_date=datetime.min, catchup=False) as dag:
       not_deferrable = AirbyteTriggerSyncOperator(
           task_id="not_deferrable",
           airbyte_conn_id=AIRFLOW_AIRBYTE_CONN_ID,
           connection_id=AIRBYTE_CONNECTION_ID,
           deferrable=False
       )
   
       deferrable = AirbyteTriggerSyncOperator(
           task_id="deferrable",
           airbyte_conn_id=AIRFLOW_AIRBYTE_CONN_ID,
           connection_id=AIRBYTE_CONNECTION_ID,
           deferrable=True
       )
   
   ```
   The `not_deferrable` task succeeds, while the `deferrable` task fails. The `"airbyte_default"` connection is set in all containers via the following environment variable:
   ```
   AIRFLOW_CONN_AIRBYTE_DEFAULT='{
       "conn_type": "airbyte",
       "host": "airbyte-proxy",
       "port": 8000
   }'
   ```
   
   ### Anything else
   
   The deferrable task fails with the following log:
   <details><summary>deferrable.log</summary>
   
   ```
   78f82177fe4f
   *** Found local files:
   ***   * /opt/airflow/logs/dag_id=test_dag/run_id=scheduled__2024-02-14T15:14:37.364888+00:00/task_id=deferrable/attempt=1.log
   ***   * /opt/airflow/logs/dag_id=test_dag/run_id=scheduled__2024-02-14T15:14:37.364888+00:00/task_id=deferrable/attempt=1.log.trigger.1205.log
   [2024-02-15, 15:14:38 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
   [2024-02-15, 15:14:38 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
   [2024-02-15, 15:14:38 UTC] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
   [2024-02-15, 15:14:38 UTC] {taskinstance.py:1380} INFO - Executing <Task(AirbyteTriggerSyncOperator): deferrable> on 2024-02-14 15:14:37.364888+00:00
   [2024-02-15, 15:14:38 UTC] {standard_task_runner.py:57} INFO - Started process 7210 to run task
   [2024-02-15, 15:14:38 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'deferrable', 'scheduled__2024-02-14T15:14:37.364888+00:00', '--job-id', '1213', '--raw', '--subdir', 'DAGS_FOLDER/test_dag.py', '--cfg-path', '/var/tmp/tmp7v9zx5oh']
   [2024-02-15, 15:14:38 UTC] {standard_task_runner.py:85} INFO - Job 1213: Subtask deferrable
   [2024-02-15, 15:14:38 UTC] {task_command.py:415} INFO - Running <TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [running]> on host 78f82177fe4f
   [2024-02-15, 15:14:38 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test_dag' AIRFLOW_CTX_TASK_ID='deferrable' AIRFLOW_CTX_EXECUTION_DATE='2024-02-14T15:14:37.364888+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-14T15:14:37.364888+00:00'
   [2024-02-15, 15:14:38 UTC] {base.py:73} INFO - Using connection ID 'airbyte_default' for task execution.
   [2024-02-15, 15:14:43 UTC] {airbyte.py:86} INFO - Job 224 was submitted to Airbyte Server
   [2024-02-15, 15:14:43 UTC] {airbyte.py:88} INFO - Waiting for job 224 to complete
   [2024-02-15, 15:14:43 UTC] {taskinstance.py:1524} INFO - Pausing task as DEFERRED. dag_id=test_dag, task_id=deferrable, execution_date=20240214T151437, start_date=20240215T151438
   [2024-02-15, 15:14:44 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 100 (task deferral)
   [2024-02-15, 15:14:44 UTC] {airbyte.py:104} INFO - Getting the status of job run 224.
   [2024-02-15, 15:14:44 UTC] {base.py:73} INFO - Using connection ID 'airbyte_default' for task execution.
   [2024-02-15, 15:14:44 UTC] {airbyte.py:88} INFO - URL for api request: airbyte-proxy/api/v1/jobs/get
   [2024-02-15, 15:14:44 UTC] {triggerer_job_runner.py:599} INFO - Trigger test_dag/scheduled__2024-02-14T15:14:37.364888+00:00/deferrable/-1/1 (ID 17) fired: TriggerEvent<{'status': 'error', 'message': 'airbyte-proxy/api/v1/jobs/get', 'job_id': 224}>
   [2024-02-15, 15:14:46 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
   [2024-02-15, 15:14:46 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
   [2024-02-15, 15:14:46 UTC] {taskinstance.py:1357} INFO - Resuming after deferral
   [2024-02-15, 15:14:46 UTC] {taskinstance.py:1380} INFO - Executing <Task(AirbyteTriggerSyncOperator): deferrable> on 2024-02-14 15:14:37.364888+00:00
   [2024-02-15, 15:14:46 UTC] {standard_task_runner.py:57} INFO - Started process 7238 to run task
   [2024-02-15, 15:14:46 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'deferrable', 'scheduled__2024-02-14T15:14:37.364888+00:00', '--job-id', '1214', '--raw', '--subdir', 'DAGS_FOLDER/test_dag.py', '--cfg-path', '/var/tmp/tmp7eh43099']
   [2024-02-15, 15:14:46 UTC] {standard_task_runner.py:85} INFO - Job 1214: Subtask deferrable
   [2024-02-15, 15:14:46 UTC] {task_command.py:415} INFO - Running <TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [running]> on host 78f82177fe4f
   [2024-02-15, 15:14:46 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1608, in resume_execution
       return execute_callable(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/airbyte/operators/airbyte.py", line 124, in execute_complete
       raise AirflowException(event["message"])
   airflow.exceptions.AirflowException: airbyte-proxy/api/v1/jobs/get
   [2024-02-15, 15:14:46 UTC] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=test_dag, task_id=deferrable, execution_date=20240214T151437, start_date=20240215T151438, end_date=20240215T151446
   [2024-02-15, 15:14:46 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 1214 for task deferrable (airbyte-proxy/api/v1/jobs/get; 7238)
   [2024-02-15, 15:14:46 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
   [2024-02-15, 15:14:46 UTC] {taskinstance.py:2776} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
   
   </details>
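
One detail in the log may be worth isolating: the URL reported by the trigger, `airbyte-proxy/api/v1/jobs/get`, carries neither a scheme nor the port `8000` from the connection. The sketch below uses only the Python standard library to show how that string parses, next to a URL assembled from the connection's `host` and `port`. It does not reproduce the provider's code; `build_url` is a hypothetical helper, and the `http` scheme is an assumption, since the connection in this report does not set one.

```python
from urllib.parse import urlsplit

# URL exactly as it appears in the trigger log above.
logged_url = "airbyte-proxy/api/v1/jobs/get"

# Connection fields taken from the AIRFLOW_CONN_AIRBYTE_DEFAULT value in this report.
conn = {"conn_type": "airbyte", "host": "airbyte-proxy", "port": 8000}


def build_url(host, port, scheme="http", path="/api/v1/jobs/get"):
    """Hypothetical helper (not provider code): join scheme, host, port and path.

    The default scheme "http" is an assumption for illustration only.
    """
    netloc = f"{host}:{port}" if port else host
    return f"{scheme}://{netloc}{path}"


logged = urlsplit(logged_url)
full = urlsplit(build_url(conn["host"], conn["port"]))

# The logged string has no scheme and no network location; the whole string is
# parsed as a relative path.
print(repr(logged.scheme), repr(logged.netloc), logged.path)

# A URL assembled from host and port has all three parts.
print(full.scheme, full.netloc, full.path)
```

If the asynchronous code path does send the request to the logged string as-is, an HTTP client would have no host or port to connect to, which would be consistent with the immediate error event. That is a reading of the log, not a confirmed root cause.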
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

