This is an automated email from the ASF dual-hosted git repository. kenn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 1aee9a8  Merge #8686: [BEAM-7399] Blog post on looping timers
1aee9a8 is described below

commit 1aee9a87616b21a0af0911708c9e0a4ad35f82be
Author: Reza Rokni <7542791+rezaro...@users.noreply.github.com>
AuthorDate: Wed Jun 12 10:41:06 2019 +0800

    Merge #8686: [BEAM-7399] Blog post on looping timers
---
 website/src/_data/authors.yml                   |   5 +-
 website/src/_posts/2019-06-11-looping-timers.md | 342 ++++++++++++++++++++++++
 2 files changed, 346 insertions(+), 1 deletion(-)

diff --git a/website/src/_data/authors.yml b/website/src/_data/authors.yml
index e6ce735..eb37ed5 100644
--- a/website/src/_data/authors.yml
+++ b/website/src/_data/authors.yml
@@ -115,4 +115,7 @@ rfernand:
 mbaetens:
     name: Matthias Baetens
     email: baetensmatth...@gmail.com
-    twitter: matthiasbaetens
\ No newline at end of file
+    twitter: matthiasbaetens
+rez:
+    name: Reza Rokni
+    email: r...@google.com
diff --git a/website/src/_posts/2019-06-11-looping-timers.md b/website/src/_posts/2019-06-11-looping-timers.md
new file mode 100644
index 0000000..ff1cb5d
--- /dev/null
+++ b/website/src/_posts/2019-06-11-looping-timers.md
@@ -0,0 +1,342 @@
+---
+layout: post
+title: "Looping timers in Apache Beam"
+date: 2019-06-11 00:00:01 -0800
+excerpt_separator: <!--more-->
+categories: blog
+authors:
+  - rez
+  - klk
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+Apache Beam’s primitives let you build expressive data pipelines, suitable for a
+variety of use cases. One specific use case is the analysis of time series data
+in which continuous sequences across window boundaries are important. A few fun
+challenges arise as you tackle this type of data and in this blog we will
+explore one of those in more detail and make use of the Timer API
+([blog post]({{ site.baseurl }}/blog/2017/08/28/timely-processing.html))
+using the "looping timer" pattern.
+
+<!--more-->
+
+With Beam in streaming mode, you can take streams of data and build analytical
+transforms to produce results on the data. But for time series data, the absence
+of data is useful information. So how can we produce results in the absence of
+data?
+
+Let's use a more concrete example to illustrate the requirement. Imagine you
+have a simple pipeline that sums the number of events coming from an IoT device
+every minute. We would like to produce the value 0 when no data has been seen
+within a specific time interval. So why can this get tricky? Well it is easy to
+build a simple pipeline that counts events as they arrive, but when there is no
+event, there is nothing to count!
+
+Let's build a simple pipeline to work with:
+
+```
+  // We will start our timer at 1 sec from the fixed upper boundary of our
+  // minute window
+  Instant now = Instant.parse("2000-01-01T00:00:59Z");
+
+  // ----- Create some dummy data
+
+  // Create 3 elements, incrementing by 1 minute and leaving a time gap between
+  // element 2 and element 3
+  TimestampedValue<KV<String, Integer>> time_1 =
+    TimestampedValue.of(KV.of("Key_A", 1), now);
+
+  TimestampedValue<KV<String, Integer>> time_2 =
+    TimestampedValue.of(KV.of("Key_A", 2),
+    now.plus(Duration.standardMinutes(1)));
+
+  // No Value for start time + 2 mins
+  TimestampedValue<KV<String, Integer>> time_3 =
+    TimestampedValue.of(KV.of("Key_A", 3),
+    now.plus(Duration.standardMinutes(3)));
+
+  // Create pipeline
+  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+    .as(PipelineOptions.class);
+
+  Pipeline p = Pipeline.create(options);
+
+  // Apply a fixed window of duration 1 min and Sum the results
+  p.apply(Create.timestamped(time_1, time_2, time_3))
+    .apply(
+      Window.<KV<String, Integer>>into(
+        FixedWindows.of(Duration.standardMinutes(1))))
+    .apply(Sum.integersPerKey())
+    .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+      @ProcessElement public void process(ProcessContext c) {
+        LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
+      }
+    }));
+
+  p.run();
+```
+
+Running that pipeline will result in the following output:
+
+```
+INFO LoopingTimer - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
+INFO LoopingTimer - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
+INFO LoopingTimer - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
+```
+
+> Note: The lack of order in the output should be expected, however the
+> key-window tuple is correctly computed.
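To see why only three results appear above, the window each element falls into can be worked out by hand. The sketch below is plain Java using `java.time` rather than Beam and Joda-Time; `FixedWindowSketch`, `windowStart` and `emptyWindows` are hypothetical names used for illustration, not Beam APIs. It assumes one-minute fixed windows aligned to the epoch, rounds each timestamp down to the start of its window, and lists the windows between the first and last populated ones that received no element.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper for illustration only; this is not Beam code.
class FixedWindowSketch {

  // Start of the fixed window containing ts, assuming windows aligned to the epoch.
  static Instant windowStart(Instant ts, Duration size) {
    long millis = ts.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
  }

  // Window starts between the first and last populated windows that hold no element.
  static List<Instant> emptyWindows(List<Instant> timestamps, Duration size) {
    TreeSet<Instant> populated = new TreeSet<>();
    for (Instant ts : timestamps) {
      populated.add(windowStart(ts, size));
    }
    List<Instant> missing = new ArrayList<>();
    if (populated.isEmpty()) {
      return missing;
    }
    for (Instant w = populated.first(); w.isBefore(populated.last()); w = w.plus(size)) {
      if (!populated.contains(w)) {
        missing.add(w);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // Same three timestamps as the dummy data in the pipeline above.
    Instant now = Instant.parse("2000-01-01T00:00:59Z");
    List<Instant> timestamps = Arrays.asList(
        now, now.plus(Duration.ofMinutes(1)), now.plus(Duration.ofMinutes(3)));
    System.out.println("Windows with no data: "
        + emptyWindows(timestamps, Duration.ofMinutes(1)));
  }
}
```

For the three dummy elements, the only window with no data is the one starting at 00:02:00, which is the gap the rest of this post sets out to fill.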
+
+
+As expected, we see output in each of the interval windows which had a data
+point with a timestamp between the minimum and maximum value of the window.
+There were data points at timestamps 00:00:59, 00:01:59 and 00:03:59, which
+fell into the following interval windows.
+
+* [00:00:00, 00:00:59.999)
+* [00:01:00, 00:01:59.999)
+* [00:03:00, 00:03:59.999)
+
+But as there was no data between 00:02:00 and 00:02:59, no value is produced
+for interval window [00:02:00, 00:02:59.999).
+
+How can we get Beam to output values for that missing window? First, let’s walk
+through some options that do not make use of the Timer API.
+
+
+## Option 1: External heartbeat
+
+We can use an external system to emit a value for each time interval and inject
+it into the stream of data that Beam consumes. This simple option moves any
+complexity out of the Beam pipeline. But using an external system means we need
+to monitor this system and perform other maintenance tasks in tandem with the
+Beam pipeline.
+
+
+## Option 2: Use a generated source in the Beam pipeline
+
+We can use a generating source to emit the value using this code snippet:
+
+```
+pipeline.apply(
+    GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
+```
+
+We can then:
+
+1. Use a DoFn to convert the value to zero.
+2. Flatten this value with the real source.
+3. Produce a PCollection which has ticks in every time interval.
+
+This is also a simple way of producing a value in each time interval.
+
+
+## Options 1 & 2: The problem with multiple keys
+
+Both options 1 and 2 work well for the case where the pipeline processes a
+single key. Let’s now deal with the case where instead of 1 IoT device, there
+are 1000s or 100,000s of these devices, each with a unique key. To make option 1
+or option 2 work in this scenario, we need to carry out an extra step: creating
+a FanOut DoFn.
+Each tick needs to be distributed to all the potential keys, so
+we need to create a FanOut DoFn that takes the dummy value and generates a
+key-value pair for every available key.
+
+For example, let's assume we have 3 keys for 3 IoT devices, {key1,key2,key3}.
+Using the method we outlined in Option 2, when we get the first element from
+GenerateSequence, we need to create a loop in the DoFn that generates 3
+key-value pairs. These pairs become the heartbeat value for each of the IoT
+devices.
+
+And things get a lot more fun when we need to deal with lots of IoT devices,
+with a list of keys that are dynamically changing. We would need to add a
+transform that does a Distinct operation and feed the data produced as a
+side-input into the FanOut DoFn.
+
+
+## Option 3: Implementing a heartbeat using Beam timers
+
+So how do timers help? Well, let's have a look at a new transform:
+
+```java
+public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {
+
+  Instant stopTimerTime;
+
+  LoopingStatefulTimer(Instant stopTime) {
+    this.stopTimerTime = stopTime;
+  }
+
+  @StateId("timerRunning")
+  private final StateSpec<ValueState<Boolean>> timerRunning =
+      StateSpecs.value(BooleanCoder.of());
+
+  @StateId("key")
+  private final StateSpec<ValueState<String>> key =
+      StateSpecs.value(StringUtf8Coder.of());
+
+  @TimerId("loopingTimer")
+  private final TimerSpec loopingTimer =
+      TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  @ProcessElement public void process(ProcessContext c, @StateId("key") ValueState<String> key,
+      @StateId("timerRunning") ValueState<Boolean> timerRunning,
+      @TimerId("loopingTimer") Timer loopingTimer) {
+
+    // If the timer has been set already, do not reset
+    if (timerRunning.read() == null) {
+      loopingTimer.set(c.timestamp().plus(Duration.standardMinutes(1)));
+      timerRunning.write(true);
+    }
+
+    // We need this value so that we can output a value for the correct key in OnTimer
+    if (key.read() == null) {
+      key.write(c.element().getKey());
+    }
+
+    c.output(c.element());
+  }
+
+  @OnTimer("loopingTimer")
+  public void onTimer(
+      OnTimerContext c,
+      @StateId("key") ValueState<String> key,
+      @TimerId("loopingTimer") Timer loopingTimer) {
+
+    LOG.info("Timer @ {} fired", c.timestamp());
+    c.output(KV.of(key.read(), 0));
+
+    // If we do not put in a “time to live” value, then the timer would loop forever
+    Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
+    if (nextTimer.isBefore(stopTimerTime)) {
+      loopingTimer.set(nextTimer);
+    } else {
+      LOG.info(
+          "Timer not being set as exceeded Stop Timer value {} ",
+          stopTimerTime);
+    }
+  }
+}
+```
+
+There are two data values that the state API needs to keep:
+
+1. A boolean `timerRunning` value used to avoid resetting the timer if it’s
+   already running.
+2. A `key` state object value that allows us to store the key that we are
+   working with. This information will be needed in the `OnTimer` event later.
+
+We also have a Timer with the ID `loopingTimer` that acts as our per
+interval alarm clock. Note that the timer is an *event-time timer*. It fires
+based on the watermark, not on the passage of time as the pipeline runs.
+
+Next, let's unpack what's happening in the `@ProcessElement` block:
+
+The first element to come to this block will:
+
+1. Set the state of `timerRunning` to true.
+2. Write the value of the key from the key-value pair into the `key` ValueState.
+3. Set the timer to fire one minute after the element's
+   timestamp. Note that the maximum value allowed for this timestamp is
+   XX:XX:59.999. This places the maximum alarm value at the upper boundary of
+   the next time interval.
+4. Finally, output the data from the `@ProcessElement` block using
+   `c.output`.
+
+In the `@OnTimer` block, the following occurs:
+
+1. The code emits a value with the key pulled from our `key` ValueState and a
+   value of 0.
+   The timestamp of the event corresponds to the event time of the
+   timer firing.
+2. We set a new timer for one minute after the firing timestamp, unless that
+   would take us past the `stopTimerTime` value. Your use case will normally
+   have more complex stopping conditions, but we use a simple condition here to
+   allow us to keep the illustrated code simple. The topic of stopping
+   conditions is discussed in more detail later.
+
+And that's it, let's add our transform back into the pipeline:
+
+```java
+  // Apply a fixed window of duration 1 min and Sum the results
+  p.apply(Create.timestamped(time_1, time_2, time_3)).apply(
+    Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))))
+    // We use a combiner to reduce the number of calls in keyed state
+    // from all elements to 1 per FixedWindow
+    .apply(Sum.integersPerKey())
+    .apply(Window.into(new GlobalWindows()))
+    .apply(ParDo.of(new LoopingStatefulTimer(Instant.parse("2000-01-01T00:04:00Z"))))
+    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
+    .apply(Sum.integersPerKey())
+    .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+      @ProcessElement public void process(ProcessContext c) {
+
+        LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
+
+      }
+    }));
+```
+
+1. In the first part of the pipeline we create FixedWindows and reduce the value
+   per key down to a single Sum.
+2. Next we re-window the output into a GlobalWindow. Since state and timers are
+   per window, they must be set within the window boundary. We want the looping
+   timer to span all the fixed windows, so we set it up in the global window.
+3. We then add our LoopingStatefulTimer DoFn.
+4. Finally, we reapply the FixedWindows and Sum our values.
+
+Once the timer is running, this pipeline ensures that a value of zero exists for
+each interval window, whether or not the Source of the pipeline emitted a value
+within the boundaries of that interval window. This means that we can mark the
+absence of data.
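The firing sequence of the looping timer can also be traced outside Beam. The following is a minimal plain-Java simulation, not Beam code: `LoopingTimerSketch` and `sumWithHeartbeat` are hypothetical names, it uses `java.time` in place of Joda-Time, it handles a single key, and it assumes the element with the earliest timestamp is the one that sets the timer. Real elements pass through unchanged, each timer firing adds a zero at its own timestamp, and the timer is re-armed only while the next firing time is before the stop time.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the looping timer pattern for one key.
// Hypothetical names for illustration; this is not Beam code.
class LoopingTimerSketch {

  static final Duration INTERVAL = Duration.ofMinutes(1);

  // Start of the one-minute window (assumed aligned to the epoch) containing ts.
  static Instant windowStart(Instant ts) {
    long millis = ts.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, INTERVAL.toMillis()));
  }

  // Returns the sum per window start, including zeros emitted by the timer.
  static TreeMap<Instant, Integer> sumWithHeartbeat(
      Map<Instant, Integer> elements, Instant stopTime) {
    TreeMap<Instant, Integer> sums = new TreeMap<>();
    if (elements.isEmpty()) {
      return sums; // no element ever arrives, so no timer is ever set
    }
    // Real elements pass straight through, like c.output(c.element()).
    for (Map.Entry<Instant, Integer> e : elements.entrySet()) {
      sums.merge(windowStart(e.getKey()), e.getValue(), Integer::sum);
    }
    // Assumption: the earliest element is processed first and sets the timer.
    Instant timer = new TreeMap<Instant, Integer>(elements).firstKey().plus(INTERVAL);
    while (timer != null) {
      // The timer fires: emit a zero at the firing timestamp.
      sums.merge(windowStart(timer), 0, Integer::sum);
      // Re-arm only while the next firing time is before the stop time.
      Instant next = timer.plus(INTERVAL);
      timer = next.isBefore(stopTime) ? next : null;
    }
    return sums;
  }

  public static void main(String[] args) {
    // Per-window sums as they leave the first Sum.integersPerKey() step.
    Map<Instant, Integer> elements = new TreeMap<>();
    elements.put(Instant.parse("2000-01-01T00:00:59.999Z"), 1);
    elements.put(Instant.parse("2000-01-01T00:01:59.999Z"), 2);
    elements.put(Instant.parse("2000-01-01T00:03:59.999Z"), 3);
    sumWithHeartbeat(elements, Instant.parse("2000-01-01T00:04:00Z"))
        .forEach((window, sum) -> System.out.println(window + " -> " + sum));
  }
}
```

With the three per-window sums from this post and a stop time of 00:04:00, the simulated timer fires at 00:01:59.999, 00:02:59.999 and 00:03:59.999, so the window starting at 00:02:00 ends up with a sum of 0 while the other windows keep their original sums.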
+
+You might question why we use two reducers with multiple `Sum.integersPerKey`.
+Why not just use one? Functionally, using one would also produce the correct
+result. However, using two `Sum.integersPerKey` transforms gives us a nice
+performance advantage. The first one reduces the number of elements reaching the
+stateful DoFn from many to just one per time interval. This can reduce the
+number of reads of the State API during the `@ProcessElement` calls.
+
+Here is the logging output of running our modified pipeline:
+
+```
+INFO LoopingTimer - Timer @ 2000-01-01T00:01:59.999Z fired
+INFO LoopingTimer - Timer @ 2000-01-01T00:02:59.999Z fired
+INFO LoopingTimer - Timer @ 2000-01-01T00:03:59.999Z fired
+INFO LoopingTimer - Timer not being set as exceeded Stop Timer value 2000-01-01T00:04:00.000Z
+INFO LoopingTimer - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
+INFO LoopingTimer - Value is KV{Key_A, 0} timestamp is 2000-01-01T00:02:59.999Z
+INFO LoopingTimer - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
+INFO LoopingTimer - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
+```
+
+Yay! We now have output from the time interval [00:02:00, 00:02:59.999), even
+though the source dataset has no elements in that interval.
+
+In this blog, we covered one of the fun areas around time series use cases and
+worked through several options, including an advanced use case of the Timer API.
+Happy looping everyone!
+
+**Note:** Looping timers is an interesting new use case for the Timer API and
+runners will need to add support for it with all of their more advanced
+feature sets. You can experiment with this pattern today using the
+DirectRunner. For other runners, please look out for their release notes on
+support for dealing with this use case in production.
+
+([Capability Matrix]({{ site.baseurl }}/documentation/runners/capability-matrix/))
+
+
+Runner-specific notes:
+The Google Cloud Dataflow runner's Drain feature does not support looping timers (Link to matrix)