Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 0e5c662b4 -> f166b16b8


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index ea708e5..8abfb05 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -28,8 +28,8 @@ import 
org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -157,7 +157,7 @@ public class PipelineTest {
   @Test
   public void testToString() {
     PipelineOptions options = PipelineOptionsFactory.as(PipelineOptions.class);
-    options.setRunner(DirectPipelineRunner.class);
+    options.setRunner(CrashingRunner.class);
     Pipeline pipeline = Pipeline.create(options);
     assertEquals("Pipeline#" + pipeline.hashCode(), pipeline.toString());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
index 774968f..cabfc21 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
@@ -18,14 +18,12 @@
 package org.apache.beam.sdk.io;
 
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -123,10 +121,6 @@ public class BoundedReadFromUnboundedSourceTest implements 
Serializable{
   private void test(boolean dedup, boolean timeBound) throws Exception {
     Pipeline p = TestPipeline.create();
 
-    if (p.getOptions().getRunner() == DirectPipelineRunner.class) {
-      finalizeTracker = new ArrayList<>();
-      TestCountingSource.setFinalizeTracker(finalizeTracker);
-    }
     TestCountingSource source = new 
TestCountingSource(Integer.MAX_VALUE).withoutSplitting();
     if (dedup) {
       source = source.withDedup();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java
deleted file mode 100644
index 92c4835..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.ServiceLoader;
-
-/** Tests for {@link DirectPipelineRegistrar}. */
-@RunWith(JUnit4.class)
-public class DirectPipelineRegistrarTest {
-  @Test
-  public void testCorrectOptionsAreReturned() {
-    assertEquals(ImmutableList.of(DirectPipelineOptions.class),
-        new DirectPipelineRegistrar.Options().getPipelineOptions());
-  }
-
-  @Test
-  public void testCorrectRunnersAreReturned() {
-    assertEquals(ImmutableList.of(DirectPipelineRunner.class),
-        new DirectPipelineRegistrar.Runner().getPipelineRunners());
-  }
-
-  @Test
-  public void testServiceLoaderForOptions() {
-    for (PipelineOptionsRegistrar registrar :
-        
Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator()))
 {
-      if (registrar instanceof DirectPipelineRegistrar.Options) {
-        return;
-      }
-    }
-    fail("Expected to find " + DirectPipelineRegistrar.Options.class);
-  }
-
-  @Test
-  public void testServiceLoaderForRunner() {
-    for (PipelineRunnerRegistrar registrar :
-        
Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator()))
 {
-      if (registrar instanceof DirectPipelineRegistrar.Runner) {
-        return;
-      }
-    }
-    fail("Expected to find " + DirectPipelineRegistrar.Runner.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
deleted file mode 100644
index edf6996..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.ShardNameTemplate;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.IOChannelUtils;
-
-import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
-
-import org.apache.avro.file.DataFileReader;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.File;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-/** Tests for {@link DirectPipelineRunner}. */
-@RunWith(JUnit4.class)
-public class DirectPipelineRunnerTest implements Serializable {
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void testToString() {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(DirectPipelineRunner.class);
-    DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
-    assertEquals("DirectPipelineRunner#" + runner.hashCode(),
-        runner.toString());
-  }
-
-  /** A {@link Coder} that fails during decoding. */
-  private static class CrashingCoder<T> extends AtomicCoder<T> {
-    @Override
-    public void encode(T value, OutputStream stream, Context context) throws 
CoderException {
-      throw new CoderException("Called CrashingCoder.encode");
-    }
-
-    @Override
-    public T decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-            throws CoderException {
-      throw new CoderException("Called CrashingCoder.decode");
-    }
-  }
-
-  /** A {@link DoFn} that outputs {@code 'hello'}. */
-  private static class HelloDoFn extends DoFn<Integer, String> {
-    @Override
-    public void processElement(DoFn<Integer, String>.ProcessContext c) throws 
Exception {
-      c.output("hello");
-    }
-  }
-
-  @Test
-  public void testCoderException() {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(DirectPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-
-    p.apply("CreateTestData", Create.of(42))
-        .apply("CrashDuringCoding", ParDo.of(new HelloDoFn()))
-        .setCoder(new CrashingCoder<String>());
-
-    expectedException.expect(RuntimeException.class);
-    expectedException.expectCause(isA(CoderException.class));
-    p.run();
-  }
-
-  @Test
-  public void testDirectPipelineOptions() {
-    DirectPipelineOptions options = 
PipelineOptionsFactory.create().as(DirectPipelineOptions.class);
-    assertNull(options.getDirectPipelineRunnerRandomSeed());
-  }
-
-  @Test
-  public void testTextIOWriteWithDefaultShardingStrategy() throws Exception {
-    String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), 
"output");
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(DirectPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-    String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", 
"g", "h", "i" };
-    p.apply(Create.of(expectedElements))
-     .apply(TextIO.Write.to(prefix).withSuffix("txt"));
-    p.run();
-
-    String filename =
-        IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, 
".txt", 0, 1);
-    List<String> fileContents =
-        Files.readLines(new File(filename), StandardCharsets.UTF_8);
-    // Ensure that each file got at least one record
-    assertFalse(fileContents.isEmpty());
-
-    assertThat(fileContents, containsInAnyOrder(expectedElements));
-  }
-
-  @Test
-  public void testTextIOWriteWithLimitedNumberOfShards() throws Exception {
-    final int numShards = 3;
-    String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), 
"shardedOutput");
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(DirectPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-    String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", 
"g", "h", "i" };
-    p.apply(Create.of(expectedElements))
-     
.apply(TextIO.Write.to(prefix).withNumShards(numShards).withSuffix("txt"));
-    p.run();
-
-    List<String> allContents = new ArrayList<>();
-    for (int i = 0; i < numShards; ++i) {
-      String shardFileName =
-          IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, 
".txt", i, 3);
-      List<String> shardFileContents =
-          Files.readLines(new File(shardFileName), StandardCharsets.UTF_8);
-
-      // Ensure that each file got at least one record
-      assertFalse(shardFileContents.isEmpty());
-
-      allContents.addAll(shardFileContents);
-    }
-
-    assertThat(allContents, containsInAnyOrder(expectedElements));
-  }
-
-  @Test
-  public void testAvroIOWriteWithDefaultShardingStrategy() throws Exception {
-    String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), 
"output");
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(DirectPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-    String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", 
"g", "h", "i" };
-    p.apply(Create.of(expectedElements))
-     
.apply(AvroIO.Write.withSchema(String.class).to(prefix).withSuffix(".avro"));
-    p.run();
-
-    String filename =
-        IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, 
".avro", 0, 1);
-    List<String> fileContents = new ArrayList<>();
-    Iterables.addAll(fileContents, DataFileReader.openReader(
-        new File(filename), AvroCoder.of(String.class).createDatumReader()));
-
-    // Ensure that each file got at least one record
-    assertFalse(fileContents.isEmpty());
-
-    assertThat(fileContents, containsInAnyOrder(expectedElements));
-  }
-
-  @Test
-  public void testAvroIOWriteWithLimitedNumberOfShards() throws Exception {
-    final int numShards = 3;
-    String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), 
"shardedOutput");
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(DirectPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-    String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", 
"g", "h", "i" };
-    p.apply(Create.of(expectedElements))
-     .apply(AvroIO.Write.withSchema(String.class).to(prefix)
-                        .withNumShards(numShards).withSuffix(".avro"));
-    p.run();
-
-    List<String> allContents = new ArrayList<>();
-    for (int i = 0; i < numShards; ++i) {
-      String shardFileName =
-          IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, 
".avro", i, 3);
-      List<String> shardFileContents = new ArrayList<>();
-      Iterables.addAll(shardFileContents, DataFileReader.openReader(
-          new File(shardFileName), 
AvroCoder.of(String.class).createDatumReader()));
-
-      // Ensure that each file got at least one record
-      assertFalse(shardFileContents.isEmpty());
-
-      allContents.addAll(shardFileContents);
-    }
-
-    assertThat(allContents, containsInAnyOrder(expectedElements));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
index 9313439..5d2e69d 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.DirectPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.TestCredential;
 
