Copilot commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3068820273
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -222,12 +222,18 @@ def execute(self, context: Context) -> str | None:
if not self.job_id:
raise AirflowException("AWS Batch job - job_id was not found")
- job = self.hook.get_job_description(self.job_id)
+ # Persist operator links before deferring so they're available in the UI
+ # Reuse job description to reduce API calls
+ job = self._persist_links(context)
job_status = job.get("status")
if job_status == self.hook.SUCCESS_STATE:
+ # Job already completed - persist CloudWatch logs
+ self._persist_cloudwatch_link(context)
self.log.info("Job completed.")
return self.job_id
if job_status == self.hook.FAILURE_STATE:
+ # Job already failed - persist CloudWatch logs
+ self._persist_cloudwatch_link(context)
Review Comment:
In deferrable mode, `execute()` now persists the CloudWatch link when the
job is already in `SUCCEEDED`/`FAILED`, regardless of `awslogs_enabled`. This
makes `awslogs_enabled` behave differently between deferrable vs non-deferrable
paths (where `monitor_job()` gates CloudWatch work behind `awslogs_enabled`)
and can add unexpected Batch/CloudWatch API calls when log integration is
disabled. Consider gating `_persist_cloudwatch_link(context)` here behind
`self.awslogs_enabled` (or clarifying the flag semantics if the link should
always be persisted).
```suggestion
                # Job already completed - persist CloudWatch logs when enabled
if self.awslogs_enabled:
self._persist_cloudwatch_link(context)
self.log.info("Job completed.")
return self.job_id
if job_status == self.hook.FAILURE_STATE:
# Job already failed - persist CloudWatch logs when enabled
if self.awslogs_enabled:
self._persist_cloudwatch_link(context)
```
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -404,6 +421,34 @@ def monitor_job(self, context: Context):
**awslogs[0],
)
+ def monitor_job(self, context: Context):
+ """
+ Monitor an AWS Batch job.
+
+ This can raise an exception or an AirflowTaskTimeout if the task was
+ created with ``execution_timeout``.
+ """
+ if not self.job_id:
+ raise AirflowException("AWS Batch job - job_id was not found")
+
+ # Persist job definition and queue links
+ self._persist_links(context)
+
+ if self.awslogs_enabled:
+ if self.waiters:
+             self.waiters.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+ else:
+             self.hook.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+ else:
+ if self.waiters:
+ self.waiters.wait_for_job(self.job_id)
+ else:
+ self.hook.wait_for_job(self.job_id)
+
+     # After job completes, persist CloudWatch logs when log integration is enabled
+ if self.awslogs_enabled:
+ self._persist_cloudwatch_link(context)
Review Comment:
`monitor_job()` now persists the CloudWatch link only when
`awslogs_enabled=True`, which changes behavior vs previous versions and will
require updating/adding unit tests (e.g. `test_execute_without_failures`
currently asserts `get_job_description` is called twice due to unconditional
CloudWatch discovery). Please adjust tests to cover both branches (no
CloudWatch API calls when `awslogs_enabled=False`, and CloudWatch link
persistence when enabled).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]