Repository: incubator-beam Updated Branches: refs/heads/master e9f1b579a -> 9039949d5
Switch the Default PipelineRunner Use the InProcessPiplineRunner (pending rename) as the default runner. The InProcessPipelineRunner implements the beam model, including support for Unbounded PCollections. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/757cb326 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/757cb326 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/757cb326 Branch: refs/heads/master Commit: 757cb326b909ad62aea8c51183a83521adfd5a3a Parents: e9f1b57 Author: Thomas Groh <tg...@google.com> Authored: Fri Apr 8 10:20:56 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Apr 15 16:12:52 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/options/PipelineOptions.java | 4 +- .../ImmutabilityCheckingBundleFactory.java | 20 +++-- .../sdk/options/PipelineOptionsFactoryTest.java | 3 +- .../beam/sdk/options/PipelineOptionsTest.java | 4 +- .../beam/sdk/runners/TransformTreeTest.java | 79 ++++++++++---------- .../EncodabilityEnforcementFactoryTest.java | 2 +- .../ImmutabilityCheckingBundleFactoryTest.java | 17 ++--- .../apache/beam/sdk/transforms/ParDoTest.java | 14 ++-- .../beam/sdk/transforms/WithKeysJava8Test.java | 3 +- 9 files changed, 68 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 17cf5b3..d87e396 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -21,8 +21,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; @@ -225,7 +225,7 @@ public interface PipelineOptions { @Description("The pipeline runner that will be used to execute the pipeline. " + "For registered runners, the class name can be specified, otherwise the fully " + "qualified name needs to be specified.") - @Default.Class(DirectPipelineRunner.class) + @Default.Class(InProcessPipelineRunner.class) Class<? extends PipelineRunner<?>> getRunner(); void setRunner(Class<? extends PipelineRunner<?>> kls); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java index 0852269..bb3d501 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -113,17 +112,16 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory { try { detector.verifyUnmodified(); } catch (IllegalMutationException exn) { - throw UserCodeException.wrap( - new IllegalMutationException( - String.format( - "PTransform %s mutated value %s after it was output (new value was %s)." - + " Values must not be mutated in any way after being output.", - underlying.getPCollection().getProducingTransformInternal().getFullName(), - exn.getSavedValue(), - exn.getNewValue()), + throw new IllegalMutationException( + String.format( + "PTransform %s mutated value %s after it was output (new value was %s)." + + " Values must not be mutated in any way after being output.", + underlying.getPCollection().getProducingTransformInternal().getFullName(), exn.getSavedValue(), - exn.getNewValue(), - exn)); + exn.getNewValue()), + exn.getSavedValue(), + exn.getNewValue(), + exn); } } return underlying.commit(synchronizedProcessingTime); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 62c6909..e2d4342 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.RestoreSystemProperties; @@ -60,7 +61,7 @@ import java.util.Set; @RunWith(JUnit4.class) public class PipelineOptionsFactoryTest { private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS = - DirectPipelineRunner.class; + InProcessPipelineRunner.class; @Rule public ExpectedException expectedException = ExpectedException.none(); @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java index dfda528..459272e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.runners.DirectPipelineRunner; +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -87,7 +87,7 @@ public class PipelineOptionsTest { @Test public void testDefaultRunnerIsSet() { - assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner()); + assertEquals(InProcessPipelineRunner.class, PipelineOptionsFactory.create().getRunner()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index 7690d2b..a778a0d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -128,45 +128,46 @@ public class TransformTreeTest { final EnumSet<TransformsSeen> left = EnumSet.noneOf(TransformsSeen.class); - p.traverseTopologically(new Pipeline.PipelineVisitor() { - @Override - public void enterCompositeTransform(TransformTreeNode node) { - PTransform<?, ?> transform = node.getTransform(); - if (transform instanceof Sample.SampleAny) { - assertTrue(visited.add(TransformsSeen.SAMPLE_ANY)); - assertNotNull(node.getEnclosingNode()); - assertTrue(node.isCompositeNode()); - } else if (transform instanceof Write.Bound) { - assertTrue(visited.add(TransformsSeen.WRITE)); - assertNotNull(node.getEnclosingNode()); - assertTrue(node.isCompositeNode()); - } - assertThat(transform, not(instanceOf(Read.Bounded.class))); - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - PTransform<?, ?> transform = node.getTransform(); - if (transform instanceof Sample.SampleAny) { - assertTrue(left.add(TransformsSeen.SAMPLE_ANY)); - } - } - - @Override - public void visitTransform(TransformTreeNode node) { - PTransform<?, ?> transform = node.getTransform(); - // Pick is a composite, should not be visited here. - assertThat(transform, not(instanceOf(Sample.SampleAny.class))); - assertThat(transform, not(instanceOf(Write.Bound.class))); - if (transform instanceof Read.Bounded) { - assertTrue(visited.add(TransformsSeen.READ)); - } - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - } - }); + p.traverseTopologically( + new Pipeline.PipelineVisitor() { + @Override + public void enterCompositeTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + if (transform instanceof Sample.SampleAny) { + assertTrue(visited.add(TransformsSeen.SAMPLE_ANY)); + assertNotNull(node.getEnclosingNode()); + assertTrue(node.isCompositeNode()); + } else if (transform instanceof Write.Bound) { + assertTrue(visited.add(TransformsSeen.WRITE)); + assertNotNull(node.getEnclosingNode()); + assertTrue(node.isCompositeNode()); + } + assertThat(transform, not(instanceOf(Read.Bounded.class))); + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + if (transform instanceof Sample.SampleAny) { + assertTrue(left.add(TransformsSeen.SAMPLE_ANY)); + } + } + + @Override + public void visitTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + // Pick is a composite, should not be visited here. + assertThat(transform, not(instanceOf(Sample.SampleAny.class))); + assertThat(transform, not(instanceOf(Write.Bound.class))); + if (transform instanceof Read.Bounded + && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) { + assertTrue(visited.add(TransformsSeen.READ)); + } + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) {} + }); assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class))); assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java index 7720589..b3a7d15 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java @@ -55,7 +55,7 @@ public class EncodabilityEnforcementFactoryTest { public void encodeFailsThrows() { TestPipeline p = TestPipeline.create(); PCollection<Record> unencodable = - p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder())); + p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder())); AppliedPTransform<?, ?, ?> consumer = unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java index 386eacc..06e71b8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.runners.inprocess; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -31,7 +30,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -163,10 +161,9 @@ public class ImmutabilityCheckingBundleFactoryTest { root.add(WindowedValue.valueInGlobalWindow(array)); array[1] = 2; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("Values must not be mutated in any way after being output"); - CommittedBundle<byte[]> committed = root.commit(Instant.now()); + root.commit(Instant.now()); } @Test @@ -184,10 +181,9 @@ public class ImmutabilityCheckingBundleFactoryTest { keyed.add(windowedArray); array[0] = Byte.MAX_VALUE; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("Values must not be mutated in any way after being output"); - CommittedBundle<byte[]> committed = keyed.commit(Instant.now()); + keyed.commit(Instant.now()); } @Test @@ -205,10 +201,9 @@ public class ImmutabilityCheckingBundleFactoryTest { intermediate.add(windowedArray); array[2] = -3; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("Values must not be mutated in any way after being output"); - CommittedBundle<byte[]> committed = intermediate.commit(Instant.now()); + intermediate.commit(Instant.now()); } private static class IdentityDoFn<T> extends DoFn<T, T> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 44154e6..83e0f2c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -26,7 +26,6 @@ import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.isA; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.hamcrest.core.AnyOf.anyOf; @@ -36,7 +35,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; @@ -1119,7 +1117,7 @@ public class ParDoTest implements Serializable { input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag)) .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); - thrown.expect(PipelineExecutionException.class); + thrown.expect(IllegalStateException.class); thrown.expectMessage("Unable to return a default Coder"); pipeline.run(); } @@ -1422,8 +1420,7 @@ public class ParDoTest implements Serializable { } })); - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("output"); thrown.expectMessage("must not be mutated"); pipeline.run(); @@ -1472,8 +1469,7 @@ public class ParDoTest implements Serializable { } })); - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("output"); thrown.expectMessage("must not be mutated"); pipeline.run(); @@ -1499,7 +1495,7 @@ public class ParDoTest implements Serializable { })); thrown.expect(IllegalMutationException.class); - thrown.expectMessage("input"); + thrown.expectMessage("Input"); thrown.expectMessage("must not be mutated"); pipeline.run(); } @@ -1523,7 +1519,7 @@ public class ParDoTest implements Serializable { })); thrown.expect(IllegalMutationException.class); - thrown.expectMessage("input"); + thrown.expectMessage("Input"); thrown.expectMessage("must not be mutated"); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java index a0d1a63..1ffb147 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.transforms; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -65,7 +64,7 @@ public class WithKeysJava8Test { values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s))); - thrown.expect(PipelineExecutionException.class); + thrown.expect(IllegalStateException.class); thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys"); thrown.expectMessage("Cannot provide a coder for type variable K"); thrown.expectMessage("the actual type is unknown due to erasure.");