Repository: incubator-beam
Updated Branches:
  refs/heads/master e9f1b579a -> 9039949d5


Switch the Default PipelineRunner

Use the InProcessPipelineRunner (pending rename) as the default runner.
The InProcessPipelineRunner implements the Beam model, including support
for Unbounded PCollections.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/757cb326
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/757cb326
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/757cb326

Branch: refs/heads/master
Commit: 757cb326b909ad62aea8c51183a83521adfd5a3a
Parents: e9f1b57
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 8 10:20:56 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 15 16:12:52 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/options/PipelineOptions.java       |  4 +-
 .../ImmutabilityCheckingBundleFactory.java      | 20 +++--
 .../sdk/options/PipelineOptionsFactoryTest.java |  3 +-
 .../beam/sdk/options/PipelineOptionsTest.java   |  4 +-
 .../beam/sdk/runners/TransformTreeTest.java     | 79 ++++++++++----------
 .../EncodabilityEnforcementFactoryTest.java     |  2 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  | 17 ++---
 .../apache/beam/sdk/transforms/ParDoTest.java   | 14 ++--
 .../beam/sdk/transforms/WithKeysJava8Test.java  |  3 +-
 9 files changed, 68 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 17cf5b3..d87e396 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -21,8 +21,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 
@@ -225,7 +225,7 @@ public interface PipelineOptions {
   @Description("The pipeline runner that will be used to execute the pipeline. 
"
       + "For registered runners, the class name can be specified, otherwise 
the fully "
       + "qualified name needs to be specified.")
-  @Default.Class(DirectPipelineRunner.class)
+  @Default.Class(InProcessPipelineRunner.class)
   Class<? extends PipelineRunner<?>> getRunner();
   void setRunner(Class<? extends PipelineRunner<?>> kls);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
index 0852269..bb3d501 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.MutationDetector;
 import org.apache.beam.sdk.util.MutationDetectors;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -113,17 +112,16 @@ class ImmutabilityCheckingBundleFactory implements 
BundleFactory {
         try {
           detector.verifyUnmodified();
         } catch (IllegalMutationException exn) {
-          throw UserCodeException.wrap(
-              new IllegalMutationException(
-                  String.format(
-                      "PTransform %s mutated value %s after it was output (new 
value was %s)."
-                          + " Values must not be mutated in any way after 
being output.",
-                      
underlying.getPCollection().getProducingTransformInternal().getFullName(),
-                      exn.getSavedValue(),
-                      exn.getNewValue()),
+          throw new IllegalMutationException(
+              String.format(
+                  "PTransform %s mutated value %s after it was output (new 
value was %s)."
+                      + " Values must not be mutated in any way after being 
output.",
+                  
underlying.getPCollection().getProducingTransformInternal().getFullName(),
                   exn.getSavedValue(),
-                  exn.getNewValue(),
-                  exn));
+                  exn.getNewValue()),
+              exn.getSavedValue(),
+              exn.getNewValue(),
+              exn);
         }
       }
       return underlying.commit(synchronizedProcessingTime);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 62c6909..e2d4342 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
 
