This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fb8efe3 BEAM-7141: add key value timer callback (#8739)
fb8efe3 is described below
commit fb8efe3af66ddfe3ce7d4020c43d10d8467f0ab9
Author: Rakesh Kumar <[email protected]>
AuthorDate: Mon Jun 24 18:59:05 2019 -0700
BEAM-7141: add key value timer callback (#8739)
* BEAM-7141: Add key parameter in timer callback
Why?
The key parameter was missing from the timer callback,
which made debugging harder.
---
sdks/python/apache_beam/runners/common.pxd | 2 +
sdks/python/apache_beam/runners/common.py | 27 ++++++++--
sdks/python/apache_beam/runners/common_test.py | 59 ++++++++++++++++++++++
sdks/python/apache_beam/transforms/core.py | 13 +++--
.../apache_beam/transforms/userstate_test.py | 11 ++--
5 files changed, 99 insertions(+), 13 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index b901c68..ebbd1bb 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -39,6 +39,7 @@ cdef class MethodWrapper(object):
cdef object timer_args_to_replace
cdef object timestamp_arg_name
cdef object window_arg_name
+ cdef object key_arg_name
cdef class DoFnSignature(object):
@@ -90,6 +91,7 @@ cdef class PerWindowInvoker(DoFnInvoker):
cdef bint is_splittable
cdef object restriction_tracker
cdef WindowedValue current_windowed_value
+ cdef bint is_key_param_required
cdef class DoFnRunner(Receiver):
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 6d821e3..89c2cb7 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -171,6 +171,7 @@ class MethodWrapper(object):
self.timer_args_to_replace = {}
self.timestamp_arg_name = None
self.window_arg_name = None
+ self.key_arg_name = None
for kw, v in zip(args[-len(defaults):], defaults):
if isinstance(v, core.DoFn.StateParam):
@@ -183,6 +184,8 @@ class MethodWrapper(object):
self.timestamp_arg_name = kw
elif v == core.DoFn.WindowParam:
self.window_arg_name = kw
+ elif v == core.DoFn.KeyParam:
+ self.key_arg_name = kw
def invoke_timer_callback(self,
user_state_context,
@@ -201,6 +204,8 @@ class MethodWrapper(object):
kwargs[self.timestamp_arg_name] = Timestamp(seconds=timestamp)
if self.window_arg_name:
kwargs[self.window_arg_name] = window
+ if self.key_arg_name:
+ kwargs[self.key_arg_name] = key
if kwargs:
return self.method_value(**kwargs)
@@ -310,6 +315,13 @@ class DoFnInvoker(object):
represented by a given DoFnSignature."""
def __init__(self, output_processor, signature):
+ """
+ Initializes `DoFnInvoker`
+
+ :param output_processor: an OutputProcessor for receiving elements produced
+ by invoking functions of the DoFn.
+ :param signature: a DoFnSignature for the DoFn being invoked
+ """
self.output_processor = output_processor
self.signature = signature
self.user_state_context = None
@@ -474,6 +486,7 @@ class PerWindowInvoker(DoFnInvoker):
self.restriction_tracker = None
self.current_windowed_value = None
self.bundle_finalizer_param = bundle_finalizer_param
+ self.is_key_param_required = False
# Try to prepare all the arguments that can just be filled in
# without any additional work. in the process function.
@@ -504,11 +517,14 @@ class PerWindowInvoker(DoFnInvoker):
args_to_pick = len(arguments) - len(defaults) - self_in_args
args_with_placeholders = input_args[:args_to_pick]
- # Fill the OtherPlaceholders for context, window or timestamp
+ # Fill the OtherPlaceholders for context, key, window or timestamp
remaining_args_iter = iter(input_args[args_to_pick:])
for a, d in zip(arguments[-len(defaults):], defaults):
if d == core.DoFn.ElementParam:
args_with_placeholders.append(ArgPlaceholder(d))
+ elif d == core.DoFn.KeyParam:
+ self.is_key_param_required = True
+ args_with_placeholders.append(ArgPlaceholder(d))
elif d == core.DoFn.WindowParam:
args_with_placeholders.append(ArgPlaceholder(d))
elif d == core.DoFn.TimestampParam:
@@ -625,18 +641,19 @@ class PerWindowInvoker(DoFnInvoker):
# stateful DoFn, we set during __init__ self.has_windowed_inputs to be
# True. Therefore, windows will be exploded coming into this method, and
# we can rely on the window variable being set above.
- if self.user_state_context:
+ if self.user_state_context or self.is_key_param_required:
try:
key, unused_value = windowed_value.value
except (TypeError, ValueError):
raise ValueError(
- ('Input value to a stateful DoFn must be a KV tuple; instead, '
- 'got %s.') % (windowed_value.value,))
+ ('Input value to a stateful DoFn or KeyParam must be a KV tuple; '
+ 'instead, got \'%s\'.') % (windowed_value.value,))
- # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
for i, p in self.placeholders:
if p == core.DoFn.ElementParam:
args_for_process[i] = windowed_value.value
+ elif p == core.DoFn.KeyParam:
+ args_for_process[i] = key
elif p == core.DoFn.WindowParam:
args_for_process[i] = window
elif p == core.DoFn.TimestampParam:
diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py
index 18e2c45..9377708 100644
--- a/sdks/python/apache_beam/runners/common_test.py
+++ b/sdks/python/apache_beam/runners/common_test.py
@@ -19,7 +19,13 @@ from __future__ import absolute_import
import unittest
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.common import DoFnSignature
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
from apache_beam.transforms.core import DoFn
@@ -56,5 +62,58 @@ class DoFnSignatureTest(unittest.TestCase):
DoFnSignature(MyDoFn())
+class DoFnProcessTest(unittest.TestCase):
+ # pylint: disable=expression-not-assigned
+ all_records = None
+
+ def setUp(self):
+ DoFnProcessTest.all_records = []
+
+ def record_dofn(self):
+ class RecordDoFn(DoFn):
+ def process(self, element):
+ DoFnProcessTest.all_records.append(element)
+
+ return RecordDoFn()
+
+ def test_dofn_process_keyparam(self):
+
+ class DoFnProcessWithKeyparam(DoFn):
+
+ def process(self, element, mykey=DoFn.KeyParam):
+ yield "{key}-verify".format(key=mykey)
+
+ pipeline_options = PipelineOptions()
+
+ with TestPipeline(options=pipeline_options) as p:
+ test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2]))
+ (p
+ | test_stream
+ | beam.Map(lambda x: (x, "some-value"))
+ | "window_into" >> beam.WindowInto(
+ window.FixedWindows(5),
+ accumulation_mode=trigger.AccumulationMode.DISCARDING)
+ | beam.ParDo(DoFnProcessWithKeyparam())
+ | beam.ParDo(self.record_dofn()))
+
+ self.assertEqual(
+ ['1-verify', '2-verify'],
+ sorted(DoFnProcessTest.all_records))
+
+ def test_dofn_process_keyparam_error_no_key(self):
+ class DoFnProcessWithKeyparam(DoFn):
+
+ def process(self, element, mykey=DoFn.KeyParam):
+ yield "{key}-verify".format(key=mykey)
+
+ pipeline_options = PipelineOptions()
+ with self.assertRaises(ValueError),\
+ TestPipeline(options=pipeline_options) as p:
+ test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2]))
+ (p
+ | test_stream
+ | beam.ParDo(DoFnProcessWithKeyparam()))
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index ead094b..f51dc0f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -423,17 +423,19 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
PaneInfoParam = _DoFnParam('PaneInfoParam')
WatermarkReporterParam = _DoFnParam('WatermarkReporterParam')
BundleFinalizerParam = _BundleFinalizerParam
-
- DoFnProcessParams = [ElementParam, SideInputParam, TimestampParam,
- WindowParam, WatermarkReporterParam, PaneInfoParam,
- BundleFinalizerParam]
+ KeyParam = _DoFnParam('KeyParam')
# Parameters to access state and timers. Not restricted to use only in the
# .process() method. Usage: DoFn.StateParam(state_spec),
- # DoFn.TimerParam(timer_spec).
+ # DoFn.TimerParam(timer_spec), DoFn.TimestampParam, DoFn.WindowParam,
+ # DoFn.KeyParam
StateParam = _StateDoFnParam
TimerParam = _TimerDoFnParam
+ DoFnProcessParams = [ElementParam, SideInputParam, TimestampParam,
+ WindowParam, WatermarkReporterParam, PaneInfoParam,
+ BundleFinalizerParam, KeyParam, StateParam, TimerParam]
+
RestrictionParam = _RestrictionDoFnParam
@staticmethod
@@ -460,6 +462,7 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
of the parameter.
``DoFn.StateParam``: a ``userstate.RuntimeState`` object defined by the spec
of the parameter.
+ ``DoFn.KeyParam``: key associated with the element.
``DoFn.RestrictionParam``: an ``iobase.RestrictionTracker`` will be
provided here to allow treatment as a Splittable ``DoFn``. The restriction
tracker will be derived from the restriction provider in the parameter.
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 7a05c73..0d98337 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -66,6 +66,9 @@ class TestStatefulDoFn(DoFn):
@on_timer(EXPIRY_TIMER_1)
def on_expiry_1(self,
+ window=DoFn.WindowParam,
+ timestamp=DoFn.TimestampParam,
+ key=DoFn.KeyParam,
buffer=DoFn.StateParam(BUFFER_STATE_1),
timer_1=DoFn.TimerParam(EXPIRY_TIMER_1),
timer_2=DoFn.TimerParam(EXPIRY_TIMER_2),
@@ -571,8 +574,10 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
@on_timer(EMIT_TIMER_1)
def emit_callback_1(self,
window=DoFn.WindowParam,
- ts=DoFn.TimestampParam):
- yield ('timer1', int(ts), int(window.start), int(window.end))
+ ts=DoFn.TimestampParam,
+ key=DoFn.KeyParam):
+ yield ('timer1-{key}'.format(key=key),
+ int(ts), int(window.start), int(window.end))
pipeline_options = PipelineOptions()
with TestPipeline(options=pipeline_options) as p:
@@ -589,7 +594,7 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
| beam.ParDo(self.record_dofn()))
self.assertEqual(
- [('timer1', 10, 10, 15)],
+ [('timer1-mykey', 10, 10, 15)],
sorted(StatefulDoFnOnDirectRunnerTest.all_records))
def test_index_assignment(self):