Repository: beam-site
Updated Branches:
  refs/heads/asf-site 9cc5b2280 -> 9ffe5ec58
[BEAM-1353] Style Guide fixups

Fixes usages of PTransforms affected by changes as part of
https://issues.apache.org/jira/browse/BEAM-1353

Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/e78f8f27
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/e78f8f27
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/e78f8f27

Branch: refs/heads/asf-site
Commit: e78f8f276a6cbf3156e0f7af3dd4ae1d9b92ee7a
Parents: 0d0da02
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri May 12 16:07:19 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Mon May 15 11:28:52 2017 -0700

----------------------------------------------------------------------
 .../pipelines/create-your-pipeline.md     |  4 +-
 .../pipelines/design-your-pipeline.md     | 20 ++++----
 src/documentation/programming-guide.md    | 54 ++++++++++----------
 src/documentation/sdks/java-extensions.md |  2 +-
 src/get-started/mobile-gaming-example.md  | 20 ++++----
 src/get-started/wordcount-example.md      |  6 +--
 6 files changed, 53 insertions(+), 53 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/pipelines/create-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/create-your-pipeline.md b/src/documentation/pipelines/create-your-pipeline.md
index b765467..cbf7d31 100644
--- a/src/documentation/pipelines/create-your-pipeline.md
+++ b/src/documentation/pipelines/create-your-pipeline.md
@@ -42,7 +42,7 @@ The following example code shows how to `apply` a `TextIO.Read` root transform t
 
 ```java
 PCollection<String> lines = p.apply(
-  "ReadLines", TextIO.Read.from("gs://some/inputData.txt"));
+  "ReadLines", TextIO.read().from("gs://some/inputData.txt"));
 ```
 
 ## Applying Transforms to Process Pipeline Data
@@ -68,7 +68,7 @@ The following example code shows how to `apply` a `TextIO.Write` transform to wr
 
 ```java
 PCollection<String> filteredWords = ...;
-filteredWords.apply("WriteMyFile", TextIO.Write.to("gs://some/outputData.txt"));
+filteredWords.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));
 ```
 
 ## Running Your Pipeline

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/pipelines/design-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md
index c40803c..ce6a734 100644
--- a/src/documentation/pipelines/design-your-pipeline.md
+++ b/src/documentation/pipelines/design-your-pipeline.md
@@ -103,13 +103,7 @@ final TupleTag<String> startsWithATag = new TupleTag<String>(){};
 final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
 
 PCollectionTuple mixedCollection =
-    dbRowCollection.apply(
-        ParDo
-            // Specify main output. In this example, it is the output
-            // with tag startsWithATag.
-            .withOutputTags(startsWithATag,
-                // Specify the output with tag startsWithBTag, as a TupleTagList.
-                TupleTagList.of(startsWithBTag))
+    dbRowCollection.apply(ParDo
         .of(new DoFn<String, String>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
@@ -121,8 +115,12 @@ PCollectionTuple mixedCollection =
               c.output(startsWithBTag, c.element());
             }
           }
-        }
-        ));
+        })
+        // Specify main output. In this example, it is the output
+        // with tag startsWithATag.
+        .withOutputTags(startsWithATag,
+            // Specify the output with tag startsWithBTag, as a TupleTagList.
+            TupleTagList.of(startsWithBTag)));
 
 // Get subset of the output with tag startsWithATag.
 mixedCollection.get(startsWithATag).apply(...);
@@ -159,7 +157,7 @@ mergedCollectionWithFlatten.apply(...);
 
 ## Multiple sources
 
-Your pipeline can read its input from one or more sources. If your pipeline reads from multiple sources and the data from those sources is related, it can be useful to join the inputs together. In the example illustrated in Figure 5 below, the pipeline reads names and addresses from a database table, and names and order numbers from a text file. The pipeline then uses `CoGroupByKey` to join this information, where the key is the name; the resulting `PCollection` contains all the combinations of names, addresses, and orders.
+Your pipeline can read its input from one or more sources. If your pipeline reads from multiple sources and the data from those sources is related, it can be useful to join the inputs together. In the example illustrated in Figure 5 below, the pipeline reads names and addresses from a database table, and names and order numbers from a Kafka topic. The pipeline then uses `CoGroupByKey` to join this information, where the key is the name; the resulting `PCollection` contains all the combinations of names, addresses, and orders.
 
 <figure id="fig5">
     <img src="{{ site.baseurl }}/images/design-your-pipeline-join.png"
@@ -169,7 +167,7 @@ Figure 5: A pipeline with multiple input sources.
 See the example code below:
 
 ```java
 PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
-PCollection<KV<String, String>> userOrder = pipeline.apply(TextIO.<KV<String, String>>read()...);
+PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
 
 final TupleTag<String> addressTag = new TupleTag<String>();
 final TupleTag<String> orderTag = new TupleTag<String>();

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index f70e255..d7e37a0 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -191,7 +191,7 @@ public static void main(String[] args) {
 
     // Create the PCollection 'lines' by applying a 'Read' transform.
     PCollection<String> lines = p.apply(
-      "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));
+      "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
 }
 ```
 
@@ -479,8 +479,8 @@ PCollection<String> words = ...;
 // Apply a MapElements with an anonymous lambda function to the PCollection words.
 // Save the result as the PCollection wordLengths.
 PCollection<Integer> wordLengths = words.apply(
-  MapElements.via((String word) -> word.length())
-      .withOutputType(new TypeDescriptor<Integer>() {});
+  MapElements.into(TypeDescriptors.integers())
+      .via((String word) -> word.length()));
 ```
 
 ```py
@@ -862,16 +862,18 @@ Side inputs are useful if your `ParDo` needs to inject additional data when proc
   // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
   PCollection<String> wordsBelowCutOff =
-  words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
-      .of(new DoFn<String, String>() {
-          public void processElement(ProcessContext c) {
-            String word = c.element();
-            // In our DoFn, access the side input.
-            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
-            if (word.length() <= lengthCutOff) {
-              c.output(word);
-            }
-      }}));
+  words.apply(ParDo
+      .of(new DoFn<String, String>() {
+          public void processElement(ProcessContext c) {
+            String word = c.element();
+            // In our DoFn, access the side input.
+            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
+            if (word.length() <= lengthCutOff) {
+              c.output(word);
+            }
+          }
+      }).withSideInputs(maxWordLengthCutOffView)
+  );
 ```
 
 ```py
@@ -943,17 +945,16 @@ While `ParDo` always produces a main output `PCollection` (as the return value f
   // to our ParDo. Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple.
   PCollectionTuple results =
-      words.apply(
-          ParDo
+      words.apply(ParDo
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          })
           // Specify the tag for the main output.
           .withOutputTags(wordsBelowCutOffTag,
               // Specify the tags for the two additional outputs as a TupleTagList.
               TupleTagList.of(wordLengthsAboveCutOffTag)
-                  .and(markedWordsTag))
-          .of(new DoFn<String, String>() {
-            // DoFn continues here.
-            ...
-          }
+                  .and(markedWordsTag)));
 ```
 
 ```py
@@ -1114,7 +1115,7 @@ Read transforms read data from an external source and return a `PCollection` rep
 #### Using a read transform:
 
 ```java
-PCollection<String> lines = p.apply(TextIO.Read.from("gs://some/inputData.txt"));
+PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));
 ```
 
 ```py
@@ -1128,7 +1129,7 @@ Write transforms write the data in a `PCollection` to an external data source. Y
 #### Using a Write transform:
 
 ```java
-output.apply(TextIO.Write.to("gs://some/outputData"));
+output.apply(TextIO.write().to("gs://some/outputData"));
 ```
 
 ```py
@@ -1143,7 +1144,7 @@ Many read transforms support reading from multiple input files matching a glob o
 
 ```java
 p.apply("ReadFromText",
-    TextIO.Read.from("protocol://my_bucket/path/to/input-*.csv");
+    TextIO.read().from("protocol://my_bucket/path/to/input-*.csv");
 ```
 
 ```py
@@ -1161,7 +1162,7 @@ The following write transform example writes multiple output files to a location
 
 ```java
 records.apply("WriteToText",
-    TextIO.Write.to("protocol://my_bucket/path/to/numbers")
+    TextIO.write().to("protocol://my_bucket/path/to/numbers")
         .withSuffix(".csv"));
 ```
 
@@ -1563,7 +1564,7 @@ You can allow late data by invoking the `.withAllowedLateness` operation when yo
               .withAllowedLateness(Duration.standardDays(2)));
 ```
 
-When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness propagates forward to any subsequent `PCollection` derived from the first `PCollection` you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explicitly by applying `Window.withAllowedLateness()` again.
+When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness propagates forward to any subsequent `PCollection` derived from the first `PCollection` you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explicitly by applying `Window.configure().withAllowedLateness()`.
 
 ### Adding timestamps to a PCollection's elements
 
@@ -1737,7 +1738,7 @@ You set the allowed lateness by using `.withAllowedLateness()` when you set your
 # The Beam SDK for Python does not support triggers.
 ```
 
-This allowed lateness propagates to all `PCollection`s derived as a result of applying transforms to the original `PCollection`. If you want to change the allowed lateness later in your pipeline, you can apply `Window.withAllowedLateness()` again, explicitly.
+This allowed lateness propagates to all `PCollection`s derived as a result of applying transforms to the original `PCollection`. If you want to change the allowed lateness later in your pipeline, you can apply `Window.configure().withAllowedLateness()` again, explicitly.
 
 ### <a name="composite-triggers"></a>Composite Triggers
 
@@ -1770,6 +1771,7 @@ You can express this pattern using `AfterWatermark.pastEndOfWindow`. For example
 
 ```java
   .apply(Window
+      .configure()
       .triggering(AfterWatermark
            .pastEndOfWindow()
            .withLateFirings(AfterProcessingTime

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/sdks/java-extensions.md
----------------------------------------------------------------------
diff --git a/src/documentation/sdks/java-extensions.md b/src/documentation/sdks/java-extensions.md
index a4694af..17a79e7 100644
--- a/src/documentation/sdks/java-extensions.md
+++ b/src/documentation/sdks/java-extensions.md
@@ -55,5 +55,5 @@ PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
 // For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
 PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted = grouped.apply(
-    SortValues.<String, String, Integer>create(new BufferedExternalSorter.Options()));
+    SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));
 ```

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/get-started/mobile-gaming-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/mobile-gaming-example.md b/src/get-started/mobile-gaming-example.md
index 5c97274..9e59274 100644
--- a/src/get-started/mobile-gaming-example.md
+++ b/src/get-started/mobile-gaming-example.md
@@ -107,9 +107,9 @@ public static class ExtractAndSumScore
 
     return gameInfo
       .apply(MapElements
-          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
-          .withOutputType(
-              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
+          .into(
+              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())))
       .apply(Sum.<String>integersPerKey());
   }
 }
@@ -148,7 +148,7 @@ public static void main(String[] args) throws Exception {
   Pipeline pipeline = Pipeline.create(options);
 
   // Read events from a text file and parse them.
-  pipeline.apply(TextIO.Read.from(options.getInput()))
+  pipeline.apply(TextIO.read().from(options.getInput()))
     .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
     // Extract and sum username/score pairs from the event data.
     .apply("ExtractUserScore", new ExtractAndSumScore("user"))
@@ -314,7 +314,7 @@ public static void main(String[] args) throws Exception {
   final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
 
   // Read 'gaming' events from a text file.
-  pipeline.apply(TextIO.Read.from(options.getInput()))
+  pipeline.apply(TextIO.read().from(options.getInput()))
     // Parse the incoming data.
     .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
 
@@ -601,8 +601,6 @@ public static class CalculateSpammyUsers
   // Filter the user sums using the global mean.
   PCollection<KV<String, Integer>> filtered = sumScores
     .apply("ProcessAndFilter", ParDo
-      // use the derived mean total score as a side input
-      .withSideInputs(globalMeanScore)
       .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
        private final Aggregator<Long, Long> numSpammerUsers =
          createAggregator("SpammerUsers", new Sum.SumLongFn());
@@ -617,7 +615,9 @@ public static class CalculateSpammyUsers
            c.output(c.element());
          }
        }
-      }));
+      })
+      // use the derived mean total score as a side input
+      .withSideInputs(globalMeanScore));
   return filtered;
 }
}
@@ -635,7 +635,6 @@ rawEvents
       FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
   // Filter out the detected spammer users, using the side input derived above.
   .apply("FilterOutSpammers", ParDo
-    .withSideInputs(spammersView)
    .of(new DoFn<GameActionInfo, GameActionInfo>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
@@ -644,7 +643,8 @@ rawEvents
        c.output(c.element());
      }
    }
-    }))
+    })
+    .withSideInputs(spammersView))
   // Extract and sum teamname/score pairs from the event data.
   .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
 ```

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/get-started/wordcount-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/wordcount-example.md b/src/get-started/wordcount-example.md
index 503f930..023086d 100644
--- a/src/get-started/wordcount-example.md
+++ b/src/get-started/wordcount-example.md
@@ -96,7 +96,7 @@ The Minimal WordCount pipeline contains five transforms:
 1.  A text file `Read` transform is applied to the Pipeline object itself, and produces a `PCollection` as output. Each element in the output PCollection represents one line of text from the input file. This example uses input data stored in a publicly accessible Google Cloud Storage bucket ("gs://").
 
    ```java
-    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
   ```
 
   ```py
@@ -157,7 +157,7 @@ The Minimal WordCount pipeline contains five transforms:
 5.  A text file write transform. This transform takes the final `PCollection` of formatted Strings as input and writes each element to an output text file. Each element in the input `PCollection` represents one line of text in the resulting output file.
 
   ```java
-    .apply(TextIO.Write.to("wordcounts"));
+    .apply(TextIO.write().to("wordcounts"));
   ```
 
   ```py
@@ -398,7 +398,7 @@ public static void main(String[] args) throws IOException {
   Pipeline pipeline = Pipeline.create(options);
 
   PCollection<String> input = pipeline
-    .apply(TextIO.Read.from(options.getInputFile()))
+    .apply(TextIO.read().from(options.getInputFile()))
 ```
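The changes above all apply one pattern from the BEAM-1353 style work: construct a transform with a factory method (`TextIO.read()`, `ParDo.of(fn)`) and configure it with trailing calls (`.from(...)`, `.withSideInputs(...)`, `.withOutputTags(...)`), rather than the older static-inner-class and configure-first forms (`TextIO.Read.from(...)`, `ParDo.withSideInputs(...).of(fn)`). As a rough sketch of the call shape this enforces — not Beam code; `LineReader`, `read()`, `from()`, and `withSuffix()` are invented names for illustration only — a toy immutable fluent builder looks like this:

```java
// Toy illustration of the "factory method + trailing configuration" style
// this commit applies across the docs. None of these classes exist in Beam;
// they only mimic the call shape of TextIO.read().from(...).
public class FluentStyleDemo {

    // Hypothetical transform configured via chained "with"-style methods.
    static final class LineReader {
        final String path;
        final String suffix;

        private LineReader(String path, String suffix) {
            this.path = path;
            this.suffix = suffix;
        }

        // Factory method replaces the old static-inner-class entry point.
        static LineReader read() {
            return new LineReader(null, "");
        }

        // Each configuration step returns a new immutable instance,
        // so calls can be chained after the factory call in any order.
        LineReader from(String path) {
            return new LineReader(path, this.suffix);
        }

        LineReader withSuffix(String suffix) {
            return new LineReader(this.path, suffix);
        }

        String describe() {
            return "read " + path + suffix;
        }
    }

    public static void main(String[] args) {
        // New style: factory first, then configuration, mirroring
        // TextIO.read().from("..."). The old style put configuration
        // before the payload, as in ParDo.withSideInputs(...).of(fn).
        LineReader r = LineReader.read()
            .from("gs://my_bucket/input")
            .withSuffix(".csv");
        System.out.println(r.describe()); // prints "read gs://my_bucket/input.csv"
    }
}
```

Because each configuration step returns a new immutable value, configuration can trail the payload, which is what lets the hunks above move `.withOutputTags(...)` and `.withSideInputs(...)` after `.of(fn)` without changing pipeline behavior.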