Copilot commented on code in PR #4707:
URL: https://github.com/apache/texera/pull/4707#discussion_r3177371218
##########
amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py:
##########
@@ -236,3 +241,88 @@ def test_multiple_close_calls(self, large_binary):
stream.close()
# Second close should not raise error
stream.close()
+
+
+class TestQueueReader:
+ """Direct unit tests for the private _QueueReader helper."""
+
+ @staticmethod
+ def _populate(q: queue.Queue, *items):
+ for item in items:
+ q.put(item)
+ return q
+
+ def test_read_returns_empty_on_immediate_eof(self):
+ q = self._populate(queue.Queue(), None)
+ reader = _QueueReader(q)
+ assert reader.read() == b""
+
+ def test_read_after_eof_returns_empty_repeatedly(self):
+ q = self._populate(queue.Queue(), b"abc", None)
+ reader = _QueueReader(q)
+ assert reader.read() == b"abc"
+ # Subsequent reads must keep returning empty without blocking.
+ assert reader.read() == b""
+ assert reader.read(10) == b""
+
+ def test_read_default_size_joins_all_chunks_until_eof(self):
+ q = self._populate(queue.Queue(), b"abc", b"def", b"ghi", None)
+ reader = _QueueReader(q)
+ assert reader.read() == b"abcdefghi"
+
+ def test_read_with_explicit_size_smaller_than_first_chunk(self):
+ q = self._populate(queue.Queue(), b"abcdef", None)
+ reader = _QueueReader(q)
+ assert reader.read(3) == b"abc"
+ # Remainder is buffered for the next read; EOF marker drained next.
+ assert reader.read() == b"def"
+
+ def test_read_buffer_remainder_carries_over_subsequent_calls(self):
+ q = self._populate(queue.Queue(), b"helloworld", None)
+ reader = _QueueReader(q)
+ assert reader.read(5) == b"hello"
+ # Pull two more bytes from the buffer; rest stays buffered.
+ assert reader.read(2) == b"wo"
+ assert reader.read() == b"rld"
+
+ def test_read_size_can_span_multiple_queued_chunks(self):
+ q = self._populate(queue.Queue(), b"ab", b"cd", b"ef", None)
+ reader = _QueueReader(q)
+ assert reader.read(5) == b"abcde"
+ assert reader.read() == b"f"
+
+ def test_read_size_zero_returns_empty_and_preserves_buffer(self):
+ # _QueueReader.read(size=0) must short-circuit without consuming
+ # bytes that the caller hasn't asked for.
+ q = self._populate(queue.Queue(), b"abc", None)
+ reader = _QueueReader(q)
+ # Prime the buffer by reading 1 byte, leaving "bc" buffered.
+ assert reader.read(1) == b"a"
+ assert reader.read(0) == b""
+ # Nothing was lost: a follow-up read still surfaces the rest.
+ assert reader.read() == b"bc"
+
+ def test_read_with_size_larger_than_available_returns_all_before_eof(self):
+ q = self._populate(queue.Queue(), b"abc", None)
+ reader = _QueueReader(q)
+ assert reader.read(100) == b"abc"
+
+ def test_eof_only_terminates_when_queue_drained_first(self):
+ # Bytes queued before the EOF sentinel must all surface in the first read.
+ q = self._populate(queue.Queue(), b"x", b"y", b"z", None)
+ reader = _QueueReader(q)
+ assert reader.read() == b"xyz"
+
+ def test_read_polls_until_data_arrives(self):
+ # Validates the queue.Empty retry path: the reader keeps polling
+ # past the timeout and only returns once the producer pushes data.
+ q: queue.Queue = queue.Queue()
+
+ def producer():
+ time.sleep(0.15) # > _QUEUE_TIMEOUT (0.1)
+ q.put(b"late")
+ q.put(None)
+
+ threading.Thread(target=producer, daemon=True).start()
+ reader = _QueueReader(q)
+ assert reader.read() == b"late"
Review Comment:
`test_read_polls_until_data_arrives` relies on a real `time.sleep` and a
background thread to force a `queue.Empty` timeout. This makes the test slower
and potentially flaky on a loaded or slow CI machine. Prefer a deterministic
approach: either mock/stub `q.get` so that it raises `queue.Empty` at least
once before returning `b"late"` and then `None`, or monkeypatch
`_QUEUE_TIMEOUT` to a very small value and synchronize the producer with an
`Event` instead of `sleep`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]