This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new cbe4dfb Refactoring code from direct runner, and adding unit test for processing time timers. (#8271) cbe4dfb is described below commit cbe4dfbdbe5d0da5152568853ee5e17334dd1b54 Author: Pablo <pabl...@users.noreply.github.com> AuthorDate: Thu Apr 11 11:35:25 2019 -0700 Refactoring code from direct runner, and adding unit test for processing time timers. (#8271) * Small refactor of direct runner code, and adding unit test. * Fixing lint issue --- sdks/python/apache_beam/runners/common.py | 8 +-- .../apache_beam/runners/direct/direct_runner.py | 11 ++-- .../runners/direct/evaluation_context.py | 28 ++++++---- .../apache_beam/transforms/userstate_test.py | 59 ++++++++++++++++++++++ 4 files changed, 85 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index f1fda35..84ac116 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -547,7 +547,7 @@ class PerWindowInvoker(DoFnInvoker): try: self.current_windowed_value = windowed_value self.restriction_tracker = restriction_tracker - return self._invoke_per_window( + return self._invoke_process_per_window( windowed_value, additional_args, additional_kwargs, output_processor) finally: @@ -556,14 +556,14 @@ class PerWindowInvoker(DoFnInvoker): elif self.has_windowed_inputs and len(windowed_value.windows) != 1: for w in windowed_value.windows: - self._invoke_per_window( + self._invoke_process_per_window( WindowedValue(windowed_value.value, windowed_value.timestamp, (w,)), additional_args, additional_kwargs, output_processor) else: - self._invoke_per_window( + self._invoke_process_per_window( windowed_value, additional_args, additional_kwargs, output_processor) - def _invoke_per_window( + def _invoke_process_per_window( self, windowed_value, additional_args, additional_kwargs, output_processor): if self.has_windowed_inputs: diff 
--git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 43e8c7f..e880460 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -69,11 +69,6 @@ class SwitchingDirectRunner(PipelineRunner): """ def run_pipeline(self, pipeline, options): - use_fnapi_runner = True - - # Streaming mode is not yet supported on the FnApiRunner. - if options.view_as(StandardOptions).streaming: - use_fnapi_runner = False from apache_beam.pipeline import PipelineVisitor from apache_beam.runners.dataflow.native_io.iobase import NativeSource @@ -113,8 +108,10 @@ class SwitchingDirectRunner(PipelineRunner): self.supported_by_fnapi_runner = False # Check whether all transforms used in the pipeline are supported by the - # FnApiRunner. - use_fnapi_runner = _FnApiRunnerSupportVisitor().accept(pipeline) + # FnApiRunner, and the pipeline was not meant to be run as streaming. + use_fnapi_runner = ( + _FnApiRunnerSupportVisitor().accept(pipeline) + and not options.view_as(StandardOptions).streaming) # Also ensure grpc is available. try: diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 24b05b6..a042ded 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -274,16 +274,7 @@ class EvaluationContext(object): result.logical_metric_updates) # If the result is for a view, update side inputs container. - if (result.uncommitted_output_bundles - and result.uncommitted_output_bundles[0].pcollection - in self._pcollection_to_views): - for view in self._pcollection_to_views[ - result.uncommitted_output_bundles[0].pcollection]: - for committed_bundle in committed_bundles: - # side_input must be materialized. 
- self._side_inputs_container.add_values( - view, - committed_bundle.get_elements_iterable(make_copy=True)) + self._update_side_inputs_container(committed_bundles, result) # Tasks generated from unblocked side inputs as the watermark progresses. tasks = self._watermark_manager.update_watermarks( @@ -304,6 +295,23 @@ class EvaluationContext(object): existing_keyed_state[k] = v return committed_bundles + def _update_side_inputs_container(self, committed_bundles, result): + """Update the side inputs container if we are outputting into a side input. + + Look at the result, and if it's outputting into a PCollection that we have + registered as a PCollectionView, we add the result to the PCollectionView. + """ + if (result.uncommitted_output_bundles + and result.uncommitted_output_bundles[0].pcollection + in self._pcollection_to_views): + for view in self._pcollection_to_views[ + result.uncommitted_output_bundles[0].pcollection]: + for committed_bundle in committed_bundles: + # side_input must be materialized. 
+ self._side_inputs_container.add_values( + view, + committed_bundle.get_elements_iterable(make_copy=True)) + def get_aggregator_values(self, aggregator_or_name): return self._counter_factory.get_aggregator_values(aggregator_or_name) diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py index 6935a3a..0a3e13c 100644 --- a/sdks/python/apache_beam/transforms/userstate_test.py +++ b/sdks/python/apache_beam/transforms/userstate_test.py @@ -418,6 +418,65 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase): 'key-value pairs.')): values | beam.ParDo(TestStatefulDoFn()) + def test_generate_sequence_with_realtime_timer(self): + from apache_beam.transforms.combiners import CountCombineFn + + class GenerateRecords(beam.DoFn): + + EMIT_TIMER = TimerSpec('emit_timer', TimeDomain.REAL_TIME) + COUNT_STATE = CombiningValueStateSpec( + 'count_state', VarIntCoder(), CountCombineFn()) + + def __init__(self, frequency, total_records): + self.total_records = total_records + self.frequency = frequency + + def process(self, + element, + emit_timer=beam.DoFn.TimerParam(EMIT_TIMER)): + # Processing time timers should be set on ABSOLUTE TIME. + emit_timer.set(self.frequency) + yield element[1] + + @on_timer(EMIT_TIMER) + def emit_values(self, + emit_timer=beam.DoFn.TimerParam(EMIT_TIMER), + count_state=beam.DoFn.StateParam(COUNT_STATE)): + count = count_state.read() or 0 + if self.total_records == count: + return + + count_state.add(1) + # Processing time timers should be set on ABSOLUTE TIME. 
+ emit_timer.set(count + 1 + self.frequency) + yield 'value' + + TOTAL_RECORDS = 3 + FREQUENCY = 1 + + test_stream = (TestStream() + .advance_watermark_to(0) + .add_elements([('key', 0)]) + .advance_processing_time(1) # Timestamp: 1 + .add_elements([('key', 1)]) + .advance_processing_time(1) # Timestamp: 2 + .add_elements([('key', 2)]) + .advance_processing_time(1) # Timestamp: 3 + .add_elements([('key', 3)])) + + with beam.Pipeline(argv=['--streaming', '--runner=DirectRunner']) as p: + _ = (p + | test_stream + | beam.ParDo(GenerateRecords(FREQUENCY, TOTAL_RECORDS)) + | beam.ParDo(self.record_dofn())) + + self.assertEqual( + # 4 RECORDS go through process + # 3 values are emitted from timer + # Timestamp moves gradually. + [0, 'value', 1, 'value', 2, 'value', 3], + StatefulDoFnOnDirectRunnerTest.all_records) + def test_simple_stateful_dofn_combining(self): class SimpleTestStatefulDoFn(DoFn): BUFFER_STATE = CombiningValueStateSpec(