Runner API encoding of WindowFns.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aad32b7a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aad32b7a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aad32b7a Branch: refs/heads/gearpump-runner Commit: aad32b7a00d1aea1e7e51b68ff609d2fb3b82a8f Parents: bc76a18 Author: Robert Bradshaw <rober...@gmail.com> Authored: Tue Mar 7 12:21:02 2017 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Thu Mar 9 20:29:01 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/transforms/window.py | 117 +++++++++++++++++++ .../apache_beam/transforms/window_test.py | 11 ++ sdks/python/apache_beam/utils/proto_utils.py | 37 ++++++ sdks/python/apache_beam/utils/urns.py | 7 ++ 4 files changed, 172 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 14cf2f6..a562bcf 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -49,13 +49,20 @@ WindowFn. from __future__ import absolute_import +from google.protobuf import struct_pb2 +from google.protobuf import wrappers_pb2 + from apache_beam import coders +from apache_beam.internal import pickler +from apache_beam.runners.api import beam_runner_api_pb2 from apache_beam.transforms import timeutil from apache_beam.transforms.timeutil import Duration from apache_beam.transforms.timeutil import MAX_TIMESTAMP from apache_beam.transforms.timeutil import MIN_TIMESTAMP from apache_beam.transforms.timeutil import Timestamp from apache_beam.utils.windowed_value import WindowedValue +from apache_beam.utils import proto_utils +from apache_beam.utils import urns # TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their @@ -131,6 +138,41 @@ class WindowFn(object): # By default, just return the input timestamp. return input_timestamp + _known_urns = {} + + @classmethod + def register_urn(cls, urn, parameter_type, constructor): + cls._known_urns[urn] = parameter_type, constructor + + @classmethod + def from_runner_api(cls, fn_proto, context): + parameter_type, constructor = cls._known_urns[fn_proto.spec.urn] + return constructor( + proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type), + context) + + def to_runner_api(self, context): + urn, typed_param = self.to_runner_api_parameter(context) + return beam_runner_api_pb2.FunctionSpec( + spec=beam_runner_api_pb2.UrnWithParameter( + urn=urn, + parameter=proto_utils.pack_Any(typed_param))) + + @staticmethod + def from_runner_api_parameter(fn_parameter, unused_context): + return pickler.loads(fn_parameter.value) + + def to_runner_api_parameter(self, context): + raise TypeError(self) + return (urns.PICKLED_WINDOW_FN, + wrappers_pb2.BytesValue(value=pickler.dumps(self))) + + +WindowFn.register_urn( + urns.PICKLED_WINDOW_FN, + wrappers_pb2.BytesValue, + WindowFn.from_runner_api_parameter) + class BoundedWindow(object): """A window for timestamps in range (-infinity, end). @@ -251,6 +293,16 @@ class GlobalWindows(WindowFn): def __ne__(self, other): return not self == other + @staticmethod + def from_runner_api_parameter(unused_fn_parameter, unused_context): + return GlobalWindows() + + def to_runner_api_parameter(self, context): + return urns.GLOBAL_WINDOWS_FN, None + +WindowFn.register_urn( + urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter) + class FixedWindows(WindowFn): """A windowing function that assigns each element to one time interval. @@ -280,6 +332,29 @@ class FixedWindows(WindowFn): def merge(self, merge_context): pass # No merging. + def __eq__(self, other): + if type(self) == type(other) == FixedWindows: + return self.size == other.size and self.offset == other.offset + + def __ne__(self, other): + return not self == other + + @staticmethod + def from_runner_api_parameter(fn_parameter, unused_context): + return FixedWindows( + size=Duration(micros=fn_parameter['size']), + offset=Timestamp(micros=fn_parameter['offset'])) + + def to_runner_api_parameter(self, context): + return (urns.FIXED_WINDOWS_FN, + proto_utils.pack_Struct(size=self.size.micros, + offset=self.offset.micros)) + +WindowFn.register_urn( + urns.FIXED_WINDOWS_FN, + struct_pb2.Struct, + FixedWindows.from_runner_api_parameter) + class SlidingWindows(WindowFn): """A windowing function that assigns each element to a set of sliding windows. @@ -312,6 +387,31 @@ class SlidingWindows(WindowFn): def merge(self, merge_context): pass # No merging. + def __eq__(self, other): + if type(self) == type(other) == SlidingWindows: + return (self.size == other.size + and self.offset == other.offset + and self.period == other.period) + + @staticmethod + def from_runner_api_parameter(fn_parameter, unused_context): + return SlidingWindows( + size=Duration(micros=fn_parameter['size']), + offset=Timestamp(micros=fn_parameter['offset']), + period=Duration(micros=fn_parameter['period'])) + + def to_runner_api_parameter(self, context): + return (urns.SLIDING_WINDOWS_FN, + proto_utils.pack_Struct( + size=self.size.micros, + offset=self.offset.micros, + period=self.period.micros)) + +WindowFn.register_urn( + urns.SLIDING_WINDOWS_FN, + struct_pb2.Struct, + SlidingWindows.from_runner_api_parameter) + class Sessions(WindowFn): """A windowing function that groups elements into sessions. @@ -352,3 +452,20 @@ class Sessions(WindowFn): end = w.end if len(to_merge) > 1: merge_context.merge(to_merge, IntervalWindow(to_merge[0].start, end)) + + def __eq__(self, other): + if type(self) == type(other) == Sessions: + return self.gap_size == other.gap_size + + @staticmethod + def from_runner_api_parameter(fn_parameter, unused_context): + return Sessions(gap_size=Duration(micros=fn_parameter['gap_size'])) + + def to_runner_api_parameter(self, context): + return (urns.SESSION_WINDOWS_FN, + proto_utils.pack_Struct(gap_size=self.gap_size.micros)) + +WindowFn.register_urn( + urns.SESSION_WINDOWS_FN, + struct_pb2.Struct, + Sessions.from_runner_api_parameter) http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index c4072ac..821b143 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -19,6 +19,7 @@ import unittest +from apache_beam import pipeline from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import CombinePerKey from apache_beam.transforms import combiners @@ -32,6 +33,7 @@ from apache_beam.transforms.timeutil import MIN_TIMESTAMP from apache_beam.transforms.util import assert_that, equal_to from apache_beam.transforms.window import FixedWindows from apache_beam.transforms.window import GlobalWindow +from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import IntervalWindow from apache_beam.transforms.window import Sessions from apache_beam.transforms.window import SlidingWindows @@ -224,6 +226,15 @@ class WindowTest(unittest.TestCase): label='assert:mean') p.run() + def test_runner_api(self): + for window_fn in (GlobalWindows(), + FixedWindows(37), + SlidingWindows(2, 389), + Sessions(5077)): + context = pipeline.PipelineContext() + self.assertEqual( + window_fn, + WindowFn.from_runner_api(window_fn.to_runner_api(context), context)) if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/proto_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py new file mode 100644 index 0000000..0ece8f5 --- /dev/null +++ b/sdks/python/apache_beam/utils/proto_utils.py @@ -0,0 +1,37 @@ +from google.protobuf import any_pb2 +from google.protobuf import struct_pb2 + + +def pack_Any(msg): + """Creates a protobuf Any with msg as its content. + + Returns None if msg is None. + """ + if msg is None: + return None + else: + result = any_pb2.Any() + result.Pack(msg) + return result + + +def unpack_Any(any_msg, msg_class): + """Unpacks any_msg into msg_class. + + Returns None if msg_class is None. + """ + if msg_class is None: + return None + else: + msg = msg_class() + any_msg.Unpack(msg) + return msg + + +def pack_Struct(**kwargs): + """Returns a struct containing the values indicated by kwargs. + """ + msg = struct_pb2.Struct() + for key, value in kwargs.items(): + msg[key] = value # pylint: disable=unsubscriptable-object + return msg http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/urns.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py new file mode 100644 index 0000000..4d1c2f7 --- /dev/null +++ b/sdks/python/apache_beam/utils/urns.py @@ -0,0 +1,7 @@ +PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1" +GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1" +FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1" +SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1" +SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1" + +PICKLED_CODER = "dataflow:coder:pickled_python:v0.1"