This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new cbac53b Update code comments to improve readability in docs (#9024) cbac53b is described below commit cbac53b25ef37691b33684f6e4faf3144a308739 Author: Lukasz Cwik <lukec...@gmail.com> AuthorDate: Tue Jul 9 13:06:30 2019 -0700 Update code comments to improve readability in docs (#9024) * Update code comments to improve readability in docs * Apply suggestions Clarify FileIO.ReadableFile description Co-Authored-By: Lukasz Cwik <lc...@google.com> * Run spotlessApply for Spotless PreCommit check * Add javadoc class description for Java PreCommit check * Fix javadoc comment * fixup! Fix broken link --- .../apache/beam/examples/snippets/Snippets.java | 39 ++++++++++------------ website/src/documentation/patterns/overview.md | 4 +-- .../documentation/patterns/side-input-patterns.md | 4 +-- 3 files changed, 21 insertions(+), 26 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java index fb48304..2907ff3 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java @@ -528,22 +528,23 @@ public class Snippets { TextIO.read() .from("<path-to-files>/*") .watchForNewFiles( - // Check for new files every minute + // Check for new files every minute. Duration.standardMinutes(1), - // Stop watching the filepattern if no new files appear within an hour + // Stop watching the file pattern if no new files appear for an hour. Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)))); // [END FileProcessPatternProcessNewFilesSnip2] // [START FileProcessPatternAccessMetadataSnip1] p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz")) - // withCompression can be omitted - by default compression is detected from the filename. + // The withCompression method is optional. 
By default, the Beam SDK detects compression from + // the filename. .apply(FileIO.readMatches().withCompression(Compression.GZIP)) .apply( ParDo.of( new DoFn<FileIO.ReadableFile, String>() { @ProcessElement public void process(@Element FileIO.ReadableFile file) { - // We now have access to the file and its metadata + // We can now access the file and its metadata. LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId()); } })); @@ -555,12 +556,11 @@ public class Snippets { // [START SideInputPatternSlowUpdateGlobalWindowSnip1] public static void sideInputPatterns() { - // Using View.asSingleton, this pipeline uses a dummy external service as illustration. - // Run in debug mode to see the output + // This pipeline uses View.asSingleton for a placeholder external service. + // Run in debug mode to see the output. Pipeline p = Pipeline.create(); - // Create slowly updating sideinput - + // Create a side input that updates every 5 seconds. PCollectionView<Map<String, String>> map = p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L))) .apply( @@ -574,20 +574,15 @@ public class Snippets { @ProcessElement public void process( @Element Long input, OutputReceiver<Map<String, String>> o) { - // Do any external reads needed here... - // We will make use of our dummy external service. - // Every time this triggers, the complete map will be replaced with that - // read from - // the service. - o.output(DummyExternalService.readDummyData()); + // Replace map with test data from the placeholder external service. + // Add external reads here. + o.output(PlaceholderExternalService.readTestData()); } })) .apply(View.asSingleton()); - // ---- Consume slowly updating sideinput - - // GenerateSequence is only used here to generate dummy data for this illustration. - // You would use your real source for example PubSubIO, KafkaIO etc... + // Consume side input. GenerateSequence generates test data. 
+ // Use a real source (like PubSubIO or KafkaIO) in production. p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L))) .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) .apply(Sum.longsGlobally().withoutDefaults()) @@ -601,7 +596,7 @@ public class Snippets { c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now()); LOG.debug( - "Value is {} key A is {} and key B is {}", + "Value is {}, key A is {}, and key B is {}.", c.element(), keyMap.get("Key_A"), keyMap.get("Key_B")); @@ -610,10 +605,10 @@ public class Snippets { .withSideInputs(map)); } - /** Dummy class representing a pretend external service. */ - public static class DummyExternalService { + /** Placeholder class that represents an external service generating test data. */ + public static class PlaceholderExternalService { - public static Map<String, String> readDummyData() { + public static Map<String, String> readTestData() { Map<String, String> map = new HashMap<>(); Instant now = Instant.now(); diff --git a/website/src/documentation/patterns/overview.md b/website/src/documentation/patterns/overview.md index c2d8324..d676b2e 100644 --- a/website/src/documentation/patterns/overview.md +++ b/website/src/documentation/patterns/overview.md @@ -27,7 +27,7 @@ Pipeline patterns demonstrate common Beam use cases. 
Pipeline patterns are based * [Accessing filenames]({{ site.baseurl }}/documentation/patterns/file-processing-patterns/#accessing-filenames) **Side input patterns** - Patterns for processing supplementary data -* [Using global window side inputs in non-global windows]({{ site.baseurl }}/documentation/patterns/side-input-patterns/#using-global-window-side-inputs-in-non-global-windows) +* [Slowly updating global window side inputs]({{ site.baseurl }}/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs) **Pipeline option patterns** - Patterns for configuring pipelines * [Retroactively logging runtime parameters]({{ site.baseurl }}/documentation/patterns/pipeline-option-patterns/#retroactively-logging-runtime-parameters) @@ -42,4 +42,4 @@ To contribute a new pipeline pattern, create an issue with the [`pipeline-patter ## What's next * Try an [end-to-end example]({{ site.baseurl }}/get-started/try-apache-beam/) -* Execute your pipeline on a [runner]({{ site.baseurl }}/documentation/runners/capability-matrix/) \ No newline at end of file +* Execute your pipeline on a [runner]({{ site.baseurl }}/documentation/runners/capability-matrix/) diff --git a/website/src/documentation/patterns/side-input-patterns.md b/website/src/documentation/patterns/side-input-patterns.md index c5458ca..dc58fd1 100644 --- a/website/src/documentation/patterns/side-input-patterns.md +++ b/website/src/documentation/patterns/side-input-patterns.md @@ -22,11 +22,11 @@ limitations under the License. The samples on this page show you common Beam side input patterns. A side input is an additional input that your `DoFn` can access each time it processes an element in the input `PCollection`. For more information, see the [programming guide section on side inputs]({{ site.baseurl }}/documentation/programming-guide/#side-inputs). 
-## Using global window side inputs in non-global windows +## Slowly updating global window side inputs You can retrieve side inputs from global windows to use them in a pipeline job with non-global windows, like a `FixedWindow`. -To use global window side inputs in pipelines with non-global windows: +To slowly update global window side inputs in pipelines with non-global windows: 1. Write a `DoFn` that periodically pulls data from a bounded source into a global window.