Very interesting thread.
Having read the original Timer design document, I find it compelling to
model timers with a loop from producing to consuming PCollections. This
makes it very explicit how timers are positioned in the dataflow.
What Robert proposes looks less explicit, yet much closer to how Runner
authors would go about implementing it. I'm not fully aware of any
limitations of this model. Lukasz mentioned that we would have to hold
back the Watermark for as long as the Timer is not yet set, since
otherwise the Watermark could already have passed the Timer's timestamp
by the time it is set. As for output
time being different from fire time, I suppose we can add a hold for the
output watermark before the timer is fired.
Whichever model we pursue, we have to solve the same
problems/requirements for Timers. It does look like this is more a
problem of how things are represented in the proto? Practically, the
runtime implementation looks similar.
If I had to choose, I'd probably go for timers being represented as part
of a spec for a DoFn (which seems to be already the case). Timers as
separate PCollections seem elegant but less practical to me.
-Max
[Disclaimer: I could be wrong since I just thought about this in more
detail]
On 20.09.18 00:28, Robert Bradshaw wrote:
On Wed, Sep 19, 2018 at 11:54 PM Lukasz Cwik <lc...@google.com> wrote:
On Wed, Sep 19, 2018 at 2:46 PM Robert Bradshaw <rober...@google.com> wrote:
On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik <lc...@google.com> wrote:
*How does modelling a timer as a PCollection help the Beam
model?*
The largest concern was about how to model timers within
Apache Beam that:
1) removed the need for the watermark hold that typically
accompanies state/timer implementations
2) enabled the ability to set the explicit output time to be
independent of the firing time for all timer specifications [1]
I felt as though treating timers as a self-loop around the
ParDo PTransform allowed us to use the natural definition of
output watermark = min(all input watermarks) as a way to
define how timers hold output and using windowed values that
contained timers as a natural way to represent the output
time to be independent of the firing time. The purpose of
the PCollection right now is to store the representation of
how timers are encoded. I suspect that at some point in time
we will have different timer encodings.
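
A minimal sketch of the min-of-inputs rule described above, assuming the timer self-loop is treated as just one more input to the ParDo. The class and method names are hypothetical and are not Beam APIs; watermarks are plain epoch milliseconds, and the watermark of the timer loop is assumed to be the earliest output time among pending timers.

```java
import java.util.Arrays;

/** Hypothetical illustration only; none of these names are Beam APIs. */
final class SelfLoopWatermarkSketch {

  /**
   * Output watermark = min over the watermarks of all inputs.
   * With the self-loop, the timer PCollection is simply one of the inputs.
   */
  static long outputWatermark(long... inputWatermarksMillis) {
    // With no inputs there is nothing to hold the watermark back.
    return Arrays.stream(inputWatermarksMillis).min().orElse(Long.MAX_VALUE);
  }
}
```

For example, with the main input at 1000 and a pending timer whose output time is 400, outputWatermark(1000L, 400L) yields 400, so the pending timer holds the output back without a separate hold mechanism.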
I agree that being able to separate the hold time from firing
time of a timer is important, but in retrospect don't think
timers as PCollections is the only (or most natural) way to
represent that (in the model or in runner implementations).
Can you go into more detail as to what you're suggesting as the
replacement and why you believe it fits the model more naturally,
since "state" doesn't have watermarks or produce output but timers
can? I'm not disagreeing that timers as PCollections may not be a
natural fit, but I don't see them as state either, since "user state"
doesn't produce output.
Yeah, timers are their own thing. They come in like data, but are
written out like state.
I guess the high level is that I think the beam graph should represent,
within reason, the user's model of what their pipeline is, and treating
timers as PCollections with this self-loop feels like an implementation
detail, and furthermore an implementation detail that no runner is
actually going to use to implement things. It also (again, this is
subjective) seems to complicate both the reasoning about a pipeline and
implementing its execution over viewing the stateful/timely aspects of a
DoFn as internal details to the ParDo operation.
Having this fit well with how timers are delivered between
the SDK and Runner was an added bonus. Also, a good portion
of the code that I needed to fix up was more related to the
assumption that there was ever only a single input producer
to an executable stage and plumbing of timer specifications
through all the runner library support layers.
----------
*There is no "clear" for timers.*
The current Java API for timers only allows you to set them.
Clearing timers is not exposed to users and is only used by
internal implementations to support runners[2] via
TimerInternals. Usage of a timer is like so:
@TimerId("timer")
private final TimerSpec timerSpec =
    TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(
    ProcessContext context,
    BoundedWindow window,
    @TimerId("timer") Timer myTimer) {
  myTimer.set(window.maxTimestamp().plus(allowedLateness));
}
We'll probably want clear. But currently there's already exactly
one timer per window per key, and setting another one overwrites
the previous one, again making it more like state. Maybe, as you
said, it could involve retractions (but every output being a
retraction seems odd.)
Once retractions exist, most GBK firings will have a preceding
retraction so I believe they will be very common.
True, but I don't think we want to insert the GBK + CV in the graph to
represent the consolidation that's going on here.
* side inputs already require a runner to introspect the
ParDo payload to get the SideInputSpec, requiring it to have
knowledge of the TimerSpec is no different.
My point was that once it has knowledge of the TimerSpec, there
is no need for (meaning no additional information provided by)
the timer PCollection nor its edges.
The way in which the timer is encoded is missing. This could be
explicit on the TimerSpec like the other StateSpec definitions though.
Ah, I didn't realize there was a choice in the matter.
On Wed, Sep 19, 2018 at 11:57 PM Reuven Lax <re...@google.com> wrote:
We'll probably want clear. But currently there's already exactly
one timer per window per key, and setting another one overwrites
the previous one, again making it more like state. Maybe, as you
said, it could involve retractions (but every output being a
retraction seems odd.)
This is not true. We support multiple (tagged) timers per key.
Yeah, I misspoke. I meant every distinct (tagged) timer has one firing
time, rather than getting appended like in a PCollection.
- Robert
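
To illustrate the overwrite semantics discussed above (one firing time per distinct tagged timer, rather than timers being appended as elements would be in a PCollection), here is a rough sketch. It is not Beam's TimerInternals; the class, its methods, and the use of strings for key and window are all assumptions made for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not Beam's TimerInternals. */
final class TaggedTimerSketch {

  // One firing time per (key, window, timerId) triple.
  private final Map<List<String>, Long> firingTimes = new HashMap<>();

  /** Setting a timer replaces any earlier firing time stored under the same tag. */
  void set(String key, String window, String timerId, long fireAtMillis) {
    firingTimes.put(Arrays.asList(key, window, timerId), fireAtMillis);
  }

  /** Returns the current firing time for the tag, or null if none is set. */
  Long firingTime(String key, String window, String timerId) {
    return firingTimes.get(Arrays.asList(key, window, timerId));
  }

  /** Number of distinct tagged timers that are currently set. */
  int pendingCount() {
    return firingTimes.size();
  }
}
```

Setting timer "a" twice for the same key and window leaves a single entry holding the later value, while setting a differently tagged timer "b" adds a second entry. That is the state-like behaviour the thread contrasts with appending to a PCollection.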