Repository: beam
Updated Branches:
  refs/heads/master 168d6a18f -> a27d3cfa2


Gives the runner access to RestrictionTracker

Changes the SplittableParDo transform so that ProcessFn uses a
runner-supplied hook to run the @ProcessElement method. The hook
receives, among other things, the RestrictionTracker, so the runner
can use it to initiate checkpointing/splitting at will.

Introduces a default implementation of this hook, which limits
the number of outputs and the duration of the call. This
implementation is used in tests and in the Direct runner. The
Dataflow Streaming runner will also use this implementation, while
the Dataflow Batch runner will use a more sophisticated one.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7dc9e86f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7dc9e86f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7dc9e86f

Branch: refs/heads/master
Commit: 7dc9e86fa76e5117bfc6825e120ed74ba3d2f910
Parents: 38208ea
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Nov 18 11:21:19 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Feb 1 19:43:29 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnAdapters.java  |   4 +-
 ...eBoundedSplittableProcessElementInvoker.java | 285 +++++++++++++++++++
 .../beam/runners/core/SimpleDoFnRunner.java     |   6 +-
 .../beam/runners/core/SplittableParDo.java      | 190 ++-----------
 .../core/SplittableProcessElementInvoker.java   |  65 +++++
 ...ndedSplittableProcessElementInvokerTest.java | 146 ++++++++++
 .../beam/runners/core/SplittableParDoTest.java  | 147 +++++++---
 ...littableProcessElementsEvaluatorFactory.java |  66 +++--
 .../apache/beam/sdk/transforms/DoFnTester.java  |  15 +-
 .../sdk/transforms/reflect/DoFnInvoker.java     |   4 +-
 .../transforms/splittabledofn/OffsetRange.java  |  71 +++++
 .../splittabledofn/OffsetRangeTracker.java      |  75 +++++
 .../splittabledofn/RestrictionTracker.java      |   2 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java |  68 +----
 .../transforms/reflect/DoFnInvokersTest.java    |   2 +-
 .../splittabledofn/OffsetRangeTrackerTest.java  | 111 ++++++++
 16 files changed, 956 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index dcd7969..693cb2f 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -204,7 +204,7 @@ public class DoFnAdapters {
     }
 
     @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+    public RestrictionTracker<?> restrictionTracker() {
       throw new UnsupportedOperationException("This is a non-splittable DoFn");
     }
 
