http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
new file mode 100644
index 0000000..59c4d8e
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import 
org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.common.Counter;
+import org.apache.beam.sdk.util.common.Counter.AggregationKind;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link InProcessEvaluationContext}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessEvaluationContextTest {
+  private TestPipeline p;
+  private InProcessEvaluationContext context;
+
+  private PCollection<Integer> created;
+  private PCollection<KV<String, Integer>> downstream;
+  private PCollectionView<Iterable<Integer>> view;
+  private PCollection<Long> unbounded;
+  private Collection<AppliedPTransform<?, ?, ?>> rootTransforms;
+  private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
+
+  private BundleFactory bundleFactory;
+
+  /**
+   * Builds the pipeline shared by every test and creates the evaluation context from the
+   * transforms, consumers, step names and views collected by visiting that pipeline.
+   */
+  @Before
+  public void setup() {
+    InProcessPipelineRunner pipelineRunner =
+        InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create());
+
+    p = TestPipeline.create();
+
+    // "created" feeds both a keyed collection and a view; "unbounded" is a separate root.
+    created = p.apply(Create.of(1, 2, 3));
+    downstream = created.apply(WithKeys.<String, Integer>of("foo"));
+    view = created.apply(View.<Integer>asIterable());
+    unbounded = p.apply(CountingInput.unbounded());
+
+    ConsumerTrackingPipelineVisitor graphVisitor = new ConsumerTrackingPipelineVisitor();
+    p.traverseTopologically(graphVisitor);
+    rootTransforms = graphVisitor.getRootTransforms();
+    valueToConsumers = graphVisitor.getValueToConsumers();
+
+    bundleFactory = InProcessBundleFactory.create();
+
+    context =
+        InProcessEvaluationContext.create(
+            pipelineRunner.getPipelineOptions(),
+            InProcessBundleFactory.create(),
+            rootTransforms,
+            valueToConsumers,
+            graphVisitor.getStepNames(),
+            graphVisitor.getViews());
+  }
+
+  /**
+   * Verifies that elements added through a {@link PCollectionViewWriter} can be read back per
+   * window from a {@link SideInputReader}, and that after a second write to a window the reader
+   * returns the newly written contents for that window.
+   */
+  @Test
+  public void writeToViewWriterThenReadReads() {
+    PCollectionViewWriter<Integer, Iterable<Integer>> writer =
+        context.createPCollectionViewWriter(
+            PCollection.<Iterable<Integer>>createPrimitiveOutputInternal(
+                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED),
+            view);
+    BoundedWindow firstWindow = new TestBoundedWindow(new Instant(1024L));
+    BoundedWindow secondWindow = new TestBoundedWindow(new Instant(899999L));
+
+    // Write one element into each of the two windows.
+    WindowedValue<Integer> inFirstWindow =
+        WindowedValue.of(1, new Instant(1222), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    PaneInfo onTimePane = PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0);
+    WindowedValue<Integer> inSecondWindow =
+        WindowedValue.of(2, new Instant(8766L), secondWindow, onTimePane);
+    Iterable<WindowedValue<Integer>> initialValues =
+        ImmutableList.of(inFirstWindow, inSecondWindow);
+    writer.add(initialValues);
+
+    SideInputReader reader =
+        context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
+    assertThat(reader.get(view, firstWindow), containsInAnyOrder(1));
+    assertThat(reader.get(view, secondWindow), containsInAnyOrder(2));
+
+    // Write a late pane into the second window and expect the reader to return it.
+    PaneInfo latePane = PaneInfo.createPane(false, true, Timing.LATE, 1, 1);
+    WindowedValue<Integer> overwrittenSecondValue =
+        WindowedValue.of(4444, new Instant(8677L), secondWindow, latePane);
+    writer.add(Collections.singleton(overwrittenSecondValue));
+    assertThat(reader.get(view, secondWindow), containsInAnyOrder(4444));
+  }
+
+  /**
+   * Verifies that state committed through {@code handleResult} for a transform and key is
+   * readable from an execution context obtained afterwards for the same transform and key.
+   */
+  @Test
+  public void getExecutionContextSameStepSameKeyState() {
+    AppliedPTransform<?, ?, ?> producer = created.getProducingTransformInternal();
+    StateTag<Object, BagState<Integer>> bagTag = StateTags.bag("myBag", VarIntCoder.of());
+
+    InProcessStepContext firstStep =
+        context.getExecutionContext(producer, "foo").getOrCreateStepContext("s1", "s1");
+    firstStep.stateInternals().state(StateNamespaces.global(), bagTag).add(1);
+
+    // Commit the step's state as part of a result for a bundle keyed by "foo".
+    CommittedBundle<Integer> keyedInput =
+        InProcessBundleFactory.create()
+            .createKeyedBundle(null, "foo", created)
+            .commit(Instant.now());
+    InProcessTransformResult committedState =
+        StepTransformResult.withoutHold(producer).withState(firstStep.commitState()).build();
+    context.handleResult(keyedInput, ImmutableList.<TimerData>of(), committedState);
+
+    BagState<Integer> reloadedBag =
+        context
+            .getExecutionContext(producer, "foo")
+            .getOrCreateStepContext("s1", "s1")
+            .stateInternals()
+            .state(StateNamespaces.global(), bagTag);
+    assertThat(reloadedBag.read(), contains(1));
+  }
+
+
+  /**
+   * Verifies that execution contexts for the same transform but different keys are distinct and
+   * that a value added under one key is not visible under the other.
+   */
+  @Test
+  public void getExecutionContextDifferentKeysIndependentState() {
+    AppliedPTransform<?, ?, ?> producer = created.getProducingTransformInternal();
+    StateTag<Object, BagState<Integer>> bagTag = StateTags.bag("myBag", VarIntCoder.of());
+
+    InProcessExecutionContext fooContext = context.getExecutionContext(producer, "foo");
+    BagState<Integer> fooBag =
+        fooContext
+            .getOrCreateStepContext("s1", "s1")
+            .stateInternals()
+            .state(StateNamespaces.global(), bagTag);
+    fooBag.add(1);
+
+    InProcessExecutionContext barContext = context.getExecutionContext(producer, "bar");
+    assertThat(barContext, not(equalTo(fooContext)));
+    BagState<Integer> barBag =
+        barContext
+            .getOrCreateStepContext("s1", "s1")
+            .stateInternals()
+            .state(StateNamespaces.global(), bagTag);
+    assertThat(barBag.read(), emptyIterable());
+  }
+
+  /**
+   * Verifies that a value added to state under one transform is not visible from the execution
+   * context of a different transform, even when both use the same key.
+   */
+  @Test
+  public void getExecutionContextDifferentStepsIndependentState() {
+    String sharedKey = "foo";
+    StateTag<Object, BagState<Integer>> bagTag = StateTags.bag("myBag", VarIntCoder.of());
+
+    BagState<Integer> createdStepBag =
+        context
+            .getExecutionContext(created.getProducingTransformInternal(), sharedKey)
+            .getOrCreateStepContext("s1", "s1")
+            .stateInternals()
+            .state(StateNamespaces.global(), bagTag);
+    createdStepBag.add(1);
+
+    BagState<Integer> downstreamStepBag =
+        context
+            .getExecutionContext(downstream.getProducingTransformInternal(), sharedKey)
+            .getOrCreateStepContext("s1", "s1")
+            .stateInternals()
+            .state(StateNamespaces.global(), bagTag);
+    assertThat(downstreamStepBag.read(), emptyIterable());
+  }
+
+  /**
+   * Verifies that counters carried by successive results are summed into the context's counter
+   * of the same name: 4 after the first result, 12 after a second result adds 8.
+   */
+  @Test
+  public void handleResultMergesCounters() {
+    CounterSet firstCounters = context.createCounterSet();
+    Counter<Long> firstFoo = Counter.longs("foo", AggregationKind.SUM);
+    firstCounters.addCounter(firstFoo);
+    firstFoo.addValue(4L);
+
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(created.getProducingTransformInternal())
+            .withCounters(firstCounters)
+            .build());
+    assertThat(
+        (Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L));
+
+    CounterSet secondCounters = context.createCounterSet();
+    Counter<Long> secondFoo = Counter.longs("foo", AggregationKind.SUM);
+    secondCounters.add(secondFoo);
+    secondFoo.addValue(8L);
+
+    // Build the result before the input bundle, as the bundle is created by the context itself.
+    InProcessTransformResult downstreamResult =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+            .withCounters(secondCounters)
+            .build();
+    context.handleResult(
+        context.createRootBundle(created).commit(Instant.now()),
+        ImmutableList.<TimerData>of(),
+        downstreamResult);
+    assertThat(
+        (Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L));
+  }
+
+  /**
+   * Verifies that state attached to a result passed to {@code handleResult} can be read back,
+   * in insertion order, from a later execution context for the same transform and key.
+   */
+  @Test
+  public void handleResultStoresState() {
+    String key = "foo";
+    AppliedPTransform<?, ?, ?> consumer = downstream.getProducingTransformInternal();
+    StateTag<Object, BagState<Integer>> bagTag = StateTags.bag("myBag", VarIntCoder.of());
+
+    CopyOnAccessInMemoryStateInternals<Object> writtenState =
+        context.getExecutionContext(consumer, key).getOrCreateStepContext("s1", "s1").stateInternals();
+    BagState<Integer> writtenBag = writtenState.state(StateNamespaces.global(), bagTag);
+    writtenBag.add(1);
+    writtenBag.add(2);
+    writtenBag.add(4);
+
+    InProcessTransformResult resultWithState =
+        StepTransformResult.withoutHold(consumer).withState(writtenState).build();
+    context.handleResult(
+        context.createKeyedBundle(null, key, created).commit(Instant.now()),
+        ImmutableList.<TimerData>of(),
+        resultWithState);
+
+    CopyOnAccessInMemoryStateInternals<Object> reloadedState =
+        context.getExecutionContext(consumer, key).getOrCreateStepContext("s1", "s1").stateInternals();
+    BagState<Integer> reloadedBag = reloadedState.state(StateNamespaces.global(), bagTag);
+    assertThat(reloadedBag.read(), contains(1, 2, 4));
+  }
+
+  /**
+   * Verifies that a callback scheduled for the global window of {@code downstream} does not run
+   * while the producer of {@code created} reports a hold, and does run after a result without a
+   * hold is handled.
+   */
+  @Test
+  public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback()
+      throws Exception {
+    AppliedPTransform<?, ?, ?> producer = created.getProducingTransformInternal();
+    final CountDownLatch callbackInvoked = new CountDownLatch(1);
+    Runnable countDownOnRun =
+        new Runnable() {
+          @Override
+          public void run() {
+            callbackInvoked.countDown();
+          }
+        };
+
+    // Should call back after the end of the global window
+    context.scheduleAfterOutputWouldBeProduced(
+        downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), countDownOnRun);
+
+    InProcessTransformResult heldResult =
+        StepTransformResult.withHold(producer, new Instant(0)).build();
+    context.handleResult(null, ImmutableList.<TimerData>of(), heldResult);
+
+    // It is difficult to demonstrate that no action was taken in a multithreaded world, so poll
+    // for a bit. This assertion will likely be flaky if the logic under test is broken.
+    assertThat(callbackInvoked.await(500L, TimeUnit.MILLISECONDS), is(false));
+
+    InProcessTransformResult releasedResult = StepTransformResult.withoutHold(producer).build();
+    context.handleResult(null, ImmutableList.<TimerData>of(), releasedResult);
+    // Obtain the value via blocking call
+    assertThat(callbackInvoked.await(1, TimeUnit.SECONDS), is(true));
+  }
+
+  /**
+   * Verifies that a callback scheduled after the producer of {@code created} has already
+   * reported a result without a hold is invoked within the one-second wait.
+   */
+  @Test
+  public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately()
+      throws Exception {
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+
+    final CountDownLatch callbackInvoked = new CountDownLatch(1);
+    Runnable countDownOnRun =
+        new Runnable() {
+          @Override
+          public void run() {
+            callbackInvoked.countDown();
+          }
+        };
+    context.scheduleAfterOutputWouldBeProduced(
+        downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), countDownOnRun);
+    assertThat(callbackInvoked.await(1, TimeUnit.SECONDS), is(true));
+  }
+
+  /**
+   * Verifies that an event-time timer set by {@code downstream} is returned by
+   * {@code extractFiredTimers} only after the upstream hold is released, and that a second
+   * extraction returns nothing.
+   */
+  @Test
+  public void extractFiredTimersExtractsTimers() {
+    AppliedPTransform<?, ?, ?> producer = created.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> consumer = downstream.getProducingTransformInternal();
+
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withHold(producer, new Instant(0)).build());
+
+    String key = "foo";
+    TimerData eventTimeTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
+    InProcessTransformResult resultSettingTimer =
+        StepTransformResult.withoutHold(consumer)
+            .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
+            .withTimerUpdate(TimerUpdate.builder(key).setTimer(eventTimeTimer).build())
+            .build();
+
+    // haven't added any timers, must be empty
+    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+    context.handleResult(
+        context.createKeyedBundle(null, key, created).commit(Instant.now()),
+        ImmutableList.<TimerData>of(),
+        resultSettingTimer);
+
+    // timer hasn't fired
+    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+
+    // Should cause the downstream timer to fire
+    context.handleResult(
+        null, ImmutableList.<TimerData>of(), StepTransformResult.withoutHold(producer).build());
+
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firedByTransform =
+        context.extractFiredTimers();
+    assertThat(firedByTransform, Matchers.<AppliedPTransform<?, ?, ?>>hasKey(consumer));
+    Map<Object, FiredTimers> firedByKey = firedByTransform.get(consumer);
+    assertThat(firedByKey, Matchers.<Object>hasKey(key));
+
+    // Only the event-time domain should report a fired timer for the key.
+    FiredTimers firedForKey = firedByKey.get(key);
+    assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable());
+    assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable());
+    assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(eventTimeTimer));
+
+    // Don't reextract timers
+    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+  }
+
+  /**
+   * Verifies that a bundle created by the context from a keyed input bundle reports the same
+   * key as that input.
+   */
+  @Test
+  public void createBundleKeyedResultPropagatesKey() {
+    CommittedBundle<Integer> keyedInput =
+        bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now());
+    CommittedBundle<KV<String, Integer>> derivedBundle =
+        context.createBundle(keyedInput, downstream).commit(Instant.now());
+    assertThat(derivedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+  }
+
+  /**
+   * Verifies that a keyed bundle created by the context from a root input bundle reports the
+   * key it was created with.
+   */
+  @Test
+  public void createKeyedBundleKeyed() {
+    CommittedBundle<Integer> rootInput =
+        bundleFactory.createRootBundle(created).commit(Instant.now());
+    CommittedBundle<KV<String, Integer>> keyedBundle =
+        context.createKeyedBundle(rootInput, "foo", downstream).commit(Instant.now());
+    assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+  }
+
+  /**
+   * With shutdown of unbounded producers enabled, the unbounded producer is expected to be
+   * reported as done once it has handed back a result without a hold.
+   */
+  @Test
+  public void isDoneWithUnboundedPCollectionAndShutdown() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
+    AppliedPTransform<?, ?, ?> unboundedProducer = unbounded.getProducingTransformInternal();
+    assertThat(context.isDone(unboundedProducer), is(false));
+
+    InProcessTransformResult finished = StepTransformResult.withoutHold(unboundedProducer).build();
+    context.handleResult(null, ImmutableList.<TimerData>of(), finished);
+    assertThat(context.isDone(unboundedProducer), is(true));
+  }
+
+  /**
+   * With shutdown of unbounded producers disabled, the unbounded producer is expected to stay
+   * not done even after it has handed back a result without a hold.
+   */
+  @Test
+  public void isDoneWithUnboundedPCollectionAndNotShutdown() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+    AppliedPTransform<?, ?, ?> unboundedProducer = unbounded.getProducingTransformInternal();
+    assertThat(context.isDone(unboundedProducer), is(false));
+
+    InProcessTransformResult finished = StepTransformResult.withoutHold(unboundedProducer).build();
+    context.handleResult(null, ImmutableList.<TimerData>of(), finished);
+    assertThat(context.isDone(unboundedProducer), is(false));
+  }
+
+  /**
+   * The bounded producer is expected to be reported as done after it has handed back a result
+   * without a hold, even with shutdown of unbounded producers disabled.
+   */
+  @Test
+  public void isDoneWithOnlyBoundedPCollections() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+    AppliedPTransform<?, ?, ?> boundedProducer = created.getProducingTransformInternal();
+    assertThat(context.isDone(boundedProducer), is(false));
+
+    InProcessTransformResult finished = StepTransformResult.withoutHold(boundedProducer).build();
+    context.handleResult(null, ImmutableList.<TimerData>of(), finished);
+    assertThat(context.isDone(boundedProducer), is(true));
+  }
+
+  /**
+   * Verifies that the pipeline as a whole is not reported as done while the bundle output by
+   * {@code created} is still unprocessed, and is reported as done once every consumer of
+   * {@code created} has handled that bundle.
+   */
+  @Test
+  public void isDoneWithPartiallyDone() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
+    assertThat(context.isDone(), is(false));
+
+    // The bounded root finishes and emits a single-element bundle.
+    UncommittedBundle<Integer> createdOutput = context.createRootBundle(created);
+    createdOutput.add(WindowedValue.valueInGlobalWindow(1));
+    CommittedResult createdResult =
+        context.handleResult(
+            null,
+            ImmutableList.<TimerData>of(),
+            StepTransformResult.withoutHold(created.getProducingTransformInternal())
+                .addOutput(createdOutput)
+                .build());
+    @SuppressWarnings("unchecked")
+    CommittedBundle<Integer> committedOutput =
+        (CommittedBundle<Integer>) Iterables.getOnlyElement(createdResult.getOutputs());
+
+    // The unbounded root finishes as well; the emitted bundle is still pending.
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+    assertThat(context.isDone(), is(false));
+
+    for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(created)) {
+      context.handleResult(
+          committedOutput,
+          ImmutableList.<TimerData>of(),
+          StepTransformResult.withoutHold(consumer).build());
+    }
+    assertThat(context.isDone(), is(true));
+  }
+
+  /**
+   * With shutdown of unbounded producers disabled, the pipeline as a whole is expected to stay
+   * not done even after both roots, {@code downstream} and {@code view} have all handed back
+   * results without holds.
+   */
+  @Test
+  public void isDoneWithUnboundedAndNotShutdown() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+    assertThat(context.isDone(), is(false));
+
+    // Both root transforms finish.
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+
+    // The keyed consumer finishes.
+    CommittedBundle<Integer> inputToDownstream =
+        context.createRootBundle(created).commit(Instant.now());
+    context.handleResult(
+        inputToDownstream,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
+    assertThat(context.isDone(), is(false));
+
+    // The view consumer finishes.
+    CommittedBundle<Integer> inputToView = context.createRootBundle(created).commit(Instant.now());
+    context.handleResult(
+        inputToView,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+    assertThat(context.isDone(), is(false));
+  }
+
+  /** A {@link BoundedWindow} whose maximum timestamp is the instant supplied at construction. */
+  private static class TestBoundedWindow extends BoundedWindow {
+    private final Instant maxTimestamp;
+
+    public TestBoundedWindow(Instant maxTimestamp) {
+      this.maxTimestamp = maxTimestamp;
+    }
+
+    @Override
+    public Instant maxTimestamp() {
+      return maxTimestamp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
new file mode 100644
index 0000000..54094c4
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.runners.direct.InProcessRegistrar.InProcessRunner;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ServiceLoader;
+
+/**
+ * Tests for {@link InProcessRegistrar}: its {@link InProcessRunner} runner registrar and its
+ * options registrar, both when constructed directly and when looked up via {@link ServiceLoader}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessPipelineRegistrarTest {
+  /** The options registrar reports exactly {@link InProcessPipelineOptions}. */
+  @Test
+  public void testCorrectOptionsAreReturned() {
+    assertEquals(
+        ImmutableList.of(InProcessPipelineOptions.class),
+        new InProcessRegistrar.InProcessOptions().getPipelineOptions());
+  }
+
+  /** The runner registrar reports exactly {@link InProcessPipelineRunner}. */
+  @Test
+  public void testCorrectRunnersAreReturned() {
+    assertEquals(
+        ImmutableList.of(InProcessPipelineRunner.class),
+        new InProcessRunner().getPipelineRunners());
+  }
+
+  /** The options registrar is among the registrars returned by {@link ServiceLoader}. */
+  @Test
+  public void testServiceLoaderForOptions() {
+    boolean found = false;
+    for (PipelineOptionsRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
+      if (registrar instanceof InProcessRegistrar.InProcessOptions) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      fail("Expected to find " + InProcessRegistrar.InProcessOptions.class);
+    }
+  }
+
+  /** The runner registrar is among the registrars returned by {@link ServiceLoader}. */
+  @Test
+  public void testServiceLoaderForRunner() {
+    boolean found = false;
+    for (PipelineRunnerRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
+      if (registrar instanceof InProcessRunner) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      fail("Expected to find " + InProcessRunner.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
new file mode 100644
index 0000000..87db39a
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for basic {@link InProcessPipelineRunner} functionality.
+ *
+ * <p>NOTE(review): the class implements {@link Serializable} presumably because the anonymous
+ * functions below are declared in an instance method and so hold a reference to the enclosing
+ * test instance - confirm before removing it.
+ */
+@RunWith(JUnit4.class)
+public class InProcessPipelineRunnerTest implements Serializable {
+  /**
+   * Runs a small word-count pipeline on the in-process runner and waits for it to complete; the
+   * expected per-word counts are checked by the {@link PAssert} attached to the pipeline.
+   */
+  @Test
+  public void wordCountShouldSucceed() throws Throwable {
+    Pipeline pipeline = getPipeline();
+
+    PCollection<KV<String, Long>> wordCounts =
+        pipeline
+            .apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
+            .apply(MapElements.via(new SimpleFunction<String, String>() {
+              @Override
+              public String apply(String input) {
+                return input;
+              }
+            }))
+            .apply(Count.<String>perElement());
+    PCollection<String> formattedCounts =
+        wordCounts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
+          @Override
+          public String apply(KV<String, Long> input) {
+            return String.format("%s: %s", input.getKey(), input.getValue());
+          }
+        }));
+
+    PAssert.that(formattedCounts).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
+
+    ((InProcessPipelineResult) pipeline.run()).awaitCompletion();
+  }
+
+  /** Creates a pipeline whose options select {@link InProcessPipelineRunner} as the runner. */
+  private Pipeline getPipeline() {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(InProcessPipelineRunner.class);
+    return Pipeline.create(options);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
new file mode 100644
index 0000000..d8a78f2
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doAnswer;
+
+import 
org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Mean;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for {@link InProcessSideInputContainer}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessSideInputContainerTest {
+  private static final BoundedWindow FIRST_WINDOW =
+      new BoundedWindow() {
+        @Override
+        public Instant maxTimestamp() {
+          return new Instant(789541L);
+        }
+
+        @Override
+        public String toString() {
+          return "firstWindow";
+        }
+      };
+
+  private static final BoundedWindow SECOND_WINDOW =
+      new BoundedWindow() {
+        @Override
+        public Instant maxTimestamp() {
+          return new Instant(14564786L);
+        }
+
+        @Override
+        public String toString() {
+          return "secondWindow";
+        }
+      };
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Mock
+  private InProcessEvaluationContext context;
+
+  private TestPipeline pipeline;
+
+  private InProcessSideInputContainer container;
+
+  private PCollectionView<Map<String, Integer>> mapView;
+  private PCollectionView<Double> singletonView;
+
+  // Not present in container.
+  private PCollectionView<Iterable<Integer>> iterableView;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    pipeline = TestPipeline.create();
+
+    PCollection<Integer> create =
+        pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4));
+
+    mapView =
+        create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo"))
+            .apply("asMapView", View.<String, Integer>asMap());
+
+    singletonView = create.apply("forCombinedTypes", 
Mean.<Integer>globally().asSingletonView());
+    iterableView = create.apply("asIterableView", View.<Integer>asIterable());
+
+    container = InProcessSideInputContainer.create(
+        context, ImmutableList.of(iterableView, mapView, singletonView));
+  }
+
+  @Test
+  public void getAfterWriteReturnsPaneInWindow() throws Exception {
+    WindowedValue<KV<String, Integer>> one =
+        WindowedValue.of(
+            KV.of("one", 1), new Instant(1L), FIRST_WINDOW, 
PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<KV<String, Integer>> two =
+        WindowedValue.of(
+            KV.of("two", 2), new Instant(20L), FIRST_WINDOW, 
PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+    Map<String, Integer> viewContents =
+        container
+            
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, FIRST_WINDOW);
+    assertThat(viewContents, hasEntry("one", 1));
+    assertThat(viewContents, hasEntry("two", 2));
+    assertThat(viewContents.size(), is(2));
+  }
+
+  @Test
+  public void getReturnsLatestPaneInWindow() throws Exception {
+    WindowedValue<KV<String, Integer>> one =
+        WindowedValue.of(
+            KV.of("one", 1),
+            new Instant(1L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    WindowedValue<KV<String, Integer>> two =
+        WindowedValue.of(
+            KV.of("two", 2),
+            new Instant(20L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+    Map<String, Integer> viewContents =
+        container
+            
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, SECOND_WINDOW);
+    assertThat(viewContents, hasEntry("one", 1));
+    assertThat(viewContents, hasEntry("two", 2));
+    assertThat(viewContents.size(), is(2));
+
+    WindowedValue<KV<String, Integer>> three =
+        WindowedValue.of(
+            KV.of("three", 3),
+            new Instant(300L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
+    container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
+
+    Map<String, Integer> overwrittenViewContents =
+        container
+            
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, SECOND_WINDOW);
+    assertThat(overwrittenViewContents, hasEntry("three", 3));
+    assertThat(overwrittenViewContents.size(), is(1));
+  }
+
+  /**
+   * Demonstrates that calling get() on a window that currently has no data 
does not return until
+   * there is data in the pane.
+   */
+  @Test
+  public void getBlocksUntilPaneAvailable() throws Exception {
+    BoundedWindow window =
+        new BoundedWindow() {
+          @Override
+          public Instant maxTimestamp() {
+            return new Instant(1024L);
+          }
+        };
+    Future<Double> singletonFuture =
+        getFutureOfView(
+            
container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)),
+            singletonView,
+            window);
+
+    WindowedValue<Double> singletonValue =
+        WindowedValue.of(4.75, new Instant(475L), window, 
PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+    assertThat(singletonFuture.isDone(), is(false));
+    container.write(singletonView, 
ImmutableList.<WindowedValue<?>>of(singletonValue));
+    assertThat(singletonFuture.get(), equalTo(4.75));
+  }
+
+  @Test
+  public void withPCollectionViewsWithPutInOriginalReturnsContents() throws 
Exception {
+    BoundedWindow window = new BoundedWindow() {
+      @Override
+      public Instant maxTimestamp() {
+        return new Instant(1024L);
+      }
+    };
+    SideInputReader newReader =
+        
container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView));
+    Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, 
window);
+
+    WindowedValue<Double> singletonValue =
+        WindowedValue.of(24.125, new Instant(475L), window, 
PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+    assertThat(singletonFuture.isDone(), is(false));
+    container.write(singletonView, 
ImmutableList.<WindowedValue<?>>of(singletonValue));
+    assertThat(singletonFuture.get(), equalTo(24.125));
+  }
+
+  @Test
+  public void withPCollectionViewsErrorsForContainsNotInViews() {
+    PCollectionView<Map<String, Iterable<String>>> newView =
+        PCollectionViews.multimapView(
+            pipeline,
+            WindowingStrategy.globalDefault(),
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("with unknown views " + 
ImmutableList.of(newView).toString());
+
+    
container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
+  }
+
+  @Test
+  public void withViewsForViewNotInContainerFails() {
+    PCollectionView<Map<String, Iterable<String>>> newView =
+        PCollectionViews.multimapView(
+            pipeline,
+            WindowingStrategy.globalDefault(),
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("unknown views");
+    thrown.expectMessage(newView.toString());
+
+    
container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
+  }
+
+  @Test
+  public void getOnReaderForViewNotInReaderFails() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("unknown view: " + iterableView.toString());
+
+    
container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        .get(iterableView, GlobalWindow.INSTANCE);
+  }
+
+  @Test
+  public void writeForMultipleElementsInDifferentWindowsSucceeds() throws 
Exception {
+    WindowedValue<Double> firstWindowedValue =
+        WindowedValue.of(
+            2.875,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            FIRST_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<Double> secondWindowedValue =
+        WindowedValue.of(
+            4.125,
+            SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
+            SECOND_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    container.write(singletonView, ImmutableList.of(firstWindowedValue, 
secondWindowedValue));
+    assertThat(
+        container
+            
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, FIRST_WINDOW),
+        equalTo(2.875));
+    assertThat(
+        container
+            
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, SECOND_WINDOW),
+        equalTo(4.125));
+  }
+
+  @Test
+  public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws 
Exception {
+    WindowedValue<Integer> firstValue =
+        WindowedValue.of(
+            44,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            FIRST_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<Integer> secondValue =
+        WindowedValue.of(
+            44,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            FIRST_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+    container.write(iterableView, ImmutableList.of(firstValue, secondValue));
+
+    assertThat(
+        container
+            
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
+            .get(iterableView, FIRST_WINDOW),
+        contains(44, 44));
+  }
+
+  @Test
+  public void writeForElementInMultipleWindowsSucceeds() throws Exception {
+    WindowedValue<Double> multiWindowedValue =
+        WindowedValue.of(
+            2.875,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    container.write(singletonView, ImmutableList.of(multiWindowedValue));
+    assertThat(
+        container
+            
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, FIRST_WINDOW),
+        equalTo(2.875));
+    assertThat(
+        container
+            
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, SECOND_WINDOW),
+        equalTo(2.875));
+  }
+
+  @Test
+  public void finishDoesNotOverwriteWrittenElements() throws Exception {
+    WindowedValue<KV<String, Integer>> one =
+        WindowedValue.of(
+            KV.of("one", 1),
+            new Instant(1L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    WindowedValue<KV<String, Integer>> two =
+        WindowedValue.of(
+            KV.of("two", 2),
+            new Instant(20L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+    immediatelyInvokeCallback(mapView, SECOND_WINDOW);
+
+    Map<String, Integer> viewContents =
+        container
+            
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, SECOND_WINDOW);
+
+    assertThat(viewContents, hasEntry("one", 1));
+    assertThat(viewContents, hasEntry("two", 2));
+    assertThat(viewContents.size(), is(2));
+  }
+
+  @Test
+  public void finishOnPendingViewsSetsEmptyElements() throws Exception {
+    immediatelyInvokeCallback(mapView, SECOND_WINDOW);
+    Future<Map<String, Integer>> mapFuture =
+        getFutureOfView(
+            
container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)),
+            mapView,
+            SECOND_WINDOW);
+
+    assertThat(mapFuture.get().isEmpty(), is(true));
+  }
+
+  /**
+   * Demonstrates that calling isReady on a reader created for no views throws an
+   * {@link IllegalArgumentException}, even though the container itself holds the view.
+   */
+  @Test
+  public void isReadyInEmptyReaderThrows() {
+    ReadyCheckingSideInputReader reader =
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of());
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("does not contain");
+    thrown.expectMessage(ImmutableList.of().toString());
+    reader.isReady(mapView, GlobalWindow.INSTANCE);
+  }
+
+  /**
+   * Demonstrates that calling isReady returns false until elements are 
written to the
+   * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it 
returns true.
+   */
+  @Test
+  public void isReadyForSomeNotReadyViewsFalseUntilElements() {
+    container.write(
+        mapView,
+        ImmutableList.of(
+            WindowedValue.of(
+                KV.of("one", 1),
+                SECOND_WINDOW.maxTimestamp().minus(100L),
+                SECOND_WINDOW,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+
+    ReadyCheckingSideInputReader reader =
+        container.createReaderForViews(ImmutableList.of(mapView, 
singletonView));
+    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
+    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+
+    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
+
+    container.write(
+        mapView,
+        ImmutableList.of(
+            WindowedValue.of(
+                KV.of("too", 2),
+                FIRST_WINDOW.maxTimestamp().minus(100L),
+                FIRST_WINDOW,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
+
+    container.write(
+        singletonView,
+        ImmutableList.of(
+            WindowedValue.of(
+                1.25,
+                SECOND_WINDOW.maxTimestamp().minus(100L),
+                SECOND_WINDOW,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
+
+    assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false));
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), 
is(false));
+  }
+
+  @Test
+  public void isReadyForEmptyWindowTrue() {
+    immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE);
+
+    ReadyCheckingSideInputReader reader =
+        container.createReaderForViews(ImmutableList.of(mapView, 
singletonView));
+    assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true));
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), 
is(false));
+
+    immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE);
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
+  }
+
+  /**
+   * Stubs the mock context so that a call to scheduleAfterOutputWouldBeProduced with the
+   * specified view, window, and the view's windowing strategy immediately runs the supplied
+   * callback.
+   */
+  private void immediatelyInvokeCallback(PCollectionView<?> view, BoundedWindow window) {
+    // Runs the Runnable handed to the context (its fourth argument) on the calling thread.
+    Answer<Void> runCallbackInline =
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            Runnable scheduled = (Runnable) invocation.getArguments()[3];
+            scheduled.run();
+            return null;
+          }
+        };
+    doAnswer(runCallbackInline)
+        .when(context)
+        .scheduleAfterOutputWouldBeProduced(
+            Mockito.eq(view),
+            Mockito.eq(window),
+            Mockito.eq(view.getWindowingStrategyInternal()),
+            Mockito.any(Runnable.class));
+  }
+
+  /**
+   * Submits a read of {@code view} in {@code window} through {@code myReader} to a separate
+   * thread and returns a {@link Future} for the result, so a test can observe a read that has
+   * not completed yet.
+   *
+   * @param myReader the reader to call {@code get} on
+   * @param view the view to read
+   * @param window the window whose contents are requested
+   * @return a future that completes with the value returned by the reader
+   */
+  private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader,
+      final PCollectionView<ValueT> view, final BoundedWindow window) {
+    Callable<ValueT> callable = new Callable<ValueT>() {
+      @Override
+      public ValueT call() throws Exception {
+        return myReader.get(view, window);
+      }
+    };
+    // Fully qualified so that no new import is needed in this file.
+    java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      return executor.submit(callable);
+    } finally {
+      // shutdown() still lets the already-submitted task run to completion, but releases the
+      // worker thread afterwards; without it every call leaves an idle non-daemon thread behind.
+      executor.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
new file mode 100644
index 0000000..34a8980
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import 
org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
+import 
org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link InProcessTimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessTimerInternalsTest {
+  private MockClock clock;
+  @Mock private TransformWatermarks watermarks;
+
+  private TimerUpdateBuilder timerUpdateBuilder;
+
+  private InProcessTimerInternals internals;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    clock = MockClock.fromInstant(new Instant(0));
+
+    timerUpdateBuilder = TimerUpdate.builder(1234);
+
+    internals = InProcessTimerInternals.create(clock, watermarks, 
timerUpdateBuilder);
+  }
+
+  @Test
+  public void setTimerAddsToBuilder() {
+    TimerData eventTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(20145L), 
TimeDomain.EVENT_TIME);
+    TimerData processingTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(125555555L), 
TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTimer =
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(98745632189L),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    internals.setTimer(eventTimer);
+    internals.setTimer(processingTimer);
+    internals.setTimer(synchronizedProcessingTimer);
+
+    assertThat(
+        internals.getTimerUpdate().getSetTimers(),
+        containsInAnyOrder(eventTimer, synchronizedProcessingTimer, 
processingTimer));
+  }
+
+  @Test
+  public void deleteTimerDeletesOnBuilder() {
+    TimerData eventTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(20145L), 
TimeDomain.EVENT_TIME);
+    TimerData processingTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(125555555L), 
TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTimer =
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(98745632189L),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    internals.deleteTimer(eventTimer);
+    internals.deleteTimer(processingTimer);
+    internals.deleteTimer(synchronizedProcessingTimer);
+
+    assertThat(
+        internals.getTimerUpdate().getDeletedTimers(),
+        containsInAnyOrder(eventTimer, synchronizedProcessingTimer, 
processingTimer));
+  }
+
+  @Test
+  public void getProcessingTimeIsClockNow() {
+    assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
+    Instant oldProcessingTime = internals.currentProcessingTime();
+
+    clock.advance(Duration.standardHours(12));
+
+    assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
+    assertThat(
+        internals.currentProcessingTime(),
+        equalTo(oldProcessingTime.plus(Duration.standardHours(12))));
+  }
+
+  @Test
+  public void getSynchronizedProcessingTimeIsWatermarkSynchronizedInputTime() {
+    when(watermarks.getSynchronizedProcessingInputTime()).thenReturn(new 
Instant(12345L));
+    assertThat(internals.currentSynchronizedProcessingTime(), equalTo(new 
Instant(12345L)));
+  }
+
+  @Test
+  public void getInputWatermarkTimeUsesWatermarkTime() {
+    when(watermarks.getInputWatermark()).thenReturn(new Instant(8765L));
+    assertThat(internals.currentInputWatermarkTime(), equalTo(new 
Instant(8765L)));
+  }
+
+  @Test
+  public void getOutputWatermarkTimeUsesWatermarkTime() {
+    when(watermarks.getOutputWatermark()).thenReturn(new Instant(25525L));
+    assertThat(internals.currentOutputWatermarkTime(), equalTo(new 
Instant(25525L)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
new file mode 100644
index 0000000..24f9715
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Tests for {@link KeyedPValueTrackingVisitor}.
+ */
+@RunWith(JUnit4.class)
+public class KeyedPValueTrackingVisitorTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private KeyedPValueTrackingVisitor visitor;
+  private Pipeline p;
+
+  @Before
+  public void setup() {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    p = Pipeline.create(options);
+    @SuppressWarnings("rawtypes")
+    Set<Class<? extends PTransform>> producesKeyed =
+        ImmutableSet.<Class<? extends PTransform>>of(PrimitiveKeyer.class, 
CompositeKeyer.class);
+    visitor = KeyedPValueTrackingVisitor.create(producesKeyed);
+  }
+
+  @Test
+  public void primitiveProducesKeyedOutputUnkeyedInputKeyedOutput() {
+    PCollection<Integer> keyed =
+        p.apply(Create.<Integer>of(1, 2, 3)).apply(new 
PrimitiveKeyer<Integer>());
+
+    p.traverseTopologically(visitor);
+    assertThat(visitor.getKeyedPValues(), hasItem(keyed));
+  }
+
+  @Test
+  public void primitiveProducesKeyedOutputKeyedInputKeyedOutut() {
+    PCollection<Integer> keyed =
+        p.apply(Create.<Integer>of(1, 2, 3))
+            .apply("firstKey", new PrimitiveKeyer<Integer>())
+            .apply("secondKey", new PrimitiveKeyer<Integer>());
+
+    p.traverseTopologically(visitor);
+    assertThat(visitor.getKeyedPValues(), hasItem(keyed));
+  }
+
+  @Test
+  public void compositeProducesKeyedOutputUnkeyedInputKeyedOutput() {
+    PCollection<Integer> keyed =
+        p.apply(Create.<Integer>of(1, 2, 3)).apply(new 
CompositeKeyer<Integer>());
+
+    p.traverseTopologically(visitor);
+    assertThat(visitor.getKeyedPValues(), hasItem(keyed));
+  }
+
+  @Test
+  public void compositeProducesKeyedOutputKeyedInputKeyedOutut() {
+    PCollection<Integer> keyed =
+        p.apply(Create.<Integer>of(1, 2, 3))
+            .apply("firstKey", new CompositeKeyer<Integer>())
+            .apply("secondKey", new CompositeKeyer<Integer>());
+
+    p.traverseTopologically(visitor);
+    assertThat(visitor.getKeyedPValues(), hasItem(keyed));
+  }
+
+
+  @Test
+  public void noInputUnkeyedOutput() {
+    PCollection<KV<Integer, Iterable<Void>>> unkeyed =
+        p.apply(
+            Create.of(KV.<Integer, Iterable<Void>>of(-1, 
Collections.<Void>emptyList()))
+                .withCoder(KvCoder.of(VarIntCoder.of(), 
IterableCoder.of(VoidCoder.of()))));
+
+    p.traverseTopologically(visitor);
+    assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed)));
+  }
+
+  @Test
+  public void keyedInputNotProducesKeyedOutputUnkeyedOutput() {
+    PCollection<Integer> onceKeyed =
+        p.apply(Create.<Integer>of(1, 2, 3))
+            .apply(new PrimitiveKeyer<Integer>())
+            .apply(ParDo.of(new IdentityFn<Integer>()));
+
+    p.traverseTopologically(visitor);
+    assertThat(visitor.getKeyedPValues(), not(hasItem(onceKeyed)));
+  }
+
+  @Test
+  public void unkeyedInputNotProducesKeyedOutputUnkeyedOutput() {
+    PCollection<Integer> unkeyed =
+        p.apply(Create.<Integer>of(1, 2, 3)).apply(ParDo.of(new 
IdentityFn<Integer>()));
+
+    p.traverseTopologically(visitor);
+    assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed)));
+  }
+
+  @Test
+  public void traverseMultipleTimesThrows() {
+    p.apply(
+            Create.<KV<Integer, Void>>of(
+                    KV.of(1, (Void) null), KV.of(2, (Void) null), KV.of(3, 
(Void) null))
+                .withCoder(KvCoder.of(VarIntCoder.of(), VoidCoder.of())))
+        .apply(GroupByKey.<Integer, Void>create())
+        .apply(Keys.<Integer>create());
+
+    p.traverseTopologically(visitor);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("already been finalized");
+    thrown.expectMessage(KeyedPValueTrackingVisitor.class.getSimpleName());
+    p.traverseTopologically(visitor);
+  }
+
+  @Test
+  public void getKeyedPValuesBeforeTraverseThrows() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("completely traversed");
+    thrown.expectMessage("getKeyedPValues");
+    visitor.getKeyedPValues();
+  }
+
+  /**
+   * A transform that creates its output directly as a primitive output, reusing the input's
+   * pipeline, windowing strategy, boundedness and coder.
+   */
+  private static class PrimitiveKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
+    @Override
+    public PCollection<K> apply(PCollection<K> input) {
+      PCollection<K> output =
+          PCollection.<K>createPrimitiveOutputInternal(
+              input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+      return output.setCoder(input.getCoder());
+    }
+  }
+
+  /** A composite transform: a {@link PrimitiveKeyer} followed by an identity {@link ParDo}. */
+  private static class CompositeKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
+    @Override
+    public PCollection<K> apply(PCollection<K> input) {
+      PCollection<K> keyed = input.apply(new PrimitiveKeyer<K>());
+      return keyed.apply(ParDo.of(new IdentityFn<K>()));
+    }
+  }
+
+  private static class IdentityFn<K> extends DoFn<K, K> {
+    @Override
+    public void processElement(DoFn<K, K>.ProcessContext c) throws Exception {
+      c.output(c.element());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
new file mode 100644
index 0000000..11ecbff
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link Clock} that returns a constant value for now, which only changes
+ * when a caller moves it with {@link #set(Instant)} or
+ * {@link #advance(Duration)}. The clock can never be moved backwards.
+ *
+ * <p>For uses of the {@link Clock} interface in unit tests. This class
+ * performs no synchronization of its own.
+ */
+public class MockClock implements Clock {
+
+  private Instant now;
+
+  /**
+   * Creates a {@link MockClock} whose current time is {@code initial}.
+   *
+   * @param initial the instant first returned by {@link #now()}; must not be
+   *     null
+   * @throws NullPointerException if {@code initial} is null
+   */
+  public static MockClock fromInstant(Instant initial) {
+    return new MockClock(initial);
+  }
+
+  private MockClock(Instant initialNow) {
+    // Joda-Time comparisons treat a null instant as "the current system
+    // time", so a null starting point would silently tie the ordering check
+    // in set() to the wall clock. Reject it before it can be stored.
+    this.now = checkNotNull(initialNow, "initialNow");
+  }
+
+  /**
+   * Moves this clock to {@code newNow}. Setting the clock to its current
+   * value is permitted.
+   *
+   * @param newNow the new current time; must not be null
+   * @throws NullPointerException if {@code newNow} is null
+   * @throws IllegalArgumentException if {@code newNow} is before the current
+   *     time of this clock
+   */
+  public void set(Instant newNow) {
+    checkNotNull(newNow, "newNow");
+    checkArgument(
+        !newNow.isBefore(now),
+        "Cannot move MockClock backwards in time from %s to %s",
+        now,
+        newNow);
+    this.now = newNow;
+  }
+
+  /**
+   * Moves this clock forward by {@code duration}.
+   *
+   * @param duration the amount of time to advance by; must not be null and
+   *     must be strictly positive
+   * @throws NullPointerException if {@code duration} is null
+   * @throws IllegalArgumentException if {@code duration} is zero or negative
+   */
+  public void advance(Duration duration) {
+    checkNotNull(duration, "duration");
+    checkArgument(
+        duration.getMillis() > 0,
+        "Cannot move MockClock backwards in time by duration %s",
+        duration);
+    set(now.plus(duration));
+  }
+
+  /** Returns the instant most recently established for this clock. */
+  @Override
+  public Instant now() {
+    return now;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
new file mode 100644
index 0000000..cecfe01
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link ParDoMultiEvaluatorFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoMultiEvaluatorFactoryTest implements Serializable {
+  private transient BundleFactory bundleFactory = 
InProcessBundleFactory.create();
+
+  @Test
+  public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, 
Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+    final TupleTag<Integer> lengthTag = new TupleTag<>();
+
+    BoundMulti<String, KV<String, Integer>> pardo =
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(KV.<String, Integer>of(c.element(), 
c.element().length()));
+                    c.sideOutput(elementTag, c.element());
+                    c.sideOutput(lengthTag, c.element().length());
+                  }
+                })
+            .withOutputTags(mainOutputTag, 
TupleTagList.of(elementTag).and(lengthTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    PCollection<KV<String, Integer>> mainOutput = 
outputTuple.get(mainOutputTag);
+    PCollection<String> elementOutput = outputTuple.get(elementTag);
+    PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
+
+    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = 
bundleFactory.createRootBundle(elementOutput);
+    UncommittedBundle<Integer> lengthOutputBundle = 
bundleFactory.createRootBundle(lengthOutput);
+
+    when(evaluationContext.createBundle(inputBundle, 
mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(inputBundle, elementOutput))
+        .thenReturn(elementOutputBundle);
+    when(evaluationContext.createBundle(inputBundle, 
lengthOutput)).thenReturn(lengthOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoMultiEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(),
+        Matchers.<UncommittedBundle<?>>containsInAnyOrder(
+            lengthOutputBundle, mainOutputBundle, elementOutputBundle));
+    assertThat(result.getWatermarkHold(), 
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+
+    assertThat(
+        mainOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
+            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new 
Instant(1000)),
+            WindowedValue.valueInGlobalWindow(
+                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(
+        elementOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<String>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow("foo"),
+            WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)),
+            WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(
+        lengthOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(3),
+            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
+            WindowedValue.valueInGlobalWindow(5, 
PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  @Test
+  public void testParDoMultiUndeclaredSideOutput() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, 
Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+    final TupleTag<Integer> lengthTag = new TupleTag<>();
+
+    BoundMulti<String, KV<String, Integer>> pardo =
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(KV.<String, Integer>of(c.element(), 
c.element().length()));
+                    c.sideOutput(elementTag, c.element());
+                    c.sideOutput(lengthTag, c.element().length());
+                  }
+                })
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    PCollection<KV<String, Integer>> mainOutput = 
outputTuple.get(mainOutputTag);
+    PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = 
bundleFactory.createRootBundle(elementOutput);
+
+    when(evaluationContext.createBundle(inputBundle, 
mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(inputBundle, elementOutput))
+        .thenReturn(elementOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoMultiEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(),
+        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, 
elementOutputBundle));
+    assertThat(result.getWatermarkHold(), 
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+
+    assertThat(
+        mainOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
+            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new 
Instant(1000)),
+            WindowedValue.valueInGlobalWindow(
+                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(
+        elementOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<String>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow("foo"),
+            WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)),
+            WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  /**
+   * Checks that state written by the {@link DoFn} while a bundle is processed
+   * is available from the result of {@code finishBundle()}, and that the
+   * result reports the matching watermark hold.
+   */
+  @Test
+  public void finishBundleWithStatePutsStateInResult() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+    PCollection<String> words =
+        pipeline.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag =
+        new TupleTag<KV<String, Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+
+    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
+        StateTags.watermarkStateInternal(
+            "myId", OutputTimeFns.outputAtEndOfWindow());
+    final StateTag<Object, BagState<String>> bagTag =
+        StateTags.bag("myBag", StringUtf8Coder.of());
+    final StateNamespace windowNs =
+        StateNamespaces.window(
+            GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+
+    // For every element: add a watermark hold derived from its length in the
+    // global namespace, and append the element to a bag in the global
+    // window's namespace.
+    DoFn<String, KV<String, Integer>> fn =
+        new DoFn<String, KV<String, Integer>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            String element = c.element();
+            c.windowingInternals()
+                .stateInternals()
+                .state(StateNamespaces.global(), watermarkTag)
+                .add(new Instant(20202L + element.length()));
+            c.windowingInternals()
+                .stateInternals()
+                .state(
+                    StateNamespaces.window(
+                        GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
+                    bagTag)
+                .add(element);
+          }
+        };
+    BoundMulti<String, KV<String, Integer>> multiParDo =
+        ParDo.of(fn)
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+    PCollectionTuple outputs = words.apply(multiParDo);
+
+    PCollection<KV<String, Integer>> mainOutput = outputs.get(mainOutputTag);
+    PCollection<String> elementOutput = outputs.get(elementTag);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(words).commit(Instant.now());
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle =
+        bundleFactory.createRootBundle(elementOutput);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    CounterSet counters = new CounterSet();
+
+    InProcessEvaluationContext evaluationContext =
+        mock(InProcessEvaluationContext.class);
+    when(evaluationContext.createBundle(inputBundle, mainOutput))
+        .thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(inputBundle, elementOutput))
+        .thenReturn(elementOutputBundle);
+    when(evaluationContext.getExecutionContext(
+            mainOutput.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoMultiEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(),
+                inputBundle,
+                evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            "bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow(
+            "bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    InProcessTransformResult result = evaluator.finishBundle();
+
+    // 20205 = 20202 + 3, the value added for the shortest element, "foo".
+    Instant expectedHold = new Instant(20205L);
+    assertThat(
+        result.getOutputBundles(),
+        Matchers.<UncommittedBundle<?>>containsInAnyOrder(
+            mainOutputBundle, elementOutputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(expectedHold));
+    assertThat(result.getState(), not(nullValue()));
+    assertThat(
+        result.getState()
+            .state(StateNamespaces.global(), watermarkTag)
+            .read(),
+        equalTo(expectedHold));
+    assertThat(
+        result.getState().state(windowNs, bagTag).read(),
+        containsInAnyOrder("foo", "bara", "bazam"));
+  }
+
+  /**
+   * Checks that timers set and deleted by the {@link DoFn} while a bundle is
+   * processed are reported through the timer update of the result of
+   * {@code finishBundle()}.
+   *
+   * <p>The {@link DoFn} sets one timer and deletes another for each of the
+   * three processed elements, so the expected update lists each timer three
+   * times.
+   */
+  @Test
+  public void finishBundleWithStateAndTimersPutsTimersInResult()
+      throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+    PCollection<String> words =
+        pipeline.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag =
+        new TupleTag<KV<String, Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+
+    // Expected timers. The DoFn below builds equal instances itself rather
+    // than capturing these locals.
+    IntervalWindow addedTimerWindow =
+        new IntervalWindow(
+            new Instant(0).plus(Duration.standardMinutes(5)),
+            new Instant(1)
+                .plus(Duration.standardMinutes(5))
+                .plus(Duration.standardHours(1)));
+    final TimerData addedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(), addedTimerWindow),
+            new Instant(54541L),
+            TimeDomain.EVENT_TIME);
+    IntervalWindow deletedTimerWindow =
+        new IntervalWindow(
+            new Instant(0), new Instant(0).plus(Duration.standardHours(1)));
+    final TimerData deletedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(), deletedTimerWindow),
+            new Instant(3400000),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    DoFn<String, KV<String, Integer>> fn =
+        new DoFn<String, KV<String, Integer>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            c.windowingInternals().stateInternals();
+
+            IntervalWindow windowToSet =
+                new IntervalWindow(
+                    new Instant(0).plus(Duration.standardMinutes(5)),
+                    new Instant(1)
+                        .plus(Duration.standardMinutes(5))
+                        .plus(Duration.standardHours(1)));
+            TimerData timerToSet =
+                TimerData.of(
+                    StateNamespaces.window(
+                        IntervalWindow.getCoder(), windowToSet),
+                    new Instant(54541L),
+                    TimeDomain.EVENT_TIME);
+            c.windowingInternals().timerInternals().setTimer(timerToSet);
+
+            IntervalWindow windowToDelete =
+                new IntervalWindow(
+                    new Instant(0),
+                    new Instant(0).plus(Duration.standardHours(1)));
+            TimerData timerToDelete =
+                TimerData.of(
+                    StateNamespaces.window(
+                        IntervalWindow.getCoder(), windowToDelete),
+                    new Instant(3400000),
+                    TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+            c.windowingInternals()
+                .timerInternals()
+                .deleteTimer(timerToDelete);
+          }
+        };
+    BoundMulti<String, KV<String, Integer>> multiParDo =
+        ParDo.of(fn)
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+    PCollectionTuple outputs = words.apply(multiParDo);
+
+    PCollection<KV<String, Integer>> mainOutput = outputs.get(mainOutputTag);
+    PCollection<String> elementOutput = outputs.get(elementTag);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(words).commit(Instant.now());
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle =
+        bundleFactory.createRootBundle(elementOutput);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    CounterSet counters = new CounterSet();
+
+    InProcessEvaluationContext evaluationContext =
+        mock(InProcessEvaluationContext.class);
+    when(evaluationContext.createBundle(inputBundle, mainOutput))
+        .thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(inputBundle, elementOutput))
+        .thenReturn(elementOutputBundle);
+    when(evaluationContext.getExecutionContext(
+            mainOutput.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoMultiEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(),
+                inputBundle,
+                evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            "bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow(
+            "bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    InProcessTransformResult result = evaluator.finishBundle();
+
+    // One set and one delete per processed element.
+    TimerUpdate expectedUpdate =
+        TimerUpdate.builder("myKey")
+            .setTimer(addedTimer)
+            .setTimer(addedTimer)
+            .setTimer(addedTimer)
+            .deletedTimer(deletedTimer)
+            .deletedTimer(deletedTimer)
+            .deletedTimer(deletedTimer)
+            .build();
+    assertThat(result.getTimerUpdate(), equalTo(expectedUpdate));
+  }
+}

Reply via email to