Repository: incubator-beam Updated Branches: refs/heads/master ee1a3bcfb -> 1a7cd4112
Allow for custom timestamp/watermark function in FlinkPipelineRunner. Added a new "of" signature and constructor for UnboundedFlinkSource to allow event-time timestamping. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9000d95d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9000d95d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9000d95d Branch: refs/heads/master Commit: 9000d95d2b34ab45f799aedb140710986ff19452 Parents: ee1a3bc Author: David Desberg <david.desb...@uber.com> Authored: Mon Jul 11 12:24:18 2016 -0700 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed Jul 13 14:15:54 2016 +0200 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 10 ++++++--- .../streaming/io/UnboundedFlinkSource.java | 23 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9000d95d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 5d04068..fa6b387 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -70,7 +70,9 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import 
org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.util.Collector; import org.joda.time.Instant; import org.slf4j.Logger; @@ -252,6 +254,8 @@ public class FlinkStreamingTransformTranslators { if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { @SuppressWarnings("unchecked") UnboundedFlinkSource<T> flinkSourceFunction = (UnboundedFlinkSource<T>) transform.getSource(); + final AssignerWithPeriodicWatermarks<T> flinkAssigner = flinkSourceFunction.getFlinkTimestampAssigner(); + DataStream<T> flinkSource = context.getExecutionEnvironment() .addSource(flinkSourceFunction.getFlinkSource()); @@ -260,17 +264,17 @@ public class FlinkStreamingTransformTranslators { context.getExecutionEnvironment().getConfig())); source = flinkSource + .assignTimestampsAndWatermarks(flinkAssigner) .flatMap(new FlatMapFunction<T, WindowedValue<T>>() { @Override public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception { collector.collect( WindowedValue.of( s, - Instant.now(), + new Instant(flinkAssigner.extractTimestamp(s, -1)), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); - } - }).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>()); + }}); } else { try { transform.getSource(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9000d95d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java index 94b73ce..716ca30 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.List; @@ -40,14 +42,26 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource. /** Coder set during translation */ private Coder<T> coder; + /** Timestamp / watermark assigner for source; defaults to ingestion time */ + private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner = new IngestionTimeExtractor<T>(); + public UnboundedFlinkSource(SourceFunction<T> source) { flinkSource = checkNotNull(source); } + public UnboundedFlinkSource(SourceFunction<T> source, AssignerWithPeriodicWatermarks<T> timestampAssigner) { + flinkSource = checkNotNull(source); + flinkTimestampAssigner = checkNotNull(timestampAssigner); + } + public SourceFunction<T> getFlinkSource() { return this.flinkSource; } + public AssignerWithPeriodicWatermarks<T> getFlinkTimestampAssigner() { + return flinkTimestampAssigner; + } + @Override public List<? 
extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner."); @@ -79,6 +93,10 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource. this.coder = coder; } + public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) { + this.flinkTimestampAssigner = flinkTimestampAssigner; + } + /** * Creates a new unbounded source from a Flink source. * @param flinkSource The Flink source function @@ -88,4 +106,9 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource. public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(SourceFunction<T> flinkSource) { return new UnboundedFlinkSource<>(flinkSource); } + + public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of( + SourceFunction<T> flinkSource, AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) { + return new UnboundedFlinkSource<>(flinkSource, flinkTimestampAssigner); + } }