This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new af71782  Fix SplittableParDoNaiveBounded DoFnInvoker.
     new c00dc4a  Merge pull request #11475 from boyuanzz/fix
af71782 is described below

commit af717820535d3990aec15575b012e1df296fb3cc
Author: Boyuan Zhang <boyu...@google.com>
AuthorDate: Mon Apr 20 20:52:24 2020 -0700

    Fix SplittableParDoNaiveBounded DoFnInvoker.
---
 .../runners/core/construction/SplittableParDoNaiveBounded.java     | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index b002169..31ce3b6 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -42,6 +42,7 @@ import 
org.apache.beam.sdk.transforms.reflect.DoFnInvoker.BaseArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -300,7 +301,11 @@ public class SplittableParDoNaiveBounded {
           // Fetch the watermark before splitting to ensure that the watermark 
applies to both
           // the primary and the residual.
           watermarkEstimatorState = watermarkEstimator.getState();
-          restriction = tracker.trySplit(0).getResidual();
+          SplitResult<RestrictionT> split = tracker.trySplit(0);
+          if (split == null) {
+            break;
+          }
+          restriction = split.getResidual();
           Uninterruptibles.sleepUninterruptibly(
               continuation.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
         } else {

Reply via email to