http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java new file mode 100644 index 0000000..ab23b92 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -0,0 +1,452 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.beam.runners.flink.examples; + +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.GcsOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Flatten; +import com.google.cloud.dataflow.sdk.transforms.Keys; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.util.GcsUtil; +import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionList; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PDone; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.Set; + +/** + * An 
example that computes a basic TF-IDF search table for a directory or GCS prefix. + * + * <p> Concepts: joining data; side inputs; logging + * + * <p> To execute this pipeline locally, specify general pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * }</pre> + * and a local output file or output prefix on GCS: + * <pre>{@code + * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] + * }</pre> + * + * <p> To execute this pipeline using the Dataflow service, specify pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * --stagingLocation=gs://YOUR_STAGING_DIRECTORY + * --runner=BlockingDataflowPipelineRunner + * and an output prefix on GCS: + * --output=gs://YOUR_OUTPUT_PREFIX + * }</pre> + * + * <p> The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with + * {@code --input}. + */ +public class TFIDF { + /** + * Options supported by {@link TFIDF}. + * <p> + * Inherits standard configuration options. + */ + private interface Options extends PipelineOptions, FlinkPipelineOptions { + @Description("Path to the directory or GCS prefix containing files to read from") + @Default.String("gs://dataflow-samples/shakespeare/") + String getInput(); + void setInput(String value); + + @Description("Prefix of output URI to write to") + @Validation.Required + String getOutput(); + void setOutput(String value); + } + + /** + * Lists documents contained beneath the {@code options.input} prefix/directory. + */ + public static Set<URI> listInputDocuments(Options options) + throws URISyntaxException, IOException { + URI baseUri = new URI(options.getInput()); + + // List all documents in the directory or GCS prefix. + URI absoluteUri; + if (baseUri.getScheme() != null) { + absoluteUri = baseUri; + } else { + absoluteUri = new URI( + "file", + baseUri.getAuthority(), + baseUri.getPath(), + baseUri.getQuery(), + baseUri.getFragment()); + } + + Set<URI> uris = new HashSet<>(); + if (absoluteUri.getScheme().equals("file")) { + File directory = new File(absoluteUri); + for (String entry : directory.list()) { + File path = new File(directory, entry); + uris.add(path.toURI()); + } + } else if (absoluteUri.getScheme().equals("gs")) { + GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); + URI gcsUriGlob = new URI( + absoluteUri.getScheme(), + absoluteUri.getAuthority(), + absoluteUri.getPath() + "*", + absoluteUri.getQuery(), + absoluteUri.getFragment()); + for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) { + uris.add(entry.toUri()); + } + } + + return uris; + } + + /** + * Reads the documents at the provided uris and returns all lines + * from the documents tagged with which document they are from. 
+ */ + public static class ReadDocuments + extends PTransform<PInput, PCollection<KV<URI, String>>> { + private static final long serialVersionUID = 0; + + private Iterable<URI> uris; + + public ReadDocuments(Iterable<URI> uris) { + this.uris = uris; + } + + @Override + public Coder<?> getDefaultOutputCoder() { + return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()); + } + + @Override + public PCollection<KV<URI, String>> apply(PInput input) { + Pipeline pipeline = input.getPipeline(); + + // Create one TextIO.Read transform for each document + // and add its output to a PCollectionList + PCollectionList<KV<URI, String>> urisToLines = + PCollectionList.empty(pipeline); + + // TextIO.Read supports: + // - file: URIs and paths locally + // - gs: URIs on the service + for (final URI uri : uris) { + String uriString; + if (uri.getScheme().equals("file")) { + uriString = new File(uri).getPath(); + } else { + uriString = uri.toString(); + } + + PCollection<KV<URI, String>> oneUriToLines = pipeline + .apply(TextIO.Read.from(uriString) + .named("TextIO.Read(" + uriString + ")")) + .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri)); + + urisToLines = urisToLines.and(oneUriToLines); + } + + return urisToLines.apply(Flatten.<KV<URI, String>>pCollections()); + } + } + + /** + * A transform containing a basic TF-IDF pipeline. The input consists of KV objects + * where the key is the document's URI and the value is a piece + * of the document's content. The output is a mapping from terms to + * scores for each document URI. + */ + public static class ComputeTfIdf + extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> { + private static final long serialVersionUID = 0; + + public ComputeTfIdf() { } + + @Override + public PCollection<KV<String, KV<URI, Double>>> apply( + PCollection<KV<URI, String>> uriToContent) { + + // Compute the total number of documents, and + // prepare this singleton PCollectionView for + // use as a side input. + final PCollectionView<Long> totalDocuments = + uriToContent + .apply("GetURIs", Keys.<URI>create()) + .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create()) + .apply(Count.<URI>globally()) + .apply(View.<Long>asSingleton()); + + // Create a collection of pairs mapping a URI to each + // of the words in the document associated with that URI. + PCollection<KV<URI, String>> uriToWords = uriToContent + .apply(ParDo.named("SplitWords").of( + new DoFn<KV<URI, String>, KV<URI, String>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey(); + String line = c.element().getValue(); + for (String word : line.split("\\W+")) { + // Log INFO messages when the word "love" is found. + if (word.toLowerCase().equals("love")) { + LOG.info("Found {}", word.toLowerCase()); + } + + if (!word.isEmpty()) { + c.output(KV.of(uri, word.toLowerCase())); + } + } + } + })); + + // Compute a mapping from each word to the total + // number of documents in which it appears. + PCollection<KV<String, Long>> wordToDocCount = uriToWords + .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create()) + .apply(Values.<String>create()) + .apply("CountDocs", Count.<String>perElement()); + + // Compute a mapping from each URI to the total + // number of words in the document associated with that URI.
+ PCollection<KV<URI, Long>> uriToWordTotal = uriToWords + .apply("GetURIs2", Keys.<URI>create()) + .apply("CountWords", Count.<URI>perElement()); + + // Count, for each (URI, word) pair, the number of + // occurrences of that word in the document associated + // with the URI. + PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords + .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement()); + + // Adjust the above collection to a mapping from + // (URI, word) pairs to counts into an isomorphic mapping + // from URI to (word, count) pairs, to prepare for a join + // by the URI key. + PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount + .apply(ParDo.named("ShiftKeys").of( + new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey().getKey(); + String word = c.element().getKey().getValue(); + Long occurrences = c.element().getValue(); + c.output(KV.of(uri, KV.of(word, occurrences))); + } + })); + + // Prepare to join the mapping of URI to (word, count) pairs with + // the mapping of URI to total word counts, by associating + // each of the input PCollection<KV<URI, ...>> with + // a tuple tag. Each input must have the same key type, URI + // in this case. The type parameter of the tuple tag matches + // the types of the values for each collection. + final TupleTag<Long> wordTotalsTag = new TupleTag<>(); + final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>(); + KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple + .of(wordTotalsTag, uriToWordTotal) + .and(wordCountsTag, uriToWordAndCount); + + // Perform a CoGroupByKey (a sort of pre-join) on the prepared + // inputs. This yields a mapping from URI to a CoGbkResult + // (CoGroupByKey Result). The CoGbkResult is a mapping + // from the above tuple tags to the values in each input + // associated with a particular URI. In this case, each + // KV<URI, CoGbkResult> group a URI with the total number of + // words in that document as well as all the (word, count) + // pairs for particular words. + PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput + .apply("CoGroupByUri", CoGroupByKey.<URI>create()); + + // Compute a mapping from each word to a (URI, term frequency) + // pair for each URI. A word's term frequency for a document + // is simply the number of times that word occurs in the document + // divided by the total number of words in the document. + PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal + .apply(ParDo.named("ComputeTermFrequencies").of( + new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey(); + Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); + + for (KV<String, Long> wordAndCount + : c.element().getValue().getAll(wordCountsTag)) { + String word = wordAndCount.getKey(); + Long wordCount = wordAndCount.getValue(); + Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue(); + c.output(KV.of(word, KV.of(uri, termFrequency))); + } + } + })); + + // Compute a mapping from each word to its document frequency. + // A word's document frequency in a corpus is the number of + // documents in which the word appears divided by the total + // number of documents in the corpus. 
Note how the total number of + // documents is passed as a side input; the same value is + // presented to each invocation of the DoFn. + PCollection<KV<String, Double>> wordToDf = wordToDocCount + .apply(ParDo + .named("ComputeDocFrequencies") + .withSideInputs(totalDocuments) + .of(new DoFn<KV<String, Long>, KV<String, Double>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String word = c.element().getKey(); + Long documentCount = c.element().getValue(); + Long documentTotal = c.sideInput(totalDocuments); + Double documentFrequency = documentCount.doubleValue() + / documentTotal.doubleValue(); + + c.output(KV.of(word, documentFrequency)); + } + })); + + // Join the term frequency and document frequency + // collections, each keyed on the word. + final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>(); + final TupleTag<Double> dfTag = new TupleTag<>(); + PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple + .of(tfTag, wordToUriAndTf) + .and(dfTag, wordToDf) + .apply(CoGroupByKey.<String>create()); + + // Compute a mapping from each word to a (URI, TF-IDF) score + // for each URI. There are a variety of definitions of TF-IDF + // ("term frequency - inverse document frequency") score; + // here we use a basic version that multiplies the term frequency + // by the log of the inverse document frequency. + + return wordToUriAndTfAndDf + .apply(ParDo.named("ComputeTfIdf").of( + new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String word = c.element().getKey(); + Double df = c.element().getValue().getOnly(dfTag); + + for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) { + URI uri = uriAndTf.getKey(); + Double tf = uriAndTf.getValue(); + Double tfIdf = tf * Math.log(1 / df); + c.output(KV.of(word, KV.of(uri, tfIdf))); + } + } + })); + } + + // Instantiate Logger. + // It is suggested that the user specify the class name of the containing class + // (in this case ComputeTfIdf). + private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class); + } + + /** + * A {@link PTransform} to write, in CSV format, a mapping from term and URI + * to score.
+ */ + public static class WriteTfIdf + extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> { + private static final long serialVersionUID = 0; + + private String output; + + public WriteTfIdf(String output) { + this.output = output; + } + + @Override + public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { + return wordToUriAndTfIdf + .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + c.output(String.format("%s,\t%s,\t%f", + c.element().getKey(), + c.element().getValue().getKey(), + c.element().getValue().getValue())); + } + })) + .apply(TextIO.Write + .to(output) + .withSuffix(".csv")); + } + } + + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + + options.setRunner(FlinkPipelineRunner.class); + + Pipeline pipeline = Pipeline.create(options); + pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); + + pipeline + .apply(new ReadDocuments(listInputDocuments(options))) + .apply(new ComputeTfIdf()) + .apply(new WriteTfIdf(options.getOutput())); + + pipeline.run(); + } +}
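Editor's note on the scoring used by ComputeTfIdf above: term frequency is a word's count in a document divided by that document's word total, document frequency is the share of documents containing the word, and the emitted score is tf * log(1/df). The following standalone sketch (plain Java, independent of the SDK and of this commit; the class name and the two toy documents are made up purely for illustration) runs the same arithmetic end to end and prints rows in the same format as WriteTfIdf:

import java.util.*;

public class TfIdfSketch {
  public static void main(String[] args) {
    // Two toy documents, already split into lowercase words.
    Map<String, List<String>> docs = new HashMap<>();
    docs.put("doc1", Arrays.asList("to", "be", "or", "not", "to", "be"));
    docs.put("doc2", Arrays.asList("to", "love", "is", "to", "burn"));

    int totalDocuments = docs.size();

    // Number of documents containing each word.
    Map<String, Integer> docsContaining = new HashMap<>();
    for (List<String> words : docs.values()) {
      for (String word : new HashSet<>(words)) {
        docsContaining.merge(word, 1, Integer::sum);
      }
    }

    // Term frequency and TF-IDF per (document, word), using the same
    // formula as ComputeTfIdf: tf * Math.log(1 / df).
    for (Map.Entry<String, List<String>> doc : docs.entrySet()) {
      List<String> words = doc.getValue();
      Map<String, Integer> counts = new HashMap<>();
      for (String word : words) {
        counts.merge(word, 1, Integer::sum);
      }
      for (Map.Entry<String, Integer> e : counts.entrySet()) {
        double tf = e.getValue() / (double) words.size();
        double df = docsContaining.get(e.getKey()) / (double) totalDocuments;
        double tfIdf = tf * Math.log(1 / df);
        System.out.printf("%s,\t%s,\t%f%n", e.getKey(), doc.getKey(), tfIdf);
      }
    }
  }
}

A word such as "to" that appears in every document has df = 1, so log(1/df) = 0 and its score is 0, while words unique to a single document score highest.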
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java new file mode 100644 index 0000000..ba46301 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java @@ -0,0 +1,111 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.examples; + +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +public class WordCount { + + public static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public static class CountWords extends PTransform<PCollection<String>, + PCollection<KV<String, Long>>> { + @Override + public PCollection<KV<String, Long>> apply(PCollection<String> lines) { + + // Convert lines of text into individual words. + PCollection<String> words = lines.apply( + ParDo.of(new ExtractWordsFn())); + + // Count the number of times each word occurs. + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + return wordCounts; + } + } + + /** A SimpleFunction that converts a Word and Count into a printable string. */ + public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { + @Override + public String apply(KV<String, Long> input) { + return input.getKey() + ": " + input.getValue(); + } + } + + /** + * Options supported by {@link WordCount}. + * <p> + * Inherits standard configuration options. 
+ */ + public interface Options extends PipelineOptions, FlinkPipelineOptions { + @Description("Path of the file to read from") + @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") + String getInput(); + void setInput(String value); + + @Description("Path of the file to write to") + String getOutput(); + void setOutput(String value); + } + + public static void main(String[] args) { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation() + .as(Options.class); + options.setRunner(FlinkPipelineRunner.class); + + Pipeline p = Pipeline.create(options); + + p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + .apply(new CountWords()) + .apply(MapElements.via(new FormatAsTextFn())) + .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); + + p.run(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java new file mode 100644 index 0000000..8168122 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java @@ -0,0 +1,387 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.beam.runners.flink.examples.streaming; + +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.AvroCoder; +import com.google.cloud.dataflow.sdk.coders.DefaultCoder; +import com.google.cloud.dataflow.sdk.io.*; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionList; +import org.joda.time.Duration; + +import java.io.IOException; +import java.util.List; + +/** + * To run the example, first open a socket on a terminal by executing the command: + * <li> + * <li> + * <code>nc -lk 9999</code> + * </li> + * </li> + * and then launch the example. Now whatever you type in the terminal is going to be + * the input to the program. 
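+ * <p> Concretely, the pipeline windows the words read from the socket into fixed windows of + * {@code --windowSize} seconds, computes the top ten completion candidates per prefix in each + * window (using the recursive algorithm unless {@code --recursive=false} is given), and writes + * the formatted results to {@code ./outputAutoComplete.txt}.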
+ * */ +public class AutoComplete { + + /** + * A PTransform that takes as input a list of tokens and returns + * the most common tokens per prefix. + */ + public static class ComputeTopCompletions + extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> { + private static final long serialVersionUID = 0; + + private final int candidatesPerPrefix; + private final boolean recursive; + + protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) { + this.candidatesPerPrefix = candidatesPerPrefix; + this.recursive = recursive; + } + + public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) { + return new ComputeTopCompletions(candidatesPerPrefix, recursive); + } + + @Override + public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) { + PCollection<CompletionCandidate> candidates = input + // First count how often each token appears. + .apply(new Count.PerElement<String>()) + + // Map the KV outputs of Count into our own CompletionCandidate class. + .apply(ParDo.named("CreateCompletionCandidates").of( + new DoFn<KV<String, Long>, CompletionCandidate>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue()); + c.output(cand); + } + })); + + // Compute the top via either a flat or recursive algorithm. + if (recursive) { + return candidates + .apply(new ComputeTopRecursive(candidatesPerPrefix, 1)) + .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); + } else { + return candidates + .apply(new ComputeTopFlat(candidatesPerPrefix, 1)); + } + } + } + + /** + * Lower latency, but more expensive. + */ + private static class ComputeTopFlat + extends PTransform<PCollection<CompletionCandidate>, + PCollection<KV<String, List<CompletionCandidate>>>> { + private static final long serialVersionUID = 0; + + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + @Override + public PCollection<KV<String, List<CompletionCandidate>>> apply( + PCollection<CompletionCandidate> input) { + return input + // For each completion candidate, map it to all prefixes. + .apply(ParDo.of(new AllPrefixes(minPrefix))) + + // Find and return the top candidates for each prefix. + .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix) + .withHotKeyFanout(new HotKeyFanout())); + } + + private static class HotKeyFanout implements SerializableFunction<String, Integer> { + private static final long serialVersionUID = 0; + + @Override + public Integer apply(String input) { + return (int) Math.pow(4, 5 - input.length()); + } + } + } + + /** + * Cheaper but higher latency. + * + * <p> Returns two PCollections, the first is top prefixes of size greater + * than minPrefix, and the second is top prefixes of size exactly + * minPrefix.
+ */ + private static class ComputeTopRecursive + extends PTransform<PCollection<CompletionCandidate>, + PCollectionList<KV<String, List<CompletionCandidate>>>> { + private static final long serialVersionUID = 0; + + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> { + private static final long serialVersionUID = 0; + + @Override + public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) { + return elem.getKey().length() > minPrefix ? 0 : 1; + } + } + + private static class FlattenTops + extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + for (CompletionCandidate cc : c.element().getValue()) { + c.output(cc); + } + } + } + + @Override + public PCollectionList<KV<String, List<CompletionCandidate>>> apply( + PCollection<CompletionCandidate> input) { + if (minPrefix > 10) { + // Base case, partitioning to return the output in the expected format. + return input + .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix)) + .apply(Partition.of(2, new KeySizePartitionFn())); + } else { + // If a candidate is in the top N for prefix a...b, it must also be in the top + // N for a...bX for every X, which is typlically a much smaller set to consider. + // First, compute the top candidate for prefixes of size at least minPrefix + 1. + PCollectionList<KV<String, List<CompletionCandidate>>> larger = input + .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1)); + // Consider the top candidates for each prefix of length minPrefix + 1... + PCollection<KV<String, List<CompletionCandidate>>> small = + PCollectionList + .of(larger.get(1).apply(ParDo.of(new FlattenTops()))) + // ...together with those (previously excluded) candidates of length + // exactly minPrefix... + .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() { + private static final long serialVersionUID = 0; + + @Override + public Boolean apply(CompletionCandidate c) { + return c.getValue().length() == minPrefix; + } + }))) + .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections()) + // ...set the key to be the minPrefix-length prefix... + .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix))) + // ...and (re)apply the Top operator to all of them together. + .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)); + + PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger + .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); + + return PCollectionList.of(flattenLarger).and(small); + } + } + } + + /** + * A DoFn that keys each candidate by all its prefixes. 
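+ * For example, with {@code minPrefix} 1 and no {@code maxPrefix} limit, the candidate "flink" + * is emitted once under each of the keys "f", "fl", "fli", "flin" and "flink".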
+ */ + private static class AllPrefixes + extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> { + private static final long serialVersionUID = 0; + + private final int minPrefix; + private final int maxPrefix; + public AllPrefixes(int minPrefix) { + this(minPrefix, Integer.MAX_VALUE); + } + public AllPrefixes(int minPrefix, int maxPrefix) { + this.minPrefix = minPrefix; + this.maxPrefix = maxPrefix; + } + @Override + public void processElement(ProcessContext c) { + String word = c.element().value; + for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { + KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element()); + c.output(kv); + } + } + } + + /** + * Class used to store tag-count pairs. + */ + @DefaultCoder(AvroCoder.class) + static class CompletionCandidate implements Comparable<CompletionCandidate> { + private long count; + private String value; + + public CompletionCandidate(String value, long count) { + this.value = value; + this.count = count; + } + + public String getValue() { + return value; + } + + // Empty constructor required for Avro decoding. + @SuppressWarnings("unused") + public CompletionCandidate() {} + + @Override + public int compareTo(CompletionCandidate o) { + if (this.count < o.count) { + return -1; + } else if (this.count == o.count) { + return this.value.compareTo(o.value); + } else { + return 1; + } + } + + @Override + public boolean equals(Object other) { + if (other instanceof CompletionCandidate) { + CompletionCandidate that = (CompletionCandidate) other; + return this.count == that.count && this.value.equals(that.value); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Long.valueOf(count).hashCode() ^ value.hashCode(); + } + + @Override + public String toString() { + return "CompletionCandidate[" + value + ", " + count + "]"; + } + } + + static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + /** + * Takes as input a the top candidates per prefix, and emits an entity + * suitable for writing to Datastore. + */ + static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> + implements DoFn.RequiresWindowAccess{ + + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + StringBuilder str = new StringBuilder(); + KV<String, List<CompletionCandidate>> elem = c.element(); + + str.append(elem.getKey() +" @ "+ c.window() +" -> "); + for(CompletionCandidate cand: elem.getValue()) { + str.append(cand.toString() + " "); + } + System.out.println(str.toString()); + c.output(str.toString()); + } + } + + /** + * Options supported by this class. + * + * <p> Inherits standard Dataflow configuration options. 
+ */ + private interface Options extends WindowedWordCount.StreamingWordCountOptions { + @Description("Whether to use the recursive algorithm") + @Default.Boolean(true) + Boolean getRecursive(); + void setRecursive(Boolean value); + } + + public static void main(String[] args) throws IOException { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + PTransform<? super PBegin, PCollection<String>> readSource = + Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream"); + WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); + + // Create the pipeline. + Pipeline p = Pipeline.create(options); + PCollection<KV<String, List<CompletionCandidate>>> toWrite = p + .apply(readSource) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(ComputeTopCompletions.top(10, options.getRecursive())); + + toWrite + .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile())) + .apply(TextIO.Write.to("./outputAutoComplete.txt")); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java new file mode 100644 index 0000000..3a8bdb0 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.beam.runners.flink.examples.streaming; + +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.joda.time.Duration; + +/** + * To run the example, first open two sockets on two terminals by executing the commands: + * <li> + * <li> + * <code>nc -lk 9999</code>, and + * </li> + * <li> + * <code>nc -lk 9998</code> + * </li> + * </li> + * and then launch the example. Now whatever you type in the terminal is going to be + * the input to the program. + * */ +public class JoinExamples { + + static PCollection<String> joinEvents(PCollection<String> streamA, + PCollection<String> streamB) throws Exception { + + final TupleTag<String> firstInfoTag = new TupleTag<>(); + final TupleTag<String> secondInfoTag = new TupleTag<>(); + + // transform both input collections to tuple collections, where the keys are country + // codes in both cases. + PCollection<KV<String, String>> firstInfo = streamA.apply( + ParDo.of(new ExtractEventDataFn())); + PCollection<KV<String, String>> secondInfo = streamB.apply( + ParDo.of(new ExtractEventDataFn())); + + // country code 'key' -> CGBKR (<event info>, <country name>) + PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple + .of(firstInfoTag, firstInfo) + .and(secondInfoTag, secondInfo) + .apply(CoGroupByKey.<String>create()); + + // Process the CoGbkResult elements generated by the CoGroupByKey transform. + // country code 'key' -> string of <event info>, <country name> + PCollection<KV<String, String>> finalResultCollection = + kvpCollection.apply(ParDo.named("Process").of( + new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + KV<String, CoGbkResult> e = c.element(); + String key = e.getKey(); + + String defaultA = "NO_VALUE"; + + // the following getOnly is a bit tricky because it expects to have + // EXACTLY ONE value in the corresponding stream and for the corresponding key. 
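+ // If no value is present for the key, the supplied default ("NO_VALUE") is returned instead; + // if several values are present, getOnly fails rather than picking one arbitrarily.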
+ + String lineA = e.getValue().getOnly(firstInfoTag, defaultA); + for (String lineB : c.element().getValue().getAll(secondInfoTag)) { + // Generate a string that combines information from both collection values + c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB)); + } + } + })); + + return finalResultCollection + .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String result = c.element().getKey() + " -> " + c.element().getValue(); + System.out.println(result); + c.output(result); + } + })); + } + + static class ExtractEventDataFn extends DoFn<String, KV<String, String>> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String line = c.element().toLowerCase(); + String key = line.split("\\s")[0]; + c.output(KV.of(key, line)); + } + } + + private interface Options extends WindowedWordCount.StreamingWordCountOptions { + + } + + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + PTransform<? super PBegin, PCollection<String>> readSourceA = + Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream"); + PTransform<? super PBegin, PCollection<String>> readSourceB = + Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream"); + + WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); + + Pipeline p = Pipeline.create(options); + + // the following two 'applys' create multiple inputs to our pipeline, one for each + // of our two input sources. + PCollection<String> streamA = p.apply(readSourceA) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + PCollection<String> streamB = p.apply(readSourceB) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<String> formattedResults = joinEvents(streamA, streamB); + formattedResults.apply(TextIO.Write.to("./outputJoin.txt")); + p.run(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java new file mode 100644 index 0000000..fa0c8e9 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java @@ -0,0 +1,141 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.examples.streaming; + +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.joda.time.Duration; + +import java.util.Properties; + +public class KafkaWindowedWordCountExample { + + static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from + static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact + static final String GROUP_ID = "myGroup"; // Default groupId + static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka + + public static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. 
+ for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @Override + public void processElement(ProcessContext c) { + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + System.out.println(row); + c.output(row); + } + } + + public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { + @Description("The Kafka topic to read from") + @Default.String(KAFKA_TOPIC) + String getKafkaTopic(); + + void setKafkaTopic(String value); + + @Description("The Kafka Broker to read from") + @Default.String(KAFKA_BROKER) + String getBroker(); + + void setBroker(String value); + + @Description("The Zookeeper server to connect to") + @Default.String(ZOOKEEPER) + String getZookeeper(); + + void setZookeeper(String value); + + @Description("The groupId") + @Default.String(GROUP_ID) + String getGroup(); + + void setGroup(String value); + + } + + public static void main(String[] args) { + PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); + KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); + options.setJobName("KafkaExample"); + options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() ); + Pipeline pipeline = Pipeline.create(options); + + Properties p = new Properties(); + p.setProperty("zookeeper.connect", options.getZookeeper()); + p.setProperty("bootstrap.servers", options.getBroker()); + p.setProperty("group.id", options.getGroup()); + + // this is the Flink consumer that reads the input to + // the program from a kafka topic. 
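+ // The native Flink consumer is wrapped in an UnboundedFlinkSource below so that it can be + // consumed through the Read.from(...) primitive.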
+ FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>( + options.getKafkaTopic(), + new SimpleStringSchema(), p); + + PCollection<String> words = pipeline + .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount")) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())) + .apply(TextIO.Write.to("./outputKafka.txt")); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java new file mode 100644 index 0000000..6af044d --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java @@ -0,0 +1,128 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.examples.streaming; + +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.*; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * To run the example, first open a socket on a terminal by executing the command: + * <li> + * <li> + * <code>nc -lk 9999</code> + * </li> + * </li> + * and then launch the example. Now whatever you type in the terminal is going to be + * the input to the program. 
+ * */ +public class WindowedWordCount { + + private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); + + static final long WINDOW_SIZE = 10; // Default window duration in seconds + static final long SLIDE_SIZE = 5; // Default window slide in seconds + + static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @Override + public void processElement(ProcessContext c) { + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + c.output(row); + } + } + + static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options { + @Description("Sliding window duration, in seconds") + @Default.Long(WINDOW_SIZE) + Long getWindowSize(); + + void setWindowSize(Long value); + + @Description("Window slide, in seconds") + @Default.Long(SLIDE_SIZE) + Long getSlide(); + + void setSlide(Long value); + } + + public static void main(String[] args) throws IOException { + StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class); + options.setStreaming(true); + options.setWindowSize(10L); + options.setSlide(5L); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() + + " sec. and a slide of " + options.getSlide()); + + Pipeline pipeline = Pipeline.create(options); + + PCollection<String> words = pipeline + .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount")) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize())) + .every(Duration.standardSeconds(options.getSlide()))) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())) + .apply(TextIO.Write.to("./outputWordCount.txt")); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java new file mode 100644 index 0000000..cd25ba3 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java @@ -0,0 +1,80 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.io; + +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PDone; + +/** + * Transform for printing the contents of a {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * to standard output. + * + * This is Flink-specific and will only work when executed using the + * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}. + */ +public class ConsoleIO { + + /** + * A PTransform that writes a PCollection to a standard output. + */ + public static class Write { + + /** + * Returns a ConsoleIO.Write PTransform with a default step name. + */ + public static Bound create() { + return new Bound(); + } + + /** + * Returns a ConsoleIO.Write PTransform with the given step name. + */ + public static Bound named(String name) { + return new Bound().named(name); + } + + /** + * A PTransform that writes a bounded PCollection to standard output. + */ + public static class Bound extends PTransform<PCollection<?>, PDone> { + private static final long serialVersionUID = 0; + + Bound() { + super("ConsoleIO.Write"); + } + + Bound(String name) { + super(name); + } + + /** + * Returns a new ConsoleIO.Write PTransform that's like this one but with the given + * step + * name. Does not modify this object. + */ + public Bound named(String name) { + return new Bound(name); + } + + @Override + public PDone apply(PCollection<?> input) { + return PDone.in(input.getPipeline()); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java new file mode 100644 index 0000000..5201423 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java @@ -0,0 +1,149 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation; + +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.values.PValue; +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs. + * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator} + */ +public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { + + /** + * The necessary context in the case of a batch job. + */ + private final FlinkBatchTranslationContext batchContext; + + private int depth = 0; + + /** + * Composite transform that we want to translate before proceeding with other transforms. + */ + private PTransform<?, ?> currentCompositeTransform; + + public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) { + this.batchContext = new FlinkBatchTranslationContext(env, options); + } + + // -------------------------------------------------------------------------------------------- + // Pipeline Visitor Methods + // -------------------------------------------------------------------------------------------- + + @Override + public void enterCompositeTransform(TransformTreeNode node) { + System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node)); + + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == null) { + + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator != null) { + currentCompositeTransform = transform; + if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) { + // we can only optimize CoGroupByKey for input size 2 + currentCompositeTransform = null; + } + } + } + this.depth++; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == transform) { + + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator != null) { + System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node)); + applyBatchTransform(transform, node, translator); + currentCompositeTransform = null; + } else { + throw new IllegalStateException("Attempted to translate composite transform " + + "but no translator was found: " + currentCompositeTransform); + } + } + this.depth--; + System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node)); + } + + @Override + public void visitTransform(TransformTreeNode node) { + System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node)); + if (currentCompositeTransform != null) { + // ignore it + return; + } + + // get the transformation corresponding to hte node we are + // currently visiting and translate it into its Flink alternative. 
+ + PTransform<?, ?> transform = node.getTransform(); + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator == null) { + System.out.println(node.getTransform().getClass()); + throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); + } + applyBatchTransform(transform, node, translator); + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + // do nothing here + } + + private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) { + + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + + @SuppressWarnings("unchecked") + BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; + + // create the applied PTransform on the batchContext + batchContext.setCurrentTransform(AppliedPTransform.of( + node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); + typedTranslator.translateNode(typedTransform, batchContext); + } + + /** + * A translator of a {@link PTransform}. + */ + public interface BatchTransformTranslator<Type extends PTransform> { + void translateNode(Type transform, FlinkBatchTranslationContext context); + } + + private static String genSpaces(int n) { + String s = ""; + for (int i = 0; i < n; i++) { + s += "| "; + } + return s; + } + + private static String formatNodeName(TransformTreeNode node) { + return node.toString().split("@")[1] + node.getTransform(); + } +}
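Editor's note: the translator lookup used by visitTransform and the composite handling above amounts to a registry keyed by the transform's class. The sketch below illustrates that pattern in isolation; it is not the actual FlinkBatchTransformTranslators code, and the Transform, TranslationContext, and FlattenTransform types and the registered lambda are hypothetical stand-ins.

import java.util.HashMap;
import java.util.Map;

public class TranslatorRegistrySketch {

  // Hypothetical stand-ins for the real transform and context types.
  interface Transform { }
  interface TranslationContext { }
  static class FlattenTransform implements Transform { }

  /** Mirrors the shape of BatchTransformTranslator above. */
  interface TransformTranslator<T extends Transform> {
    void translateNode(T transform, TranslationContext context);
  }

  // One translator registered per concrete transform class.
  private static final Map<Class<? extends Transform>, TransformTranslator<?>> TRANSLATORS =
      new HashMap<>();

  static {
    TRANSLATORS.put(FlattenTransform.class,
        (TransformTranslator<FlattenTransform>)
            (transform, context) -> System.out.println("translating Flatten"));
  }

  /** Returns the registered translator for the transform's class, or null if there is none. */
  static TransformTranslator<?> getTranslator(Transform transform) {
    return TRANSLATORS.get(transform.getClass());
  }

  public static void main(String[] args) {
    Transform transform = new FlattenTransform();
    @SuppressWarnings("unchecked")
    TransformTranslator<Transform> translator =
        (TransformTranslator<Transform>) getTranslator(transform);
    if (translator == null) {
      // Mirrors visitTransform: unsupported transforms fail fast.
      throw new UnsupportedOperationException(
          "The transform " + transform + " is currently not supported.");
    }
    translator.translateNode(transform, new TranslationContext() { });
  }
}

When no translator is registered, the visitor fails fast with an UnsupportedOperationException, exactly as visitTransform does above; composite transforms are handled the same way, except that the visitor simply descends into their children when no translator is found.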