Runner API encoding of WindowFns.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aad32b7a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aad32b7a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aad32b7a

Branch: refs/heads/master
Commit: aad32b7a00d1aea1e7e51b68ff609d2fb3b82a8f
Parents: bc76a18
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Tue Mar 7 12:21:02 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/window.py    | 117 +++++++++++++++++++
 .../apache_beam/transforms/window_test.py       |  11 ++
 sdks/python/apache_beam/utils/proto_utils.py    |  37 ++++++
 sdks/python/apache_beam/utils/urns.py           |   7 ++
 4 files changed, 172 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 14cf2f6..a562bcf 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,13 +49,20 @@ WindowFn.
 
 from __future__ import absolute_import
 
+from google.protobuf import struct_pb2
+from google.protobuf import wrappers_pb2
+
 from apache_beam import coders
+from apache_beam.internal import pickler
+from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.transforms.timeutil import Duration
 from apache_beam.transforms.timeutil import MAX_TIMESTAMP
 from apache_beam.transforms.timeutil import MIN_TIMESTAMP
 from apache_beam.transforms.timeutil import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
 
 
 # TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
@@ -131,6 +138,41 @@ class WindowFn(object):
     # By default, just return the input timestamp.
     return input_timestamp
 
+  # Registry mapping each URN to its (parameter_type, constructor) pair.
+  _known_urns = {}
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, constructor):
+    """Records how to decode WindowFns carrying the given URN.
+
+    Args:
+      urn: the URN string used as the registry key.
+      parameter_type: class handed to proto_utils.unpack_Any when decoding;
+        may be None when the encoding carries no parameter.
+      constructor: callable invoked as constructor(parameter, context) by
+        from_runner_api.
+    """
+    entry = (parameter_type, constructor)
+    cls._known_urns[urn] = entry
+
+  @classmethod
+  def from_runner_api(cls, fn_proto, context):
+    """Rebuilds a WindowFn from its Runner API proto.
+
+    Looks up fn_proto.spec.urn in the URN registry, unpacks
+    fn_proto.spec.parameter into the registered parameter type and hands the
+    result, together with context, to the registered constructor.
+
+    Raises:
+      KeyError: if the URN was never passed to register_urn.
+    """
+    spec = fn_proto.spec
+    parameter_type, constructor = cls._known_urns[spec.urn]
+    typed_param = proto_utils.unpack_Any(spec.parameter, parameter_type)
+    return constructor(typed_param, context)
+
+  def to_runner_api(self, context):
+    """Encodes this WindowFn as a Runner API FunctionSpec proto.
+
+    The URN and typed parameter come from to_runner_api_parameter; the
+    parameter is wrapped with proto_utils.pack_Any before being stored.
+    """
+    urn, typed_param = self.to_runner_api_parameter(context)
+    packed_param = proto_utils.pack_Any(typed_param)
+    spec = beam_runner_api_pb2.UrnWithParameter(
+        urn=urn, parameter=packed_param)
+    return beam_runner_api_pb2.FunctionSpec(spec=spec)
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Decodes a pickled WindowFn from the value field of fn_parameter.
+
+    NOTE(review): unpickling can execute arbitrary code; confirm that the
+    protos decoded here only ever come from trusted sources.
+    """
+    pickled_fn = fn_parameter.value
+    return pickler.loads(pickled_fn)
+
+  def to_runner_api_parameter(self, context):
+    """Returns the (urn, typed parameter) pair that encodes this WindowFn.
+
+    This default pickles the WindowFn into a BytesValue tagged with
+    urns.PICKLED_WINDOW_FN, which is the encoding that
+    from_runner_api_parameter decodes. Subclasses with a structured encoding
+    override this method.
+    """
+    return (urns.PICKLED_WINDOW_FN,
+            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+
+
+# Route the pickled-WindowFn URN to the BytesValue-based decoder on WindowFn.
+WindowFn.register_urn(
+    urns.PICKLED_WINDOW_FN,
+    wrappers_pb2.BytesValue,
+    WindowFn.from_runner_api_parameter)
+
 
 class BoundedWindow(object):
   """A window for timestamps in range (-infinity, end).
@@ -251,6 +293,16 @@ class GlobalWindows(WindowFn):
   def __ne__(self, other):
     return not self == other
 
