Copilot commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3289450942


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -330,18 +339,25 @@ def submit_job(self, context: Context):
             job_id=self.job_id,
         )
 
-    def monitor_job(self, context: Context):
+    def _persist_links(
+        self,
+        context: Context,
+        job_description: dict | None = None,
+    ) -> dict:
         """
-        Monitor an AWS Batch job.
+        Persist job definition and queue links for UI display.
 
-        This can raise an exception or an AirflowTaskTimeout if the task was
-        created with ``execution_timeout``.
+        :param context: Task context
+        :param job_description: Optional pre-fetched job description to avoid redundant API calls
+        :return: Job description dict
         """
         if not self.job_id:
-            raise AirflowException("AWS Batch job - job_id was not found")
+            raise ValueError("AWS Batch job - job_id was not found")

Review Comment:
   `_persist_links()` now raises `ValueError` for a missing `job_id`, while the 
rest of the operator (and Airflow operators generally) use `AirflowException` 
for runtime task failures. This change can alter error classification/handling 
and is inconsistent with nearby code (e.g., `execute()` already raises 
`AirflowException` for the same condition). Prefer raising `AirflowException` 
here for consistency and clearer operator failure semantics.
   



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -252,14 +258,17 @@ def execute(self, context: Context) -> str | None:
     def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> str:
         validated_event = validate_execute_complete_event(event)
 
+        # Set job_id first so CloudWatch link can be persisted even on failure
+        self.job_id = validated_event["job_id"]
+
+        # Persist CloudWatch logs for both success and failure
+        self._persist_cloudwatch_link(context)
+
         if validated_event["status"] != "success":
             raise AirflowException(f"Error while running job: {validated_event}")
 
-        self.job_id = validated_event["job_id"]
-
-        # Fetch logs if awslogs_enabled
-        if self.awslogs_enabled:
-            self.monitor_job(context)  # fetch logs, no need to return
+        # Check job success (already know status is "success" from above)
+        self.hook.check_job_success(self.job_id)
 
         self.log.info("Job completed successfully for job_id: %s", self.job_id)
         return self.job_id

Review Comment:
   `awslogs_enabled` is no longer honored in deferrable mode: 
`execute_complete()` used to call `monitor_job()` to fetch/emit CloudWatch logs 
when `awslogs_enabled=True`, but now it only persists a UI link and checks 
success. This is a behavior regression relative to the parameter’s documented 
purpose (“logs … should be printed or not”). Consider restoring log fetching in 
`execute_complete()` when `awslogs_enabled` is enabled (or update 
docs/parameter behavior if intentionally changed).



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -404,6 +424,33 @@ def monitor_job(self, context: Context):
                 **awslogs[0],
             )
 
+    def monitor_job(self, context: Context):
+        """
+        Monitor an AWS Batch job.
+
+        This can raise an exception or an AirflowTaskTimeout if the task was
+        created with ``execution_timeout``.
+        """
+        if not self.job_id:
+            raise ValueError("AWS Batch job - job_id was not found")

Review Comment:
   `monitor_job()` is a public method and previously raised `AirflowException` 
for missing `job_id`; changing it to `ValueError` is a breaking behavioral 
change for any callers expecting an `AirflowException` (including consistent 
task-failure messaging). Recommend keeping `AirflowException` (or another 
Airflow-specific exception type) for backward compatibility.
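To make the compatibility concern concrete: `ValueError` is not a subclass of `AirflowException`, so existing code that wraps `monitor_job()` in `except AirflowException` stops catching the missing-`job_id` case. The two stand-in functions and the `caller` below are hypothetical and model only the raise.

```python
from typing import Callable

try:
    from airflow.exceptions import AirflowException
except ImportError:  # fallback stub so the sketch runs without Airflow installed
    class AirflowException(Exception):
        """Stand-in for airflow.exceptions.AirflowException."""


def monitor_job_before() -> None:
    # Pre-change behavior: a missing job_id raised AirflowException.
    raise AirflowException("AWS Batch job - job_id was not found")


def monitor_job_after() -> None:
    # Behavior in this PR: a missing job_id raises ValueError.
    raise ValueError("AWS Batch job - job_id was not found")


def caller(monitor: Callable[[], None]) -> str:
    """Hypothetical downstream code written against the old contract."""
    try:
        monitor()
    except AirflowException:
        return "handled"
    return "no error"
```

`caller(monitor_job_before)` returns `"handled"`, while `caller(monitor_job_after)` lets the `ValueError` escape. Keeping `AirflowException`, as recommended, avoids that change.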
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.