This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 9223ddf519 test(pyamber): add unit tests for _QueueReader and
cleanup-failed-upload paths (#4707)
9223ddf519 is described below
commit 9223ddf5192ca65672663cc968a46d10d769ca51
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 15:00:50 2026 -0700
test(pyamber): add unit tests for _QueueReader and cleanup-failed-upload
paths (#4707)
### What changes were proposed in this PR?
Adds direct pytest coverage for two pieces of
`large_binary_output_stream.py` that previously had only end-to-end
coverage:
- The private `_QueueReader` helper (partial-read buffering, EOF
sentinel handling).
- `_cleanup_failed_upload`'s silent-swallow path (when both the upload and
the post-failure `delete_object` raise).
Writing the second test surfaced a real bug, which is also fixed here:
`LargeBinaryInputStream` and `LargeBinaryOutputStream` extend
`io.IOBase` but maintained their own `self._closed` flag and overrode
the `closed` property without ever calling `super().close()`. IOBase's
internal closed flag stayed `False`, so on Python 3.13 `IOBase.__del__`
re-invoked `close()` during finalization. On the failed-upload path that
re-entry called `_cleanup_failed_upload` a second time, sending a
duplicate `delete_object` to S3 and tripping `assert_called_once_with` —
only on the 3.13 matrix leg.
Fix: drop the redundant `self._closed` flag and `closed` property
override in both classes; wrap the existing `close()` logic in
`try/finally` and call `super().close()` in the `finally`. IOBase now
records the closed state even when our cleanup raises, so the implicit
finalizer call early-returns on `self.closed` and the cleanup runs at
most once. Existing `self._closed` reads (`write`, `writable`,
`readable`, `_require_open` decorator) move to `self.closed`.
### Any related issues, documentation, discussions?
Closes #4706.
### How was this PR tested?
```
cd amber/src/main/python
ruff check pytexera/storage/{large_binary_input_stream,large_binary_output_stream,test_large_binary_output_stream}.py
ruff format --check pytexera/storage/{large_binary_input_stream,large_binary_output_stream,test_large_binary_output_stream}.py
python -m pytest pytexera/storage/test_large_binary_output_stream.py
```
CI exercises the change across the full `python` matrix; before the fix,
only the 3.13 leg failed on `test_delete_object_failure_is_swallowed`.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---
.../pytexera/storage/large_binary_input_stream.py | 24 ++--
.../pytexera/storage/large_binary_output_stream.py | 51 +++++----
.../storage/test_large_binary_input_stream.py | 12 +-
.../storage/test_large_binary_output_stream.py | 127 ++++++++++++++++++++-
4 files changed, 169 insertions(+), 45 deletions(-)
diff --git a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
index 8e7d864040..68368c5c12 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
@@ -34,7 +34,7 @@ def _require_open(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
- if self._closed:
+ if self.closed:
raise ValueError("I/O operation on closed stream")
if self._underlying is None:
self._lazy_init()
@@ -57,7 +57,6 @@ class LargeBinaryInputStream(IOBase):
raise ValueError("largebinary cannot be None")
self._large_binary = large_binary
self._underlying: Optional[BinaryIO] = None
- self._closed = False
def _lazy_init(self):
"""Download from S3 on first read operation."""
@@ -87,23 +86,26 @@ class LargeBinaryInputStream(IOBase):
def readable(self) -> bool:
"""Return True if the stream can be read from."""
- return not self._closed
+ return not self.closed
def seekable(self) -> bool:
"""Return False - this stream does not support seeking."""
return False
- @property
- def closed(self) -> bool:
- """Return True if the stream is closed."""
- return self._closed
-
def close(self) -> None:
- """Close the stream and release resources."""
- if not self._closed:
- self._closed = True
+ """Close the stream and release resources.
+
+ Idempotent: subsequent calls (including IOBase's __del__-driven
+ finalize on Python 3.13+) are no-ops because IOBase tracks the
+ closed state via super().close() below.
+ """
+ if self.closed:
+ return
+ try:
if self._underlying is not None:
self._underlying.close()
+ finally:
+ super().close()
def __enter__(self):
return self
diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
index af4f1a275c..0cdf8a3679 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
@@ -117,7 +117,6 @@ class LargeBinaryOutputStream(IOBase):
self._large_binary = large_binary
self._bucket_name = large_binary.get_bucket_name()
self._object_key = large_binary.get_object_key()
- self._closed = False
# Background upload thread state
self._queue: queue.Queue = queue.Queue(maxsize=_CHUNK_SIZE)
@@ -140,7 +139,7 @@ class LargeBinaryOutputStream(IOBase):
ValueError: If stream is closed
IOError: If previous upload failed
"""
- if self._closed:
+ if self.closed:
raise ValueError("I/O operation on closed stream")
# Check if upload has failed
@@ -177,17 +176,12 @@ class LargeBinaryOutputStream(IOBase):
def writable(self) -> bool:
"""Return True if the stream can be written to."""
- return not self._closed
+ return not self.closed
def seekable(self) -> bool:
"""Return False - this stream does not support seeking."""
return False
- @property
- def closed(self) -> bool:
- """Return True if the stream is closed."""
- return self._closed
-
def flush(self) -> None:
"""
Flush the write buffer.
@@ -203,27 +197,36 @@ class LargeBinaryOutputStream(IOBase):
Close the stream and complete the S3 upload.
Blocks until upload is complete. Raises IOError if upload failed.
+ Idempotent: subsequent calls (including IOBase's __del__-driven
+ finalize on Python 3.13+) are no-ops because IOBase tracks the
+ closed state via super().close() below.
+
Raises:
IOError: If upload failed
"""
- if self._closed:
+ if self.closed:
return
- self._closed = True
-
- # Signal EOF to upload thread and wait for completion
- if self._upload_thread is not None:
- self._queue.put(None, block=True) # EOF marker
- self._upload_thread.join()
- self._upload_complete.wait()
-
- # Check for errors and cleanup if needed
- with self._lock:
- exception = self._upload_exception
-
- if exception is not None:
- self._cleanup_failed_upload()
- raise IOError(f"Failed to complete upload: {exception}") from exception
+ try:
+ # Signal EOF to upload thread and wait for completion
+ if self._upload_thread is not None:
+ self._queue.put(None, block=True) # EOF marker
+ self._upload_thread.join()
+ self._upload_complete.wait()
+
+ # Check for errors and cleanup if needed
+ with self._lock:
+ exception = self._upload_exception
+
+ if exception is not None:
+ self._cleanup_failed_upload()
+ raise IOError(
+ f"Failed to complete upload: {exception}"
+ ) from exception
+ finally:
+ # Mark IOBase as closed even if we raised, so __del__ skips
+ # the second close() call on Python 3.13+.
+ super().close()
def _cleanup_failed_upload(self):
"""Clean up a failed upload by deleting the S3 object."""
diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py
index 85bdbd13fa..35bc5bc634 100644
--- a/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py
@@ -40,7 +40,7 @@ class TestLargeBinaryInputStream:
try:
assert stream._large_binary == large_binary
assert stream._underlying is None
- assert not stream._closed
+ assert not stream.closed
finally:
stream.close()
@@ -139,7 +139,7 @@ class TestLargeBinaryInputStream:
stream.close()
assert stream.readable() is False
finally:
- if not stream._closed:
+ if not stream.closed:
stream.close()
def test_seekable(self, large_binary):
@@ -159,7 +159,7 @@ class TestLargeBinaryInputStream:
stream.close()
assert stream.closed is True
finally:
- if not stream._closed:
+ if not stream.closed:
stream.close()
def test_close(self, large_binary, mock_s3_response):
@@ -174,7 +174,7 @@ class TestLargeBinaryInputStream:
assert stream._underlying is not None
stream.close()
- assert stream._closed is True
+ assert stream.closed is True
assert stream._underlying.closed
def test_context_manager(self, large_binary, mock_s3_response):
@@ -187,10 +187,10 @@ class TestLargeBinaryInputStream:
with LargeBinaryInputStream(large_binary) as stream:
data = stream.read()
assert data == b"test data content"
- assert not stream._closed
+ assert not stream.closed
# Stream should be closed after context exit
- assert stream._closed
+ assert stream.closed
def test_iteration(self, large_binary):
"""Test iteration over lines."""
diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py
index 7ebcc9b4cf..ce9fc4e5d1 100644
--- a/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py
@@ -15,11 +15,15 @@
# specific language governing permissions and limitations
# under the License.
+import queue
import pytest
import time
from unittest.mock import patch, MagicMock
from core.models.type.large_binary import largebinary
-from pytexera.storage.large_binary_output_stream import LargeBinaryOutputStream
+from pytexera.storage.large_binary_output_stream import (
+ LargeBinaryOutputStream,
+ _QueueReader,
+)
from pytexera.storage import large_binary_manager
@@ -35,7 +39,7 @@ class TestLargeBinaryOutputStream:
assert stream._large_binary == large_binary
assert stream._bucket_name == "test-bucket"
assert stream._object_key == "path/to/object"
- assert not stream._closed
+ assert not stream.closed
assert stream._upload_thread is None
def test_init_with_none_raises_error(self):
@@ -165,10 +169,10 @@ class TestLargeBinaryOutputStream:
with LargeBinaryOutputStream(large_binary) as stream:
stream.write(b"test data")
- assert not stream._closed
+ assert not stream.closed
# Stream should be closed after context exit
- assert stream._closed
+ assert stream.closed
def test_write_after_close_raises_error(self, large_binary):
"""Test that writing after close raises ValueError."""
@@ -236,3 +240,118 @@ class TestLargeBinaryOutputStream:
stream.close()
# Second close should not raise error
stream.close()
+
+
+class TestCleanupFailedUpload:
+ """Direct unit tests for _cleanup_failed_upload's silent-swallow path."""
+
+ @pytest.fixture
+ def large_binary(self):
+ return largebinary("s3://test-bucket/path/to/object")
+
+ def test_delete_object_failure_is_swallowed(self, large_binary):
+ # If the post-failure cleanup itself raises, the original upload
+ # IOError must still surface unmasked. Pinning this so a future
+ # change that propagates cleanup errors is intentional.
+ with (
+ patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client,
+ patch.object(
+ large_binary_manager, "_ensure_bucket_exists"
+ ) as mock_ensure_bucket,
+ ):
+ mock_s3 = MagicMock()
+ mock_get_s3_client.return_value = mock_s3
+ mock_ensure_bucket.return_value = None
+ mock_s3.upload_fileobj.side_effect = Exception("upload failed")
+ mock_s3.delete_object.side_effect = Exception("delete also failed")
+
+ stream = LargeBinaryOutputStream(large_binary)
+ stream.write(b"data")
+ with pytest.raises(IOError, match="Failed to complete upload"):
+ stream.close()
+ mock_s3.delete_object.assert_called_once_with(
+ Bucket="test-bucket", Key="path/to/object"
+ )
+
+
+class TestQueueReader:
+ """Direct unit tests for the private _QueueReader helper."""
+
+ @staticmethod
+ def _populate(q: queue.Queue, *items):
+ for item in items:
+ q.put(item)
+ return q
+
+ def test_read_returns_empty_on_immediate_eof(self):
+ q = self._populate(queue.Queue(), None)
+ reader = _QueueReader(q)
+ assert reader.read() == b""
+
+ def test_read_after_eof_returns_empty_repeatedly(self):
+ q = self._populate(queue.Queue(), b"abc", None)
+ reader = _QueueReader(q)
+ assert reader.read() == b"abc"
+ # Subsequent reads must keep returning empty without blocking.
+ assert reader.read() == b""
+ assert reader.read(10) == b""
+
+ def test_read_default_size_joins_all_chunks_until_eof(self):
+ q = self._populate(queue.Queue(), b"abc", b"def", b"ghi", None)
+ reader = _QueueReader(q)
+ assert reader.read() == b"abcdefghi"
+
+ def test_read_with_explicit_size_smaller_than_first_chunk(self):
+ q = self._populate(queue.Queue(), b"abcdef", None)
+ reader = _QueueReader(q)
+ assert reader.read(3) == b"abc"
+ # Remainder is buffered for the next read; EOF marker drained next.
+ assert reader.read() == b"def"
+
+ def test_read_buffer_remainder_carries_over_subsequent_calls(self):
+ q = self._populate(queue.Queue(), b"helloworld", None)
+ reader = _QueueReader(q)
+ assert reader.read(5) == b"hello"
+ # Pull two more bytes from the buffer; rest stays buffered.
+ assert reader.read(2) == b"wo"
+ assert reader.read() == b"rld"
+
+ def test_read_size_can_span_multiple_queued_chunks(self):
+ q = self._populate(queue.Queue(), b"ab", b"cd", b"ef", None)
+ reader = _QueueReader(q)
+ assert reader.read(5) == b"abcde"
+ assert reader.read() == b"f"
+
+ def test_read_size_zero_returns_empty_and_preserves_buffer(self):
+ # _QueueReader.read(size=0) must short-circuit without consuming
+ # bytes that the caller hasn't asked for.
+ q = self._populate(queue.Queue(), b"abc", None)
+ reader = _QueueReader(q)
+ # Prime the buffer by reading 1 byte, leaving "bc" buffered.
+ assert reader.read(1) == b"a"
+ assert reader.read(0) == b""
+ # Nothing was lost: a follow-up read still surfaces the rest.
+ assert reader.read() == b"bc"
+
+ def test_read_with_size_larger_than_available_returns_all_before_eof(self):
+ q = self._populate(queue.Queue(), b"abc", None)
+ reader = _QueueReader(q)
+ assert reader.read(100) == b"abc"
+
+ def test_eof_only_terminates_when_queue_drained_first(self):
+ # Bytes queued before the EOF sentinel must all surface in the first read.
+ q = self._populate(queue.Queue(), b"x", b"y", b"z", None)
+ reader = _QueueReader(q)
+ assert reader.read() == b"xyz"
+
+ def test_read_polls_until_data_arrives(self):
+ # Validates the queue.Empty retry path: the reader must continue
+ # past a timeout and only return once data is available.
+ # Using a mock with a deterministic side_effect avoids real sleeps
+ # and the flakiness of relying on a background thread under load.
+ q = MagicMock()
+ q.get.side_effect = [queue.Empty(), b"late", None]
+ reader = _QueueReader(q)
+ assert reader.read() == b"late"
+ # The first call raised Empty, so we expect three total get() calls.
+ assert q.get.call_count == 3