http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java deleted file mode 100644 index 8ed2684..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.isA; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; - -/** - * Tests for {@link EncodabilityEnforcementFactory}. - */ -@RunWith(JUnit4.class) -public class EncodabilityEnforcementFactoryTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create(); - private BundleFactory bundleFactory = InProcessBundleFactory.create(); - - @Test - public void encodeFailsThrows() { - WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record()); - - ModelEnforcement<Record> enforcement = createEnforcement(new RecordNoEncodeCoder(), record); - - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(CoderException.class)); - thrown.expectMessage("Encode not allowed"); - enforcement.beforeElement(record); - } - - @Test - public void decodeFailsThrows() { - WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record()); - - ModelEnforcement<Record> enforcement = createEnforcement(new RecordNoDecodeCoder(), record); - - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(CoderException.class)); - thrown.expectMessage("Decode not allowed"); - enforcement.beforeElement(record); - } - - @Test - public void consistentWithEqualsStructuralValueNotEqualThrows() { - WindowedValue<Record> record = - WindowedValue.<Record>valueInGlobalWindow( - new Record() { - @Override - public String toString() { - return "OriginalRecord"; - } - }); - - ModelEnforcement<Record> enforcement = - createEnforcement(new RecordStructuralValueCoder(), record); - - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalArgumentException.class)); - thrown.expectMessage("does not maintain structural value equality"); - thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName()); - thrown.expectMessage("OriginalRecord"); - enforcement.beforeElement(record); - } - - @Test - public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() { - TestPipeline p = TestPipeline.create(); - PCollection<Record> unencodable = - p.apply( - Create.of(new Record()) - .withCoder(new RecordNotConsistentWithEqualsStructuralValueCoder())); - AppliedPTransform<?, ?, ?> consumer = - unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); - - WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record()); - - CommittedBundle<Record> input = - bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now()); - ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); - - enforcement.beforeElement(record); - enforcement.afterElement(record); - enforcement.afterFinish( - input, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } - - private <T> ModelEnforcement<T> createEnforcement(Coder<T> coder, WindowedValue<T> record) { - TestPipeline p = TestPipeline.create(); - PCollection<T> unencodable = p.apply(Create.<T>of().withCoder(coder)); - AppliedPTransform<?, ?, ?> consumer = - unencodable.apply(Count.<T>globally()).getProducingTransformInternal(); - CommittedBundle<T> input = - bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now()); - ModelEnforcement<T> enforcement = factory.forBundle(input, consumer); - return enforcement; - } - - @Test - public void structurallyEqualResultsSucceeds() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> unencodable = p.apply(Create.of(1).withCoder(VarIntCoder.of())); - AppliedPTransform<?, ?, ?> consumer = - unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal(); - - WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1); - - CommittedBundle<Integer> input = - bundleFactory.createRootBundle(unencodable).add(value).commit(Instant.now()); - ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer); - - enforcement.beforeElement(value); - enforcement.afterElement(value); - enforcement.afterFinish( - input, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } - - private static class Record {} - private static class RecordNoEncodeCoder extends AtomicCoder<Record> { - - @Override - public void encode( - Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { - throw new CoderException("Encode not allowed"); - } - - @Override - public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { - return null; - } - } - - private static class RecordNoDecodeCoder extends AtomicCoder<Record> { - @Override - public void encode( - Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException {} - - @Override - public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { - throw new CoderException("Decode not allowed"); - } - } - - private static class RecordStructuralValueCoder extends AtomicCoder<Record> { - @Override - public void encode( - Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException {} - - @Override - public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { - return new Record() { - @Override - public String toString() { - return "DecodedRecord"; - } - }; - } - - @Override - public boolean consistentWithEquals() { - return true; - } - - @Override - public Object structuralValue(Record value) { - return value; - } - } - - private static class RecordNotConsistentWithEqualsStructuralValueCoder - extends AtomicCoder<Record> { - @Override - public void encode( - Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException {} - - @Override - public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { - return new Record() { - @Override - public String toString() { - return "DecodedRecord"; - } - }; - } - - @Override - public boolean consistentWithEquals() { - return false; - } - - @Override - public Object structuralValue(Record value) { - return value; - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java deleted file mode 100644 index 5c1da14..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.emptyIterable; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; - -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link FlattenEvaluatorFactory}. - */ -@RunWith(JUnit4.class) -public class FlattenEvaluatorFactoryTest { - private BundleFactory bundleFactory = InProcessBundleFactory.create(); - @Test - public void testFlattenInMemoryEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> left = p.apply("left", Create.of(1, 2, 4)); - PCollection<Integer> right = p.apply("right", Create.of(-1, 2, -4)); - PCollectionList<Integer> list = PCollectionList.of(left).and(right); - - PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections()); - - CommittedBundle<Integer> leftBundle = - bundleFactory.createRootBundle(left).commit(Instant.now()); - CommittedBundle<Integer> rightBundle = - bundleFactory.createRootBundle(right).commit(Instant.now()); - - InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); - - UncommittedBundle<Integer> flattenedLeftBundle = bundleFactory.createRootBundle(flattened); - UncommittedBundle<Integer> flattenedRightBundle = bundleFactory.createRootBundle(flattened); - - when(context.createBundle(leftBundle, flattened)).thenReturn(flattenedLeftBundle); - when(context.createBundle(rightBundle, flattened)).thenReturn(flattenedRightBundle); - - FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(); - TransformEvaluator<Integer> leftSideEvaluator = - factory.forApplication(flattened.getProducingTransformInternal(), leftBundle, context); - TransformEvaluator<Integer> rightSideEvaluator = - factory.forApplication( - flattened.getProducingTransformInternal(), - rightBundle, - context); - - leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1)); - rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1)); - leftSideEvaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024))); - leftSideEvaluator.processElement(WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING)); - rightSideEvaluator.processElement( - WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING)); - rightSideEvaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096))); - - InProcessTransformResult rightSideResult = rightSideEvaluator.finishBundle(); - InProcessTransformResult leftSideResult = leftSideEvaluator.finishBundle(); - - assertThat( - rightSideResult.getOutputBundles(), - Matchers.<UncommittedBundle<?>>contains(flattenedRightBundle)); - assertThat( - rightSideResult.getTransform(), - Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal())); - assertThat( - leftSideResult.getOutputBundles(), - Matchers.<UncommittedBundle<?>>contains(flattenedLeftBundle)); - assertThat( - leftSideResult.getTransform(), - Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal())); - - assertThat( - flattenedLeftBundle.commit(Instant.now()).getElements(), - containsInAnyOrder( - WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)), - WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING), - WindowedValue.valueInGlobalWindow(1))); - assertThat( - flattenedRightBundle.commit(Instant.now()).getElements(), - containsInAnyOrder( - WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING), - WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)), - WindowedValue.valueInGlobalWindow(-1))); - } - - @Test - public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception { - TestPipeline p = TestPipeline.create(); - PCollectionList<Integer> list = PCollectionList.empty(p); - - PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections()); - - InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); - - FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(); - TransformEvaluator<Integer> emptyEvaluator = - factory.forApplication(flattened.getProducingTransformInternal(), null, context); - - InProcessTransformResult leftSideResult = emptyEvaluator.finishBundle(); - - assertThat(leftSideResult.getOutputBundles(), emptyIterable()); - assertThat( - leftSideResult.getTransform(), - Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal())); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java deleted file mode 100644 index 366dfc5..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link ForwardingPTransform}. - */ -@RunWith(JUnit4.class) -public class ForwardingPTransformTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Mock private PTransform<PCollection<Integer>, PCollection<String>> delegate; - - private ForwardingPTransform<PCollection<Integer>, PCollection<String>> forwarding; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - forwarding = - new ForwardingPTransform<PCollection<Integer>, PCollection<String>>() { - @Override - protected PTransform<PCollection<Integer>, PCollection<String>> delegate() { - return delegate; - } - }; - } - - @Test - public void applyDelegates() { - @SuppressWarnings("unchecked") - PCollection<Integer> collection = mock(PCollection.class); - @SuppressWarnings("unchecked") - PCollection<String> output = mock(PCollection.class); - when(delegate.apply(collection)).thenReturn(output); - PCollection<String> result = forwarding.apply(collection); - assertThat(result, equalTo(output)); - } - - @Test - public void getNameDelegates() { - String name = "My_forwardingptransform-name;for!thisTest"; - when(delegate.getName()).thenReturn(name); - assertThat(forwarding.getName(), equalTo(name)); - } - - @Test - public void validateDelegates() { - @SuppressWarnings("unchecked") - PCollection<Integer> input = mock(PCollection.class); - doThrow(RuntimeException.class).when(delegate).validate(input); - - thrown.expect(RuntimeException.class); - forwarding.validate(input); - } - - @Test - public void getDefaultOutputCoderDelegates() throws Exception { - @SuppressWarnings("unchecked") - PCollection<Integer> input = mock(PCollection.class); - @SuppressWarnings("unchecked") - PCollection<String> output = mock(PCollection.class); - @SuppressWarnings("unchecked") - Coder<String> outputCoder = mock(Coder.class); - - when(delegate.getDefaultOutputCoder(input, output)).thenReturn(outputCoder); - assertThat(forwarding.getDefaultOutputCoder(input, output), equalTo(outputCoder)); - } - - @Test - public void populateDisplayDataDelegates() { - DisplayData.Builder builder = mock(DisplayData.Builder.class); - doThrow(RuntimeException.class).when(delegate).populateDisplayData(builder); - - thrown.expect(RuntimeException.class); - forwarding.populateDisplayData(builder); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java deleted file mode 100644 index b7ce169..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.contains; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.HashMultiset; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multiset; - -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link GroupByKeyEvaluatorFactory}. - */ -@RunWith(JUnit4.class) -public class GroupByKeyEvaluatorFactoryTest { - private BundleFactory bundleFactory = InProcessBundleFactory.create(); - - @Test - public void testInMemoryEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); - KV<String, Integer> firstFoo = KV.of("foo", -1); - KV<String, Integer> secondFoo = KV.of("foo", 1); - KV<String, Integer> thirdFoo = KV.of("foo", 3); - KV<String, Integer> firstBar = KV.of("bar", 22); - KV<String, Integer> secondBar = KV.of("bar", 12); - KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE); - PCollection<KV<String, Integer>> values = - p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo)); - PCollection<KV<String, WindowedValue<Integer>>> kvs = - values.apply(new ReifyTimestampsAndWindows<String, Integer>()); - PCollection<KeyedWorkItem<String, Integer>> groupedKvs = - kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>()); - - CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle = - bundleFactory.createRootBundle(kvs).commit(Instant.now()); - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - - UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = - bundleFactory.createKeyedBundle(null, "foo", groupedKvs); - UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle = - bundleFactory.createKeyedBundle(null, "bar", groupedKvs); - UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = - bundleFactory.createKeyedBundle(null, "baz", groupedKvs); - - when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle); - when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle); - when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle); - - // The input to a GroupByKey is assumed to be a KvCoder - @SuppressWarnings("unchecked") - Coder<String> keyCoder = - ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder(); - TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator = - new GroupByKeyEvaluatorFactory() - .forApplication( - groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); - - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz))); - - evaluator.finishBundle(); - - assertThat( - fooBundle.commit(Instant.now()).getElements(), - contains( - new KeyedWorkItemMatcher<String, Integer>( - KeyedWorkItems.elementsWorkItem( - "foo", - ImmutableSet.of( - WindowedValue.valueInGlobalWindow(-1), - WindowedValue.valueInGlobalWindow(1), - WindowedValue.valueInGlobalWindow(3))), - keyCoder))); - assertThat( - barBundle.commit(Instant.now()).getElements(), - contains( - new KeyedWorkItemMatcher<String, Integer>( - KeyedWorkItems.elementsWorkItem( - "bar", - ImmutableSet.of( - WindowedValue.valueInGlobalWindow(12), - WindowedValue.valueInGlobalWindow(22))), - keyCoder))); - assertThat( - bazBundle.commit(Instant.now()).getElements(), - contains( - new KeyedWorkItemMatcher<String, Integer>( - KeyedWorkItems.elementsWorkItem( - "baz", - ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))), - keyCoder))); - } - - private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) { - return KV.of(kv.getKey(), WindowedValue.valueInGlobalWindow(kv.getValue())); - } - - private static class KeyedWorkItemMatcher<K, V> - extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> { - private final KeyedWorkItem<K, V> myWorkItem; - private final Coder<K> keyCoder; - - public KeyedWorkItemMatcher(KeyedWorkItem<K, V> myWorkItem, Coder<K> keyCoder) { - this.myWorkItem = myWorkItem; - this.keyCoder = keyCoder; - } - - @Override - public boolean matches(Object item) { - if (item == null || !(item instanceof WindowedValue)) { - return false; - } - WindowedValue<KeyedWorkItem<K, V>> that = (WindowedValue<KeyedWorkItem<K, V>>) item; - Multiset<WindowedValue<V>> myValues = HashMultiset.create(); - Multiset<WindowedValue<V>> thatValues = HashMultiset.create(); - for (WindowedValue<V> value : myWorkItem.elementsIterable()) { - myValues.add(value); - } - for (WindowedValue<V> value : that.getValue().elementsIterable()) { - thatValues.add(value); - } - try { - return myValues.equals(thatValues) - && keyCoder - .structuralValue(myWorkItem.key()) - .equals(keyCoder.structuralValue(that.getValue().key())); - } catch (Exception e) { - return false; - } - } - - @Override - public void describeTo(Description description) { - description - .appendText("KeyedWorkItem<K, V> containing key ") - .appendValue(myWorkItem.key()) - .appendText(" and values ") - .appendValueList("[", ", ", "]", myWorkItem.elementsIterable()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java deleted file mode 100644 index 386eacc..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.isA; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ImmutabilityCheckingBundleFactory}. - */ -@RunWith(JUnit4.class) -public class ImmutabilityCheckingBundleFactoryTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - private ImmutabilityCheckingBundleFactory factory; - private PCollection<byte[]> created; - private PCollection<byte[]> transformed; - - @Before - public void setup() { - TestPipeline p = TestPipeline.create(); - created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of())); - transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>())); - factory = ImmutabilityCheckingBundleFactory.create(InProcessBundleFactory.create()); - } - - @Test - public void noMutationRootBundleSucceeds() { - UncommittedBundle<byte[]> root = factory.createRootBundle(created); - byte[] array = new byte[] {0, 1, 2}; - root.add(WindowedValue.valueInGlobalWindow(array)); - CommittedBundle<byte[]> committed = root.commit(Instant.now()); - - assertThat( - committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array))); - } - - @Test - public void noMutationKeyedBundleSucceeds() { - CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); - UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed); - - WindowedValue<byte[]> windowedArray = - WindowedValue.of( - new byte[] {4, 8, 12}, - new Instant(891L), - new IntervalWindow(new Instant(0), new Instant(1000)), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - keyed.add(windowedArray); - - CommittedBundle<byte[]> committed = keyed.commit(Instant.now()); - assertThat(committed.getElements(), containsInAnyOrder(windowedArray)); - } - - @Test - public void noMutationCreateBundleSucceeds() { - CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); - UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed); - - WindowedValue<byte[]> windowedArray = - WindowedValue.of( - new byte[] {4, 8, 12}, - new Instant(891L), - new IntervalWindow(new Instant(0), new Instant(1000)), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - intermediate.add(windowedArray); - - CommittedBundle<byte[]> committed = intermediate.commit(Instant.now()); - assertThat(committed.getElements(), containsInAnyOrder(windowedArray)); - } - - @Test - public void mutationBeforeAddRootBundleSucceeds() { - UncommittedBundle<byte[]> root = factory.createRootBundle(created); - byte[] array = new byte[] {0, 1, 2}; - array[1] = 2; - root.add(WindowedValue.valueInGlobalWindow(array)); - CommittedBundle<byte[]> committed = root.commit(Instant.now()); - - assertThat( - committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array))); - } - - @Test - public void mutationBeforeAddKeyedBundleSucceeds() { - CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); - UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed); - - byte[] array = new byte[] {4, 8, 12}; - array[0] = Byte.MAX_VALUE; - WindowedValue<byte[]> windowedArray = - WindowedValue.of( - array, - new Instant(891L), - new IntervalWindow(new Instant(0), new Instant(1000)), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - keyed.add(windowedArray); - - CommittedBundle<byte[]> committed = keyed.commit(Instant.now()); - assertThat(committed.getElements(), containsInAnyOrder(windowedArray)); - } - - @Test - public void mutationBeforeAddCreateBundleSucceeds() { - CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); - UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed); - - byte[] array = new byte[] {4, 8, 12}; - WindowedValue<byte[]> windowedArray = - WindowedValue.of( - array, - new Instant(891L), - new IntervalWindow(new Instant(0), new Instant(1000)), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - array[2] = -3; - intermediate.add(windowedArray); - - CommittedBundle<byte[]> committed = intermediate.commit(Instant.now()); - assertThat(committed.getElements(), containsInAnyOrder(windowedArray)); - } - - @Test - public void mutationAfterAddRootBundleThrows() { - UncommittedBundle<byte[]> root = factory.createRootBundle(created); - byte[] array = new byte[] {0, 1, 2}; - root.add(WindowedValue.valueInGlobalWindow(array)); - - array[1] = 2; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); - thrown.expectMessage("Values must not be mutated in any way after being output"); - CommittedBundle<byte[]> committed = root.commit(Instant.now()); - } - - @Test - public void mutationAfterAddKeyedBundleThrows() { - CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); - UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed); - - byte[] array = new byte[] {4, 8, 12}; - WindowedValue<byte[]> windowedArray = - WindowedValue.of( - array, - new Instant(891L), - new IntervalWindow(new Instant(0), new Instant(1000)), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - keyed.add(windowedArray); - - array[0] = Byte.MAX_VALUE; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); - thrown.expectMessage("Values must not be mutated in any way after being output"); - CommittedBundle<byte[]> committed = keyed.commit(Instant.now()); - } - - @Test - public void mutationAfterAddCreateBundleThrows() { - CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); - UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed); - - byte[] array = new byte[] {4, 8, 12}; - WindowedValue<byte[]> windowedArray = - WindowedValue.of( - array, - new Instant(891L), - new IntervalWindow(new Instant(0), new Instant(1000)), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - intermediate.add(windowedArray); - - array[2] = -3; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); - thrown.expectMessage("Values must not be mutated in any way after being output"); - CommittedBundle<byte[]> committed = intermediate.commit(Instant.now()); - } - - private static class IdentityDoFn<T> extends DoFn<T, T> { - @Override - public void processElement(DoFn<T, T>.ProcessContext c) throws Exception { - c.output(c.element()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java deleted file mode 100644 index 16633ed..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.Collections; - -/** - * Tests for {@link ImmutabilityEnforcementFactory}. - */ -@RunWith(JUnit4.class) -public class ImmutabilityEnforcementFactoryTest implements Serializable { - @Rule public transient ExpectedException thrown = ExpectedException.none(); - private transient ImmutabilityEnforcementFactory factory; - private transient BundleFactory bundleFactory; - private transient PCollection<byte[]> pcollection; - private transient AppliedPTransform<?, ?, ?> consumer; - - @Before - public void setup() { - factory = new ImmutabilityEnforcementFactory(); - bundleFactory = InProcessBundleFactory.create(); - TestPipeline p = TestPipeline.create(); - pcollection = - p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes())) - .apply( - ParDo.of( - new DoFn<byte[], byte[]>() { - @Override - public void processElement(DoFn<byte[], byte[]>.ProcessContext c) - throws Exception { - c.element()[0] = 'b'; - } - })); - consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal(); - } - - @Test - public void unchangedSucceeds() { - WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); - CommittedBundle<byte[]> elements = - bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now()); - - ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); - enforcement.beforeElement(element); - enforcement.afterElement(element); - enforcement.afterFinish( - elements, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } - - @Test - public void mutatedDuringProcessElementThrows() { - WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); - CommittedBundle<byte[]> elements = - bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now()); - - ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); - enforcement.beforeElement(element); - element.getValue()[0] = 'f'; - thrown.expect(IllegalMutationException.class); - thrown.expectMessage(consumer.getFullName()); - thrown.expectMessage("illegaly mutated"); - thrown.expectMessage("Input values must not be mutated"); - enforcement.afterElement(element); - enforcement.afterFinish( - elements, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } - - @Test - public void mutatedAfterProcessElementFails() { - - WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); - CommittedBundle<byte[]> elements = - bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now()); - - ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); - enforcement.beforeElement(element); - enforcement.afterElement(element); - - element.getValue()[0] = 'f'; - thrown.expect(IllegalMutationException.class); - thrown.expectMessage(consumer.getFullName()); - thrown.expectMessage("illegaly mutated"); - thrown.expectMessage("Input values must not be mutated"); - enforcement.afterFinish( - elements, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } -}