On Fri, 21 Jun 2019 at 18:02, Jan Lukavský wrote:
Hi Reza,
great presentation at the Beam Summit. I have had a few
posts here on the list during the last few weeks, some of
which might actually be related to both looping timers
and validity windows. But maybe you will see a different
approach than I do, so here are my questions:
a) Because of [1], timers might not be exactly ordered
(the JIRA talks about the DirectRunner, but I suppose the
issue is present on all runners that use immutable
bundles of size > 1, so it might be related to Dataflow as
well). This might cause issues when you try to introduce
a TTL for looping timers, because the TTL timer might get
fired before the regular looping timer, which might cause
incorrect results (state cleared before it has been flushed).
The TTL check would be in the same timer rather than a
separate timer. The max value processed in each OnTimer
call would be stored in ValueState and used as the base for
knowing how long it has been since the pipeline last saw an
external value for that key.
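To make the single-timer idea concrete, here is a minimal plain-Java simulation of one key, assuming event-time timestamps in milliseconds. It does not use the Beam state and timer API: `LoopingTimerKeyState`, its fields and `advanceWatermarkTo` are hypothetical stand-ins for ValueState cells, the timer callback and the runner. What it shows is the ordering: each firing emits first and only then runs the TTL check, so state cannot be cleared before the last value is flushed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of one key in a looping-timer DoFn. The fields stand in
// for ValueState cells and a single event-time timer; this is not the Beam API.
class LoopingTimerKeyState {
    private final long intervalMillis;
    private final long ttlMillis;
    private Long lastValue = null;                  // value carried into the next interval
    private long maxSeenTimestamp = Long.MIN_VALUE; // latest timestamp of an external element
    private Long nextTimer = null;                  // next firing time, or null when stopped
    final List<String> output = new ArrayList<>();

    LoopingTimerKeyState(long intervalMillis, long ttlMillis) {
        this.intervalMillis = intervalMillis;
        this.ttlMillis = ttlMillis;
    }

    // External element: keep only the value with the latest timestamp, since
    // elements are not guaranteed to arrive in timestamp order.
    void onElement(long timestamp, long value) {
        if (timestamp >= maxSeenTimestamp) {
            maxSeenTimestamp = timestamp;
            lastValue = value;
        }
        if (nextTimer == null) {
            // Arm the timer at the end of the interval containing this element.
            nextTimer = (timestamp / intervalMillis + 1) * intervalMillis;
        }
    }

    // Single timer: emit first, then run the TTL check, so state is never
    // cleared before the final value for this key has been flushed.
    void onTimer(long firingTime) {
        output.add(firingTime + ":" + lastValue);
        if (firingTime - maxSeenTimestamp > ttlMillis) {
            lastValue = null;
            maxSeenTimestamp = Long.MIN_VALUE;
            nextTimer = null; // stop looping until a new element arrives
        } else {
            nextTimer = firingTime + intervalMillis; // re-arm for the next interval
        }
    }

    // Stand-in for the runner advancing the watermark and firing due timers.
    void advanceWatermarkTo(long watermark) {
        while (nextTimer != null && nextTimer <= watermark) {
            onTimer(nextTimer);
        }
    }
}
```

With an interval of 10 and a TTL of 25, a single element at timestamp 3 yields firings at 10, 20 and 30; the firing at 30 is 27 ms past the last external element, so the loop stops there, after emitting.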
b) Because stateful ParDo doesn't sort by timestamp,
you have to store the last values in BagState (as opposed
to the blog, where you just emit the identity value of the
sum operation), right?
You can store it in ValueState rather than BagState, but yes,
you store that value in state, ready for the next OnTimer()
firing.
c) Because of how stateful ParDo currently works in
batch, does that imply that all values (per key) would
have to be stored in memory? Would that scale?
This is one of the sharp edges, and the answer is ... it
depends :-) I would recommend always using a
FixedWindow+Combiner before this step; this will compress
the values into something much smaller. For example, when
building 'candles', this will compress the values down to
low/hi/first/last per FixedWindow length. If the window
length is very small there may be no compression, but in
most cases I have seen, this is an OK compromise.
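As a rough illustration of that compression, here is a plain-Java sketch of a candle accumulator that reduces all ticks in one fixed window to low/high/first/last. The `Candle` class is hypothetical; its `add` and `merge` methods only mirror the roles of `addInput` and `mergeAccumulators` in a Beam `CombineFn`, and the windowing itself is not shown. First and last are picked by timestamp rather than arrival order, because the input is not sorted.

```java
// Hypothetical candle accumulator: keeps only low/high/first/last per window,
// so downstream state holds a handful of numbers instead of every tick.
class Candle {
    double low = Double.POSITIVE_INFINITY;
    double high = Double.NEGATIVE_INFINITY;
    long firstTs = Long.MAX_VALUE;
    double first = Double.NaN;
    long lastTs = Long.MIN_VALUE;
    double last = Double.NaN;

    // Plays the role of addInput; ticks may arrive out of timestamp order.
    Candle add(long ts, double price) {
        low = Math.min(low, price);
        high = Math.max(high, price);
        if (ts < firstTs) { firstTs = ts; first = price; }
        if (ts > lastTs) { lastTs = ts; last = price; }
        return this;
    }

    // Plays the role of mergeAccumulators for partial results from different bundles.
    Candle merge(Candle other) {
        low = Math.min(low, other.low);
        high = Math.max(high, other.high);
        if (other.firstTs < firstTs) { firstTs = other.firstTs; first = other.first; }
        if (other.lastTs > lastTs) { lastTs = other.lastTs; last = other.last; }
        return this;
    }
}
```

Because `merge` is associative, partial candles computed on different workers can be combined in any order and still produce the same four values per window.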
There is a discussion about problem a) in [2], but maybe
a different approach is possible. For problems
b) and c) there is a proposal [3]. When the input is
sorted, it starts to work both in batch and with
ValueState, because the last value is the *valid* value.
There was also a discussion on dev@ around a sorted Map
state, which would be very cool for this use case.
This even has a connection with the mentioned validity
windows: if you sort by timestamp, the _last_ value
is the _valid_ value, so it essentially boils down to
keeping a single value per key (and again, it starts to work in
both batch and streaming).
one for Tyler :-)
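A small plain-Java sketch of the point that, with timestamp-sorted input, the last value is the valid one. It assumes each element carries a key, an event timestamp and a value; `KeyedEvent` and `ValidValuePerKey` are hypothetical names, not Beam classes. Once elements are processed in timestamp order, a single slot per key is enough to hold the valid value, whereas "last arrival wins" can keep a stale one.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical element type for this simulation (not a Beam class).
class KeyedEvent {
    final String key;
    final long timestamp;
    final String value;

    KeyedEvent(String key, long timestamp, String value) {
        this.key = key;
        this.timestamp = timestamp;
        this.value = value;
    }
}

class ValidValuePerKey {
    // With input sorted by timestamp, one slot per key (think: one ValueState
    // cell) is enough; each element overwrites the value it invalidates.
    static Map<String, String> fromSorted(List<KeyedEvent> events) {
        List<KeyedEvent> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((KeyedEvent e) -> e.timestamp));
        Map<String, String> valid = new HashMap<>();
        for (KeyedEvent e : sorted) {
            valid.put(e.key, e.value);
        }
        return valid;
    }

    // Without sorting, "last arrival wins" can keep a stale value.
    static Map<String, String> lastArrival(List<KeyedEvent> events) {
        Map<String, String> valid = new HashMap<>();
        for (KeyedEvent e : events) {
            valid.put(e.key, e.value);
        }
        return valid;
    }
}
```

For two elements of key `k` arriving as (20, "B") and then (10, "A"), the sorted version keeps "B" while the arrival-order version keeps the stale "A".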
I even have a suspicion that sorting by timestamp is
closely related to retractions, because when you are
using sorted streams, retractions actually become only the
diff between the last emitted pane and the current pane. That
might even help implement them in general, but I might
be missing something. This just popped into my head today,
as I was thinking about why there was actually no (or little)
need for retractions in the Euphoria model (very similar to
Beam, it actually differs only by the sorting thing :)), and why
the need pops up so often in Beam.
Retractions would be possible with this, but it does mean
that we would need to keep old versions around; something
built in would be very cool, rather than building it with
this pattern.
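To illustrate the "diff between the last emitted pane and the current pane" idea, and why it requires keeping the old version around, here is a hypothetical plain-Java sketch. It models a pane as a key-to-result map (an assumption made only for illustration; `PaneDiff` is not a Beam class) and emits a retraction (`-`) for every old result that changed or disappeared and an addition (`+`) for every new or changed one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch: derive retractions as the diff between the last
// emitted pane and the current pane (both modeled as key -> result maps).
class PaneDiff {
    static List<String> diff(Map<String, Long> lastEmitted, Map<String, Long> current) {
        List<String> out = new ArrayList<>();
        TreeSet<String> keys = new TreeSet<>(lastEmitted.keySet()); // sorted for deterministic output
        keys.addAll(current.keySet());
        for (String k : keys) {
            Long before = lastEmitted.get(k);
            Long after = current.get(k);
            if (before != null && before.equals(after)) {
                continue; // unchanged: nothing to retract or re-emit
            }
            if (before != null) {
                out.add("-" + k + "=" + before); // retract the previously emitted result
            }
            if (after != null) {
                out.add("+" + k + "=" + after); // emit the new result
            }
        }
        return out;
    }
}
```

Diffing {a=1, b=2} against {b=3, c=4} yields -a=1, -b=2, +b=3, +c=4. The `lastEmitted` map is the "old version" that has to be kept around for this to work.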
I'd be very happy to hear what you think about all of this.
Cheers,
Jan
[1] https://issues.apache.org/jira/browse/BEAM-7520
[2] https://lists.apache.org/thread.html/1a3a0dd9da682e159f78f131d335782fd92b047895001455ff659613@%3Cdev.beam.apache.org%3E
[3] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing
On 6/21/19 8:12 AM, Reza Rokni wrote:
Great question. One thing that we did not cover in the
blog, and I think we should have, is the use case where
you would want to bootstrap the pipeline.
One option would be to have, on startup, an extra bounded
source that is read and flattened into the main
pipeline. The source will need to contain values in
Timestamped<V> format, corresponding to the
first window that you would like to kick-start the
process from. I will see if I can find some time
to code up an example and add it, along with the looping
timer code, to the Beam patterns.
https://beam.apache.org/documentation/patterns/overview/
Cheers
Reza
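A plain-Java sketch of the bootstrap idea, under simplifying assumptions: the looping timer for a key is armed by the earliest element it sees and then fires once per interval until `end`, and `flatten` stands in for Beam's Flatten of the bounded bootstrap source with the live input. `BootstrapSketch` and its methods are hypothetical. Without a seed, the intervals before the first live element get no output; a seed value timestamped at the start of the first desired window makes the loop start there.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (not the Beam API): the looping timer for a key is armed
// by its earliest element and then fires once per interval up to `end`.
class BootstrapSketch {
    static List<Long> firedIntervals(List<Long> elementTimestamps, long intervalMillis, long end) {
        List<Long> fired = new ArrayList<>();
        if (elementTimestamps.isEmpty()) {
            return fired; // no element, so the timer is never armed
        }
        long first = elementTimestamps.stream().mapToLong(Long::longValue).min().getAsLong();
        for (long t = (first / intervalMillis) * intervalMillis; t < end; t += intervalMillis) {
            fired.add(t); // start of each interval window that receives a value
        }
        return fired;
    }

    // Stand-in for Flatten: union of the bounded bootstrap source and the live input.
    static List<Long> flatten(List<Long> bootstrap, List<Long> live) {
        List<Long> all = new ArrayList<>(bootstrap);
        all.addAll(live);
        return all;
    }
}
```

With live elements at 25 and 31 and an interval of 10, the unseeded loop fires for the windows starting at 20 and 30; flattening in a seed at timestamp 0 also yields the windows starting at 0 and 10, which is one way to handle an empty first interval.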
On Fri, 21 Jun 2019 at 07:59, Manu Zhang wrote:
Indeed, an interesting pattern.
One minor question: it seems the timer is triggered
by the first element, so what if there is no data in
the "first interval"?
Thanks for the write-up.
Manu
On Wed, Jun 19, 2019 at 12:15 PM Reza Rokni wrote:
Hi folks,
Just wanted to drop a note here on a new
pattern that folks may find interesting,
called Looping Timers. It allows for default
values to be created in interval windows in the
absence of any external data coming into the
pipeline. The details are in the blog post below:
https://beam.apache.org/blog/2019/06/11/looping-timers.html
Its main utility is in dealing with time-series
data. There are still rough edges, like dealing
with TTL, and it would be great to hear feedback
on ways it can be improved.
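The core of the pattern can be simulated in a few lines of plain Java. This is a hypothetical sketch, not the blog's code or the Beam API: for one key, every interval window from the first element up to `end` gets an output, namely the sum of the elements in that window, or `defaultValue` (here 0, the identity of the sum) when no data arrived. Timestamps are assumed unique per key, since they are used as `TreeMap` keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical simulation of the looping-timer idea for one key: every interval
// window from the first element up to `end` gets an output, using `defaultValue`
// when no element fell into that window.
class LoopingTimerSketch {
    static List<String> perInterval(TreeMap<Long, Long> valuesByTimestamp, long intervalMillis,
                                    long end, long defaultValue) {
        List<String> out = new ArrayList<>();
        if (valuesByTimestamp.isEmpty()) {
            return out; // nothing ever armed the timer for this key
        }
        long start = (valuesByTimestamp.firstKey() / intervalMillis) * intervalMillis;
        for (long w = start; w < end; w += intervalMillis) {
            // Elements with timestamps in [w, w + interval).
            NavigableMap<Long, Long> inWindow =
                valuesByTimestamp.subMap(w, true, w + intervalMillis, false);
            long sum = inWindow.isEmpty()
                ? defaultValue
                : inWindow.values().stream().mapToLong(Long::longValue).sum();
            out.add(w + ":" + sum);
        }
        return out;
    }
}
```

With elements at timestamps 3, 4 and 27 (values 5, 2 and 1), an interval of 10 and `end` of 40, the output is 0:7, 10:0, 20:1, 30:0; the windows starting at 10 and 30 are filled with the default.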
The next pattern to publish in this domain will
assist with the hold and propagation of values from
one interval window to the next, which, coupled
with looping timers, starts to solve some
interesting problems.
Cheers
Reza
--
This email may be confidential and privileged.
If you received this communication by mistake,
please don't forward it to anyone else, please
erase all copies and attachments, and please
let me know that it has gone to the wrong person.
The above terms reflect a potential business
arrangement, are provided solely as a basis for
further discussion, and are not intended to be
and do not constitute a legally binding
obligation. No legally binding obligations will
be created, implied, or inferred until an
agreement in final form is executed in writing
by all parties involved.