This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 9223ddf519 test(pyamber): add unit tests for _QueueReader and cleanup-failed-upload paths (#4707)
9223ddf519 is described below

commit 9223ddf5192ca65672663cc968a46d10d769ca51
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 15:00:50 2026 -0700

    test(pyamber): add unit tests for _QueueReader and cleanup-failed-upload paths (#4707)
    
    ### What changes were proposed in this PR?
    
    Adds direct pytest coverage for two pieces of
    `large_binary_output_stream.py` that previously had only end-to-end
    coverage:
    
    - The private `_QueueReader` helper (partial-read buffering, EOF
    sentinel handling).
    - `_cleanup_failed_upload`'s silent-swallow path (when both upload and
    the post-failure `delete_object` raise).
    
    Writing the second test surfaced a real bug, which is also fixed here:
    `LargeBinaryInputStream` and `LargeBinaryOutputStream` extend
    `io.IOBase` but maintained their own `self._closed` flag and overrode
    the `closed` property without ever calling `super().close()`. IOBase's
    internal closed flag stayed `False`, so on Python 3.13 `IOBase.__del__`
    re-invoked `close()` during finalization. On the failed-upload path that
    re-entry called `_cleanup_failed_upload` a second time, sending a
    duplicate `delete_object` to S3 and tripping `assert_called_once_with` —
    only on the 3.13 matrix leg.
    
    Fix: drop the redundant `self._closed` flag and `closed` property
    override in both classes; wrap the existing `close()` logic in
    `try/finally` and call `super().close()` in the `finally`. IOBase now
    records the closed state even when our cleanup raises, so the implicit
    finalizer call early-returns on `self.closed` and the cleanup runs at
    most once. Existing `self._closed` reads (`write`, `writable`,
    `readable`, `_require_open` decorator) move to `self.closed`.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4706.
    
    ### How was this PR tested?
    
    ```
    cd amber/src/main/python
    ruff check pytexera/storage/{large_binary_input_stream,large_binary_output_stream,test_large_binary_output_stream}.py
    ruff format --check pytexera/storage/{large_binary_input_stream,large_binary_output_stream,test_large_binary_output_stream}.py
    python -m pytest pytexera/storage/test_large_binary_output_stream.py
    ```
    
    CI exercises the change across the full `python` matrix; before the fix
    only the 3.13 leg failed on `test_delete_object_failure_is_swallowed`.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
---
 .../pytexera/storage/large_binary_input_stream.py  |  24 ++--
 .../pytexera/storage/large_binary_output_stream.py |  51 +++++----
 .../storage/test_large_binary_input_stream.py      |  12 +-
 .../storage/test_large_binary_output_stream.py     | 127 ++++++++++++++++++++-
 4 files changed, 169 insertions(+), 45 deletions(-)
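
The close() pattern described in the commit message can be illustrated in isolation. The following is a minimal sketch, not the Texera code: `CleanupStream`, its `fail` flag, and the `cleanup_calls` counter are hypothetical names introduced only for this example. It shows that wrapping the cleanup in `try/finally` and calling `super().close()` in the `finally` lets `io.IOBase` record the closed state even when the cleanup raises, so a later `close()` call returns early.

```python
import io


class CleanupStream(io.IOBase):
    """Hypothetical stand-in for the stream classes in this commit."""

    def __init__(self, fail: bool = False):
        self._fail = fail          # simulate a failed upload when True
        self.cleanup_calls = 0     # counts how often the cleanup body ran

    def close(self) -> None:
        # IOBase tracks the closed state itself; no private flag is needed.
        if self.closed:
            return
        try:
            self.cleanup_calls += 1
            if self._fail:
                raise IOError("Failed to complete upload")
        finally:
            # Runs even when the cleanup above raised, so any later
            # close() call (explicit or from the finalizer) is a no-op.
            super().close()


stream = CleanupStream(fail=True)
try:
    stream.close()
except IOError:
    pass

stream.close()  # second call returns early
assert stream.closed
assert stream.cleanup_calls == 1
```

The counter plays the role of the `delete_object` call count in the failed-upload test: under this pattern it should stay at one regardless of how many times `close()` is invoked afterwards.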

diff --git a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
index 8e7d864040..68368c5c12 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
@@ -34,7 +34,7 @@ def _require_open(func):
 
     @wraps(func)
     def wrapper(self, *args, **kwargs):
-        if self._closed:
+        if self.closed:
             raise ValueError("I/O operation on closed stream")
         if self._underlying is None:
             self._lazy_init()
@@ -57,7 +57,6 @@ class LargeBinaryInputStream(IOBase):
             raise ValueError("largebinary cannot be None")
         self._large_binary = large_binary
         self._underlying: Optional[BinaryIO] = None
-        self._closed = False
 
     def _lazy_init(self):
         """Download from S3 on first read operation."""
@@ -87,23 +86,26 @@ class LargeBinaryInputStream(IOBase):
 
     def readable(self) -> bool:
         """Return True if the stream can be read from."""
-        return not self._closed
+        return not self.closed
 
     def seekable(self) -> bool:
         """Return False - this stream does not support seeking."""
         return False
 
-    @property
-    def closed(self) -> bool:
-        """Return True if the stream is closed."""
-        return self._closed
-
     def close(self) -> None:
-        """Close the stream and release resources."""
-        if not self._closed:
-            self._closed = True
+        """Close the stream and release resources.
+
+        Idempotent: subsequent calls (including IOBase's __del__-driven
+        finalize on Python 3.13+) are no-ops because IOBase tracks the
+        closed state via super().close() below.
+        """
+        if self.closed:
+            return
+        try:
             if self._underlying is not None:
                 self._underlying.close()
+        finally:
+            super().close()
 
     def __enter__(self):
         return self
diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
index af4f1a275c..0cdf8a3679 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
@@ -117,7 +117,6 @@ class LargeBinaryOutputStream(IOBase):
         self._large_binary = large_binary
         self._bucket_name = large_binary.get_bucket_name()
         self._object_key = large_binary.get_object_key()
-        self._closed = False
 
         # Background upload thread state
         self._queue: queue.Queue = queue.Queue(maxsize=_CHUNK_SIZE)
@@ -140,7 +139,7 @@ class LargeBinaryOutputStream(IOBase):
             ValueError: If stream is closed
             IOError: If previous upload failed
         """
-        if self._closed:
+        if self.closed:
             raise ValueError("I/O operation on closed stream")
 
         # Check if upload has failed
@@ -177,17 +176,12 @@ class LargeBinaryOutputStream(IOBase):
 
     def writable(self) -> bool:
         """Return True if the stream can be written to."""
-        return not self._closed
+        return not self.closed
 
     def seekable(self) -> bool:
         """Return False - this stream does not support seeking."""
         return False
 
-    @property
-    def closed(self) -> bool:
-        """Return True if the stream is closed."""
-        return self._closed
-
     def flush(self) -> None:
         """
         Flush the write buffer.
@@ -203,27 +197,36 @@ class LargeBinaryOutputStream(IOBase):
         Close the stream and complete the S3 upload.
         Blocks until upload is complete. Raises IOError if upload failed.
 
+        Idempotent: subsequent calls (including IOBase's __del__-driven
+        finalize on Python 3.13+) are no-ops because IOBase tracks the
+        closed state via super().close() below.
+
         Raises:
             IOError: If upload failed
         """
-        if self._closed:
+        if self.closed:
             return
 
-        self._closed = True
-
-        # Signal EOF to upload thread and wait for completion
-        if self._upload_thread is not None:
-            self._queue.put(None, block=True)  # EOF marker
-            self._upload_thread.join()
-            self._upload_complete.wait()
-
-            # Check for errors and cleanup if needed
-            with self._lock:
-                exception = self._upload_exception
-
-            if exception is not None:
-                self._cleanup_failed_upload()
-                raise IOError(f"Failed to complete upload: {exception}") from exception
+        try:
+            # Signal EOF to upload thread and wait for completion
+            if self._upload_thread is not None:
+                self._queue.put(None, block=True)  # EOF marker
+                self._upload_thread.join()
+                self._upload_complete.wait()
+
+                # Check for errors and cleanup if needed
+                with self._lock:
+                    exception = self._upload_exception
+
+                if exception is not None:
+                    self._cleanup_failed_upload()
+                    raise IOError(
+                        f"Failed to complete upload: {exception}"
+                    ) from exception
+        finally:
+            # Mark IOBase as closed even if we raised, so __del__ skips
+            # the second close() call on Python 3.13+.
+            super().close()
 
     def _cleanup_failed_upload(self):
         """Clean up a failed upload by deleting the S3 object."""
diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py
index 85bdbd13fa..35bc5bc634 100644
--- a/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py
@@ -40,7 +40,7 @@ class TestLargeBinaryInputStream:
         try:
             assert stream._large_binary == large_binary
             assert stream._underlying is None
-            assert not stream._closed
+            assert not stream.closed
         finally:
             stream.close()
 
@@ -139,7 +139,7 @@ class TestLargeBinaryInputStream:
             stream.close()
             assert stream.readable() is False
         finally:
-            if not stream._closed:
+            if not stream.closed:
                 stream.close()
 
     def test_seekable(self, large_binary):
@@ -159,7 +159,7 @@ class TestLargeBinaryInputStream:
             stream.close()
             assert stream.closed is True
         finally:
-            if not stream._closed:
+            if not stream.closed:
                 stream.close()
 
     def test_close(self, large_binary, mock_s3_response):
@@ -174,7 +174,7 @@ class TestLargeBinaryInputStream:
             assert stream._underlying is not None
 
             stream.close()
-            assert stream._closed is True
+            assert stream.closed is True
             assert stream._underlying.closed
 
     def test_context_manager(self, large_binary, mock_s3_response):
@@ -187,10 +187,10 @@ class TestLargeBinaryInputStream:
             with LargeBinaryInputStream(large_binary) as stream:
                 data = stream.read()
                 assert data == b"test data content"
-                assert not stream._closed
+                assert not stream.closed
 
             # Stream should be closed after context exit
-            assert stream._closed
+            assert stream.closed
 
     def test_iteration(self, large_binary):
         """Test iteration over lines."""
diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py
index 7ebcc9b4cf..ce9fc4e5d1 100644
--- a/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py
@@ -15,11 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import queue
 import pytest
 import time
 from unittest.mock import patch, MagicMock
 from core.models.type.large_binary import largebinary
-from pytexera.storage.large_binary_output_stream import LargeBinaryOutputStream
+from pytexera.storage.large_binary_output_stream import (
+    LargeBinaryOutputStream,
+    _QueueReader,
+)
 from pytexera.storage import large_binary_manager
 
 
@@ -35,7 +39,7 @@ class TestLargeBinaryOutputStream:
         assert stream._large_binary == large_binary
         assert stream._bucket_name == "test-bucket"
         assert stream._object_key == "path/to/object"
-        assert not stream._closed
+        assert not stream.closed
         assert stream._upload_thread is None
 
     def test_init_with_none_raises_error(self):
@@ -165,10 +169,10 @@ class TestLargeBinaryOutputStream:
 
             with LargeBinaryOutputStream(large_binary) as stream:
                 stream.write(b"test data")
-                assert not stream._closed
+                assert not stream.closed
 
             # Stream should be closed after context exit
-            assert stream._closed
+            assert stream.closed
 
     def test_write_after_close_raises_error(self, large_binary):
         """Test that writing after close raises ValueError."""
@@ -236,3 +240,118 @@ class TestLargeBinaryOutputStream:
             stream.close()
             # Second close should not raise error
             stream.close()
+
+
+class TestCleanupFailedUpload:
+    """Direct unit tests for _cleanup_failed_upload's silent-swallow path."""
+
+    @pytest.fixture
+    def large_binary(self):
+        return largebinary("s3://test-bucket/path/to/object")
+
+    def test_delete_object_failure_is_swallowed(self, large_binary):
+        # If the post-failure cleanup itself raises, the original upload
+        # IOError must still surface unmasked. Pinning this so a future
+        # change that propagates cleanup errors is intentional.
+        with (
+            patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client,
+            patch.object(
+                large_binary_manager, "_ensure_bucket_exists"
+            ) as mock_ensure_bucket,
+        ):
+            mock_s3 = MagicMock()
+            mock_get_s3_client.return_value = mock_s3
+            mock_ensure_bucket.return_value = None
+            mock_s3.upload_fileobj.side_effect = Exception("upload failed")
+            mock_s3.delete_object.side_effect = Exception("delete also failed")
+
+            stream = LargeBinaryOutputStream(large_binary)
+            stream.write(b"data")
+            with pytest.raises(IOError, match="Failed to complete upload"):
+                stream.close()
+            mock_s3.delete_object.assert_called_once_with(
+                Bucket="test-bucket", Key="path/to/object"
+            )
+
+
+class TestQueueReader:
+    """Direct unit tests for the private _QueueReader helper."""
+
+    @staticmethod
+    def _populate(q: queue.Queue, *items):
+        for item in items:
+            q.put(item)
+        return q
+
+    def test_read_returns_empty_on_immediate_eof(self):
+        q = self._populate(queue.Queue(), None)
+        reader = _QueueReader(q)
+        assert reader.read() == b""
+
+    def test_read_after_eof_returns_empty_repeatedly(self):
+        q = self._populate(queue.Queue(), b"abc", None)
+        reader = _QueueReader(q)
+        assert reader.read() == b"abc"
+        # Subsequent reads must keep returning empty without blocking.
+        assert reader.read() == b""
+        assert reader.read(10) == b""
+
+    def test_read_default_size_joins_all_chunks_until_eof(self):
+        q = self._populate(queue.Queue(), b"abc", b"def", b"ghi", None)
+        reader = _QueueReader(q)
+        assert reader.read() == b"abcdefghi"
+
+    def test_read_with_explicit_size_smaller_than_first_chunk(self):
+        q = self._populate(queue.Queue(), b"abcdef", None)
+        reader = _QueueReader(q)
+        assert reader.read(3) == b"abc"
+        # Remainder is buffered for the next read; EOF marker drained next.
+        assert reader.read() == b"def"
+
+    def test_read_buffer_remainder_carries_over_subsequent_calls(self):
+        q = self._populate(queue.Queue(), b"helloworld", None)
+        reader = _QueueReader(q)
+        assert reader.read(5) == b"hello"
+        # Pull two more bytes from the buffer; rest stays buffered.
+        assert reader.read(2) == b"wo"
+        assert reader.read() == b"rld"
+
+    def test_read_size_can_span_multiple_queued_chunks(self):
+        q = self._populate(queue.Queue(), b"ab", b"cd", b"ef", None)
+        reader = _QueueReader(q)
+        assert reader.read(5) == b"abcde"
+        assert reader.read() == b"f"
+
+    def test_read_size_zero_returns_empty_and_preserves_buffer(self):
+        # _QueueReader.read(size=0) must short-circuit without consuming
+        # bytes that the caller hasn't asked for.
+        q = self._populate(queue.Queue(), b"abc", None)
+        reader = _QueueReader(q)
+        # Prime the buffer by reading 1 byte, leaving "bc" buffered.
+        assert reader.read(1) == b"a"
+        assert reader.read(0) == b""
+        # Nothing was lost: a follow-up read still surfaces the rest.
+        assert reader.read() == b"bc"
+
+    def test_read_with_size_larger_than_available_returns_all_before_eof(self):
+        q = self._populate(queue.Queue(), b"abc", None)
+        reader = _QueueReader(q)
+        assert reader.read(100) == b"abc"
+
+    def test_eof_only_terminates_when_queue_drained_first(self):
+        # Bytes queued before the EOF sentinel must all surface in the first read.
+        q = self._populate(queue.Queue(), b"x", b"y", b"z", None)
+        reader = _QueueReader(q)
+        assert reader.read() == b"xyz"
+
+    def test_read_polls_until_data_arrives(self):
+        # Validates the queue.Empty retry path: the reader must continue
+        # past a timeout and only return once data is available.
+        # Using a mock with a deterministic side_effect avoids real sleeps
+        # and the flakiness of relying on a background thread under load.
+        q = MagicMock()
+        q.get.side_effect = [queue.Empty(), b"late", None]
+        reader = _QueueReader(q)
+        assert reader.read() == b"late"
+        # The first call raised Empty, so we expect three total get() calls.
+        assert q.get.call_count == 3
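
The diff adds tests for `_QueueReader` but does not show the helper itself. The sketch below is inferred only from the assertions in `TestQueueReader`: it is one way a reader with partial-read buffering, a `None` EOF sentinel, and a `queue.Empty` retry loop could be written. The class name `QueueReaderSketch` and the `poll_timeout` parameter are assumptions for illustration, and the real implementation in `large_binary_output_stream.py` may differ.

```python
import queue


class QueueReaderSketch:
    """Hypothetical file-like reader over a queue of bytes chunks."""

    def __init__(self, q, poll_timeout: float = 0.1):
        self._queue = q
        self._buffer = b""       # bytes pulled from the queue but not yet returned
        self._eof = False        # set once the None sentinel has been seen
        self._poll_timeout = poll_timeout  # assumed polling interval

    def read(self, size: int = -1) -> bytes:
        if size is None:
            size = -1
        if size == 0:
            # Short-circuit: do not touch the queue or the buffer.
            return b""
        # Pull chunks until EOF, or until enough bytes are buffered.
        while not self._eof and (size < 0 or len(self._buffer) < size):
            try:
                chunk = self._queue.get(timeout=self._poll_timeout)
            except queue.Empty:
                continue  # nothing yet; keep polling
            if chunk is None:
                self._eof = True
            else:
                self._buffer += chunk
        if size < 0:
            size = len(self._buffer)
        # Return the requested prefix and keep the remainder buffered.
        data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data


q = queue.Queue()
for item in (b"ab", b"cd", b"ef", None):
    q.put(item)

reader = QueueReaderSketch(q)
assert reader.read(5) == b"abcde"   # spans three queued chunks
assert reader.read() == b"f"        # remainder, then EOF sentinel drained
assert reader.read() == b""         # after EOF, returns empty without blocking
```

Remembering EOF in a flag is what keeps reads after the sentinel from blocking on an empty queue, which is the behavior `test_read_after_eof_returns_empty_repeatedly` pins down.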
