This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new af71782  Fix SplittableParDoNaiveBounded DoFnInvoker.
     new c00dc4a  Merge pull request #11475 from boyuanzz/fix
af71782 is described below

commit af717820535d3990aec15575b012e1df296fb3cc
Author: Boyuan Zhang <boyu...@google.com>
AuthorDate: Mon Apr 20 20:52:24 2020 -0700

    Fix SplittableParDoNaiveBounded DoFnInvoker.
---
 .../runners/core/construction/SplittableParDoNaiveBounded.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index b002169..31ce3b6 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.BaseArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -300,7 +301,11 @@ public class SplittableParDoNaiveBounded {
             // Fetch the watermark before splitting to ensure that the watermark applies to both
             // the primary and the residual.
             watermarkEstimatorState = watermarkEstimator.getState();
-            restriction = tracker.trySplit(0).getResidual();
+            SplitResult<RestrictionT> split = tracker.trySplit(0);
+            if (split == null) {
+              break;
+            }
+            restriction = split.getResidual();
             Uninterruptibles.sleepUninterruptibly(
                 continuation.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
           } else {