Migrated the beam-sdks-java-io-kafka module to use TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12be8b1e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12be8b1e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12be8b1e Branch: refs/heads/master Commit: 12be8b1e6adf342e2b482aa37d5a9577e13802c5 Parents: 8d478c0 Author: Stas Levin <stasle...@gmail.com> Authored: Tue Dec 20 17:38:38 2016 +0200 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Dec 20 09:55:45 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 25 +++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12be8b1e/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index cc1ef26..071deea 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; @@ -95,6 +94,9 @@ public class KafkaIOTest { */ @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Rule public ExpectedException thrown = ExpectedException.none(); // Update mock consumer with 
records distributed among the given topics, each with given number @@ -268,7 +270,6 @@ public class KafkaIOTest { @Test @Category(NeedsRunner.class) public void testUnboundedSource() { - Pipeline p = TestPipeline.create(); int numElements = 1000; PCollection<Long> input = p @@ -283,7 +284,6 @@ public class KafkaIOTest { @Test @Category(NeedsRunner.class) public void testUnboundedSourceWithExplicitPartitions() { - Pipeline p = TestPipeline.create(); int numElements = 1000; List<String> topics = ImmutableList.of("test"); @@ -322,7 +322,7 @@ public class KafkaIOTest { @Test @Category(NeedsRunner.class) public void testUnboundedSourceTimestamps() { - Pipeline p = TestPipeline.create(); + int numElements = 1000; PCollection<Long> input = p @@ -350,7 +350,7 @@ public class KafkaIOTest { @Test @Category(NeedsRunner.class) public void testUnboundedSourceSplits() throws Exception { - Pipeline p = TestPipeline.create(); + int numElements = 1000; int numSplits = 10; @@ -514,10 +514,9 @@ public class KafkaIOTest { ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start(); - Pipeline pipeline = TestPipeline.create(); String topic = "test"; - pipeline + p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply(KafkaIO.write() @@ -527,7 +526,7 @@ public class KafkaIOTest { .withValueCoder(BigEndianLongCoder.of()) .withProducerFactoryFn(new ProducerFactoryFn())); - pipeline.run(); + p.run(); completionThread.shutdown(); @@ -547,10 +546,9 @@ public class KafkaIOTest { ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start(); - Pipeline pipeline = TestPipeline.create(); String topic = "test"; - pipeline + p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply(Values.<Long>create()) // there are no keys @@ -562,7 +560,7 @@ public class KafkaIOTest { .withProducerFactoryFn(new ProducerFactoryFn()) .values()); - pipeline.run(); + p.run(); 
completionThread.shutdown(); @@ -588,13 +586,12 @@ public class KafkaIOTest { MOCK_PRODUCER.clear(); - Pipeline pipeline = TestPipeline.create(); String topic = "test"; ProducerSendCompletionThread completionThreadWithErrors = new ProducerSendCompletionThread(10, 100).start(); - pipeline + p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply(KafkaIO.write() @@ -605,7 +602,7 @@ public class KafkaIOTest { .withProducerFactoryFn(new ProducerFactoryFn())); try { - pipeline.run(); + p.run(); } catch (PipelineExecutionException e) { // throwing inner exception helps assert that first exception is thrown from the Sink throw e.getCause().getCause();