[beam] tag nightly-master updated (205fbb1 -> 939fa99)

2021-07-28 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to tag nightly-master
in repository https://gitbox.apache.org/repos/asf/beam.git.


*** WARNING: tag nightly-master was modified! ***

from 205fbb1  (commit)
  to 939fa99  (commit)
from 205fbb1  [BEAM-12653] Fix container cleanup for Java Dataflow tests (#15213)
 add e39da4c  [BEAM-12506] Changed WindowedValueHolder into a Row type
 add 739dbb8  Merge pull request #15217 from KevinGG/BEAM-12506
 add 939fa99  [BEAM-1833] Preserve inputs names at graph construction and through proto translation. (#15202)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/pipeline.py                | 58 --
 sdks/python/apache_beam/pipeline_test.py           | 18 +++
 .../runners/dataflow/dataflow_runner.py            |  2 +-
 .../runners/dataflow/dataflow_runner_test.py       | 17 ---
 .../runners/interactive/pipeline_instrument.py     |  8 +--
 .../interactive/pipeline_instrument_test.py        | 10 ++--
 sdks/python/apache_beam/testing/test_stream.py     | 45 -
 .../python/apache_beam/testing/test_stream_test.py | 21
 sdks/python/apache_beam/transforms/ptransform.py   | 45 -
 9 files changed, 156 insertions(+), 68 deletions(-)


[beam] branch master updated (739dbb8 -> 939fa99)

2021-07-28 Thread robertwb

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 739dbb8  Merge pull request #15217 from KevinGG/BEAM-12506
 add 939fa99  [BEAM-1833] Preserve inputs names at graph construction and through proto translation. (#15202)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/pipeline.py                | 58 --
 sdks/python/apache_beam/pipeline_test.py           | 18 +++
 .../runners/dataflow/dataflow_runner.py            |  2 +-
 .../runners/dataflow/dataflow_runner_test.py       | 17 ---
 .../runners/interactive/pipeline_instrument.py     |  8 +--
 .../interactive/pipeline_instrument_test.py        | 10 ++--
 sdks/python/apache_beam/transforms/ptransform.py   | 45 -
 7 files changed, 92 insertions(+), 66 deletions(-)


[beam] branch release-2.32.0 updated (edd9a43 -> 1175c09)

2021-07-28 Thread goenka

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


from edd9a43  Merge pull request #15184: [BEAM-12589] - exclude testTwoTimersSettingEachOtherWithCreateAsInput… (#15240)
 add 1175c09  [BEAM-12625] Annotate testTwoTimersSettingEachOtherWithCreateAsInputUnbounded. (#15207)

No new revisions were added by this update.

Summary of changes:
 .../src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)


[beam] branch release-2.32.0 updated (eaf5776 -> edd9a43)

2021-07-28 Thread goenka

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


from eaf5776  [BEAM-12656] Fix go-licenses build (#15215) (#15222)
 add edd9a43  Merge pull request #15184: [BEAM-12589] - exclude testTwoTimersSettingEachOtherWithCreateAsInput… (#15240)

No new revisions were added by this update.

Summary of changes:
 runners/google-cloud-dataflow-java/build.gradle | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)


[beam] branch master updated: [BEAM-12506] Changed WindowedValueHolder into a Row type

2021-07-28 Thread ningk

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e39da4c  [BEAM-12506] Changed WindowedValueHolder into a Row type
 new 739dbb8  Merge pull request #15217 from KevinGG/BEAM-12506
e39da4c is described below

commit e39da4cddd2a020815d2282cf5712c2799605cba
Author: KevinGG 
AuthorDate: Fri Jul 23 15:28:59 2021 -0700

    [BEAM-12506] Changed WindowedValueHolder into a Row type

    The change avoids introducing a pickled python coder when writing/reading WindowedValueHolders.
---
 sdks/python/apache_beam/testing/test_stream.py | 45 +-
 .../python/apache_beam/testing/test_stream_test.py | 21 ++
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index b3b4a96..d655a90 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -207,15 +207,56 @@ class ProcessingTimeEvent(Event):
     return 'ProcessingTimeEvent: <{}>'.format(self.advance_by)
 
 
-class WindowedValueHolder:
+class WindowedValueHolderMeta(type):
+  """A metaclass that overrides the isinstance check for WindowedValueHolder.
+
+  Python does a quick test for exact match. If an instance is exactly of
+  type WindowedValueHolder, the overridden isinstance check is omitted.
+  The override is needed because WindowedValueHolder elements encoded then
+  decoded become Row elements.
+  """
+  def __instancecheck__(cls, other):
+    """Checks if a beam.Row typed instance is a WindowedValueHolder.
+    """
+    return (
+        isinstance(other, beam.Row) and hasattr(other, 'windowed_value') and
+        hasattr(other, 'urn') and
+        isinstance(other.windowed_value, WindowedValue) and
+        other.urn == common_urns.coders.ROW.urn)
+
+
+class WindowedValueHolder(beam.Row, metaclass=WindowedValueHolderMeta):
   """A class that holds a WindowedValue.
 
   This is a special class that can be used by the runner that implements the
   TestStream as a signal that the underlying value should be unreified to the
   specified window.
   """
+  # Register WindowedValueHolder to always use RowCoder.
+  coders.registry.register_coder(WindowedValueHolderMeta, coders.RowCoder)
+
   def __init__(self, windowed_value):
-    self.windowed_value = windowed_value
+    assert isinstance(windowed_value, WindowedValue), (
+        'WindowedValueHolder can only hold %s type. Instead, %s is given.') % (
+            WindowedValue, windowed_value)
+    super().__init__(
+        **{
+            'windowed_value': windowed_value, 'urn': common_urns.coders.ROW.urn
+        })
+
+  @classmethod
+  def from_row(cls, row):
+    """Converts a beam.Row typed instance to WindowedValueHolder.
+    """
+    if isinstance(row, WindowedValueHolder):
+      return WindowedValueHolder(row.windowed_value)
+    assert isinstance(row, beam.Row), 'The given row %s must be a %s type' % (
+        row, beam.Row)
+    assert hasattr(row, 'windowed_value'), (
+        'The given %s must have a windowed_value attribute.') % row
+    assert isinstance(row.windowed_value, WindowedValue), (
+        'The windowed_value attribute of %s must be a %s type') % (
+            row, WindowedValue)
 
 
 class TestStream(PTransform):
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 94445dd..a4580b7 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -332,6 +332,27 @@ class TestStreamTest(unittest.TestCase):
   ('a', timestamp.Timestamp(5), beam.window.IntervalWindow(5, 10)),
   ]))
 
+  def test_instance_check_windowed_value_holder(self):
+    windowed_value = WindowedValue(
+        'a',
+        Timestamp(5), [beam.window.IntervalWindow(5, 10)],
+        PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0))
+    self.assertTrue(
+        isinstance(WindowedValueHolder(windowed_value), WindowedValueHolder))
+    self.assertTrue(
+        isinstance(
+            beam.Row(
+                windowed_value=windowed_value, urn=common_urns.coders.ROW.urn),
+            WindowedValueHolder))
+    self.assertFalse(
+        isinstance(
+            beam.Row(windowed_value=windowed_value), WindowedValueHolder))
+    self.assertFalse(isinstance(windowed_value, WindowedValueHolder))
+    self.assertFalse(
+        isinstance(beam.Row(x=windowed_value), WindowedValueHolder))
+    self.assertFalse(
+        isinstance(beam.Row(windowed_value=1), WindowedValueHolder))
+
   def test_gbk_execution_no_triggers(self):
 test_stream = (
 TestStream().advance_watermark_to(10).add_elements([
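
The isinstance override described in the WindowedValueHolderMeta docstring above can be sketched without Beam. This is only an illustration under stated assumptions: Row, Holder, HolderMeta, and HOLDER_URN are hypothetical stand-ins, not Beam APIs. It shows how a metaclass __instancecheck__ lets a plain row-like value pass an isinstance check after an encode/decode round trip has dropped the original class.

```python
from collections import namedtuple

# Hypothetical stand-ins for beam.Row and the ROW coder URN; not Beam APIs.
Row = namedtuple('Row', ['payload', 'urn'])
HOLDER_URN = 'example:holder:v1'


class HolderMeta(type):
    """Metaclass that accepts any Row with the expected shape and marker."""

    def __instancecheck__(cls, other):
        # Structural check: right container, right payload type, right marker.
        return (
            isinstance(other, Row) and isinstance(other.payload, str) and
            other.urn == HOLDER_URN)


class Holder(metaclass=HolderMeta):
    """Wrapper whose decoded form is assumed to come back as a plain Row."""

    def __init__(self, payload):
        self.payload = payload
        self.urn = HOLDER_URN


# Exact type match: CPython answers True before consulting __instancecheck__.
exact = isinstance(Holder('a'), Holder)
# A "decoded" Row with the marker is accepted through the metaclass.
decoded = isinstance(Row('a', HOLDER_URN), Holder)
# Rows with the wrong marker or the wrong payload type are rejected.
wrong_urn = isinstance(Row('a', 'other'), Holder)
wrong_payload = isinstance(Row(1, HOLDER_URN), Holder)
```

The four results mirror the cases exercised by test_instance_check_windowed_value_holder: an exact instance, a matching row, and rows that lack the marker or hold the wrong value type.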


[beam] branch release-2.32.0 updated (ca866af -> eaf5776)

2021-07-28 Thread goenka

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


from ca866af  [BEAM-12399] Move CPython license to own file. (#15201) (#15211)
 add eaf5776  [BEAM-12656] Fix go-licenses build (#15215) (#15222)

No new revisions were added by this update.

Summary of changes:
 release/go-licenses/Dockerfile  | 2 +-
 release/go-licenses/get-licenses.sh | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)


[beam] branch release-2.32.0 updated (ad5d202 -> ca866af)

2021-07-28 Thread goenka

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


from ad5d202  Revert "Default to Runner v2 for Python Streaming jobs. (#15140)" (#15210) (#15212)
 add ca866af  [BEAM-12399] Move CPython license to own file. (#15201) (#15211)

No new revisions were added by this update.

Summary of changes:
 LICENSE                                            | 260 +
 LICENSE.python                                     | 258
 build.gradle.kts                                   |   3 +
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 sdks/python/MANIFEST.in                            |   1 +
 sdks/python/container/Dockerfile                   |   1 +
 6 files changed, 266 insertions(+), 258 deletions(-)
 create mode 100644 LICENSE.python


[beam] branch release-2.32.0 updated (8e32f3b -> ad5d202)

2021-07-28 Thread goenka

goenka pushed a change to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 8e32f3b  [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226)
 add ad5d202  Revert "Default to Runner v2 for Python Streaming jobs. (#15140)" (#15210) (#15212)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py   |  8 +-------
 .../apache_beam/runners/dataflow/dataflow_runner_test.py      | 11 ++---------
 2 files changed, 3 insertions(+), 16 deletions(-)


[beam] branch release-2.32.0 updated: [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226)

2021-07-28 Thread goenka

goenka pushed a commit to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.32.0 by this push:
 new 8e32f3b  [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226)
8e32f3b is described below

commit 8e32f3b9a67dc5df63069b6f0271c97158cc9639
Author: Ankur 
AuthorDate: Wed Jul 28 13:01:53 2021 -0700

    [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226)

    [BEAM-12661] Fix stuck GetData Windmill calls

    Co-authored-by: slavachernyak 
---
 .../beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 81ae092..3c06ee9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -1525,8 +1525,8 @@ public class GrpcWindmillServer extends WindmillServerStub {
 
 try {
   blockedStartMs.set(Instant.now().getMillis());
-  current = queue.take();
-  if (current != POISON_PILL) {
+  current = queue.poll(180, TimeUnit.SECONDS);
+  if (current != null && current != POISON_PILL) {
 return true;
   }
   if (cancelled.get()) {
@@ -1535,7 +1535,8 @@ public class GrpcWindmillServer extends WindmillServerStub {
   if (complete.get()) {
 return false;
   }
-  throw new IllegalStateException("Got poison pill but stream is not done.");
+  throw new IllegalStateException(
+  "Got poison pill or timeout but stream is not done.");
 } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
   throw new CancellationException();
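
The switch from queue.take() to a timed queue.poll() in the hunk above can be sketched in Python, assuming queue.Queue as a stand-in for the Java BlockingQueue. The names next_item, POISON_PILL, and StreamCancelled are hypothetical and only mirror the control flow of the diff: a timeout is handled like a poison pill, so the consumer re-checks the cancelled and complete flags instead of blocking indefinitely.

```python
import queue

# Hypothetical sentinel marking the end of the stream (assumption).
POISON_PILL = object()


class StreamCancelled(Exception):
    """Hypothetical analogue of the CancellationException in the diff."""


def next_item(q, timeout_s, cancelled=False, complete=False):
    """Waits up to timeout_s for an item.

    Returns (True, item) when a real item arrives and (False, None) when the
    stream is complete. Raises if the wait ended without an item while the
    stream is neither cancelled nor complete.
    """
    try:
        # Bounded wait; the pre-fix behaviour corresponds to q.get() with no timeout.
        current = q.get(timeout=timeout_s)
    except queue.Empty:
        current = None  # Timeout is treated the same way as a poison pill.
    if current is not None and current is not POISON_PILL:
        return True, current
    if cancelled:
        raise StreamCancelled()
    if complete:
        return False, None
    raise RuntimeError('Got poison pill or timeout but stream is not done.')
```

With an unbounded get(), a producer that never enqueues the poison pill would leave the consumer stuck. The bounded wait turns that case into an explicit error or a clean exit, depending on the flags.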