This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 98ea29a1e9f Upload log with `put_object` in `s3_task_handler` (#68619)
98ea29a1e9f is described below
commit 98ea29a1e9f212d8cd004887be95070b41a3762e
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Jun 16 22:31:57 2026 +0900
Upload log with `put_object` in `s3_task_handler` (#68619)
When S3 is configured as the remote log backend, task log files are
incorrectly reported as task outputs in OpenLineage events, because the
S3Hook upload helper used for the write (S3Hook.load_string()) reports the
uploaded object to the hook lineage collector.
Upload the log via the boto3 client directly (get_conn().put_object(...))
instead of S3Hook.load_string(), so the log write never touches the hook
lineage collector.
---
.../providers/amazon/aws/log/s3_task_handler.py | 18 +++++++++++++-----
.../tests/unit/amazon/aws/log/test_s3_task_handler.py | 15 +++++++++++++++
2 files changed, 28 insertions(+), 5 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
index d27530f7c87..b1ce26098fb 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -128,16 +128,24 @@ class S3RemoteLogIO(LoggingMixin): # noqa: D101
self.log.exception("Could not verify previous log to append")
return False
+ bucket, key = self.hook.parse_s3_url(remote_log_location)
+ # Upload the log via the boto3 client directly instead of S3Hook.load_string. The hook's
+ # upload helpers report the object to the hook lineage collector, which would make task
+ # logs show up as task outputs in OpenLineage events. Logs are not task data assets.
+ extra_args = {}
+ if conf.getboolean("logging", "ENCRYPT_S3_LOGS"):
+ extra_args["ServerSideEncryption"] = "AES256"
+
# Default to a single retry attempt because s3 upload failures are
# rare but occasionally occur. Multiple retry attempts are unlikely
# to help as they usually indicate non-ephemeral errors.
for try_num in range(1 + max_retry):
try:
- self.hook.load_string(
- log,
- key=remote_log_location,
- replace=True,
- encrypt=conf.getboolean("logging", "ENCRYPT_S3_LOGS"),
+ self.hook.get_conn().put_object(
+ Bucket=bucket,
+ Key=key,
+ Body=log.encode("utf-8"),
+ **extra_args,
)
break
except Exception:
diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
index 211a213b3b8..01614ff732b 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
@@ -184,6 +184,21 @@ class TestS3RemoteLogIO:
assert body == b"previous \ntext"
+ def test_write_does_not_expose_lineage(self, hook_lineage_collector):
+ # Remote task logs are not task data assets, so uploading them must not add the S3 object
+ # as a task output in OpenLineage events.
+ self.subject.write("text", self.remote_log_location)
+ assert hook_lineage_collector.collected_assets.outputs == []
+ assert hook_lineage_collector.collected_assets.inputs == []
+
+ @conf_vars({("logging", "encrypt_s3_logs"): "True"})
+ def test_write_with_encryption(self):
+ self.subject.write("text", self.remote_log_location)
+ resp = self.conn.head_object(Bucket="bucket", Key=self.remote_log_key)
+ assert resp["ServerSideEncryption"] == "AES256"
+ body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read()
+ assert body == b"text"
+
def test_upload_repeated_appends_no_duplication(self):
"""Simulate reschedule-mode sensor: each cycle appends to the local log, then uploads.