http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java index ba9815b..1fec9d8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.util.WindowedValue; /**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java index f2d577e..b12a34c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 81d2520..dfc1753 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow; import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index fb637b4..4dd1475 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index fceb20c..5030730 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -17,8 +17,8 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.io.Read.Unbounded; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index ffaf3fa..f4260f5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; +import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -36,7 +36,7 @@ import java.util.ArrayList; import java.util.List; /** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the * {@link CreatePCollectionView} primitive {@link PTransform}. * * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for @@ -49,7 +49,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { @Override public <T> TransformEvaluator<T> forApplication( AppliedPTransform<?, ?, ?> application, - InProcessPipelineRunner.CommittedBundle<?> inputBundle, + DirectRunner.CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) { @SuppressWarnings({"cast", "unchecked", "rawtypes"}) TransformEvaluator<T> evaluator = createEvaluator( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index 628f94d..89866cc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -17,8 +17,8 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -36,7 +36,7 @@ import java.util.Collection; import javax.annotation.Nullable; /** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the * {@link Bound Window.Bound} primitive {@link PTransform}. */ class WindowEvaluatorFactory implements TransformEvaluatorFactory { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java index c0c1361..d94113a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java @@ -114,7 +114,7 @@ public class AvroIOShardedWriteFactoryTest { private Pipeline getPipeline() { PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setRunner(InProcessPipelineRunner.class); + options.setRunner(DirectRunner.class); return TestPipeline.fromOptions(options); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index bcdc089..e26f860 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -25,8 +25,8 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java index 0d1b464..4969a30 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java @@ -60,24 +60,24 @@ public class CommittedResultTest implements Serializable { CommittedResult result = CommittedResult.create(StepTransformResult.withoutHold(transform).build(), bundleFactory.createRootBundle(created).commit(Instant.now()), - Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList()); + Collections.<DirectRunner.CommittedBundle<?>>emptyList()); assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform)); } @Test public void getUncommittedElementsEqualInput() { - InProcessPipelineRunner.CommittedBundle<Integer> bundle = + DirectRunner.CommittedBundle<Integer> bundle = bundleFactory.createRootBundle(created) .add(WindowedValue.valueInGlobalWindow(2)) .commit(Instant.now()); CommittedResult result = CommittedResult.create(StepTransformResult.withoutHold(transform).build(), bundle, - Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList()); + Collections.<DirectRunner.CommittedBundle<?>>emptyList()); assertThat(result.getUnprocessedInputs(), - Matchers.<InProcessPipelineRunner.CommittedBundle<?>>equalTo(bundle)); + Matchers.<DirectRunner.CommittedBundle<?>>equalTo(bundle)); } @Test @@ -85,14 +85,14 @@ public class CommittedResultTest implements Serializable { CommittedResult result = CommittedResult.create(StepTransformResult.withoutHold(transform).build(), null, - Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList()); + Collections.<DirectRunner.CommittedBundle<?>>emptyList()); assertThat(result.getUnprocessedInputs(), nullValue()); } @Test public void getOutputsEqualInput() { - List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs = + List<? extends DirectRunner.CommittedBundle<?>> outputs = ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED)).commit(Instant.now()), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java new file mode 100644 index 0000000..cd44b7e --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.apache.beam.runners.direct.DirectRegistrar.InProcessRunner; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ServiceLoader; + +/** Tests for {@link InProcessRunner}. */ +@RunWith(JUnit4.class) +public class DirectRegistrarTest { + @Test + public void testCorrectOptionsAreReturned() { + assertEquals( + ImmutableList.of(DirectOptions.class), + new DirectRegistrar.InProcessOptions().getPipelineOptions()); + } + + @Test + public void testCorrectRunnersAreReturned() { + assertEquals( + ImmutableList.of(DirectRunner.class), + new DirectRegistrar.InProcessRunner().getPipelineRunners()); + } + + @Test + public void testServiceLoaderForOptions() { + for (PipelineOptionsRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { + if (registrar instanceof DirectRegistrar.InProcessOptions) { + return; + } + } + fail("Expected to find " + DirectRegistrar.InProcessOptions.class); + } + + @Test + public void testServiceLoaderForRunner() { + for (PipelineRunnerRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { + if (registrar instanceof DirectRegistrar.InProcessRunner) { + return; + } + } + fail("Expected to find " + DirectRegistrar.InProcessRunner.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java new file mode 100644 index 0000000..1de38df --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.IllegalMutationException; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.TypeDescriptor; + +import com.google.common.collect.ImmutableMap; + +import com.fasterxml.jackson.annotation.JsonValue; + +import org.hamcrest.Matchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Tests for basic {@link DirectRunner} functionality. + */ +@RunWith(JUnit4.class) +public class DirectRunnerTest implements Serializable { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + private Pipeline getPipeline() { + PipelineOptions opts = PipelineOptionsFactory.create(); + opts.setRunner(DirectRunner.class); + + Pipeline p = Pipeline.create(opts); + return p; + } + + @Test + public void defaultRunnerLoaded() { + assertThat(DirectRunner.class, + Matchers.<Class<? extends PipelineRunner>>equalTo(PipelineOptionsFactory.create() + .getRunner())); + } + + @Test + public void wordCountShouldSucceed() throws Throwable { + Pipeline p = getPipeline(); + + PCollection<KV<String, Long>> counts = + p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo")) + .apply(MapElements.via(new SimpleFunction<String, String>() { + @Override + public String apply(String input) { + return input; + } + })) + .apply(Count.<String>perElement()); + PCollection<String> countStrs = + counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { + @Override + public String apply(KV<String, Long> input) { + String str = String.format("%s: %s", input.getKey(), input.getValue()); + return str; + } + })); + + PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3"); + + DirectPipelineResult result = ((DirectPipelineResult) p.run()); + result.awaitCompletion(); + } + + @Test(timeout = 5000L) + public void byteArrayCountShouldSucceed() { + Pipeline p = getPipeline(); + + SerializableFunction<Integer, byte[]> getBytes = new SerializableFunction<Integer, byte[]>() { + @Override + public byte[] apply(Integer input) { + try { + return CoderUtils.encodeToByteArray(VarIntCoder.of(), input); + } catch (CoderException e) { + fail("Unexpected Coder Exception " + e); + throw new AssertionError("Unreachable"); + } + } + }; + TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() { + }; + PCollection<byte[]> foos = + p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td)); + PCollection<byte[]> msync = + p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td)); + PCollection<byte[]> bytes = + PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections()); + PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement()); + PCollection<KV<Integer, Long>> countsBackToString = + counts.apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() { + @Override + public KV<Integer, Long> apply(KV<byte[], Long> input) { + try { + return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), input.getKey()), + input.getValue()); + } catch (CoderException e) { + fail("Unexpected Coder Exception " + e); + throw new AssertionError("Unreachable"); + } + } + })); + + Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder().put(1, 4L) + .put(2, 2L) + .put(3, 1L) + .put(-2, 1L) + .put(-8, 1L) + .put(-16, 1L) + .build(); + PAssert.thatMap(countsBackToString).isEqualTo(expected); + } + + @Test + public void transformDisplayDataExceptionShouldFail() { + DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) throws Exception {} + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + throw new RuntimeException("oh noes!"); + } + }; + + Pipeline p = getPipeline(); + p + .apply(Create.of(1, 2, 3)) + .apply(ParDo.of(brokenDoFn)); + + thrown.expectMessage(brokenDoFn.getClass().getName()); + thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!"))); + p.run(); + } + + @Test + public void pipelineOptionsDisplayDataExceptionShouldFail() { + Object brokenValueType = new Object() { + @JsonValue + public int getValue () { + return 42; + } + + @Override + public String toString() { + throw new RuntimeException("oh noes!!"); + } + }; + + Pipeline p = getPipeline(); + p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType); + + p.apply(Create.of(1, 2, 3)); + + thrown.expectMessage(PipelineOptions.class.getName()); + thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!"))); + p.run(); + } + + /** {@link PipelineOptions} to inject bad object implementations. */ + public interface ObjectPipelineOptions extends PipelineOptions { + Object getValue(); + void setValue(Object value); + } + + + /** + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the + * {@link DirectRunner}. + */ + @Test + public void testMutatingOutputThenOutputDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); + + pipeline + .apply(Create.of(42)) + .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { + @Override public void processElement(ProcessContext c) { + List<Integer> outputList = Arrays.asList(1, 2, 3, 4); + c.output(outputList); + outputList.set(0, 37); + c.output(outputList); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("output"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); + } + + /** + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the + * {@link DirectRunner}. + */ + @Test + public void testMutatingOutputThenTerminateDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); + + pipeline + .apply(Create.of(42)) + .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { + @Override public void processElement(ProcessContext c) { + List<Integer> outputList = Arrays.asList(1, 2, 3, 4); + c.output(outputList); + outputList.set(0, 37); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("output"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); + } + + /** + * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails + * in the {@link DirectRunner}. + */ + @Test + public void testMutatingOutputCoderDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); + + pipeline + .apply(Create.of(42)) + .apply(ParDo.of(new DoFn<Integer, byte[]>() { + @Override public void processElement(ProcessContext c) { + byte[] outputArray = new byte[]{0x1, 0x2, 0x3}; + c.output(outputArray); + outputArray[0] = 0xa; + c.output(outputArray); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("output"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); + } + + /** + * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the + * {@link DirectRunner}. + */ + @Test + public void testMutatingInputDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); + + pipeline + .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)) + .withCoder(ListCoder.of(VarIntCoder.of()))) + .apply(ParDo.of(new DoFn<List<Integer>, Integer>() { + @Override public void processElement(ProcessContext c) { + List<Integer> inputList = c.element(); + inputList.set(0, 37); + c.output(12); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("Input"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); + } + + /** + * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails + * in the {@link DirectRunner}. + */ + @Test + public void testMutatingInputCoderDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); + + pipeline + .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6})) + .apply(ParDo.of(new DoFn<byte[], Integer>() { + @Override public void processElement(ProcessContext c) { + byte[] inputArray = c.element(); + inputArray[0] = 0xa; + c.output(13); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("Input"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java index 9a358dd..e129489 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.isA; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index 66a5106..5efb090 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index a4f900c..b589db0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -22,8 +22,8 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index 20670ca..c4da86c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -20,8 +20,8 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index 6cef60d..ead9c9e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Count; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java index af08d02..21c941a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java @@ -29,8 +29,8 @@ import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java index abe2a19..3a3ac8c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java @@ -21,8 +21,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java index 18db400..b1cbeb1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java @@ -28,9 +28,9 @@ import static org.junit.Assert.assertThat; import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -99,8 +99,8 @@ public class InProcessEvaluationContextTest { @Before public void setup() { - InProcessPipelineRunner runner = - InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create()); + DirectRunner runner = + DirectRunner.fromOptions(PipelineOptionsFactory.create()); p = TestPipeline.create(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java index 28a3cf6..e8d4711 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java @@ -22,8 +22,8 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java deleted file mode 100644 index 54094c4..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import org.apache.beam.runners.direct.InProcessRegistrar.InProcessRunner; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; -import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.ServiceLoader; - -/** Tests for {@link InProcessRunner}. */ -@RunWith(JUnit4.class) -public class InProcessPipelineRegistrarTest { - @Test - public void testCorrectOptionsAreReturned() { - assertEquals( - ImmutableList.of(InProcessPipelineOptions.class), - new InProcessRegistrar.InProcessOptions().getPipelineOptions()); - } - - @Test - public void testCorrectRunnersAreReturned() { - assertEquals( - ImmutableList.of(InProcessPipelineRunner.class), - new InProcessRegistrar.InProcessRunner().getPipelineRunners()); - } - - @Test - public void testServiceLoaderForOptions() { - for (PipelineOptionsRegistrar registrar : - Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { - if (registrar instanceof InProcessRegistrar.InProcessOptions) { - return; - } - } - fail("Expected to find " + InProcessRegistrar.InProcessOptions.class); - } - - @Test - public void testServiceLoaderForRunner() { - for (PipelineRunnerRegistrar registrar : - Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { - if (registrar instanceof InProcessRegistrar.InProcessRunner) { - return; - } - } - fail("Expected to find " + InProcessRegistrar.InProcessRunner.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java deleted file mode 100644 index ab26c15..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.TypeDescriptor; - -import com.google.common.collect.ImmutableMap; - -import com.fasterxml.jackson.annotation.JsonValue; - -import org.hamcrest.Matchers; -import org.junit.Rule; -import org.junit.Test; -import org.junit.internal.matchers.ThrowableMessageMatcher; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -/** - * Tests for basic {@link InProcessPipelineRunner} functionality. - */ -@RunWith(JUnit4.class) -public class InProcessPipelineRunnerTest implements Serializable { - @Rule public transient ExpectedException thrown = ExpectedException.none(); - - private Pipeline getPipeline() { - PipelineOptions opts = PipelineOptionsFactory.create(); - opts.setRunner(InProcessPipelineRunner.class); - - Pipeline p = Pipeline.create(opts); - return p; - } - - @Test - public void defaultRunnerLoaded() { - assertThat(InProcessPipelineRunner.class, - Matchers.<Class<? extends PipelineRunner>>equalTo(PipelineOptionsFactory.create() - .getRunner())); - } - - @Test - public void wordCountShouldSucceed() throws Throwable { - Pipeline p = getPipeline(); - - PCollection<KV<String, Long>> counts = - p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo")) - .apply(MapElements.via(new SimpleFunction<String, String>() { - @Override - public String apply(String input) { - return input; - } - })) - .apply(Count.<String>perElement()); - PCollection<String> countStrs = - counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { - @Override - public String apply(KV<String, Long> input) { - String str = String.format("%s: %s", input.getKey(), input.getValue()); - return str; - } - })); - - PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3"); - - InProcessPipelineResult result = ((InProcessPipelineResult) p.run()); - result.awaitCompletion(); - } - - @Test(timeout = 5000L) - public void byteArrayCountShouldSucceed() { - Pipeline p = getPipeline(); - - SerializableFunction<Integer, byte[]> getBytes = new SerializableFunction<Integer, byte[]>() { - @Override - public byte[] apply(Integer input) { - try { - return CoderUtils.encodeToByteArray(VarIntCoder.of(), input); - } catch (CoderException e) { - fail("Unexpected Coder Exception " + e); - throw new AssertionError("Unreachable"); - } - } - }; - TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() { - }; - PCollection<byte[]> foos = - p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td)); - PCollection<byte[]> msync = - p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td)); - PCollection<byte[]> bytes = - PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections()); - PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement()); - PCollection<KV<Integer, Long>> countsBackToString = - counts.apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() { - @Override - public KV<Integer, Long> apply(KV<byte[], Long> input) { - try { - return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), input.getKey()), - input.getValue()); - } catch (CoderException e) { - fail("Unexpected Coder Exception " + e); - throw new AssertionError("Unreachable"); - } - } - })); - - Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder().put(1, 4L) - .put(2, 2L) - .put(3, 1L) - .put(-2, 1L) - .put(-8, 1L) - .put(-16, 1L) - .build(); - PAssert.thatMap(countsBackToString).isEqualTo(expected); - } - - @Test - public void transformDisplayDataExceptionShouldFail() { - DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) throws Exception {} - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - throw new RuntimeException("oh noes!"); - } - }; - - Pipeline p = getPipeline(); - p - .apply(Create.of(1, 2, 3)) - .apply(ParDo.of(brokenDoFn)); - - thrown.expectMessage(brokenDoFn.getClass().getName()); - thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!"))); - p.run(); - } - - @Test - public void pipelineOptionsDisplayDataExceptionShouldFail() { - Object brokenValueType = new Object() { - @JsonValue - public int getValue () { - return 42; - } - - @Override - public String toString() { - throw new RuntimeException("oh noes!!"); - } - }; - - Pipeline p = getPipeline(); - p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType); - - p.apply(Create.of(1, 2, 3)); - - thrown.expectMessage(PipelineOptions.class.getName()); - thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!"))); - p.run(); - } - - /** {@link PipelineOptions} to inject bad object implementations. */ - public interface ObjectPipelineOptions extends PipelineOptions { - Object getValue(); - void setValue(Object value); - } - - - /** - * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the - * {@link InProcessPipelineRunner}. - */ - @Test - public void testMutatingOutputThenOutputDoFnError() throws Exception { - Pipeline pipeline = getPipeline(); - - pipeline - .apply(Create.of(42)) - .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { - @Override public void processElement(ProcessContext c) { - List<Integer> outputList = Arrays.asList(1, 2, 3, 4); - c.output(outputList); - outputList.set(0, 37); - c.output(outputList); - } - })); - - thrown.expect(IllegalMutationException.class); - thrown.expectMessage("output"); - thrown.expectMessage("must not be mutated"); - pipeline.run(); - } - - /** - * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the - * {@link InProcessPipelineRunner}. - */ - @Test - public void testMutatingOutputThenTerminateDoFnError() throws Exception { - Pipeline pipeline = getPipeline(); - - pipeline - .apply(Create.of(42)) - .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { - @Override public void processElement(ProcessContext c) { - List<Integer> outputList = Arrays.asList(1, 2, 3, 4); - c.output(outputList); - outputList.set(0, 37); - } - })); - - thrown.expect(IllegalMutationException.class); - thrown.expectMessage("output"); - thrown.expectMessage("must not be mutated"); - pipeline.run(); - } - - /** - * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails - * in the {@link InProcessPipelineRunner}. - */ - @Test - public void testMutatingOutputCoderDoFnError() throws Exception { - Pipeline pipeline = getPipeline(); - - pipeline - .apply(Create.of(42)) - .apply(ParDo.of(new DoFn<Integer, byte[]>() { - @Override public void processElement(ProcessContext c) { - byte[] outputArray = new byte[]{0x1, 0x2, 0x3}; - c.output(outputArray); - outputArray[0] = 0xa; - c.output(outputArray); - } - })); - - thrown.expect(IllegalMutationException.class); - thrown.expectMessage("output"); - thrown.expectMessage("must not be mutated"); - pipeline.run(); - } - - /** - * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the - * {@link InProcessPipelineRunner}. - */ - @Test - public void testMutatingInputDoFnError() throws Exception { - Pipeline pipeline = getPipeline(); - - pipeline - .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)) - .withCoder(ListCoder.of(VarIntCoder.of()))) - .apply(ParDo.of(new DoFn<List<Integer>, Integer>() { - @Override public void processElement(ProcessContext c) { - List<Integer> inputList = c.element(); - inputList.set(0, 37); - c.output(12); - } - })); - - thrown.expect(IllegalMutationException.class); - thrown.expectMessage("Input"); - thrown.expectMessage("must not be mutated"); - pipeline.run(); - } - - /** - * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails - * in the {@link InProcessPipelineRunner}. - */ - @Test - public void testMutatingInputCoderDoFnError() throws Exception { - Pipeline pipeline = getPipeline(); - - pipeline - .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6})) - .apply(ParDo.of(new DoFn<byte[], Integer>() { - @Override public void processElement(ProcessContext c) { - byte[] inputArray = c.element(); - inputArray[0] = 0xa; - c.output(13); - } - })); - - thrown.expect(IllegalMutationException.class); - thrown.expectMessage("Input"); - thrown.expectMessage("must not be mutated"); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java index 0f7afa1..b78eb40 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java @@ -25,8 +25,8 @@ import static org.mockito.Mockito.when; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; @@ -139,7 +139,7 @@ public class ParDoInProcessEvaluatorTest { private ParDoInProcessEvaluator<Integer> createEvaluator( PCollectionView<Integer> singletonView, RecorderFn fn, - InProcessPipelineRunner.CommittedBundle<Integer> inputBundle, + DirectRunner.CommittedBundle<Integer> inputBundle, PCollection<Integer> output) { when( evaluationContext.createSideInputReader( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index a6f31c0..e61881e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -26,8 +26,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java index a1480e5..8b8d44f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java @@ -26,8 +26,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java index fe9866c..5ede931 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java @@ -114,7 +114,7 @@ public class TextIOShardedWriteFactoryTest { private Pipeline getPipeline() { PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setRunner(InProcessPipelineRunner.class); + options.setRunner(DirectRunner.class); return TestPipeline.fromOptions(options); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 0345662..a5e6cee 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -27,7 +27,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 05656eb..be5c489 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -25,8 +25,8 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index 859418b..714e9c9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index a2f971a..8a3591b 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -21,8 +21,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; -import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java index 01f3070..5fdfb49 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java @@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.translation; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; -import org.apache.beam.runners.direct.InProcessPipelineRunner; +import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; @@ -58,7 +58,7 @@ public class TransformTranslatorTest { */ @Test public void testTextIOReadAndWriteTransforms() throws IOException { - String directOut = runPipeline(InProcessPipelineRunner.class); + String directOut = runPipeline(DirectRunner.class); String sparkOut = runPipeline(SparkPipelineRunner.class); List<String> directOutput = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index b1b5280..456b6ae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -277,7 +277,7 @@ public interface PipelineOptions extends HasDisplayData { try { @SuppressWarnings({"unchecked", "rawtypes"}) Class<? extends PipelineRunner> direct = (Class<? extends PipelineRunner>) Class.forName( - "org.apache.beam.runners.direct.InProcessPipelineRunner"); + "org.apache.beam.runners.direct.DirectRunner"); return direct; } catch (ClassNotFoundException e) { throw new IllegalArgumentException(String.format( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/testing/travis/test_wordcount.sh ---------------------------------------------------------------------- diff --git a/testing/travis/test_wordcount.sh b/testing/travis/test_wordcount.sh index b00b0d6..e059a35 100755 --- a/testing/travis/test_wordcount.sh +++ b/testing/travis/test_wordcount.sh @@ -70,7 +70,7 @@ function run_via_mvn { local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2 local cmd='mvn exec:java -f pom.xml -pl examples/java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ - -Dexec.args="--runner=InProcessPipelineRunner --inputFile='"$input"' --output='"$outfile_prefix"'"' + -Dexec.args="--runner=DirectRunner --inputFile='"$input"' --output='"$outfile_prefix"'"' echo "$name: Running $cmd" >&2 sh -c "$cmd" check_result_hash "$name" "$outfile_prefix" "$expected_hash" @@ -84,7 +84,7 @@ function run_bundled { local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2 local cmd='java -cp '"$JAR_FILE"' \ org.apache.beam.examples.WordCount \ - --runner=InProcessPipelineRunner \ + --runner=DirectRunner \ --inputFile='"'$input'"' \ --output='"$outfile_prefix" echo "$name: Running $cmd" >&2