[BEAM-972] Add unit tests to Gearpump runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb0d333d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb0d333d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb0d333d Branch: refs/heads/master Commit: eb0d333df23624f54aae2abb8d7c7873f8ed2a7a Parents: 555842a Author: huafengw <fvunic...@gmail.com> Authored: Tue Mar 21 19:45:10 2017 +0800 Committer: huafengw <fvunic...@gmail.com> Committed: Thu Mar 23 19:52:11 2017 +0800 ---------------------------------------------------------------------- examples/java/pom.xml | 12 +++ pom.xml | 6 ++ runners/gearpump/README.md | 41 ++++++++- runners/gearpump/pom.xml | 2 - .../gearpump/GearpumpRunnerRegistrar.java | 4 +- .../translators/WindowAssignTranslator.java | 2 +- .../gearpump/translators/io/ValuesSource.java | 2 - .../gearpump/GearpumpRunnerRegistrarTest.java | 55 ++++++++++++ .../runners/gearpump/PipelineOptionsTest.java | 73 ++++++++++++++++ .../translators/io/GearpumpSourceTest.java | 90 ++++++++++++++++++++ .../gearpump/translators/io/ValueSoureTest.java | 82 ++++++++++++++++++ 11 files changed, 362 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index ed4a1d4..0a6d8fe 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -87,6 +87,18 @@ </dependencies> </profile> + <!-- Include the Apache Gearpump (incubating) runner with -P gearpump-runner --> + <profile> + <id>gearpump-runner</id> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-gearpump</artifactId> + <scope>runtime</scope> + </dependency> + </dependencies> + </profile> + <!-- Include the Apache Flink runner with -P flink-runner --> <profile> <id>flink-runner</id> 
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c3b8476..2cdb09d 100644 --- a/pom.xml +++ b/pom.xml @@ -475,6 +475,12 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-gearpump</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-examples-java</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/README.md ---------------------------------------------------------------------- diff --git a/runners/gearpump/README.md b/runners/gearpump/README.md index ad043fa..e8ce794 100644 --- a/runners/gearpump/README.md +++ b/runners/gearpump/README.md @@ -19,4 +19,43 @@ ## Gearpump Beam Runner -The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine. \ No newline at end of file +The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine. + +## Getting Started + +The following shows how to run the WordCount example that is provided with the Beam source code. + +### Installing Beam + +To get the latest version of Beam with the Gearpump runner, first clone the Beam repository: + +``` +git clone https://github.com/apache/beam +git -C beam checkout gearpump-runner +``` + +Then switch to the newly created directory and run Maven to build Apache Beam: + +``` +cd beam +mvn clean install -DskipTests +``` + +Now Apache Beam and the Gearpump Runner are installed in your local Maven repository.
+ +###Running Wordcount Example + +Download something to count: + +``` +curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt +``` + +Run the pipeline, using the Gearpump runner: + +``` +cd examples/java +mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/tmp/kinglear.txt --output=/tmp/wordcounts.txt --runner=TestGearpumpRunner" -Pgearpump-runner +``` + +Once completed, check the output file /tmp/wordcounts.txt-00000-of-00001 http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index 9a6a432..a691801 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -99,13 +99,11 @@ <groupId>org.apache.gearpump</groupId> <artifactId>gearpump-streaming_2.11</artifactId> <version>${gearpump.version}</version> - <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.gearpump</groupId> <artifactId>gearpump-core_2.11</artifactId> <version>${gearpump.version}</version> - <scope>provided</scope> </dependency> <dependency> <groupId>com.typesafe</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java index b77e1e3..3183d45 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java @@ -44,7 +44,9 @@ public class GearpumpRunnerRegistrar { @Override public Iterable<Class<? 
extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? extends PipelineRunner<?>>>of(TestGearpumpRunner.class); + return ImmutableList.<Class<? extends PipelineRunner<?>>>of( + GearpumpRunner.class, + TestGearpumpRunner.class); } } http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java index fe6015a..29d8f02 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java @@ -39,7 +39,7 @@ import org.joda.time.Instant; * {@link Window.Bound} is translated to Gearpump flatMap function. 
*/ @SuppressWarnings("unchecked") -public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> { +public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> { private static final long serialVersionUID = -964887482120489061L; http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java index e0488cd..ccd5cdf 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java @@ -110,8 +110,6 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi this.source = source; } - - @Override public boolean start() throws IOException { if (null == iterator) { http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java new file mode 100644 index 0000000..9a01d20 --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; + +/** + * Tests for {@link GearpumpRunnerRegistrar}. + */ +public class GearpumpRunnerRegistrarTest { + + @Test + public void testFullName() { + String[] args = + new String[] {String.format("--runner=%s", GearpumpRunner.class.getName())}; + PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); + assertEquals(opts.getRunner(), GearpumpRunner.class); + } + + @Test + public void testClassName() { + String[] args = + new String[] {String.format("--runner=%s", GearpumpRunner.class.getSimpleName())}; + PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); + assertEquals(opts.getRunner(), GearpumpRunner.class); + } + + @Test + public void testOptions() { + assertEquals( + ImmutableList.of(GearpumpPipelineOptions.class), + new GearpumpRunnerRegistrar.Options().getPipelineOptions()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git 
a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java new file mode 100644 index 0000000..994856b --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.gearpump; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.gearpump.cluster.ClusterConfig; +import org.apache.gearpump.cluster.embedded.EmbeddedCluster; +import org.junit.Test; + +/** + * Tests for {@link GearpumpPipelineOptions}. 
+ */ +public class PipelineOptionsTest { + + @Test + public void testIgnoredFieldSerialization() throws IOException { + String appName = "forTest"; + Map<String, String> serializers = Maps.newHashMap(); + serializers.put("classA", "SerializerA"); + GearpumpPipelineOptions options = PipelineOptionsFactory.create() + .as(GearpumpPipelineOptions.class); + Config config = ClusterConfig.master(null); + EmbeddedCluster cluster = new EmbeddedCluster(config); + options.setSerializers(serializers); + options.setApplicationName(appName); + options.setEmbeddedCluster(cluster); + options.setParallelism(10); + + byte[] serializedOptions = serialize(options); + GearpumpPipelineOptions deserializedOptions = new ObjectMapper() + .readValue(serializedOptions, PipelineOptions.class).as(GearpumpPipelineOptions.class); + + assertNull(deserializedOptions.getEmbeddedCluster()); + assertNull(deserializedOptions.getSerializers()); + assertEquals(10, deserializedOptions.getParallelism()); + assertEquals(appName, deserializedOptions.getApplicationName()); + } + + private byte[] serialize(Object obj) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + new ObjectMapper().writeValue(baos, obj); + return baos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException("Couldn't serialize PipelineOptions.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java new file mode 100644 index 0000000..af5a1d2 --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.io; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.gearpump.Message; +import org.apache.gearpump.streaming.source.Watermark; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for {@link GearpumpSource}. 
+ */ +public class GearpumpSourceTest { + private static final List<TimestampedValue<String>> TEST_VALUES = Lists.newArrayList( + TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)), + TimestampedValue.of("b", new org.joda.time.Instant(0)), + TimestampedValue.of("c", new org.joda.time.Instant(53)), + TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1)) + ); + + private static class SourceForTest<T> extends GearpumpSource<T> { + private ValuesSource<T> valuesSource; + + SourceForTest(PipelineOptions options, ValuesSource<T> valuesSource) { + super(options); + this.valuesSource = valuesSource; + } + + @Override + protected Source.Reader<T> createReader(PipelineOptions options) throws IOException { + return this.valuesSource.createReader(options, null); + } + } + + @Test + public void testGearpumpSource() { + GearpumpPipelineOptions options = PipelineOptionsFactory.create() + .as(GearpumpPipelineOptions.class); + ValuesSource<TimestampedValue<String>> valuesSource = new ValuesSource<>(TEST_VALUES, + TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())); + SourceForTest<TimestampedValue<String>> sourceForTest = + new SourceForTest<>(options, valuesSource); + sourceForTest.open(null, Instant.EPOCH); + + for (TimestampedValue<String> value: TEST_VALUES) { + // Check the watermark first since the Source will advance when it's opened + Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp()); + Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark()); + + Message expectedMsg = Message.apply( + WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()), + value.getTimestamp().getMillis()); + Message message = sourceForTest.read(); + Assert.assertEquals(expectedMsg, message); + } + + Assert.assertNull(sourceForTest.read()); + Assert.assertEquals(Watermark.MAX(), sourceForTest.getWatermark()); + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java new file mode 100644 index 0000000..8c50703 --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.gearpump.translators.io; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.runners.gearpump.GearpumpRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.gearpump.cluster.ClusterConfig; +import org.apache.gearpump.cluster.embedded.EmbeddedCluster; +import org.apache.gearpump.util.Constants; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for {@link ValuesSource}. + */ +public class ValueSoureTest { + + @Test + public void testValueSource() { + GearpumpPipelineOptions options = PipelineOptionsFactory.create() + .as(GearpumpPipelineOptions.class); + Config config = ClusterConfig.master(null); + config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), + ConfigValueFactory.fromAnyRef(0)); + EmbeddedCluster cluster = new EmbeddedCluster(config); + cluster.start(); + + options.setEmbeddedCluster(cluster); + options.setRunner(GearpumpRunner.class); + options.setParallelism(1); + Pipeline p = Pipeline.create(options); + List<String> values = Lists.newArrayList("1", "2", "3", "4", "5"); + ValuesSource<String> source = new ValuesSource<>(values, StringUtf8Coder.of()); + p.apply(Read.from(source)) + .apply(ParDo.of(new ResultCollector())); + + p.run().waitUntilFinish(); + cluster.stop(); + + Assert.assertEquals(Sets.newHashSet(values), ResultCollector.RESULTS); + } + + private static class ResultCollector extends DoFn<Object, Void> { + private static final 
Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<>()); + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + RESULTS.add(c.element()); + } + } +}