Hi Robert,> Here "sink" is really any observable outside effect, so I think "how often output should be written" and "how quickly output should react to the change of input" are the same.
The difference is in the input trigger - let's imagine, that I have two chained GBKs (A and B). If I trigger A every minute, but B every second, I will output 60 records per minute, but 59 of them will be the same. That's why it seems to me, that meaningful "sink" triggering has to start at the input and then propagate with each pane.
> As an example, if I want, say, hourly output, triggering hourly at the source and then as quickly as possible from then on may be wasteful. It may also be desirable to arrange such that certain transforms only have a single pane per window, which is easier to propagate up than down. As another example, consider accumulating vs. discarding. If I have CombineValues(sum) followed by a re-keying and another CombineValues(sum), and I want the final output to be accumulating, the first must be discarding (or, better, retractions). Propagating upwards is possible in a way propagating downward is not.
I'm not sure I understand this. If I want hourly output, I cannot trigger source with lower frequency. If I trigger source with hourly, but do not propagate this as fast as possible, I'm inevitably introducing additional latency (that's the definition of "not as fast as possible") in downstream processing. Therefore the final triggering cannot be "hourly output" at least not with regard to the rate of change in inputs.
On 2/23/21 5:47 PM, Robert Bradshaw wrote:
On Tue, Feb 23, 2021 at 1:07 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:First, +1 to the conclusion of this thread. One note regarding the composite transforms and triggers *inside* those transforms - I think that propagating the triggering from input PCollection might be even dangerous and composite PTransforms that would be sensitive to the change of triggering will (should!) override the input triggering, and therefore adjusting it upfront will not work. There is clear option for composite PTransform (which includes one or more GBKs) to create API to specify the _input_ triggering of the composite as a whole, i.e. input.apply(MyComposite.create().triggering()) which (consistently with how triggering works for pure GBK) would change the input triggering (if we define trigger as "buffer input in state, flush buffer when trigger fires") of the PTransform. The PTransform knows how it expands and so it is quite easy to do the output triggering correctly.When we originally explored this (for windowing, before triggering existed) we looked at the number of composite operations (combining, joining, cogbk, ...) that contained GBKs and realized it would add a lot of boilerplate to manually pass through the windowing information to each. Worse, this is a burden placed on every author of a composite operation (and omitting this argument, or hard coding a default, would be strictly worse). Triggering doesn't flow as nicely, but requiring it on every subtransform invocation during pipeline construction would have the same downsides of verbosity.Regarding the sink triggering - out of curiosity, how does that differ from applying the triggering on the very first GBK(s) and the subsequently trigger all downstream GBKs using AfterPane.elementCountAtLeast(1)? It seems to me, that from user perspective what I will want to define is not "how often output should be written", but "how quickly output should react to the change of input" - therefore I *must* trigger with at least this frequency from the source and then propagate each pane as quickly as possible to the output. Am I missing something?Here "sink" is really any observable outside effect, so I think "how often output should be written" and "how quickly output should react to the change of input" are the same.As an example, if I want, say, hourly output, triggering hourly at the source and then as quickly as possible from then on may be wasteful. It may also be desirable to arrange such that certain transforms only have a single pane per window, which is easier to propagate up than down. As another example, consider accumulating vs. discarding. If I have CombineValues(sum) followed by a re-keying and another CombineValues(sum), and I want the final output to be accumulating, the first must be discarding (or, better, retractions). Propagating upwards is possible in a way propagating downward is not.Jan On 2/22/21 9:53 PM, Reuven Lax wrote:I really wish that we had found the time to build sink triggers. Jan is right - specifying triggers up front and having them propagate down is confusing (it's also a bit confusing for Windows, but with Windows the propagation at least makes sense). The fact that users rarely have access to the actual GBK operation means that allowing them to specify triggers on their sinks is the best approach. On Mon, Feb 22, 2021 at 12:48 PM Robert Bradshaw <rober...@google.com <mailto:rober...@google.com>> wrote: On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles <k...@apache.org <mailto:k...@apache.org>> wrote: I agree completely: Triggers control the output of the GBK. The issue is composite transforms, where there will be a GBK deep inside some code and the user cannot adjust the triggering. What a user really wants is "sink triggers <https://s.apache.org/beam-sink-triggers>" [1], a purely hypothetical feature where they specify the latency requirements on each _output_ and everything else is figured out automatically. Unfortunately, sink triggers require retractions, so each PCollection can be a complete changelog. Otherwise transformations cannot be transparently correct throughout a pipeline and triggers cannot be decoupled from pipeline logic. Retractions themselves are not necessarily complex in some cases (Flink SQL has them - they are extra easy for "pure" code) but require a massive working of the library of transforms, particularly IOs. And backwards compatibility concerns for existing DoFns are somewhat tricky. We've had two prototypes [2] [3] and some important design investigations [4], but no time to really finish adding them, even as just an optional experiment. And once we have retractions, there is still a lot to figure out to finish sink triggers. They may not even really be possible! So for now, we do our best with the user setting up triggering at the beginning of the pipeline instead of the end of the pipeline. The very first GBK (which may be deep in library code) is controlled by the triggering they set up and all the rest get the "continuation trigger" which tries to just let the data flow. Unless they set up another bit of triggering. Some of our transforms do this for various reasons. I think the conclusion of this particular thread is: - make all the SDKs use AfterSynchronizedProcessingTime triggers - allow runners to do whatever they want when they see AfterSynchronizedProcessingTime trigger - remove TimeDomain.afterSynchronizedProcessingTime from the proto since it is only for timers and they should not use this - later, figure out if we want to add support for making downstream triggering optional (could be useful prep for sink triggers) +1 [1] https://s.apache.org/beam-sink-triggers [2] https://github.com/apache/beam/pull/4742 [3] https://github.com/apache/beam/pull/9199 [4] https://s.apache.org/beam-retractions On Mon, Feb 22, 2021 at 1:28 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote: The same holds true for pane accumulation mode. Jan On 2/22/21 10:21 AM, Jan Lukavský wrote:Hi, I'm not sure if I got everything from this thread right, but from my point of view, triggers are property of GBK. They are property of neither windowing, nor PCollection, but relate solely to GBK. This can be seen from the fact, that unlike windowFn, triggers are completely ignored in stateful ParDo (there is no semantics for them, which is fine). It would be cool if the model could be adjusted for that - this would actually mean, that the correct place, where to specify triggering is not Window PTransform, but the GBK, i.e. input.apply(GroupByKey.create().triggering(...)) That would imply we simply have default trigger for all GBKs, unless explicitly changed, but for that particular instance only. I'm not sure what the impacts on pipeline compatibility would be, though. Jan On 2/19/21 12:09 AM, Robert Bradshaw wrote:On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org <mailto:k...@apache.org>> wrote: On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com <mailto:rober...@google.com>> wrote: I would prefer to leave downstream triggering up to the runner (or, better, leave upstream triggering up to the runner, a la sink triggers), but one problem is that without an explicit AfterSynchronizedProcessingTime one can't tell if the downstream ProcessingTime between two groupings is due to an explicit re-triggering between them or inherited from one to the other. I mean to propose that there should be no triggering specified unless due to explicit re-triggering. You're saying that we leave the trigger (and perhaps other) fields of the WindowingStrategy attached to PCollections downstream the first GBK unset in the proto? And let runners walk over the graph to infer it? I could be OK with making this legal, though updating all SDKs and Runners to handle this doesn't seem high priority at the moment. (and BTW yes I agree about sink triggers, but we need retractions and probably some theoretical work before we can aim for that) Kenn On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org <mailto:k...@apache.org>> wrote: Just for the thread I want to comment on another, more drastic approach: eliminate continuation triggers from the model, leaving downstream triggering up to a runner. This approach is not viable because transforms may need to change their behavior based on whether or not a trigger will fire more than once. Transforms can and do inspect the windowing strategy to do things differently. Kenn On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com <mailto:re...@google.com>> wrote: I'll say that synchronized processing time has confused users before. Users sometimes use processing-time triggers to optimize latency, banking that that will decouple stage latency from the long-tail latency of previous stages. However continuation triggers silently switching to synchronized processing time has defeated that, and it wasn't clear to users why. On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com <mailto:rober...@google.com>> wrote: On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org <mailto:k...@apache.org>> wrote: On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com <mailto:rober...@google.com>> wrote: Of course the right answer is to just implement sink triggers and sidestep the question altogether :). In the meantime, I think leaving AfterSynchronizedProcessingTime in the model makes the most sense, and runners can choose an implementation between firing eagerly and waiting some amount of time until they think all (most?) downstream results are in before firing, depending on how smart the runner wants to be. As you point out, they're all correct, and we'll have multiple firings due to the upstream trigger anyway, and this is safer than it used to be (though still possibly requires work). Just to clarify, as I got a little confused, is your suggestion: Leave AfterSynchronizedProcessingTime* triggers in the model/proto, let the SDK put them in where they want, and let runners decide how to interpret them? (this SGTM and requires the least/no changes) Yep. We may want to update Python/Go to produce AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too, eventually, to better express intent. Kenn *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related to this, except in implementation, and should be removed either way. On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org <mailto:k...@apache.org>> wrote: Hi all, TL;DR: 1. should we replace "after synchronized processing time" with "after count 1"? 2. should we remove "continuation trigger" and leave this to runners? ---- "AfterSynchronizedProcessingTime" triggers were invented to solve a specific problem. They are inconsistent across SDKs today. - You have an aggregation/GBK with aligned processing time trigger like ("output every minute on the minute") - You have a downstream aggregation/GBK between that and the sink - You expect to have about one output every minute per key+window pair Any output of the upstream aggregation may contribute to any key+window of the downstream aggregation. The AfterSynchronizedProcessingTime trigger waits for all the processing time based triggers to fire and commit their outputs. The downstream aggregation will output as fast as possible in panes consistent with the upstream aggregation. - The Java SDK behavior is as above, to output "as fast as reasonable". - The Python SDK never uses "AfterSynchronizedProcessingTime" triggers but simply propagates the same trigger to the next GBK, creating additional delay. - I don't know what the Go SDK may do, if it supports this at all. Any behavior could be defined as "correct". A simple option could be to have the downstream aggregation "fire always" aka "after element count 1". How would this change things? We would potentially see many more outputs. Why did we do this in the first place? There are (at least) these reasons: - Previously, triggers could "finish" an aggregation thus dropping all further data. In this case, waiting for all outputs is critical or else you lose data. Now triggers cannot finish aggregations. - Whenever there may be more than one pane, a user has to write logic to compensate and deal with it. Changing from guaranteed single pane to multi-pane would break things. So if the user configures a single firing, all downstream aggregations must respect it. Now that triggers cannot finish, I think processing time can only be used in multi-pane contexts anyhow. - The above example illustrates how the behavior in Java maintains something that the user will expect. Or so we think. Maybe users don't care. How did we get into this inconsistent state? When the user specifies triggering it applies to the very nearest aggregation/GBK. The SDK decides what triggering to insert downstream. One possibility is to remove this and have it unspecified, left to runner behavior. I think maybe these pieces of complexity are both not helpful and also not (necessarily) breaking changes to alter, especially considering we have inconsistency in the model. WDYT? And I wonder what this means for xlang and portability... how does continuation triggering even work? (if at all) Kenn