+  @staticmethod
+  def from_runner_api_parameter(unused_fn_parameter, unused_context):
+    """Returns a new GlobalWindows; both arguments are ignored."""
+    global_windows_fn = GlobalWindows()
+    return global_windows_fn
+
+  def to_runner_api_parameter(self, context):
+    """Returns the global-windows URN paired with a None parameter."""
+    urn = urns.GLOBAL_WINDOWS_FN
+    return urn, None
+
+# GlobalWindows encodes no parameter, so None is registered as its type.
+WindowFn.register_urn(
+    urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
+
 
 class FixedWindows(WindowFn):
   """A windowing function that assigns each element to one time interval.
@@ -280,6 +332,29 @@ class FixedWindows(WindowFn):
   def merge(self, merge_context):
     pass  # No merging.
 
+  def __eq__(self, other):
+    """Compares size and offset when both operands are exactly FixedWindows.
+
+    For any other combination of types the result is None, which is falsy.
+    """
+    if not (type(self) == type(other) == FixedWindows):
+      return None
+    same_size = self.size == other.size
+    return same_size and self.offset == other.offset
+
+  def __ne__(self, other):
+    """Returns the negation of the == comparison."""
+    is_equal = self == other
+    return not is_equal
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Rebuilds a FixedWindows from a decoded parameter.
+
+    Reads the 'size' and 'offset' entries of fn_parameter, each passed on as
+    a microsecond count.
+    """
+    size = Duration(micros=fn_parameter['size'])
+    offset = Timestamp(micros=fn_parameter['offset'])
+    return FixedWindows(size=size, offset=offset)
+
+  def to_runner_api_parameter(self, context):
+    """Returns the fixed-windows URN with size and offset in microseconds."""
+    typed_param = proto_utils.pack_Struct(
+        size=self.size.micros, offset=self.offset.micros)
+    return urns.FIXED_WINDOWS_FN, typed_param
+
+# FixedWindows parameters travel as a protobuf Struct (see pack_Struct use).
+WindowFn.register_urn(
+    urns.FIXED_WINDOWS_FN,
+    struct_pb2.Struct,
+    FixedWindows.from_runner_api_parameter)
+
 
 class SlidingWindows(WindowFn):
   """A windowing function that assigns each element to a set of sliding 
windows.
@@ -312,6 +387,31 @@ class SlidingWindows(WindowFn):
   def merge(self, merge_context):
     pass  # No merging.
 
+  def __eq__(self, other):
+    """Compares size, offset and period of two SlidingWindows.
+
+    Only exact SlidingWindows instances are compared; for any other type the
+    method falls through and returns None, which is falsy.
+    """
+    if type(self) == type(other) == SlidingWindows:
+      return (self.size == other.size
+              and self.offset == other.offset
+              and self.period == other.period)
+
+  def __ne__(self, other):
+    # Python 2 does not derive != from __eq__, so it must be defined
+    # explicitly to keep the two operators consistent.
+    return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Rebuilds a SlidingWindows from a decoded parameter.
+
+    Reads the 'size', 'offset' and 'period' entries of fn_parameter, each
+    passed on as a microsecond count.
+    """
+    size = Duration(micros=fn_parameter['size'])
+    offset = Timestamp(micros=fn_parameter['offset'])
+    period = Duration(micros=fn_parameter['period'])
+    return SlidingWindows(size=size, offset=offset, period=period)
+
+  def to_runner_api_parameter(self, context):
+    """Returns the sliding-windows URN with size, offset and period.
+
+    All three values are written as microsecond counts.
+    """
+    typed_param = proto_utils.pack_Struct(
+        size=self.size.micros,
+        offset=self.offset.micros,
+        period=self.period.micros)
+    return urns.SLIDING_WINDOWS_FN, typed_param
+
+# SlidingWindows parameters travel as a protobuf Struct.
+WindowFn.register_urn(
+    urns.SLIDING_WINDOWS_FN,
+    struct_pb2.Struct,
+    SlidingWindows.from_runner_api_parameter)
+
 
 class Sessions(WindowFn):
   """A windowing function that groups elements into sessions.
@@ -352,3 +452,20 @@ class Sessions(WindowFn):
         end = w.end
     if len(to_merge) > 1:
       merge_context.merge(to_merge, IntervalWindow(to_merge[0].start, end))
+
+  def __eq__(self, other):
+    """Compares the gap_size of two Sessions.
+
+    Only exact Sessions instances are compared; for any other type the
+    method falls through and returns None, which is falsy.
+    """
+    if type(self) == type(other) == Sessions:
+      return self.gap_size == other.gap_size
+
+  def __ne__(self, other):
+    # Python 2 does not derive != from __eq__, so it must be defined
+    # explicitly to keep the two operators consistent.
+    return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Rebuilds a Sessions from the 'gap_size' entry of fn_parameter.
+
+    The entry is passed on as a microsecond count.
+    """
+    gap_size = Duration(micros=fn_parameter['gap_size'])
+    return Sessions(gap_size=gap_size)
+
+  def to_runner_api_parameter(self, context):
+    """Returns the session-windows URN with gap_size in microseconds."""
+    typed_param = proto_utils.pack_Struct(gap_size=self.gap_size.micros)
+    return urns.SESSION_WINDOWS_FN, typed_param
+
+# Sessions parameters travel as a protobuf Struct.
+WindowFn.register_urn(
+    urns.SESSION_WINDOWS_FN,
+    struct_pb2.Struct,
+    Sessions.from_runner_api_parameter)

http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index c4072ac..821b143 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,6 +19,7 @@
 
 import unittest
 
+from apache_beam import pipeline
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
@@ -32,6 +33,7 @@ from apache_beam.transforms.timeutil import MIN_TIMESTAMP
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.transforms.window import Sessions
 from apache_beam.transforms.window import SlidingWindows
@@ -224,6 +226,15 @@ class WindowTest(unittest.TestCase):
                 label='assert:mean')
     p.run()
 
