This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 759ea22  Add Snippets for 3 new Beam Patterns 2 X FilePatterns 1 X SideInput Pattern
     new 311edfd  Merge pull request #8538 from rezarokni/Beam-Patterns
759ea22 is described below

commit 759ea2278a4febb156bba63b16e7c8e091e27cb5
Author: rarokni <rezaro...@google.com>
AuthorDate: Thu May 9 08:27:11 2019 +0800

    Add Snippets for 3 new Beam Patterns 2 X FilePatterns 1 X SideInput Pattern
---
 .../apache/beam/examples/snippets/Snippets.java   | 142 +++++++++++++++++++++
 .../apache_beam/examples/snippets/snippets.py     |  17 +++
 2 files changed, 159 insertions(+)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 04f36fd..8216bba 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -24,12 +24,18 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -40,16 +46,31 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Code snippets used in webdocs. */
 public class Snippets {
@@ -463,4 +484,125 @@ public class Snippets {
     // [END CoGroupByKeyTuple]
     return contactLines;
   }
+
+  public static void fileProcessPattern() throws Exception {
+    Pipeline p = Pipeline.create();
+
+    // [START FileProcessPatternProcessNewFilesSnip1]
+    // This produces PCollection<MatchResult.Metadata>
+    p.apply(
+        FileIO.match()
+            .filepattern("...")
+            .continuously(
+                Duration.standardSeconds(30),
+                Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
+    // [END FileProcessPatternProcessNewFilesSnip1]
+
+    // [START FileProcessPatternProcessNewFilesSnip2]
+    // This produces PCollection<String>
+    p.apply(
+        TextIO.read()
+            .from("<path-to-files>/*")
+            .watchForNewFiles(
+                // Check for new files every minute.
+                Duration.standardMinutes(1),
+                // Stop watching the filepattern if no new files appear within an hour.
+                Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
+    // [END FileProcessPatternProcessNewFilesSnip2]
+
+    // [START FileProcessPatternAccessMetadataSnip1]
+    p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
+        // withCompression can be omitted - by default compression is detected from the filename.
+        .apply(FileIO.readMatches().withCompression(Compression.GZIP))
+        .apply(
+            ParDo.of(
+                new DoFn<FileIO.ReadableFile, String>() {
+                  @ProcessElement
+                  public void process(@Element FileIO.ReadableFile file) {
+                    // We now have access to the file and its metadata.
+                    LOG.info("File Metadata resourceId is {}", file.getMetadata().resourceId());
+                  }
+                }));
+    // [END FileProcessPatternAccessMetadataSnip1]
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(Snippets.class);
+
+  // [START SideInputPatternSlowUpdateGlobalWindowSnip1]
+  public static void sideInputPatterns() {
+    // This pipeline uses View.asSingleton with a dummy external service as illustration.
+    // Run in debug mode to see the output.
+    Pipeline p = Pipeline.create();
+
+    // Create the slowly updating side input.
+    PCollectionView<Map<String, String>> map =
+        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
+            .apply(
+                Window.<Long>into(new GlobalWindows())
+                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+                    .discardingFiredPanes())
+            .apply(
+                ParDo.of(
+                    new DoFn<Long, Map<String, String>>() {
+
+                      @ProcessElement
+                      public void process(
+                          @Element Long input, OutputReceiver<Map<String, String>> o) {
+                        // Do any external reads needed here; we make use of our
+                        // dummy external service. Every time this triggers, the
+                        // complete map is replaced with the one read from the service.
+                        o.output(DummyExternalService.readDummyData());
+                      }
+                    }))
+            .apply(View.asSingleton());
+
+    // ---- Consume the slowly updating side input.
+
+    // GenerateSequence is only used here to generate dummy data for this illustration.
+    // You would use your real source, for example PubSubIO, KafkaIO, etc.
+    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
+        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
+        .apply(Sum.longsGlobally().withoutDefaults())
+        .apply(
+            ParDo.of(
+                    new DoFn<Long, KV<Long, Long>>() {
+
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        Map<String, String> keyMap = c.sideInput(map);
+                        c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
+
+                        LOG.debug(
+                            "Value is {}, key A is {}, and key B is {}",
+                            c.element(),
+                            keyMap.get("Key_A"),
+                            keyMap.get("Key_B"));
+                      }
+                    })
+                .withSideInputs(map));
+  }
+
+  /** Dummy class representing a pretend external service. */
+  public static class DummyExternalService {
+
+    public static Map<String, String> readDummyData() {
+
+      Map<String, String> map = new HashMap<>();
+      Instant now = Instant.now();
+
+      // "HH:mm:ss" is the Joda-Time pattern for hour:minute:second.
+      DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:mm:ss");
+
+      map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
+      map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
+
+      return map;
+    }
+  }
+
+  // [END SideInputPatternSlowUpdateGlobalWindowSnip1]
 }
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 7a04c4c..5e7f9bf 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1335,3 +1335,20 @@ class Count(beam.PTransform):
         | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
     # [END model_library_transforms_count]
+
+
+def file_process_pattern_access_metadata():
+
+  import apache_beam as beam
+  from apache_beam.io import fileio
+
+  # [START FileProcessPatternAccessMetadataSnip1]
+  with beam.Pipeline() as p:
+    readable_files = (p
+                      | fileio.MatchFiles('hdfs://path/to/*.txt')
+                      | fileio.ReadMatches()
+                      | beam.Reshuffle())
+    files_and_contents = (readable_files
+                          | beam.Map(lambda x: (x.metadata.path,
+                                                x.read_utf8())))
+  # [END FileProcessPatternAccessMetadataSnip1]
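
The Java snippets above only build pipelines; none of them calls run(). For anyone trying the watchForNewFiles pattern locally, a minimal sketch follows. The class name, the /tmp/incoming/*.txt path, and execution on the default (direct) runner are illustrative assumptions, not part of this commit:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Watch;
    import org.joda.time.Duration;

    public class WatchNewFilesExample {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // Poll the filepattern for new files every 30 seconds; stop
        // watching once no new files have appeared for an hour.
        p.apply(
            TextIO.read()
                .from("/tmp/incoming/*.txt") // hypothetical local path
                .watchForNewFiles(
                    Duration.standardSeconds(30),
                    Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

        p.run().waitUntilFinish();
      }
    }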
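
A note on the Joda-Time pattern in DummyExternalService: lowercase "mm" and "ss" are minute-of-hour and second-of-minute, while uppercase "MM" is month-of-year and "S" is fraction-of-second, so "HH:MM:SS" would print hour:month:fraction rather than a time of day; "HH:mm:ss" is the hour:minute:second pattern. A standalone check, assuming only Joda-Time on the classpath (the class name is illustrative):

    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;

    public class JodaPatternCheck {
      public static void main(String[] args) {
        DateTime now = DateTime.now();

        // Hour-of-day, minute, second: e.g. "08:27:11".
        System.out.println(now.toString(DateTimeFormat.forPattern("HH:mm:ss")));

        // Hour-of-day, month-of-year, fraction-of-second: e.g. "08:05:42".
        System.out.println(now.toString(DateTimeFormat.forPattern("HH:MM:SS")));
      }
    }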