This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 98ea29a1e9f Upload log with `put_object` in `s3_task_handler` (#68619)
98ea29a1e9f is described below

commit 98ea29a1e9f212d8cd004887be95070b41a3762e
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Jun 16 22:31:57 2026 +0900

    Upload log with `put_object` in `s3_task_handler` (#68619)
    
    When S3 is configured as the remote log backend, task log files are
    incorrectly reported as task outputs in OpenLineage events.
    
    Upload the log via the boto3 client directly (get_conn().put_object(...))
    instead of S3Hook.load_string(), so the log write never touches the hook
    lineage collector.
---
 .../providers/amazon/aws/log/s3_task_handler.py        | 18 +++++++++++++-----
 .../tests/unit/amazon/aws/log/test_s3_task_handler.py  | 15 +++++++++++++++
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
index d27530f7c87..b1ce26098fb 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -128,16 +128,24 @@ class S3RemoteLogIO(LoggingMixin):  # noqa: D101
             self.log.exception("Could not verify previous log to append")
             return False
 
+        bucket, key = self.hook.parse_s3_url(remote_log_location)
+        # Upload the log via the boto3 client directly instead of S3Hook.load_string. The hook's
+        # upload helpers report the object to the hook lineage collector, which would make task
+        # logs show up as task outputs in OpenLineage events. Logs are not task data assets.
+        extra_args = {}
+        if conf.getboolean("logging", "ENCRYPT_S3_LOGS"):
+            extra_args["ServerSideEncryption"] = "AES256"
+
         # Default to a single retry attempt because s3 upload failures are
         # rare but occasionally occur.  Multiple retry attempts are unlikely
         # to help as they usually indicate non-ephemeral errors.
         for try_num in range(1 + max_retry):
             try:
-                self.hook.load_string(
-                    log,
-                    key=remote_log_location,
-                    replace=True,
-                    encrypt=conf.getboolean("logging", "ENCRYPT_S3_LOGS"),
+                self.hook.get_conn().put_object(
+                    Bucket=bucket,
+                    Key=key,
+                    Body=log.encode("utf-8"),
+                    **extra_args,
                 )
                 break
             except Exception:
diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
index 211a213b3b8..01614ff732b 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
@@ -184,6 +184,21 @@ class TestS3RemoteLogIO:
 
         assert body == b"previous \ntext"
 
+    def test_write_does_not_expose_lineage(self, hook_lineage_collector):
+        # Remote task logs are not task data assets, so uploading them must not add the S3 object
+        # as a task output in OpenLineage events.
+        self.subject.write("text", self.remote_log_location)
+        assert hook_lineage_collector.collected_assets.outputs == []
+        assert hook_lineage_collector.collected_assets.inputs == []
+
+    @conf_vars({("logging", "encrypt_s3_logs"): "True"})
+    def test_write_with_encryption(self):
+        self.subject.write("text", self.remote_log_location)
+        resp = self.conn.head_object(Bucket="bucket", Key=self.remote_log_key)
+        assert resp["ServerSideEncryption"] == "AES256"
+        body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read()
+        assert body == b"text"
+
     def test_upload_repeated_appends_no_duplication(self):
         """Simulate reschedule-mode sensor: each cycle appends to the local 
log, then uploads.
 

Reply via email to