This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cbe4dfb  Refactoring code from direct runner, and adding unit test for processing time timers. (#8271)
cbe4dfb is described below

commit cbe4dfbdbe5d0da5152568853ee5e17334dd1b54
Author: Pablo <pabl...@users.noreply.github.com>
AuthorDate: Thu Apr 11 11:35:25 2019 -0700

    Refactoring code from direct runner, and adding unit test for processing time timers. (#8271)
    
    * Small refactor of direct runner code, and adding unit test.
    
    * Fixing lint issue
---
 sdks/python/apache_beam/runners/common.py          |  8 +--
 .../apache_beam/runners/direct/direct_runner.py    | 11 ++--
 .../runners/direct/evaluation_context.py           | 28 ++++++----
 .../apache_beam/transforms/userstate_test.py       | 59 ++++++++++++++++++++++
 4 files changed, 85 insertions(+), 21 deletions(-)
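
The new test in userstate_test.py exercises processing-time ("real time") timers in the Python SDK's user state API. As a rough sketch of the pattern the test relies on (the DoFn and method names here are illustrative, and the imports assume the usual apache_beam.transforms.userstate and apache_beam.transforms.timeutil locations):

    import apache_beam as beam
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import TimerSpec, on_timer

    class EmitLater(beam.DoFn):
      # A timer in the REAL_TIME domain fires on processing time rather
      # than on the watermark. State and timers require a keyed
      # (two-tuple) input PCollection.
      EMIT_TIMER = TimerSpec('emit_timer', TimeDomain.REAL_TIME)

      def process(self, element,
                  emit_timer=beam.DoFn.TimerParam(EMIT_TIMER)):
        # Processing-time timers are set to an absolute timestamp,
        # not a relative delay.
        emit_timer.set(1)
        yield element

      @on_timer(EMIT_TIMER)
      def emit_callback(self):
        yield 'timer fired'

In the test below, the timer callback re-sets the same timer, which is what turns a single input element into a generated sequence of outputs.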

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index f1fda35..84ac116 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -547,7 +547,7 @@ class PerWindowInvoker(DoFnInvoker):
       try:
         self.current_windowed_value = windowed_value
         self.restriction_tracker = restriction_tracker
-        return self._invoke_per_window(
+        return self._invoke_process_per_window(
             windowed_value, additional_args, additional_kwargs,
             output_processor)
       finally:
@@ -556,14 +556,14 @@ class PerWindowInvoker(DoFnInvoker):
 
     elif self.has_windowed_inputs and len(windowed_value.windows) != 1:
       for w in windowed_value.windows:
-        self._invoke_per_window(
+        self._invoke_process_per_window(
             WindowedValue(windowed_value.value, windowed_value.timestamp, (w,)),
             additional_args, additional_kwargs, output_processor)
     else:
-      self._invoke_per_window(
+      self._invoke_process_per_window(
           windowed_value, additional_args, additional_kwargs, output_processor)
 
-  def _invoke_per_window(
+  def _invoke_process_per_window(
       self, windowed_value, additional_args,
       additional_kwargs, output_processor):
     if self.has_windowed_inputs:
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 43e8c7f..e880460 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -69,11 +69,6 @@ class SwitchingDirectRunner(PipelineRunner):
   """
 
   def run_pipeline(self, pipeline, options):
-    use_fnapi_runner = True
-
-    # Streaming mode is not yet supported on the FnApiRunner.
-    if options.view_as(StandardOptions).streaming:
-      use_fnapi_runner = False
 
     from apache_beam.pipeline import PipelineVisitor
     from apache_beam.runners.dataflow.native_io.iobase import NativeSource
@@ -113,8 +108,10 @@ class SwitchingDirectRunner(PipelineRunner):
               self.supported_by_fnapi_runner = False
 
     # Check whether all transforms used in the pipeline are supported by the
-    # FnApiRunner.
-    use_fnapi_runner = _FnApiRunnerSupportVisitor().accept(pipeline)
+    # FnApiRunner, and the pipeline was not meant to be run as streaming.
+    use_fnapi_runner = (
+        _FnApiRunnerSupportVisitor().accept(pipeline)
+        and not options.view_as(StandardOptions).streaming)
 
     # Also ensure grpc is available.
     try:
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 24b05b6..a042ded 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -274,16 +274,7 @@ class EvaluationContext(object):
                                    result.logical_metric_updates)
 
       # If the result is for a view, update side inputs container.
-      if (result.uncommitted_output_bundles
-          and result.uncommitted_output_bundles[0].pcollection
-          in self._pcollection_to_views):
-        for view in self._pcollection_to_views[
-            result.uncommitted_output_bundles[0].pcollection]:
-          for committed_bundle in committed_bundles:
-            # side_input must be materialized.
-            self._side_inputs_container.add_values(
-                view,
-                committed_bundle.get_elements_iterable(make_copy=True))
+      self._update_side_inputs_container(committed_bundles, result)
 
       # Tasks generated from unblocked side inputs as the watermark progresses.
       tasks = self._watermark_manager.update_watermarks(
@@ -304,6 +295,23 @@ class EvaluationContext(object):
         existing_keyed_state[k] = v
       return committed_bundles
 
+  def _update_side_inputs_container(self, committed_bundles, result):
+    """Update the side inputs container if we are outputting into a side input.
+
+    Look at the result, and if it is outputting into a PCollection that we
+    have registered as a PCollectionView, add the result to that view.
+    """
+    if (result.uncommitted_output_bundles
+        and result.uncommitted_output_bundles[0].pcollection
+        in self._pcollection_to_views):
+      for view in self._pcollection_to_views[
+          result.uncommitted_output_bundles[0].pcollection]:
+        for committed_bundle in committed_bundles:
+          # side_input must be materialized.
+          self._side_inputs_container.add_values(
+              view,
+              committed_bundle.get_elements_iterable(make_copy=True))
+
   def get_aggregator_values(self, aggregator_or_name):
     return self._counter_factory.get_aggregator_values(aggregator_or_name)
 
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 6935a3a..0a3e13c 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -418,6 +418,65 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
          'key-value pairs.')):
       values | beam.ParDo(TestStatefulDoFn())
 
+  def test_generate_sequence_with_realtime_timer(self):
+    from apache_beam.transforms.combiners import CountCombineFn
+
+    class GenerateRecords(beam.DoFn):
+
+      EMIT_TIMER = TimerSpec('emit_timer', TimeDomain.REAL_TIME)
+      COUNT_STATE = CombiningValueStateSpec(
+          'count_state', VarIntCoder(), CountCombineFn())
+
+      def __init__(self, frequency, total_records):
+        self.total_records = total_records
+        self.frequency = frequency
+
+      def process(self,
+                  element,
+                  emit_timer=beam.DoFn.TimerParam(EMIT_TIMER)):
+        # Processing time timers should be set on ABSOLUTE TIME.
+        emit_timer.set(self.frequency)
+        yield element[1]
+
+      @on_timer(EMIT_TIMER)
+      def emit_values(self,
+                      emit_timer=beam.DoFn.TimerParam(EMIT_TIMER),
+                      count_state=beam.DoFn.StateParam(COUNT_STATE)):
+        count = count_state.read() or 0
+        if self.total_records == count:
+          return
+
+        count_state.add(1)
+        # Processing time timers should be set on ABSOLUTE TIME.
+        emit_timer.set(count + 1 + self.frequency)
+        yield 'value'
+
+    TOTAL_RECORDS = 3
+    FREQUENCY = 1
+
+    test_stream = (TestStream()
+                   .advance_watermark_to(0)
+                   .add_elements([('key', 0)])
+                   .advance_processing_time(1) # Timestamp: 1
+                   .add_elements([('key', 1)])
+                   .advance_processing_time(1) # Timestamp: 2
+                   .add_elements([('key', 2)])
+                   .advance_processing_time(1) # Timestamp: 3
+                   .add_elements([('key', 3)]))
+
+    with beam.Pipeline(argv=['--streaming', '--runner=DirectRunner']) as p:
+      _ = (p
+           | test_stream
+           | beam.ParDo(GenerateRecords(FREQUENCY, TOTAL_RECORDS))
+           | beam.ParDo(self.record_dofn()))
+
+    self.assertEqual(
+        # 4 RECORDS go through process
+        # 3 values are emitted from timer
+        # Timestamp moves gradually.
+        [0, 'value', 1, 'value', 2, 'value', 3],
+        StatefulDoFnOnDirectRunnerTest.all_records)
+
   def test_simple_stateful_dofn_combining(self):
     class SimpleTestStatefulDoFn(DoFn):
       BUFFER_STATE = CombiningValueStateSpec(
