Repository: beam-site
Updated Branches:
  refs/heads/asf-site 9cc5b2280 -> 9ffe5ec58


[BEAM-1353] Style Guide fixups

Fixes usages of PTransforms affected by changes as part of
https://issues.apache.org/jira/browse/BEAM-1353


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/e78f8f27
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/e78f8f27
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/e78f8f27

Branch: refs/heads/asf-site
Commit: e78f8f276a6cbf3156e0f7af3dd4ae1d9b92ee7a
Parents: 0d0da02
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri May 12 16:07:19 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Mon May 15 11:28:52 2017 -0700

----------------------------------------------------------------------
 .../pipelines/create-your-pipeline.md           |  4 +-
 .../pipelines/design-your-pipeline.md           | 20 ++++----
 src/documentation/programming-guide.md          | 54 ++++++++++----------
 src/documentation/sdks/java-extensions.md       |  2 +-
 src/get-started/mobile-gaming-example.md        | 20 ++++----
 src/get-started/wordcount-example.md            |  6 +--
 6 files changed, 53 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/pipelines/create-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/create-your-pipeline.md b/src/documentation/pipelines/create-your-pipeline.md
index b765467..cbf7d31 100644
--- a/src/documentation/pipelines/create-your-pipeline.md
+++ b/src/documentation/pipelines/create-your-pipeline.md
@@ -42,7 +42,7 @@ The following example code shows how to `apply` a `TextIO.Read` root transform t
 
 ```java
 PCollection<String> lines = p.apply(
-  "ReadLines", TextIO.Read.from("gs://some/inputData.txt"));
+  "ReadLines", TextIO.read().from("gs://some/inputData.txt"));
 ```
 
 ## Applying Transforms to Process Pipeline Data
@@ -68,7 +68,7 @@ The following example code shows how to `apply` a `TextIO.Write` transform to wr
 ```java
 PCollection<String> filteredWords = ...;
 
-filteredWords.apply("WriteMyFile", TextIO.Write.to("gs://some/outputData.txt"));
+filteredWords.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));
 ```
 
 ## Running Your Pipeline
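For readers skimming the diff: the change throughout this commit replaces the old nested-class style (`TextIO.Read.from(...)`, `TextIO.Write.to(...)`) with fluent factory methods (`TextIO.read().from(...)`, `TextIO.write().to(...)`). The following is a minimal, dependency-free sketch of that factory-method shape; `TextIOSketch` and its methods are illustrative stand-ins, not the real Beam classes.

```java
// Dependency-free sketch of the fluent factory-method style the diff
// migrates the docs to: a static read()/write() factory returns a
// configurable transform object. Names here are illustrative only.
class TextIOSketch {
    static final class Read {
        private String path;
        Read from(String path) { this.path = path; return this; }
        String describe() { return "Read from " + path; }
    }
    static final class Write {
        private String path;
        Write to(String path) { this.path = path; return this; }
        String describe() { return "Write to " + path; }
    }
    static Read read() { return new Read(); }
    static Write write() { return new Write(); }

    public static void main(String[] args) {
        // Mirrors the updated doc snippets above.
        System.out.println(read().from("gs://some/inputData.txt").describe());
        System.out.println(write().to("gs://some/outputData.txt").describe());
    }
}
```

The factory method keeps each configured transform an ordinary object, so configuration chains naturally left to right.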

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/pipelines/design-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md
index c40803c..ce6a734 100644
--- a/src/documentation/pipelines/design-your-pipeline.md
+++ b/src/documentation/pipelines/design-your-pipeline.md
@@ -103,13 +103,7 @@ final TupleTag<String> startsWithATag = new TupleTag<String>(){};
 final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
 
 PCollectionTuple mixedCollection =
-    dbRowCollection.apply(
-        ParDo
-        // Specify main output. In this example, it is the output
-        // with tag startsWithATag.
-        .withOutputTags(startsWithATag,
-        // Specify the output with tag startsWithBTag, as a TupleTagList.
-                        TupleTagList.of(startsWithBTag))
+    dbRowCollection.apply(ParDo
         .of(new DoFn<String, String>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
@@ -121,8 +115,12 @@ PCollectionTuple mixedCollection =
               c.output(startsWithBTag, c.element());
             }
           }
-        }
-        ));
+        })
+        // Specify main output. In this example, it is the output
+        // with tag startsWithATag.
+        .withOutputTags(startsWithATag,
+        // Specify the output with tag startsWithBTag, as a TupleTagList.
+                        TupleTagList.of(startsWithBTag)));
 
 // Get subset of the output with tag startsWithATag.
 mixedCollection.get(startsWithATag).apply(...);
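The hunk above moves `.withOutputTags(...)` so it chains after `ParDo.of(...)` rather than before it. A dependency-free sketch of that ordering, where `of(fn)` fixes the function's types first and configuration follows (class and method names are illustrative, not Beam's real API):

```java
import java.util.List;
import java.util.function.Function;

// Sketch of the reordered ParDo style: of(fn) first, configuration after.
class ParDoOrderSketch {
    static final class SingleOutput<I, O> {
        final Function<I, O> fn;
        String mainTag = "main";
        List<String> additionalTags = List.of();
        SingleOutput(Function<I, O> fn) { this.fn = fn; }
        // Configuration chains after of(), matching the updated style above.
        SingleOutput<I, O> withOutputTags(String main, List<String> additional) {
            this.mainTag = main;
            this.additionalTags = additional;
            return this;
        }
        O apply(I input) { return fn.apply(input); }
    }
    // of() pins down the input/output types from the function itself.
    static <I, O> SingleOutput<I, O> of(Function<I, O> fn) {
        return new SingleOutput<>(fn);
    }

    public static void main(String[] args) {
        SingleOutput<String, Integer> lengths =
            of((String s) -> s.length())
                .withOutputTags("startsWithA", List.of("startsWithB"));
        System.out.println(lengths.apply("apple")); // 5
    }
}
```

Putting `of(...)` first means the transform's type parameters are known before any configuration call, which is the readability point behind the reordering in this commit.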
@@ -159,7 +157,7 @@ mergedCollectionWithFlatten.apply(...);
 
 ## Multiple sources
 
-Your pipeline can read its input from one or more sources. If your pipeline reads from multiple sources and the data from those sources is related, it can be useful to join the inputs together. In the example illustrated in Figure 5 below, the pipeline reads names and addresses from a database table, and names and order numbers from a text file. The pipeline then uses `CoGroupByKey` to join this information, where the key is the name; the resulting `PCollection` contains all the combinations of names, addresses, and orders.
+Your pipeline can read its input from one or more sources. If your pipeline reads from multiple sources and the data from those sources is related, it can be useful to join the inputs together. In the example illustrated in Figure 5 below, the pipeline reads names and addresses from a database table, and names and order numbers from a Kafka topic. The pipeline then uses `CoGroupByKey` to join this information, where the key is the name; the resulting `PCollection` contains all the combinations of names, addresses, and orders.
 
 <figure id="fig5">
     <img src="{{ site.baseurl }}/images/design-your-pipeline-join.png"
@@ -169,7 +167,7 @@ Figure 5: A pipeline with multiple input sources. See the example code below:
 ```java
 PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
 
-PCollection<KV<String, String>> userOrder = pipeline.apply(TextIO.<KV<String, String>>read()...);
+PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
 
 final TupleTag<String> addressTag = new TupleTag<String>();
 final TupleTag<String> orderTag = new TupleTag<String>();

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index f70e255..d7e37a0 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -191,7 +191,7 @@ public static void main(String[] args) {
 
     // Create the PCollection 'lines' by applying a 'Read' transform.
     PCollection<String> lines = p.apply(
-      "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));
+      "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
 }
 ```
 
@@ -479,8 +479,8 @@ PCollection<String> words = ...;
 // Apply a MapElements with an anonymous lambda function to the PCollection words.
 // Save the result as the PCollection wordLengths.
 PCollection<Integer> wordLengths = words.apply(
-  MapElements.via((String word) -> word.length())
-      .withOutputType(new TypeDescriptor<Integer>() {});
+  MapElements.into(TypeDescriptors.integers())
+             .via((String word) -> word.length()));
 ```
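The `MapElements` hunk above swaps `via(...).withOutputType(...)` for `into(...).via(...)`. A dependency-free sketch of why supplying the output type first works well: the lambda passed to `via(...)` is then checked against an already-known output type. `MapSketch` and its members are illustrative stand-ins, not Beam's real API (Beam uses `TypeDescriptors`, not `Class` tokens).

```java
import java.util.function.Function;

// Sketch of the into(outputType).via(lambda) ordering.
class MapSketch {
    static final class Into<O> {
        // via() infers the input type from the lambda; the output type O
        // was already fixed by into(...).
        <I> Mapped<I, O> via(Function<I, O> fn) { return new Mapped<>(fn); }
    }
    static final class Mapped<I, O> {
        final Function<I, O> fn;
        Mapped(Function<I, O> fn) { this.fn = fn; }
        O map(I input) { return fn.apply(input); }
    }
    // The Class token only pins the output type, playing the role a
    // TypeDescriptor plays in Beam.
    static <O> Into<O> into(Class<O> outputType) { return new Into<>(); }

    public static void main(String[] args) {
        int len = into(Integer.class)
                      .via((String word) -> word.length())
                      .map("word");
        System.out.println(len); // 4
    }
}
```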
 
 ```py
@@ -862,16 +862,18 @@ Side inputs are useful if your `ParDo` needs to inject additional data when proc
 
   // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
   PCollection<String> wordsBelowCutOff =
-  words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
-                    .of(new DoFn<String, String>() {
-      public void processElement(ProcessContext c) {
-        String word = c.element();
-        // In our DoFn, access the side input.
-        int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
-        if (word.length() <= lengthCutOff) {
-          c.output(word);
-        }
-  }}));
+  words.apply(ParDo
+      .of(new DoFn<String, String>() {
+          public void processElement(ProcessContext c) {
+            String word = c.element();
+            // In our DoFn, access the side input.
+            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
+            if (word.length() <= lengthCutOff) {
+              c.output(word);
+            }
+          }
+      }).withSideInputs(maxWordLengthCutOffView)
+  );
 ```
 
 ```py
@@ -943,17 +945,16 @@ While `ParDo` always produces a main output `PCollection` (as the return value f
 // to our ParDo. Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple.
 
   PCollectionTuple results =
-      words.apply(
-          ParDo
+      words.apply(ParDo
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          })
           // Specify the tag for the main output.
           .withOutputTags(wordsBelowCutOffTag,
           // Specify the tags for the two additional outputs as a TupleTagList.
                           TupleTagList.of(wordLengthsAboveCutOffTag)
-                                      .and(markedWordsTag))
-          .of(new DoFn<String, String>() {
-            // DoFn continues here.
-            ...
-          }
+                                      .and(markedWordsTag)));
 ```
 
 ```py
@@ -1114,7 +1115,7 @@ Read transforms read data from an external source and return a `PCollection` rep
 #### Using a read transform:
 
 ```java
-PCollection<String> lines = p.apply(TextIO.Read.from("gs://some/inputData.txt"));
+PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));
 ```
 
 ```py
@@ -1128,7 +1129,7 @@ Write transforms write the data in a `PCollection` to an external data source. Y
 #### Using a Write transform:
 
 ```java
-output.apply(TextIO.Write.to("gs://some/outputData"));
+output.apply(TextIO.write().to("gs://some/outputData"));
 ```
 
 ```py
@@ -1143,7 +1144,7 @@ Many read transforms support reading from multiple input files matching a glob o
 
 ```java
 p.apply("ReadFromText",
-    TextIO.Read.from("protocol://my_bucket/path/to/input-*.csv");
+    TextIO.read().from("protocol://my_bucket/path/to/input-*.csv");
 ```
 
 ```py
@@ -1161,7 +1162,7 @@ The following write transform example writes multiple output files to a location
 
 ```java
 records.apply("WriteToText",
-    TextIO.Write.to("protocol://my_bucket/path/to/numbers")
+    TextIO.write().to("protocol://my_bucket/path/to/numbers")
                 .withSuffix(".csv"));
 ```
 
@@ -1563,7 +1564,7 @@ You can allow late data by invoking the `.withAllowedLateness` operation when yo
               .withAllowedLateness(Duration.standardDays(2)));
 ```
 
-When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness propagates forward to any subsequent `PCollection` derived from the first `PCollection` you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explictly by applying `Window.withAllowedLateness()` again.
+When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness propagates forward to any subsequent `PCollection` derived from the first `PCollection` you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explicitly by applying `Window.configure().withAllowedLateness()`.
 
 
 ### Adding timestamps to a PCollection's elements
@@ -1737,7 +1738,7 @@ You set the allowed lateness by using `.withAllowedLateness()` when you set your
   # The Beam SDK for Python does not support triggers.
 ```
 
-This allowed lateness propagates to all `PCollection`s derived as a result of applying transforms to the original `PCollection`. If you want to change the allowed lateness later in your pipeline, you can apply `Window.withAllowedLateness()` again, explicitly.
+This allowed lateness propagates to all `PCollection`s derived as a result of applying transforms to the original `PCollection`. If you want to change the allowed lateness later in your pipeline, you can apply `Window.configure().withAllowedLateness()` again, explicitly.
 
 
 ### <a name="composite-triggers"></a>Composite Triggers
@@ -1770,6 +1771,7 @@ You can express this pattern using `AfterWatermark.pastEndOfWindow`. For example
 
 ```java
   .apply(Window
+      .configure()
       .triggering(AfterWatermark
            .pastEndOfWindow()
            .withLateFirings(AfterProcessingTime

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/sdks/java-extensions.md
----------------------------------------------------------------------
diff --git a/src/documentation/sdks/java-extensions.md b/src/documentation/sdks/java-extensions.md
index a4694af..17a79e7 100644
--- a/src/documentation/sdks/java-extensions.md
+++ b/src/documentation/sdks/java-extensions.md
@@ -55,5 +55,5 @@ PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
 // For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
 PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
     grouped.apply(
-        SortValues.<String, String, Integer>create(new BufferedExternalSorter.Options()));
+        SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));
 ```

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/get-started/mobile-gaming-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/mobile-gaming-example.md b/src/get-started/mobile-gaming-example.md
index 5c97274..9e59274 100644
--- a/src/get-started/mobile-gaming-example.md
+++ b/src/get-started/mobile-gaming-example.md
@@ -107,9 +107,9 @@ public static class ExtractAndSumScore
 
     return gameInfo
       .apply(MapElements
-          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
-          .withOutputType(
-              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
+          .into(
+              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())))
       .apply(Sum.<String>integersPerKey());
   }
 }
@@ -148,7 +148,7 @@ public static void main(String[] args) throws Exception {
   Pipeline pipeline = Pipeline.create(options);
 
   // Read events from a text file and parse them.
-  pipeline.apply(TextIO.Read.from(options.getInput()))
+  pipeline.apply(TextIO.read().from(options.getInput()))
     .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
     // Extract and sum username/score pairs from the event data.
     .apply("ExtractUserScore", new ExtractAndSumScore("user"))
@@ -314,7 +314,7 @@ public static void main(String[] args) throws Exception {
   final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
 
   // Read 'gaming' events from a text file.
-  pipeline.apply(TextIO.Read.from(options.getInput()))
+  pipeline.apply(TextIO.read().from(options.getInput()))
     // Parse the incoming data.
     .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
 
@@ -601,8 +601,6 @@ public static class CalculateSpammyUsers
     // Filter the user sums using the global mean.
     PCollection<KV<String, Integer>> filtered = sumScores
         .apply("ProcessAndFilter", ParDo
-            // use the derived mean total score as a side input
-            .withSideInputs(globalMeanScore)
             .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
               private final Aggregator<Long, Long> numSpammerUsers =
                 createAggregator("SpammerUsers", new Sum.SumLongFn());
@@ -617,7 +615,9 @@ public static class CalculateSpammyUsers
                   c.output(c.element());
                 }
               }
-            }));
+            })
+            // use the derived mean total score as a side input
+            .withSideInputs(globalMeanScore));
     return filtered;
   }
 }
@@ -635,7 +635,6 @@ rawEvents
        FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
   // Filter out the detected spammer users, using the side input derived above.
   .apply("FilterOutSpammers", ParDo
-          .withSideInputs(spammersView)
           .of(new DoFn<GameActionInfo, GameActionInfo>() {
             @ProcessElement
             public void processElement(ProcessContext c) {
@@ -644,7 +643,8 @@ rawEvents
                 c.output(c.element());
               }
             }
-          }))
+          })
+          .withSideInputs(spammersView))
   // Extract and sum teamname/score pairs from the event data.
   .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
 ```
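The side-input hunks above apply the same reordering: `withSideInputs(...)` now chains after `ParDo.of(...)` instead of before it. A dependency-free sketch of that shape, using a plain `int` in place of a `PCollectionView` side input (all names here are illustrative stand-ins, not Beam's real types):

```java
import java.util.function.BiFunction;

// Sketch of of(fn) first, then withSideInputs(...) configuration.
class SideInputSketch {
    static final class WithSide<I, O> {
        final BiFunction<I, Integer, O> fn; // second argument plays the side-input role
        int sideInput;
        WithSide(BiFunction<I, Integer, O> fn) { this.fn = fn; }
        WithSide<I, O> withSideInputs(int view) { this.sideInput = view; return this; }
        O apply(I input) { return fn.apply(input, sideInput); }
    }
    static <I, O> WithSide<I, O> of(BiFunction<I, Integer, O> fn) {
        return new WithSide<>(fn);
    }

    public static void main(String[] args) {
        // Keep words no longer than a cutoff supplied as the "side input",
        // echoing the word-length filter earlier in the diff.
        WithSide<String, Boolean> keep =
            of((String word, Integer cutoff) -> word.length() <= cutoff)
                .withSideInputs(4);
        System.out.println(keep.apply("word"));    // true
        System.out.println(keep.apply("lengthy")); // false
    }
}
```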

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/get-started/wordcount-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/wordcount-example.md b/src/get-started/wordcount-example.md
index 503f930..023086d 100644
--- a/src/get-started/wordcount-example.md
+++ b/src/get-started/wordcount-example.md
@@ -96,7 +96,7 @@ The Minimal WordCount pipeline contains five transforms:
 1.  A text file `Read` transform is applied to the Pipeline object itself, and produces a `PCollection` as output. Each element in the output PCollection represents one line of text from the input file. This example uses input data stored in a publicly accessible Google Cloud Storage bucket ("gs://").
 
     ```java
-    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
     ```
 
     ```py
@@ -157,7 +157,7 @@ The Minimal WordCount pipeline contains five transforms:
 5.  A text file write transform. This transform takes the final `PCollection` of formatted Strings as input and writes each element to an output text file. Each element in the input `PCollection` represents one line of text in the resulting output file.
 
     ```java
-    .apply(TextIO.Write.to("wordcounts"));
+    .apply(TextIO.write().to("wordcounts"));
     ```
 
     ```py
@@ -398,7 +398,7 @@ public static void main(String[] args) throws IOException {
     Pipeline pipeline = Pipeline.create(options);
 
     PCollection<String> input = pipeline
-      .apply(TextIO.Read.from(options.getInputFile()))
+      .apply(TextIO.read().from(options.getInputFile()))
 
 ```
 
