gemini-code-assist[bot] commented on code in PR #38407:
URL: https://github.com/apache/beam/pull/38407#discussion_r3219238217


##########
sdks/python/apache_beam/utils/byte_limited_queue_test.py:
##########
@@ -0,0 +1,254 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for byte-limited queue."""
+
+import queue
+import threading
+import time
+import unittest
+
+from apache_beam.utils.byte_limited_queue import ByteLimitedQueue
+
+
+class ByteLimitedQueueTest(unittest.TestCase):
+  def test_unbounded(self):
+    bq = ByteLimitedQueue()
+    for i in range(201):
+      bq.put(str(i), i)
+    self.assertEqual(bq.byte_size(), sum(range(201)))
+    self.assertEqual(bq.qsize(), 201)
+
+  def test_put_and_get(self):
+    bq = ByteLimitedQueue(maxbytes=200)
+    bq.put('50', 50)
+    bq.put('140', 140)
+    self.assertEqual(bq.byte_size(), 190)
+    self.assertEqual(bq.qsize(), 2)
+    # Putting another would exceed 200.
+    with self.assertRaises(queue.Full):
+      bq.put('20', 20, block=False)
+    bq.put('10', 10, block=False)
+    self.assertEqual(bq.byte_size(), 200)
+    self.assertEqual(bq.qsize(), 3)
+
+    self.assertEqual(bq.get(), '50')
+    self.assertEqual(bq.byte_size(), 150)
+    self.assertEqual(bq.qsize(), 2)
+    bq.put('20', 20, block=False)
+
+  def test_dual_limit(self):
+    # Queue limits: at most 3 items, OR at most 100 item bytes.
+    bq = ByteLimitedQueue(maxsize=3, maxbytes=100)
+    bq.put('30', 30)
+    bq.put('40', 40)
+    bq.put('20', 20)
+    self.assertEqual(bq.byte_size(), 90)
+    self.assertEqual(bq.qsize(), 3)
+    # Full on element count (size=3).
+    with self.assertRaises(queue.Full):
+      bq.put('10', 10, block=False)
+    self.assertEqual(bq.get(), '30')
+    self.assertEqual(bq.get(), '40')
+    bq.put('10', 10)
+    # Full on byte count
+    with self.assertRaises(queue.Full):
+      bq.put('90', 90, block=False)
+    self.assertEqual(bq.get(), '20')
+    bq.put('90', 90, block=False)
+
+  def test_multithreading(self):
+    bq = ByteLimitedQueue(maxsize=0, maxbytes=100)
+    received = []
+
+    def producer():
+      for i in range(101):
+        bq.put(str(i), i)
+
+    poison_pill = 'POISON'
+
+    def consumer():
+      while True:
+        item = bq.get()
+        if item == poison_pill:
+          break
+        received.append(int(item))
+
+    t1 = threading.Thread(target=producer)
+    t2 = threading.Thread(target=producer)
+    t3 = threading.Thread(target=consumer)
+
+    t1.start()
+    t2.start()
+    t3.start()
+
+    t1.join()
+    t2.join()
+    bq.put(poison_pill, 0)
+
+    t3.join()
+
+    self.assertEqual(len(received), 202)
+    self.assertEqual(sum(received), 2 * sum(range(101)))
+
+  def test_multithreading_timeout(self):
+    bq = ByteLimitedQueue(maxsize=0, maxbytes=10)
+    bq.put('10', 10)
+
+    # The queue is completely full. A timeout put should raise queue.Full.
+    with self.assertRaises(queue.Full):
+      bq.put('5', 5, timeout=0.01)
+
+    def delayed_consumer():
+      time.sleep(0.05)
+      bq.get()
+
+    # Start a thread that will free up space after 50ms.
+    t = threading.Thread(target=delayed_consumer)
+    t.start()
+
+    # The put should succeed once the consumer runs, use a high timeout to
+    # flakiness.
+    bq.put(FakeItem(5), timeout=60)

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This test call is broken: `FakeItem` is not defined in this file, and the `item_bytes` argument is missing from the `put` call.
   
   ```suggestion
       bq.put('5', 5, timeout=60)
   ```



##########
sdks/python/apache_beam/runners/worker/data_plane.py:
##########
@@ -455,10 +456,15 @@ class _GrpcDataChannel(DataChannel):
 
   def __init__(self, data_buffer_time_limit_ms=0):
     # type: (int) -> None
+
     self._data_buffer_time_limit_ms = data_buffer_time_limit_ms
-    self._to_send = queue.Queue()  # type: queue.Queue[DataOrTimers]
+    self._to_send = ByteLimitedQueue(
+        maxsize=10000, maxbytes=100 << 20,
+    )  # type: queue.Queue[DataOrTimers]
     self._received = collections.defaultdict(
-        lambda: queue.Queue(maxsize=5)
+        lambda: ByteLimitedQueue(
+            maxsize=5, maxbytes=100 << 20,
+        )
     )  # type: DefaultDict[str, queue.Queue[DataOrTimers]]

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The type hint for `_received` should be updated to reflect that it contains `ByteLimitedQueue` instances.
   
   ```suggestion
       )  # type: DefaultDict[str, ByteLimitedQueue]
   ```



##########
sdks/python/apache_beam/runners/worker/data_plane.py:
##########
@@ -455,10 +456,15 @@ class _GrpcDataChannel(DataChannel):
 
   def __init__(self, data_buffer_time_limit_ms=0):
     # type: (int) -> None
+
     self._data_buffer_time_limit_ms = data_buffer_time_limit_ms
-    self._to_send = queue.Queue()  # type: queue.Queue[DataOrTimers]
+    self._to_send = ByteLimitedQueue(
+        maxsize=10000, maxbytes=100 << 20,
+    )  # type: queue.Queue[DataOrTimers]

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The type hint still refers to `queue.Queue`, but the object is now a `ByteLimitedQueue`. These classes are not API-compatible: `ByteLimitedQueue.put` takes an additional required `item_bytes` argument that `queue.Queue.put` does not accept. The type hint should be updated to avoid static analysis errors.
   
   ```suggestion
       )  # type: ByteLimitedQueue
   ```



##########
sdks/python/apache_beam/utils/byte_limited_queue.py:
##########
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A thread-safe queue that limits capacity by total byte size."""
+
+import collections
+import queue
+import threading
+import time
+
+
+class ByteLimitedQueue(object):
+  """A fair queue that limits by both element count and total weight.
+
+  A single element is allowed to exceed the maxweight to avoid deadlock.
+  """
+  def __init__(
+      self,
+      maxsize=0,  # type: int
+      maxbytes=0,  # type: int
+  ):
+    # type: (...) -> None
+
+    """Initializes a ByteLimitedQueue.
+
+    Args:
+      maxsize: The maximum number of items allowed in the queue. If 0 or
+        negative, there is no limit on the number of elements.
+      maxweight: The maximum accumulated weight allowed in the queue. If 0 or
+        negative, there is no limit on the total size of the elements.
+    """
+    self.max_elements = maxsize
+    self.max_bytes = maxbytes
+    self._byte_size = 0
+    self._blocked_bytes = 0
+    self._mutex = threading.Lock()
+    self._not_empty = threading.Condition(self._mutex)
+    self._waiting_writers = collections.deque()
+    self._queue = collections.deque()
+
+  def put(self, item, item_bytes, block=True, timeout=None):
+    """Put an item into the queue.
+
+    If the queue is full, block until a free slot is available, unless `block`
+    is false or a timeout occurs.
+
+    Args:
+      item: The item to put into the queue.
+      item_size: The size of the item.

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The docstring refers to `item_size`, but the parameter name is `item_bytes`.
   
   ```suggestion
         item_bytes: The size of the item in bytes.
   ```



##########
sdks/python/apache_beam/utils/byte_limited_queue_test.py:
##########
@@ -0,0 +1,254 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for byte-limited queue."""
+
+import queue
+import threading
+import time
+import unittest
+
+from apache_beam.utils.byte_limited_queue import ByteLimitedQueue
+
+
+class ByteLimitedQueueTest(unittest.TestCase):
+  def test_unbounded(self):
+    bq = ByteLimitedQueue()
+    for i in range(201):
+      bq.put(str(i), i)
+    self.assertEqual(bq.byte_size(), sum(range(201)))
+    self.assertEqual(bq.qsize(), 201)
+
+  def test_put_and_get(self):
+    bq = ByteLimitedQueue(maxbytes=200)
+    bq.put('50', 50)
+    bq.put('140', 140)
+    self.assertEqual(bq.byte_size(), 190)
+    self.assertEqual(bq.qsize(), 2)
+    # Putting another would exceed 200.
+    with self.assertRaises(queue.Full):
+      bq.put('20', 20, block=False)
+    bq.put('10', 10, block=False)
+    self.assertEqual(bq.byte_size(), 200)
+    self.assertEqual(bq.qsize(), 3)
+
+    self.assertEqual(bq.get(), '50')
+    self.assertEqual(bq.byte_size(), 150)
+    self.assertEqual(bq.qsize(), 2)
+    bq.put('20', 20, block=False)
+
+  def test_dual_limit(self):
+    # Queue limits: at most 3 items, OR at most 100 item bytes.
+    bq = ByteLimitedQueue(maxsize=3, maxbytes=100)
+    bq.put('30', 30)
+    bq.put('40', 40)
+    bq.put('20', 20)
+    self.assertEqual(bq.byte_size(), 90)
+    self.assertEqual(bq.qsize(), 3)
+    # Full on element count (size=3).
+    with self.assertRaises(queue.Full):
+      bq.put('10', 10, block=False)
+    self.assertEqual(bq.get(), '30')
+    self.assertEqual(bq.get(), '40')
+    bq.put('10', 10)
+    # Full on byte count
+    with self.assertRaises(queue.Full):
+      bq.put('90', 90, block=False)
+    self.assertEqual(bq.get(), '20')
+    bq.put('90', 90, block=False)
+
+  def test_multithreading(self):
+    bq = ByteLimitedQueue(maxsize=0, maxbytes=100)
+    received = []
+
+    def producer():
+      for i in range(101):
+        bq.put(str(i), i)
+
+    poison_pill = 'POISON'
+
+    def consumer():
+      while True:
+        item = bq.get()
+        if item == poison_pill:
+          break
+        received.append(int(item))
+
+    t1 = threading.Thread(target=producer)
+    t2 = threading.Thread(target=producer)
+    t3 = threading.Thread(target=consumer)
+
+    t1.start()
+    t2.start()
+    t3.start()
+
+    t1.join()
+    t2.join()
+    bq.put(poison_pill, 0)
+
+    t3.join()
+
+    self.assertEqual(len(received), 202)
+    self.assertEqual(sum(received), 2 * sum(range(101)))
+
+  def test_multithreading_timeout(self):
+    bq = ByteLimitedQueue(maxsize=0, maxbytes=10)
+    bq.put('10', 10)
+
+    # The queue is completely full. A timeout put should raise queue.Full.
+    with self.assertRaises(queue.Full):
+      bq.put('5', 5, timeout=0.01)
+
+    def delayed_consumer():
+      time.sleep(0.05)
+      bq.get()
+
+    # Start a thread that will free up space after 50ms.
+    t = threading.Thread(target=delayed_consumer)
+    t.start()
+
+    # The put should succeed once the consumer runs, use a high timeout to
+    # flakiness.
+    bq.put(FakeItem(5), timeout=60)
+    t.join()
+
+  def test_negative_timeout(self):
+    bq = ByteLimitedQueue()
+    # Putting an item with a negative timeout should raise ValueError.
+    with self.assertRaises(ValueError):
+      bq.put('5', 5, timeout=-1)
+
+  def test_single_element_override(self):
+    bq = ByteLimitedQueue(maxbytes=10)
+    # An item of size 50 exceeds maxbytes 10, but should be admitted
+    # immediately without blocking since the queue is currently empty!
+    bq.put('50', 50, block=False)
+    self.assertEqual(bq.qsize(), 1)
+    self.assertEqual(bq.byte_size(), 50)
+
+  def test_inconsistent_weighing_fn(self):
+    # Return a different weight for the same item.
+    weights = [10, 5]
+    bq = ByteLimitedQueue(lambda x: weights.pop(0), maxweight=100)
+
+    bq.put(1)

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This test appears to be copied from an implementation that used a weighing function. `ByteLimitedQueue` does not accept a callable in its constructor, and `put` requires two positional arguments (`item` and `item_bytes`).
   
   ```suggestion
       bq = ByteLimitedQueue(maxbytes=100)
   
       bq.put(1, 10)
   ```