@@ -53,10 +54,10 @@ public class PipelineRunnerTest {
     options.setAppName("test");
     options.setProject("test");
     options.setGcsUtil(mockGcsUtil);
-    options.setRunner(DirectPipelineRunner.class);
+    options.setRunner(CrashingRunner.class);
     options.setGcpCredential(new TestCredential());
     PipelineRunner<?> runner = PipelineRunner.fromOptions(options);
-    assertTrue(runner instanceof DirectPipelineRunner);
+    assertTrue(runner instanceof CrashingRunner);
   }
 
   @Test
@@ -66,10 +67,10 @@ public class PipelineRunnerTest {
     options.setAppName("test");
     options.setProject("test");
     options.setGcsUtil(mockGcsUtil);
-    options.setRunner(DirectPipelineRunner.class);
+    options.setRunner(CrashingRunner.class);
     options.setGcpCredential(new TestCredential());
     PipelineRunner<?> runner = PipelineRunner.fromOptions(options);
-    assertTrue(runner instanceof DirectPipelineRunner);
+    assertTrue(runner instanceof CrashingRunner);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index a0b508c..b0ca70b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -56,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
@@ -87,7 +85,6 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Random;
 import java.util.Set;
 
 /**
@@ -516,24 +513,6 @@ public class CombineTest implements Serializable {
     assertThat(sum.getName(), Matchers.startsWith("Sum"));
   }
 
-  @Test
-  public void testAddInputsRandomly() {
-    TestCounter counter = new TestCounter();
-    Combine.KeyedCombineFn<
-        String, Integer, TestCounter.Counter, Iterable<Long>> fn =
-        counter.asKeyedFn();
-
-    List<TestCounter.Counter> accums = 
DirectPipelineRunner.TestCombineDoFn.addInputsRandomly(
-        PerKeyCombineFnRunners.create(fn), "bob", Arrays.asList(NUMBERS), new 
Random(42),
-        processContext);
-
-    assertThat(accums, Matchers.contains(
-        counter.new Counter(3, 2, 0, 0),
-        counter.new Counter(131, 5, 0, 0),
-        counter.new Counter(8, 2, 0, 0),
-        counter.new Counter(1, 1, 0, 0)));
-  }
-
   private static final SerializableFunction<String, Integer> hotKeyFanout =
       new SerializableFunction<String, Integer>() {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 4ce025d..299def7 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -30,9 +30,6 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -244,15 +241,9 @@ public class GroupByKeyTest {
                     Duration.standardMinutes(1)))));
   }
 
-  private Pipeline createTestDirectRunner() {
-    DirectPipelineOptions options = 
PipelineOptionsFactory.as(DirectPipelineOptions.class);
-    options.setRunner(DirectPipelineRunner.class);
-    return Pipeline.create(options);
-  }
-
   @Test
   public void testInvalidWindowsDirect() {
-    Pipeline p = createTestDirectRunner();
+    Pipeline p = TestPipeline.create();
 
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
 
@@ -297,7 +288,7 @@ public class GroupByKeyTest {
 
   @Test
   public void testGroupByKeyDirectUnbounded() {
-    Pipeline p = createTestDirectRunner();
+    Pipeline p = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input =
         p.apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 18d39d7..5e6e6a3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -33,9 +33,6 @@ import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -1335,12 +1332,6 @@ public class ViewTest implements Serializable {
     assertEquals("View.AsMultimap", View.<String, 
Integer>asMultimap().getName());
   }
 
-  private Pipeline createTestDirectRunner() {
-    DirectPipelineOptions options = 
PipelineOptionsFactory.as(DirectPipelineOptions.class);
-    options.setRunner(DirectPipelineRunner.class);
-    return Pipeline.create(options);
-  }
-
   private void testViewUnbounded(
       Pipeline pipeline,
       PTransform<PCollection<KV<String, Integer>>, ? extends 
PCollectionView<?>> view) {
@@ -1378,51 +1369,51 @@ public class ViewTest implements Serializable {
 
   @Test
   public void testViewUnboundedAsSingletonDirect() {
-    testViewUnbounded(createTestDirectRunner(), View.<KV<String, 
Integer>>asSingleton());
+    testViewUnbounded(TestPipeline.create(), View.<KV<String, 
Integer>>asSingleton());
   }
 
   @Test
   public void testViewUnboundedAsIterableDirect() {
-    testViewUnbounded(createTestDirectRunner(), View.<KV<String, 
Integer>>asIterable());
+    testViewUnbounded(TestPipeline.create(), View.<KV<String, 
Integer>>asIterable());
   }
 
   @Test
   public void testViewUnboundedAsListDirect() {
-    testViewUnbounded(createTestDirectRunner(), View.<KV<String, 
Integer>>asList());
+    testViewUnbounded(TestPipeline.create(), View.<KV<String, 
Integer>>asList());
   }
 
   @Test
   public void testViewUnboundedAsMapDirect() {
-    testViewUnbounded(createTestDirectRunner(), View.<String, Integer>asMap());
+    testViewUnbounded(TestPipeline.create(), View.<String, Integer>asMap());
   }
 
   @Test
   public void testViewUnboundedAsMultimapDirect() {
-    testViewUnbounded(createTestDirectRunner(), View.<String, 
Integer>asMultimap());
+    testViewUnbounded(TestPipeline.create(), View.<String, 
Integer>asMultimap());
   }
 
   @Test
   public void testViewNonmergingAsSingletonDirect() {
-    testViewNonmerging(createTestDirectRunner(), View.<KV<String, 
Integer>>asSingleton());
+    testViewNonmerging(TestPipeline.create(), View.<KV<String, 
Integer>>asSingleton());
   }
 
   @Test
   public void testViewNonmergingAsIterableDirect() {
-    testViewNonmerging(createTestDirectRunner(), View.<KV<String, 
Integer>>asIterable());
+    testViewNonmerging(TestPipeline.create(), View.<KV<String, 
Integer>>asIterable());
   }
 
   @Test
   public void testViewNonmergingAsListDirect() {
-    testViewNonmerging(createTestDirectRunner(), View.<KV<String, 
Integer>>asList());
+    testViewNonmerging(TestPipeline.create(), View.<KV<String, 
Integer>>asList());
   }
 
   @Test
   public void testViewNonmergingAsMapDirect() {
-    testViewNonmerging(createTestDirectRunner(), View.<String, 
Integer>asMap());
+    testViewNonmerging(TestPipeline.create(), View.<String, Integer>asMap());
   }
 
   @Test
   public void testViewNonmergingAsMultimapDirect() {
-    testViewNonmerging(createTestDirectRunner(), View.<String, 
Integer>asMultimap());
+    testViewNonmerging(TestPipeline.create(), View.<String, 
Integer>asMultimap());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
 
b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
index 4914d4c..76df4d4 100644
--- 
a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
+++ 
b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
@@ -17,6 +17,7 @@
  */
 package ${package}.common;
 
+import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -39,7 +40,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.IntraBundleParallelization;
 import org.apache.beam.sdk.util.Transport;
 import com.google.common.collect.Lists;
@@ -250,17 +250,8 @@ public class DataflowExampleUtils {
     }
   }
 
