On 6/25/19 1:43 PM, Reza Rokni wrote:


On Tue, 25 Jun 2019 at 18:12, Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    > The TTL check would be in the same timer rather than a separate
    timer. The max value processed in each OnTimer call would be stored
    in ValueState and used as a base to know how long it has been since
    the pipeline last saw an external value for that key.

    OK, that seems to work if you store the maximum timestamp in a
    ValueState (that is, you basically generate a per-key watermark).
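
    Just to check my own understanding, here is a minimal sketch of how
    I read that (the state/timer names and the one-minute interval and
    one-hour TTL are purely illustrative, not taken from your
    implementation):

    import org.apache.beam.sdk.coders.DoubleCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    static class LoopingTimerWithTtl
        extends DoFn<KV<String, Double>, KV<String, Double>> {

      private static final Duration INTERVAL = Duration.standardMinutes(1); // illustrative
      private static final Duration TTL = Duration.standardHours(1);        // illustrative

      @StateId("key")
      private final StateSpec<ValueState<String>> keySpec =
          StateSpecs.value(StringUtf8Coder.of());

      @StateId("lastValue")
      private final StateSpec<ValueState<Double>> lastValueSpec =
          StateSpecs.value(DoubleCoder.of());

      // Max event timestamp seen for this key, i.e. the per-key watermark.
      @StateId("maxSeen")
      private final StateSpec<ValueState<Long>> maxSeenSpec =
          StateSpecs.value(VarLongCoder.of());

      @TimerId("loop")
      private final TimerSpec loopSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          ProcessContext c,
          @StateId("key") ValueState<String> key,
          @StateId("lastValue") ValueState<Double> lastValue,
          @StateId("maxSeen") ValueState<Long> maxSeen,
          @TimerId("loop") Timer loop) {
        key.write(c.element().getKey());
        lastValue.write(c.element().getValue());
        Long max = maxSeen.read();
        if (max == null) {
          // First element for this key: start the looping timer.
          loop.set(c.timestamp().plus(INTERVAL));
        }
        if (max == null || c.timestamp().getMillis() > max) {
          maxSeen.write(c.timestamp().getMillis());
        }
        c.output(c.element());
      }

      @OnTimer("loop")
      public void onTimer(
          OnTimerContext c,
          @StateId("key") ValueState<String> key,
          @StateId("lastValue") ValueState<Double> lastValue,
          @StateId("maxSeen") ValueState<Long> maxSeen,
          @TimerId("loop") Timer loop) {
        Long lastSeen = maxSeen.read();
        if (lastSeen == null || c.timestamp().getMillis() - lastSeen > TTL.getMillis()) {
          // TTL expired: no external value seen for too long, so stop looping
          // and release the state for this key (the TTL check lives in the
          // same timer as the loop).
          key.clear();
          lastValue.clear();
          maxSeen.clear();
          return;
        }
        // Carry the last value into the otherwise empty interval and reschedule.
        c.output(KV.of(key.read(), lastValue.read()));
        loop.set(c.timestamp().plus(INTERVAL));
      }
    }

    If I read it correctly, "how long since the pipeline has seen an
    external value" is then just the difference between the firing
    timestamp of the looping timer and that per-key maximum.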

    > You can store it in ValueState rather than BagState, but yes, you
    store that value in state, ready for the next OnTimer() firing.

    In my understanding of the problem, this approach seems a little
    suboptimal. Consider the following when trying to generate OHLC
    data (open, high, low, close; that is, carry the last closing price
    over as the next window's opening price):

     - suppose we have four times T1 < T2 < T3 < T4, where times T2
    and T4 denote "ends of windows" (closing times)

     - first (in processing time), we receive a value for time T3, we
    cache it in ValueState, and we set a timer for T4

     - next, we receive a value for T1 - but we cannot overwrite the
    value already written for T3, right? What to do then? Would we
    reset the timer to T2 (the earlier window end)?

     - basically, because we received *two* values, both of which are
    needed and no timer could have fired in between, we need both
    values stored to know which value to emit when the timer fires
    (sketch below). And consider that in batch, the timer fires only
    after we have seen all the data (without any ordering).
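
    To make that concrete, this is roughly the shape I end up with when
    I try it with BagState (only a sketch: the names and the fixed
    one-minute interval are mine, the key is dropped from the output
    and state cleanup is left out):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.beam.sdk.coders.DoubleCoder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Instant;

    static class BufferUntilTimerFires extends DoFn<KV<String, Double>, Double> {

      @StateId("buffer")
      private final StateSpec<BagState<TimestampedValue<Double>>> bufferSpec =
          StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(DoubleCoder.of()));

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          ProcessContext c,
          @StateId("buffer") BagState<TimestampedValue<Double>> buffer,
          @TimerId("flush") Timer flush) {
        // No ordering guarantee, so every value has to be kept until a timer fires.
        buffer.add(TimestampedValue.of(c.element().getValue(), c.timestamp()));
        // This is exactly the problem: a later element (T3) may already have
        // moved the timer past the window end (T2) that an earlier element (T1)
        // belongs to.
        flush.set(endOfWindowFor(c.timestamp()));
      }

      @OnTimer("flush")
      public void onTimer(
          OnTimerContext c, @StateId("buffer") BagState<TimestampedValue<Double>> buffer) {
        // Only here, after sorting the buffered values, do we know which one is
        // the "close" for the window that just ended.
        List<TimestampedValue<Double>> sorted = new ArrayList<>();
        buffer.read().forEach(sorted::add);
        sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
        TimestampedValue<Double> close = null;
        for (TimestampedValue<Double> tv : sorted) {
          if (!tv.getTimestamp().isAfter(c.timestamp())) {
            close = tv; // the last value at or before the window end wins
          }
        }
        if (close != null) {
          c.output(close.getValue());
        }
      }

      private static Instant endOfWindowFor(Instant ts) {
        long interval = 60_000L; // illustrative one-minute "windows"
        return new Instant(ts.getMillis() - (ts.getMillis() % interval) + interval);
      }
    }

    And in batch the flush timer fires only once, after all input has
    been seen, so the bag effectively has to hold everything for the key.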

I assume you are referring to late data rather than out-of-order data (the latter being taken care of with the in-memory sort). As discussed in the talk, late data is a sharp edge; one solution for late data is to branch it away before the GlobalWindow + State DoFn. It can then be output from the pipeline into a sink with a marker to initiate a manual procedure for correction, essentially a manual redaction.
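
One possible shape for that branch, assuming the FixedWindow + Combine step I mention further down sits upstream with allowed lateness configured, so that elements carry pane timing (the tags, names and element types are just illustrative):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

static final TupleTag<KV<String, Double>> ON_TIME = new TupleTag<KV<String, Double>>() {};
static final TupleTag<KV<String, Double>> LATE = new TupleTag<KV<String, Double>>() {};

/** Routes elements from late panes to a side output bound for a correction sink. */
static PCollectionTuple branchLateData(PCollection<KV<String, Double>> combined) {
  return combined.apply(
      "BranchLateData",
      ParDo.of(
              new DoFn<KV<String, Double>, KV<String, Double>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                  if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
                    // Late pane: send to the sink with a marker for manual correction.
                    c.output(LATE, c.element());
                  } else {
                    // On time: continues to the GlobalWindow + stateful looping DoFn.
                    c.output(c.element());
                  }
                }
              })
          .withOutputTags(ON_TIME, TupleTagList.of(LATE)));
}

branchLateData(combined).get(ON_TIME) then feeds the stateful step, and .get(LATE) goes to whatever correction sink makes sense.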

Which in-memory sort do you refer to? I'm pretty sure there must be sorting involved for this to work, but I'm not quite sure where exactly it is in your implementation. You said that you can put data in ValueState rather than BagState, so do you have a List as the value in the ValueState? Or do you wrap the stateful ParDo with some other sorting logic? And if so, how does this work in batch? I suppose it all has to fit in memory. I think this all comes back to the @RequiresTimeSortedInput annotation that I'm proposing. Maybe we can cooperate on that? :)
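
For reference, the usage I have in mind with [3] would look roughly like the sketch below; the annotation does not exist in Beam yet, so this is only the proposed shape, and the simplified DoFn around it is mine:

import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

static class CarryLastClose extends DoFn<KV<String, Double>, KV<String, Double>> {

  @StateId("lastValue")
  private final StateSpec<ValueState<Double>> lastValueSpec =
      StateSpecs.value(DoubleCoder.of());

  @RequiresTimeSortedInput // proposed: elements arrive ordered by event time per key
  @ProcessElement
  public void process(ProcessContext c, @StateId("lastValue") ValueState<Double> lastValue) {
    // With time-sorted input the previously seen value is always the "valid"
    // one, so a single ValueState is enough - no BagState and no re-sorting in
    // OnTimer - and it behaves the same in batch and streaming.
    Double previousClose = lastValue.read();
    lastValue.write(c.element().getValue());
    c.output(KV.of(
        c.element().getKey(),
        previousClose == null ? c.element().getValue() : previousClose));
  }
}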


    Or am I missing something?

    Jan

    On 6/25/19 6:00 AM, Reza Rokni wrote:


    On Fri, 21 Jun 2019 at 18:02, Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        Hi Reza,

        great presentation at the Beam Summit. I have posted a few
        threads here on the list during the last few weeks, some of
        which might actually be related to both looping timers and
        validity windows. But maybe you can see a different approach
        than I do, so a few questions:

         a) because of [1], timers might not be exactly ordered (the
        JIRA talks about the DirectRunner, but I suppose the issue is
        present on all runners that use immutable bundles of size >
        1, so it might affect Dataflow as well). This might cause
        issues when you try to introduce TTL for looping timers,
        because the TTL timer might fire before the regular looping
        timer, which might cause incorrect results (state cleared
        before it has been flushed).

    The TTL check would be in the same timer rather than a separate
    timer. The max value processed in each OnTimer call would be stored
    in ValueState and used as a base to know how long it has been since
    the pipeline last saw an external value for that key.

         b) because a stateful ParDo doesn't sort by timestamp, that
        implies that you have to store the last values in BagState (as
        opposed to the blog, where you just emit the identity value of
        the sum operation), right?

    You can store it in ValueState rather than BagState, but yes, you
    store that value in state, ready for the next OnTimer() firing.

         c) because of how a stateful ParDo currently works in batch,
        does that imply that all values (per key) would have to be
        stored in memory? Would that scale?

    This is one of the sharp edges and the answer is ... it depends
    :-) I would recommend always using a FixedWindow + Combiner before
    this step; this will compress the values into something much
    smaller. For example, in the case of building 'candles' this will
    compress down to low/high/first/last values per FixedWindow length.
    If the window length is very small there may be no compression,
    but in most cases I have seen this is an OK compromise.
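
    A rough sketch of that compression step (the Candle class and all
    names are mine, purely illustrative; it assumes the element
    timestamps were attached to the values upstream, since a CombineFn
    sees its inputs in no particular order and otherwise could not work
    out first/last):

    import java.io.Serializable;
    import org.apache.beam.sdk.coders.DefaultCoder;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    /** One candle per key per FixedWindow; also used as the combine accumulator. */
    @DefaultCoder(SerializableCoder.class)
    static class Candle implements Serializable {
      double open, close;
      double high = Double.NEGATIVE_INFINITY;
      double low = Double.POSITIVE_INFINITY;
      Instant openTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
      Instant closeTs = BoundedWindow.TIMESTAMP_MIN_VALUE;
    }

    static class OhlcCombineFn
        extends Combine.CombineFn<TimestampedValue<Double>, Candle, Candle> {

      @Override
      public Candle createAccumulator() {
        return new Candle();
      }

      @Override
      public Candle addInput(Candle acc, TimestampedValue<Double> in) {
        double price = in.getValue();
        acc.high = Math.max(acc.high, price);
        acc.low = Math.min(acc.low, price);
        // first/last are decided with the attached element timestamps.
        if (in.getTimestamp().isBefore(acc.openTs)) {
          acc.open = price;
          acc.openTs = in.getTimestamp();
        }
        if (in.getTimestamp().isAfter(acc.closeTs)) {
          acc.close = price;
          acc.closeTs = in.getTimestamp();
        }
        return acc;
      }

      @Override
      public Candle mergeAccumulators(Iterable<Candle> accs) {
        Candle merged = createAccumulator();
        for (Candle a : accs) {
          merged.high = Math.max(merged.high, a.high);
          merged.low = Math.min(merged.low, a.low);
          if (a.openTs.isBefore(merged.openTs)) {
            merged.open = a.open;
            merged.openTs = a.openTs;
          }
          if (a.closeTs.isAfter(merged.closeTs)) {
            merged.close = a.close;
            merged.closeTs = a.closeTs;
          }
        }
        return merged;
      }

      @Override
      public Candle extractOutput(Candle acc) {
        return acc;
      }
    }

    // ticksWithTs: PCollection<KV<String, TimestampedValue<Double>>>,
    // timestamps attached (and its coder set) upstream.
    PCollection<KV<String, Candle>> candles =
        ticksWithTs
            .apply(Window.<KV<String, TimestampedValue<Double>>>into(
                FixedWindows.of(Duration.standardMinutes(1))))
            .apply(Combine.perKey(new OhlcCombineFn()));

    The GlobalWindow + looping timer step downstream then only has to
    carry one Candle per key per interval instead of every tick.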

        There is a discussion about problem a) in [2], but maybe some
        different approach is possible. For problems b) and c) there
        is a proposal [3]. When the input is sorted, it starts to work
        both in batch and with ValueState, because the last value is
        the *valid* value.

    There was also a discussion on dev@ around a sorted Map state,
    which would be very cool for this use case.

        This even has a connection with the mentioned validity
        windows: if you sort by timestamp, the _last_ value is the
        _valid_ value, so it essentially boils down to keeping a
        single value per key (and again, it starts to work in both
        batch and streaming).

    one for Tyler :-)

        I even have a suspicion that sorting by timestamp is closely
        related to retractions, because when you are using sorted
        streams, retractions become only the diff between the last
        emitted pane and the current pane. That might even help
        implement retractions in general, but I might be missing
        something. This just popped into my head today as I was
        thinking about why there was actually no (or little) need for
        retractions in the Euphoria model (very similar to Beam, it
        actually differs by the sorting thing :)), and why the need
        pops up so often in Beam.

    Retractions would be possible with this, but it does mean that we
    would need to keep old versions around; something built in would
    be very cool rather than building it with this pattern.

        I'd be very happy to hear what you think about all of this.

        Cheers,

        Jan

        [1] https://issues.apache.org/jira/browse/BEAM-7520

        [2]
        https://lists.apache.org/thread.html/1a3a0dd9da682e159f78f131d335782fd92b047895001455ff659613@%3Cdev.beam.apache.org%3E

        [3]
        https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing

        On 6/21/19 8:12 AM, Reza Rokni wrote:
        Great question; one thing that we did not cover in the blog,
        and I think we should have, is the use case where you want to
        bootstrap the pipeline.

        One option would be, on startup, to have an extra bounded
        source that is read and flattened into the main pipeline. The
        source would need to contain values in TimestampedValue<V>
        format corresponding to the first window that you would like
        to kickstart the process from. I will see if I can find some
        time to code up an example and add that, along with the
        looping timer code, to the Beam patterns.

        https://beam.apache.org/documentation/patterns/overview/
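
        As a very rough sketch of that idea (the keys, values and the
        timestamp are made up; p is the Pipeline and mainInput is the
        existing streaming input, both assumed to use a matching coder
        and windowing):

        import java.util.Arrays;
        import org.apache.beam.sdk.transforms.Create;
        import org.apache.beam.sdk.transforms.Flatten;
        import org.apache.beam.sdk.values.KV;
        import org.apache.beam.sdk.values.PCollection;
        import org.apache.beam.sdk.values.PCollectionList;
        import org.apache.beam.sdk.values.TimestampedValue;
        import org.joda.time.Instant;

        // Bootstrap values stamped into the first interval window we
        // want to kickstart the process from.
        Instant firstWindowStart = Instant.parse("2019-06-01T00:00:00Z");

        PCollection<KV<String, Double>> bootstrap =
            p.apply(
                "BootstrapValues",
                Create.timestamped(
                    Arrays.asList(
                        TimestampedValue.of(KV.of("FX_1", 1.0), firstWindowStart),
                        TimestampedValue.of(KV.of("FX_2", 1.0), firstWindowStart))));

        // Flatten the bounded bootstrap source into the main streaming
        // input before the looping-timer stateful DoFn, so each key has
        // a value in the first window.
        PCollection<KV<String, Double>> withBootstrap =
            PCollectionList.of(mainInput).and(bootstrap).apply(Flatten.pCollections());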

        Cheers
        Reza





        On Fri, 21 Jun 2019 at 07:59, Manu Zhang
        <[email protected] <mailto:[email protected]>>
        wrote:

            Indeed an interesting pattern.

            One minor question: it seems the timer is triggered by
            the first element, so what if there is no data in the
            "first interval"?

            Thanks for the write-up.
            Manu

            On Wed, Jun 19, 2019 at 12:15 PM Reza Rokni
            <[email protected] <mailto:[email protected]>> wrote:

                Hi folks,

                Just wanted to drop a note here on a new pattern
                that folks may find interesting, called Looping
                Timers. It allows default values to be created in
                interval windows in the absence of any external
                data coming into the pipeline. The details are in
                the blog post below:

                https://beam.apache.org/blog/2019/06/11/looping-timers.html
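
                For anyone who prefers code to prose, a minimal
                skeleton of the idea (not the exact code from the
                blog; a real version would align the timer to
                interval boundaries and deal with state cleanup):

                import org.apache.beam.sdk.coders.StringUtf8Coder;
                import org.apache.beam.sdk.state.StateSpec;
                import org.apache.beam.sdk.state.StateSpecs;
                import org.apache.beam.sdk.state.TimeDomain;
                import org.apache.beam.sdk.state.Timer;
                import org.apache.beam.sdk.state.TimerSpec;
                import org.apache.beam.sdk.state.TimerSpecs;
                import org.apache.beam.sdk.state.ValueState;
                import org.apache.beam.sdk.transforms.DoFn;
                import org.apache.beam.sdk.values.KV;
                import org.joda.time.Duration;

                static class MinimalLoopingTimer
                    extends DoFn<KV<String, Double>, KV<String, Double>> {

                  private static final Duration INTERVAL = Duration.standardMinutes(1);

                  @StateId("key")
                  private final StateSpec<ValueState<String>> keySpec =
                      StateSpecs.value(StringUtf8Coder.of());

                  @TimerId("loop")
                  private final TimerSpec loopSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

                  @ProcessElement
                  public void process(
                      ProcessContext c,
                      @StateId("key") ValueState<String> key,
                      @TimerId("loop") Timer loop) {
                    if (key.read() == null) {
                      // The first element seen for a key starts the loop.
                      key.write(c.element().getKey());
                      loop.set(c.timestamp().plus(INTERVAL));
                    }
                    c.output(c.element());
                  }

                  @OnTimer("loop")
                  public void onTimer(
                      OnTimerContext c,
                      @StateId("key") ValueState<String> key,
                      @TimerId("loop") Timer loop) {
                    // Emit a default value for the interval even if no external
                    // data arrived, then reschedule so the timer keeps looping.
                    c.output(KV.of(key.read(), 0.0));
                    loop.set(c.timestamp().plus(INTERVAL));
                  }
                }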

                Its main utility is when dealing with time series
                data. There are still rough edges, like dealing with
                TTL, and it would be great to hear feedback on ways
                it can be improved.

                The next pattern to publish in this domain will
                assist with the hold and propagation of values from
                one interval window to the next, which, coupled with
                looping timers, starts to solve some interesting
                problems.

                Cheers

                Reza


