Repository: incubator-beam Updated Branches: refs/heads/master 387854624 -> 26635d7fb
[BEAM-242] Enable and fix checkstyle in Flink runner examples Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dafb8055 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dafb8055 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dafb8055 Branch: refs/heads/master Commit: dafb80556c1d984630c6ccf615ba982903f176df Parents: 3878546 Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Tue Sep 6 07:26:45 2016 +0200 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Wed Sep 7 05:55:50 2016 +0200 ---------------------------------------------------------------------- runners/flink/examples/pom.xml | 2 -- .../beam/runners/flink/examples/WordCount.java | 9 ++++++ .../runners/flink/examples/package-info.java | 22 +++++++++++++ .../flink/examples/streaming/AutoComplete.java | 5 +-- .../flink/examples/streaming/JoinExamples.java | 3 +- .../examples/streaming/KafkaIOExamples.java | 34 ++++++++++---------- .../KafkaWindowedWordCountExample.java | 27 +++++++++++++--- .../examples/streaming/WindowedWordCount.java | 19 +++++++---- .../flink/examples/streaming/package-info.java | 22 +++++++++++++ 9 files changed, 110 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml index 9f705db..b8489fc 100644 --- a/runners/flink/examples/pom.xml +++ b/runners/flink/examples/pom.xml @@ -109,12 +109,10 @@ </executions> </plugin> - <!-- Checkstyle errors for now <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> </plugin> - --> <plugin> <groupId>org.apache.maven.plugins</groupId> 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java index ab9297f..9cce757 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java @@ -36,8 +36,14 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +/** + * Wordcount pipeline. + */ public class WordCount { + /** + * Function to extract words. + */ public static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); @@ -60,6 +66,9 @@ public class WordCount { } } + /** + * PTransform counting words. + */ public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java new file mode 100644 index 0000000..b0ecb56 --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Flink Beam runner examples. + */ +package org.apache.beam.runners.flink.examples; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java index 9b5e31d..4636e3f 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java @@ -96,7 +96,8 @@ public class AutoComplete { @ProcessElement public void processElement(ProcessContext c) { - CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue()); + CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), + c.element().getValue()); c.output(cand); } })); @@ -349,7 +350,7 @@ public class AutoComplete { StringBuilder str = new StringBuilder(); KV<String, List<CompletionCandidate>> elem = c.element(); - 
str.append(elem.getKey() +" @ "+ window +" -> "); + str.append(elem.getKey() + " @ " + window + " -> "); for (CompletionCandidate cand: elem.getValue()) { str.append(cand.toString() + " "); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java index bf5dfc4..96638aa 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java @@ -132,7 +132,8 @@ public class JoinExamples { options.setExecutionRetryDelay(3000L); options.setRunner(FlinkRunner.class); - WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); + WindowFn<Object, ?> windowFn = FixedWindows.of( + Duration.standardSeconds(options.getWindowSize())); Pipeline p = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java index 27faefe..f0bf188 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java +++ 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java @@ -56,15 +56,15 @@ public class KafkaIOExamples { private static final String KAFKA_AVRO_TOPIC = "output"; // Default kafka topic to read from private static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact private static final String GROUP_ID = "myGroup"; // Default groupId - private static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka + private static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect (Kafka) /** - * Read/Write String data to Kafka + * Read/Write String data to Kafka. */ public static class KafkaString { /** - * Read String data from Kafka + * Read String data from Kafka. */ public static class ReadStringFromKafka { @@ -88,7 +88,7 @@ public class KafkaIOExamples { } /** - * Write String data to Kafka + * Write String data to Kafka. */ public static class WriteStringToKafka { @@ -113,12 +113,12 @@ public class KafkaIOExamples { } /** - * Read/Write Avro data to Kafka + * Read/Write Avro data to Kafka. */ public static class KafkaAvro { /** - * Read Avro data from Kafka + * Read Avro data from Kafka. */ public static class ReadAvroFromKafka { @@ -142,7 +142,7 @@ public class KafkaIOExamples { } /** - * Write Avro data to Kafka + * Write Avro data to Kafka. */ public static class WriteAvroToKafka { @@ -169,7 +169,7 @@ public class KafkaIOExamples { } /** - * Serialiation/Deserialiation schema for Avro types + * Serialiation/Deserialiation schema for Avro types. * @param <T> */ static class AvroSerializationDeserializationSchema<T> @@ -217,7 +217,7 @@ public class KafkaIOExamples { } /** - * Custom type for Avro serialization + * Custom type for Avro serialization. 
*/ static class MyType implements Serializable { @@ -233,10 +233,10 @@ public class KafkaIOExamples { @Override public String toString() { - return "MyType{" + - "word='" + word + '\'' + - ", count=" + count + - '}'; + return "MyType{" + + "word='" + word + '\'' + + ", count=" + count + + '}'; } } } @@ -244,7 +244,7 @@ public class KafkaIOExamples { // -------------- Utilities -------------- /** - * Custom options for the Pipeline + * Custom options for the Pipeline. */ public interface KafkaOptions extends FlinkPipelineOptions { @Description("The Kafka topic to read from") @@ -279,7 +279,7 @@ public class KafkaIOExamples { } /** - * Initializes some options for the Flink runner + * Initializes some options for the Flink runner. * @param args The command line args * @return the pipeline */ @@ -298,7 +298,7 @@ public class KafkaIOExamples { } /** - * Gets KafkaOptions from the Pipeline + * Gets KafkaOptions from the Pipeline. * @param p the pipeline * @return KafkaOptions */ @@ -322,7 +322,7 @@ public class KafkaIOExamples { } /** - * Print contents to stdout + * Print contents to stdout. 
* @param <T> type of the input */ private static class PrintFn<T> extends DoFn<T, T> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java index 365fb7b..42c42f3 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java @@ -40,6 +40,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.joda.time.Duration; +/** + * Wordcount example using Kafka topic. + */ public class KafkaWindowedWordCountExample { static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from @@ -47,6 +50,9 @@ public class KafkaWindowedWordCountExample { static final String GROUP_ID = "myGroup"; // Default groupId static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka + /** + * Function to extract words. + */ public static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); @@ -69,16 +75,24 @@ public class KafkaWindowedWordCountExample { } } + /** + * Function to format KV as String. 
+ */ public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { @ProcessElement public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + + c.timestamp().toString(); System.out.println(row); c.output(row); } } - public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { + /** + * Pipeline options. + */ + public interface KafkaStreamingWordCountOptions + extends WindowedWordCount.StreamingWordCountOptions { @Description("The Kafka topic to read from") @Default.String(KAFKA_TOPIC) String getKafkaTopic(); @@ -107,7 +121,8 @@ public class KafkaWindowedWordCountExample { public static void main(String[] args) { PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); - KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); + KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args) + .as(KafkaStreamingWordCountOptions.class); options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds"); options.setStreaming(true); options.setCheckpointingInterval(1000L); @@ -115,7 +130,8 @@ public class KafkaWindowedWordCountExample { options.setExecutionRetryDelay(3000L); options.setRunner(FlinkRunner.class); - System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() ); + System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + + options.getBroker() + " " + options.getGroup()); Pipeline pipeline = Pipeline.create(options); Properties p = new Properties(); @@ -132,7 +148,8 @@ public class KafkaWindowedWordCountExample { PCollection<String> words = pipeline .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer))) 
.apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) + .apply(Window.<String>into(FixedWindows.of( + Duration.standardSeconds(options.getWindowSize()))) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) .discardingFiredPanes()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java index f3361c5..0e250b8 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java @@ -60,7 +60,8 @@ public class WindowedWordCount { static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { @ProcessElement public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + + c.timestamp().toString(); c.output(row); } } @@ -87,7 +88,11 @@ public class WindowedWordCount { } } - public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options { + /** + * Pipeline options. 
+ */ + public interface StreamingWordCountOptions + extends org.apache.beam.runners.flink.examples.WordCount.Options { @Description("Sliding window duration, in seconds") @Default.Long(WINDOW_SIZE) Long getWindowSize(); @@ -102,7 +107,8 @@ public class WindowedWordCount { } public static void main(String[] args) throws IOException { - StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class); + StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() + .as(StreamingWordCountOptions.class); options.setStreaming(true); options.setWindowSize(10L); options.setSlide(5L); @@ -111,8 +117,8 @@ public class WindowedWordCount { options.setExecutionRetryDelay(3000L); options.setRunner(FlinkRunner.class); - LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() + - " sec. and a slide of " + options.getSlide()); + LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() + + " sec. 
and a slide of " + options.getSlide()); Pipeline pipeline = Pipeline.create(options); @@ -120,7 +126,8 @@ public class WindowedWordCount { .apply("StreamingWordCount", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3))) .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize())) + .apply(Window.<String>into(SlidingWindows.of( + Duration.standardSeconds(options.getWindowSize())) .every(Duration.standardSeconds(options.getSlide()))) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) .discardingFiredPanes()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java new file mode 100644 index 0000000..58f41b6 --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Flink Beam runner examples. + */ +package org.apache.beam.runners.flink.examples.streaming;