lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477022288



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1237,8 +1639,10 @@ public Object restriction() {
               .setTransformId(pTransformId)
               .setInputId(mainInputId)
               .setElement(bytesOut.toByteString());
-      // We don't want to change the output watermarks or set the checkpoint 
resume time since
-      // that applies to the current window.
+      if (!outputWatermarkMapForUnprocessedWindows.isEmpty()) {

Review comment:
       putAllOutputWatermarks should do nothing if the input map is empty so 
the `if` check is extraneous.

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -133,3333 +138,4337 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.hamcrest.collection.IsMapContaining;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Tests for {@link FnApiDoFnRunner}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)

Review comment:
       This is very hard to review. Could we separate out the creation of the 
single enclosed class containing the existing tests into a separate commit?

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -247,8 +247,7 @@
   /** Only valid during {@code processElement...} methods, null otherwise. */
   private WindowedValue<InputT> currentElement;
 
-  /** Only valid during {@link #processElementForSizedElementAndRestriction}. 
*/
-  private ListIterator<BoundedWindow> currentWindowIterator;
+  private List<BoundedWindow> currentWindows;

Review comment:
       Please add a comment stating the lifetime of currentWindows.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/HandlesSplits.java
##########
@@ -35,6 +36,12 @@
   /** Returns the current progress of the active element as a fraction between 
0.0 and 1.0. */
   double getProgress();
 
+  String getPtranformId();

Review comment:
       I was under the impression that we would be able to pass this 
information forward locally through the method without needing to expose it 
within HandlesSplits.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1237,8 +1639,10 @@ public Object restriction() {
               .setTransformId(pTransformId)
               .setInputId(mainInputId)
               .setElement(bytesOut.toByteString());
-      // We don't want to change the output watermarks or set the checkpoint 
resume time since
-      // that applies to the current window.
+      if (!outputWatermarkMapForUnprocessedWindows.isEmpty()) {

Review comment:
       We should leave the comment since it still makes sense. We could update 
it to just state that we are using the initial watermark for the output 
watermarks.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
     return Progress.from(completed, remaining);
   }
 
+  private WindowedSplitResult calculateRestrictionSize(
+      WindowedSplitResult splitResult, String errorContext) {
+    double fullSize =
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+                && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, 
OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return currentRestriction;
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double primarySize =
+        splitResult.getPrimarySplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, 
OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) 
splitResult.getPrimarySplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double residualSize =
+        splitResult.getResidualSplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, 
OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) 
splitResult.getResidualSplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    return WindowedSplitResult.forRoots(
+        splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                
KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+                
splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+                
splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()),
+        splitResult.getPrimarySplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimarySplitRoot().getValue(), 
primarySize),
+                splitResult.getPrimarySplitRoot().getTimestamp(),
+                splitResult.getPrimarySplitRoot().getWindows(),
+                splitResult.getPrimarySplitRoot().getPane()),
+        splitResult.getResidualSplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualSplitRoot().getValue(), 
residualSize),
+                splitResult.getResidualSplitRoot().getTimestamp(),
+                splitResult.getResidualSplitRoot().getWindows(),
+                splitResult.getResidualSplitRoot().getPane()),
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                
KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+                
splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
+  }
+
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT>
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> 
trySplitForTruncate(

Review comment:
       We should call this something else since trySplitForTruncate and 
trySplitForProcess should effectively be the same.
   
   Ditto for trySplitForWindowObservingTruncate and 
trySplitForWindowObservingProcess.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1264,17 +1668,21 @@ public Object restriction() {
             .setTransformId(pTransformId)
             .setInputId(mainInputId)
             .setElement(residualBytes.toByteString());
-
+    Map<String, 
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp>
+        outputWatermarkMap = new HashMap<>();
     if (!watermarkAndState.getKey().equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
+      org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp 
outputWatermark =
+          
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
+              .setSeconds(watermarkAndState.getKey().getMillis() / 1000)
+              .setNanos((int) (watermarkAndState.getKey().getMillis() % 1000) 
* 1000000)
+              .build();
       for (String outputId : pTransform.getOutputsMap().keySet()) {
-        residualApplication.putOutputWatermarks(
-            outputId,
-            
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
-                .setSeconds(watermarkAndState.getKey().getMillis() / 1000)
-                .setNanos((int) (watermarkAndState.getKey().getMillis() % 
1000) * 1000000)
-                .build());
+        outputWatermarkMap.put(outputId, outputWatermark);
       }
     }
+    if (!outputWatermarkMap.isEmpty()) {
+      residualApplication.putAllOutputWatermarks(outputWatermarkMap);

Review comment:
       Ditto, populating the output map with an empty input map should be a 
no-op, making the `if` check extraneous.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
     return Progress.from(completed, remaining);
   }
 
+  private WindowedSplitResult calculateRestrictionSize(
+      WindowedSplitResult splitResult, String errorContext) {
+    double fullSize =
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+                && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, 
OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return currentRestriction;
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double primarySize =
+        splitResult.getPrimarySplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, 
OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) 
splitResult.getPrimarySplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double residualSize =
+        splitResult.getResidualSplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, 
OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) 
splitResult.getResidualSplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    return WindowedSplitResult.forRoots(
+        splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                
KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+                
splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+                
splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()),
+        splitResult.getPrimarySplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimarySplitRoot().getValue(), 
primarySize),
+                splitResult.getPrimarySplitRoot().getTimestamp(),
+                splitResult.getPrimarySplitRoot().getWindows(),
+                splitResult.getPrimarySplitRoot().getPane()),
+        splitResult.getResidualSplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualSplitRoot().getValue(), 
residualSize),
+                splitResult.getResidualSplitRoot().getTimestamp(),
+                splitResult.getResidualSplitRoot().getWindows(),
+                splitResult.getResidualSplitRoot().getPane()),
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                
KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+                
splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
+  }
+
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT>
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> 
trySplitForTruncate(
+          WindowedValue currentElement,
+          Object currentRestriction,
+          BoundedWindow currentWindow,
+          List<BoundedWindow> windows,
+          WatermarkEstimatorStateT currentWatermarkEstimatorState,
+          double fractionOfRemainder,
+          HandlesSplits splitDelegate,
+          int currentWindowIndex,
+          int stopWindowIndex) {
+    WindowedSplitResult windowedSplitResult = null;
+    HandlesSplits.SplitResult downstreamSplitResult = null;
+    int newWindowStopIndex = stopWindowIndex;
+    // If we are not on the last window, try to compute the split which is on 
the current window or
+    // on a future window.
+    if (currentWindowIndex != stopWindowIndex - 1) {
+      // Compute the fraction of the remainder relative to the scaled progress.
+      double elementCompleted = splitDelegate.getProgress();
+      Progress elementProgress = Progress.from(elementCompleted, 1 - 
elementCompleted);
+      Progress scaledProgress = scaleProgress(elementProgress, 
currentWindowIndex, stopWindowIndex);
+      double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * 
fractionOfRemainder;
+      // The fraction is out of the current window and hence we will split at 
the closest window
+      // boundary.
+      if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
+        newWindowStopIndex =
+            (int)
+                Math.min(
+                    stopWindowIndex - 1,
+                    currentWindowIndex
+                        + Math.max(
+                            1,
+                            Math.round(
+                                (elementProgress.getWorkCompleted() + 
scaledFractionOfRemainder)
+                                    / (elementProgress.getWorkCompleted()
+                                        + 
elementProgress.getWorkRemaining()))));
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                newWindowStopIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                null,
+                null);
+
+      } else {
+        // Compute the downstream element split with the scaled fraction.
+        downstreamSplitResult = 
splitDelegate.trySplit(scaledFractionOfRemainder);
+        newWindowStopIndex = currentWindowIndex + 1;
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                currentWindowIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                null,
+                null);
+      }
+    } else {
+      // We are on the last window then compute the downstream element split 
with given fraction.
+      newWindowStopIndex = stopWindowIndex;
+      downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder);
+      // We cannot produce any split if the downstream is not splittable.
+      if (downstreamSplitResult == null) {
+        return null;
+      }
+      windowedSplitResult =
+          computeWindowSplitResult(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              windows,
+              currentWatermarkEstimatorState,
+              currentWindowIndex,
+              stopWindowIndex,
+              stopWindowIndex,
+              null,
+              null);
+    }
+    return KV.of(KV.of(windowedSplitResult, downstreamSplitResult), 
newWindowStopIndex);
+  }
+
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    // Note that the assumption here is the fullInputCoder of the Truncate 
transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    WindowedSplitResult windowedSplitResult = null;
+    HandlesSplits.SplitResult downstreamSplitResult = null;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindow == null) {
+        return null;
+      }
+
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> result =
+          trySplitForTruncate(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              currentWindows,
+              currentWatermarkEstimatorState,
+              fractionOfRemainder,
+              splitDelegate,
+              windowCurrentIndex,
+              windowStopIndex);
+      if (result == null) {
+        return null;
+      }
+      windowStopIndex = result.getValue();
+      windowedSplitResult =
+          calculateRestrictionSize(
+              result.getKey().getKey(),
+              PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN 
+ "/GetSize");
+      downstreamSplitResult = result.getKey().getValue();
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (windowedSplitResult != null
+        && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != 
null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(),
+            primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(splitDelegate.getPtranformId())
+              .setInputId(splitDelegate.getMainInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (windowedSplitResult != null
+        && windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = 
ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            windowedSplitResult.getResidualInUnprocessedWindowsRoot(),
+            residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      Map<String, 
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp>
+          outputWatermarkMap = new HashMap<>();
+      if (!initialWatermark.equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
+        org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp 
outputWatermark =
+            
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
+                .setSeconds(initialWatermark.getMillis() / 1000)
+                .setNanos((int) (initialWatermark.getMillis() % 1000) * 
1000000)
+                .build();
+        for (String outputId : splitDelegate.getOutputIds()) {
+          outputWatermarkMap.put(outputId, outputWatermark);
+        }
+      }
+
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(splitDelegate.getPtranformId())
+              .setInputId(splitDelegate.getMainInputId())
+              .putAllOutputWatermarks(outputWatermarkMap)
+              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
+
+      residualRoots.add(
+          DelayedBundleApplication.newBuilder()
+              .setApplication(residualApplicationInUnprocessedWindows)
+              .build());
+    }
+
+    if (downstreamSplitResult != null) {
+      
primaryRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getPrimaryRoots()));
+      
residualRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getResidualRoots()));
+    }
+
+    return HandlesSplits.SplitResult.of(primaryRoots, residualRoots);
+  }
+
+  private static <WatermarkEstimatorStateT> WindowedSplitResult 
computeWindowSplitResult(
+      WindowedValue currentElement,
+      Object currentRestriction,
+      BoundedWindow currentWindow,
+      List<BoundedWindow> windows,
+      WatermarkEstimatorStateT currentWatermarkEstimatorState,
+      int toIndex,
+      int fromIndex,
+      int stopWindowIndex,
+      SplitResult<?> splitResult,
+      KV<Instant, WatermarkEstimatorStateT> watermarkAndState) {
+    List<BoundedWindow> primaryFullyProcessedWindows = windows.subList(0, 
toIndex);
+    List<BoundedWindow> residualUnprocessedWindows = 
windows.subList(fromIndex, stopWindowIndex);
+    WindowedSplitResult windowedSplitResult;
+
+    windowedSplitResult =
+        WindowedSplitResult.forRoots(
+            primaryFullyProcessedWindows.isEmpty()
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                    currentElement.getTimestamp(),
+                    primaryFullyProcessedWindows,
+                    currentElement.getPane()),
+            splitResult == null
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(splitResult.getPrimary(), 
currentWatermarkEstimatorState)),
+                    currentElement.getTimestamp(),
+                    currentWindow,
+                    currentElement.getPane()),
+            splitResult == null
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(splitResult.getResidual(), 
watermarkAndState.getValue())),
+                    currentElement.getTimestamp(),
+                    currentWindow,
+                    currentElement.getPane()),
+            residualUnprocessedWindows.isEmpty()
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+                    currentElement.getTimestamp(),
+                    residualUnprocessedWindows,
+                    currentElement.getPane()));
+    return windowedSplitResult;
+  }
+
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT> KV<WindowedSplitResult, Integer> 
trySplitForProcess(
+      WindowedValue currentElement,
+      Object currentRestriction,
+      BoundedWindow currentWindow,
+      List<BoundedWindow> windows,
+      WatermarkEstimatorStateT currentWatermarkEstimatorState,
+      double fractionOfRemainder,
+      RestrictionTracker currentTracker,
+      KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
+      int currentWindowIndex,
+      int stopWindowIndex) {
+    WindowedSplitResult windowedSplitResult = null;
+    int newWindowStopIndex = stopWindowIndex;
+    // If we are not on the last window, try to compute the split which is on 
the current window or
+    // on a future window.
+    if (currentWindowIndex != stopWindowIndex - 1) {
+      // Compute the fraction of the remainder relative to the scaled progress.
+      Progress elementProgress;
+      if (currentTracker instanceof HasProgress) {
+        elementProgress = ((HasProgress) currentTracker).getProgress();
+      } else {
+        elementProgress = Progress.from(0, 1);
+      }
+      Progress scaledProgress = scaleProgress(elementProgress, 
currentWindowIndex, stopWindowIndex);
+      double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * 
fractionOfRemainder;
+
+      // The fraction is out of the current window and hence we will split at 
the closest window
+      // boundary.
+      if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
+        newWindowStopIndex =
+            (int)
+                Math.min(
+                    stopWindowIndex - 1,
+                    currentWindowIndex
+                        + Math.max(
+                            1,
+                            Math.round(
+                                (elementProgress.getWorkCompleted() + 
scaledFractionOfRemainder)
+                                    / (elementProgress.getWorkCompleted()
+                                        + 
elementProgress.getWorkRemaining()))));
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                newWindowStopIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                null,
+                watermarkAndState);
+      } else {
+        // Compute the element split with the scaled fraction.
+        SplitResult<?> elementSplit =
+            currentTracker.trySplit(scaledFractionOfRemainder / 
elementProgress.getWorkRemaining());
+        newWindowStopIndex = currentWindowIndex + 1;
+        if (elementSplit != null) {
+          windowedSplitResult =
+              computeWindowSplitResult(
+                  currentElement,
+                  currentRestriction,
+                  currentWindow,
+                  windows,
+                  currentWatermarkEstimatorState,
+                  currentWindowIndex,
+                  newWindowStopIndex,
+                  stopWindowIndex,
+                  elementSplit,
+                  watermarkAndState);
+        } else {
+          windowedSplitResult =
+              computeWindowSplitResult(
+                  currentElement,
+                  currentRestriction,
+                  currentWindow,
+                  windows,
+                  currentWatermarkEstimatorState,
+                  newWindowStopIndex,
+                  newWindowStopIndex,
+                  stopWindowIndex,
+                  null,
+                  watermarkAndState);
+        }
+      }
+    } else {
+      // We are on the last window then compute the element split with given 
fraction.
+      newWindowStopIndex = stopWindowIndex;
+      SplitResult<?> splitResult = 
currentTracker.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+      windowedSplitResult =
+          computeWindowSplitResult(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              windows,
+              currentWatermarkEstimatorState,
+              currentWindowIndex,
+              stopWindowIndex,
+              stopWindowIndex,
+              splitResult,
+              watermarkAndState);
+    }
+    return KV.of(windowedSplitResult, newWindowStopIndex);
+  }
+
   private HandlesSplits.SplitResult trySplitForElementAndRestriction(
       double fractionOfRemainder, Duration resumeDelay) {
     KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
-    WindowedSplitResult windowedSplitResult;
+    WindowedSplitResult windowedSplitResult = null;
     synchronized (splitLock) {
       // There is nothing to split if we are between element and restriction 
processing calls.
       if (currentTracker == null) {
         return null;
       }
-
       // Make sure to get the output watermark before we split to ensure that 
the lower bound
       // applies to the residual.
       watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
-      SplitResult<RestrictionT> splitResult = 
currentTracker.trySplit(fractionOfRemainder);
+      KV<WindowedSplitResult, Integer> splitResult =

Review comment:
       You might want to make use of an `@AutoValue` for the return type to 
make it clear the updated stop index is being returned.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to