This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e39da4c  [BEAM-12506] Changed WindowedValueHolder into a Row type
     new 739dbb8  Merge pull request #15217 from KevinGG/BEAM-12506
e39da4c is described below

commit e39da4cddd2a020815d2282cf5712c2799605cba
Author: KevinGG <kawai...@gmail.com>
AuthorDate: Fri Jul 23 15:28:59 2021 -0700

    [BEAM-12506] Changed WindowedValueHolder into a Row type
    
    The change avoids introducing a pickled python coder when writing/reading WindowedValueHolders.
---
 sdks/python/apache_beam/testing/test_stream.py     | 45 +++++++++++++++++++++-
 .../python/apache_beam/testing/test_stream_test.py | 21 ++++++++++
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index b3b4a96..d655a90 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -207,15 +207,83 @@ class ProcessingTimeEvent(Event):
     return 'ProcessingTimeEvent: <{}>'.format(self.advance_by)
 
 
-class WindowedValueHolder:
+class WindowedValueHolderMeta(type):
+  """A metaclass that overrides the isinstance check for WindowedValueHolder.
+
+  ``isinstance`` skips ``__instancecheck__`` when the instance's type is
+  exactly WindowedValueHolder, so the override only matters for other
+  objects. It is needed because a WindowedValueHolder that is encoded and
+  then decoded comes back as a plain ``beam.Row``, which should still be
+  recognized as a holder.
+  """
+  def __instancecheck__(cls, other):
+    """Returns whether ``other`` is a beam.Row shaped like a holder.
+
+    That is: it has a ``windowed_value`` attribute holding a WindowedValue
+    and a ``urn`` attribute equal to the Row coder URN, the marker that
+    WindowedValueHolder.__init__ stores.
+    """
+    return (
+        isinstance(other, beam.Row) and hasattr(other, 'windowed_value') and
+        hasattr(other, 'urn') and
+        isinstance(other.windowed_value, WindowedValue) and
+        other.urn == common_urns.coders.ROW.urn)
+
+
+class WindowedValueHolder(beam.Row, metaclass=WindowedValueHolderMeta):
   """A class that holds a WindowedValue.
 
   This is a special class that can be used by the runner that implements the
   TestStream as a signal that the underlying value should be unreified to the
   specified window.
   """
+  # Register WindowedValueHolder to always use RowCoder.
+  # NOTE(review): the key registered is the metaclass, since the name
+  # WindowedValueHolder is not bound yet inside its own class body; confirm
+  # coder lookups for WindowedValueHolder elements actually resolve to it.
+  coders.registry.register_coder(WindowedValueHolderMeta, coders.RowCoder)
+
   def __init__(self, windowed_value):
-    self.windowed_value = windowed_value
+    """Wraps ``windowed_value``, which must be a WindowedValue.
+
+    The row also gets a ``urn`` field set to the Row coder URN so that,
+    once decoded, it is still recognized by WindowedValueHolderMeta.
+    """
+    assert isinstance(windowed_value, WindowedValue), (
+        'WindowedValueHolder can only hold %s type. Instead, %s is given.') % (
+            WindowedValue, windowed_value)
+    super().__init__(
+        **{
+            'windowed_value': windowed_value, 'urn': common_urns.coders.ROW.urn
+        })
+
+  @classmethod
+  def from_row(cls, row):
+    """Converts a beam.Row typed instance to WindowedValueHolder.
+
+    Args:
+      row: a WindowedValueHolder, or a beam.Row whose ``windowed_value``
+        attribute holds a WindowedValue.
+
+    Returns:
+      A new holder wrapping ``row.windowed_value``.
+
+    Raises:
+      AssertionError: if ``row`` does not satisfy the requirements above.
+    """
+    if isinstance(row, WindowedValueHolder):
+      return cls(row.windowed_value)
+    assert isinstance(row, beam.Row), 'The given row %s must be a %s type' % (
+        row, beam.Row)
+    assert hasattr(row, 'windowed_value'), (
+        'The given %s must have a windowed_value attribute.') % (row, )
+    assert isinstance(row.windowed_value, WindowedValue), (
+        'The windowed_value attribute of %s must be a %s type') % (
+            row, WindowedValue)
+    # Rows without the ``urn`` marker are not matched by the isinstance
+    # check above but still pass these assertions; wrap them as well rather
+    # than implicitly returning None.
+    return cls(row.windowed_value)
 
 
 class TestStream(PTransform):
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 94445dd..a4580b7 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -332,6 +332,27 @@ class TestStreamTest(unittest.TestCase):
               ('a', timestamp.Timestamp(5), beam.window.IntervalWindow(5, 10)),
           ]))
 
+  def test_instance_check_windowed_value_holder(self):
+    wv = WindowedValue(
+        'a',
+        Timestamp(5), [beam.window.IntervalWindow(5, 10)],
+        PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0))
+    holder_like = [
+        WindowedValueHolder(wv),
+        beam.Row(windowed_value=wv, urn=common_urns.coders.ROW.urn),
+    ]
+    # Not holders: a bare WindowedValue, or Rows with missing/mistyped fields.
+    not_holder_like = [
+        beam.Row(windowed_value=wv),
+        wv,
+        beam.Row(x=wv),
+        beam.Row(windowed_value=1),
+    ]
+    for candidate in holder_like:
+      self.assertIsInstance(candidate, WindowedValueHolder)
+    for candidate in not_holder_like:
+      self.assertNotIsInstance(candidate, WindowedValueHolder)
+
   def test_gbk_execution_no_triggers(self):
     test_stream = (
         TestStream().advance_watermark_to(10).add_elements([

Reply via email to