http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
deleted file mode 100644
index a394090..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read.Bounded;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator 
TransformEvaluators}
- * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
- */
-final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  /*
-   * An evaluator for a Source is stateful, to ensure data is not read 
multiple times.
-   * Evaluators are cached here to ensure that the reader is not restarted if 
the evaluator is
-   * retriggered.
-   */
-  private final ConcurrentMap<EvaluatorKey, Queue<? extends 
BoundedReadEvaluator<?>>>
-      sourceEvaluators = new ConcurrentHashMap<>();
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
-      throws IOException {
-    return getTransformEvaluator((AppliedPTransform) application, 
evaluationContext);
-  }
-
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> 
transform,
-      final InProcessEvaluationContext evaluationContext) {
-    BoundedReadEvaluator<?> evaluator =
-        getTransformEvaluatorQueue(transform, evaluationContext).poll();
-    if (evaluator == null) {
-      return EmptyTransformEvaluator.create(transform);
-    }
-    return evaluator;
-  }
-
-  /**
-   * Get the queue of {@link TransformEvaluator TransformEvaluators} that 
produce elements for the
-   * provided application of {@link Bounded Read.Bounded}, initializing it if 
required.
-   *
-   * <p>This method is thread-safe, and will only produce new evaluators if no 
other invocation has
-   * already done so.
-   */
-  @SuppressWarnings("unchecked")
-  private <OutputT> Queue<BoundedReadEvaluator<OutputT>> 
getTransformEvaluatorQueue(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> 
transform,
-      final InProcessEvaluationContext evaluationContext) {
-    // Key by the application and the context the evaluation is occurring in 
(which call to
-    // Pipeline#run).
-    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
-    Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
-        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-    if (evaluatorQueue == null) {
-      evaluatorQueue = new ConcurrentLinkedQueue<>();
-      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
-        // If no queue existed in the evaluators, add an evaluator to 
initialize the evaluator
-        // factory for this transform
-        BoundedSource<OutputT> source = transform.getTransform().getSource();
-        BoundedReadEvaluator<OutputT> evaluator =
-            new BoundedReadEvaluator<OutputT>(transform, evaluationContext, 
source);
-        evaluatorQueue.offer(evaluator);
-      } else {
-        // otherwise return the existing Queue that arrived before us
-        evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) 
sourceEvaluators.get(key);
-      }
-    }
-    return evaluatorQueue;
-  }
-
-  /**
-   * A {@link BoundedReadEvaluator} produces elements from an underlying 
{@link BoundedSource},
-   * discarding all input elements. Within the call to {@link 
#finishBundle()}, the evaluator
-   * creates the {@link BoundedReader} and consumes all available input.
-   *
-   * <p>A {@link BoundedReadEvaluator} should only be created once per {@link 
BoundedSource}, and
-   * each evaluator should only be called once per evaluation of the pipeline. 
Otherwise, the source
-   * may produce duplicate elements.
-   */
-  private static class BoundedReadEvaluator<OutputT> implements 
TransformEvaluator<Object> {
-    private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> 
transform;
-    private final InProcessEvaluationContext evaluationContext;
-    /**
-     * The source being read from by this {@link BoundedReadEvaluator}. This 
may not be the same
-     * as the source derived from {@link #transform} due to splitting.
-     */
-    private BoundedSource<OutputT> source;
-
-    public BoundedReadEvaluator(
-        AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext,
-        BoundedSource<OutputT> source) {
-      this.transform = transform;
-      this.evaluationContext = evaluationContext;
-      this.source = source;
-    }
-
-    @Override
-    public void processElement(WindowedValue<Object> element) {}
-
-    @Override
-    public InProcessTransformResult finishBundle() throws IOException {
-      try (final BoundedReader<OutputT> reader =
-              source.createReader(evaluationContext.getPipelineOptions());) {
-        boolean contentsRemaining = reader.start();
-        UncommittedBundle<OutputT> output =
-            evaluationContext.createRootBundle(transform.getOutput());
-        while (contentsRemaining) {
-          output.add(
-              WindowedValue.timestampedValueInGlobalWindow(
-                  reader.getCurrent(), reader.getCurrentTimestamp()));
-          contentsRemaining = reader.advance();
-        }
-        return StepTransformResult.withHold(transform, 
BoundedWindow.TIMESTAMP_MAX_VALUE)
-            .addOutput(output)
-            .build();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BundleFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BundleFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BundleFactory.java
deleted file mode 100644
index 5479b00..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BundleFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import 
org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A factory that creates {@link UncommittedBundle UncommittedBundles}.
- */
-public interface BundleFactory {
-  /**
-   * Create an {@link UncommittedBundle} from an empty input. Elements added 
to the bundle belong to
-   * the {@code output} {@link PCollection}.
-   */
-  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output);
-
-  /**
-   * Create an {@link UncommittedBundle} from the specified input. Elements 
added to the bundle
-   * belong to the {@code output} {@link PCollection}.
-   */
-  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, 
PCollection<T> output);
-
-  /**
-   * Create an {@link UncommittedBundle} with the specified keys at the 
specified step. For use by
-   * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements 
added to the bundle
-   * belong to the {@code output} {@link PCollection}.
-   */
-  public <T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, Object key, PCollection<T> output);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
deleted file mode 100644
index 12427d9..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * A {@link ExecutorServiceFactory} that produces cached thread pools via
- * {@link Executors#newCachedThreadPool()}.
- */
-class CachedThreadPoolExecutorServiceFactory
-    implements DefaultValueFactory<ExecutorServiceFactory>, 
ExecutorServiceFactory {
-  private static final CachedThreadPoolExecutorServiceFactory INSTANCE =
-      new CachedThreadPoolExecutorServiceFactory();
-
-  @Override
-  public ExecutorServiceFactory create(PipelineOptions options) {
-    return INSTANCE;
-  }
-
-  @Override
-  public ExecutorService create() {
-    return Executors.newCachedThreadPool();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/Clock.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/Clock.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/Clock.java
deleted file mode 100644
index 7a51251..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/Clock.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.joda.time.Instant;
-
-/**
- * Access to the current time.
- */
-public interface Clock {
-  /**
-   * Returns the current time as an {@link Instant}.
-   */
-  Instant now();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java
deleted file mode 100644
index 10e9697..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.runners.inprocess;
-
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-
-import com.google.auto.value.AutoValue;
-
-/**
- * A {@link InProcessTransformResult} that has been committed.
- */
-@AutoValue
-abstract class CommittedResult {
-  /**
-   * Returns the {@link AppliedPTransform} that produced this result.
-   */
-  public abstract AppliedPTransform<?, ?, ?> getTransform();
-
-  /**
-   * Returns the outputs produced by the transform.
-   */
-  public abstract Iterable<? extends CommittedBundle<?>> getOutputs();
-
-  public static CommittedResult create(
-      InProcessTransformResult original, Iterable<? extends 
CommittedBundle<?>> outputs) {
-    return new AutoValue_CommittedResult(original.getTransform(),
-        outputs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java
deleted file mode 100644
index 30a2b92..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-
-/**
- * A callback for completing a bundle of input.
- */
-interface CompletionCallback {
-  /**
-   * Handle a successful result, returning the committed outputs of the result.
-   */
-  CommittedResult handleResult(
-      CommittedBundle<?> inputBundle, InProcessTransformResult result);
-
-  /**
-   * Handle a result that terminated abnormally due to the provided {@link 
Throwable}.
-   */
-  void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
deleted file mode 100644
index f6ea4af..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.PValue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each 
{@link PValue} in the
- * {@link Pipeline}. This is used to schedule consuming {@link PTransform 
PTransforms} to consume
- * input after the upstream transform has produced and committed output.
- */
-public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
-  private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers 
= new HashMap<>();
-  private Collection<AppliedPTransform<?, ?, ?>> rootTransforms = new 
ArrayList<>();
-  private Collection<PCollectionView<?>> views = new ArrayList<>();
-  private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-  private Set<PValue> toFinalize = new HashSet<>();
-  private int numTransforms = 0;
-  private boolean finalized = false;
-
-  @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
-    checkState(
-        !finalized,
-        "Attempting to traverse a pipeline (node %s) with a %s "
-            + "which has already visited a Pipeline and is finalized",
-        node.getFullName(),
-        ConsumerTrackingPipelineVisitor.class.getSimpleName());
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
-    checkState(
-        !finalized,
-        "Attempting to traverse a pipeline (node %s) with a %s which is 
already finalized",
-        node.getFullName(),
-        ConsumerTrackingPipelineVisitor.class.getSimpleName());
-    if (node.isRootNode()) {
-      finalized = true;
-    }
-  }
-
-  @Override
-  public void visitTransform(TransformTreeNode node) {
-    toFinalize.removeAll(node.getInput().expand());
-    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
-    stepNames.put(appliedTransform, genStepName());
-    if (node.getInput().expand().isEmpty()) {
-      rootTransforms.add(appliedTransform);
-    } else {
-      for (PValue value : node.getInput().expand()) {
-        valueToConsumers.get(value).add(appliedTransform);
-      }
-    }
-  }
-
-  private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformTreeNode 
node) {
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) 
node.getTransform());
-    return application;
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
-    toFinalize.add(value);
-    for (PValue expandedValue : value.expand()) {
-      valueToConsumers.put(expandedValue, new ArrayList<AppliedPTransform<?, 
?, ?>>());
-      if (expandedValue instanceof PCollectionView) {
-        views.add((PCollectionView<?>) expandedValue);
-      }
-      expandedValue.recordAsOutput(getAppliedTransform(producer));
-    }
-    value.recordAsOutput(getAppliedTransform(producer));
-  }
-
-  private String genStepName() {
-    return String.format("s%s", numTransforms++);
-  }
-
-
-  /**
-   * Returns a mapping of each fully-expanded {@link PValue} to each
-   * {@link AppliedPTransform} that consumes it. For each AppliedPTransform in 
the collection
-   * returned from {@code getValueToCustomers().get(PValue)},
-   * {@code AppliedPTransform#getInput().expand()} will contain the argument 
{@link PValue}.
-   */
-  public Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> 
getValueToConsumers() {
-    checkState(
-        finalized,
-        "Can't call getValueToConsumers before the Pipeline has been 
completely traversed");
-
-    return valueToConsumers;
-  }
-
-  /**
-   * Returns the mapping for each {@link AppliedPTransform} in the {@link 
Pipeline} to a unique step
-   * name.
-   */
-  public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
-    checkState(
-        finalized, "Can't call getStepNames before the Pipeline has been 
completely traversed");
-
-    return stepNames;
-  }
-
-  /**
-   * Returns the root transforms of the {@link Pipeline}. A root {@link 
AppliedPTransform} consumes
-   * a {@link PInput} where the {@link PInput#expand()} returns an empty 
collection.
-   */
-  public Collection<AppliedPTransform<?, ?, ?>> getRootTransforms() {
-    checkState(
-        finalized,
-        "Can't call getRootTransforms before the Pipeline has been completely 
traversed");
-
-    return rootTransforms;
-  }
-
-  /**
-   * Returns all of the {@link PCollectionView PCollectionViews} contained in 
the visited
-   * {@link Pipeline}.
-   */
-  public Collection<PCollectionView<?>> getViews() {
-    checkState(finalized, "Can't call getViews before the Pipeline has been 
completely traversed");
-
-    return views;
-  }
-
-  /**
-   * Returns all of the {@link PValue PValues} that have been produced but not 
consumed. These
-   * {@link PValue PValues} should be finalized by the {@link PipelineRunner} 
before the
-   * {@link Pipeline} is executed.
-   */
-  public Set<PValue> getUnfinalizedPValues() {
-    checkState(
-        finalized,
-        "Can't call getUnfinalizedPValues before the Pipeline has been 
completely traversed");
-
-    return toFinalize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EmptyTransformEvaluator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EmptyTransformEvaluator.java
deleted file mode 100644
index d198903..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EmptyTransformEvaluator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * A {@link TransformEvaluator} that ignores all input and produces no output. 
The result of
- * invoking {@link #finishBundle()} on this evaluator is to return an
- * {@link InProcessTransformResult} with no elements and a timestamp hold 
equal to
- * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no 
elements, this hold
- * will not affect the watermark.
- */
-final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
-  public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> 
transform) {
-    return new EmptyTransformEvaluator<T>(transform);
-  }
-
-  private final AppliedPTransform<?, ?, ?> transform;
-
-  private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) {
-    this.transform = transform;
-  }
-
-  @Override
-  public void processElement(WindowedValue<T> element) throws Exception {}
-
-  @Override
-  public InProcessTransformResult finishBundle() throws Exception {
-    return StepTransformResult.withHold(transform, 
BoundedWindow.TIMESTAMP_MIN_VALUE)
-        .build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
deleted file mode 100644
index d234d4f..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.Coder;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Enforces that all elements in a {@link PCollection} can be encoded using 
that
- * {@link PCollection PCollection's} {@link Coder}.
- */
-class EncodabilityEnforcementFactory implements ModelEnforcementFactory {
-  public static EncodabilityEnforcementFactory create() {
-    return new EncodabilityEnforcementFactory();
-  }
-
-  @Override
-  public <T> ModelEnforcement<T> forBundle(
-      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
-    return new EncodabilityEnforcement<>(input);
-  }
-
-  private static class EncodabilityEnforcement<T> extends 
AbstractModelEnforcement<T> {
-    private Coder<T> coder;
-
-    public EncodabilityEnforcement(CommittedBundle<T> input) {
-      coder = input.getPCollection().getCoder();
-    }
-
-    @Override
-    public void beforeElement(WindowedValue<T> element) {
-      try {
-        T clone = CoderUtils.clone(coder, element.getValue());
-        if (coder.consistentWithEquals()) {
-          checkArgument(
-              
coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)),
-              "Coder %s of class %s does not maintain structural value 
equality"
-                  + " on input element %s",
-              coder,
-              coder.getClass().getSimpleName(),
-              element.getValue());
-        }
-      } catch (Exception e) {
-        throw UserCodeException.wrap(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EvaluatorKey.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EvaluatorKey.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EvaluatorKey.java
deleted file mode 100644
index 9d8fc43..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EvaluatorKey.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-
-import java.util.Objects;
-
-/**
- * A (Transform, Pipeline Execution) key for stateful evaluators.
- *
- * Source evaluators are stateful to ensure data is not read multiple times. 
Evaluators are cached
- * to ensure that the reader is not restarted if the evaluator is retriggered. 
An
- * {@link EvaluatorKey} is used to ensure that multiple Pipelines can be 
executed without sharing
- * the same evaluators.
- */
-final class EvaluatorKey {
-  private final AppliedPTransform<?, ?, ?> transform;
-  private final InProcessEvaluationContext context;
-
-  public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, 
InProcessEvaluationContext context) {
-    this.transform = transform;
-    this.context = context;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(transform, context);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof EvaluatorKey)) {
-      return false;
-    }
-    EvaluatorKey that = (EvaluatorKey) other;
-    return Objects.equals(this.transform, that.transform)
-        && Objects.equals(this.context, that.context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceFactory.java
deleted file mode 100644
index cfbf7b4..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import java.util.concurrent.ExecutorService;
-
-/**
- * A factory that creates {@link ExecutorService ExecutorServices}.
- * {@link ExecutorService ExecutorServices} created by this factory should be 
independent of one
- * another (e.g., if any executor is shut down the remaining executors should 
continue to process
- * work).
- */
-public interface ExecutorServiceFactory {
-  /**
-   * Create a new {@link ExecutorService}.
-   */
-  ExecutorService create();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
deleted file mode 100644
index 19bf35d..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.Pipeline;
-import 
org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-
-import javax.annotation.Nullable;
-
-/**
- * An {@link InProcessExecutor} that uses an underlying {@link 
ExecutorService} and
- * {@link InProcessEvaluationContext} to execute a {@link Pipeline}.
- */
-final class ExecutorServiceParallelExecutor implements InProcessExecutor {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
-
-  private final ExecutorService executorService;
-
-  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> 
valueToConsumers;
-  private final Set<PValue> keyedPValues;
-  private final TransformEvaluatorRegistry registry;
-  @SuppressWarnings("rawtypes")
-  private final Map<Class<? extends PTransform>, 
Collection<ModelEnforcementFactory>>
-      transformEnforcements;
-
-  private final InProcessEvaluationContext evaluationContext;
-
-  private final LoadingCache<StepAndKey, TransformExecutorService> 
executorServices;
-  private final ConcurrentMap<TransformExecutor<?>, Boolean> 
scheduledExecutors;
-
-  private final Queue<ExecutorUpdate> allUpdates;
-  private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates;
-
-  private final TransformExecutorService parallelExecutorService;
-  private final CompletionCallback defaultCompletionCallback;
-
-  private Collection<AppliedPTransform<?, ?, ?>> rootNodes;
-
-  public static ExecutorServiceParallelExecutor create(
-      ExecutorService executorService,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
-      Set<PValue> keyedPValues,
-      TransformEvaluatorRegistry registry,
-      @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> 
transformEnforcements,
-      InProcessEvaluationContext context) {
-    return new ExecutorServiceParallelExecutor(
-        executorService, valueToConsumers, keyedPValues, registry, 
transformEnforcements, context);
-  }
-
-  private ExecutorServiceParallelExecutor(
-      ExecutorService executorService,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
-      Set<PValue> keyedPValues,
-      TransformEvaluatorRegistry registry,
-      @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> 
transformEnforcements,
-      InProcessEvaluationContext context) {
-    this.executorService = executorService;
-    this.valueToConsumers = valueToConsumers;
-    this.keyedPValues = keyedPValues;
-    this.registry = registry;
-    this.transformEnforcements = transformEnforcements;
-    this.evaluationContext = context;
-
-    scheduledExecutors = new ConcurrentHashMap<>();
-    // Weak Values allows TransformExecutorServices that are no longer in use 
to be reclaimed.
-    // Executing TransformExecutorServices have a strong reference to their 
TransformExecutorService
-    // which stops the TransformExecutorServices from being prematurely 
garbage collected
-    executorServices =
-        
CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());
-
-    this.allUpdates = new ConcurrentLinkedQueue<>();
-    this.visibleUpdates = new ArrayBlockingQueue<>(20);
-
-    parallelExecutorService =
-        TransformExecutorServices.parallel(executorService, 
scheduledExecutors);
-    defaultCompletionCallback = new DefaultCompletionCallback();
-  }
-
-  private CacheLoader<StepAndKey, TransformExecutorService>
-      serialTransformExecutorServiceCacheLoader() {
-    return new CacheLoader<StepAndKey, TransformExecutorService>() {
-      @Override
-      public TransformExecutorService load(StepAndKey stepAndKey) throws 
Exception {
-        return TransformExecutorServices.serial(executorService, 
scheduledExecutors);
-      }
-    };
-  }
-
-  @Override
-  public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
-    rootNodes = ImmutableList.copyOf(roots);
-    Runnable monitorRunnable = new MonitorRunnable();
-    executorService.submit(monitorRunnable);
-  }
-
-  @SuppressWarnings("unchecked")
-  public void scheduleConsumption(
-      AppliedPTransform<?, ?, ?> consumer,
-      @Nullable CommittedBundle<?> bundle,
-      CompletionCallback onComplete) {
-    evaluateBundle(consumer, bundle, onComplete);
-  }
-
-  private <T> void evaluateBundle(
-      final AppliedPTransform<?, ?, ?> transform,
-      @Nullable final CommittedBundle<T> bundle,
-      final CompletionCallback onComplete) {
-    TransformExecutorService transformExecutor;
-
-    if (bundle != null && isKeyed(bundle.getPCollection())) {
-      final StepAndKey stepAndKey =
-          StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
-      // This executor will remain reachable until it has executed all 
scheduled transforms.
-      // The TransformExecutors keep a strong reference to the Executor, the 
ExecutorService keeps
-      // a reference to the scheduled TransformExecutor callable. Follow-up 
TransformExecutors
-      // (scheduled due to the completion of another TransformExecutor) are 
provided to the
-      // ExecutorService before the Earlier TransformExecutor callable 
completes.
-      transformExecutor = executorServices.getUnchecked(stepAndKey);
-    } else {
-      transformExecutor = parallelExecutorService;
-    }
-
-    Collection<ModelEnforcementFactory> enforcements =
-        MoreObjects.firstNonNull(
-            transformEnforcements.get(transform.getTransform().getClass()),
-            Collections.<ModelEnforcementFactory>emptyList());
-
-    TransformExecutor<T> callable =
-        TransformExecutor.create(
-            registry,
-            enforcements,
-            evaluationContext,
-            bundle,
-            transform,
-            onComplete,
-            transformExecutor);
-    transformExecutor.schedule(callable);
-  }
-
-  private boolean isKeyed(PValue pvalue) {
-    return keyedPValues.contains(pvalue);
-  }
-
-  private void scheduleConsumers(CommittedBundle<?> bundle) {
-    for (AppliedPTransform<?, ?, ?> consumer : 
valueToConsumers.get(bundle.getPCollection())) {
-      scheduleConsumption(consumer, bundle, defaultCompletionCallback);
-    }
-  }
-
-  @Override
-  public void awaitCompletion() throws Throwable {
-    VisibleExecutorUpdate update;
-    do {
-      update = visibleUpdates.take();
-      if (update.throwable.isPresent()) {
-        throw update.throwable.get();
-      }
-    } while (!update.isDone());
-    executorService.shutdown();
-  }
-
-  /**
-   * The default {@link CompletionCallback}. The default completion callback 
is used to complete
-   * transform evaluations that are triggered due to the arrival of elements 
from an upstream
-   * transform, or for a source transform.
-   */
-  private class DefaultCompletionCallback implements CompletionCallback {
-    @Override
-    public CommittedResult handleResult(
-        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      CommittedResult committedResult =
-          evaluationContext.handleResult(inputBundle, 
Collections.<TimerData>emptyList(), result);
-      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
-      }
-      return committedResult;
-    }
-
-    @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
-      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
-    }
-  }
-
-  /**
-   * A {@link CompletionCallback} where the completed bundle was produced to 
deliver some collection
-   * of {@link TimerData timers}. When the evaluator completes successfully, 
reports all of the
-   * timers used to create the input to the {@link InProcessEvaluationContext 
evaluation context}
-   * as part of the result.
-   */
-  private class TimerCompletionCallback implements CompletionCallback {
-    private final Iterable<TimerData> timers;
-
-    private TimerCompletionCallback(Iterable<TimerData> timers) {
-      this.timers = timers;
-    }
-
-    @Override
-    public CommittedResult handleResult(
-        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      CommittedResult committedResult =
-          evaluationContext.handleResult(inputBundle, timers, result);
-      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
-      }
-      return committedResult;
-    }
-
-    @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
-      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
-    }
-  }
-
-  /**
-   * An internal status update on the state of the executor.
-   *
-   * Used to signal when the executor should be shut down (due to an 
exception).
-   */
-  private static class ExecutorUpdate {
-    private final Optional<? extends CommittedBundle<?>> bundle;
-    private final Optional<? extends Throwable> throwable;
-
-    public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
-      return new ExecutorUpdate(bundle, null);
-    }
-
-    public static ExecutorUpdate fromThrowable(Throwable t) {
-      return new ExecutorUpdate(null, t);
-    }
-
-    private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable 
throwable) {
-      this.bundle = Optional.fromNullable(producedBundle);
-      this.throwable = Optional.fromNullable(throwable);
-    }
-
-    public Optional<? extends CommittedBundle<?>> getBundle() {
-      return bundle;
-    }
-
-    public Optional<? extends Throwable> getException() {
-      return throwable;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(ExecutorUpdate.class)
-          .add("bundle", bundle)
-          .add("exception", throwable)
-          .toString();
-    }
-  }
-
-  /**
-   * An update of interest to the user. Used in {@link #awaitCompletion} to 
decide whether to
-   * return normally or throw an exception.
-   */
-  private static class VisibleExecutorUpdate {
-    private final Optional<? extends Throwable> throwable;
-    private final boolean done;
-
-    public static VisibleExecutorUpdate fromThrowable(Throwable e) {
-      return new VisibleExecutorUpdate(false, e);
-    }
-
-    public static VisibleExecutorUpdate finished() {
-      return new VisibleExecutorUpdate(true, null);
-    }
-
-    private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) 
{
-      this.throwable = Optional.fromNullable(exception);
-      this.done = done;
-    }
-
-    public boolean isDone() {
-      return done;
-    }
-  }
-
-  private class MonitorRunnable implements Runnable {
-    private final String runnableName =
-        String.format(
-            "%s$%s-monitor",
-            evaluationContext.getPipelineOptions().getAppName(),
-            ExecutorServiceParallelExecutor.class.getSimpleName());
-
-    @Override
-    public void run() {
-      String oldName = Thread.currentThread().getName();
-      Thread.currentThread().setName(runnableName);
-      try {
-        ExecutorUpdate update = allUpdates.poll();
-        // pull all of the pending work off of the queue
-        while (update != null) {
-          LOG.debug("Executor Update: {}", update);
-          if (update.getBundle().isPresent()) {
-            scheduleConsumers(update.getBundle().get());
-          } else if (update.getException().isPresent()) {
-            
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
-          }
-          update = allUpdates.poll();
-        }
-        boolean timersFired = fireTimers();
-        addWorkIfNecessary(timersFired);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOG.error("Monitor died due to being interrupted");
-        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
-          visibleUpdates.poll();
-        }
-      } catch (Throwable t) {
-        LOG.error("Monitor thread died due to throwable", t);
-        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) {
-          visibleUpdates.poll();
-        }
-      } finally {
-        if (!shouldShutdown()) {
-          // The monitor thread should always be scheduled; but we only need 
to be scheduled once
-          executorService.submit(this);
-        }
-        Thread.currentThread().setName(oldName);
-      }
-    }
-
-    /**
-     * Fires any available timers. Returns true if at least one timer was 
fired.
-     */
-    private boolean fireTimers() throws Exception {
-      try {
-        boolean firedTimers = false;
-        for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
transformTimers :
-            evaluationContext.extractFiredTimers().entrySet()) {
-          AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
-          for (Map.Entry<Object, FiredTimers> keyTimers : 
transformTimers.getValue().entrySet()) {
-            for (TimeDomain domain : TimeDomain.values()) {
-              Collection<TimerData> delivery = 
keyTimers.getValue().getTimers(domain);
-              if (delivery.isEmpty()) {
-                continue;
-              }
-              KeyedWorkItem<Object, Object> work =
-                  KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
-              @SuppressWarnings({"unchecked", "rawtypes"})
-              CommittedBundle<?> bundle =
-                  evaluationContext
-                      .createKeyedBundle(
-                          null, keyTimers.getKey(), (PCollection) 
transform.getInput())
-                      .add(WindowedValue.valueInEmptyWindows(work))
-                      .commit(Instant.now());
-              scheduleConsumption(transform, bundle, new 
TimerCompletionCallback(delivery));
-              firedTimers = true;
-            }
-          }
-        }
-        return firedTimers;
-      } catch (Exception e) {
-        LOG.error("Internal Error while delivering timers", e);
-        throw e;
-      }
-    }
-
-    private boolean shouldShutdown() {
-      if (evaluationContext.isDone()) {
-        LOG.debug("Pipeline is finished. Shutting down. {}");
-        while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
-          visibleUpdates.poll();
-        }
-        executorService.shutdown();
-        return true;
-      }
-      return false;
-    }
-
-    /**
-     * If all active {@link TransformExecutor TransformExecutors} are in a 
blocked state,
-     * add more work from root nodes that may have additional work. This 
ensures that if a pipeline
-     * has elements available from the root nodes it will add those elements 
when necessary.
-     */
-    private void addWorkIfNecessary(boolean firedTimers) {
-      // If any timers have fired, they will add more work; We don't need to 
add more
-      if (firedTimers) {
-        return;
-      }
-      for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
-        if (!isExecutorBlocked(executor)) {
-          // We have at least one executor that can proceed without adding 
additional work
-          return;
-        }
-      }
-      // All current TransformExecutors are blocked; add more work from the 
roots.
-      for (AppliedPTransform<?, ?, ?> root : rootNodes) {
-        if (!evaluationContext.isDone(root)) {
-          scheduleConsumption(root, null, defaultCompletionCallback);
-        }
-      }
-    }
-
-    /**
-     * Return true if the provided executor might make more progress if no 
action is taken.
-     *
-     * <p>May return false even if all executor threads are currently blocked 
or cleaning up, as
-     * these can cause more work to be scheduled. If this does not occur, 
after these calls
-     * terminate, future calls will return true if all executors are waiting.
-     */
-    private boolean isExecutorBlocked(TransformExecutor<?> executor) {
-      Thread thread = executor.getThread();
-      if (thread == null) {
-        return false;
-      }
-      switch (thread.getState()) {
-        case TERMINATED:
-          throw new IllegalStateException(String.format(
-              "Unexpectedly encountered a Terminated TransformExecutor %s", 
executor));
-        case WAITING:
-        case TIMED_WAITING:
-          // The thread is waiting for some external input. Adding more work 
may cause the thread
-          // to stop waiting (e.g. the thread is waiting on an unbounded side 
input)
-          return true;
-        case BLOCKED:
-          // The executor is blocked on acquisition of a java monitor. This 
usually means it is
-          // making a call to the EvaluationContext, but not a model-blocking 
call - and will
-          // eventually complete, at which point we may reevaluate.
-        default:
-          // NEW and RUNNABLE threads can make progress
-          return false;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactory.java
deleted file mode 100644
index 4e23dde..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactory.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for 
the {@link Flatten}
- * {@link PTransform}.
- */
-class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>) 
createInMemoryEvaluator(
-            (AppliedPTransform) application, inputBundle, evaluationContext);
-    return evaluator;
-  }
-
-  private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
-      final AppliedPTransform<
-              PCollectionList<InputT>, PCollection<InputT>, 
FlattenPCollectionList<InputT>>
-          application,
-      final CommittedBundle<InputT> inputBundle,
-      final InProcessEvaluationContext evaluationContext) {
-    if (inputBundle == null) {
-      // it is impossible to call processElement on a flatten with no input 
bundle. A Flatten with
-      // no input bundle occurs as an output of 
Flatten.pcollections(PCollectionList.empty())
-      return new FlattenEvaluator<>(
-          null, StepTransformResult.withoutHold(application).build());
-    }
-    final UncommittedBundle<InputT> outputBundle =
-        evaluationContext.createBundle(inputBundle, application.getOutput());
-    final InProcessTransformResult result =
-        
StepTransformResult.withoutHold(application).addOutput(outputBundle).build();
-    return new FlattenEvaluator<>(outputBundle, result);
-  }
-
-  private static class FlattenEvaluator<InputT> implements 
TransformEvaluator<InputT> {
-    private final UncommittedBundle<InputT> outputBundle;
-    private final InProcessTransformResult result;
-
-    public FlattenEvaluator(
-        UncommittedBundle<InputT> outputBundle, InProcessTransformResult 
result) {
-      this.outputBundle = outputBundle;
-      this.result = result;
-    }
-
-    @Override
-    public void processElement(WindowedValue<InputT> element) {
-      outputBundle.add(element);
-    }
-
-    @Override
-    public InProcessTransformResult finishBundle() {
-      return result;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java
deleted file mode 100644
index 85aa1c4..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.TypedPValue;
-
-/**
- * A base class for implementing {@link PTransform} overrides, which behave 
identically to the
- * delegate transform but with overridden methods. Implementors are required 
to implement
- * {@link #delegate()}, which returns the object to forward calls to, and 
{@link #apply(PInput)}.
- */
-public abstract class ForwardingPTransform<InputT extends PInput, OutputT 
extends POutput>
-    extends PTransform<InputT, OutputT> {
-  protected abstract PTransform<InputT, OutputT> delegate();
-
-  @Override
-  public OutputT apply(InputT input) {
-    return delegate().apply(input);
-  }
-
-  @Override
-  public void validate(InputT input) {
-    delegate().validate(input);
-  }
-
-  @Override
-  public String getName() {
-    return delegate().getName();
-  }
-
-  @Override
-  public <T> Coder<T> getDefaultOutputCoder(InputT input, 
@SuppressWarnings("unused")
-      TypedPValue<T> output) throws CannotProvideCoderException {
-    return delegate().getDefaultOutputCoder(input, output);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    delegate().populateDisplayData(builder);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
deleted file mode 100644
index 4cec841..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.StepTransformResult.Builder;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
-import 
org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.SystemReduceFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for 
the {@link GroupByKey}
- * {@link PTransform}.
- */
-class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator = createEvaluator(
-            (AppliedPTransform) application, (CommittedBundle) inputBundle, 
evaluationContext);
-    return evaluator;
-  }
-
-  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
-      final AppliedPTransform<
-              PCollection<KV<K, WindowedValue<V>>>, 
PCollection<KeyedWorkItem<K, V>>,
-              InProcessGroupByKeyOnly<K, V>>
-          application,
-      final CommittedBundle<KV<K, V>> inputBundle,
-      final InProcessEvaluationContext evaluationContext) {
-    return new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, 
application);
-  }
-
-  private static class GroupByKeyEvaluator<K, V>
-      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
-    private final InProcessEvaluationContext evaluationContext;
-
-    private final CommittedBundle<KV<K, V>> inputBundle;
-    private final AppliedPTransform<
-            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, 
V>>,
-            InProcessGroupByKeyOnly<K, V>>
-        application;
-    private final Coder<K> keyCoder;
-    private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
-
-    public GroupByKeyEvaluator(
-        InProcessEvaluationContext evaluationContext,
-        CommittedBundle<KV<K, V>> inputBundle,
-        AppliedPTransform<
-                PCollection<KV<K, WindowedValue<V>>>, 
PCollection<KeyedWorkItem<K, V>>,
-                InProcessGroupByKeyOnly<K, V>>
-            application) {
-      this.evaluationContext = evaluationContext;
-      this.inputBundle = inputBundle;
-      this.application = application;
-
-      PCollection<KV<K, WindowedValue<V>>> input = application.getInput();
-      keyCoder = getKeyCoder(input.getCoder());
-      groupingMap = new HashMap<>();
-    }
-
-    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
-      if (!(coder instanceof KvCoder)) {
-        throw new IllegalStateException();
-      }
-      @SuppressWarnings("unchecked")
-      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
-      return keyCoder;
-    }
-
-    @Override
-    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) 
{
-      KV<K, WindowedValue<V>> kv = element.getValue();
-      K key = kv.getKey();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            String.format("unable to encode key %s of input to %s using %s", 
key, this, keyCoder),
-            exn);
-      }
-      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
-      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<WindowedValue<V>>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(kv.getValue());
-    }
-
-    @Override
-    public InProcessTransformResult finishBundle() {
-      Builder resultBuilder = StepTransformResult.withoutHold(application);
-      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
-          groupingMap.entrySet()) {
-        K key = groupedEntry.getKey().key;
-        KeyedWorkItem<K, V> groupedKv =
-            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
-        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
-            evaluationContext.createKeyedBundle(inputBundle, key, 
application.getOutput());
-        bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
-        resultBuilder.addOutput(bundle);
-      }
-      return resultBuilder.build();
-    }
-
-    private static class GroupingKey<K> {
-      private K key;
-      private byte[] encodedKey;
-
-      public GroupingKey(K key, byte[] encodedKey) {
-        this.key = key;
-        this.encodedKey = encodedKey;
-      }
-
-      @Override
-      public boolean equals(Object o) {
-        if (o instanceof GroupingKey) {
-          GroupingKey<?> that = (GroupingKey<?>) o;
-          return Arrays.equals(this.encodedKey, that.encodedKey);
-        } else {
-          return false;
-        }
-      }
-
-      @Override
-      public int hashCode() {
-        return Arrays.hashCode(encodedKey);
-      }
-    }
-  }
-
-  /**
-   * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
-   */
-  public static final class InProcessGroupByKeyOverrideFactory
-      implements PTransformOverrideFactory {
-    @Override
-    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, 
OutputT> override(
-        PTransform<InputT, OutputT> transform) {
-      if (transform instanceof GroupByKey) {
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        PTransform<InputT, OutputT> override = new 
InProcessGroupByKey((GroupByKey) transform);
-        return override;
-      }
-      return transform;
-    }
-  }
-
-  /**
-   * An in-memory implementation of the {@link GroupByKey} primitive as a 
composite
-   * {@link PTransform}.
-   */
-  private static final class InProcessGroupByKey<K, V>
-      extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<V>>>> {
-    private final GroupByKey<K, V> original;
-
-    private InProcessGroupByKey(GroupByKey<K, V> from) {
-      this.original = from;
-    }
-
-    @Override
-    public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> 
delegate() {
-      return original;
-    }
-
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
-      // This operation groups by the combination of key and window,
-      // merging windows as needed, using the windows assigned to the
-      // key/value input elements and the window merge operation of the
-      // window function associated with the input PCollection.
-      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-      // Use the default GroupAlsoByWindow implementation
-      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow =
-          groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder());
-
-      // By default, implement GroupByKey via a series of lower-level 
operations.
-      return input
-          // Make each input element's timestamp and assigned windows
-          // explicit, in the value part.
-          .apply(new ReifyTimestampsAndWindows<K, V>())
-
-          .apply(new InProcessGroupByKeyOnly<K, V>())
-          .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(),
-              inputCoder.getValueCoder(), 
input.getWindowingStrategy().getWindowFn().windowCoder()))
-
-          // Group each key's values by window, merging windows as needed.
-          .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow))
-
-          // And update the windowing strategy as appropriate.
-          
.setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
-          .setCoder(
-              KvCoder.of(inputCoder.getKeyCoder(), 
IterableCoder.of(inputCoder.getValueCoder())));
-    }
-
-    private <W extends BoundedWindow>
-        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow(
-            final WindowingStrategy<?, W> windowingStrategy, final Coder<V> 
inputCoder) {
-      return GroupAlsoByWindowViaWindowSetDoFn.create(
-          windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
-    }
-  }
-
-  /**
-   * An implementation primitive to use in the evaluation of a {@link 
GroupByKey}
-   * {@link PTransform}.
-   */
-  public static final class InProcessGroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, 
PCollection<KeyedWorkItem<K, V>>> {
-    @Override
-    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, 
WindowedValue<V>>> input) {
-      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded());
-    }
-
-    @VisibleForTesting
-    InProcessGroupByKeyOnly() {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
deleted file mode 100644
index 04ece1c..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.MutationDetector;
-import org.apache.beam.sdk.util.MutationDetectors;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.client.util.Throwables;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-
-import org.joda.time.Instant;
-
-/**
- * A {@link BundleFactory} that ensures that elements added to it are not 
mutated after being
- * output. Immutability checks are enforced at the time {@link 
UncommittedBundle#commit(Instant)} is
- * called, checking the value at that time against the value at the time the 
element was added. All
- * elements added to the bundle will be encoded by the {@link Coder} of the 
underlying
- * {@link PCollection}.
- *
- * <p>This catches errors during the execution of a {@link DoFn} caused by 
modifying an element
- * after it is added to an output {@link PCollection}.
- */
-class ImmutabilityCheckingBundleFactory implements BundleFactory {
-  /**
-   * Create a new {@link ImmutabilityCheckingBundleFactory} that uses the 
underlying
-   * {@link BundleFactory} to create the output bundle.
-   */
-  public static ImmutabilityCheckingBundleFactory create(BundleFactory 
underlying) {
-    return new ImmutabilityCheckingBundleFactory(underlying);
-  }
-
-  private final BundleFactory underlying;
-
-  private ImmutabilityCheckingBundleFactory(BundleFactory underlying) {
-    this.underlying = checkNotNull(underlying);
-  }
-
-  @Override
-  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
-    return new 
ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output));
-  }
-
-  @Override
-  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, 
PCollection<T> output) {
-    return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, 
output));
-  }
-
-  @Override
-  public <T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, Object key, PCollection<T> output) {
-    return new 
ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
-  }
-
-  private static class ImmutabilityEnforcingBundle<T> implements 
UncommittedBundle<T> {
-    private final UncommittedBundle<T> underlying;
-    private final SetMultimap<WindowedValue<T>, MutationDetector> 
mutationDetectors;
-    private Coder<T> coder;
-
-    public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) {
-      this.underlying = underlying;
-      mutationDetectors = HashMultimap.create();
-      coder = getPCollection().getCoder();
-    }
-
-    @Override
-    public PCollection<T> getPCollection() {
-      return underlying.getPCollection();
-    }
-
-    @Override
-    public UncommittedBundle<T> add(WindowedValue<T> element) {
-      try {
-        mutationDetectors.put(
-            element, MutationDetectors.forValueWithCoder(element.getValue(), 
coder));
-      } catch (CoderException e) {
-        throw Throwables.propagate(e);
-      }
-      underlying.add(element);
-      return this;
-    }
-
-    @Override
-    public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
-      for (MutationDetector detector : mutationDetectors.values()) {
-        try {
-          detector.verifyUnmodified();
-        } catch (IllegalMutationException exn) {
-          throw UserCodeException.wrap(
-              new IllegalMutationException(
-                  String.format(
-                      "PTransform %s mutated value %s after it was output (new 
value was %s)."
-                          + " Values must not be mutated in any way after 
being output.",
-                      
underlying.getPCollection().getProducingTransformInternal().getFullName(),
-                      exn.getSavedValue(),
-                      exn.getNewValue()),
-                  exn.getSavedValue(),
-                  exn.getNewValue(),
-                  exn));
-        }
-      }
-      return underlying.commit(synchronizedProcessingTime);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
deleted file mode 100644
index 2f21032..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.MutationDetector;
-import org.apache.beam.sdk.util.MutationDetectors;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import java.util.IdentityHashMap;
-import java.util.Map;
-
-/**
- * {@link ModelEnforcement} that enforces elements are not modified over the 
course of processing
- * an element.
- *
- * <p>Implies {@link EncodabilityEnforcment}.
- */
-class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
-  public static ModelEnforcementFactory create() {
-    return new ImmutabilityEnforcementFactory();
-  }
-
-  @Override
-  public <T> ModelEnforcement<T> forBundle(
-      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
-    return new ImmutabilityCheckingEnforcement<T>(input, consumer);
-  }
-
-  private static class ImmutabilityCheckingEnforcement<T> extends 
AbstractModelEnforcement<T> {
-    private final AppliedPTransform<?, ?, ?> transform;
-    private final Map<WindowedValue<T>, MutationDetector> mutationElements;
-    private final Coder<T> coder;
-
-    private ImmutabilityCheckingEnforcement(
-        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
-      this.transform = transform;
-      coder = input.getPCollection().getCoder();
-      mutationElements = new IdentityHashMap<>();
-    }
-
-    @Override
-    public void beforeElement(WindowedValue<T> element) {
-      try {
-        mutationElements.put(
-            element, MutationDetectors.forValueWithCoder(element.getValue(), 
coder));
-      } catch (CoderException e) {
-        throw UserCodeException.wrap(e);
-      }
-    }
-
-    @Override
-    public void afterElement(WindowedValue<T> element) {
-      verifyUnmodified(mutationElements.get(element));
-    }
-
-    @Override
-    public void afterFinish(
-        CommittedBundle<T> input,
-        InProcessTransformResult result,
-        Iterable<? extends CommittedBundle<?>> outputs) {
-      for (MutationDetector detector : mutationElements.values()) {
-        verifyUnmodified(detector);
-      }
-    }
-
-    private void verifyUnmodified(MutationDetector detector) {
-      try {
-        detector.verifyUnmodified();
-      } catch (IllegalMutationException e) {
-        throw new IllegalMutationException(
-            String.format(
-                "PTransform %s illegaly mutated value %s of class %s."
-                    + " Input values must not be mutated in any way.",
-                transform.getFullName(),
-                e.getSavedValue(),
-                e.getSavedValue().getClass()),
-            e.getSavedValue(),
-            e.getNewValue());
-      }
-    }
-  }
-}

Reply via email to