Repository: incubator-beam Updated Branches: refs/heads/python-sdk 0e5c662b4 -> f166b16b8
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index ea708e5..8abfb05 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -28,8 +28,8 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -157,7 +157,7 @@ public class PipelineTest { @Test public void testToString() { PipelineOptions options = PipelineOptionsFactory.as(PipelineOptions.class); - options.setRunner(DirectPipelineRunner.class); + options.setRunner(CrashingRunner.class); Pipeline pipeline = Pipeline.create(options); assertEquals("Pipeline#" + pipeline.hashCode(), pipeline.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java index 774968f..cabfc21 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java @@ -18,14 +18,12 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; - import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.dataflow.TestCountingSource; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -123,10 +121,6 @@ public class BoundedReadFromUnboundedSourceTest implements Serializable{ private void test(boolean dedup, boolean timeBound) throws Exception { Pipeline p = TestPipeline.create(); - if (p.getOptions().getRunner() == DirectPipelineRunner.class) { - finalizeTracker = new ArrayList<>(); - TestCountingSource.setFinalizeTracker(finalizeTracker); - } TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting(); if (dedup) { source = source.withDedup(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java deleted file mode 100644 index 92c4835..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.ServiceLoader; - -/** Tests for {@link DirectPipelineRegistrar}. */ -@RunWith(JUnit4.class) -public class DirectPipelineRegistrarTest { - @Test - public void testCorrectOptionsAreReturned() { - assertEquals(ImmutableList.of(DirectPipelineOptions.class), - new DirectPipelineRegistrar.Options().getPipelineOptions()); - } - - @Test - public void testCorrectRunnersAreReturned() { - assertEquals(ImmutableList.of(DirectPipelineRunner.class), - new DirectPipelineRegistrar.Runner().getPipelineRunners()); - } - - @Test - public void testServiceLoaderForOptions() { - for (PipelineOptionsRegistrar registrar : - Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { - if (registrar instanceof DirectPipelineRegistrar.Options) { - return; - } - } - fail("Expected to find " + DirectPipelineRegistrar.Options.class); - } - - @Test - public void testServiceLoaderForRunner() { - for (PipelineRunnerRegistrar registrar : - Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { - if (registrar instanceof DirectPipelineRegistrar.Runner) { - return; - } - } - fail("Expected to find " + DirectPipelineRegistrar.Runner.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java deleted file mode 100644 index edf6996..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.isA; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.ShardNameTemplate; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.IOChannelUtils; - -import com.google.common.collect.Iterables; -import com.google.common.io.Files; - -import org.apache.avro.file.DataFileReader; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.File; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; - -/** Tests for {@link DirectPipelineRunner}. */ -@RunWith(JUnit4.class) -public class DirectPipelineRunnerTest implements Serializable { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @Test - public void testToString() { - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options); - assertEquals("DirectPipelineRunner#" + runner.hashCode(), - runner.toString()); - } - - /** A {@link Coder} that fails during decoding. */ - private static class CrashingCoder<T> extends AtomicCoder<T> { - @Override - public void encode(T value, OutputStream stream, Context context) throws CoderException { - throw new CoderException("Called CrashingCoder.encode"); - } - - @Override - public T decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException { - throw new CoderException("Called CrashingCoder.decode"); - } - } - - /** A {@link DoFn} that outputs {@code 'hello'}. */ - private static class HelloDoFn extends DoFn<Integer, String> { - @Override - public void processElement(DoFn<Integer, String>.ProcessContext c) throws Exception { - c.output("hello"); - } - } - - @Test - public void testCoderException() { - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - - p.apply("CreateTestData", Create.of(42)) - .apply("CrashDuringCoding", ParDo.of(new HelloDoFn())) - .setCoder(new CrashingCoder<String>()); - - expectedException.expect(RuntimeException.class); - expectedException.expectCause(isA(CoderException.class)); - p.run(); - } - - @Test - public void testDirectPipelineOptions() { - DirectPipelineOptions options = PipelineOptionsFactory.create().as(DirectPipelineOptions.class); - assertNull(options.getDirectPipelineRunnerRandomSeed()); - } - - @Test - public void testTextIOWriteWithDefaultShardingStrategy() throws Exception { - String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output"); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; - p.apply(Create.of(expectedElements)) - .apply(TextIO.Write.to(prefix).withSuffix("txt")); - p.run(); - - String filename = - IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, ".txt", 0, 1); - List<String> fileContents = - Files.readLines(new File(filename), StandardCharsets.UTF_8); - // Ensure that each file got at least one record - assertFalse(fileContents.isEmpty()); - - assertThat(fileContents, containsInAnyOrder(expectedElements)); - } - - @Test - public void testTextIOWriteWithLimitedNumberOfShards() throws Exception { - final int numShards = 3; - String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput"); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; - p.apply(Create.of(expectedElements)) - .apply(TextIO.Write.to(prefix).withNumShards(numShards).withSuffix("txt")); - p.run(); - - List<String> allContents = new ArrayList<>(); - for (int i = 0; i < numShards; ++i) { - String shardFileName = - IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, ".txt", i, 3); - List<String> shardFileContents = - Files.readLines(new File(shardFileName), StandardCharsets.UTF_8); - - // Ensure that each file got at least one record - assertFalse(shardFileContents.isEmpty()); - - allContents.addAll(shardFileContents); - } - - assertThat(allContents, containsInAnyOrder(expectedElements)); - } - - @Test - public void testAvroIOWriteWithDefaultShardingStrategy() throws Exception { - String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output"); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; - p.apply(Create.of(expectedElements)) - .apply(AvroIO.Write.withSchema(String.class).to(prefix).withSuffix(".avro")); - p.run(); - - String filename = - IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, ".avro", 0, 1); - List<String> fileContents = new ArrayList<>(); - Iterables.addAll(fileContents, DataFileReader.openReader( - new File(filename), AvroCoder.of(String.class).createDatumReader())); - - // Ensure that each file got at least one record - assertFalse(fileContents.isEmpty()); - - assertThat(fileContents, containsInAnyOrder(expectedElements)); - } - - @Test - public void testAvroIOWriteWithLimitedNumberOfShards() throws Exception { - final int numShards = 3; - String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput"); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; - p.apply(Create.of(expectedElements)) - .apply(AvroIO.Write.withSchema(String.class).to(prefix) - .withNumShards(numShards).withSuffix(".avro")); - p.run(); - - List<String> allContents = new ArrayList<>(); - for (int i = 0; i < numShards; ++i) { - String shardFileName = - IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, ".avro", i, 3); - List<String> shardFileContents = new ArrayList<>(); - Iterables.addAll(shardFileContents, DataFileReader.openReader( - new File(shardFileName), AvroCoder.of(String.class).createDatumReader())); - - // Ensure that each file got at least one record - assertFalse(shardFileContents.isEmpty()); - - allContents.addAll(shardFileContents); - } - - assertThat(allContents, containsInAnyOrder(expectedElements)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java index 9313439..5d2e69d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.DirectPipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.TestCredential; @@ -53,10 +54,10 @@ public class PipelineRunnerTest { options.setAppName("test"); options.setProject("test"); options.setGcsUtil(mockGcsUtil); - options.setRunner(DirectPipelineRunner.class); + options.setRunner(CrashingRunner.class); options.setGcpCredential(new TestCredential()); PipelineRunner<?> runner = PipelineRunner.fromOptions(options); - assertTrue(runner instanceof DirectPipelineRunner); + assertTrue(runner instanceof CrashingRunner); } @Test @@ -66,10 +67,10 @@ public class PipelineRunnerTest { options.setAppName("test"); options.setProject("test"); options.setGcsUtil(mockGcsUtil); - options.setRunner(DirectPipelineRunner.class); + options.setRunner(CrashingRunner.class); options.setGcpCredential(new TestCredential()); PipelineRunner<?> runner = PipelineRunner.fromOptions(options); - assertTrue(runner instanceof DirectPipelineRunner); + assertTrue(runner instanceof CrashingRunner); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index a0b508c..b0ca70b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -56,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.util.PerKeyCombineFnRunners; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; @@ -87,7 +85,6 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Random; import java.util.Set; /** @@ -516,24 +513,6 @@ public class CombineTest implements Serializable { assertThat(sum.getName(), Matchers.startsWith("Sum")); } - @Test - public void testAddInputsRandomly() { - TestCounter counter = new TestCounter(); - Combine.KeyedCombineFn< - String, Integer, TestCounter.Counter, Iterable<Long>> fn = - counter.asKeyedFn(); - - List<TestCounter.Counter> accums = DirectPipelineRunner.TestCombineDoFn.addInputsRandomly( - PerKeyCombineFnRunners.create(fn), "bob", Arrays.asList(NUMBERS), new Random(42), - processContext); - - assertThat(accums, Matchers.contains( - counter.new Counter(3, 2, 0, 0), - counter.new Counter(131, 5, 0, 0), - counter.new Counter(8, 2, 0, 0), - counter.new Counter(1, 1, 0, 0))); - } - private static final SerializableFunction<String, Integer> hotKeyFanout = new SerializableFunction<String, Integer>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 4ce025d..299def7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -30,9 +30,6 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -244,15 +241,9 @@ public class GroupByKeyTest { Duration.standardMinutes(1))))); } - private Pipeline createTestDirectRunner() { - DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); - options.setRunner(DirectPipelineRunner.class); - return Pipeline.create(options); - } - @Test public void testInvalidWindowsDirect() { - Pipeline p = createTestDirectRunner(); + Pipeline p = TestPipeline.create(); List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); @@ -297,7 +288,7 @@ public class GroupByKeyTest { @Test public void testGroupByKeyDirectUnbounded() { - Pipeline p = createTestDirectRunner(); + Pipeline p = TestPipeline.create(); PCollection<KV<String, Integer>> input = p.apply( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 18d39d7..5e6e6a3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -33,9 +33,6 @@ import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -1335,12 +1332,6 @@ public class ViewTest implements Serializable { assertEquals("View.AsMultimap", View.<String, Integer>asMultimap().getName()); } - private Pipeline createTestDirectRunner() { - DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); - options.setRunner(DirectPipelineRunner.class); - return Pipeline.create(options); - } - private void testViewUnbounded( Pipeline pipeline, PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) { @@ -1378,51 +1369,51 @@ public class ViewTest implements Serializable { @Test public void testViewUnboundedAsSingletonDirect() { - testViewUnbounded(createTestDirectRunner(), View.<KV<String, Integer>>asSingleton()); + testViewUnbounded(TestPipeline.create(), View.<KV<String, Integer>>asSingleton()); } @Test public void testViewUnboundedAsIterableDirect() { - testViewUnbounded(createTestDirectRunner(), View.<KV<String, Integer>>asIterable()); + testViewUnbounded(TestPipeline.create(), View.<KV<String, Integer>>asIterable()); } @Test public void testViewUnboundedAsListDirect() { - testViewUnbounded(createTestDirectRunner(), View.<KV<String, Integer>>asList()); + testViewUnbounded(TestPipeline.create(), View.<KV<String, Integer>>asList()); } @Test public void testViewUnboundedAsMapDirect() { - testViewUnbounded(createTestDirectRunner(), View.<String, Integer>asMap()); + testViewUnbounded(TestPipeline.create(), View.<String, Integer>asMap()); } @Test public void testViewUnboundedAsMultimapDirect() { - testViewUnbounded(createTestDirectRunner(), View.<String, Integer>asMultimap()); + testViewUnbounded(TestPipeline.create(), View.<String, Integer>asMultimap()); } @Test public void testViewNonmergingAsSingletonDirect() { - testViewNonmerging(createTestDirectRunner(), View.<KV<String, Integer>>asSingleton()); + testViewNonmerging(TestPipeline.create(), View.<KV<String, Integer>>asSingleton()); } @Test public void testViewNonmergingAsIterableDirect() { - testViewNonmerging(createTestDirectRunner(), View.<KV<String, Integer>>asIterable()); + testViewNonmerging(TestPipeline.create(), View.<KV<String, Integer>>asIterable()); } @Test public void testViewNonmergingAsListDirect() { - testViewNonmerging(createTestDirectRunner(), View.<KV<String, Integer>>asList()); + testViewNonmerging(TestPipeline.create(), View.<KV<String, Integer>>asList()); } @Test public void testViewNonmergingAsMapDirect() { - testViewNonmerging(createTestDirectRunner(), View.<String, Integer>asMap()); + testViewNonmerging(TestPipeline.create(), View.<String, Integer>asMap()); } @Test public void testViewNonmergingAsMultimapDirect() { - testViewNonmerging(createTestDirectRunner(), View.<String, Integer>asMultimap()); + testViewNonmerging(TestPipeline.create(), View.<String, Integer>asMultimap()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java index 4914d4c..76df4d4 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java @@ -17,6 +17,7 @@ */ package ${package}.common; +import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowPipelineRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -39,7 +40,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.IntraBundleParallelization; import org.apache.beam.sdk.util.Transport; import com.google.common.collect.Lists; @@ -250,17 +250,8 @@ public class DataflowExampleUtils { } } - /** - * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with - * streaming, and if streaming is specified, use the DataflowPipelineRunner. Return the streaming - * flag value. - */ public void setupRunner() { - if (options.isStreaming()) { - if (options.getRunner() == DirectPipelineRunner.class) { - throw new IllegalArgumentException( - "Processing of unbounded input sources is not supported with the DirectPipelineRunner."); - } + if (options.isStreaming() && options.getRunner().equals(BlockingDataflowPipelineRunner.class)) { // In order to cancel the pipelines automatically, // {@literal DataflowPipelineRunner} is forced to be used. options.setRunner(DataflowPipelineRunner.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/testing/travis/test_wordcount.sh ---------------------------------------------------------------------- diff --git a/testing/travis/test_wordcount.sh b/testing/travis/test_wordcount.sh index 40e2724..b00b0d6 100755 --- a/testing/travis/test_wordcount.sh +++ b/testing/travis/test_wordcount.sh @@ -70,7 +70,7 @@ function run_via_mvn { local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2 local cmd='mvn exec:java -f pom.xml -pl examples/java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ - -Dexec.args="--runner=DirectPipelineRunner --inputFile='"$input"' --output='"$outfile_prefix"'"' + -Dexec.args="--runner=InProcessPipelineRunner --inputFile='"$input"' --output='"$outfile_prefix"'"' echo "$name: Running $cmd" >&2 sh -c "$cmd" check_result_hash "$name" "$outfile_prefix" "$expected_hash" @@ -84,7 +84,7 @@ function run_bundled { local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2 local cmd='java -cp '"$JAR_FILE"' \ org.apache.beam.examples.WordCount \ - --runner=DirectPipelineRunner \ + --runner=InProcessPipelineRunner \ --inputFile='"'$input'"' \ --output='"$outfile_prefix" echo "$name: Running $cmd" >&2