http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
deleted file mode 100644
index 8ed2684..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.isA;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-
-/**
- * Tests for {@link EncodabilityEnforcementFactory}.
- */
-@RunWith(JUnit4.class)
-public class EncodabilityEnforcementFactoryTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private EncodabilityEnforcementFactory factory = 
EncodabilityEnforcementFactory.create();
-  private BundleFactory bundleFactory = InProcessBundleFactory.create();
-
-  @Test
-  public void encodeFailsThrows() {
-    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new 
Record());
-
-    ModelEnforcement<Record> enforcement = createEnforcement(new 
RecordNoEncodeCoder(), record);
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(CoderException.class));
-    thrown.expectMessage("Encode not allowed");
-    enforcement.beforeElement(record);
-  }
-
-  @Test
-  public void decodeFailsThrows() {
-    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new 
Record());
-
-    ModelEnforcement<Record> enforcement = createEnforcement(new 
RecordNoDecodeCoder(), record);
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(CoderException.class));
-    thrown.expectMessage("Decode not allowed");
-    enforcement.beforeElement(record);
-  }
-
-  @Test
-  public void consistentWithEqualsStructuralValueNotEqualThrows() {
-    WindowedValue<Record> record =
-        WindowedValue.<Record>valueInGlobalWindow(
-            new Record() {
-              @Override
-              public String toString() {
-                return "OriginalRecord";
-              }
-            });
-
-    ModelEnforcement<Record> enforcement =
-        createEnforcement(new RecordStructuralValueCoder(), record);
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalArgumentException.class));
-    thrown.expectMessage("does not maintain structural value equality");
-    thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName());
-    thrown.expectMessage("OriginalRecord");
-    enforcement.beforeElement(record);
-  }
-
-  @Test
-  public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Record> unencodable =
-        p.apply(
-            Create.of(new Record())
-                .withCoder(new 
RecordNotConsistentWithEqualsStructuralValueCoder()));
-    AppliedPTransform<?, ?, ?> consumer =
-        
unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-
-    WindowedValue<Record> record = 
WindowedValue.<Record>valueInGlobalWindow(new Record());
-
-    CommittedBundle<Record> input =
-        
bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
-    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
-
-    enforcement.beforeElement(record);
-    enforcement.afterElement(record);
-    enforcement.afterFinish(
-        input,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-
-  private <T> ModelEnforcement<T> createEnforcement(Coder<T> coder, 
WindowedValue<T> record) {
-    TestPipeline p = TestPipeline.create();
-    PCollection<T> unencodable = p.apply(Create.<T>of().withCoder(coder));
-    AppliedPTransform<?, ?, ?> consumer =
-        unencodable.apply(Count.<T>globally()).getProducingTransformInternal();
-    CommittedBundle<T> input =
-        
bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
-    ModelEnforcement<T> enforcement = factory.forBundle(input, consumer);
-    return enforcement;
-  }
-
-  @Test
-  public void structurallyEqualResultsSucceeds() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Integer> unencodable = 
p.apply(Create.of(1).withCoder(VarIntCoder.of()));
-    AppliedPTransform<?, ?, ?> consumer =
-        
unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal();
-
-    WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1);
-
-    CommittedBundle<Integer> input =
-        
bundleFactory.createRootBundle(unencodable).add(value).commit(Instant.now());
-    ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer);
-
-    enforcement.beforeElement(value);
-    enforcement.afterElement(value);
-    enforcement.afterFinish(
-        input,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-
-  private static class Record {}
-  private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
-
-    @Override
-    public void encode(
-        Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      throw new CoderException("Encode not allowed");
-    }
-
-    @Override
-    public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      return null;
-    }
-  }
-
-  private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
-    @Override
-    public void encode(
-        Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {}
-
-    @Override
-    public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      throw new CoderException("Decode not allowed");
-    }
-  }
-
-  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
-    @Override
-    public void encode(
-        Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {}
-
-    @Override
-    public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      return new Record() {
-        @Override
-        public String toString() {
-          return "DecodedRecord";
-        }
-      };
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-
-    @Override
-    public Object structuralValue(Record value) {
-      return value;
-    }
-  }
-
-  private static class RecordNotConsistentWithEqualsStructuralValueCoder
-      extends AtomicCoder<Record> {
-    @Override
-    public void encode(
-        Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {}
-
-    @Override
-    public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      return new Record() {
-        @Override
-        public String toString() {
-          return "DecodedRecord";
-        }
-      };
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return false;
-    }
-
-    @Override
-    public Object structuralValue(Record value) {
-      return value;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
deleted file mode 100644
index 5c1da14..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FlattenEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class FlattenEvaluatorFactoryTest {
-  private BundleFactory bundleFactory = InProcessBundleFactory.create();
-  @Test
-  public void testFlattenInMemoryEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Integer> left = p.apply("left", Create.of(1, 2, 4));
-    PCollection<Integer> right = p.apply("right", Create.of(-1, 2, -4));
-    PCollectionList<Integer> list = PCollectionList.of(left).and(right);
-
-    PCollection<Integer> flattened = 
list.apply(Flatten.<Integer>pCollections());
-
-    CommittedBundle<Integer> leftBundle =
-        bundleFactory.createRootBundle(left).commit(Instant.now());
-    CommittedBundle<Integer> rightBundle =
-        bundleFactory.createRootBundle(right).commit(Instant.now());
-
-    InProcessEvaluationContext context = 
mock(InProcessEvaluationContext.class);
-
-    UncommittedBundle<Integer> flattenedLeftBundle = 
bundleFactory.createRootBundle(flattened);
-    UncommittedBundle<Integer> flattenedRightBundle = 
bundleFactory.createRootBundle(flattened);
-
-    when(context.createBundle(leftBundle, 
flattened)).thenReturn(flattenedLeftBundle);
-    when(context.createBundle(rightBundle, 
flattened)).thenReturn(flattenedRightBundle);
-
-    FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory();
-    TransformEvaluator<Integer> leftSideEvaluator =
-        factory.forApplication(flattened.getProducingTransformInternal(), 
leftBundle, context);
-    TransformEvaluator<Integer> rightSideEvaluator =
-        factory.forApplication(
-            flattened.getProducingTransformInternal(),
-            rightBundle,
-            context);
-
-    leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1));
-    rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
-    leftSideEvaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)));
-    leftSideEvaluator.processElement(WindowedValue.valueInEmptyWindows(4, 
PaneInfo.NO_FIRING));
-    rightSideEvaluator.processElement(
-        WindowedValue.valueInEmptyWindows(2, 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
-    rightSideEvaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)));
-
-    InProcessTransformResult rightSideResult = 
rightSideEvaluator.finishBundle();
-    InProcessTransformResult leftSideResult = leftSideEvaluator.finishBundle();
-
-    assertThat(
-        rightSideResult.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>contains(flattenedRightBundle));
-    assertThat(
-        rightSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, 
?>>equalTo(flattened.getProducingTransformInternal()));
-    assertThat(
-        leftSideResult.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>contains(flattenedLeftBundle));
-    assertThat(
-        leftSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, 
?>>equalTo(flattened.getProducingTransformInternal()));
-
-    assertThat(
-        flattenedLeftBundle.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)),
-            WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING),
-            WindowedValue.valueInGlobalWindow(1)));
-    assertThat(
-        flattenedRightBundle.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            WindowedValue.valueInEmptyWindows(2, 
PaneInfo.ON_TIME_AND_ONLY_FIRING),
-            WindowedValue.timestampedValueInGlobalWindow(-4, new 
Instant(-4096)),
-            WindowedValue.valueInGlobalWindow(-1)));
-  }
-
-  @Test
-  public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws 
Exception {
-    TestPipeline p = TestPipeline.create();
-    PCollectionList<Integer> list = PCollectionList.empty(p);
-
-    PCollection<Integer> flattened = 
list.apply(Flatten.<Integer>pCollections());
-
-    InProcessEvaluationContext context = 
mock(InProcessEvaluationContext.class);
-
-    FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory();
-    TransformEvaluator<Integer> emptyEvaluator =
-        factory.forApplication(flattened.getProducingTransformInternal(), 
null, context);
-
-    InProcessTransformResult leftSideResult = emptyEvaluator.finishBundle();
-
-    assertThat(leftSideResult.getOutputBundles(), emptyIterable());
-    assertThat(
-        leftSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, 
?>>equalTo(flattened.getProducingTransformInternal()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java
deleted file mode 100644
index 366dfc5..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link ForwardingPTransform}.
- */
-@RunWith(JUnit4.class)
-public class ForwardingPTransformTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Mock private PTransform<PCollection<Integer>, PCollection<String>> delegate;
-
-  private ForwardingPTransform<PCollection<Integer>, PCollection<String>> 
forwarding;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    forwarding =
-        new ForwardingPTransform<PCollection<Integer>, PCollection<String>>() {
-          @Override
-          protected PTransform<PCollection<Integer>, PCollection<String>> 
delegate() {
-            return delegate;
-          }
-        };
-  }
-
-  @Test
-  public void applyDelegates() {
-    @SuppressWarnings("unchecked")
-    PCollection<Integer> collection = mock(PCollection.class);
-    @SuppressWarnings("unchecked")
-    PCollection<String> output = mock(PCollection.class);
-    when(delegate.apply(collection)).thenReturn(output);
-    PCollection<String> result = forwarding.apply(collection);
-    assertThat(result, equalTo(output));
-  }
-
-  @Test
-  public void getNameDelegates() {
-    String name = "My_forwardingptransform-name;for!thisTest";
-    when(delegate.getName()).thenReturn(name);
-    assertThat(forwarding.getName(), equalTo(name));
-  }
-
-  @Test
-  public void validateDelegates() {
-    @SuppressWarnings("unchecked")
-    PCollection<Integer> input = mock(PCollection.class);
-    doThrow(RuntimeException.class).when(delegate).validate(input);
-
-    thrown.expect(RuntimeException.class);
-    forwarding.validate(input);
-  }
-
-  @Test
-  public void getDefaultOutputCoderDelegates() throws Exception {
-    @SuppressWarnings("unchecked")
-    PCollection<Integer> input = mock(PCollection.class);
-    @SuppressWarnings("unchecked")
-    PCollection<String> output = mock(PCollection.class);
-    @SuppressWarnings("unchecked")
-    Coder<String> outputCoder = mock(Coder.class);
-
-    when(delegate.getDefaultOutputCoder(input, 
output)).thenReturn(outputCoder);
-    assertThat(forwarding.getDefaultOutputCoder(input, output), 
equalTo(outputCoder));
-  }
-
-  @Test
-  public void populateDisplayDataDelegates() {
-    DisplayData.Builder builder = mock(DisplayData.Builder.class);
-    
doThrow(RuntimeException.class).when(delegate).populateDisplayData(builder);
-
-    thrown.expect(RuntimeException.class);
-    forwarding.populateDisplayData(builder);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
deleted file mode 100644
index b7ce169..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import 
org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multiset;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link GroupByKeyEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class GroupByKeyEvaluatorFactoryTest {
-  private BundleFactory bundleFactory = InProcessBundleFactory.create();
-
-  @Test
-  public void testInMemoryEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-    KV<String, Integer> firstFoo = KV.of("foo", -1);
-    KV<String, Integer> secondFoo = KV.of("foo", 1);
-    KV<String, Integer> thirdFoo = KV.of("foo", 3);
-    KV<String, Integer> firstBar = KV.of("bar", 22);
-    KV<String, Integer> secondBar = KV.of("bar", 12);
-    KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
-    PCollection<KV<String, Integer>> values =
-        p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, 
thirdFoo));
-    PCollection<KV<String, WindowedValue<Integer>>> kvs =
-        values.apply(new ReifyTimestampsAndWindows<String, Integer>());
-    PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
-        kvs.apply(new 
GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
-
-    CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
-        bundleFactory.createRootBundle(kvs).commit(Instant.now());
-    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
-
-    UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
-        bundleFactory.createKeyedBundle(null, "foo", groupedKvs);
-    UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
-        bundleFactory.createKeyedBundle(null, "bar", groupedKvs);
-    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
-        bundleFactory.createKeyedBundle(null, "baz", groupedKvs);
-
-    when(evaluationContext.createKeyedBundle(inputBundle, "foo", 
groupedKvs)).thenReturn(fooBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle, "bar", 
groupedKvs)).thenReturn(barBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle, "baz", 
groupedKvs)).thenReturn(bazBundle);
-
-    // The input to a GroupByKey is assumed to be a KvCoder
-    @SuppressWarnings("unchecked")
-    Coder<String> keyCoder =
-        ((KvCoder<String, WindowedValue<Integer>>) 
kvs.getCoder()).getKeyCoder();
-    TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
-        new GroupByKeyEvaluatorFactory()
-            .forApplication(
-                groupedKvs.getProducingTransformInternal(), inputBundle, 
evaluationContext);
-
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz)));
-
-    evaluator.finishBundle();
-
-    assertThat(
-        fooBundle.commit(Instant.now()).getElements(),
-        contains(
-            new KeyedWorkItemMatcher<String, Integer>(
-                KeyedWorkItems.elementsWorkItem(
-                    "foo",
-                    ImmutableSet.of(
-                        WindowedValue.valueInGlobalWindow(-1),
-                        WindowedValue.valueInGlobalWindow(1),
-                        WindowedValue.valueInGlobalWindow(3))),
-                keyCoder)));
-    assertThat(
-        barBundle.commit(Instant.now()).getElements(),
-        contains(
-            new KeyedWorkItemMatcher<String, Integer>(
-                KeyedWorkItems.elementsWorkItem(
-                    "bar",
-                    ImmutableSet.of(
-                        WindowedValue.valueInGlobalWindow(12),
-                        WindowedValue.valueInGlobalWindow(22))),
-                keyCoder)));
-    assertThat(
-        bazBundle.commit(Instant.now()).getElements(),
-        contains(
-            new KeyedWorkItemMatcher<String, Integer>(
-                KeyedWorkItems.elementsWorkItem(
-                    "baz",
-                    
ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))),
-                keyCoder)));
-  }
-
-  private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) {
-    return KV.of(kv.getKey(), 
WindowedValue.valueInGlobalWindow(kv.getValue()));
-  }
-
-  private static class KeyedWorkItemMatcher<K, V>
-      extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> {
-    private final KeyedWorkItem<K, V> myWorkItem;
-    private final Coder<K> keyCoder;
-
-    public KeyedWorkItemMatcher(KeyedWorkItem<K, V> myWorkItem, Coder<K> 
keyCoder) {
-      this.myWorkItem = myWorkItem;
-      this.keyCoder = keyCoder;
-    }
-
-    @Override
-    public boolean matches(Object item) {
-      if (item == null || !(item instanceof WindowedValue)) {
-        return false;
-      }
-      WindowedValue<KeyedWorkItem<K, V>> that = 
(WindowedValue<KeyedWorkItem<K, V>>) item;
-      Multiset<WindowedValue<V>> myValues = HashMultiset.create();
-      Multiset<WindowedValue<V>> thatValues = HashMultiset.create();
-      for (WindowedValue<V> value : myWorkItem.elementsIterable()) {
-        myValues.add(value);
-      }
-      for (WindowedValue<V> value : that.getValue().elementsIterable()) {
-        thatValues.add(value);
-      }
-      try {
-        return myValues.equals(thatValues)
-            && keyCoder
-                .structuralValue(myWorkItem.key())
-                .equals(keyCoder.structuralValue(that.getValue().key()));
-      } catch (Exception e) {
-        return false;
-      }
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description
-          .appendText("KeyedWorkItem<K, V> containing key ")
-          .appendValue(myWorkItem.key())
-          .appendText(" and values ")
-          .appendValueList("[", ", ", "]", myWorkItem.elementsIterable());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
deleted file mode 100644
index 386eacc..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ImmutabilityCheckingBundleFactory}.
- */
-@RunWith(JUnit4.class)
-public class ImmutabilityCheckingBundleFactoryTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private ImmutabilityCheckingBundleFactory factory;
-  private PCollection<byte[]> created;
-  private PCollection<byte[]> transformed;
-
-  @Before
-  public void setup() {
-    TestPipeline p = TestPipeline.create();
-    created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of()));
-    transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>()));
-    factory = 
ImmutabilityCheckingBundleFactory.create(InProcessBundleFactory.create());
-  }
-
-  @Test
-  public void noMutationRootBundleSucceeds() {
-    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
-    byte[] array = new byte[] {0, 1, 2};
-    root.add(WindowedValue.valueInGlobalWindow(array));
-    CommittedBundle<byte[]> committed = root.commit(Instant.now());
-
-    assertThat(
-        committed.getElements(), 
containsInAnyOrder(WindowedValue.valueInGlobalWindow(array)));
-  }
-
-  @Test
-  public void noMutationKeyedBundleSucceeds() {
-    CommittedBundle<byte[]> root = 
factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", 
transformed);
-
-    WindowedValue<byte[]> windowedArray =
-        WindowedValue.of(
-            new byte[] {4, 8, 12},
-            new Instant(891L),
-            new IntervalWindow(new Instant(0), new Instant(1000)),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    keyed.add(windowedArray);
-
-    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
-    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
-  }
-
-  @Test
-  public void noMutationCreateBundleSucceeds() {
-    CommittedBundle<byte[]> root = 
factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, 
transformed);
-
-    WindowedValue<byte[]> windowedArray =
-        WindowedValue.of(
-            new byte[] {4, 8, 12},
-            new Instant(891L),
-            new IntervalWindow(new Instant(0), new Instant(1000)),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    intermediate.add(windowedArray);
-
-    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
-    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
-  }
-
-  @Test
-  public void mutationBeforeAddRootBundleSucceeds() {
-    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
-    byte[] array = new byte[] {0, 1, 2};
-    array[1] = 2;
-    root.add(WindowedValue.valueInGlobalWindow(array));
-    CommittedBundle<byte[]> committed = root.commit(Instant.now());
-
-    assertThat(
-        committed.getElements(), 
containsInAnyOrder(WindowedValue.valueInGlobalWindow(array)));
-  }
-
-  @Test
-  public void mutationBeforeAddKeyedBundleSucceeds() {
-    CommittedBundle<byte[]> root = 
factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", 
transformed);
-
-    byte[] array = new byte[] {4, 8, 12};
-    array[0] = Byte.MAX_VALUE;
-    WindowedValue<byte[]> windowedArray =
-        WindowedValue.of(
-            array,
-            new Instant(891L),
-            new IntervalWindow(new Instant(0), new Instant(1000)),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    keyed.add(windowedArray);
-
-    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
-    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
-  }
-
-  @Test
-  public void mutationBeforeAddCreateBundleSucceeds() {
-    CommittedBundle<byte[]> root = 
factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, 
transformed);
-
-    byte[] array = new byte[] {4, 8, 12};
-    WindowedValue<byte[]> windowedArray =
-        WindowedValue.of(
-            array,
-            new Instant(891L),
-            new IntervalWindow(new Instant(0), new Instant(1000)),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    array[2] = -3;
-    intermediate.add(windowedArray);
-
-    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
-    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
-  }
-
-  @Test
-  public void mutationAfterAddRootBundleThrows() {
-    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
-    byte[] array = new byte[] {0, 1, 2};
-    root.add(WindowedValue.valueInGlobalWindow(array));
-
-    array[1] = 2;
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
-    thrown.expectMessage("Values must not be mutated in any way after being 
output");
-    CommittedBundle<byte[]> committed = root.commit(Instant.now());
-  }
-
-  @Test
-  public void mutationAfterAddKeyedBundleThrows() {
-    CommittedBundle<byte[]> root = 
factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", 
transformed);
-
-    byte[] array = new byte[] {4, 8, 12};
-    WindowedValue<byte[]> windowedArray =
-        WindowedValue.of(
-            array,
-            new Instant(891L),
-            new IntervalWindow(new Instant(0), new Instant(1000)),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    keyed.add(windowedArray);
-
-    array[0] = Byte.MAX_VALUE;
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
-    thrown.expectMessage("Values must not be mutated in any way after being 
output");
-    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
-  }
-
-  @Test
-  public void mutationAfterAddCreateBundleThrows() {
-    CommittedBundle<byte[]> root = 
factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, 
transformed);
-
-    byte[] array = new byte[] {4, 8, 12};
-    WindowedValue<byte[]> windowedArray =
-        WindowedValue.of(
-            array,
-            new Instant(891L),
-            new IntervalWindow(new Instant(0), new Instant(1000)),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    intermediate.add(windowedArray);
-
-    array[2] = -3;
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
-    thrown.expectMessage("Values must not be mutated in any way after being 
output");
-    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
-  }
-
-  private static class IdentityDoFn<T> extends DoFn<T, T> {
-    @Override
-    public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
-      c.output(c.element());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
deleted file mode 100644
index 16633ed..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.Collections;
-
-/**
- * Tests for {@link ImmutabilityEnforcementFactory}.
- */
-@RunWith(JUnit4.class)
-public class ImmutabilityEnforcementFactoryTest implements Serializable {
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-  private transient ImmutabilityEnforcementFactory factory;
-  private transient BundleFactory bundleFactory;
-  private transient PCollection<byte[]> pcollection;
-  private transient AppliedPTransform<?, ?, ?> consumer;
-
-  @Before
-  public void setup() {
-    factory = new ImmutabilityEnforcementFactory();
-    bundleFactory = InProcessBundleFactory.create();
-    TestPipeline p = TestPipeline.create();
-    pcollection =
-        p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
-            .apply(
-                ParDo.of(
-                    new DoFn<byte[], byte[]>() {
-                      @Override
-                      public void processElement(DoFn<byte[], 
byte[]>.ProcessContext c)
-                          throws Exception {
-                        c.element()[0] = 'b';
-                      }
-                    }));
-    consumer = 
pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal();
-  }
-
-  @Test
-  public void unchangedSucceeds() {
-    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
-    CommittedBundle<byte[]> elements =
-        
bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now());
-
-    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, 
consumer);
-    enforcement.beforeElement(element);
-    enforcement.afterElement(element);
-    enforcement.afterFinish(
-        elements,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-
-  @Test
-  public void mutatedDuringProcessElementThrows() {
-    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
-    CommittedBundle<byte[]> elements =
-        
bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now());
-
-    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, 
consumer);
-    enforcement.beforeElement(element);
-    element.getValue()[0] = 'f';
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage(consumer.getFullName());
-    thrown.expectMessage("illegaly mutated");
-    thrown.expectMessage("Input values must not be mutated");
-    enforcement.afterElement(element);
-    enforcement.afterFinish(
-        elements,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-
-  @Test
-  public void mutatedAfterProcessElementFails() {
-
-    WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
-    CommittedBundle<byte[]> elements =
-        
bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now());
-
-    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, 
consumer);
-    enforcement.beforeElement(element);
-    enforcement.afterElement(element);
-
-    element.getValue()[0] = 'f';
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage(consumer.getFullName());
-    thrown.expectMessage("illegaly mutated");
-    thrown.expectMessage("Input values must not be mutated");
-    enforcement.afterFinish(
-        elements,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-}


Reply via email to