@@ -306,7 +306,7 @@ public class DoFnAdapters {
     }
 
     @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+    public RestrictionTracker<?> restrictionTracker() {
       throw new UnsupportedOperationException("This is a non-splittable DoFn");
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
new file mode 100644
index 0000000..5aa7605
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link SplittableProcessElementInvoker} that requests a checkpoint after 
the {@link
+ * DoFn.ProcessElement} call either outputs at least a given number of 
elements (in total over all
+ * outputs), or runs for the given duration.
+ */
+public class OutputAndTimeBoundedSplittableProcessElementInvoker<
+        InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT>>
+    extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, 
TrackerT> {
+  private final DoFn<InputT, OutputT> fn;
+  private final PipelineOptions pipelineOptions;
+  private final OutputWindowedValue<OutputT> output;
+  private final SideInputReader sideInputReader;
+  private final ScheduledExecutorService executor;
+  private final int maxNumOutputs;
+  private final Duration maxDuration;
+
+  /**
+   * Creates a new invoker from components.
+   *
+   * @param fn The original {@link DoFn}.
+   * @param pipelineOptions {@link PipelineOptions} to include in the {@link 
DoFn.ProcessContext}.
+   * @param output Hook for outputting from the {@link DoFn.ProcessElement} 
method.
+   * @param sideInputReader Hook for accessing side inputs.
+   * @param executor Executor on which a checkpoint will be scheduled after 
the given duration.
+   * @param maxNumOutputs Maximum number of outputs, in total over all output 
tags, after which a
+   *     checkpoint will be requested. This is a best-effort request - the 
{@link DoFn} may output
+   *     more after receiving the request.
+   * @param maxDuration Maximum duration of the {@link DoFn.ProcessElement} 
call after which a
+   *     checkpoint will be requested. This is a best-effort request - the 
{@link DoFn} may run for
+   *     longer after receiving the request.
+   */
+  public OutputAndTimeBoundedSplittableProcessElementInvoker(
+      DoFn<InputT, OutputT> fn,
+      PipelineOptions pipelineOptions,
+      OutputWindowedValue<OutputT> output,
+      SideInputReader sideInputReader,
+      ScheduledExecutorService executor,
+      int maxNumOutputs,
+      Duration maxDuration) {
+    this.fn = fn;
+    this.pipelineOptions = pipelineOptions;
+    this.output = output;
+    this.sideInputReader = sideInputReader;
+    this.executor = executor;
+    this.maxNumOutputs = maxNumOutputs;
+    this.maxDuration = maxDuration;
+  }
+
+  @Override
+  public Result invokeProcessElement(
+      DoFnInvoker<InputT, OutputT> invoker,
+      final WindowedValue<InputT> element,
+      final TrackerT tracker) {
+    final ProcessContext processContext = new ProcessContext(element, tracker);
+    DoFn.ProcessContinuation cont =
+        invoker.invokeProcessElement(
+            new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+              @Override
+              public DoFn<InputT, OutputT>.ProcessContext processContext(
+                  DoFn<InputT, OutputT> doFn) {
+                return processContext;
+              }
+
+              @Override
+              public RestrictionTracker<?> restrictionTracker() {
+                return tracker;
+              }
+
+              // Unsupported methods below.
+
+              @Override
+              public BoundedWindow window() {
+                throw new UnsupportedOperationException(
+                    "Access to window of the element not supported in 
Splittable DoFn");
+              }
+
+              @Override
+              public DoFn<InputT, OutputT>.Context context(DoFn<InputT, 
OutputT> doFn) {
+                throw new IllegalStateException(
+                    "Should not access context() from @"
+                        + DoFn.ProcessElement.class.getSimpleName());
+              }
+
+              @Override
+              public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(
+                  DoFn<InputT, OutputT> doFn) {
+                throw new UnsupportedOperationException(
+                    "Access to timers not supported in Splittable DoFn");
+              }
+
+              @Override
+              public State state(String stateId) {
+                throw new UnsupportedOperationException(
+                    "Access to state not supported in Splittable DoFn");
+              }
+
+              @Override
+              public Timer timer(String timerId) {
+                throw new UnsupportedOperationException(
+                    "Access to timers not supported in Splittable DoFn");
+              }
+            });
+    RestrictionT residual;
+    RestrictionT forcedCheckpoint = processContext.extractCheckpoint();
+    if (cont.shouldResume()) {
+      if (forcedCheckpoint == null) {
+        // If no checkpoint was forced, the call returned voluntarily (i.e. 
all tryClaim() calls
+        // succeeded) - but we still need to have a checkpoint to resume from.
+        residual = tracker.checkpoint();
+      } else {
+        // A checkpoint was forced - i.e. the call probably (but not 
necessarily) returned because of
+        // a failed tryClaim() call.
+        residual = forcedCheckpoint;
+      }
+    } else {
+      residual = null;
+    }
+    return new Result(residual, cont);
+  }
+
+  private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
+    private final WindowedValue<InputT> element;
+    private final TrackerT tracker;
+
+    private int numOutputs;
+    // Checkpoint may be initiated either when the given number of outputs is 
reached,
+    // or when the call runs for the given duration. It must be initiated at 
most once,
+    // even if these events happen almost at the same time.
+    // This is either the result of the sole tracker.checkpoint() call, or 
null if
+    // the call completed before reaching the given number of outputs or 
duration.
+    private RestrictionT checkpoint;
+    // A handle on the scheduled action to take a checkpoint.
+    private Future<?> scheduledCheckpoint;
+
+    public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
+      fn.super();
+      this.element = element;
+      this.tracker = tracker;
+
+      this.scheduledCheckpoint =
+          executor.schedule(
+              new Runnable() {
+                @Override
+                public void run() {
+                  initiateCheckpoint();
+                }
+              },
+              maxDuration.getMillis(),
+              TimeUnit.MILLISECONDS);
+    }
+
+    @Nullable
+    RestrictionT extractCheckpoint() {
+      scheduledCheckpoint.cancel(true);
+      try {
+        Futures.getUnchecked(scheduledCheckpoint);
+      } catch (CancellationException e) {
+        // This is expected if the call took less than the maximum duration.
+      }
+      // By now, a checkpoint may or may not have been taken,
+      // either via .output() or via scheduledCheckpoint.
+      synchronized (this) {
+        return checkpoint;
+      }
+    }
+
+    private synchronized void initiateCheckpoint() {
+      // This method may be entered either via .output(), or via 
scheduledCheckpoint.
+      // Only one of them "wins" - tracker.checkpoint() must be called only 
once.
+      if (checkpoint == null) {
+        checkpoint = checkNotNull(tracker.checkpoint());
+      }
+    }
+
+    @Override
+    public InputT element() {
+      return element.getValue();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return sideInputReader.get(
+          view,
+          view.getWindowingStrategyInternal()
+              .getWindowFn()
+              
.getSideInputWindow(Iterables.getOnlyElement(element.getWindows())));
+    }
+
+    @Override
+    public Instant timestamp() {
+      return element.getTimestamp();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return element.getPane();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public void output(OutputT output) {
+      outputWithTimestamp(output, element.getTimestamp());
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT value, Instant timestamp) {
+      output.outputWindowedValue(value, timestamp, element.getWindows(), 
element.getPane());
+      noteOutput();
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T value) {
+      sideOutputWithTimestamp(tag, value, element.getTimestamp());
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant 
timestamp) {
+      output.sideOutputWindowedValue(
+          tag, value, timestamp, element.getWindows(), element.getPane());
+      noteOutput();
+    }
+
+    private void noteOutput() {
+      ++numOutputs;
+      if (numOutputs >= maxNumOutputs) {
+        initiateCheckpoint();
+      }
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregator(
+        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      throw new UnsupportedOperationException();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index d54daf6..588e31d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -437,7 +437,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+    public RestrictionTracker<?> restrictionTracker() {
       throw new UnsupportedOperationException(
           "Cannot access RestrictionTracker outside of @ProcessElement 
method.");
     }
@@ -614,7 +614,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+    public RestrictionTracker<?> restrictionTracker() {
       throw new UnsupportedOperationException("RestrictionTracker parameters 
are not supported.");
     }
 
@@ -722,7 +722,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+    public RestrictionTracker<?> restrictionTracker() {
       throw new UnsupportedOperationException("RestrictionTracker parameters 
are not supported.");
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index d1cbf8f..78acb19 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -48,13 +48,10 @@ import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.util.state.StateNamespace;
@@ -188,7 +185,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
    * Runner-specific primitive {@link PTransform} that invokes the {@link 
DoFn.ProcessElement}
    * method for a splittable {@link DoFn}.
    */
-  public static class ProcessElements<InputT, OutputT, RestrictionT>
+  public static class ProcessElements<
+          InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT>>
       extends PTransform<
           PCollection<? extends KeyedWorkItem<String, 
ElementAndRestriction<InputT, RestrictionT>>>,
           PCollectionTuple> {
@@ -240,7 +238,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       return sideOutputTags;
     }
 
-    public ProcessFn<InputT, OutputT, RestrictionT, ?> 
newProcessFn(DoFn<InputT, OutputT> fn) {
+    public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
+        DoFn<InputT, OutputT> fn) {
       return new SplittableParDo.ProcessFn<>(
           fn, elementCoder, restrictionCoder, 
windowingStrategy.getWindowFn().windowCoder());
     }
@@ -330,12 +329,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   public static class ProcessFn<
           InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT>>
       extends DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, 
RestrictionT>>, OutputT> {
-    // Commit at least once every 10k output records.  This keeps the 
watermark advancing
-    // smoothly, and ensures that not too much work will have to be 
reprocessed in the event of
-    // a crash.
-    // TODO: Also commit at least once every N seconds (runner-specific 
parameter).
-    @VisibleForTesting static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
-
     /**
      * The state cell containing a watermark hold for the output of this 
{@link DoFn}. The hold is
      * acquired during the first {@link DoFn.ProcessElement} call for each 
element and restriction,
@@ -366,13 +359,14 @@ public class SplittableParDo<InputT, OutputT, 
RestrictionT>
      */
     private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
 
-    private transient StateInternalsFactory<String> stateInternalsFactory;
-    private transient TimerInternalsFactory<String> timerInternalsFactory;
-    private transient OutputWindowedValue<OutputT> outputWindowedValue;
-
     private final DoFn<InputT, OutputT> fn;
     private final Coder<? extends BoundedWindow> windowCoder;
 
+    private transient StateInternalsFactory<String> stateInternalsFactory;
+    private transient TimerInternalsFactory<String> timerInternalsFactory;
+    private transient SplittableProcessElementInvoker<InputT, OutputT, 
RestrictionT, TrackerT>
+        processElementInvoker;
+
     private transient DoFnInvoker<InputT, OutputT> invoker;
 
     public ProcessFn(
@@ -396,8 +390,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       this.timerInternalsFactory = timerInternalsFactory;
     }
 
-    public void setOutputWindowedValue(OutputWindowedValue<OutputT> 
outputWindowedValue) {
-      this.outputWindowedValue = outputWindowedValue;
+    public void setProcessElementInvoker(
+        SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, 
TrackerT> invoker) {
+      this.processElementInvoker = invoker;
     }
 
     @StartBundle
@@ -412,9 +407,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
     @ProcessElement
     public void processElement(final ProcessContext c) {
+      String key = c.element().key();
       StateInternals<String> stateInternals =
-          stateInternalsFactory.stateInternalsForKey(c.element().key());
-      TimerInternals timerInternals = 
timerInternalsFactory.timerInternalsForKey(c.element().key());
+          stateInternalsFactory.stateInternalsForKey(key);
+      TimerInternals timerInternals = 
timerInternalsFactory.timerInternalsForKey(key);
 
       // Initialize state (element and restriction) depending on whether this 
is the seed call.
       // The seed call is the first call for this element, which actually has 
the element.
@@ -454,34 +450,25 @@ public class SplittableParDo<InputT, OutputT, 
RestrictionT>
       }
 
       final TrackerT tracker = 
invoker.invokeNewTracker(elementAndRestriction.restriction());
-      @SuppressWarnings("unchecked")
-      final RestrictionT[] residual = (RestrictionT[]) new Object[1];
-      // TODO: Only let the call run for a limited amount of time, rather than 
simply
-      // producing a limited amount of output.
-      DoFn.ProcessContinuation cont =
-          invoker.invokeProcessElement(
-              wrapTracker(
-                  tracker, wrapContext(c, elementAndRestriction.element(), 
tracker, residual)));
-      if (residual[0] == null) {
-        // This means the call completed unsolicited, and the context produced 
by makeContext()
-        // did not take a checkpoint. Take one now.
-        residual[0] = checkNotNull(tracker.checkpoint());
-      }
+      SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, 
TrackerT>.Result result =
+          processElementInvoker.invokeProcessElement(
+              invoker, elementAndRestriction.element(), tracker);
 
       // Save state for resuming.
-      if (!cont.shouldResume()) {
+      if (!result.getContinuation().shouldResume()) {
         // All work for this element/restriction is completed. Clear state and 
release hold.
         elementState.clear();
         restrictionState.clear();
         holdState.clear();
         return;
       }
-      restrictionState.write(residual[0]);
-      Instant futureOutputWatermark = cont.getWatermark();
+      restrictionState.write(result.getResidualRestriction());
+      Instant futureOutputWatermark = result.getContinuation().getWatermark();
       if (futureOutputWatermark == null) {
         futureOutputWatermark = elementAndRestriction.element().getTimestamp();
       }
-      Instant wakeupTime = 
timerInternals.currentProcessingTime().plus(cont.resumeDelay());
+      Instant wakeupTime =
+          
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
       holdState.add(futureOutputWatermark);
       // Set a timer to continue processing this element.
       timerInternals.setTimer(
@@ -555,137 +542,6 @@ public class SplittableParDo<InputT, OutputT, 
RestrictionT>
         }
       };
     }
-
-    private DoFn<InputT, OutputT>.ProcessContext wrapContext(
-        final ProcessContext baseContext,
-        final WindowedValue<InputT> element,
-        final TrackerT tracker,
-        final RestrictionT[] residualRestrictionHolder) {
-      return fn.new ProcessContext() {
-        private int numOutputs = 0;
-
-        public InputT element() {
-          return element.getValue();
-        }
-
-        public Instant timestamp() {
-          return element.getTimestamp();
-        }
-
-        public PaneInfo pane() {
-          return element.getPane();
-        }
-
-        public void output(OutputT output) {
-          outputWindowedValue.outputWindowedValue(
-              output, element.getTimestamp(), element.getWindows(), 
element.getPane());
-          noteOutput();
-        }
-
-        public void outputWithTimestamp(OutputT output, Instant timestamp) {
-          outputWindowedValue.outputWindowedValue(
-              output, timestamp, element.getWindows(), element.getPane());
-          noteOutput();
-        }
-
-        private void noteOutput() {
-          // Take the checkpoint only if it hasn't been taken yet, because:
-          // 1) otherwise we'd lose the previous checkpoint stored in 
residualRestrictionHolder
-          // 2) it's not allowed to checkpoint a RestrictionTracker twice, 
since the first call
-          // by definition already maximally narrows its restriction, so a 
second checkpoint would
-          // have produced a useless empty residual restriction anyway.
-          if (++numOutputs >= MAX_OUTPUTS_PER_BUNDLE && 
residualRestrictionHolder[0] == null) {
-            // Request a checkpoint. The fn *may* produce more output, but 
hopefully not too much.
-            residualRestrictionHolder[0] = checkNotNull(tracker.checkpoint());
-          }
-        }
-
-        public <T> T sideInput(PCollectionView<T> view) {
-          return baseContext.sideInput(view);
-        }
-
-        public PipelineOptions getPipelineOptions() {
-          return baseContext.getPipelineOptions();
-        }
-
-        public <T> void sideOutput(TupleTag<T> tag, T output) {
-          outputWindowedValue.sideOutputWindowedValue(
-              tag, output, element.getTimestamp(), element.getWindows(), 
element.getPane());
-          noteOutput();
-        }
-
-        public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
-          outputWindowedValue.sideOutputWindowedValue(
-              tag, output, timestamp, element.getWindows(), element.getPane());
-          noteOutput();
-        }
-
-        @Override
-        protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregator(
-            String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) 
{
-          return fn.createAggregator(name, combiner);
-        }
-      };
-    }
-
-    /**
-     * Creates an {@link DoFnInvoker.ArgumentProvider} that provides the given 
tracker as well as
-     * the given {@link ProcessContext} (which is also provided when a {@link 
Context} is requested.
-     */
-    private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapTracker(
-        TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext processContext) 
{
-
-      return new ArgumentProviderForTracker<>(tracker, processContext);
-    }
-
-    private static class ArgumentProviderForTracker<
-            InputT, OutputT, TrackerT extends RestrictionTracker<?>>
-        implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-      private final TrackerT tracker;
-      private final DoFn<InputT, OutputT>.ProcessContext processContext;
-
-      ArgumentProviderForTracker(
-          TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext 
processContext) {
-        this.tracker = tracker;
-        this.processContext = processContext;
-      }
-
-      @Override
-      public BoundedWindow window() {
-        // DoFnSignatures should have verified that this DoFn doesn't access 
extra context.
-        throw new IllegalStateException("Unexpected extra context access on a 
splittable DoFn");
-      }
-
-      @Override
-      public DoFn.Context context(DoFn<InputT, OutputT> doFn) {
-        return processContext;
-      }
-
-      @Override
-      public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-        return processContext;
-      }
-
-      @Override
-      public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-        throw new IllegalStateException("Unexpected extra context access on a 
splittable DoFn");
-      }
-
-      @Override
-      public TrackerT restrictionTracker() {
-        return tracker;
-      }
-
-      @Override
-      public State state(String stateId) {
-        throw new UnsupportedOperationException("State cannot be used with a 
splittable DoFn");
-      }
-
-      @Override
-      public Timer timer(String timerId) {
-        throw new UnsupportedOperationException("Timers cannot be used with a 
splittable DoFn");
-      }
-    }
   }
 
   /** Splits the restriction using the given {@link DoFn.SplitRestriction} 
method. */

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
new file mode 100644
index 0000000..cfc39e7
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A runner-specific hook for invoking a {@link DoFn.ProcessElement} method 
for a splittable {@link
+ * DoFn}, in particular, allowing the runner to access the {@link 
RestrictionTracker}.
+ */
+public abstract class SplittableProcessElementInvoker<
+    InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT>> {
+  /** Specifies how to resume a splittable {@link DoFn.ProcessElement} call. */
+  public class Result {
+    @Nullable private final RestrictionT residualRestriction;
+    private final DoFn.ProcessContinuation continuation;
+
+    public Result(
+        @Nullable RestrictionT residualRestriction, DoFn.ProcessContinuation 
continuation) {
+      this.residualRestriction = residualRestriction;
+      this.continuation = continuation;
+    }
+
+    /**
+     * Can be {@code null} only if {@link #getContinuation} specifies the call 
should not resume.
+     */
+    @Nullable
+    public RestrictionT getResidualRestriction() {
+      return residualRestriction;
+    }
+
+    public DoFn.ProcessContinuation getContinuation() {
+      return continuation;
+    }
+  }
+
+  /**
+   * Invokes the {@link DoFn.ProcessElement} method using the given {@link 
DoFnInvoker} for the
+   * original {@link DoFn}, on the given element and with the given {@link 
RestrictionTracker}.
+   *
+   * @return Information on how to resume the call: residual restriction and a 
{@link
+   *     DoFn.ProcessContinuation}.
+   */
+  public abstract Result invokeProcessElement(
+      DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, 
TrackerT tracker);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
new file mode 100644
index 0000000..d7c9889
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.concurrent.Executors;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for {@link OutputAndTimeBoundedSplittableProcessElementInvoker}. */
+public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
+  private static class SomeFn extends DoFn<Integer, String> {
+    private final Duration sleepBeforeEachOutput;
+
+    private SomeFn(Duration sleepBeforeEachOutput) {
+      this.sleepBeforeEachOutput = sleepBeforeEachOutput;
+    }
+
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext context, 
OffsetRangeTracker tracker)
+        throws Exception {
+      OffsetRange range = tracker.currentRestriction();
+      for (int i = (int) range.getFrom(); i < range.getTo(); ++i) {
+        if (!tracker.tryClaim(i)) {
+          return resume();
+        }
+        Thread.sleep(sleepBeforeEachOutput.getMillis());
+        context.output("" + i);
+      }
+      return stop();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(Integer element) {
+      throw new UnsupportedOperationException("Should not be called in this 
test");
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      throw new UnsupportedOperationException("Should not be called in this 
test");
+    }
+  }
+
+  private SplittableProcessElementInvoker<Integer, String, OffsetRange, 
OffsetRangeTracker>.Result
+      runTest(int count, Duration sleepPerElement) {
+    SomeFn fn = new SomeFn(sleepPerElement);
+    SplittableProcessElementInvoker<Integer, String, OffsetRange, 
OffsetRangeTracker> invoker =
+        new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
+            fn,
+            PipelineOptionsFactory.create(),
+            new OutputWindowedValue<String>() {
+              @Override
+              public void outputWindowedValue(
+                  String output,
+                  Instant timestamp,
+                  Collection<? extends BoundedWindow> windows,
+                  PaneInfo pane) {}
+
+              @Override
+              public <SideOutputT> void sideOutputWindowedValue(
+                  TupleTag<SideOutputT> tag,
+                  SideOutputT output,
+                  Instant timestamp,
+                  Collection<? extends BoundedWindow> windows,
+                  PaneInfo pane) {}
+            },
+            NullSideInputReader.empty(),
+            Executors.newSingleThreadScheduledExecutor(),
+            1000,
+            Duration.standardSeconds(3));
+
+    return invoker.invokeProcessElement(
+        DoFnInvokers.invokerFor(fn),
+        WindowedValue.of(count, Instant.now(), GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING),
+        new OffsetRangeTracker(new OffsetRange(0, count)));
+  }
+
+  @Test
+  public void testInvokeProcessElementOutputBounded() throws Exception {
+    SplittableProcessElementInvoker<Integer, String, OffsetRange, 
OffsetRangeTracker>.Result res =
+        runTest(10000, Duration.ZERO);
+    assertTrue(res.getContinuation().shouldResume());
+    OffsetRange residualRange = res.getResidualRestriction();
+    // Should process the first 1000 elements (the invoker's output limit).
+    assertEquals(1000, residualRange.getFrom());
+    assertEquals(10000, residualRange.getTo());
+  }
+
+  @Test
+  public void testInvokeProcessElementTimeBounded() throws Exception {
+    SplittableProcessElementInvoker<Integer, String, OffsetRange, 
OffsetRangeTracker>.Result res =
+        runTest(10000, Duration.millis(100));
+    assertTrue(res.getContinuation().shouldResume());
+    OffsetRange residualRange = res.getResidualRestriction();
+    // Should process ideally around 30 elements - but due to timing 
flakiness, we can't enforce
+    // that precisely. Just test that it's not egregiously off.
+    assertThat(residualRange.getFrom(), greaterThan(10L));
+    assertThat(residualRange.getFrom(), lessThan(100L));
+    assertEquals(10000, residualRange.getTo());
+  }
+
+  @Test
+  public void testInvokeProcessElementVoluntaryReturn() throws Exception {
+    SplittableProcessElementInvoker<Integer, String, OffsetRange, 
OffsetRangeTracker>.Result res =
+        runTest(5, Duration.millis(100));
+    assertFalse(res.getContinuation().shouldResume());
+    assertNull(res.getResidualRestriction());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 74a566b..bb7fd8c 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core;
 import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
 import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
@@ -32,6 +33,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -47,12 +51,14 @@ import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.util.state.TimerInternalsFactory;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -66,6 +72,9 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link SplittableParDo}. */
 @RunWith(JUnit4.class)
 public class SplittableParDoTest {
+  private static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
+  private static final Duration MAX_BUNDLE_DURATION = 
Duration.standardSeconds(5);
+
   // ----------------- Tests for whether the transform sets boundedness 
correctly --------------
   private static class SomeRestriction implements Serializable {}
 
@@ -201,11 +210,13 @@ public class SplittableParDoTest {
 
     ProcessFnTester(
         Instant currentProcessingTime,
-        DoFn<InputT, OutputT> fn,
+        final DoFn<InputT, OutputT> fn,
         Coder<InputT> inputCoder,
-        Coder<RestrictionT> restrictionCoder)
+        Coder<RestrictionT> restrictionCoder,
+        int maxOutputsPerBundle,
+        Duration maxBundleDuration)
         throws Exception {
-      SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> 
processFn =
+      final SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> 
processFn =
           new SplittableParDo.ProcessFn<>(
               fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
       this.tester = DoFnTester.of(processFn);
@@ -224,35 +235,32 @@ public class SplittableParDoTest {
               return timerInternals;
             }
           });
-      processFn.setOutputWindowedValue(
-          new OutputWindowedValue<OutputT>() {
-            @Override
-            public void outputWindowedValue(
-                OutputT output,
-                Instant timestamp,
-                Collection<? extends BoundedWindow> windows,
-                PaneInfo pane) {
-              for (BoundedWindow window : windows) {
-                tester
-                    .getMutableOutput(tester.getMainOutputTag())
-                    .add(ValueInSingleWindow.of(output, timestamp, window, 
pane));
-              }
-            }
-
-            @Override
-            public <SideOutputT> void sideOutputWindowedValue(
-                TupleTag<SideOutputT> tag,
-                SideOutputT output,
-                Instant timestamp,
-                Collection<? extends BoundedWindow> windows,
-                PaneInfo pane) {
-              for (BoundedWindow window : windows) {
-                tester
-                    .getMutableOutput(tag)
-                    .add(ValueInSingleWindow.of(output, timestamp, window, 
pane));
-              }
-            }
-          });
+      processFn.setProcessElementInvoker(
+          new OutputAndTimeBoundedSplittableProcessElementInvoker<
+              InputT, OutputT, RestrictionT, TrackerT>(
+              fn,
+              tester.getPipelineOptions(),
+              new OutputWindowedValueToDoFnTester<>(tester),
+              new SideInputReader() {
+                @Nullable
+                @Override
+                public <T> T get(PCollectionView<T> view, BoundedWindow 
window) {
+                  throw new NoSuchElementException();
+                }
+
+                @Override
+                public <T> boolean contains(PCollectionView<T> view) {
+                  return false;
+                }
+
+                @Override
+                public boolean isEmpty() {
+                  return true;
+                }
+              },
+              
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
+              maxOutputsPerBundle,
+              maxBundleDuration));
       // Do not clone since ProcessFn references non-serializable DoFnTester 
itself
       // through the state/timer/output callbacks.
       this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
@@ -318,6 +326,37 @@ public class SplittableParDoTest {
     List<OutputT> takeOutputElements() {
       return tester.takeOutputElements();
     }
+
+  }
+
+  private static class OutputWindowedValueToDoFnTester<OutputT>
+      implements OutputWindowedValue<OutputT> {
+    private final DoFnTester<?, OutputT> tester;
+
+    private OutputWindowedValueToDoFnTester(DoFnTester<?, OutputT> tester) {
+      this.tester = tester;
+    }
+
+    @Override
+    public void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      sideOutputWindowedValue(tester.getMainOutputTag(), output, timestamp, 
windows, pane);
+    }
+
+    @Override
+    public <SideOutputT> void sideOutputWindowedValue(
+        TupleTag<SideOutputT> tag,
+        SideOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      for (BoundedWindow window : windows) {
+        tester.getMutableOutput(tag).add(ValueInSingleWindow.of(output, 
timestamp, window, pane));
+      }
+    }
   }
 
   /** A simple splittable {@link DoFn} that's actually monolithic. */
@@ -362,7 +401,8 @@ public class SplittableParDoTest {
     for (WindowExplosion explosion : WindowExplosion.values()) {
       ProcessFnTester<Integer, String, SomeRestriction, 
SomeRestrictionTracker> tester =
           new ProcessFnTester<>(
-              base, fn, BigEndianIntegerCoder.of(), 
SerializableCoder.of(SomeRestriction.class));
+              base, fn, BigEndianIntegerCoder.of(), 
SerializableCoder.of(SomeRestriction.class),
+              MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
       tester.startElement(
           WindowedValue.of(
               ElementAndRestriction.of(42, new SomeRestriction()),
@@ -407,7 +447,8 @@ public class SplittableParDoTest {
     Instant base = Instant.now();
     ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> 
tester =
         new ProcessFnTester<>(
-            base, fn, BigEndianIntegerCoder.of(), 
SerializableCoder.of(SomeRestriction.class));
+            base, fn, BigEndianIntegerCoder.of(), 
SerializableCoder.of(SomeRestriction.class),
+            MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
 
     tester.startElement(42, new SomeRestriction());
     assertThat(tester.takeOutputElements(), contains("42"));
@@ -511,7 +552,8 @@ public class SplittableParDoTest {
     Instant base = Instant.now();
     ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> 
tester =
         new ProcessFnTester<>(
-            base, fn, BigEndianIntegerCoder.of(), 
SerializableCoder.of(SomeCheckpoint.class));
+            base, fn, BigEndianIntegerCoder.of(), 
SerializableCoder.of(SomeCheckpoint.class),
+            MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
 
     tester.startElement(42, new SomeCheckpoint(0));
     assertThat(tester.takeOutputElements(), contains("42"));
@@ -527,8 +569,8 @@ public class SplittableParDoTest {
   }
 
   @Test
-  public void testReactsToCheckpoint() throws Exception {
-    int max = SplittableParDo.ProcessFn.MAX_OUTPUTS_PER_BUNDLE;
+  public void testCheckpointsAfterNumOutputs() throws Exception {
+    int max = 100;
     // Create an fn that attempts to 2x output more than checkpointing allows.
     DoFn<Integer, String> fn = new CounterFn(2 * max + max / 2, 2 * max);
     Instant base = Instant.now();
@@ -536,7 +578,8 @@ public class SplittableParDoTest {
 
     ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> 
tester =
         new ProcessFnTester<>(
-            base, fn, BigEndianIntegerCoder.of(), 
SerializableCoder.of(SomeCheckpoint.class));
+            base, fn, BigEndianIntegerCoder.of(), 
SerializableCoder.of(SomeCheckpoint.class),
+            max, MAX_BUNDLE_DURATION);
 
     List<String> elements;
 
@@ -562,4 +605,32 @@ public class SplittableParDoTest {
     assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max + max / 2 
- 1)));
     assertThat(elements, not(hasItem((String.valueOf(baseIndex + 2 * max + max 
/ 2)))));
   }
+
+  @Test
+  public void testCheckpointsAfterDuration() throws Exception {
+    // Don't bound number of outputs.
+    int max = Integer.MAX_VALUE;
+    // But bound bundle duration - the bundle should terminate.
+    Duration maxBundleDuration = Duration.standardSeconds(1);
+    // Create an fn that tries to output without practical bound; only the time limit stops it.
+    DoFn<Integer, String> fn = new CounterFn(max, max);
+    Instant base = Instant.now();
+    int baseIndex = 42;
+
+    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> 
tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), 
SerializableCoder.of(SomeCheckpoint.class),
+            max, maxBundleDuration);
+
+    List<String> elements;
+
+    tester.startElement(baseIndex, new SomeCheckpoint(0));
+    // Bundle should terminate, and should do at least some processing.
+    elements = tester.takeOutputElements();
+    assertFalse(elements.isEmpty());
+    // Bundle should have run for at least the requested duration.
+    assertThat(
+        Instant.now().getMillis() - base.getMillis(),
+        greaterThanOrEqualTo(maxBundleDuration.getMillis()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 18f3909..64593cd 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -18,13 +18,16 @@
 package org.apache.beam.runners.direct;
 
 import java.util.Collection;
+import java.util.concurrent.Executors;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ElementAndRestriction;
 import org.apache.beam.runners.core.KeyedWorkItem;
+import 
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimerInternals;
@@ -35,9 +38,11 @@ import org.apache.beam.sdk.util.state.TimerInternalsFactory;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
-class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT>
+class SplittableProcessElementsEvaluatorFactory<
+        InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT>>
     implements TransformEvaluatorFactory {
   private final ParDoEvaluatorFactory<
           KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, 
OutputT>
@@ -69,16 +74,17 @@ class SplittableProcessElementsEvaluatorFactory<InputT, 
OutputT, RestrictionT>
       createEvaluator(
           AppliedPTransform<
                   PCollection<KeyedWorkItem<String, 
ElementAndRestriction<InputT, RestrictionT>>>,
-                  PCollectionTuple, SplittableParDo.ProcessElements<InputT, 
OutputT, RestrictionT>>
+                  PCollectionTuple,
+                  SplittableParDo.ProcessElements<InputT, OutputT, 
RestrictionT, TrackerT>>
               application,
           CommittedBundle<InputT> inputBundle)
           throws Exception {
-    final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT> 
transform =
+    final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, 
TrackerT> transform =
         application.getTransform();
 
     DoFnLifecycleManager fnManager = 
delegateFactory.getManagerForCloneOf(transform.getFn());
 
-    SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, ?> processFn =
+    SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> 
processFn =
         transform.newProcessFn(fnManager.<InputT, OutputT>get());
 
     String stepName = evaluationContext.getStepName(application);
@@ -117,28 +123,38 @@ class SplittableProcessElementsEvaluatorFactory<InputT, 
OutputT, RestrictionT>
         });
 
     final OutputManager outputManager = parDoEvaluator.getOutputManager();
-    processFn.setOutputWindowedValue(
-        new OutputWindowedValue<OutputT>() {
-          @Override
-          public void outputWindowedValue(
-              OutputT output,
-              Instant timestamp,
-              Collection<? extends BoundedWindow> windows,
-              PaneInfo pane) {
-            outputManager.output(
-                transform.getMainOutputTag(), WindowedValue.of(output, 
timestamp, windows, pane));
-          }
+    processFn.setProcessElementInvoker(
+        new OutputAndTimeBoundedSplittableProcessElementInvoker<
+            InputT, OutputT, RestrictionT, TrackerT>(
+            transform.getFn(),
+            evaluationContext.getPipelineOptions(),
+            new OutputWindowedValue<OutputT>() {
+              @Override
+              public void outputWindowedValue(
+                  OutputT output,
+                  Instant timestamp,
+                  Collection<? extends BoundedWindow> windows,
+                  PaneInfo pane) {
+                outputManager.output(
+                    transform.getMainOutputTag(),
+                    WindowedValue.of(output, timestamp, windows, pane));
+              }
 
-          @Override
-          public <SideOutputT> void sideOutputWindowedValue(
-              TupleTag<SideOutputT> tag,
-              SideOutputT output,
-              Instant timestamp,
-              Collection<? extends BoundedWindow> windows,
-              PaneInfo pane) {
-            outputManager.output(tag, WindowedValue.of(output, timestamp, 
windows, pane));
-          }
-        });
+              @Override
+              public <SideOutputT> void sideOutputWindowedValue(
+                  TupleTag<SideOutputT> tag,
+                  SideOutputT output,
+                  Instant timestamp,
+                  Collection<? extends BoundedWindow> windows,
+                  PaneInfo pane) {
+                outputManager.output(tag, WindowedValue.of(output, timestamp, 
windows, pane));
+              }
+            },
+            evaluationContext.createSideInputReader(transform.getSideInputs()),
+            // TODO: For better performance, use a higher-level executor?
+            
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
+            10000,
+            Duration.standardSeconds(10)));
 
     return 
DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, 
fnManager);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 0d1f96d..87ae1f5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -140,6 +140,10 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
     return (StateInternals<K>) stateInternals;
   }
 
+  public PipelineOptions getPipelineOptions() {
+    return options;
+  }
+
   /**
    * When a {@link DoFnTester} should clone the {@link DoFn} under test and 
how it should manage
    * the lifecycle of the {@link DoFn}.
@@ -287,8 +291,8 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
       startBundle();
     }
     try {
-      final TestProcessContext processContext =
-          new TestProcessContext(
+      final DoFn<InputT, OutputT>.ProcessContext processContext =
+          createProcessContext(
               ValueInSingleWindow.of(element, timestamp, window, 
PaneInfo.NO_FIRING));
       fnInvoker.invokeProcessElement(
           new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
@@ -315,7 +319,7 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
             }
 
             @Override
-            public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+            public RestrictionTracker<?> restrictionTracker() {
               throw new UnsupportedOperationException(
                   "Not expected to access RestrictionTracker from a regular 
DoFn in DoFnTester");
             }
@@ -630,6 +634,11 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
     }
   }
 
+  public DoFn<InputT, OutputT>.ProcessContext createProcessContext(
+      ValueInSingleWindow<InputT> element) {
+    return new TestProcessContext(element);
+  }
+
   private class TestProcessContext extends DoFn<InputT, 
OutputT>.ProcessContext {
     private final TestContext context;
     private final ValueInSingleWindow<InputT> element;

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 5f61349..85831a7 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -115,7 +115,7 @@ public interface DoFnInvoker<InputT, OutputT> {
      * If this is a splittable {@link DoFn}, returns the {@link 
RestrictionTracker} associated with
      * the current {@link ProcessElement} call.
      */
-    <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
+    RestrictionTracker<?> restrictionTracker();
 
     /** Returns the state cell for the given {@link StateId}. */
     State state(String stateId);
@@ -156,7 +156,7 @@ public interface DoFnInvoker<InputT, OutputT> {
       return null;
     }
 
-    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+    public RestrictionTracker<?> restrictionTracker() {
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
new file mode 100644
index 0000000..67031c4
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+
+/** A restriction represented by a range of integers [from, to). */
+public class OffsetRange implements Serializable {
+  private final long from;
+  private final long to;
+
+  public OffsetRange(long from, long to) {
+    checkArgument(from <= to, "Malformed range [%s, %s)", from, to);
+    this.from = from;
+    this.to = to;
+  }
+
+  public long getFrom() {
+    return from;
+  }
+
+  public long getTo() {
+    return to;
+  }
+
+  @Override
+  public String toString() {
+    return "[" + from + ", " + to + ')';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    OffsetRange that = (OffsetRange) o;
+
+    if (from != that.from) {
+      return false;
+    }
+    return to == that.to;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = (int) (from ^ (from >>> 32));
+    result = 31 * result + (int) (to ^ (to >>> 32));
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
new file mode 100644
index 0000000..87c7bfd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link RestrictionTracker} for claiming offsets in an {@link OffsetRange} in a monotonically
+ * increasing fashion.
+ */
+public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
+  private OffsetRange range;
+  private Long lastClaimedOffset = null;
+
+  public OffsetRangeTracker(OffsetRange range) {
+    this.range = checkNotNull(range);
+  }
+
+  @Override
+  public synchronized OffsetRange currentRestriction() {
+    return range;
+  }
+
+  @Override
+  public synchronized OffsetRange checkpoint() {
+    if (lastClaimedOffset == null) {
+      OffsetRange res = range;
+      range = new OffsetRange(range.getFrom(), range.getFrom());
+      return res;
+    }
+    OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo());
+    this.range = new OffsetRange(range.getFrom(), lastClaimedOffset + 1);
+    return res;
+  }
+
+  /**
+   * Attempts to claim the given offset.
+   *
+   * <p>Must be larger than the last successfully claimed offset.
+   *
+   * @return {@code true} if the offset was successfully claimed, {@code false} if it is outside the
+   *     current {@link OffsetRange} of this tracker (in that case this operation is a no-op).
+   */
+  public synchronized boolean tryClaim(long i) {
+    checkArgument(
+        lastClaimedOffset == null || i > lastClaimedOffset,
+        "Trying to claim offset %s while last claimed was %s",
+        i,
+        lastClaimedOffset);
+    checkArgument(
+        i >= range.getFrom(), "Trying to claim offset %s before start of the range %s", i, range);
+    // No respective checkArgument for i < range.to() - it's ok to try claiming offsets beyond it.
+    if (i >= range.getTo()) {
+      return false;
+    }
+    lastClaimedOffset = i;
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 268766b..e9b718e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -34,7 +34,7 @@ public interface RestrictionTracker<RestrictionT> {
    * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible.
    * Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the work:
    * the old value of {@link #currentRestriction} is equivalent to the new value and the return
-   * value of this method combined.
+   * value of this method combined. Must be called at most once on a given object.
    */
   RestrictionT checkpoint();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index e3b58b7..e5c0ef0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
 import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
@@ -25,7 +24,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -38,7 +36,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
@@ -64,61 +63,12 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class SplittableDoFnTest {
-  static class OffsetRange implements Serializable {
-    public final int from;
-    public final int to;
-
-    OffsetRange(int from, int to) {
-      this.from = from;
-      this.to = to;
-    }
-
-    @Override
-    public String toString() {
-      return "OffsetRange{" + "from=" + from + ", to=" + to + '}';
-    }
-  }
-
-  private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
-    private OffsetRange range;
-    private Integer lastClaimedIndex = null;
-
-    OffsetRangeTracker(OffsetRange range) {
-      this.range = checkNotNull(range);
-    }
-
-    @Override
-    public OffsetRange currentRestriction() {
-      return range;
-    }
-
-    @Override
-    public OffsetRange checkpoint() {
-      if (lastClaimedIndex == null) {
-        OffsetRange res = range;
-        range = new OffsetRange(range.from, range.from);
-        return res;
-      }
-      OffsetRange res = new OffsetRange(lastClaimedIndex + 1, range.to);
-      this.range = new OffsetRange(range.from, lastClaimedIndex + 1);
-      return res;
-    }
-
-    boolean tryClaim(int i) {
-      checkState(lastClaimedIndex == null || i > lastClaimedIndex);
-      if (i >= range.to) {
-        return false;
-      }
-      lastClaimedIndex = i;
-      return true;
-    }
-  }
 
   static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
     @ProcessElement
     public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
-      for (int i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
-        c.output(KV.of(c.element(), i));
+      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
+        c.output(KV.of(c.element(), (int) i));
         if (i % 3 == 0) {
           return resume();
         }
@@ -134,8 +84,8 @@ public class SplittableDoFnTest {
     @SplitRestriction
     public void splitRange(
         String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
-      receiver.output(new OffsetRange(range.from, (range.from + range.to) / 2));
-      receiver.output(new OffsetRange((range.from + range.to) / 2, range.to));
+      receiver.output(new OffsetRange(range.getFrom(), (range.getFrom() + range.getTo()) / 2));
+      receiver.output(new OffsetRange((range.getFrom() + range.getTo()) / 2, range.getTo()));
     }
 
     @NewTracker
@@ -252,8 +202,8 @@ public class SplittableDoFnTest {
     @ProcessElement
     public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
-      int trueStart = snapToNextBlock(tracker.currentRestriction().from, blockStarts);
-      int trueEnd = snapToNextBlock(tracker.currentRestriction().to, blockStarts);
+      int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
+      int trueEnd = snapToNextBlock((int) tracker.currentRestriction().getTo(), blockStarts);
       for (int i = trueStart; i < trueEnd; ++i) {
         if (!tracker.tryClaim(blockStarts[i])) {
           return resume();
@@ -298,7 +248,7 @@ public class SplittableDoFnTest {
 
     @ProcessElement
     public void process(ProcessContext c, OffsetRangeTracker tracker) {
-      checkState(tracker.tryClaim(tracker.currentRestriction().from));
+      checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
       String side = c.sideInput(sideInput);
       c.output("main:" + side + ":" + c.element());
       c.sideOutput(sideOutput, "side:" + side + ":" + c.element());

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 9bc2d12..5d3746f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -385,7 +385,7 @@ public class DoFnInvokersTest {
               }
 
               @Override
-              public RestrictionTracker restrictionTracker() {
+              public RestrictionTracker<?> restrictionTracker() {
                 return tracker;
               }
             }));

http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
new file mode 100644
index 0000000..c8a530c
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link OffsetRangeTracker}. */
+@RunWith(JUnit4.class)
+public class OffsetRangeTrackerTest {
+  @Rule public final ExpectedException expected = ExpectedException.none();
+
+  @Test
+  public void testTryClaim() throws Exception {
+    OffsetRange range = new OffsetRange(100, 200);
+    OffsetRangeTracker tracker = new OffsetRangeTracker(range);
+    assertEquals(range, tracker.currentRestriction());
+    assertTrue(tracker.tryClaim(100));
+    assertTrue(tracker.tryClaim(150));
+    assertTrue(tracker.tryClaim(199));
+    assertFalse(tracker.tryClaim(200));
+  }
+
+  @Test
+  public void testCheckpointUnstarted() throws Exception {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    OffsetRange checkpoint = tracker.checkpoint();
+    assertEquals(new OffsetRange(100, 100), tracker.currentRestriction());
+    assertEquals(new OffsetRange(100, 200), checkpoint);
+  }
+
+  @Test
+  public void testCheckpointJustStarted() throws Exception {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(100));
+    OffsetRange checkpoint = tracker.checkpoint();
+    assertEquals(new OffsetRange(100, 101), tracker.currentRestriction());
+    assertEquals(new OffsetRange(101, 200), checkpoint);
+  }
+
+  @Test
+  public void testCheckpointRegular() throws Exception {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(105));
+    assertTrue(tracker.tryClaim(110));
+    OffsetRange checkpoint = tracker.checkpoint();
+    assertEquals(new OffsetRange(100, 111), tracker.currentRestriction());
+    assertEquals(new OffsetRange(111, 200), checkpoint);
+  }
+
+  @Test
+  public void testCheckpointClaimedLast() throws Exception {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(105));
+    assertTrue(tracker.tryClaim(110));
+    assertTrue(tracker.tryClaim(199));
+    OffsetRange checkpoint = tracker.checkpoint();
+    assertEquals(new OffsetRange(100, 200), tracker.currentRestriction());
+    assertEquals(new OffsetRange(200, 200), checkpoint);
+  }
+
+  @Test
+  public void testCheckpointAfterFailedClaim() throws Exception {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(105));
+    assertTrue(tracker.tryClaim(110));
+    assertTrue(tracker.tryClaim(160));
+    assertFalse(tracker.tryClaim(240));
+    OffsetRange checkpoint = tracker.checkpoint();
+    assertEquals(new OffsetRange(100, 161), tracker.currentRestriction());
+    assertEquals(new OffsetRange(161, 200), checkpoint);
+  }
+
+  @Test
+  public void testNonMonotonicClaim() throws Exception {
+    expected.expectMessage("Trying to claim offset 103 while last claimed was 110");
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(105));
+    assertTrue(tracker.tryClaim(110));
+    tracker.tryClaim(103);
+  }
+
+  @Test
+  public void testClaimBeforeStartOfRange() throws Exception {
+    expected.expectMessage("Trying to claim offset 90 before start of the range [100, 200)");
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    tracker.tryClaim(90);
+  }
+}

Reply via email to