[ https://issues.apache.org/jira/browse/BEAM-2687?focusedWorklogId=145597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145597 ]
ASF GitHub Bot logged work on BEAM-2687:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Sep/18 08:53
            Start Date: 19/Sep/18 08:53
    Worklog Time Spent: 10m
      Work Description: charlesccychen commented on a change in pull request #6349: [BEAM-2687] Implement State over the Fn API
URL: https://github.com/apache/beam/pull/6349#discussion_r218719163

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -198,6 +203,85 @@ def is_globally_windowed(self):
             == sideinputs._global_window_mapping_fn)
 
 
+class CombiningValueRuntimeState(userstate.RuntimeState):
+  def __init__(self, underlying_bag_state, combinefn):
+    self._combinefn = combinefn
+    self._underlying_bag_state = underlying_bag_state
+
+  def _read_accumulator(self, rewrite=True):
+    merged_accumulator = self._combinefn.merge_accumulators(
+        self._underlying_bag_state.read())
+    if rewrite:
+      self._underlying_bag_state.clear()
+      self._underlying_bag_state.add(merged_accumulator)
+    return merged_accumulator
+
+  def read(self):
+    return self._combinefn.extract_output(self._read_accumulator())
+
+  def add(self, value):
+    # Prefer blind writes, but don't let them grow unboundedly.
+    if random.random() < 0.5:

 Review comment:
   Can you add a rationale for this threshold or state that it is arbitrary for now?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 145597)
    Time Spent: 1h 10m  (was: 1h)

> Python SDK support for Stateful Processing
> ------------------------------------------
>
>                 Key: BEAM-2687
>                 URL: https://issues.apache.org/jira/browse/BEAM-2687
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Charles Chen
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Python SDK should support stateful processing
> (https://beam.apache.org/blog/2017/02/13/stateful-processing.html)
> In the meantime, runner capability matrix should be updated to show the lack
> of this feature
> (https://beam.apache.org/documentation/runners/capability-matrix/)
> Use this as an umbrella issue for all related issues.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
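
For readers following the review comment on the `random.random() < 0.5` threshold: the quoted diff cuts off right after that line, so the rest of `add` is not shown here. The sketch below is only a guess at the general pattern the code comment describes (mostly blind appends, with an occasional compaction so the bag cannot grow without bound). It does not use Beam at all. `InMemoryBag`, `SumCombiner`, `CombiningStateSketch`, the `compact_probability` parameter and the injectable `rng` are hypothetical names introduced for illustration, and the assumption that the branch under the threshold is the compacting one is mine, not the PR's.

```python
import random


class InMemoryBag:
    """Hypothetical stand-in for the underlying bag state (not a Beam class)."""

    def __init__(self):
        self._items = []

    def read(self):
        return list(self._items)

    def add(self, item):
        self._items.append(item)

    def clear(self):
        self._items = []


class SumCombiner:
    """Hypothetical combiner exposing the four methods the sketch relies on."""

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, value):
        return accumulator + value

    def merge_accumulators(self, accumulators):
        return sum(accumulators)

    def extract_output(self, accumulator):
        return accumulator


class CombiningStateSketch:
    """Bag-backed combining state with probabilistic compaction (assumed design)."""

    def __init__(self, bag, combiner, compact_probability=0.5, rng=None):
        self._bag = bag
        self._combiner = combiner
        # Assumption: the threshold is a tunable knob; 0.5 mirrors the diff.
        self._compact_probability = compact_probability
        self._rng = rng or random.Random()

    def read(self):
        merged = self._combiner.merge_accumulators(self._bag.read())
        return self._combiner.extract_output(merged)

    def add(self, value):
        if self._rng.random() < self._compact_probability:
            # Compacting write: fold everything stored so far into one
            # accumulator, at the cost of a read.
            accumulator = self._combiner.merge_accumulators(self._bag.read())
            self._bag.clear()
        else:
            # Blind write: append a fresh accumulator without reading state.
            accumulator = self._combiner.create_accumulator()
        self._bag.add(self._combiner.add_input(accumulator, value))
```

In this sketch a probability of 0.0 never compacts (the bag grows by one entry per `add`), while 1.0 compacts on every write and keeps exactly one entry but pays for a read each time. Values in between trade read cost against bag size, which is the trade-off the review comment asks the author to justify or mark as arbitrary.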