##########
sdks/python/apache_beam/utils/byte_limited_queue.py:
##########
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A thread-safe queue that limits capacity by total byte size."""
+
+import collections
+import queue
+import threading
+import time
+
+
+class ByteLimitedQueue(object):
+  """A fair queue that limits by both element count and total weight.
+
+  A single element is allowed to exceed the maxweight to avoid deadlock.
+  """
+  def __init__(
+      self,
+      maxsize=0,  # type: int
+      maxbytes=0,  # type: int
+  ):
+    # type: (...) -> None
+
+    """Initializes a ByteLimitedQueue.
+
+    Args:
+      maxsize: The maximum number of items allowed in the queue. If 0 or
+        negative, there is no limit on the number of elements.
+      maxweight: The maximum accumulated weight allowed in the queue. If 0 or
+        negative, there is no limit on the total size of the elements.
+    """

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The docstrings for the class and `__init__` method refer to `maxweight`, but the parameter name is `maxbytes`. Please update the docstrings for consistency.
   
   ```suggestion
     """A fair queue that limits by both element count and total weight.
   
     A single element is allowed to exceed the maxbytes to avoid deadlock.
     """
     def __init__(
         self,
         maxsize=0,  # type: int
         maxbytes=0,  # type: int
     ):
       # type: (...) -> None
   
       """Initializes a ByteLimitedQueue.
   
       Args:
         maxsize: The maximum number of items allowed in the queue. If 0 or
           negative, there is no limit on the number of elements.
         maxbytes: The maximum accumulated bytes allowed in the queue. If 0 or
           negative, there is no limit on the total size of the elements.
       """
   ```



##########
sdks/python/apache_beam/utils/byte_limited_queue.py:
##########
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A thread-safe queue that limits capacity by total byte size."""
+
+import collections
+import queue
+import threading
+import time
+
+
+class ByteLimitedQueue(object):
+  """A fair queue that limits by both element count and total weight.
+
+  A single element is allowed to exceed the maxweight to avoid deadlock.
+  """
+  def __init__(
+      self,
+      maxsize=0,  # type: int
+      maxbytes=0,  # type: int
+  ):
+    # type: (...) -> None
+
+    """Initializes a ByteLimitedQueue.
+
+    Args:
+      maxsize: The maximum number of items allowed in the queue. If 0 or
+        negative, there is no limit on the number of elements.
+      maxweight: The maximum accumulated weight allowed in the queue. If 0 or
+        negative, there is no limit on the total size of the elements.
+    """
+    self.max_elements = maxsize
+    self.max_bytes = maxbytes
+    self._byte_size = 0
+    self._blocked_bytes = 0
+    self._mutex = threading.Lock()
+    self._not_empty = threading.Condition(self._mutex)
+    self._waiting_writers = collections.deque()
+    self._queue = collections.deque()
+
+  def put(self, item, item_bytes, block=True, timeout=None):
+    """Put an item into the queue.
+
+    If the queue is full, block until a free slot is available, unless `block`
+    is false or a timeout occurs.
+
+    Args:
+      item: The item to put into the queue.
+      item_size: The size of the item.
+      block: If True, block until space is available. If False, raise queue.Full
+        immediately if the queue is full.
+      timeout: If block is True, wait for at most `timeout` seconds. If None,
+        block indefinitely.
+
+    Raises:
+      ValueError: If timeout or item_size is negative.
+      queue.Full: If the queue is full and block is False or the timeout occurs.
+    """
+    if timeout is not None and timeout < 0:
+      raise ValueError("'timeout' must be a non-negative number")
+    if item_bytes < 0:
+      raise ValueError("'item_bytes' must be a positive number")

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The error message says 'positive number', but the check `item_bytes < 0` allows 0. Since 0-byte elements (like markers) are likely valid, the message should be updated to 'non-negative'.
   
   ```suggestion
         raise ValueError("'item_bytes' must be a non-negative number")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to