Very interesting thread.

Having read the original Timer design document, I find it compelling to model timers with a loop from producing to consuming PCollections. This makes it very explicit how timers are positioned in the dataflow.

What Robert proposes looks less explicit, yet much closer to how Runner authors would go about to implement it. I'm not fully aware of any limitations of this model. Lukasz mentioned that we would have to hold back the Watermark for as long as the Timer is not yet set, as potentially it could have already been passed before set. As for output time being different from fire time, I suppose we can add a hold for the output watermark before the timer is fired.

Whichever model we pursue, we have to solve the same problems/requirements for Timers. It does look like this is more a problem of how things are represented in the proto? Practically, the runtime implementation looks similar.

If I had to choose I'd probably go for timers being represented as part of a spec for a DoFn (which seems to be already the case). Timers as separate PCollections seems elegant but less practical to me.

-Max

[Disclaimer: I could be wrong since I just thought about this in more detail]

On 20.09.18 00:28, Robert Bradshaw wrote:
On Wed, Sep 19, 2018 at 11:54 PM Lukasz Cwik <lc...@google.com <mailto:lc...@google.com>> wrote:


    On Wed, Sep 19, 2018 at 2:46 PM Robert Bradshaw <rober...@google.com
    <mailto:rober...@google.com>> wrote:

        On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik <lc...@google.com
        <mailto:lc...@google.com>> wrote:

            *How does modelling a timer as a PCollection help the Beam
            model?*

            The largest concern was about how to model timers within
            Apache Beam that:
            1) removed the need for the watermark hold that is typically
            accompanied with state/timer implementations
            2) enabled the ability to set the explicit output time to be
            independent of the firing time for all timer specifications [1]

            I felt as though treating timers as a self-loop around the
            ParDo PTransform allowed us to use the natural definition of
            output watermark = min(all input watermarks) as a way to
            define how timers hold output and using windowed values that
            contained timers as a natural way to represent the output
            time to be independent of the firing time. The purpose of
            the PCollection right now is to store the representation of
            how timers are encoded. I suspect that at some point in time
            we will have different timer encodings.


        I agree that being able to separate the hold time from firing
        time of a timer is important, but in retrospect don't think
        timers as PCollections is the only (or most natural) way to
represent that (in the model or in runner implementations).
    Can you go into more detail as to what your suggesting as the
    replacement and why you believe it fits the model more naturally
    since "state" doesn't have watermarks or produce output but timers
    can. I'm not disagreeing that timers as PCollections may not be a
    natural fit but I don't see them as state as well since "user state"
    doesn't produce output.


Yeah, timers are their own thing. They come in like data, but are written out like state.

I guess the high level is that I think the beam graph should represent, within reason, the user's model of what their pipeline is, and treating timers as PCollections with this self-loop feels like an implementation detail, and furthermore an implementation detail that no runner is actually going to use to implement things. And (again, this is subjective) seems to complicate both the reasoning about a pipeline and implementing its execution over viewing the stateful/timely aspects of a DoFn as internal details to the ParDo operation.

            Having this fit well with how timers are delivered between
            the SDK and Runner was an added bonus. Also, a good portion
            of the code that I needed to fix up was more related to the
            assumption that there was ever only a single input producer
            to an executable stage and plumbing of timer specifications
            through all the runner library support layers.

            ----------
            *There is no "clear" for timers.*

            The current Java API for timers only allows you to set them.
            Clearing timers is not exposed to users and is only used by
            internal implementations to support runners[2] via
            TimerInternals. Usage of a timer is like so:
               @TimerId("timer")
               private final TimerSpec timerSpec =
            TimerSpecs.timer(TimeDomain.EVENT_TIME);

               @ProcessElement
               public void process(
                   ProcessContext context,
                   BoundedWindow window,
                   @TimerId("timer") Timer myTimer) {

                 myTimer.set(window.maxTimestamp().plus(allowedLateness));
               }


        We'll probably want clear. But currently there's already exactly
        one timer per window per key, and setting another one overwrites
        the previous one, again making it more like state. Maybe, as you
        said, it could involve retractions (but every output being a
        retraction seems odd.)

    Once retractions exist, most GBK firings will have a preceding
retraction so I believe they will be very common.

True, but I don't think we want to insert the GBK + CV in the graph to represent the consolidation that's going on here.

            * side inputs already require a runner to introspect the
            ParDo payload to get the SideInputSpec, requiring it to have
            knowledge of the TimerSpec is no different.


        My point was that once it has knowelge of the TimerSpec, there
        is no need for (meaning no additional information provided by)
        the timer PCollection nor its edges.

    The way in which the timer is encoded is missing. This could be
    explicit on the TimerSpec like the other StateSpec definitions though.


Ah, I didn't realize there was a choice in the matter.


On Wed, Sep 19, 2018 at 11:57 PM Reuven Lax <re...@google.com <mailto:re...@google.com>> wrote:


        We'll probably want clear. But currently there's already exactly
        one timer per window per key, and setting another one overwrites
        the previous one, again making it more like state. Maybe, as you
        said, it could involve retractions (but every output being a
        retraction seems odd.)


    This is not true. We support multiple (tagged) timers per key.


Yeah, I misspoke. I meant every distinct (tagged) timer has one firing time, rather than getting appended like in a PCollection.

- Robert

Reply via email to