shunping commented on code in PR #39175:
URL: https://github.com/apache/beam/pull/39175#discussion_r3502225174
##########
sdks/python/apache_beam/runners/worker/bundle_processor.py:
##########
@@ -678,15 +682,26 @@ def clear(self) -> None:
self._added_elements = set()
def commit(self) -> None:
- to_await = None
if self._cleared:
- to_await = self._state_handler.clear(self._state_key)
+ self._futures.append(self._state_handler.clear(self._state_key))
+ self._cleared = False
if self._added_elements:
- to_await = self._state_handler.extend(
- self._state_key, self._value_coder.get_impl(), self._added_elements)
- if to_await:
- # To commit, we need to wait on the last state request future to complete.
- to_await.get()
+ self._futures.append(
+ self._state_handler.extend(
+ self._state_key,
+ self._value_coder.get_impl(),
+ self._added_elements))
+ self._added_elements = set()
+
+ # Block on all outstanding async state requests to ensure data is committed.
+ # We must swap and clear self._futures before awaiting them. Awaiting a future
+ # yields control, during which new futures could be appended to self._futures.
+ all_futures = self._futures
+ self._futures = []
+
+ for f in all_futures:
+ if f:
+ f.get()
Review Comment:
Nope. Those futures will not be lost and will be handled in the next commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]