This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 882eca8 [BEAM-7141] Add window & timestamp in timer callback method argument (#8408) 882eca8 is described below commit 882eca84327629ddd8f167c764573ee53a4e7ed8 Author: Rakesh Kumar <rakeshcu...@gmail.com> AuthorDate: Tue May 28 02:29:25 2019 -0700 [BEAM-7141] Add window & timestamp in timer callback method argument (#8408) --- sdks/python/apache_beam/runners/common.pxd | 2 ++ sdks/python/apache_beam/runners/common.py | 29 ++++++++++++++---- .../apache_beam/transforms/userstate_test.py | 35 ++++++++++++++++++++++ 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 1d87507..b901c68 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -37,6 +37,8 @@ cdef class MethodWrapper(object): cdef bint has_userstate_arguments cdef object state_args_to_replace cdef object timer_args_to_replace + cdef object timestamp_arg_name + cdef object window_arg_name cdef class DoFnSignature(object): diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index cae0d4c..6d821e3 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -44,6 +44,7 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import WindowFn from apache_beam.utils.counters import Counter from apache_beam.utils.counters import CounterName +from apache_beam.utils.timestamp import Timestamp from apache_beam.utils.windowed_value import WindowedValue @@ -168,6 +169,9 @@ class MethodWrapper(object): self.has_userstate_arguments = False self.state_args_to_replace = {} self.timer_args_to_replace = {} + self.timestamp_arg_name = None + self.window_arg_name = None + for kw, v in zip(args[-len(defaults):], defaults): if isinstance(v, core.DoFn.StateParam): self.state_args_to_replace[kw] = v.state_spec @@ -175,15 +179,30 @@ class MethodWrapper(object): elif isinstance(v, core.DoFn.TimerParam): self.timer_args_to_replace[kw] = v.timer_spec self.has_userstate_arguments = True - - def invoke_timer_callback(self, user_state_context, key, window): - # TODO(ccy): support WindowParam, TimestampParam and side inputs. + elif v == core.DoFn.TimestampParam: + self.timestamp_arg_name = kw + elif v == core.DoFn.WindowParam: + self.window_arg_name = kw + + def invoke_timer_callback(self, + user_state_context, + key, + window, + timestamp): + # TODO(ccy): support side inputs. + kwargs = {} if self.has_userstate_arguments: - kwargs = {} for kw, state_spec in self.state_args_to_replace.items(): kwargs[kw] = user_state_context.get_state(state_spec, key, window) for kw, timer_spec in self.timer_args_to_replace.items(): kwargs[kw] = user_state_context.get_timer(timer_spec, key, window) + + if self.timestamp_arg_name: + kwargs[self.timestamp_arg_name] = Timestamp(seconds=timestamp) + if self.window_arg_name: + kwargs[self.window_arg_name] = window + + if kwargs: return self.method_value(**kwargs) else: return self.method_value() @@ -384,7 +403,7 @@ class DoFnInvoker(object): self.output_processor.process_outputs( WindowedValue(None, timestamp, (window,)), self.signature.timer_methods[timer_spec].invoke_timer_callback( - self.user_state_context, key, window)) + self.user_state_context, key, window, timestamp)) def invoke_split(self, element, restriction): return self.signature.split_method.method_value(element, restriction) diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py index 0a3e13c..53a7e36 100644 --- a/sdks/python/apache_beam/transforms/userstate_test.py +++ b/sdks/python/apache_beam/transforms/userstate_test.py @@ -27,11 +27,14 @@ from apache_beam.coders import BytesCoder from apache_beam.coders import IterableCoder from apache_beam.coders import StrUtf8Coder from apache_beam.coders import VarIntCoder +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners.common import DoFnSignature from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.test_stream import TestStream from apache_beam.testing.util import equal_to +from apache_beam.transforms import trigger from apache_beam.transforms import userstate +from apache_beam.transforms import window from apache_beam.transforms.combiners import ToListCombineFn from apache_beam.transforms.combiners import TopCombineFn from apache_beam.transforms.core import DoFn @@ -557,6 +560,38 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase): [('timer1', 10), ('timer2', 20), ('timer3', 30)], sorted(StatefulDoFnOnDirectRunnerTest.all_records)) + def test_timer_output_timestamp_and_window(self): + + class TimerEmittingStatefulDoFn(DoFn): + EMIT_TIMER_1 = TimerSpec('emit1', TimeDomain.WATERMARK) + + def process(self, element, timer1=DoFn.TimerParam(EMIT_TIMER_1)): + timer1.set(10) + + @on_timer(EMIT_TIMER_1) + def emit_callback_1(self, + window=DoFn.WindowParam, + ts=DoFn.TimestampParam): + yield ('timer1', int(ts), int(window.start), int(window.end)) + + pipeline_options = PipelineOptions() + with TestPipeline(options=pipeline_options) as p: + test_stream = (TestStream() + .advance_watermark_to(10) + .add_elements([1])) + (p + | test_stream + | beam.Map(lambda x: ('mykey', x)) + | "window_into" >> beam.WindowInto( + window.FixedWindows(5), + accumulation_mode=trigger.AccumulationMode.DISCARDING) + | beam.ParDo(TimerEmittingStatefulDoFn()) + | beam.ParDo(self.record_dofn())) + + self.assertEqual( + [('timer1', 10, 10, 15)], + sorted(StatefulDoFnOnDirectRunnerTest.all_records)) + def test_index_assignment(self): class IndexAssigningStatefulDoFn(DoFn): INDEX_STATE = BagStateSpec('index', VarIntCoder())