+  def test_runner_api(self):
+    """Checks that each listed WindowFn survives a Runner API round trip."""
+    window_fns = [
+        GlobalWindows(),
+        FixedWindows(37),
+        SlidingWindows(2, 389),
+        Sessions(5077),
+    ]
+    for window_fn in window_fns:
+      # A fresh context per WindowFn, shared by the encode and decode steps.
+      context = pipeline.PipelineContext()
+      fn_proto = window_fn.to_runner_api(context)
+      decoded_fn = WindowFn.from_runner_api(fn_proto, context)
+      self.assertEqual(window_fn, decoded_fn)
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
new file mode 100644
index 0000000..0ece8f5
--- /dev/null
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -0,0 +1,37 @@
+from google.protobuf import any_pb2
+from google.protobuf import struct_pb2
+
+
+def pack_Any(msg):
+  """Wraps msg in a protobuf Any message.
+
+  Returns None, rather than an Any, when msg is None.
+  """
+  if msg is None:
+    return None
+
+  packed = any_pb2.Any()
+  packed.Pack(msg)
+  return packed
+
+
+def unpack_Any(any_msg, msg_class):
+  """Unpacks any_msg into a new instance of msg_class.
+
+  Returns None, without reading any_msg, if msg_class is None.
+
+  Raises:
+    ValueError: if any_msg does not hold a message of type msg_class.
+  """
+  if msg_class is None:
+    return None
+
+  msg = msg_class()
+  # Any.Unpack signals a type mismatch through its boolean result rather than
+  # by raising; ignoring it would hand back an empty message.
+  if not any_msg.Unpack(msg):
+    raise ValueError(
+        'Cannot unpack Any of type %r into %s'
+        % (any_msg.type_url, msg_class.__name__))
+  return msg
+
+
+def pack_Struct(**kwargs):
+  """Builds a protobuf Struct with one field per keyword argument."""
+  packed = struct_pb2.Struct()
+  for name in kwargs:
+    packed[name] = kwargs[name]  # pylint: disable=unsubscriptable-object
+  return packed

http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
new file mode 100644
index 0000000..4d1c2f7
--- /dev/null
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -0,0 +1,7 @@
+# URNs naming the WindowFn encodings registered in transforms/window.py.
+PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
+GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
+FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
+SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
+SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
+
+# NOTE(review): this URN uses a "dataflow:" prefix while the WindowFn URNs
+# above use "beam:" -- confirm the difference is intentional.
+PICKLED_CODER = "dataflow:coder:pickled_python:v0.1"

Reply via email to