gemini-code-assist[bot] commented on code in PR #39175:
URL: https://github.com/apache/beam/pull/39175#discussion_r3502215205
##########
sdks/python/apache_beam/runners/worker/bundle_processor.py:
##########
@@ -678,15 +682,26 @@ def clear(self) -> None:
     self._added_elements = set()
   def commit(self) -> None:
-    to_await = None
     if self._cleared:
-      to_await = self._state_handler.clear(self._state_key)
+      self._futures.append(self._state_handler.clear(self._state_key))
+      self._cleared = False
     if self._added_elements:
-      to_await = self._state_handler.extend(
-          self._state_key, self._value_coder.get_impl(), self._added_elements)
-    if to_await:
-      # To commit, we need to wait on the last state request future to complete.
-      to_await.get()
+      self._futures.append(
+          self._state_handler.extend(
+              self._state_key,
+              self._value_coder.get_impl(),
+              self._added_elements))
+      self._added_elements = set()
+
+    # Block on all outstanding async state requests to ensure data is committed.
+    # We must swap and clear self._futures before awaiting them. Awaiting a future
+    # yields control, during which new futures could be appended to self._futures.
+    all_futures = self._futures
+    self._futures = []
+
+    for f in all_futures:
+      if f:
+        f.get()
Review Comment:

The current implementation swaps `self._futures` into `all_futures` and
clears `self._futures` before awaiting them. However, as the in-code comment correctly
points out, awaiting a future yields control, during which new futures could be
appended to `self._futures` (e.g., via concurrent state updates or callbacks).
Since the loop only iterates over the snapshot `all_futures`, any newly
appended futures in `self._futures` will be left unawaited when `commit()`
returns. This can lead to lost state updates or out-of-order execution.
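
The gap can be reproduced in isolation with a small sketch. It assumes a hypothetical `FakeFuture` whose `get()` runs a callback, standing in for whatever appends to `self._futures` while `commit()` is blocked; `SnapshotCommitter` is equally hypothetical and only mirrors the single-pass swap-and-await loop from the diff, not Beam's real state classes.

```python
from typing import Callable, List, Optional


class FakeFuture:
  """Hypothetical stand-in for a state-request future."""

  def __init__(self, name: str, on_get: Optional[Callable[[], None]] = None):
    self.name = name
    self.done = False
    # Side effect run inside get(), modelling work that happens while
    # commit() is blocked on this future.
    self._on_get = on_get

  def get(self) -> None:
    if self._on_get is not None:
      self._on_get()
    self.done = True


class SnapshotCommitter:
  """Hypothetical owner that mirrors the single-pass loop in the diff."""

  def __init__(self) -> None:
    self._futures: List[FakeFuture] = []

  def commit(self) -> None:
    all_futures = self._futures
    self._futures = []
    for f in all_futures:
      if f:
        f.get()


committer = SnapshotCommitter()
late = FakeFuture("late")
# Awaiting "first" appends "late" to the fresh list created by the swap.
committer._futures.append(
    FakeFuture("first", on_get=lambda: committer._futures.append(late)))
committer.commit()

print(late.done)                # False: "late" was never awaited
print(len(committer._futures))  # 1: it is still pending after commit()
```
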

To ensure all outstanding futures (including those transitively added during
the await phase) are fully resolved before `commit()` returns, we should use a
`while` loop to drain `self._futures` until it is completely empty.
```suggestion
    # Block on all outstanding async state requests to ensure data is committed.
    # We must swap and clear self._futures before awaiting them. Awaiting a future
    # yields control, during which new futures could be appended to self._futures,
    # so keep draining until an iteration leaves the list empty.
    while self._futures:
      all_futures = self._futures
      self._futures = []
      for f in all_futures:
        if f:
          f.get()
```
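
To check that the suggested drain also covers futures added transitively, here is a minimal sketch. `FakeFuture`, `DrainingCommitter`, and `make_chain` are hypothetical helpers, not Beam APIs: each awaited future in the chain appends the next link to the list, mimicking follow-up requests issued while `commit()` is blocked.

```python
from typing import Callable, List, Optional


class FakeFuture:
  """Hypothetical future whose get() may enqueue follow-up futures."""

  def __init__(self, on_get: Optional[Callable[[], None]] = None):
    self.done = False
    self._on_get = on_get

  def get(self) -> None:
    if self._on_get is not None:
      self._on_get()
    self.done = True


class DrainingCommitter:
  """Applies the suggested while-loop drain to a hypothetical futures list."""

  def __init__(self) -> None:
    self._futures: List[FakeFuture] = []

  def commit(self) -> None:
    # Keep swapping until an iteration appends nothing new.
    while self._futures:
      all_futures = self._futures
      self._futures = []
      for f in all_futures:
        if f:
          f.get()


committer = DrainingCommitter()
created: List[FakeFuture] = []


def make_chain(depth: int) -> FakeFuture:
  """Builds a future that, when awaited, appends the next link in the chain."""
  def spawn_next() -> None:
    if depth > 0:
      committer._futures.append(make_chain(depth - 1))
  fut = FakeFuture(on_get=spawn_next)
  created.append(fut)
  return fut


committer._futures.append(make_chain(3))
committer.commit()
print(len(created), all(f.done for f in created), committer._futures)
# 4 True []
```

With a single snapshot pass only the first link would be awaited. The `while` loop terminates only once an iteration appends nothing, so it assumes the awaited work eventually stops producing new futures.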
--
This is an automated message from the Apache Git Service.