lukecwik commented on a change in pull request #12419: URL: https://github.com/apache/beam/pull/12419#discussion_r477022288
########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -1237,8 +1639,10 @@ public Object restriction() { .setTransformId(pTransformId) .setInputId(mainInputId) .setElement(bytesOut.toByteString()); - // We don't want to change the output watermarks or set the checkpoint resume time since - // that applies to the current window. + if (!outputWatermarkMapForUnprocessedWindows.isEmpty()) { Review comment: putAllOutputWatermarks should do nothing if the input map is empty so the `if` check is extraneous. ########## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java ########## @@ -133,3333 +138,4337 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.hamcrest.collection.IsMapContaining; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Tests for {@link FnApiDoFnRunner}. */ -@RunWith(JUnit4.class) +@RunWith(Enclosed.class) Review comment: This is very hard to review; could we separate out the creation of the single enclosed class containing the existing tests as a separate commit? ########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -247,8 +247,7 @@ /** Only valid during {@code processElement...} methods, null otherwise. 
*/ private WindowedValue<InputT> currentElement; - /** Only valid during {@link #processElementForSizedElementAndRestriction}. */ - private ListIterator<BoundedWindow> currentWindowIterator; + private List<BoundedWindow> currentWindows; Review comment: Please add a comment stating the lifetime of currentWindows. ########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/HandlesSplits.java ########## @@ -35,6 +36,12 @@ /** Returns the current progress of the active element as a fraction between 0.0 and 1.0. */ double getProgress(); + String getPtranformId(); Review comment: I was under the impression that we would be able to pass this information forward locally through the method without needing to expose it within HandlesSplits. ########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -1237,8 +1639,10 @@ public Object restriction() { .setTransformId(pTransformId) .setInputId(mainInputId) .setElement(bytesOut.toByteString()); - // We don't want to change the output watermarks or set the checkpoint resume time since - // that applies to the current window. + if (!outputWatermarkMapForUnprocessedWindows.isEmpty()) { Review comment: We should leave the comment since it still makes sense. We could update it to just state that we are using the initial watermark for the output watermarks. ########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -1080,136 +1121,484 @@ private static Progress scaleProgress( return Progress.from(completed, remaining); } + private WindowedSplitResult calculateRestrictionSize( + WindowedSplitResult splitResult, String errorContext) { + double fullSize = + splitResult.getResidualInUnprocessedWindowsRoot() == null + && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null + ? 
0 + : doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) { + @Override + public Object restriction() { + return currentRestriction; + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + double primarySize = + splitResult.getPrimarySplitRoot() == null + ? 0 + : doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) { + @Override + public Object restriction() { + return ((KV<?, KV<?, ?>>) splitResult.getPrimarySplitRoot().getValue()) + .getValue() + .getKey(); + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + double residualSize = + splitResult.getResidualSplitRoot() == null + ? 0 + : doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) { + @Override + public Object restriction() { + return ((KV<?, KV<?, ?>>) splitResult.getResidualSplitRoot().getValue()) + .getValue() + .getKey(); + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + return WindowedSplitResult.forRoots( + splitResult.getPrimaryInFullyProcessedWindowsRoot() == null + ? null + : WindowedValue.of( + KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize), + splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(), + splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(), + splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()), + splitResult.getPrimarySplitRoot() == null + ? 
null + : WindowedValue.of( + KV.of(splitResult.getPrimarySplitRoot().getValue(), primarySize), + splitResult.getPrimarySplitRoot().getTimestamp(), + splitResult.getPrimarySplitRoot().getWindows(), + splitResult.getPrimarySplitRoot().getPane()), + splitResult.getResidualSplitRoot() == null + ? null + : WindowedValue.of( + KV.of(splitResult.getResidualSplitRoot().getValue(), residualSize), + splitResult.getResidualSplitRoot().getTimestamp(), + splitResult.getResidualSplitRoot().getWindows(), + splitResult.getResidualSplitRoot().getPane()), + splitResult.getResidualInUnprocessedWindowsRoot() == null + ? null + : WindowedValue.of( + KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize), + splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(), + splitResult.getResidualInUnprocessedWindowsRoot().getWindows(), + splitResult.getResidualInUnprocessedWindowsRoot().getPane())); + } + + @VisibleForTesting + static <WatermarkEstimatorStateT> + KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate( Review comment: We should call this something else since trySplitForTruncate and trySplitForProcess should effectively be the same. Ditto for trySplitForWindowObservingTruncate and trySplitForWindowObservingProcess. 
########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -1264,17 +1668,21 @@ public Object restriction() { .setTransformId(pTransformId) .setInputId(mainInputId) .setElement(residualBytes.toByteString()); - + Map<String, org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp> + outputWatermarkMap = new HashMap<>(); if (!watermarkAndState.getKey().equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) { + org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark = + org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder() + .setSeconds(watermarkAndState.getKey().getMillis() / 1000) + .setNanos((int) (watermarkAndState.getKey().getMillis() % 1000) * 1000000) + .build(); for (String outputId : pTransform.getOutputsMap().keySet()) { - residualApplication.putOutputWatermarks( - outputId, - org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder() - .setSeconds(watermarkAndState.getKey().getMillis() / 1000) - .setNanos((int) (watermarkAndState.getKey().getMillis() % 1000) * 1000000) - .build()); + outputWatermarkMap.put(outputId, outputWatermark); } } + if (!outputWatermarkMap.isEmpty()) { + residualApplication.putAllOutputWatermarks(outputWatermarkMap); Review comment: Ditto, populating the output map with an empty input map should be a no-op, making the `if` check extraneous. ########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -1080,136 +1121,484 @@ private static Progress scaleProgress( return Progress.from(completed, remaining); } + private WindowedSplitResult calculateRestrictionSize( + WindowedSplitResult splitResult, String errorContext) { + double fullSize = + splitResult.getResidualInUnprocessedWindowsRoot() == null + && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null + ? 
0 + : doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) { + @Override + public Object restriction() { + return currentRestriction; + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + double primarySize = + splitResult.getPrimarySplitRoot() == null + ? 0 + : doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) { + @Override + public Object restriction() { + return ((KV<?, KV<?, ?>>) splitResult.getPrimarySplitRoot().getValue()) + .getValue() + .getKey(); + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + double residualSize = + splitResult.getResidualSplitRoot() == null + ? 0 + : doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) { + @Override + public Object restriction() { + return ((KV<?, KV<?, ?>>) splitResult.getResidualSplitRoot().getValue()) + .getValue() + .getKey(); + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + return WindowedSplitResult.forRoots( + splitResult.getPrimaryInFullyProcessedWindowsRoot() == null + ? null + : WindowedValue.of( + KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize), + splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(), + splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(), + splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()), + splitResult.getPrimarySplitRoot() == null + ? 
null + : WindowedValue.of( + KV.of(splitResult.getPrimarySplitRoot().getValue(), primarySize), + splitResult.getPrimarySplitRoot().getTimestamp(), + splitResult.getPrimarySplitRoot().getWindows(), + splitResult.getPrimarySplitRoot().getPane()), + splitResult.getResidualSplitRoot() == null + ? null + : WindowedValue.of( + KV.of(splitResult.getResidualSplitRoot().getValue(), residualSize), + splitResult.getResidualSplitRoot().getTimestamp(), + splitResult.getResidualSplitRoot().getWindows(), + splitResult.getResidualSplitRoot().getPane()), + splitResult.getResidualInUnprocessedWindowsRoot() == null + ? null + : WindowedValue.of( + KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize), + splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(), + splitResult.getResidualInUnprocessedWindowsRoot().getWindows(), + splitResult.getResidualInUnprocessedWindowsRoot().getPane())); + } + + @VisibleForTesting + static <WatermarkEstimatorStateT> + KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate( + WindowedValue currentElement, + Object currentRestriction, + BoundedWindow currentWindow, + List<BoundedWindow> windows, + WatermarkEstimatorStateT currentWatermarkEstimatorState, + double fractionOfRemainder, + HandlesSplits splitDelegate, + int currentWindowIndex, + int stopWindowIndex) { + WindowedSplitResult windowedSplitResult = null; + HandlesSplits.SplitResult downstreamSplitResult = null; + int newWindowStopIndex = stopWindowIndex; + // If we are not on the last window, try to compute the split which is on the current window or + // on a future window. + if (currentWindowIndex != stopWindowIndex - 1) { + // Compute the fraction of the remainder relative to the scaled progress. 
+ double elementCompleted = splitDelegate.getProgress(); + Progress elementProgress = Progress.from(elementCompleted, 1 - elementCompleted); + Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex); + double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder; + // The fraction is out of the current window and hence we will split at the closest window + // boundary. + if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) { + newWindowStopIndex = + (int) + Math.min( + stopWindowIndex - 1, + currentWindowIndex + + Math.max( + 1, + Math.round( + (elementProgress.getWorkCompleted() + scaledFractionOfRemainder) + / (elementProgress.getWorkCompleted() + + elementProgress.getWorkRemaining())))); + windowedSplitResult = + computeWindowSplitResult( + currentElement, + currentRestriction, + currentWindow, + windows, + currentWatermarkEstimatorState, + newWindowStopIndex, + newWindowStopIndex, + stopWindowIndex, + null, + null); + + } else { + // Compute the downstream element split with the scaled fraction. + downstreamSplitResult = splitDelegate.trySplit(scaledFractionOfRemainder); + newWindowStopIndex = currentWindowIndex + 1; + windowedSplitResult = + computeWindowSplitResult( + currentElement, + currentRestriction, + currentWindow, + windows, + currentWatermarkEstimatorState, + currentWindowIndex, + newWindowStopIndex, + stopWindowIndex, + null, + null); + } + } else { + // We are on the last window then compute the downstream element split with given fraction. + newWindowStopIndex = stopWindowIndex; + downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder); + // We cannot produce any split if the downstream is not splittable. 
+ if (downstreamSplitResult == null) { + return null; + } + windowedSplitResult = + computeWindowSplitResult( + currentElement, + currentRestriction, + currentWindow, + windows, + currentWatermarkEstimatorState, + currentWindowIndex, + stopWindowIndex, + stopWindowIndex, + null, + null); + } + return KV.of(KV.of(windowedSplitResult, downstreamSplitResult), newWindowStopIndex); + } + + private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction( + double fractionOfRemainder, HandlesSplits splitDelegate) { + // Note that the assumption here is the fullInputCoder of the Truncate transform should be the + // the same as the SDF/Process transform. + Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder); + WindowedSplitResult windowedSplitResult = null; + HandlesSplits.SplitResult downstreamSplitResult = null; + synchronized (splitLock) { + // There is nothing to split if we are between truncate processing calls. + if (currentWindow == null) { + return null; + } + + KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> result = + trySplitForTruncate( + currentElement, + currentRestriction, + currentWindow, + currentWindows, + currentWatermarkEstimatorState, + fractionOfRemainder, + splitDelegate, + windowCurrentIndex, + windowStopIndex); + if (result == null) { + return null; + } + windowStopIndex = result.getValue(); + windowedSplitResult = + calculateRestrictionSize( + result.getKey().getKey(), + PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN + "/GetSize"); + downstreamSplitResult = result.getKey().getValue(); + } + + List<BundleApplication> primaryRoots = new ArrayList<>(); + List<DelayedBundleApplication> residualRoots = new ArrayList<>(); + + if (windowedSplitResult != null + && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) { + ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput(); + try { + fullInputCoder.encode( + 
windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(), + primaryInOtherWindowsBytes); + } catch (IOException e) { + throw new RuntimeException(e); + } + BundleApplication.Builder primaryApplicationInOtherWindows = + BundleApplication.newBuilder() + .setTransformId(splitDelegate.getPtranformId()) + .setInputId(splitDelegate.getMainInputId()) + .setElement(primaryInOtherWindowsBytes.toByteString()); + primaryRoots.add(primaryApplicationInOtherWindows.build()); + } + if (windowedSplitResult != null + && windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) { + ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput(); + try { + fullInputCoder.encode( + windowedSplitResult.getResidualInUnprocessedWindowsRoot(), + residualInUnprocessedWindowsBytesOut); + } catch (IOException e) { + throw new RuntimeException(e); + } + Map<String, org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp> + outputWatermarkMap = new HashMap<>(); + if (!initialWatermark.equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) { + org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark = + org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder() + .setSeconds(initialWatermark.getMillis() / 1000) + .setNanos((int) (initialWatermark.getMillis() % 1000) * 1000000) + .build(); + for (String outputId : splitDelegate.getOutputIds()) { + outputWatermarkMap.put(outputId, outputWatermark); + } + } + + BundleApplication.Builder residualApplicationInUnprocessedWindows = + BundleApplication.newBuilder() + .setTransformId(splitDelegate.getPtranformId()) + .setInputId(splitDelegate.getMainInputId()) + .putAllOutputWatermarks(outputWatermarkMap) + .setElement(residualInUnprocessedWindowsBytesOut.toByteString()); + + residualRoots.add( + DelayedBundleApplication.newBuilder() + .setApplication(residualApplicationInUnprocessedWindows) + .build()); + } + + if (downstreamSplitResult != null) { + 
primaryRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getPrimaryRoots())); + residualRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getResidualRoots())); + } + + return HandlesSplits.SplitResult.of(primaryRoots, residualRoots); + } + + private static <WatermarkEstimatorStateT> WindowedSplitResult computeWindowSplitResult( + WindowedValue currentElement, + Object currentRestriction, + BoundedWindow currentWindow, + List<BoundedWindow> windows, + WatermarkEstimatorStateT currentWatermarkEstimatorState, + int toIndex, + int fromIndex, + int stopWindowIndex, + SplitResult<?> splitResult, + KV<Instant, WatermarkEstimatorStateT> watermarkAndState) { + List<BoundedWindow> primaryFullyProcessedWindows = windows.subList(0, toIndex); + List<BoundedWindow> residualUnprocessedWindows = windows.subList(fromIndex, stopWindowIndex); + WindowedSplitResult windowedSplitResult; + + windowedSplitResult = + WindowedSplitResult.forRoots( + primaryFullyProcessedWindows.isEmpty() + ? null + : WindowedValue.of( + KV.of( + currentElement.getValue(), + KV.of(currentRestriction, currentWatermarkEstimatorState)), + currentElement.getTimestamp(), + primaryFullyProcessedWindows, + currentElement.getPane()), + splitResult == null + ? null + : WindowedValue.of( + KV.of( + currentElement.getValue(), + KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)), + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane()), + splitResult == null + ? null + : WindowedValue.of( + KV.of( + currentElement.getValue(), + KV.of(splitResult.getResidual(), watermarkAndState.getValue())), + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane()), + residualUnprocessedWindows.isEmpty() + ? 
null + : WindowedValue.of( + KV.of( + currentElement.getValue(), + KV.of(currentRestriction, currentWatermarkEstimatorState)), + currentElement.getTimestamp(), + residualUnprocessedWindows, + currentElement.getPane())); + return windowedSplitResult; + } + + @VisibleForTesting + static <WatermarkEstimatorStateT> KV<WindowedSplitResult, Integer> trySplitForProcess( + WindowedValue currentElement, + Object currentRestriction, + BoundedWindow currentWindow, + List<BoundedWindow> windows, + WatermarkEstimatorStateT currentWatermarkEstimatorState, + double fractionOfRemainder, + RestrictionTracker currentTracker, + KV<Instant, WatermarkEstimatorStateT> watermarkAndState, + int currentWindowIndex, + int stopWindowIndex) { + WindowedSplitResult windowedSplitResult = null; + int newWindowStopIndex = stopWindowIndex; + // If we are not on the last window, try to compute the split which is on the current window or + // on a future window. + if (currentWindowIndex != stopWindowIndex - 1) { + // Compute the fraction of the remainder relative to the scaled progress. + Progress elementProgress; + if (currentTracker instanceof HasProgress) { + elementProgress = ((HasProgress) currentTracker).getProgress(); + } else { + elementProgress = Progress.from(0, 1); + } + Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex); + double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder; + + // The fraction is out of the current window and hence we will split at the closest window + // boundary. 
+ if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) { + newWindowStopIndex = + (int) + Math.min( + stopWindowIndex - 1, + currentWindowIndex + + Math.max( + 1, + Math.round( + (elementProgress.getWorkCompleted() + scaledFractionOfRemainder) + / (elementProgress.getWorkCompleted() + + elementProgress.getWorkRemaining())))); + windowedSplitResult = + computeWindowSplitResult( + currentElement, + currentRestriction, + currentWindow, + windows, + currentWatermarkEstimatorState, + newWindowStopIndex, + newWindowStopIndex, + stopWindowIndex, + null, + watermarkAndState); + } else { + // Compute the element split with the scaled fraction. + SplitResult<?> elementSplit = + currentTracker.trySplit(scaledFractionOfRemainder / elementProgress.getWorkRemaining()); + newWindowStopIndex = currentWindowIndex + 1; + if (elementSplit != null) { + windowedSplitResult = + computeWindowSplitResult( + currentElement, + currentRestriction, + currentWindow, + windows, + currentWatermarkEstimatorState, + currentWindowIndex, + newWindowStopIndex, + stopWindowIndex, + elementSplit, + watermarkAndState); + } else { + windowedSplitResult = + computeWindowSplitResult( + currentElement, + currentRestriction, + currentWindow, + windows, + currentWatermarkEstimatorState, + newWindowStopIndex, + newWindowStopIndex, + stopWindowIndex, + null, + watermarkAndState); + } + } + } else { + // We are on the last window then compute the element split with given fraction. 
+ newWindowStopIndex = stopWindowIndex; + SplitResult<?> splitResult = currentTracker.trySplit(fractionOfRemainder); + if (splitResult == null) { + return null; + } + windowedSplitResult = + computeWindowSplitResult( + currentElement, + currentRestriction, + currentWindow, + windows, + currentWatermarkEstimatorState, + currentWindowIndex, + stopWindowIndex, + stopWindowIndex, + splitResult, + watermarkAndState); + } + return KV.of(windowedSplitResult, newWindowStopIndex); + } + private HandlesSplits.SplitResult trySplitForElementAndRestriction( double fractionOfRemainder, Duration resumeDelay) { KV<Instant, WatermarkEstimatorStateT> watermarkAndState; - WindowedSplitResult windowedSplitResult; + WindowedSplitResult windowedSplitResult = null; synchronized (splitLock) { // There is nothing to split if we are between element and restriction processing calls. if (currentTracker == null) { return null; } - // Make sure to get the output watermark before we split to ensure that the lower bound // applies to the residual. watermarkAndState = currentWatermarkEstimator.getWatermarkAndState(); - SplitResult<RestrictionT> splitResult = currentTracker.trySplit(fractionOfRemainder); + KV<WindowedSplitResult, Integer> splitResult = Review comment: You might want to make use of an `@AutoValue` for the return type to make it clear the updated stop index is being returned. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org