shunping opened a new pull request, #39175:
URL: https://github.com/apache/beam/pull/39175

   When elements are added to `SynchronousSetRuntimeState`, it occasionally triggers data compaction (`_compact_data`), which launches asynchronous `clear` and `extend` requests to the State Handler. However, these futures were not tracked. The `commit()` method only waited on the futures generated during the commit phase itself, so outstanding compaction futures could complete out of order or after the bundle processor had finished, resulting in lost state updates.
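
A simplified model of this failure mode is sketched here. It assumes a hypothetical backend (`GatedBackend`) that runs each request on a thread pool and a hypothetical `UntrackedSetState` that "compacts" after every two additions; none of these names, and not the threshold, come from Beam. The compaction write is held back by a `threading.Event` so the ordering is deterministic: `commit()` returns while the compaction request is still in flight, and its data lands only afterwards.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


class GatedBackend:
    """Hypothetical async state backend; each request runs on a worker thread."""

    def __init__(self):
        self.data = []
        self._lock = threading.Lock()
        self._pool = ThreadPoolExecutor(max_workers=4)

    def extend(self, values, gate=None):
        def run():
            if gate is not None:
                gate.wait()  # hold this request until the gate opens
            with self._lock:
                self.data.extend(values)
        return self._pool.submit(run)

    def close(self):
        self._pool.shutdown(wait=True)


class UntrackedSetState:
    """Pre-fix pattern (hypothetical): compaction futures are discarded."""

    def __init__(self, backend, gate):
        self._backend = backend
        self._gate = gate
        self._pending = []

    def add(self, value):
        self._pending.append(value)
        if len(self._pending) >= 2:  # hypothetical compaction threshold
            # Fire-and-forget: the returned future is never stored.
            self._backend.extend(self._pending, gate=self._gate)
            self._pending = []

    def commit(self):
        # Only the future created during commit is awaited.
        wait([self._backend.extend(self._pending)])
        self._pending = []


gate = threading.Event()
backend = GatedBackend()
state = UntrackedSetState(backend, gate)
for v in [1, 2, 3]:
    state.add(v)
state.commit()
after_commit = sorted(backend.data)  # [3]: the compaction write is still in flight
gate.set()
backend.close()
after_drain = sorted(backend.data)   # [1, 2, 3]: it lands only after commit() returned
print(after_commit, after_drain)
```

In the real runner nothing waits for the stray request at all, so its effect can arrive after the bundle has been finalized or interleave with later `clear`/`extend` calls.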
   
   An example of a failed test run:
   https://github.com/apache/beam/actions/runs/28001498086/job/82874623596
   
   Traceback:
   ```
   _______ StatefulDoFnOnDirectRunnerTest.test_stateful_set_state_portably ________
   [gw2] linux -- Python 3.10.20 /runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310/py310/bin/python
   
   self = <apache_beam.transforms.userstate_test.StatefulDoFnOnDirectRunnerTest testMethod=test_stateful_set_state_portably>
   
       def test_stateful_set_state_portably(self):
         class SetStatefulDoFn(beam.DoFn):
       
           SET_STATE = SetStateSpec('buffer', VarIntCoder())
       
           def process(self, element, set_state=beam.DoFn.StateParam(SET_STATE)):
             _, value = element
             aggregated_value = 0
             set_state.add(value)
             for saved_value in set_state.read():
               aggregated_value += saved_value
             yield aggregated_value
       
   >     with TestPipeline() as p:
   
   apache_beam/transforms/userstate_test.py:715: 
   ...
       
         if duration:
           state_thread = threading.Thread(
               target=functools.partial(self._observe_state, message_thread),
               name='wait_until_finish_state_observer')
           state_thread.daemon = True
           state_thread.start()
           start_time = time.time()
           duration_secs = duration / 1000
           while (time.time() - start_time < duration_secs and
                  state_thread.is_alive()):
             time.sleep(1)
         else:
           self._observe_state(message_thread)
       
         if self._runtime_exception:
   >       raise self._runtime_exception
   E       RuntimeError: Pipeline job-045 failed in state FAILED: bundle inst010 stage-009 failed:Traceback (most recent call last):
   E         File "apache_beam/runners/common.py", line 1499, in apache_beam.runners.common.DoFnRunner.process
   E           return self.do_fn_invoker.invoke_process(windowed_value)
   E         File "apache_beam/runners/common.py", line 913, in apache_beam.runners.common.PerWindowInvoker.invoke_process
   E           self._invoke_process_per_window(
   E         File "apache_beam/runners/common.py", line 1058, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
   E           self.process_method(*args_for_process, **kwargs_for_process),
   E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310/py310/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 2126, in <lambda>
   E           wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310/py310/lib/python3.10/site-packages/apache_beam/testing/util.py", line 202, in _equal
   E           raise BeamAssertException(msg)
   E       apache_beam.testing.util.BeamAssertException: Failed assert: [1, 3, 6, 10, 10] == [1, 4, 6, 3, 7], unexpected elements [4, 7], missing elements [10, 10]
   E       
   ```
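
The expected list in the assertion follows from set semantics: each `process` call adds one value and then sums the whole set, so re-adding a value that is already present leaves the sum unchanged. This replay uses a plain Python `set` in place of `SetStateSpec`, and the input `[1, 2, 3, 4, 3]` is an assumption, since the test's actual elements are elided from the traceback.

```python
def aggregate_with_set_state(values):
    """Replay SetStatefulDoFn.process for a single key using an in-memory set."""
    seen = set()
    outputs = []
    for value in values:
        seen.add(value)            # stands in for set_state.add(value)
        outputs.append(sum(seen))  # stands in for summing set_state.read()
    return outputs


# Hypothetical input; the real test elements are not shown in the traceback.
print(aggregate_with_set_state([1, 2, 3, 4, 3]))  # [1, 3, 6, 10, 10]
```

Under that assumed input, sums such as 4 and 7 in the actual output can only come from reads of a set that is missing earlier additions, which is consistent with the lost-update symptom.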
   
   This PR fixes a race condition in `SynchronousSetRuntimeState` where asynchronous state requests triggered by state compaction or early clears were not being awaited.
   - Introduced `self._futures` to record all asynchronous state requests initiated by compaction and early clears.
   - Updated `commit()` to combine `self._futures` with the current commit-time futures and await all of them before completing.
   - Added a regression test.
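
A minimal sketch of the tracking pattern, assuming a hypothetical thread-pool backend: only the attribute name `self._futures` comes from this PR; `SlowBackend`, `TrackedSetState`, the compaction threshold, and the artificial 50 ms delay are hypothetical, and the real `_compact_data` also issues `clear` requests that are omitted here. Because `commit()` waits on the stored futures together with its own, the delayed compaction write is visible once `commit()` returns.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait


class SlowBackend:
    """Hypothetical async state backend; each request runs on a worker thread."""

    def __init__(self):
        self.data = []
        self._lock = threading.Lock()
        self._pool = ThreadPoolExecutor(max_workers=4)

    def extend(self, values, delay=0.0):
        def run():
            time.sleep(delay)  # simulate a slow request
            with self._lock:
                self.data.extend(values)
        return self._pool.submit(run)

    def close(self):
        self._pool.shutdown(wait=True)


class TrackedSetState:
    """Sketch of the fix: keep every future launched outside commit()."""

    COMPACT_THRESHOLD = 2  # hypothetical, not Beam's real value

    def __init__(self, backend):
        self._backend = backend
        self._pending = []
        self._futures = []  # futures from compaction (and, in Beam, early clears)

    def add(self, value):
        self._pending.append(value)
        if len(self._pending) >= self.COMPACT_THRESHOLD:
            # Compaction-style write: record the future instead of dropping it.
            self._futures.append(self._backend.extend(self._pending, delay=0.05))
            self._pending = []

    def commit(self):
        commit_futures = [self._backend.extend(self._pending)]
        self._pending = []
        # Await the earlier futures together with the commit-time ones.
        done, _ = wait(self._futures + commit_futures)
        for fut in done:
            fut.result()  # re-raise any exception from a background request
        self._futures = []


backend = SlowBackend()
state = TrackedSetState(backend)
for v in [1, 2, 3]:
    state.add(v)
state.commit()
after_commit = sorted(backend.data)  # the delayed compaction write is already visible
backend.close()
print(after_commit)  # [1, 2, 3]
```

Collecting the futures in a list keeps `add()` non-blocking, while calling `result()` after `concurrent.futures.wait` surfaces any exception from a background request instead of silently dropping it.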


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.