-  /**
-   * Do some runner setup: check that the DirectPipelineRunner is not used in 
conjunction with
-   * streaming, and if streaming is specified, use the DataflowPipelineRunner. 
Return the streaming
-   * flag value.
-   */
   public void setupRunner() {
-    if (options.isStreaming()) {
-      if (options.getRunner() == DirectPipelineRunner.class) {
-        throw new IllegalArgumentException(
-          "Processing of unbounded input sources is not supported with the 
DirectPipelineRunner.");
-      }
+    if (options.isStreaming() && 
options.getRunner().equals(BlockingDataflowPipelineRunner.class)) {
       // In order to cancel the pipelines automatically,
       // {@literal DataflowPipelineRunner} is forced to be used.
       options.setRunner(DataflowPipelineRunner.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/testing/travis/test_wordcount.sh
----------------------------------------------------------------------
diff --git a/testing/travis/test_wordcount.sh b/testing/travis/test_wordcount.sh
index 40e2724..b00b0d6 100755
--- a/testing/travis/test_wordcount.sh
+++ b/testing/travis/test_wordcount.sh
@@ -70,7 +70,7 @@ function run_via_mvn {
   local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2
   local cmd='mvn exec:java -f pom.xml -pl examples/java \
     -Dexec.mainClass=org.apache.beam.examples.WordCount \
-    -Dexec.args="--runner=DirectPipelineRunner --inputFile='"$input"' 
--output='"$outfile_prefix"'"'
+    -Dexec.args="--runner=InProcessPipelineRunner --inputFile='"$input"' 
--output='"$outfile_prefix"'"'
   echo "$name: Running $cmd" >&2
   sh -c "$cmd"
   check_result_hash "$name" "$outfile_prefix" "$expected_hash"
@@ -84,7 +84,7 @@ function run_bundled {
   local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2
   local cmd='java -cp '"$JAR_FILE"' \
     org.apache.beam.examples.WordCount \
-    --runner=DirectPipelineRunner \
+    --runner=InProcessPipelineRunner \
     --inputFile='"'$input'"' \
     --output='"$outfile_prefix"
   echo "$name: Running $cmd" >&2

Reply via email to