This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new e39da4c [BEAM-12506] Changed WindowedValueHolder into a Row type new 739dbb8 Merge pull request #15217 from KevinGG/BEAM-12506 e39da4c is described below commit e39da4cddd2a020815d2282cf5712c2799605cba Author: KevinGG <kawai...@gmail.com> AuthorDate: Fri Jul 23 15:28:59 2021 -0700 [BEAM-12506] Changed WindowedValueHolder into a Row type The change avoids introducing a pickled python coder when writing/reading WindowedValueHolders. --- sdks/python/apache_beam/testing/test_stream.py | 45 +++++++++++++++++++++- .../python/apache_beam/testing/test_stream_test.py | 21 ++++++++++ 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py index b3b4a96..d655a90 100644 --- a/sdks/python/apache_beam/testing/test_stream.py +++ b/sdks/python/apache_beam/testing/test_stream.py @@ -207,15 +207,56 @@ class ProcessingTimeEvent(Event): return 'ProcessingTimeEvent: <{}>'.format(self.advance_by) -class WindowedValueHolder: +class WindowedValueHolderMeta(type): + """A metaclass that overrides the isinstance check for WindowedValueHolder. + + Python does a quick test for exact match. If an instance is exactly of + type WindowedValueHolder, the overridden isinstance check is omitted. + The override is needed because WindowedValueHolder elements encoded then + decoded become Row elements. + """ + def __instancecheck__(cls, other): + """Checks if a beam.Row typed instance is a WindowedValueHolder. + """ + return ( + isinstance(other, beam.Row) and hasattr(other, 'windowed_value') and + hasattr(other, 'urn') and + isinstance(other.windowed_value, WindowedValue) and + other.urn == common_urns.coders.ROW.urn) + + +class WindowedValueHolder(beam.Row, metaclass=WindowedValueHolderMeta): """A class that holds a WindowedValue. 
This is a special class that can be used by the runner that implements the TestStream as a signal that the underlying value should be unreified to the specified window. """ + # Register WindowedValueHolder to always use RowCoder. + coders.registry.register_coder(WindowedValueHolderMeta, coders.RowCoder) + def __init__(self, windowed_value): - self.windowed_value = windowed_value + assert isinstance(windowed_value, WindowedValue), ( + 'WindowedValueHolder can only hold %s type. Instead, %s is given.') % ( + WindowedValue, windowed_value) + super().__init__( + **{ + 'windowed_value': windowed_value, 'urn': common_urns.coders.ROW.urn + }) + + @classmethod + def from_row(cls, row): + """Converts a beam.Row typed instance to WindowedValueHolder. + """ + if isinstance(row, WindowedValueHolder): + return WindowedValueHolder(row.windowed_value) + assert isinstance(row, beam.Row), 'The given row %s must be a %s type' % ( + row, beam.Row) + assert hasattr(row, 'windowed_value'), ( + 'The given %s must have a windowed_value attribute.') % row + assert isinstance(row.windowed_value, WindowedValue), ( + 'The windowed_value attribute of %s must be a %s type') % ( + row, WindowedValue) class TestStream(PTransform): diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py index 94445dd..a4580b7 100644 --- a/sdks/python/apache_beam/testing/test_stream_test.py +++ b/sdks/python/apache_beam/testing/test_stream_test.py @@ -332,6 +332,27 @@ class TestStreamTest(unittest.TestCase): ('a', timestamp.Timestamp(5), beam.window.IntervalWindow(5, 10)), ])) + def test_instance_check_windowed_value_holder(self): + windowed_value = WindowedValue( + 'a', + Timestamp(5), [beam.window.IntervalWindow(5, 10)], + PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0)) + self.assertTrue( + isinstance(WindowedValueHolder(windowed_value), WindowedValueHolder)) + self.assertTrue( + isinstance( + beam.Row( + windowed_value=windowed_value, 
urn=common_urns.coders.ROW.urn), + WindowedValueHolder)) + self.assertFalse( + isinstance( + beam.Row(windowed_value=windowed_value), WindowedValueHolder)) + self.assertFalse(isinstance(windowed_value, WindowedValueHolder)) + self.assertFalse( + isinstance(beam.Row(x=windowed_value), WindowedValueHolder)) + self.assertFalse( + isinstance(beam.Row(windowed_value=1), WindowedValueHolder)) + def test_gbk_execution_no_triggers(self): test_stream = ( TestStream().advance_watermark_to(10).add_elements([