vandonr-amz commented on code in PR #28321:
URL: https://github.com/apache/airflow/pull/28321#discussion_r1050994658
##
airflow/providers/amazon/aws/hooks/batch_client.py:
##
@@ -416,8 +416,43 @@ def get_job_awslogs_info(self, job_id: str) -> dict[str,
str] | None:
:param job_id: AWS Batch Job ID
"""
-job_container_desc =
self.get_job_description(job_id=job_id).get("container", {})
-log_configuration = job_container_desc.get("logConfiguration", {})
+job_desc = self.get_job_description(job_id=job_id)
+
+job_node_properties = job_desc.get("nodeProperties", {})
+job_container_desc = job_desc.get("container", {})
+
+if job_node_properties:
+job_node_range_properties =
job_node_properties.get("nodeRangeProperties", {})
+if len(job_node_range_properties) > 1:
+self.log.warning(
+"AWS Batch job (%s) has more than one node group. Only
returning logs from first group.",
+job_id,
+)
+log_configuration = (
+job_node_range_properties[0].get("container",
{}).get("logConfiguration", {})
Review Comment:
Is it possible for the array to have zero elements? I.e., should we add a
check for `len == 0` with a user-friendly error message?
##
tests/providers/amazon/aws/operators/test_batch.py:
##
@@ -187,6 +188,47 @@ def test_kill_job(self):
self.client_mock.terminate_job.assert_called_once_with(jobId=JOB_ID,
reason="Task killed by the user")
+class TestBatchOperatorTrimmedArgs:
+"""test class that does not inherit from unittest.TestCase"""
Review Comment:
I added this class before @Taragolis removed `unittest.TestCase` from all AWS
tests; it can now be merged with the class above :)
##
airflow/providers/amazon/aws/hooks/batch_client.py:
##
@@ -416,8 +416,43 @@ def get_job_awslogs_info(self, job_id: str) -> dict[str,
str] | None:
:param job_id: AWS Batch Job ID
"""
-job_container_desc =
self.get_job_description(job_id=job_id).get("container", {})
-log_configuration = job_container_desc.get("logConfiguration", {})
+job_desc = self.get_job_description(job_id=job_id)
+
+job_node_properties = job_desc.get("nodeProperties", {})
+job_container_desc = job_desc.get("container", {})
+
+if job_node_properties:
+job_node_range_properties =
job_node_properties.get("nodeRangeProperties", {})
+if len(job_node_range_properties) > 1:
+self.log.warning(
+"AWS Batch job (%s) has more than one node group. Only
returning logs from first group.",
+job_id,
+)
+log_configuration = (
+job_node_range_properties[0].get("container",
{}).get("logConfiguration", {})
+)
+# "logStreamName" value is not available in the "container" object
for multinode jobs --
+# it is available in the "attempts" object
+job_attempts = job_desc.get("attempts", [])
+if len(job_attempts):
+if len(job_attempts) > 1:
+self.log.warning(
+"AWS Batch job (%s) has had more than one attempt. \
+Only returning logs from the most recent attempt.",
+job_id,
+)
+awslogs_stream_name = job_attempts[-1].get("container",
{}).get("logStreamName")
+else:
+awslogs_stream_name = None
+
+elif job_container_desc:
+log_configuration = job_container_desc.get("logConfiguration", {})
+awslogs_stream_name = job_container_desc.get("logStreamName")
+else:
+self.log.warning(
+"AWS Batch job (%s) is neither a container nor multinode job.
Log info not found."
Review Comment:
Maybe this should be an error log, since the user-provided input is
invalid for this kind of request?
The other warning logs in this method are mostly informative (e.g. there are
several node groups, which is important for the user to know but doesn't
require any action), whereas this one, I think, requires user action and thus
deserves more attention.
##
tests/providers/amazon/aws/hooks/test_batch_client.py:
##
@@ -309,6 +326,40 @@ def test_job_splunk_logs(self, caplog):
assert len(caplog.records) == 1
assert "uses logDriver (splunk). AWS CloudWatch logging disabled."
in caplog.messages[0]
+def test_job_awslogs_multinode_job(self):
+self.client_mock.describe_jobs.return_value = {
+"jobs": [
+{
+"jobId": JOB_ID,
+"attempts": [
+{"container": {"exitCode": 0, "logStreamName":
"test/stream/attempt0"}},
+