[beam] tag nightly-master updated (205fbb1 -> 939fa99)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to tag nightly-master
in repository https://gitbox.apache.org/repos/asf/beam.git.

*** WARNING: tag nightly-master was modified! ***

    from 205fbb1  (commit)
      to 939fa99  (commit)
    from 205fbb1  [BEAM-12653] Fix container cleanup for Java Dataflow tests (#15213)
     add e39da4c  [BEAM-12506] Changed WindowedValueHolder into a Row type
     add 739dbb8  Merge pull request #15217 from KevinGG/BEAM-12506
     add 939fa99  [BEAM-1833] Preserve inputs names at graph construction and through proto transaltion. (#15202)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/pipeline.py                | 58 --
 sdks/python/apache_beam/pipeline_test.py           | 18 +++
 .../runners/dataflow/dataflow_runner.py            |  2 +-
 .../runners/dataflow/dataflow_runner_test.py       | 17 ---
 .../runners/interactive/pipeline_instrument.py     |  8 +--
 .../interactive/pipeline_instrument_test.py        | 10 ++--
 sdks/python/apache_beam/testing/test_stream.py     | 45 -
 .../python/apache_beam/testing/test_stream_test.py | 21
 sdks/python/apache_beam/transforms/ptransform.py   | 45 -
 9 files changed, 156 insertions(+), 68 deletions(-)
[beam] branch master updated (739dbb8 -> 939fa99)
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 739dbb8  Merge pull request #15217 from KevinGG/BEAM-12506
     add 939fa99  [BEAM-1833] Preserve inputs names at graph construction and through proto transaltion. (#15202)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/pipeline.py                | 58 --
 sdks/python/apache_beam/pipeline_test.py           | 18 +++
 .../runners/dataflow/dataflow_runner.py            |  2 +-
 .../runners/dataflow/dataflow_runner_test.py       | 17 ---
 .../runners/interactive/pipeline_instrument.py     |  8 +--
 .../interactive/pipeline_instrument_test.py        | 10 ++--
 sdks/python/apache_beam/transforms/ptransform.py   | 45 -
 7 files changed, 92 insertions(+), 66 deletions(-)
[beam] branch release-2.32.0 updated (edd9a43 -> 1175c09)
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from edd9a43  Merge pull request #15184: [BEAM-12589] - exclude testTwoTimersSettingEachOtherWithCreateAsInput… (#15240)
     add 1175c09  [BEAM-12625] Annotate testTwoTimersSettingEachOtherWithCreateAsInputUnbounded. (#15207)

No new revisions were added by this update.

Summary of changes:
 .../src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
[beam] branch release-2.32.0 updated (eaf5776 -> edd9a43)
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from eaf5776  [BEAM-12656] Fix go-licenses build (#15215) (#15222)
     add edd9a43  Merge pull request #15184: [BEAM-12589] - exclude testTwoTimersSettingEachOtherWithCreateAsInput… (#15240)

No new revisions were added by this update.

Summary of changes:
 runners/google-cloud-dataflow-java/build.gradle | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
[beam] branch master updated: [BEAM-12506] Changed WindowedValueHolder into a Row type
This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new e39da4c  [BEAM-12506] Changed WindowedValueHolder into a Row type
     new 739dbb8  Merge pull request #15217 from KevinGG/BEAM-12506
e39da4c is described below

commit e39da4cddd2a020815d2282cf5712c2799605cba
Author: KevinGG
AuthorDate: Fri Jul 23 15:28:59 2021 -0700

    [BEAM-12506] Changed WindowedValueHolder into a Row type

    The change avoids introducing a pickled python coder when
    writing/reading WindowedValueHolders.
---
 sdks/python/apache_beam/testing/test_stream.py     | 45 +-
 .../python/apache_beam/testing/test_stream_test.py | 21 ++
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index b3b4a96..d655a90 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -207,15 +207,56 @@ class ProcessingTimeEvent(Event):
     return 'ProcessingTimeEvent: <{}>'.format(self.advance_by)
 
 
-class WindowedValueHolder:
+class WindowedValueHolderMeta(type):
+  """A metaclass that overrides the isinstance check for WindowedValueHolder.
+
+  Python does a quick test for exact match. If an instance is exactly of
+  type WindowedValueHolder, the overridden isinstance check is omitted.
+  The override is needed because WindowedValueHolder elements encoded then
+  decoded become Row elements.
+  """
+  def __instancecheck__(cls, other):
+    """Checks if a beam.Row typed instance is a WindowedValueHolder.
+    """
+    return (
+        isinstance(other, beam.Row) and hasattr(other, 'windowed_value') and
+        hasattr(other, 'urn') and
+        isinstance(other.windowed_value, WindowedValue) and
+        other.urn == common_urns.coders.ROW.urn)
+
+
+class WindowedValueHolder(beam.Row, metaclass=WindowedValueHolderMeta):
   """A class that holds a WindowedValue.
 
   This is a special class that can be used by the runner that implements the
   TestStream as a signal that the underlying value should be unreified to the
   specified window.
   """
+  # Register WindowedValueHolder to always use RowCoder.
+  coders.registry.register_coder(WindowedValueHolderMeta, coders.RowCoder)
+
   def __init__(self, windowed_value):
-    self.windowed_value = windowed_value
+    assert isinstance(windowed_value, WindowedValue), (
+        'WindowedValueHolder can only hold %s type. Instead, %s is given.') % (
+            WindowedValue, windowed_value)
+    super().__init__(
+        **{
+            'windowed_value': windowed_value, 'urn': common_urns.coders.ROW.urn
+        })
+
+  @classmethod
+  def from_row(cls, row):
+    """Converts a beam.Row typed instance to WindowedValueHolder.
+    """
+    if isinstance(row, WindowedValueHolder):
+      return WindowedValueHolder(row.windowed_value)
+    assert isinstance(row, beam.Row), 'The given row %s must be a %s type' % (
+        row, beam.Row)
+    assert hasattr(row, 'windowed_value'), (
+        'The given %s must have a windowed_value attribute.') % row
+    assert isinstance(row.windowed_value, WindowedValue), (
+        'The windowed_value attribute of %s must be a %s type') % (
+            row, WindowedValue)
 
 
 class TestStream(PTransform):
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 94445dd..a4580b7 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -332,6 +332,27 @@ class TestStreamTest(unittest.TestCase):
             ('a', timestamp.Timestamp(5), beam.window.IntervalWindow(5, 10)),
         ]))
 
+  def test_instance_check_windowed_value_holder(self):
+    windowed_value = WindowedValue(
+        'a',
+        Timestamp(5), [beam.window.IntervalWindow(5, 10)],
+        PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0))
+    self.assertTrue(
+        isinstance(WindowedValueHolder(windowed_value), WindowedValueHolder))
+    self.assertTrue(
+        isinstance(
+            beam.Row(
+                windowed_value=windowed_value, urn=common_urns.coders.ROW.urn),
+            WindowedValueHolder))
+    self.assertFalse(
+        isinstance(
+            beam.Row(windowed_value=windowed_value), WindowedValueHolder))
+    self.assertFalse(isinstance(windowed_value, WindowedValueHolder))
+    self.assertFalse(
+        isinstance(beam.Row(x=windowed_value), WindowedValueHolder))
+    self.assertFalse(
+        isinstance(beam.Row(windowed_value=1), WindowedValueHolder))
+
   def test_gbk_execution_no_triggers(self):
     test_stream = (
         TestStream().advance_watermark_to(10).add_elements([
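
Note on the commit above: the metaclass docstring describes overriding the isinstance check so that a plain Row carrying the right fields is still recognised as a holder after an encode/decode round trip. The following is a minimal, dependency-free sketch of that general technique, not the Beam implementation. The names Row, Holder, HolderMeta, HOLDER_URN and the payload field are hypothetical stand-ins for beam.Row, WindowedValueHolder, WindowedValueHolderMeta, the ROW URN and windowed_value.

```python
# Hypothetical stand-in for beam.Row: stores keyword arguments as attributes.
class Row:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


# Hypothetical marker value; the commit uses common_urns.coders.ROW.urn.
HOLDER_URN = 'example:holder:v1'


class HolderMeta(type):
    """Metaclass that makes isinstance(x, Holder) structural for Row objects."""
    def __instancecheck__(cls, other):
        # Called only when type(other) is not exactly cls; for an exact type
        # match, isinstance() returns True without consulting this hook.
        return (
            isinstance(other, Row)
            and getattr(other, 'urn', None) == HOLDER_URN
            and isinstance(getattr(other, 'payload', None), str))


class Holder(Row, metaclass=HolderMeta):
    """A Row subclass that always carries a payload and the marker URN."""
    def __init__(self, payload):
        super().__init__(payload=payload, urn=HOLDER_URN)


# A real Holder and a look-alike Row both pass the check.
print(isinstance(Holder('a'), Holder))
print(isinstance(Row(payload='a', urn=HOLDER_URN), Holder))
# A Row missing the marker, or with the wrong payload type, does not.
print(isinstance(Row(payload='a'), Holder))
print(isinstance(Row(payload=1, urn=HOLDER_URN), Holder))
```

The design trade-off in this sketch is that the check depends on the object's fields rather than its class, so any Row that happens to carry both fields with matching values is treated as a holder.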
[beam] branch release-2.32.0 updated (ca866af -> eaf5776)
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from ca866af  [BEAM-12399] Move CPython license to own file. (#15201) (#15211)
     add eaf5776  [BEAM-12656] Fix go-licenses build (#15215) (#15222)

No new revisions were added by this update.

Summary of changes:
 release/go-licenses/Dockerfile      | 2 +-
 release/go-licenses/get-licenses.sh | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)
[beam] branch release-2.32.0 updated (ad5d202 -> ca866af)
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from ad5d202  Revert "Default to Runner v2 for Python Streaming jobs. (#15140)" (#15210) (#15212)
     add ca866af  [BEAM-12399] Move CPython license to own file. (#15201) (#15211)

No new revisions were added by this update.

Summary of changes:
 LICENSE                                            | 260 +
 LICENSE.python                                     | 258
 build.gradle.kts                                   |   3 +
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 sdks/python/MANIFEST.in                            |   1 +
 sdks/python/container/Dockerfile                   |   1 +
 6 files changed, 266 insertions(+), 258 deletions(-)
 create mode 100644 LICENSE.python
[beam] branch release-2.32.0 updated (8e32f3b -> ad5d202)
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 8e32f3b  [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226)
     add ad5d202  Revert "Default to Runner v2 for Python Streaming jobs. (#15140)" (#15210) (#15212)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py |  8 +---
 .../apache_beam/runners/dataflow/dataflow_runner_test.py    | 11 ++-
 2 files changed, 3 insertions(+), 16 deletions(-)
[beam] branch release-2.32.0 updated: [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226)
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/release-2.32.0 by this push:
     new 8e32f3b  [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226)
8e32f3b is described below

commit 8e32f3b9a67dc5df63069b6f0271c97158cc9639
Author: Ankur
AuthorDate: Wed Jul 28 13:01:53 2021 -0700

    [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226)

    [BEAM-12661] Fix stuck GetData Windmill calls

    Co-authored-by: slavachernyak
---
 .../beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 81ae092..3c06ee9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -1525,8 +1525,8 @@ public class GrpcWindmillServer extends WindmillServerStub {
         try {
           blockedStartMs.set(Instant.now().getMillis());
-          current = queue.take();
-          if (current != POISON_PILL) {
+          current = queue.poll(180, TimeUnit.SECONDS);
+          if (current != null && current != POISON_PILL) {
             return true;
           }
           if (cancelled.get()) {
@@ -1535,7 +1535,8 @@ public class GrpcWindmillServer extends WindmillServerStub {
           if (complete.get()) {
             return false;
           }
-          throw new IllegalStateException("Got poison pill but stream is not done.");
+          throw new IllegalStateException(
+              "Got poison pill or timeout but stream is not done.");
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new CancellationException();
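
Note on the commit above: the diff replaces an unbounded blocking take() with a timed poll(), so a consumer that never receives an element or the poison pill no longer waits forever. The following is a rough Python analogue of that pattern using the standard library queue module. It is a sketch under stated assumptions, not a translation of GrpcWindmillServer. The names next_item, POISON_PILL and stream_done are hypothetical, and the timeout is passed in rather than fixed at the 180 seconds used in the commit.

```python
import queue

# Hypothetical sentinel that a producer enqueues to signal end of stream.
POISON_PILL = object()


def next_item(q, timeout_seconds, stream_done):
    """Waits up to timeout_seconds for the next element.

    Returns the element, or None once stream_done() reports completion.
    Raises RuntimeError if the wait ended (sentinel or timeout) while the
    stream is not done, instead of blocking indefinitely.
    """
    try:
        current = q.get(timeout=timeout_seconds)
    except queue.Empty:
        # Plays the role of Java's poll() returning null on timeout.
        current = None
    # Caveat of this sketch: a Python queue may hold None as a real element;
    # here None is reserved to mean "timed out".
    if current is not None and current is not POISON_PILL:
        return current
    if stream_done():
        return None
    raise RuntimeError('Got poison pill or timeout but stream is not done.')


q = queue.Queue()
q.put('response-1')
print(next_item(q, 0.05, lambda: False))  # an element is available
q.put(POISON_PILL)
print(next_item(q, 0.05, lambda: True))   # sentinel seen and stream is done
```

With an empty queue and a stream that is not done, the call raises after the timeout rather than hanging, which is the behaviour change the commit title describes.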