@@ -60,7 +61,7 @@ import java.util.Set;
 @RunWith(JUnit4.class)
 public class PipelineOptionsFactoryTest {
   private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS 
=
-      DirectPipelineRunner.class;
+      InProcessPipelineRunner.class;
 
   @Rule public ExpectedException expectedException = ExpectedException.none();
   @Rule public TestRule restoreSystemProperties = new 
RestoreSystemProperties();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
index dfda528..459272e 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -87,7 +87,7 @@ public class PipelineOptionsTest {
 
   @Test
   public void testDefaultRunnerIsSet() {
-    assertEquals(DirectPipelineRunner.class, 
PipelineOptionsFactory.create().getRunner());
+    assertEquals(InProcessPipelineRunner.class, 
PipelineOptionsFactory.create().getRunner());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 7690d2b..a778a0d 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -128,45 +128,46 @@ public class TransformTreeTest {
     final EnumSet<TransformsSeen> left =
         EnumSet.noneOf(TransformsSeen.class);
 
-    p.traverseTopologically(new Pipeline.PipelineVisitor() {
-      @Override
-      public void enterCompositeTransform(TransformTreeNode node) {
-        PTransform<?, ?> transform = node.getTransform();
-        if (transform instanceof Sample.SampleAny) {
-          assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
-          assertNotNull(node.getEnclosingNode());
-          assertTrue(node.isCompositeNode());
-        } else if (transform instanceof Write.Bound) {
-          assertTrue(visited.add(TransformsSeen.WRITE));
-          assertNotNull(node.getEnclosingNode());
-          assertTrue(node.isCompositeNode());
-        }
-        assertThat(transform, not(instanceOf(Read.Bounded.class)));
-      }
-
-      @Override
-      public void leaveCompositeTransform(TransformTreeNode node) {
-        PTransform<?, ?> transform = node.getTransform();
-        if (transform instanceof Sample.SampleAny) {
-          assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
-        }
-      }
-
-      @Override
-      public void visitTransform(TransformTreeNode node) {
-        PTransform<?, ?> transform = node.getTransform();
-        // Pick is a composite, should not be visited here.
-        assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
-        assertThat(transform, not(instanceOf(Write.Bound.class)));
-        if (transform instanceof Read.Bounded) {
-          assertTrue(visited.add(TransformsSeen.READ));
-        }
-      }
-
-      @Override
-      public void visitValue(PValue value, TransformTreeNode producer) {
-      }
-    });
+    p.traverseTopologically(
+        new Pipeline.PipelineVisitor() {
+          @Override
+          public void enterCompositeTransform(TransformTreeNode node) {
+            PTransform<?, ?> transform = node.getTransform();
+            if (transform instanceof Sample.SampleAny) {
+              assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
+              assertNotNull(node.getEnclosingNode());
+              assertTrue(node.isCompositeNode());
+            } else if (transform instanceof Write.Bound) {
+              assertTrue(visited.add(TransformsSeen.WRITE));
+              assertNotNull(node.getEnclosingNode());
+              assertTrue(node.isCompositeNode());
+            }
+            assertThat(transform, not(instanceOf(Read.Bounded.class)));
+          }
+
+          @Override
+          public void leaveCompositeTransform(TransformTreeNode node) {
+            PTransform<?, ?> transform = node.getTransform();
+            if (transform instanceof Sample.SampleAny) {
+              assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
+            }
+          }
+
+          @Override
+          public void visitTransform(TransformTreeNode node) {
+            PTransform<?, ?> transform = node.getTransform();
+            // Pick is a composite, should not be visited here.
+            assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
+            assertThat(transform, not(instanceOf(Write.Bound.class)));
+            if (transform instanceof Read.Bounded
+                && node.getEnclosingNode().getTransform() instanceof 
TextIO.Read.Bound) {
+              assertTrue(visited.add(TransformsSeen.READ));
+            }
+          }
+
+          @Override
+          public void visitValue(PValue value, TransformTreeNode producer) {}
+        });
 
     assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));
     assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
index 7720589..b3a7d15 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -55,7 +55,7 @@ public class EncodabilityEnforcementFactoryTest {
   public void encodeFailsThrows() {
     TestPipeline p = TestPipeline.create();
     PCollection<Record> unencodable =
-        p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
+        p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
     AppliedPTransform<?, ?, ?> consumer =
         
unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
index 386eacc..06e71b8 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.runners.inprocess;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -31,7 +30,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -163,10 +161,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
     root.add(WindowedValue.valueInGlobalWindow(array));
 
     array[1] = 2;
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("Values must not be mutated in any way after being 
output");
-    CommittedBundle<byte[]> committed = root.commit(Instant.now());
+    root.commit(Instant.now());
   }
 
   @Test
@@ -184,10 +181,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
     keyed.add(windowedArray);
 
     array[0] = Byte.MAX_VALUE;
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("Values must not be mutated in any way after being 
output");
-    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+    keyed.commit(Instant.now());
   }
 
   @Test
@@ -205,10 +201,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
     intermediate.add(windowedArray);
 
     array[2] = -3;
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("Values must not be mutated in any way after being 
output");
-    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+    intermediate.commit(Instant.now());
   }
 
   private static class IdentityDoFn<T> extends DoFn<T, T> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 44154e6..83e0f2c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -26,7 +26,6 @@ import static 
org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 
 import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.isA;
 import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.hamcrest.core.AnyOf.anyOf;
@@ -36,7 +35,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
@@ -1119,7 +1117,7 @@ public class ParDoTest implements Serializable {
     input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag))
         .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
 
-    thrown.expect(PipelineExecutionException.class);
+    thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder");
     pipeline.run();
   }
@@ -1422,8 +1420,7 @@ public class ParDoTest implements Serializable {
           }
         }));
 
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("output");
     thrown.expectMessage("must not be mutated");
     pipeline.run();
@@ -1472,8 +1469,7 @@ public class ParDoTest implements Serializable {
           }
         }));
 
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("output");
     thrown.expectMessage("must not be mutated");
     pipeline.run();
@@ -1499,7 +1495,7 @@ public class ParDoTest implements Serializable {
         }));
 
     thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("input");
+    thrown.expectMessage("Input");
     thrown.expectMessage("must not be mutated");
     pipeline.run();
   }
@@ -1523,7 +1519,7 @@ public class ParDoTest implements Serializable {
         }));
 
     thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("input");
+    thrown.expectMessage("Input");
     thrown.expectMessage("must not be mutated");
     pipeline.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/757cb326/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index a0d1a63..1ffb147 100644
--- 
a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -65,7 +64,7 @@ public class WithKeysJava8Test {
 
     values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> 
Integer.valueOf(s)));
 
-    thrown.expect(PipelineExecutionException.class);
+    thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder for 
ApplyKeysWithWithKeys");
     thrown.expectMessage("Cannot provide a coder for type variable K");
     thrown.expectMessage("the actual type is unknown due to erasure.");

Reply via email to