> The TTL check would be in the same Timer rather than a separate Timer. The maximum timestamp of the values processed in each OnTimer call would be stored in ValueState and used as the base for knowing how long it has been since the pipeline last saw an external value for that key.

OK, that seems to work, if you store the maximum timestamp in a ValueState (that is, you basically generate a per-key watermark).

> You can store it in ValueState rather than BagState, but yes, you store that value in state, ready for the next OnTimer() firing.
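
For reference, here is a minimal sketch of how I read that suggestion (my own code, not from the blog; all names and the interval/TTL constants are placeholders): the last value is carried in ValueState, the maximum seen event timestamp acts as a per-key watermark, and the TTL check happens inside the single looping OnTimer.

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// All names, the interval and the TTL are placeholders.
public class LoopingTimerWithTtlFn extends DoFn<KV<String, Double>, Double> {

  private static final Duration INTERVAL = Duration.standardMinutes(1);
  private static final Duration TTL = Duration.standardHours(1);

  // Last value seen for the key, carried forward on every timer firing.
  @StateId("lastValue")
  private final StateSpec<ValueState<Double>> lastValueSpec = StateSpecs.value();

  // Per-key "watermark": maximum event timestamp seen so far, base for the TTL check.
  @StateId("maxSeenTs")
  private final StateSpec<ValueState<Instant>> maxSeenTsSpec = StateSpecs.value();

  @TimerId("looping")
  private final TimerSpec loopingSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("lastValue") ValueState<Double> lastValue,
      @StateId("maxSeenTs") ValueState<Instant> maxSeenTs,
      @TimerId("looping") Timer looping) {
    Instant ts = c.timestamp();
    if (lastValue.read() == null) {
      // First element for this key (or first after a TTL reset): start the
      // loop at the end of the interval containing this element.
      long intervalEnd =
          ts.getMillis() - (ts.getMillis() % INTERVAL.getMillis()) + INTERVAL.getMillis();
      looping.set(new Instant(intervalEnd));
    }
    lastValue.write(c.element().getValue());
    Instant max = maxSeenTs.read();
    if (max == null || ts.isAfter(max)) {
      maxSeenTs.write(ts);
    }
  }

  @OnTimer("looping")
  public void onTimer(
      OnTimerContext c,
      @StateId("lastValue") ValueState<Double> lastValue,
      @StateId("maxSeenTs") ValueState<Instant> maxSeenTs,
      @TimerId("looping") Timer looping) {
    Instant maxSeen = maxSeenTs.read();
    // TTL check inside the same (single) timer: if no external value has been
    // seen for longer than TTL, clear state and stop re-setting the timer.
    if (maxSeen == null || c.timestamp().isAfter(maxSeen.plus(TTL))) {
      lastValue.clear();
      maxSeenTs.clear();
      return;
    }
    Double value = lastValue.read();
    if (value != null) {
      // Emit the carried-forward value for this interval (the key is omitted
      // for brevity; in practice you would keep it in state as well).
      c.output(value);
    }
    looping.set(c.timestamp().plus(INTERVAL));  // keep looping
  }
}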

In my understanding of the problem, I'd say that this approach seems a little suboptimal. Consider the following when trying to generate OHLC data (open, high, low, close), that is, carrying the last closing price over as the next window's opening price:

 - suppose we have four times T1 < T2 < T3 < T4, where T2 and T4 denote ends of windows (closing times)

 - first (in processing time), we receive a value for time T3, cache it in ValueState, and set a timer for T4

 - next, we receive a value for T1, but we cannot overwrite the value already cached for T3, right? What do we do then? And should we reset the timer to T2, the end of the earlier window?

 - basically, because we received *two* values, both of which are needed, and no timer could have fired in between, we need both values stored to know which one to emit when the timer fires (see the sketch below). And consider that in batch, the timer fires only after we have seen all the data, without any ordering.
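
To make that concrete, here is roughly what I think the stateful DoFn is then forced into (again my own sketch, with made-up names and interval, and TTL left out for brevity): keep every timestamped value in a BagState and, when the looping timer fires, pick the latest value at or before the firing time, while keeping that closing value and any later values around for future windows.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

// All names and the interval are placeholders.
public class OhlcCarryForwardFn extends DoFn<KV<String, Double>, Double> {

  private static final Duration INTERVAL = Duration.standardMinutes(1);

  // Every value not yet consumed by a window close, with its event timestamp.
  @StateId("pending")
  private final StateSpec<BagState<TimestampedValue<Double>>> pendingSpec =
      StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(DoubleCoder.of()));

  // Event time at which the looping timer is currently set to fire.
  @StateId("nextFire")
  private final StateSpec<ValueState<Instant>> nextFireSpec = StateSpecs.value();

  @TimerId("looping")
  private final TimerSpec loopingSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("pending") BagState<TimestampedValue<Double>> pending,
      @StateId("nextFire") ValueState<Instant> nextFire,
      @TimerId("looping") Timer looping) {
    // The values for T1 and T3 may both arrive before the T2/T4 timers fire,
    // so every value has to be kept; a ValueState would have to drop one.
    pending.add(TimestampedValue.of(c.element().getValue(), c.timestamp()));
    // If this element belongs to an earlier window than the timer currently
    // targets (T1 arriving after T3), pull the timer back to that window end.
    long end = c.timestamp().getMillis()
        - (c.timestamp().getMillis() % INTERVAL.getMillis()) + INTERVAL.getMillis();
    Instant intervalEnd = new Instant(end);
    Instant target = nextFire.read();
    if (target == null || intervalEnd.isBefore(target)) {
      looping.set(intervalEnd);
      nextFire.write(intervalEnd);
    }
  }

  @OnTimer("looping")
  public void onTimer(
      OnTimerContext c,
      @StateId("pending") BagState<TimestampedValue<Double>> pending,
      @StateId("nextFire") ValueState<Instant> nextFire,
      @TimerId("looping") Timer looping) {
    Instant windowEnd = c.timestamp();
    TimestampedValue<Double> closing = null;                   // latest value <= windowEnd
    List<TimestampedValue<Double>> later = new ArrayList<>();  // values for future windows
    for (TimestampedValue<Double> tv : pending.read()) {
      if (!tv.getTimestamp().isAfter(windowEnd)) {
        if (closing == null || tv.getTimestamp().isAfter(closing.getTimestamp())) {
          closing = tv;
        }
      } else {
        later.add(tv);
      }
    }
    if (closing != null) {
      // Close of this window, which also serves as the open of the next one.
      c.output(closing.getValue());
      // Keep the closing value (it carries forward) plus anything later.
      pending.clear();
      pending.add(closing);
      for (TimestampedValue<Double> tv : later) {
        pending.add(tv);
      }
    }
    // Keep looping.
    Instant next = windowEnd.plus(INTERVAL);
    looping.set(next);
    nextFire.write(next);
  }
}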

Or am I missing something?

Jan

On 6/25/19 6:00 AM, Reza Rokni wrote:


On Fri, 21 Jun 2019 at 18:02, Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Hi Reza,

    great presentation at the Beam Summit. I have had a few posts here
    on the list during the last few weeks, some of which might
    actually be related to both looping timers and validity windows.
    But maybe you will be able to see a different approach than I do,
    so a few questions:

     a) because of [1], timers might not be exactly ordered (the JIRA
    talks about DirectRunner, but I suppose the issue is present on
    all runners that use immutable bundles of size > 1, so it might
    affect Dataflow as well). This might cause issues when you try
    to introduce TTL for looping timers, because the TTL timer might
    fire before the regular looping timer, which could produce
    incorrect results (state cleared before it has been flushed).

The TTL check would be in the same Timer rather than a separate Timer. The maximum timestamp of the values processed in each OnTimer call would be stored in ValueState and used as the base for knowing how long it has been since the pipeline last saw an external value for that key.

     b) because a stateful ParDo doesn't sort by timestamp, that
    implies that you have to store the last values in BagState (as
    opposed to the blog, where you just emit the identity value of
    the sum operation), right?

You can store it in ValueState rather than BagState, but yes, you store that value in state, ready for the next OnTimer() firing.

     c) because of how stateful ParDo currently works in batch, does
    that imply that all values (per key) would have to be stored in
    memory? Would that scale?

This is one of the sharp edges, and the answer is ... it depends :-) I would recommend always using a FixedWindows + Combine step before this one; it will compress the values into something much smaller. For example, in the case of building 'candles', it compresses down to low/high/first/last values per fixed-window length. If the window length is very small there may be no compression, but in most cases I have seen, this is an acceptable compromise.
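
A rough sketch of what that pre-combine could look like (made-up names, not code from the blog): with the raw ticks keyed by instrument as (event-time millis, price) pairs, a CombineFn can reduce each fixed window to a single candle.

import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;

/** Combines timestamped prices (event-time millis, price) into one candle per window. */
public class CandleFn
    extends Combine.CombineFn<KV<Long, Double>, CandleFn.Candle, CandleFn.Candle> {

  /** Accumulator and output: open/high/low/close plus the timestamps that defined open/close. */
  public static class Candle implements Serializable {
    long openTs = Long.MAX_VALUE;
    long closeTs = Long.MIN_VALUE;
    double open;
    double close;
    double high = Double.NEGATIVE_INFINITY;
    double low = Double.POSITIVE_INFINITY;
  }

  @Override
  public Candle createAccumulator() {
    return new Candle();
  }

  @Override
  public Candle addInput(Candle acc, KV<Long, Double> tick) {
    long ts = tick.getKey();
    double price = tick.getValue();
    if (ts < acc.openTs) {   // earliest tick in the window => open
      acc.openTs = ts;
      acc.open = price;
    }
    if (ts > acc.closeTs) {  // latest tick in the window => close
      acc.closeTs = ts;
      acc.close = price;
    }
    acc.high = Math.max(acc.high, price);
    acc.low = Math.min(acc.low, price);
    return acc;
  }

  @Override
  public Candle mergeAccumulators(Iterable<Candle> accumulators) {
    Candle merged = createAccumulator();
    for (Candle c : accumulators) {
      if (c.openTs == Long.MAX_VALUE) {
        continue;  // empty accumulator
      }
      if (c.openTs < merged.openTs) {
        merged.openTs = c.openTs;
        merged.open = c.open;
      }
      if (c.closeTs > merged.closeTs) {
        merged.closeTs = c.closeTs;
        merged.close = c.close;
      }
      merged.high = Math.max(merged.high, c.high);
      merged.low = Math.min(merged.low, c.low);
    }
    return merged;
  }

  @Override
  public Candle extractOutput(Candle acc) {
    return acc;
  }

  @Override
  public Coder<Candle> getAccumulatorCoder(CoderRegistry registry, Coder<KV<Long, Double>> inputCoder) {
    return SerializableCoder.of(Candle.class);
  }

  @Override
  public Coder<Candle> getDefaultOutputCoder(CoderRegistry registry, Coder<KV<Long, Double>> inputCoder) {
    return SerializableCoder.of(Candle.class);
  }
}

Applied as something like ticks.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1)))).apply(Combine.perKey(new CandleFn())) on a PCollection<KV<String, KV<Long, Double>>>, each key then contributes at most one small candle per window to the downstream stateful DoFn instead of every raw value.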

    There is a discussion about problem a) in [2], but maybe some
    different approach is possible. For problems b) and c) there is
    a proposal [3]. When the input is sorted, it starts to work both
    in batch and with ValueState, because the last value is the
    *valid* value.

There was also a discussion on dev@ around a sorted Map state, which would be very cool for this use case.

    This even has a connection to the mentioned validity windows: if
    you sort by timestamp, the _last_ value is the _valid_ value, so
    it essentially boils down to keeping a single value per key (and,
    again, it starts to work in both batch and streaming).

one for Tyler :-)

    I even have a suspicion that sorting by timestamp is closely
    related to retractions, because when you are using sorted
    streams, a retraction actually becomes only the diff between the
    last emitted pane and the current pane. That might even help
    implement retractions in general, but I might be missing
    something. This just popped into my head today, as I was
    thinking about why there was actually no (or little) need for
    retractions in the Euphoria model (very similar to Beam, it
    actually differs by the sorting thing :)), and why the need pops
    up so often in Beam.

Retractions will be possible with this, but it does mean that we would need to keep old versions around; something built in would be very cool rather than building it with this pattern.

    I'd be very happy to hear what you think about all of this.

    Cheers,

    Jan

    [1] https://issues.apache.org/jira/browse/BEAM-7520

    [2] https://lists.apache.org/thread.html/1a3a0dd9da682e159f78f131d335782fd92b047895001455ff659613@%3Cdev.beam.apache.org%3E

    [3] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing

    On 6/21/19 8:12 AM, Reza Rokni wrote:
    Great question! One thing that we did not cover in the blog, and
    I think we should have, is the use case where you would want to
    bootstrap the pipeline.

    One option would be, on startup, to have an extra bounded source
    that is read and flattened into the main pipeline. The source
    would need to contain values in TimestampedValue<V> format
    corresponding to the first window that you would like to
    kickstart the process from. Will see if I can find some time to
    code up an example and add that and the looping timer code into
    the Beam patterns.

    https://beam.apache.org/documentation/patterns/overview/
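
    A rough sketch of that bootstrap (made-up names and placeholder
    seed values, not code from the blog): create a bounded seed
    collection timestamped into the first window and flatten it with
    the main streaming input.

    import java.util.Arrays;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Instant;

    public class BootstrapExample {

      /** Flattens a bounded set of seed values into the main streaming input. */
      static PCollection<KV<String, Double>> withBootstrap(
          PCollection<KV<String, Double>> streamInput, Instant firstWindowStart) {
        PCollection<KV<String, Double>> bootstrap =
            streamInput
                .getPipeline()
                .apply(
                    "BootstrapValues",
                    Create.timestamped(
                        Arrays.asList(
                            // Placeholder keys/values, timestamped into the first
                            // window the looping timers should start from.
                            TimestampedValue.of(KV.of("FX_PAIR_1", 1.0), firstWindowStart),
                            TimestampedValue.of(KV.of("FX_PAIR_2", 2.0), firstWindowStart))));
        return PCollectionList.of(streamInput)
            .and(bootstrap)
            .apply(Flatten.pCollections());
      }
    }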

    Cheers
    Reza





    On Fri, 21 Jun 2019 at 07:59, Manu Zhang <owenzhang1...@gmail.com
    <mailto:owenzhang1...@gmail.com>> wrote:

        Indeed interesting pattern.

        One minor question: it seems the timer is triggered by the
        first element, so what if there is no data in the "first
        interval"?

        Thanks for the write-up.
        Manu

        On Wed, Jun 19, 2019 at 12:15 PM Reza Rokni <r...@google.com
        <mailto:r...@google.com>> wrote:

            Hi folks,

            Just wanted to drop a note here on a new pattern that
            folks may find interesting, called Looping Timers. It
            allows default values to be created in interval windows
            in the absence of any external data coming into the
            pipeline. The details are in the blog post below:

            https://beam.apache.org/blog/2019/06/11/looping-timers.html

            Its main utility is when dealing with time-series data.
            There are still rough edges, like dealing with TTL, and
            it would be great to hear feedback on ways it can be
            improved.

            The next pattern to publish in this domain will assist
            with the hold and propagation of values from one
            interval window to the next, which, coupled with looping
            timers, starts to solve some interesting problems.

            Cheers

            Reza







