shunping opened a new pull request, #39175
URL: https://github.com/apache/beam/pull/39175
Adding elements to `SynchronousSetRuntimeState` occasionally triggers data compaction (`_compact_data`), which launches asynchronous `clear` and `extend` requests to the State Handler. However, these futures were not tracked. The `commit()` method only blocked on the last futures generated during the commit phase, so outstanding compaction futures could complete out of order or after the bundle processor had finished, resulting in lost state updates.

An example failed test: https://github.com/apache/beam/actions/runs/28001498086/job/82874623596

Traceback:

```
_______ StatefulDoFnOnDirectRunnerTest.test_stateful_set_state_portably ________
[gw2] linux -- Python 3.10.20 /runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310/py310/bin/python

self = <apache_beam.transforms.userstate_test.StatefulDoFnOnDirectRunnerTest testMethod=test_stateful_set_state_portably>

    def test_stateful_set_state_portably(self):
      class SetStatefulDoFn(beam.DoFn):
        SET_STATE = SetStateSpec('buffer', VarIntCoder())

        def process(self, element, set_state=beam.DoFn.StateParam(SET_STATE)):
          _, value = element
          aggregated_value = 0
          set_state.add(value)
          for saved_value in set_state.read():
            aggregated_value += saved_value
          yield aggregated_value

>     with TestPipeline() as p:

apache_beam/transforms/userstate_test.py:715:
...
    if duration:
      state_thread = threading.Thread(
          target=functools.partial(self._observe_state, message_thread),
          name='wait_until_finish_state_observer')
      state_thread.daemon = True
      state_thread.start()
      start_time = time.time()
      duration_secs = duration / 1000
      while (time.time() - start_time < duration_secs and
             state_thread.is_alive()):
        time.sleep(1)
    else:
      self._observe_state(message_thread)
    if self._runtime_exception:
>     raise self._runtime_exception
E       RuntimeError: Pipeline job-045 failed in state FAILED: bundle inst010 stage-009 failed:Traceback (most recent call last):
E         File "apache_beam/runners/common.py", line 1499, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 913, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E           self._invoke_process_per_window(
E         File "apache_beam/runners/common.py", line 1058, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           self.process_method(*args_for_process, **kwargs_for_process),
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310/py310/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 2126, in <lambda>
E           wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310/py310/lib/python3.10/site-packages/apache_beam/testing/util.py", line 202, in _equal
E           raise BeamAssertException(msg)
E       apache_beam.testing.util.BeamAssertException: Failed assert: [1, 3, 6, 10, 10] == [1, 4, 6, 3, 7], unexpected elements [4, 7], missing elements [10, 10]
E
```

This PR fixes a race condition in `SynchronousSetRuntimeState` where asynchronous state requests triggered by state compaction or early clears were not being awaited.

- Introduced `self._futures` to record all asynchronous state requests initiated by compaction and early clears.
- Updated `commit()` to combine `self._futures` with the current commit-time futures and await all of them before completing.
- Added a regression test.
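The pattern behind the fix (record every asynchronous request issued outside `commit()`, then await those together with the commit-time requests) can be sketched in isolation. This is a minimal, self-contained illustration under stated assumptions, not the Beam implementation: `FakeStateHandler` and `TrackedSetState` are hypothetical stand-ins for the State Handler and `SynchronousSetRuntimeState`, the handler simulates latency with a single background thread, and compaction fires every `compact_every` additions as a deterministic substitute for the "occasional" compaction described above.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Iterable, List, Optional, Set


class FakeStateHandler:
    """Hypothetical asynchronous state handler (not Beam's API).

    One worker thread applies requests in the order they are issued, but each
    call returns a Future immediately, so the request may still be pending
    when the caller moves on.
    """

    def __init__(self, latency_s: float = 0.05):
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._latency_s = latency_s
        self.stored: Set[int] = set()  # what the "runner" has persisted

    def clear(self) -> Future:
        return self._executor.submit(self._apply, None)

    def extend(self, values: Iterable[int]) -> Future:
        # Snapshot the values now; the caller may mutate its set afterwards.
        return self._executor.submit(self._apply, frozenset(values))

    def _apply(self, values: Optional[frozenset]) -> None:
        time.sleep(self._latency_s)  # simulated round trip to the runner
        if values is None:
            self.stored.clear()
        else:
            self.stored.update(values)


class TrackedSetState:
    """Hypothetical, simplified set state showing the futures-tracking fix."""

    def __init__(self, handler: FakeStateHandler, compact_every: int = 2):
        self._handler = handler
        self._compact_every = compact_every  # deterministic stand-in trigger
        self._persisted: Set[int] = set()  # values already sent to the handler
        self._added: Set[int] = set()  # values buffered locally
        self._futures: List[Future] = []  # requests issued outside commit()

    def add(self, value: int) -> None:
        self._added.add(value)
        if len(self._added) >= self._compact_every:
            self._compact()

    def read(self) -> Set[int]:
        return self._persisted | self._added

    def _compact(self) -> None:
        # Rewrite the whole set: clear, then extend with everything seen so far.
        self._persisted |= self._added
        self._added = set()
        # The fix: remember these futures instead of dropping them.
        self._futures.append(self._handler.clear())
        self._futures.append(self._handler.extend(self._persisted))

    def commit(self) -> None:
        commit_futures: List[Future] = []
        if self._added:
            self._persisted |= self._added
            commit_futures.append(self._handler.extend(self._added))
            self._added = set()
        # Await compaction-time and commit-time futures together; result()
        # blocks until each request finishes and re-raises any failure.
        for future in self._futures + commit_futures:
            future.result()
        self._futures = []


handler = FakeStateHandler()
state = TrackedSetState(handler, compact_every=2)
for v in (1, 2, 3, 4):
    state.add(v)
state.commit()
print(sorted(handler.stored))  # [1, 2, 3, 4]
```

With `compact_every=2`, adding 2 and adding 4 each trigger a compaction that leaves nothing buffered, so `commit()` issues no request of its own. If `_compact()` dropped its futures, `commit()` would have nothing to wait on and could return before `handler.stored` was populated, which is the same kind of lost update described above.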
