Hi,
I'm not sure I got everything from this thread right, but from my
point of view, triggers are a property of GBK. They are a property of
neither windowing nor PCollection, but relate solely to GBK. This can
be seen from the fact that, unlike the windowFn, triggers are completely
ignored in a stateful ParDo (there is no semantics for them there, which
is fine). It would be cool if the model could be adjusted for that - it
would actually mean that the correct place to specify triggering is not
the Window PTransform, but the GBK, i.e.

   input.apply(GroupByKey.create().triggering(...))

That would imply we simply have a default trigger for all GBKs, unless
explicitly changed, but for that particular instance only. I'm not sure
what the impact on pipeline compatibility would be, though.
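For illustration, a sketch of how that might look next to today's API
(the GroupByKey.create().triggering(...) method is hypothetical - it
does not exist in the current Java SDK - the trigger is just an example,
and the usual org.apache.beam.sdk.transforms.windowing imports are
assumed):

   // Today: the trigger is set on the Window transform and travels
   // with the PCollection's WindowingStrategy to every downstream GBK.
   input
       .apply(Window.<KV<String, Long>>configure()
           .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10)))
           .withAllowedLateness(Duration.ZERO)
           .discardingFiredPanes())
       .apply(GroupByKey.create());

   // Proposed (hypothetical): the trigger is a property of this one
   // GBK only; every other GBK keeps the default trigger.
   input.apply(GroupByKey.<String, Long>create()
       .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10))));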
Jan
On 2/19/21 12:09 AM, Robert Bradshaw wrote:
On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org
<mailto:k...@apache.org>> wrote:
On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw
<rober...@google.com <mailto:rober...@google.com>> wrote:
I would prefer to leave downstream triggering up to the runner
(or, better, leave upstream triggering up to the runner, a la
sink triggers), but one problem is that without an
explicit AfterSynchronizedProcessingTime one can't tell whether a
downstream ProcessingTime trigger between two groupings is due to an
explicit re-triggering between them or was inherited from one to
the other.
I mean to propose that there should be no triggering specified
unless due to explicit re-triggering.
You're saying that we leave the trigger (and perhaps other) fields of
the WindowingStrategy attached to PCollections downstream of the first
GBK unset in the proto, and let runners walk the graph to infer
them? I could be OK with making this legal, though updating all SDKs and
runners to handle it doesn't seem high priority at the moment.
(and BTW yes I agree about sink triggers, but we need retractions
and probably some theoretical work before we can aim for that)
Kenn
On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles
<k...@apache.org <mailto:k...@apache.org>> wrote:
Just for the thread, I want to comment on another, more
drastic approach: eliminate continuation triggers from the
model, leaving downstream triggering up to a runner. This
approach is not viable because transforms may need to
change their behavior based on whether or not a trigger
will fire more than once. Transforms can and do inspect
the windowing strategy to do things differently.
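For concreteness: in the Java SDK a transform's expand() can branch
on the input's windowing strategy. A minimal sketch - the exact
checks vary by transform and are illustrative only:

   // Sketch: inspect the windowing strategy during expansion.
   WindowingStrategy<?, ?> ws = input.getWindowingStrategy();
   boolean mayFireRepeatedly =
       !(ws.getTrigger() instanceof DefaultTrigger)
           || ws.getAllowedLateness().isLongerThan(Duration.ZERO);
   // ... choose a single-pane or multi-pane implementation accordingly ...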
Kenn
On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax
<re...@google.com <mailto:re...@google.com>> wrote:
I'll say that synchronized processing time has
confused users before. Users sometimes use
processing-time triggers to optimize latency, banking
that this will decouple stage latency from the
long-tail latency of previous stages. However,
continuation triggers silently switching to
synchronized processing time has defeated that, and it
wasn't clear to users why.
On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw
<rober...@google.com <mailto:rober...@google.com>> wrote:
On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles
<k...@apache.org <mailto:k...@apache.org>> wrote:
On Thu, Feb 11, 2021 at 9:38 PM Robert
Bradshaw <rober...@google.com
<mailto:rober...@google.com>> wrote:
Of course the right answer is to just
implement sink triggers and sidestep the
question altogether :).
In the meantime, I think leaving
AfterSynchronizedProcessingTime in the
model makes the most sense, and runners
can choose an implementation between
firing eagerly and waiting some amount of
time until they think all (most?)
downstream results are in before firing,
depending on how smart the runner wants to
be. As you point out, they're all correct,
and we'll have multiple firings due to the
upstream trigger anyway, and this is safer
than it used to be (though still possibly
requires work).
Just to clarify, as I got a little confused,
is your suggestion: Leave
AfterSynchronizedProcessingTime* triggers in
the model/proto, let the SDK put them in where
they want, and let runners decide how to
interpret them? (this SGTM and requires the
least/no changes)
Yep. We may want to update Python/Go to produce
AfterSynchronizedProcessingTime downstream of
ProcessingTime triggers too, eventually, to better
express intent.
Kenn
*noting that
TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not
related to this, except in implementation, and
should be removed either way.
On Wed, Feb 10, 2021 at 1:37 PM Kenneth
Knowles <k...@apache.org
<mailto:k...@apache.org>> wrote:
Hi all,
TL;DR:
1. should we replace "after
synchronized processing time" with
"after count 1"?
2. should we remove "continuation
trigger" and leave this to runners?
----
"AfterSynchronizedProcessingTime"
triggers were invented to solve a
specific problem. They are
inconsistent across SDKs today.
- You have an aggregation/GBK with an aligned
processing-time trigger, like
"output every minute on the minute"
- You have a downstream
aggregation/GBK between that and the sink
- You expect about one output
per minute per key+window pair
Any output of the upstream aggregation
may contribute to any key+window of
the downstream aggregation. The
AfterSynchronizedProcessingTime
trigger waits for all the
processing-time-based triggers to fire
and commit their outputs. The downstream
aggregation will then output as fast as
possible, in panes consistent with the
upstream aggregation.
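For concreteness, the upstream configuration in Java
looks roughly like the sketch below (the window size
and the Sum combiner are illustrative):

   PCollection<KV<String, Long>> upstream =
       input
           .apply(Window.<KV<String, Long>>into(
                   FixedWindows.of(Duration.standardHours(1)))
               .triggering(Repeatedly.forever(
                   AfterProcessingTime.pastFirstElementInPane()
                       .alignedTo(Duration.standardMinutes(1))))
               .withAllowedLateness(Duration.ZERO)
               .accumulatingFiredPanes())
           .apply(Sum.longsPerKey());
   // The Java SDK then attaches an
   // AfterSynchronizedProcessingTime-based continuation
   // trigger to the output, so a downstream GBK waits
   // for the upstream panes before firing.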
- The Java SDK behavior is as above,
to output "as fast as reasonable".
- The Python SDK never uses
"AfterSynchronizedProcessingTime"
triggers but simply propagates the
same trigger to the next GBK, creating
additional delay.
- I don't know what the Go SDK may
do, if it supports this at all.
Any behavior could be defined as
"correct". A simple option could be to
have the downstream aggregation "fire
always" aka "after element count 1".
How would this change things? We would
potentially see many more outputs.
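Concretely, the SDK-inserted downstream triggering
would then be equivalent to this sketch (users would
not write it themselves):

   Window.<KV<String, Long>>configure()
       .triggering(Repeatedly.forever(
           AfterPane.elementCountAtLeast(1)))
   // The downstream GBK emits a pane for every
   // upstream pane (or batch of panes) that arrives,
   // rather than waiting for all of them.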
Why did we do this in the first place?
There are (at least) these reasons:
- Previously, triggers could "finish"
an aggregation, thus dropping all
further data. In that case, waiting
for all outputs is critical, or else
you lose data. Now triggers cannot
finish aggregations.
- Whenever there may be more than one
pane, a user has to write logic to
compensate and deal with it. Changing
from a guaranteed single pane to
multi-pane would break things. So if
the user configures a single firing,
all downstream aggregations must
respect it. Now that triggers cannot
finish, I think processing time can
only be used in multi-pane contexts
anyhow.
- The above example illustrates how
the behavior in Java maintains
something that the user will expect.
Or so we think. Maybe users don't care.
How did we get into this inconsistent
state? When the user specifies
triggering, it applies to the very
nearest aggregation/GBK. The SDK then
decides what triggering to insert
downstream. One possibility is to
remove this and leave it unspecified,
up to runner behavior.
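In the Java SDK the mechanism is
Trigger.getContinuationTrigger(): each
trigger maps itself to the trigger the
SDK attaches after the GBK. A sketch of
how the SDKs currently differ:

   Trigger user = Repeatedly.forever(
       AfterProcessingTime.pastFirstElementInPane()
           .alignedTo(Duration.standardMinutes(1)));
   // Java: yields an AfterSynchronizedProcessingTime-
   // based continuation trigger.
   Trigger downstream = user.getContinuationTrigger();
   // Python: effectively propagates the user's
   // trigger downstream instead.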
I think maybe these pieces of
complexity are both unhelpful and
also not (necessarily) breaking
to alter, especially
considering we already have
inconsistency in the model.
WDYT? And I wonder what this means for
xlang and portability... how does
continuation triggering even work? (if
at all)
Kenn