robertwb commented on a change in pull request #11916:
URL: https://github.com/apache/beam/pull/11916#discussion_r438296283
##########
File path: sdks/python/apache_beam/transforms/userstate_test.py
##########
@@ -452,6 +459,40 @@ def clear_values(self,
bag_state=beam.DoFn.StateParam(BAG_STATE)):
self.assertEqual(['extra'], StatefulDoFnOnDirectRunnerTest.all_records)
+ def test_simple_read_modify_write_stateful_dofn(self):
+ class SimpleTestReadModifyWriteStatefulDoFn(DoFn):
+ VALUE_STATE = ReadModifyWriteStateSpec('value', StrUtf8Coder())
+
+ def process(self, element, last_element=DoFn.StateParam(VALUE_STATE)):
+ last_element.write('%s:%s' % element)
+ yield last_element.read()
+
+ with TestPipeline() as p:
+ (
+ p | beam.Create([('a', 1), ('b', 3), ('c', 5)])
+ | beam.ParDo(SimpleTestReadModifyWriteStatefulDoFn())
+ | beam.ParDo(self.record_dofn()))
+ self.assertEqual(['a:1', 'b:3', 'c:5'],
+ StatefulDoFnOnDirectRunnerTest.all_records)
+
+ def test_clearing_read_modify_write_state(self):
+ class SimpleClearingReadModifyWriteStatefulDoFn(DoFn):
+ VALUE_STATE = ReadModifyWriteStateSpec('value', VarIntCoder())
+
+ def process(self, element, last_element=DoFn.StateParam(VALUE_STATE)):
+ last_element.write(element[1])
Review comment:
To make this a stronger test, consider also writing a value at the end of
`process` and trying to read it at the very beginning.
##########
File path: sdks/python/apache_beam/runners/direct/direct_userstate.py
##########
@@ -25,6 +25,7 @@
from apache_beam.transforms import userstate
from apache_beam.transforms.trigger import _ListStateTag
from apache_beam.transforms.trigger import _SetStateTag
+from apache_beam.transforms.trigger import _ValueStateTag
Review comment:
Should `_ValueStateTag` be renamed as well?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]