Hi Reza,

cool, I have put together a PR [1], which is not completely ready yet. At the very least some tests are still missing - probably @ValidatesRunner ones, and then fixing the runners that won't pass them. It also misses a few features described in the design doc, but those can probably be added later (support for allowedLateness and a user-supplied sorting criterion). Would you like to test this on some of your code? It should suffice to put @RequiresTimeSortedInput on the @ProcessElement method and the input should start being sorted (this should work at least for DirectRunner, FlinkRunner (stream and batch) and SparkRunner (batch)).

[1] https://github.com/apache/beam/pull/8774
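
To illustrate, the intended usage is something like this (a minimal sketch, assuming the PR lands roughly as proposed; details may still change):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Minimal sketch: annotating the @ProcessElement method requests that input
// for each key arrives sorted by event timestamp.
class LastValuePerKeyFn extends DoFn<KV<String, Double>, KV<String, Double>> {

  @StateId("last")
  private final StateSpec<ValueState<Double>> lastSpec = StateSpecs.value();

  @RequiresTimeSortedInput
  @ProcessElement
  public void process(ProcessContext ctx, @StateId("last") ValueState<Double> last) {
    // Because elements arrive ordered by timestamp, the current element is
    // always the newest one, so a single ValueState per key is enough to
    // hold the "valid" value.
    last.write(ctx.element().getValue());
    ctx.output(ctx.element());
  }
}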

On 6/27/19 6:16 AM, Reza Rokni wrote:


On Tue, 25 Jun 2019 at 21:20, Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:


    On 6/25/19 1:43 PM, Reza Rokni wrote:


    On Tue, 25 Jun 2019 at 18:12, Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        > The TTL check would be in the same Timer rather than a
        separate Timer. The max value processed in each OnTimer call
        would be stored in ValueState and used as a base to know how
        long it has been since the pipeline has seen an external value
        for that key.

        OK, that seems to work, if you store the maximum timestamp in a
        value state (that is, you basically generate a per-key watermark).

        > You can store it in ValueState rather than BagState, but
        yes you store that value in State ready for the next
        OnTimer() fire.

        In my understanding of the problem, this approach seems a
        little suboptimal. Consider the following, when trying to
        generate OHLC data (open, high, low, close - that is, carry the
        last closing price over as the next window's opening price):

         - suppose we have four times T1 < T2 < T3 < T4, where T2 and
        T4 denote "end of window" (closing) times

         - first (in processing time), we receive a value for time T3;
        we cache it in ValueState and set a timer for T4

         - next, we receive a value for T1 - but we cannot overwrite
        the value already written for T3, right? What do we do then?
        And do we reset the timer to T2?

         - basically, because we received *two* values, both of which
        are needed and no timer could have fired in between, we need
        both values stored to know which value to emit when the timer
        fires. And consider that in batch, the timer fires only after
        we have seen all the data (without any ordering).

    I assume you are referring to late data rather than out-of-order
    data (the latter being taken care of with the in-memory sort). As
    discussed in the talk, late data is a sharp edge; one solution for
    late data is to branch it away before the GlobalWindow + State DoFn.
    It can then be output from the pipeline into a sink with a marker
    to initiate a manual procedure for correction. Essentially a manual
    redaction.

    Which in-memory sort do you refer to? I'm pretty sure there must
    be sorting involved for this to work, but I'm not quite sure where
    exactly it happens in your implementation. You said that you can
    put the data in ValueState rather than BagState, so do you have a
    List as the value in the ValueState? Or do you wrap the stateful
    ParDo with some other sorting logic? And if so, how does this work
    in batch? I suppose it all has to fit in memory. I think this all
    revolves around the @RequiresTimeSortedInput annotation that I
    propose. Maybe we can cooperate on that? :)

Hummmm... nice, this chat made me notice a bug in the looping timer example code that we missed, thanx :-) The ValueState<Boolean> timerRunning should actually be a ValueState<Long> minTimestamp, and the check to set the timer needs to be "if (null or element.Timestamp < timer.Timestamp)". This also requires the timer-read pattern, as we don't have timer.read(): https://stackoverflow.com/questions/55912522/setting-a-timer-to-the-minimum-timestamp-seen/55912542#55912542. I will fix it and put up a PR to correct the blog.
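
In code the fix would look roughly like this (an untested sketch of the @ProcessElement method, with names approximating the blog post; the @StateId/@TimerId declarations on the DoFn change accordingly):

@ProcessElement
public void process(ProcessContext context,
    @StateId("minTimestamp") ValueState<Long> minTimestamp,
    @TimerId("loopingTimer") Timer loopingTimer) {
  // Sketch of the fix: ValueState<Long> minTimestamp replaces the
  // ValueState<Boolean> timerRunning from the blog post.
  Long currentMin = minTimestamp.read();
  long elementTs = context.timestamp().getMillis();
  // "Timer read" pattern: there is no timer.read(), so we keep the timestamp
  // the timer is currently set for in state and compare the element against
  // it (in the real example this would be aligned to the interval boundary).
  if (currentMin == null || elementTs < currentMin) {
    loopingTimer.set(context.timestamp());
    minTimestamp.write(elementTs);
  }
  // ... rest of the element processing as in the blog post
}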

For the hold and propagate pattern (for those following the original thread: the pattern is not covered in the blog example code, but was discussed at the summit), the outline is below, with a rough sketch after it:
OnProcess()
- The accumulators are dropped into BagState.
- A timer is set at the minimum time interval.
OnTimer()
- The list is sorted in memory. For a lot of timeseries use cases (for example OHLC), the memory issues are heavily mitigated, as we can use FixedWindows partial aggregations before the GlobalWindow stage. (Partial because they don't have the correct Open value set until they flow into the GlobalWindow.) Of course, how big the window is dictates the compression you get.
- The current timer is set again to fire in the next interval window.
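
A very rough sketch of that shape (illustrative only, not the code from the talk; "Candle" stands in for the per-FixedWindow partial OHLC aggregate, and the one-minute re-arm interval is just an example):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;

// Hypothetical partial aggregate type (fields elided in this sketch).
class Candle implements java.io.Serializable {}

class HoldAndPropagateFn extends DoFn<KV<String, Candle>, Candle> {

  @StateId("partials")
  private final StateSpec<BagState<TimestampedValue<Candle>>> partialsSpec =
      StateSpecs.bag();

  @TimerId("loop")
  private final TimerSpec loopSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void onProcess(ProcessContext ctx,
      @StateId("partials") BagState<TimestampedValue<Candle>> partials,
      @TimerId("loop") Timer loop) {
    // Buffer the partial aggregate and (re)arm the looping timer; in the
    // real pattern the timer target is the minimum interval boundary seen.
    partials.add(TimestampedValue.of(ctx.element().getValue(), ctx.timestamp()));
    loop.set(ctx.timestamp());
  }

  @OnTimer("loop")
  public void onTimer(OnTimerContext ctx,
      @StateId("partials") BagState<TimestampedValue<Candle>> partials,
      @TimerId("loop") Timer loop) {
    // Sort the buffered partials in memory, carry the previous close forward
    // as the next open (elided here), emit, and re-arm for the next interval.
    List<TimestampedValue<Candle>> sorted = new ArrayList<>();
    partials.read().forEach(sorted::add);
    sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    for (TimestampedValue<Candle> tv : sorted) {
      ctx.output(tv.getValue());
    }
    partials.clear();
    loop.set(ctx.timestamp().plus(Duration.standardMinutes(1)));
  }
}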

@RequiresTimeSortedInput looks super interesting, happy to help out. Although it's a harder problem than the targeted timeseries use cases, where FixedWindows aggregations can be used before the final step.


        Or? Am I missing something?

        Jan

        On 6/25/19 6:00 AM, Reza Rokni wrote:


        On Fri, 21 Jun 2019 at 18:02, Jan Lukavský <[email protected]
        <mailto:[email protected]>> wrote:

            Hi Reza,

            great presentation at the Beam Summit. I have had a few
            posts here on the list during the last few weeks, some of
            which might actually be related to both looping timers
            and validity windows. But maybe you will see a different
            approach than I do, so a few questions:

             a) because of [1], timers might not be exactly ordered
            (the JIRA talks about DirectRunner, but I suppose the
            issue is present on all runners that use immutable
            bundles of size > 1, so it might affect Dataflow as
            well). This might cause issues when you try to introduce
            TTL for looping timers, because the TTL timer might fire
            before the regular looping timer, which might cause
            incorrect results (state cleared before it has been flushed).

        The TTL check would be in the same Timer rather than a
        separate Timer. The max value processed in each OnTimer
        call would be stored in ValueState and used as a base to know
        how long it has been since the pipeline has seen an external
        value for that key.

             b) because stateful ParDo doesn't sort by timestamp,
            that implies that you have to store the last values in
            BagState (as opposed to the blog, where you just emit the
            identity value of the sum operation), right?

        You can store it in ValueState rather than BagState, but yes
        you store that value in State ready for the next OnTimer()
        fire.

             c) because of how stateful ParDo currently works in
            batch, does that imply that all values (per key) would
            have to be stored in memory? Would that scale?

        This is one of the sharp edges and the answer is ... it
        depends :-) I would recommend always using a
        FixedWindow+Combiner before this step; this will compress
        the values into something much smaller. For example, in the
        case of building 'candles' this compresses down to
        low/high/first/last values per FixedWindow length. If the
        window length is very small there may be no compression, but
        in most cases I have seen this is an OK compromise.
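
        In code, roughly (a sketch only; "Candle" and "OhlcFn" are
        illustrative names, where OhlcFn would be a custom CombineFn,
        not shown here, that keeps the first/last/min/max price per
        window):

        PCollection<KV<String, Double>> prices = ...;
        PCollection<KV<String, Candle>> partials = prices
            // Compress raw values into one partial candle per key and
            // per one-minute window before the GlobalWindow stateful step.
            .apply(Window.<KV<String, Double>>into(
                FixedWindows.of(Duration.standardMinutes(1))))
            .apply(Combine.perKey(new OhlcFn()));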

            There is a discussion about problem a) in [2], but maybe
            a different approach is possible. For problems b) and c)
            there is a proposal [3]. When the input is sorted, it
            starts to work both in batch and with ValueState, because
            the last value is the *valid* value.

        There was also a discussion on dev@ around a sorted Map
        state, which would be very cool for this use case.

            This even has a connection with the mentioned validity
            windows, because if you sort by timestamp, the _last_
            value is the _valid_ value, so it essentially boils down
            to keeping a single value per key (and again, it starts
            to work in both batch and streaming).

        one for Tyler :-)

            I even have a suspicion that sorting by timestamp has a
            close relation to retractions, because when you are
            using sorted streams, retractions actually become only
            the diff between the last emitted pane and the current
            pane. That might even help implement retractions in
            general, but I might be missing something. This just
            popped into my head today, as I was thinking about why
            there was actually no (or little) need for retractions
            in the Euphoria model (very similar to Beam, it actually
            differs by the sorting thing :)), and why the need pops
            up so often in Beam.

        Retractions will be possible with this, but it does mean
        that we would need to keep old versions around; something
        built in would be very cool rather than building it with
        this pattern.

            I'd be very happy to hear what you think about all of this.

            Cheers,

            Jan

            [1] https://issues.apache.org/jira/browse/BEAM-7520

            [2] https://lists.apache.org/thread.html/1a3a0dd9da682e159f78f131d335782fd92b047895001455ff659613@%3Cdev.beam.apache.org%3E

            [3] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing

            On 6/21/19 8:12 AM, Reza Rokni wrote:

            Great question. One thing that we did not cover in the
            blog, and I think we should have, is the use case where
            you want to bootstrap the pipeline.

            One option would be to have, on startup, an extra bounded
            source that is read and flattened into the main pipeline.
            The source will need to contain values in
            TimestampedValue<V> format corresponding to the first
            window that you would like to kick-start the process
            from. I will see if I can find some time to code up an
            example and add that and the looping timer code to the
            Beam patterns.

            https://beam.apache.org/documentation/patterns/overview/
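
            Roughly, the bootstrap wiring could look like this (a
            sketch; the key, value and timestamp are only examples,
            and the live source stands for whatever the pipeline
            already reads from):

            // Seed the pipeline with one timestamped value per key for
            // the window you want to kick-start the looping timer from.
            PCollection<KV<String, Double>> seed = p.apply("Bootstrap",
                Create.timestamped(
                    TimestampedValue.of(KV.of("EURUSD", 1.12),
                        Instant.parse("2019-06-01T00:00:00Z"))));

            PCollection<KV<String, Double>> live = ...;  // e.g. KafkaIO/PubsubIO

            // Flatten the seed into the main input so the stateful DoFn
            // sees a value for the first interval window.
            PCollection<KV<String, Double>> input =
                PCollectionList.of(seed).and(live)
                    .apply(Flatten.pCollections());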

            Cheers
            Reza





            On Fri, 21 Jun 2019 at 07:59, Manu Zhang
            <[email protected]
            <mailto:[email protected]>> wrote:

                Indeed an interesting pattern.

                One minor question: it seems the timer is triggered
                by the first element, so what if there is no data in
                the "first interval"?

                Thanks for the write-up.
                Manu

                On Wed, Jun 19, 2019 at 12:15 PM Reza Rokni
                <[email protected] <mailto:[email protected]>> wrote:

                    Hi folks,

                    Just wanted to drop a note here on a new
                    pattern that folks may find interesting,
                    called Looping Timers. It allows for default
                    values to be created in interval windows in the
                    absence of any external data coming into the
                    pipeline. The details are in the blog post below:

                    https://beam.apache.org/blog/2019/06/11/looping-timers.html

                    Its main utility is when dealing with time
                    series data. There are still rough edges, like
                    dealing with TTL, and it would be great to hear
                    feedback on ways it can be improved.

                    The next pattern to publish in this domain will
                    assist with the hold and propagation of values
                    from one interval window to the next, which,
                    coupled with looping timers, starts to solve
                    some interesting problems.

                    Cheers

                    Reza


