wilsonhooi86 opened a new issue, #59075:
URL: https://github.com/apache/airflow/issues/59075

   ### Apache Airflow Provider(s)
   
   amazon
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==9.0.0
   
   ### Apache Airflow version
   
   2.10.3
   
   ### Operating System
   
   Amazon Linux 2023
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   MWAA in production and mwaa-local-runner for local testing. The bug exists in both.
   
   ### What happened
   
   When a deferred GlueJobOperator task fails with an internal error, the task is marked as `failed`, but the Glue run identified by the existing `job_run_id` keeps running.
   
   When the GlueJobOperator task retries, it automatically creates a new Glue `job_run_id` while the existing run is still in progress. This causes duplicate job runs on the AWS Glue side.
   
   
   <img width="1707" height="685" alt="Image" src="https://github.com/user-attachments/assets/bd1616d3-2507-459d-acef-051d204b0f2f" />
   
   
   Example of internal errors:
   ```
   [2025-12-05, 01:10:33 UTC] {baseoperator.py:1798} ERROR - Trigger failed:
   Traceback (most recent call last):
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 558, in cleanup_finished_triggers
       result = details["task"].result()
                ^^^^^^^^^^^^^^^^^^^^^^^^
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 630, in run_trigger
       async for event in trigger.run():
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/triggers/glue.py",
 line 73, in run
       await hook.async_job_completion(self.job_name, self.run_id, self.verbose)
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py",
 line 315, in async_job_completion
       ret = self._handle_state(job_run_state, job_name, run_id, verbose, 
next_log_tokens)
             
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py",
 line 334, in _handle_state
       self.print_job_logs(
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py",
 line 278, in print_job_logs
       continuation_tokens.output_stream_continuation = display_logs_from(
                                                        ^^^^^^^^^^^^^^^^^^
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py",
 line 245, in display_logs_from
       for response in paginator.paginate(
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", 
line 269, in __iter__
       response = self._make_request(current_kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", 
line 357, in _make_request
       return self._method(**current_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", 
line 569, in _api_call
       return self._make_api_call(operation_name, kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", 
line 1023, in _make_api_call
       raise error_class(parsed_response, operation_name)
   
   botocore.errorfactory.ThrottlingException: An error occurred 
(ThrottlingException) when calling the FilterLogEvents operation (reached max 
retries: 4): Rate exceeded
   
   [2025-12-05, 01:10:33 UTC] {taskinstance.py:3311} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 767, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 733, in _execute_callable
       return ExecutionCallableRunner(
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py",
 line 252, in run
       return self.func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py",
 line 1799, in resume_execution
       raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
   airflow.exceptions.TaskDeferralError: Trigger failure
   ```
   or
   ```
   The above exception was the direct cause of the following exception:
   
   
   Traceback (most recent call last):
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/httpsession.py",
 line 222, in send
       response = await session.request(
                  ^^^^^^^^^^^^^^^^^^^^^^
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/aiohttp/client.py", 
line 667, in _request
       raise ConnectionTimeoutError(
   
   aiohttp.client_exceptions.ConnectionTimeoutError: Connection timeout to host 
http://169.254.170.2/v2/credentials/c9ab61ee-ea14-42a2-9a16-0b4aebdedad7
   
   
   During handling of the above exception, another exception occurred:
   
   
   Traceback (most recent call last):
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/utils.py", 
line 722, in _get_response
       response = await session.send(request.prepare())
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/httpsession.py",
 line 259, in send
       raise ConnectTimeoutError(endpoint_url=request.url, error=e)
   
    botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.170.2/v2/credentials/c9ab61ee-ea14-42a2-9a16-0b4aebdedad7"
   ```
   
   ### What you think should happen instead
   
   I would propose adding logic to the GlueJobOperator so that, on the next retry, it first checks whether a previous Glue `job_run_id` exists and, if so, inspects that run's `STATE`:
   
   - If the Glue `STATE` is in progress or completed, the GlueJobOperator should **not create a new** Glue `job_run_id`.
   - If the Glue `STATE` is failed, it should **create a new** Glue `job_run_id`.
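   The proposed retry behaviour could be sketched roughly as below. This is only an illustration of the decision logic, not the provider's actual code; `should_start_new_run` is a hypothetical helper, and the state names follow Glue's documented `JobRunState` values.
   
   ```python
   # Hypothetical sketch of the proposed retry logic (not the provider's
   # implementation). State names follow Glue's JobRunState values.
   
   # States in which the leftover run should be reused / waited on rather
   # than submitting a duplicate run.
   REUSABLE_STATES = {"STARTING", "RUNNING", "STOPPING", "SUCCEEDED"}
   
   
   def should_start_new_run(previous_state):
       """Return True if the retry should submit a fresh Glue job run.
   
       previous_state is the JobRunState of the job_run_id left over from
       the previous attempt, or None if no previous run is known.
       """
       if previous_state is None:
           return True  # nothing to reuse, start a new run
       # Terminal failure states (FAILED, TIMEOUT, ERROR, ...) warrant a
       # new run; in-progress or succeeded runs should not be duplicated.
       return previous_state not in REUSABLE_STATES
   ```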
   
   ### How to reproduce
   
   1. Create a GlueJobOperator task.
   2. Set deferrable=True and verbose=False
   3. Run the task and watch the Airflow logs while it is in a deferred state.
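   A minimal DAG to reproduce, assuming a Glue job named `my_glue_job` already exists and the default AWS connection can run it (both names here are placeholders):
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
   
   with DAG(
       dag_id="glue_defer_duplicate_run_repro",
       start_date=datetime(2025, 1, 1),
       schedule=None,
   ) as dag:
       run_glue = GlueJobOperator(
           task_id="run_glue_job",
           job_name="my_glue_job",  # placeholder: an existing Glue job
           deferrable=True,         # hand off polling to the triggerer
           verbose=False,
           retries=1,               # a retry reproduces the duplicate run
       )
   ```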
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

