[runner] add streaming support with checkpointing
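Streaming pipelines opt in through the existing isStreaming() flag on FlinkPipelineOptions: the runner then builds a Flink DataStream program, with checkpointing enabled, instead of a DataSet program. A minimal sketch of the user-facing setup, mirroring the new examples in this commit (the socket host/port and the 10-second window are illustrative placeholders; ExtractWordsFn is the DoFn defined in the examples below):

    StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(StreamingWordCountOptions.class);
    options.setStreaming(true);                    // selects the streaming translation path
    options.setRunner(FlinkPipelineRunner.class);  // executes via FlinkJobExecutionEnvironment

    Pipeline p = Pipeline.create(options);
    p.apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream"))
     .apply(ParDo.of(new ExtractWordsFn()))
     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
         .triggering(AfterWatermark.pastEndOfWindow())
         .withAllowedLateness(Duration.ZERO)
         .discardingFiredPanes())
     .apply(Count.<String>perElement());
    p.run();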
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/edff0785 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/edff0785 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/edff0785 Branch: refs/heads/master Commit: edff0785a82d2c6c01abeb3c64ca0d2958ccd0fd Parents: 517c1bd Author: Kostas Kloudas <kklou...@gmail.com> Authored: Wed Dec 9 17:30:53 2015 +0100 Committer: Davor Bonaci <davorbon...@users.noreply.github.com> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- runners/flink/pom.xml | 28 + .../dataflow/FlinkJobExecutionEnvironment.java | 238 ++++++++ .../flink/dataflow/FlinkPipelineRunner.java | 99 +-- .../flink/dataflow/examples/WordCount.java | 2 +- .../examples/streaming/AutoComplete.java | 384 ++++++++++++ .../examples/streaming/JoinExamples.java | 157 +++++ .../KafkaWindowedWordCountExample.java | 138 +++++ .../examples/streaming/WindowedWordCount.java | 126 ++++ .../FlinkBatchPipelineTranslator.java | 152 +++++ .../FlinkBatchTransformTranslators.java | 593 ++++++++++++++++++ .../FlinkBatchTranslationContext.java | 129 ++++ .../translation/FlinkPipelineTranslator.java | 145 +---- .../FlinkStreamingPipelineTranslator.java | 138 +++++ .../FlinkStreamingTransformTranslators.java | 356 +++++++++++ .../FlinkStreamingTranslationContext.java | 86 +++ .../translation/FlinkTransformTranslators.java | 594 ------------------ .../translation/TranslationContext.java | 129 ---- .../translation/types/CoderComparator.java | 216 +++++++ .../translation/types/CoderComperator.java | 218 ------- .../translation/types/CoderTypeInformation.java | 6 +- .../translation/types/CoderTypeSerializer.java | 2 - .../translation/types/KvCoderComperator.java | 2 +- .../types/VoidCoderTypeSerializer.java | 1 - .../translation/wrappers/SourceInputFormat.java | 4 +- .../streaming/FlinkAbstractParDoWrapper.java | 274 +++++++++ .../FlinkGroupAlsoByWindowWrapper.java | 601 +++++++++++++++++++ .../streaming/FlinkGroupByKeyWrapper.java | 56 ++ .../streaming/FlinkParDoBoundMultiWrapper.java | 72 +++ .../streaming/FlinkParDoBoundWrapper.java | 89 +++ .../streaming/io/UnboundedFlinkSource.java | 76 +++ .../streaming/io/UnboundedSocketSource.java | 228 +++++++ .../streaming/io/UnboundedSourceWrapper.java | 120 ++++ .../state/AbstractFlinkTimerInternals.java | 139 +++++ .../streaming/state/FlinkStateInternals.java | 533 ++++++++++++++++ .../streaming/state/StateCheckpointReader.java | 89 +++ .../streaming/state/StateCheckpointUtils.java | 152 +++++ .../streaming/state/StateCheckpointWriter.java | 127 ++++ .../wrappers/streaming/state/StateType.java | 67 +++ .../streaming/GroupAlsoByWindowTest.java | 507 ++++++++++++++++ .../streaming/StateSerializationTest.java | 257 ++++++++ .../flink/dataflow/util/JoinExamples.java | 4 +- 41 files changed, 6157 insertions(+), 1177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index 6102d74..14693b8 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -71,6 +71,18 @@ </dependency> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> @@ -114,6 +126,22 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java new file mode 100644 index 0000000..66d60fa --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java @@ -0,0 +1,238 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow; + +import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator; +import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator; +import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator; +import com.google.cloud.dataflow.sdk.Pipeline; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class FlinkJobExecutionEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobExecutionEnvironment.class); + + private final FlinkPipelineOptions options; + + /** + * The Flink Batch execution environment. This is instantiated to either a + * {@link org.apache.flink.api.java.CollectionEnvironment}, + * a {@link org.apache.flink.api.java.LocalEnvironment} or + * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration + * options. + */ + private ExecutionEnvironment flinkBatchEnv; + + + /** + * The Flink Streaming execution environment. 
This is instantiated to either a + * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or + * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending + * on the configuration options and, more specifically, on the URL of the Flink master. + */ + private StreamExecutionEnvironment flinkStreamEnv; + + /** + * Translator for this FlinkPipelineRunner. Its role is to translate the Dataflow operators to + * their Flink-based counterparts. Based on the options provided by the user, this is instantiated + * to a FlinkStreamingPipelineTranslator for a streaming job, or to a FlinkBatchPipelineTranslator + * for a batch job. + */ + private FlinkPipelineTranslator flinkPipelineTranslator; + + public FlinkJobExecutionEnvironment(FlinkPipelineOptions options) { + if (options == null) { + throw new IllegalArgumentException("Options in the FlinkJobExecutionEnvironment cannot be NULL."); + } + this.options = options; + this.createJobEnvironment(); + this.createJobGraphTranslator(); + } + + /** + * Depending on the type of job (Streaming or Batch) and the user-specified options, + * this method creates the adequate ExecutionEnvironment. + */ + private void createJobEnvironment() { + if (options.isStreaming()) { + LOG.info("Creating the required STREAMING Environment."); + createStreamExecutionEnvironment(); + } else { + LOG.info("Creating the required BATCH Environment."); + createBatchExecutionEnvironment(); + } + } + + /** + * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph + * translator. In the case of batch, it will work with DataSets, while for streaming, it will work + * with DataStreams. + */ + private void createJobGraphTranslator() { + checkInitializationState(); + if (this.flinkPipelineTranslator != null) { + throw new IllegalStateException("JobGraphTranslator already initialized."); + } + + this.flinkPipelineTranslator = options.isStreaming() ? + new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) : + new FlinkBatchPipelineTranslator(flinkBatchEnv, options); + } + + public void translate(Pipeline pipeline) { + checkInitializationState(); + if (this.flinkBatchEnv == null && this.flinkStreamEnv == null) { + createJobEnvironment(); + } + if (this.flinkPipelineTranslator == null) { + createJobGraphTranslator(); + } + this.flinkPipelineTranslator.translate(pipeline); + } + + public JobExecutionResult executeJob() throws Exception { + if (options.isStreaming()) { + if (this.flinkStreamEnv == null) { + throw new RuntimeException("JobExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("JobGraphTranslator not initialized."); + } + + System.out.println("Plan: " + this.flinkStreamEnv.getExecutionPlan()); + + return this.flinkStreamEnv.execute(); + } else { + if (this.flinkBatchEnv == null) { + throw new RuntimeException("JobExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("JobGraphTranslator not initialized."); + } + return this.flinkBatchEnv.execute(); + } + } + + /** + * If the submitted job is a batch processing job, this method creates the adequate + * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending + * on the user-specified options.
+ */ + private void createBatchExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("JobExecutionEnvironment already initialized."); + } + + String masterUrl = options.getFlinkMaster(); + this.flinkStreamEnv = null; + + // depending on the master, create the right environment. + if (masterUrl.equals("[local]")) { + this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[collection]")) { + this.flinkBatchEnv = new CollectionEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), + stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) { + this.flinkBatchEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkBatchEnv.getParallelism()); + } + + /** + * If the submitted job is a stream processing job, this method creates the adequate + * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending + * on the user-specified options. + */ + private void createStreamExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("JobExecutionEnvironment already initialized."); + } + + String masterUrl = options.getFlinkMaster(); + this.flinkBatchEnv = null; + + // depending on the master, create the right environment. + if (masterUrl.equals("[local]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1) { + this.flinkStreamEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkStreamEnv.getParallelism()); + + // although we do not use the generated timestamps, + // enabling timestamps is needed for the watermarks. 
+ this.flinkStreamEnv.getConfig().enableTimestamps(); + + this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + this.flinkStreamEnv.enableCheckpointing(1000); + this.flinkStreamEnv.setNumberOfExecutionRetries(5); + + LOG.info("Setting execution retry delay to 3 sec"); + this.flinkStreamEnv.getConfig().setExecutionRetryDelay(3000); + } + + private final void checkInitializationState() { + if (this.options == null) { + throw new IllegalStateException("FlinkJobExecutionEnvironment is not initialized yet."); + } + + if (options.isStreaming() && this.flinkBatchEnv != null) { + throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment."); + } else if (!options.isStreaming() && this.flinkStreamEnv != null) { + throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java index ae31f48..f57fed2 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java @@ -15,7 +15,6 @@ */ package com.dataartisans.flink.dataflow; -import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; @@ -28,8 +27,6 @@ import com.google.cloud.dataflow.sdk.values.POutput; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.CollectionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,27 +42,19 @@ import java.util.Map; * A {@link PipelineRunner} that executes the operations in the * pipeline by first translating them to a Flink Plan and then executing them either locally * or on a Flink cluster, depending on the configuration. - * + * <p> * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}. */ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class); - /** Provided options. */ - private final FlinkPipelineOptions options; - /** - * The Flink execution environment. This is instantiated to either a - * {@link org.apache.flink.api.java.CollectionEnvironment}, - * a {@link org.apache.flink.api.java.LocalEnvironment} or - * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration - * options. + * Provided options. */ - private final ExecutionEnvironment flinkEnv; + private final FlinkPipelineOptions options; - /** Translator for this FlinkPipelineRunner, based on options. */ - private final FlinkPipelineTranslator translator; + private final FlinkJobExecutionEnvironment flinkJobEnv; /** * Construct a runner from the provided options. 
@@ -109,90 +98,38 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { flinkOptions.setFlinkMaster("[auto]"); } - if (flinkOptions.isStreaming()) { - throw new RuntimeException("Streaming is currently not supported."); - } - return new FlinkPipelineRunner(flinkOptions); } private FlinkPipelineRunner(FlinkPipelineOptions options) { this.options = options; - this.flinkEnv = createExecutionEnvironment(options); - - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkEnv.getParallelism()); - - this.translator = new FlinkPipelineTranslator(flinkEnv, options); - } - - /** - * Create Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending - * on the options. - */ - private ExecutionEnvironment createExecutionEnvironment(FlinkPipelineOptions options) { - String masterUrl = options.getFlinkMaster(); - - - if (masterUrl.equals("[local]")) { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - if (options.getParallelism() != -1) { - env.setParallelism(options.getParallelism()); - } - return env; - } else if (masterUrl.equals("[collection]")) { - return new CollectionEnvironment(); - } else if (masterUrl.equals("[auto]")) { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - if (options.getParallelism() != -1) { - env.setParallelism(options.getParallelism()); - } - return env; - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List<String> stagingFiles = options.getFilesToStage(); - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), - stagingFiles.toArray(new String[stagingFiles.size()])); - if (options.getParallelism() != -1) { - env.setParallelism(options.getParallelism()); - } - return env; - } else { - LOG.warn("Unrecognized Flink Master URL {}. 
Defaulting to [auto].", masterUrl); - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - if (options.getParallelism() != -1) { - env.setParallelism(options.getParallelism()); - } - return env; - } + this.flinkJobEnv = new FlinkJobExecutionEnvironment(options); } @Override public FlinkRunnerResult run(Pipeline pipeline) { LOG.info("Executing pipeline using FlinkPipelineRunner."); - + LOG.info("Translating pipeline to Flink program."); - - translator.translate(pipeline); - + + this.flinkJobEnv.translate(pipeline); + LOG.info("Starting execution of Flink program."); JobExecutionResult result; try { - result = flinkEnv.execute(); - } - catch (Exception e) { + result = this.flinkJobEnv.executeJob(); + } catch (Exception e) { LOG.error("Pipeline execution failed", e); throw new RuntimeException("Pipeline execution failed", e); } - + LOG.info("Execution finished in {} msecs", result.getNetRuntime()); - + Map<String, Object> accumulators = result.getAllAccumulatorResults(); if (accumulators != null && !accumulators.isEmpty()) { LOG.info("Final aggregator values:"); - + for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) { LOG.info("{} : {}", entry.getKey(), entry.getValue()); } @@ -230,16 +167,18 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { ///////////////////////////////////////////////////////////////////////////// @Override - public String toString() { return "DataflowPipelineRunner#" + hashCode(); } + public String toString() { + return "FlinkPipelineRunner#" + hashCode(); + } /** * Attempts to detect all the resources the class loader has access to. This does not recurse * to class loader parents, stopping it from pulling in resources from the system class loader. * * @param classLoader The URLClassLoader to use to detect resources to stage. - * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one - * of the resources the class loader exposes is not a file resource. * @return A list of absolute paths to the resources the class loader uses. + * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one + * of the resources the class loader exposes is not a file resource.
*/ protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) { if (!(classLoader instanceof URLClassLoader)) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java index 82f1e46..7857778 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java @@ -43,7 +43,7 @@ public class WordCount { String getOutput(); void setOutput(String value); } - + public static void main(String[] args) { Options options = PipelineOptionsFactory.fromArgs(args).withValidation() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java new file mode 100644 index 0000000..0245a7b --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java @@ -0,0 +1,384 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.dataartisans.flink.dataflow.examples.streaming; + +import com.dataartisans.flink.dataflow.FlinkPipelineRunner; +import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.AvroCoder; +import com.google.cloud.dataflow.sdk.coders.DefaultCoder; +import com.google.cloud.dataflow.sdk.io.*; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionList; +import org.joda.time.Duration; + +import java.io.IOException; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * To run the example, first open a socket on a terminal by executing the command: + * <ul> + * <li> + * <code>nc -lk 9999</code> + * </li> + * </ul> + * and then launch the example. Now whatever you type in the terminal is going to be + * the input to the program. + */ +public class AutoComplete { + + /** + * A PTransform that takes as input a list of tokens and returns + * the most common tokens per prefix. + */ + public static class ComputeTopCompletions + extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> { + private static final long serialVersionUID = 0; + + private final int candidatesPerPrefix; + private final boolean recursive; + + protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) { + this.candidatesPerPrefix = candidatesPerPrefix; + this.recursive = recursive; + } + + public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) { + return new ComputeTopCompletions(candidatesPerPrefix, recursive); + } + + @Override + public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) { + PCollection<CompletionCandidate> candidates = input + // First count how often each token appears. + .apply(new Count.PerElement<String>()) + + // Map the KV outputs of Count into our own CompletionCandidate class. + .apply(ParDo.named("CreateCompletionCandidates").of( + new DoFn<KV<String, Long>, CompletionCandidate>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue()); + c.output(cand); + } + })); + + // Compute the top via either a flat or recursive algorithm. + if (recursive) { + return candidates + .apply(new ComputeTopRecursive(candidatesPerPrefix, 1)) + .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); + } else { + return candidates + .apply(new ComputeTopFlat(candidatesPerPrefix, 1)); + } + } + } + + /** + * Lower latency, but more expensive.
+ */ + private static class ComputeTopFlat + extends PTransform<PCollection<CompletionCandidate>, + PCollection<KV<String, List<CompletionCandidate>>>> { + private static final long serialVersionUID = 0; + + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + @Override + public PCollection<KV<String, List<CompletionCandidate>>> apply( + PCollection<CompletionCandidate> input) { + return input + // For each completion candidate, map it to all prefixes. + .apply(ParDo.of(new AllPrefixes(minPrefix))) + + // Find and return the top candidates for each prefix. + .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix) + .withHotKeyFanout(new HotKeyFanout())); + } + + private static class HotKeyFanout implements SerializableFunction<String, Integer> { + private static final long serialVersionUID = 0; + + @Override + public Integer apply(String input) { + return (int) Math.pow(4, 5 - input.length()); + } + } + } + + /** + * Cheaper but higher latency. + * + * <p> Returns two PCollections, the first is top prefixes of size greater + * than minPrefix, and the second is top prefixes of size exactly + * minPrefix. + */ + private static class ComputeTopRecursive + extends PTransform<PCollection<CompletionCandidate>, + PCollectionList<KV<String, List<CompletionCandidate>>>> { + private static final long serialVersionUID = 0; + + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> { + private static final long serialVersionUID = 0; + + @Override + public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) { + return elem.getKey().length() > minPrefix ? 0 : 1; + } + } + + private static class FlattenTops + extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + for (CompletionCandidate cc : c.element().getValue()) { + c.output(cc); + } + } + } + + @Override + public PCollectionList<KV<String, List<CompletionCandidate>>> apply( + PCollection<CompletionCandidate> input) { + if (minPrefix > 10) { + // Base case, partitioning to return the output in the expected format. + return input + .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix)) + .apply(Partition.of(2, new KeySizePartitionFn())); + } else { + // If a candidate is in the top N for prefix a...b, it must also be in the top + // N for a...bX for every X, which is typically a much smaller set to consider. + // First, compute the top candidates for prefixes of size at least minPrefix + 1. + PCollectionList<KV<String, List<CompletionCandidate>>> larger = input + .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1)); + // Consider the top candidates for each prefix of length minPrefix + 1... + PCollection<KV<String, List<CompletionCandidate>>> small = + PCollectionList + .of(larger.get(1).apply(ParDo.of(new FlattenTops()))) + // ...together with those (previously excluded) candidates of length + // exactly minPrefix...
+ .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() { + private static final long serialVersionUID = 0; + + @Override + public Boolean apply(CompletionCandidate c) { + return c.getValue().length() == minPrefix; + } + }))) + .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections()) + // ...set the key to be the minPrefix-length prefix... + .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix))) + // ...and (re)apply the Top operator to all of them together. + .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)); + + PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger + .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); + + return PCollectionList.of(flattenLarger).and(small); + } + } + } + + /** + * A DoFn that keys each candidate by all its prefixes. + */ + private static class AllPrefixes + extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> { + private static final long serialVersionUID = 0; + + private final int minPrefix; + private final int maxPrefix; + public AllPrefixes(int minPrefix) { + this(minPrefix, Integer.MAX_VALUE); + } + public AllPrefixes(int minPrefix, int maxPrefix) { + this.minPrefix = minPrefix; + this.maxPrefix = maxPrefix; + } + @Override + public void processElement(ProcessContext c) { + String word = c.element().value; + for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { + KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element()); + c.output(kv); + } + } + } + + /** + * Class used to store tag-count pairs. + */ + @DefaultCoder(AvroCoder.class) + static class CompletionCandidate implements Comparable<CompletionCandidate> { + private long count; + private String value; + + public CompletionCandidate(String value, long count) { + this.value = value; + this.count = count; + } + + public String getValue() { + return value; + } + + // Empty constructor required for Avro decoding. + @SuppressWarnings("unused") + public CompletionCandidate() {} + + @Override + public int compareTo(CompletionCandidate o) { + if (this.count < o.count) { + return -1; + } else if (this.count == o.count) { + return this.value.compareTo(o.value); + } else { + return 1; + } + } + + @Override + public boolean equals(Object other) { + if (other instanceof CompletionCandidate) { + CompletionCandidate that = (CompletionCandidate) other; + return this.count == that.count && this.value.equals(that.value); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Long.valueOf(count).hashCode() ^ value.hashCode(); + } + + @Override + public String toString() { + return "CompletionCandidate[" + value + ", " + count + "]"; + } + } + + static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + /** + * Takes as input the top candidates per prefix, and formats each of them as a + * string suitable for writing to a per-task local file.
+ */ + static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + StringBuilder str = new StringBuilder(); + KV<String, List<CompletionCandidate>> elem = c.element(); + + str.append(elem.getKey() +" @ "+ c.window() +" -> "); + for(CompletionCandidate cand: elem.getValue()) { + str.append(cand.toString() + " "); + } + System.out.println(str.toString()); + c.output(str.toString()); + } + } + + /** + * Options supported by this class. + * + * <p> Inherits standard Dataflow configuration options. + */ + private static interface Options extends WindowedWordCount.StreamingWordCountOptions { + @Description("Whether to use the recursive algorithm") + @Default.Boolean(true) + Boolean getRecursive(); + void setRecursive(Boolean value); + } + + public static void main(String[] args) throws IOException { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + options.setStreaming(true); + options.setRunner(FlinkPipelineRunner.class); + + PTransform<? super PBegin, PCollection<String>> readSource = + Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream"); + WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); + + // Create the pipeline. + Pipeline p = Pipeline.create(options); + PCollection<KV<String, List<CompletionCandidate>>> toWrite = p + .apply(readSource) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(ComputeTopCompletions.top(10, options.getRecursive())); + + toWrite + .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile())) + .apply(TextIO.Write.to("./outputAutoComplete.txt")); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java new file mode 100644 index 0000000..b0cc4fa --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.dataartisans.flink.dataflow.examples.streaming; + +import com.dataartisans.flink.dataflow.FlinkPipelineRunner; +import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.joda.time.Duration; + +/** + * To run the example, first open two sockets on two terminals by executing the commands: + * <ul> + * <li> + * <code>nc -lk 9999</code>, and + * </li> + * <li> + * <code>nc -lk 9998</code> + * </li> + * </ul> + * and then launch the example. Now whatever you type in either terminal is going to be + * the input to the program. + */ +public class JoinExamples { + + static PCollection<String> joinEvents(PCollection<String> streamA, + PCollection<String> streamB) throws Exception { + + final TupleTag<String> firstInfoTag = new TupleTag<String>(); + final TupleTag<String> secondInfoTag = new TupleTag<String>(); + + // transform both input collections to tuple collections, where the key is the + // first word of each line in both cases. + PCollection<KV<String, String>> firstInfo = streamA.apply( + ParDo.of(new ExtractEventDataFn())); + PCollection<KV<String, String>> secondInfo = streamB.apply( + ParDo.of(new ExtractEventDataFn())); + + // 'key' -> CGBKR (<stream A lines>, <stream B lines>) + PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple + .of(firstInfoTag, firstInfo) + .and(secondInfoTag, secondInfo) + .apply(CoGroupByKey.<String>create()); + + // Process the CoGbkResult elements generated by the CoGroupByKey transform. + // 'key' -> string of <stream A line>, <stream B line> + PCollection<KV<String, String>> finalResultCollection = + kvpCollection.apply(ParDo.named("Process").of( + new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + KV<String, CoGbkResult> e = c.element(); + String key = e.getKey(); + + String defaultA = "NO_VALUE"; + + // the following getOnly is a bit tricky: it returns the single value for the key + // in the first stream (or the default if there is none), and fails on more than one.
+ + String lineA = e.getValue().getOnly(firstInfoTag, defaultA); + for (String lineB : c.element().getValue().getAll(secondInfoTag)) { + // Generate a string that combines information from both collection values + c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB)); + } + } + })); + + return finalResultCollection + .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String result = c.element().getKey() + " -> " + c.element().getValue(); + System.out.println(result); + c.output(result); + } + })); + } + + static class ExtractEventDataFn extends DoFn<String, KV<String, String>> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String line = c.element().toLowerCase(); + String key = line.split("\\s")[0]; + c.output(KV.of(key, line)); + } + } + + private static interface Options extends WindowedWordCount.StreamingWordCountOptions { + + } + + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + + // make it a streaming example. + options.setStreaming(true); + options.setRunner(FlinkPipelineRunner.class); + + PTransform<? super PBegin, PCollection<String>> readSourceA = + Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream"); + PTransform<? super PBegin, PCollection<String>> readSourceB = + Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream"); + + WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); + + Pipeline p = Pipeline.create(options); + + // the following two 'applys' create multiple inputs to our pipeline, one for each + // of our two input sources. + PCollection<String> streamA = p.apply(readSourceA) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + PCollection<String> streamB = p.apply(readSourceB) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<String> formattedResults = joinEvents(streamA, streamB); + formattedResults.apply(TextIO.Write.to("./outputJoin.txt")); + p.run(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java new file mode 100644 index 0000000..46c9bd6 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java @@ -0,0 +1,138 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.examples.streaming; + +import com.dataartisans.flink.dataflow.FlinkPipelineRunner; +import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedFlinkSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.joda.time.Duration; + +import java.util.Properties; + +public class KafkaWindowedWordCountExample { + + static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from + static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact + static final String GROUP_ID = "myGroup"; // Default groupId + static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka + + public static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. 
+ for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @Override + public void processElement(ProcessContext c) { + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + System.out.println(row); + c.output(row); + } + } + + public static interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { + @Description("The Kafka topic to read from") + @Default.String(KAFKA_TOPIC) + String getKafkaTopic(); + + void setKafkaTopic(String value); + + @Description("The Kafka Broker to read from") + @Default.String(KAFKA_BROKER) + String getBroker(); + + void setBroker(String value); + + @Description("The Zookeeper server to connect to") + @Default.String(ZOOKEEPER) + String getZookeeper(); + + void setZookeeper(String value); + + @Description("The groupId") + @Default.String(GROUP_ID) + String getGroup(); + + void setGroup(String value); + + } + + public static void main(String[] args) { + PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); + KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); + options.setJobName("KafkaExample"); + options.setStreaming(true); + options.setRunner(FlinkPipelineRunner.class); + + System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() ); + Pipeline pipeline = Pipeline.create(options); + + Properties p = new Properties(); + p.setProperty("zookeeper.connect", options.getZookeeper()); + p.setProperty("bootstrap.servers", options.getBroker()); + p.setProperty("group.id", options.getGroup()); + + // this is the Flink consumer that reads the input to + // the program from a kafka topic. + FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>( + options.getKafkaTopic(), + new SimpleStringSchema(), p); + + PCollection<String> words = pipeline + .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount")) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())) + .apply(TextIO.Write.to("./outputKafka.txt")); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java new file mode 100644 index 0000000..1d4a44b --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java @@ -0,0 +1,126 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.examples.streaming; + +import com.dataartisans.flink.dataflow.FlinkPipelineRunner; +import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.*; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * To run the example, first open a socket on a terminal by executing the command: + * <ul> + * <li> + * <code>nc -lk 9999</code> + * </li> + * </ul> + * and then launch the example. Now whatever you type in the terminal is going to be + * the input to the program. + */ +public class WindowedWordCount { + + private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); + + static final long WINDOW_SIZE = 10; // Default window duration in seconds + static final long SLIDE_SIZE = 5; // Default window slide in seconds + + static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @Override + public void processElement(ProcessContext c) { + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + c.output(row); + } + } + + static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public static interface StreamingWordCountOptions extends com.dataartisans.flink.dataflow.examples.WordCount.Options { + @Description("Sliding window duration, in seconds") + @Default.Long(WINDOW_SIZE) + Long getWindowSize(); + + void setWindowSize(Long value); + + @Description("Window slide, in seconds") + @Default.Long(SLIDE_SIZE) + Long getSlide(); + + void setSlide(Long value); + } + + public static void main(String[] args) throws IOException { + StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class); + options.setStreaming(true); + options.setWindowSize(10L); + options.setSlide(5L); + options.setRunner(FlinkPipelineRunner.class); + + LOG.info("Windowed WordCount with Sliding Windows of " + options.getWindowSize() + + " sec.
and a slide of " + options.getSlide()); + + Pipeline pipeline = Pipeline.create(options); + + PCollection<String> words = pipeline + .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount")) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize())) + .every(Duration.standardSeconds(options.getSlide()))) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())) + .apply(TextIO.Write.to("./outputWordCount.txt")); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java new file mode 100644 index 0000000..8c0183e --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java @@ -0,0 +1,152 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation; + +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.values.PValue; +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs. + * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator} + */ +public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { + + /** + * The necessary context in the case of a batch job. + */ + private final FlinkBatchTranslationContext batchContext; + + private int depth = 0; + + /** + * Composite transform that we want to translate before proceeding with other transforms. 
+ */ + private PTransform<?, ?> currentCompositeTransform; + + public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) { + this.batchContext = new FlinkBatchTranslationContext(env, options); + } + + // -------------------------------------------------------------------------------------------- + // Pipeline Visitor Methods + // -------------------------------------------------------------------------------------------- + + @Override + public void enterCompositeTransform(TransformTreeNode node) { + System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node)); + + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == null) { + + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator != null) { + currentCompositeTransform = transform; + if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) { + // we can only optimize CoGroupByKey for input size 2 + currentCompositeTransform = null; + } + } + } + this.depth++; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == transform) { + + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator != null) { + System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node)); + applyBatchTransform(transform, node, translator); + currentCompositeTransform = null; + } else { + throw new IllegalStateException("Attempted to translate composite transform " + + "but no translator was found: " + currentCompositeTransform); + } + } + this.depth--; + System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node)); + } + + @Override + public void visitTransform(TransformTreeNode node) { + System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node)); + if (currentCompositeTransform != null) { + // ignore it + return; + } + + // get the transformation corresponding to the node we are + // currently visiting and translate it into its Flink alternative.
+ + PTransform<?, ?> transform = node.getTransform(); + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator == null) { + System.out.println(node.getTransform().getClass()); + throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); + } + applyBatchTransform(transform, node, translator); + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + // do nothing here + } + + private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) { + if (this.batchContext == null) { + throw new IllegalStateException("The FlinkPipelineTranslator is not yet initialized."); + } + + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + + @SuppressWarnings("unchecked") + BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; + + // create the applied PTransform on the batchContext + batchContext.setCurrentTransform(AppliedPTransform.of( + node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); + typedTranslator.translateNode(typedTransform, batchContext); + } + + /** + * A translator of a {@link PTransform}. + */ + public interface BatchTransformTranslator<Type extends PTransform> { + void translateNode(Type transform, FlinkBatchTranslationContext context); + } + + private static String genSpaces(int n) { + String s = ""; + for (int i = 0; i < n; i++) { + s += "| "; + } + return s; + } + + private static String formatNodeName(TransformTreeNode node) { + return node.toString().split("@")[1] + node.getTransform(); + } +}
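For reviewers: the streaming environment configured by FlinkJobExecutionEnvironment above is equivalent to the following hand-written Flink setup. This is a sketch for illustration only; the checkpoint interval (1000 ms), retry count (5), and retry delay (3000 ms) are the values hard-coded in this commit and are not yet user-configurable.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // timestamps must be enabled for watermarks to flow, even though the
    // generated timestamps themselves are not used by the runner.
    env.getConfig().enableTimestamps();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(1000);                 // checkpoint operator state every second
    env.setNumberOfExecutionRetries(5);            // retry a failed job up to five times
    env.getConfig().setExecutionRetryDelay(3000);  // wait 3 sec between retries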