Ech-Charay commented on code in PR #63086:
URL: https://github.com/apache/airflow/pull/63086#discussion_r2955391993


##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py:
##########
@@ -87,6 +96,108 @@ def hook(self) -> AwsGenericHook:
             config=self.botocore_config,
         )
 
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        if not self.verbose:
+            async for event in super().run():
+                yield event
+            return
+
+        hook = self.hook()
+        async with (
+            await hook.get_async_conn() as glue_client,
+            await AwsLogsHook(
+                aws_conn_id=self.aws_conn_id, region_name=self.region_name
+            ).get_async_conn() as logs_client,
+        ):
+            # Get log group name from job run metadata and initial state in 
one call
+            job_run_resp = await 
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+            log_group_prefix = job_run_resp["JobRun"].get("LogGroupName", 
"/aws-glue/jobs")
+            log_group_output = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+            log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+
+            output_token: str | None = None
+            error_token: str | None = None
+            job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+            for _attempt in range(self.attempts):
+                # Fetch and print logs from both output and error streams
+                output_token = await self._forward_logs(
+                    logs_client, log_group_output, self.run_id, output_token
+                )
+                error_token = await self._forward_logs(logs_client, 
log_group_error, self.run_id, error_token)
+
+                if job_run_state in ("FAILED", "TIMEOUT"):
+                    raise AirflowException(
+                        f"Glue Job {self.job_name} Run {self.run_id} exited 
with state: {job_run_state}"
+                    )
+                if job_run_state in ("SUCCEEDED", "STOPPED"):
+                    self.log.info(
+                        "Exiting Job %s Run %s State: %s",
+                        self.job_name,
+                        self.run_id,
+                        job_run_state,
+                    )
+                    yield TriggerEvent({"status": "success", self.return_key: 
self.return_value})
+                    return
+
+                self.log.info(
+                    "Polling for AWS Glue Job %s current run state: %s",
+                    self.job_name,
+                    job_run_state,
+                )
+                await asyncio.sleep(self.waiter_delay)
+
+                # Fetch updated state for next iteration
+                resp = await glue_client.get_job_run(JobName=self.job_name, 
RunId=self.run_id)
+                job_run_state = resp["JobRun"]["JobRunState"]
+
+            raise AirflowException(
+                f"Glue Job {self.job_name} Run {self.run_id} waiter exceeded 
max attempts ({self.attempts})"
+            )
+
+    async def _forward_logs(
+        self,
+        logs_client: Any,
+        log_group: str,
+        log_stream: str,
+        next_token: str | None,
+    ) -> str | None:
+        # Matches the format used by the synchronous 
GlueJobHook.print_job_logs.

Review Comment:
   Hi @shivaam, @o-nikolas, we're facing the same issue, and I was about to 
start drafting a solution on our side when I saw this PR. Rather than copying 
code from the Glue hook into the trigger, I believe both the hook and the 
trigger should be updated. Here are two approaches, depending on how much 
refactoring we are willing to do:
   
   **In both approaches, the following principle applies:**
   - The hook MUST NOT raise an `AirflowException` when a Glue job run fails or 
times out. It should just return the state of the job run (see 
`_handle_state`, line 345 of `.../providers/amazon/aws/hooks/glue.py`).
   
   **Option A:**
   - `GlueJobHook.job_completion()` is left completely untouched: it still 
prints logs via `self.log.info` and raises `AirflowException` on failure. It 
is used when `deferrable==False` and `wait_for_completion==True`.
   - `GlueJobHook.async_job_completion()` is updated to return logs as part of 
the result dict instead of printing them, and to return the failure state 
instead of raising. (`GlueJobHook.async_job_completion()` is used only when 
`deferrable==True`.)
   - The trigger receives the result dict, and on failure/timeout passes the 
logs through the `TriggerEvent` payload to `GlueJobOperator.execute_complete`, 
which logs them via `self.log.info` in the worker process — which is exactly 
where task logs need to originate to appear in the Airflow task UI.
   - Acknowledged trade-off: `job_completion()` and `async_job_completion()` 
now behave inconsistently with respect to logging and error handling. 
Acceptable only as a short-term pragmatic fix.
   - Lower risk, smallest diff
   
   **Option B:**
   - The hook exposes only two side-effect-free operations: state retrieval 
(`get_job_state()` / `async_get_job_state()`) and `collect_job_logs()`, which 
always returns the logs as a string and never prints them.
   - The polling loop moves out of the hook entirely — `job_completion`, 
`async_job_completion`, `_handle_state`, and `print_job_logs` are removed.
   - In non-deferrable mode (`wait_for_completion=True`), the operator owns the 
polling loop, calls `hook.collect_job_logs()` each cycle, and logs via 
`self.log.info`. This runs in the worker process, so the logs appear correctly 
in the Airflow task UI.
   - In deferrable mode, the trigger owns the async polling loop symmetrically, 
gathers the logs, and passes them through the `TriggerEvent` payload to 
`GlueJobOperator.execute_complete`.
   - Cleaner separation of concerns: the hook has zero knowledge of execution 
mode or log destination. More robust long-term but a larger diff with a higher 
rebasing cost.
   
   These are just thoughts based on running into this issue ourselves. Happy to 
discuss further or help if any of this would be useful to the PR!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to