Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
On Tue, Feb 27, 2024 at 10:22 AM Robert Bradshaw via dev < dev@beam.apache.org> wrote: > On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles wrote: > > > > Pulling out focus points: > > > > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev < > dev@beam.apache.org> wrote: > > > I can't act on something yet [...] but I expect to be able to [...] at > some time in the processing-time future. > > > > I like this as a clear and internally-consistent feature description. It > describes ProcessContinuation and those timers which serve the same purpose > as ProcessContinuation. > > > > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev < > dev@beam.apache.org> wrote: > > > I can't think of a batch or streaming scenario where it would be > correct to not wait at least that long > > > > The main reason we created timers: to take action in the absence of > data. The archetypal use case for processing time timers was/is "flush data > from state if it has been sitting there too long". For this use case, the > right behavior for batch is to skip the timer. It is actually basically > incorrect to wait. > > Good point calling out the distinction between "I need to wait in case > there's more data." and "I need to wait for something external." We > can't currently distinguish between the two, but a batch runner can > say something definitive about the first. Feels like we need a new > primitive (or at least new signaling information on our existing > primitive). > > BTW the first is also relevant to drain. One reason drain often takes a long time today is because it has to wait for processing-time timers to fire (it has to wait because those timers have watermark holds), but usually those timers are noops. > > On Fri, Feb 23, 2024 at 3:54 PM Robert Burke > wrote: > > > It doesn't require a new primitive. > > > > IMO what's being proposed *is* a new primitive. I think it is a good > primitive. It is the underlying primitive to ProcessContinuation. 
It would > be user-friendly as a kind of timer. But if we made this the behavior of > processing time timers retroactively, it would break everyone using them to > flush data who is also reprocessing data. > > > > There's two very different use cases ("I need to wait, and block data" > vs "I want to act without data, aka NOT wait for data") and I think we > should serve both of them, but it doesn't have to be with the same > low-level feature. > > > > Kenn > > > > > > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev < > dev@beam.apache.org> wrote: > >> > >> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke > wrote: > >> > > >> > While I'm currently on the other side of the fence, I would not be > against changing/requiring the semantics of ProcessingTime constructs to be > "must wait and execute" as such a solution, and enables the Proposed > "batch" process continuation throttling mechanism to work as hypothesized > for both "batch" and "streaming" execution. > >> > > >> > There's a lot to like, as it leans Beam further into the unification > of Batch and Stream, with one fewer exception (eg. unifies timer experience > further). It doesn't require a new primitive. It probably matches more with > user expectations anyway. > >> > > >> > It does cause looping timer execution with processing time to be a > problem for Drains however. > >> > >> I think we have a problem with looping timers plus drain (a mostly > >> streaming idea anyway) regardless. > >> > >> > I'd argue though that in the case of a drain, we could updated the > semantics as "move watermark to infinity" "existing timers are executed, > but new timers are ignored", > >> > >> I don't like the idea of dropping timers for drain. I think correct > >> handling here requires user visibility into whether a pipeline is > >> draining or not. 
> >> > >> > and ensure/and update the requirements around OnWindowExpiration > callbacks to be a bit more insistent on being implemented for correct > execution, which is currently the only "hard" signal to the SDK side that > the window's work is guaranteed to be over, and remaining state needs to be > addressed by the transform or be garbage collected. This remains critical > for developing a good pattern for ProcessingTime timers within a Global > Window too. > >> > >> +1 > >> > >> > > >> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote: > >> > > Thanks for bringing this up. > >> > > > >> > > My position is that both batch and streaming should wait for > >> > > processing time timers, according to local time (with the exception > of > >> > > tests that can accelerate this via faked clocks). > >> > > > >> > > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO > >> > > isomorphic, and can be implemented in terms of each other (at least > in > >> > > one direction, and likely the other). Both are an indication that I > >> > > can't act on something yet due to external constraints (e.g. not all > >> > > the data has been published, or I lack sufficient
Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský wrote:
>
> On 2/27/24 19:22, Robert Bradshaw via dev wrote:
> > On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles wrote:
> >> Pulling out focus points:
> >>
> >> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
> >> wrote:
> >>> I can't act on something yet [...] but I expect to be able to [...] at
> >>> some time in the processing-time future.
> >> I like this as a clear and internally-consistent feature description. It
> >> describes ProcessContinuation and those timers which serve the same
> >> purpose as ProcessContinuation.
> >>
> >> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
> >> wrote:
> >>> I can't think of a batch or streaming scenario where it would be correct
> >>> to not wait at least that long
> >> The main reason we created timers: to take action in the absence of data.
> >> The archetypal use case for processing time timers was/is "flush data from
> >> state if it has been sitting there too long". For this use case, the right
> >> behavior for batch is to skip the timer. It is actually basically
> >> incorrect to wait.
> > Good point calling out the distinction between "I need to wait in case
> > there's more data." and "I need to wait for something external." We
> > can't currently distinguish between the two, but a batch runner can
> > say something definitive about the first. Feels like we need a new
> > primitive (or at least new signaling information on our existing
> > primitive).
> Runners signal end of data to a DoFn via (input) watermark. Is there a
> need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better way to track data completeness (if possible). Unfortunately processing timers don't specify if they're waiting for additional data or external/environmental change, meaning we can't use the (event time) watermark to determine whether they're safe to trigger.
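To make the "new signaling information" idea concrete, here is a minimal sketch of how a runner might treat a processing-time timer if the timer declared what it is waiting for. Everything in it is hypothetical: `TimerIntent`, `ProcessingTimeTimer`, and `wait_until` are illustration-only names and do not exist in Beam, and `input_complete` stands in for "the input watermark has reached the end of the data". Whether a batch runner should then fire the timer immediately or skip it is left open; the sketch only shows when waiting stops being necessary.

```python
from dataclasses import dataclass
from enum import Enum


class TimerIntent(Enum):
    # Hypothetical labels; Beam timers carry no such information today.
    WAIT_FOR_MORE_DATA = "wait_for_more_data"  # e.g. flush state that sat too long
    WAIT_FOR_EXTERNAL = "wait_for_external"    # e.g. quota or an external system


@dataclass(frozen=True)
class ProcessingTimeTimer:
    fire_at: float       # processing-time deadline, in seconds
    intent: TimerIntent  # hypothetical extra signal set by the user


def wait_until(timer: ProcessingTimeTimer, now: float, input_complete: bool) -> float:
    """Return the processing time until which the runner has to wait."""
    if timer.intent is TimerIntent.WAIT_FOR_MORE_DATA and input_complete:
        # No more data can arrive, so waiting cannot change the outcome.
        return now
    # Waiting on something external: only the wall clock can resolve it.
    return max(now, timer.fire_at)
```

With this signal, a "flush if data has been sitting too long" timer needs no wall-clock wait once the input is known to be complete, while a timer that waits on an external system still waits for its deadline in both batch and streaming.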
Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
On 2/27/24 19:30, Robert Bradshaw via dev wrote: On Tue, Feb 27, 2024 at 7:44 AM Robert Burke wrote: An "as fast as it can runner" with dynamic splits, would ultimately split to the systems maximum available parallelism (for stateful DoFns, this is the number of keys; for SplittableDoFns, this is the maximum sharding of each input element's restriction. That's what would happen with a "normal" sleep. WRT Portability, this means adding a current ProcessingTime field to the ProcessBundleRequest, and likely also to the ProgressRequest so the runner could coordinate. ProgressResponse may then need a "asleepUntil" field to communicate back the state of the bundle, which the runner could then use to better time its next ProgressRequest, and potentially arrest dynamic splitting for that bundle. After all, the sleeping bundle is blocked until processing time has advanced anyway; no progress can be made. I like moving the abstraction out of the timer space, as it better aligns with user intent for the throttle case, and it doesn't require a Stateful DoFn to operate (orthogonal!), meaning it's useful for It also solves the testing issue WRT ProcessingTime timers using an absolute time, rather than a relative time, as the SDK can rebuild it's relative setters for output time on the new canonical processing time, without user code changing. The sleeping inprogress bundle naturally holds back the watermark too. I suspect this mechanism would end up tending to over throttle as Reuven described earlier, since the user is only pushing back on immediate processing for the current element, not necessarily all elements. This is particularly likely if there's a long gap between ProgressRequests for the bundle and the runner doesn't adapt it's cadence. An external source of rate doesn't really exist, other than some external source that can provide throttle information. 
There would remain time skew between the runner system and the external system though, but for a throttle that's likely fine.

A central notion of ProcessingTime also allows the runner to "smear" processing time so if there's a particularly long delay, it doesn't need to catch up at once. I don't think that's relevant for the throttle case though, since with the described clock mechanism and the communication back to the runner, the unblocking notion is probably fine.

On this note, I have become skeptical that a global throttling rate can be done well with local information. For streaming dataflow, we can have an approximate solution by knowing the number of keys and doing per-key throttling because keys (at least up to hundreds per worker) are all processed concurrently. This solution doesn't even require state + timers and would best be done by standard sleeps. For most other systems, including dataflow batch, this would massively under throttle. Here we need to either add something to the model, or do something outside the model, to discover, dynamically, how many siblings are being concurrently run. (This could be done at a worker/process level, rather than bundle level, as well.) The ability to broadcast, aggregate, and read dynamic, provisional from all workers could help in other cases too (e.g. a more efficient top N), but this is a whole new thread... So while I think the semantics of processing timers in batch is worth solving, this probably isn't the best application.

Yes, it seems that if parallelism is defined dynamically by the runner, defining a global throttling rate is not possible under the current model. But maybe (rather than introducing a whole new concept) we could propagate the information about current parallelism from runner to DoFn via ProcessContext? For some runners that would be as easy as returning a constant.
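The arithmetic behind the under-throttling concern can be sketched as follows. This is only an illustration under stated assumptions: `per_worker_interval` and `effective_global_rate` are made-up helper names, and the `parallelism` argument is the value a runner would hypothetically expose to the DoFn; no such accessor exists in Beam today.

```python
def per_worker_interval(global_rate_per_sec: float, parallelism: int) -> float:
    """Minimum seconds between elements on one worker so that all
    concurrently running workers together stay at the global rate."""
    if global_rate_per_sec <= 0 or parallelism < 1:
        raise ValueError("rate must be positive and parallelism at least 1")
    return parallelism / global_rate_per_sec


def effective_global_rate(interval_sec: float, actual_parallelism: int) -> float:
    """Global rate that results when every worker applies the same interval."""
    return actual_parallelism / interval_sec


# A DoFn that assumes 10 siblings and targets 20 elements/s in total
# sleeps 0.5 s between elements.
interval = per_worker_interval(global_rate_per_sec=20.0, parallelism=10)

# If the runner actually splits to 1000 siblings, the external system
# sees 100x the intended rate.
observed = effective_global_rate(interval, actual_parallelism=1000)
```

The interval is only correct while the assumed parallelism matches the real one, which is why a runner with a fixed parallelism could return a constant while a dynamically splitting runner would have to keep the value up to date.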
Dynamic runners would be more involved, but the only alternative to propagating parallelism from runner to workers seems to be introducing a whole new worker <-> runner communication channel, so that a worker could ask the runner for permission to proceed with processing data based on some (global) condition. That feels too complex given the motivating example. Maybe there are other use cases that would let this be generalized into a concept; what comes to mind is something Flink calls "watermark alignment", which throttles sources based on the event-time progress of individual partitions, so that partitions that are too far ahead in event time do not blow up downstream state. These might be related concepts.

We'd need a discussion of what an SDK must do if the runner doesn't support the central clock for completeness, and consistency.

On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský wrote:

On 2/27/24 14:51, Kenneth Knowles wrote:

I very much like the idea of processing time clock as a parameter to @ProcessElement. That will be obviously useful and remove a source of inconsistency, in addition to letting the runner/SDK harness control it. I also like the idea of passi
Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
On 2/27/24 19:22, Robert Bradshaw via dev wrote: On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles wrote: Pulling out focus points: On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev wrote: I can't act on something yet [...] but I expect to be able to [...] at some time in the processing-time future. I like this as a clear and internally-consistent feature description. It describes ProcessContinuation and those timers which serve the same purpose as ProcessContinuation. On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev wrote: I can't think of a batch or streaming scenario where it would be correct to not wait at least that long The main reason we created timers: to take action in the absence of data. The archetypal use case for processing time timers was/is "flush data from state if it has been sitting there too long". For this use case, the right behavior for batch is to skip the timer. It is actually basically incorrect to wait. Good point calling out the distinction between "I need to wait in case there's more data." and "I need to wait for something external." We can't currently distinguish between the two, but a batch runner can say something definitive about the first. Feels like we need a new primitive (or at least new signaling information on our existing primitive). Runners signal end of data to a DoFn via (input) watermark. Is there a need for additional information? On Fri, Feb 23, 2024 at 3:54 PM Robert Burke wrote: It doesn't require a new primitive. IMO what's being proposed *is* a new primitive. I think it is a good primitive. It is the underlying primitive to ProcessContinuation. It would be user-friendly as a kind of timer. But if we made this the behavior of processing time timers retroactively, it would break everyone using them to flush data who is also reprocessing data. 
There's two very different use cases ("I need to wait, and block data" vs "I want to act without data, aka NOT wait for data") and I think we should serve both of them, but it doesn't have to be with the same low-level feature. Kenn On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev wrote: On Fri, Feb 23, 2024 at 3:54 PM Robert Burke wrote: While I'm currently on the other side of the fence, I would not be against changing/requiring the semantics of ProcessingTime constructs to be "must wait and execute" as such a solution, and enables the Proposed "batch" process continuation throttling mechanism to work as hypothesized for both "batch" and "streaming" execution. There's a lot to like, as it leans Beam further into the unification of Batch and Stream, with one fewer exception (eg. unifies timer experience further). It doesn't require a new primitive. It probably matches more with user expectations anyway. It does cause looping timer execution with processing time to be a problem for Drains however. I think we have a problem with looping timers plus drain (a mostly streaming idea anyway) regardless. I'd argue though that in the case of a drain, we could updated the semantics as "move watermark to infinity" "existing timers are executed, but new timers are ignored", I don't like the idea of dropping timers for drain. I think correct handling here requires user visibility into whether a pipeline is draining or not. and ensure/and update the requirements around OnWindowExpiration callbacks to be a bit more insistent on being implemented for correct execution, which is currently the only "hard" signal to the SDK side that the window's work is guaranteed to be over, and remaining state needs to be addressed by the transform or be garbage collected. This remains critical for developing a good pattern for ProcessingTime timers within a Global Window too. +1 On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote: Thanks for bringing this up. 
My position is that both batch and streaming should wait for processing time timers, according to local time (with the exception of tests that can accelerate this via faked clocks). Both ProcessContinuations delays and ProcessingTimeTimers are IMHO isomorphic, and can be implemented in terms of each other (at least in one direction, and likely the other). Both are an indication that I can't act on something yet due to external constraints (e.g. not all the data has been published, or I lack sufficient capacity/quota to push things downstream) but I expect to be able to (or at least would like to check again) at some time in the processing-time future. I can't think of a batch or streaming scenario where it would be correct to not wait at least that long (even in batch inputs, e.g. suppose I'm tailing logs and was eagerly started before they were fully written, or waiting for some kind of (non-data-dependent) quiessence or other operation to finish). On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský wrote: For me it always helps to seek analogy in our physical reality. Stream processing actually has quite a
Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
On Tue, Feb 27, 2024 at 7:44 AM Robert Burke wrote: > > An "as fast as it can runner" with dynamic splits, would ultimately split to > the systems maximum available parallelism (for stateful DoFns, this is the > number of keys; for SplittableDoFns, this is the maximum sharding of each > input element's restriction. That's what would happen with a "normal" sleep. > > WRT Portability, this means adding a current ProcessingTime field to the > ProcessBundleRequest, and likely also to the ProgressRequest so the runner > could coordinate. ProgressResponse may then need a "asleepUntil" field to > communicate back the state of the bundle, which the runner could then use to > better time its next ProgressRequest, and potentially arrest dynamic > splitting for that bundle. After all, the sleeping bundle is blocked until > processing time has advanced anyway; no progress can be made. > > I like moving the abstraction out of the timer space, as it better aligns > with user intent for the throttle case, and it doesn't require a Stateful > DoFn to operate (orthogonal!), meaning it's useful for It also solves the > testing issue WRT ProcessingTime timers using an absolute time, rather than a > relative time, as the SDK can rebuild it's relative setters for output time > on the new canonical processing time, without user code changing. > > The sleeping inprogress bundle naturally holds back the watermark too. > > I suspect this mechanism would end up tending to over throttle as Reuven > described earlier, since the user is only pushing back on immediate > processing for the current element, not necessarily all elements. This is > particularly likely if there's a long gap between ProgressRequests for the > bundle and the runner doesn't adapt it's cadence. > > An external source of rate doesn't really exist, other than some external > source that can provide throttle information. 
There would remain time skew > between the runner system and the external system though, but for a throttle > that's likely fine. > > A central notion of ProcessingTime also allows the runner to "smear" > processing time so if there's a particularly long delay, it doesn't need to > catch up at once. I don't think that's relevant for the throttle case though, > since with the described clock mechanism and the communication back to the > runner, the unblocking notion is probably fine. On this note, I have become skeptical that a global throttling rate can be done well with local information. For streaming dataflow, we can have an approximate solution by knowing the number of keys and doing per-key throttling because keys (at least up to hundreds per worker) are all processed concurrently. This solution doesn't even require state + timers and would best be done by standard sleeps. For most other systems, including dataflow batch, this would massively under throttle. Here we need to either add something to the model, or do something outside the model, to discover, dynamically, how many siblings are being concurrently run. (This could be done at a worker/process level, rather than bundle level, as well.) The ability to broadcast, aggregate, and read dynamic, provisional from all workers could help in other cases too (e.g. a more efficient top N), but this is a whole new thread... So while I think the semantics of processing timers in batch is worth solving, this probably isn't the best application. > We'd need a discussion of what an SDK must do if the runner doesn't support > the central clock for completeness, and consistency. > > > On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský wrote: >> >> On 2/27/24 14:51, Kenneth Knowles wrote: >> >> I very much like the idea of processing time clock as a parameter to >> @ProcessElement. That will be obviously useful and remove a source of >> inconsistency, in addition to letting the runner/SDK harness control it. 
I >> also like the idea of passing a Sleeper or to @ProcessElement. These are >> both good practices for testing and flexibility and runner/SDK language >> differences. >> >> In your (a) (b) (c) can you be more specific about which watermarks you are >> referring to? Are they the same as in my opening email? If so, then what you >> describe is what we already have. >> >> Yes, we have that for streaming, but it does not work this way in batch. In >> my understanding we violate (a), we ignore (b) because we fire timers at GC >> time only and (c) is currently relevant only immediately preceding window GC >> time, but can be defined more generally. But essentially yes, I was just >> trying to restate the streaming processing time semantics in the limited >> batch case. >> >> >> Kenn >> >> On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský wrote: >>> >>> I think that before we introduce a possibly somewhat duplicate new feature >>> we should be certain that it is really semantically different. I'll >>> rephrase the two cases: >>> >>> a) need to wait and block data (delay
Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles wrote: > > Pulling out focus points: > > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev > wrote: > > I can't act on something yet [...] but I expect to be able to [...] at some > > time in the processing-time future. > > I like this as a clear and internally-consistent feature description. It > describes ProcessContinuation and those timers which serve the same purpose > as ProcessContinuation. > > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev > wrote: > > I can't think of a batch or streaming scenario where it would be correct to > > not wait at least that long > > The main reason we created timers: to take action in the absence of data. The > archetypal use case for processing time timers was/is "flush data from state > if it has been sitting there too long". For this use case, the right behavior > for batch is to skip the timer. It is actually basically incorrect to wait. Good point calling out the distinction between "I need to wait in case there's more data." and "I need to wait for something external." We can't currently distinguish between the two, but a batch runner can say something definitive about the first. Feels like we need a new primitive (or at least new signaling information on our existing primitive). > On Fri, Feb 23, 2024 at 3:54 PM Robert Burke wrote: > > It doesn't require a new primitive. > > IMO what's being proposed *is* a new primitive. I think it is a good > primitive. It is the underlying primitive to ProcessContinuation. It would be > user-friendly as a kind of timer. But if we made this the behavior of > processing time timers retroactively, it would break everyone using them to > flush data who is also reprocessing data. > > There's two very different use cases ("I need to wait, and block data" vs "I > want to act without data, aka NOT wait for data") and I think we should serve > both of them, but it doesn't have to be with the same low-level feature. 
> > Kenn > > > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev > wrote: >> >> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke wrote: >> > >> > While I'm currently on the other side of the fence, I would not be against >> > changing/requiring the semantics of ProcessingTime constructs to be "must >> > wait and execute" as such a solution, and enables the Proposed "batch" >> > process continuation throttling mechanism to work as hypothesized for both >> > "batch" and "streaming" execution. >> > >> > There's a lot to like, as it leans Beam further into the unification of >> > Batch and Stream, with one fewer exception (eg. unifies timer experience >> > further). It doesn't require a new primitive. It probably matches more >> > with user expectations anyway. >> > >> > It does cause looping timer execution with processing time to be a problem >> > for Drains however. >> >> I think we have a problem with looping timers plus drain (a mostly >> streaming idea anyway) regardless. >> >> > I'd argue though that in the case of a drain, we could updated the >> > semantics as "move watermark to infinity" "existing timers are executed, >> > but new timers are ignored", >> >> I don't like the idea of dropping timers for drain. I think correct >> handling here requires user visibility into whether a pipeline is >> draining or not. >> >> > and ensure/and update the requirements around OnWindowExpiration callbacks >> > to be a bit more insistent on being implemented for correct execution, >> > which is currently the only "hard" signal to the SDK side that the >> > window's work is guaranteed to be over, and remaining state needs to be >> > addressed by the transform or be garbage collected. This remains critical >> > for developing a good pattern for ProcessingTime timers within a Global >> > Window too. >> >> +1 >> >> > >> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote: >> > > Thanks for bringing this up. 
>> > > >> > > My position is that both batch and streaming should wait for >> > > processing time timers, according to local time (with the exception of >> > > tests that can accelerate this via faked clocks). >> > > >> > > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO >> > > isomorphic, and can be implemented in terms of each other (at least in >> > > one direction, and likely the other). Both are an indication that I >> > > can't act on something yet due to external constraints (e.g. not all >> > > the data has been published, or I lack sufficient capacity/quota to >> > > push things downstream) but I expect to be able to (or at least would >> > > like to check again) at some time in the processing-time future. I >> > > can't think of a batch or streaming scenario where it would be correct >> > > to not wait at least that long (even in batch inputs, e.g. suppose I'm >> > > tailing logs and was eagerly started before they were fully written, >> > > or waiting for some kind of (non-data-dependent) quiess
Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
On 2/27/24 16:36, Robert Burke wrote: An "as fast as it can runner" with dynamic splits, would ultimately split to the systems maximum available parallelism (for stateful DoFns, this is the number of keys; for SplittableDoFns, this is the maximum sharding of each input element's restriction. That's what would happen with a "normal" sleep. I see. It is definitely possible for a runner to split all processing to maximum parallelism, but - provided this cannot be controlled by user - can the semantics of the Throttle transform be even consistently defined in terms of processing time? Seems it would require a coordination with the runner so that user-code would at least be aware of current parallelism. The situation is easier for runners that set parallelism upfront. WRT Portability, this means adding a current ProcessingTime field to the ProcessBundleRequest, and likely also to the ProgressRequest so the runner could coordinate. ProgressResponse may then need a "asleepUntil" field to communicate back the state of the bundle, which the runner could then use to better time its next ProgressRequest, and potentially arrest dynamic splitting for that bundle. After all, the sleeping bundle is blocked until processing time has advanced anyway; no progress can be made. I like moving the abstraction out of the timer space, as it better aligns with user intent for the throttle case, and it doesn't require a Stateful DoFn to operate (orthogonal!), meaning it's useful for It also solves the testing issue WRT ProcessingTime timers using an absolute time, rather than a relative time, as the SDK can rebuild it's relative setters for output time on the new canonical processing time, without user code changing. With what was said above - is the definition of sleep (pause) valid in the context of a bundle? By the same logic of splitting keys, "enough fast and efficient runner" could delay only the paused bundle and start processing different bundle (via different DoFn). 
It might require splitting bundles by keys, but should be possible. Seems that would in the end make the feature useless as well. The sleeping inprogress bundle naturally holds back the watermark too. I suspect this mechanism would end up tending to over throttle as Reuven described earlier, since the user is only pushing back on immediate processing for the current element, not necessarily all elements. This is particularly likely if there's a long gap between ProgressRequests for the bundle and the runner doesn't adapt it's cadence. An external source of rate doesn't really exist, other than some external source that can provide throttle information. There would remain time skew between the runner system and the external system though, but for a throttle that's likely fine. A central notion of ProcessingTime also allows the runner to "smear" processing time so if there's a particularly long delay, it doesn't need to catch up at once. I don't think that's relevant for the throttle case though, since with the described clock mechanism and the communication back to the runner, the unblocking notion is probably fine. We'd need a discussion of what an SDK must do if the runner doesn't support the central clock for completeness, and consistency. On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský wrote: On 2/27/24 14:51, Kenneth Knowles wrote: I very much like the idea of processing time clock as a parameter to @ProcessElement. That will be obviously useful and remove a source of inconsistency, in addition to letting the runner/SDK harness control it. I also like the idea of passing a Sleeper or to @ProcessElement. These are both good practices for testing and flexibility and runner/SDK language differences. In your (a) (b) (c) can you be more specific about which watermarks you are referring to? Are they the same as in my opening email? If so, then what you describe is what we already have. Yes, we have that for streaming, but it does not work this way in batch. 
In my understanding we violate (a), we ignore (b) because we fire timers at GC time only and (c) is currently relevant only immediately preceding window GC time, but can be defined more generally. But essentially yes, I was just trying to restate the streaming processing time semantics in the limited batch case. Kenn On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský wrote: I think that before we introduce a possibly somewhat duplicate new feature we should be certain that it is really semantically different. I'll rephrase the two cases: a) need to wait and block data (delay) - the use case is the motivating example of Throttle transform b) act without data, not block Provided we align processing time with local machine clock (or better, because of testing, make current processing time available via context t
Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
An "as fast as it can" runner with dynamic splits would ultimately split to the system's maximum available parallelism (for stateful DoFns, this is the number of keys; for SplittableDoFns, this is the maximum sharding of each input element's restriction). That's what would happen with a "normal" sleep. WRT Portability, this means adding a current ProcessingTime field to the ProcessBundleRequest, and likely also to the ProgressRequest so the runner could coordinate. ProgressResponse may then need an "asleepUntil" field to communicate back the state of the bundle, which the runner could then use to better time its next ProgressRequest, and potentially arrest dynamic splitting for that bundle. After all, the sleeping bundle is blocked until processing time has advanced anyway; no progress can be made. I like moving the abstraction out of the timer space, as it better aligns with user intent for the throttle case, and it doesn't require a Stateful DoFn to operate (orthogonal!), meaning it's useful for [...] It also solves the testing issue WRT ProcessingTime timers using an absolute time, rather than a relative time, as the SDK can rebuild its relative setters for output time on the new canonical processing time, without user code changing. The sleeping in-progress bundle naturally holds back the watermark too. I suspect this mechanism would end up tending to over-throttle as Reuven described earlier, since the user is only pushing back on immediate processing for the current element, not necessarily all elements. This is particularly likely if there's a long gap between ProgressRequests for the bundle and the runner doesn't adapt its cadence. An external source of rate doesn't really exist, other than some external source that can provide throttle information. There would remain time skew between the runner system and the external system though, but for a throttle that's likely fine. 
A central notion of ProcessingTime also allows the runner to "smear" processing time so if there's a particularly long delay, it doesn't need to catch up at once. I don't think that's relevant for the throttle case though, since with the described clock mechanism and the communication back to the runner, the unblocking notion is probably fine. We'd need a discussion of what an SDK must do if the runner doesn't support the central clock for completeness, and consistency. On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský wrote: > On 2/27/24 14:51, Kenneth Knowles wrote: > > I very much like the idea of processing time clock as a parameter > to @ProcessElement. That will be obviously useful and remove a source of > inconsistency, in addition to letting the runner/SDK harness control it. I > also like the idea of passing a Sleeper or to @ProcessElement. These are > both good practices for testing and flexibility and runner/SDK language > differences. > > In your (a) (b) (c) can you be more specific about which watermarks you > are referring to? Are they the same as in my opening email? If so, then > what you describe is what we already have. > > Yes, we have that for streaming, but it does not work this way in batch. > In my understanding we violate (a), we ignore (b) because we fire timers at > GC time only and (c) is currently relevant only immediately preceding > window GC time, but can be defined more generally. But essentially yes, I > was just trying to restate the streaming processing time semantics in the > limited batch case. > > > Kenn > > On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský wrote: > >> I think that before we introduce a possibly somewhat duplicate new >> feature we should be certain that it is really semantically different. 
I'll >> rephrase the two cases: >> >> a) need to wait and block data (delay) - the use case is the motivating >> example of Throttle transform >> >> b) act without data, not block >> >> Provided we align processing time with local machine clock (or better, >> because of testing, make current processing time available via context to >> @ProcessElement) it seems possible to unify both cases under slightly >> updated semantics of processing time timer in batch: >> >> a) processing time timers fire with best-effort, i.e. trying to minimize >> delay between firing timestamp and timer's timestamp >> b) timer is valid only in the context of current key-window, once >> watermark passes window GC time for the particular window that created the >> timer, it is ignored >> c) if timer has output timestamp, this timestamp holds watermark (but >> this is currently probably noop, because runners currently do not propagate >> (per-key) watermark in batch, I assume) >> >> In case b) it might be necessary to distinguish cases where the timer has an >> output timestamp; if so, it probably should be taken into account. >> >> Now, such semantics should be quite aligned with what we do in streaming >> case and what users generally expect. The blocking part can be implemented >> in @ProcessElement using buffer & timer, o
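The "asleepUntil" idea in the message above can be illustrated with a small sketch. This is not the Beam Fn API: the `ProgressResponse` class, its `asleep_until` field, and both helper functions are hypothetical names invented here, and processing time is modelled as plain seconds. The sketch only shows how a runner might adapt its polling cadence and hold off on splitting while a bundle reports that it is asleep.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProgressResponse:
    """Hypothetical stand-in for a bundle progress reply (not the real Fn API message)."""
    bundle_id: str
    # Processing-time instant (seconds) until which the bundle is asleep, if any.
    asleep_until: Optional[float] = None


def next_progress_request_time(now: float, response: ProgressResponse,
                               default_interval: float = 1.0) -> float:
    """Pick the processing time at which the runner should next poll this bundle."""
    if response.asleep_until is not None and response.asleep_until > now:
        # The bundle cannot make progress before it wakes, so poll at the wake-up instant.
        return response.asleep_until
    # Otherwise fall back to the regular polling interval.
    return now + default_interval


def may_split(now: float, response: ProgressResponse) -> bool:
    """Hold off on dynamic splitting while the bundle reports that it is asleep."""
    return response.asleep_until is None or response.asleep_until <= now
```

Polling at the reported wake-up instant is one way to avoid the over-throttling described above, which arises when the gap between progress requests is long and fixed.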
Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
On 2/27/24 14:51, Kenneth Knowles wrote: I very much like the idea of processing time clock as a parameter to @ProcessElement. That will be obviously useful and remove a source of inconsistency, in addition to letting the runner/SDK harness control it. I also like the idea of passing a Sleeper or to @ProcessElement. These are both good practices for testing and flexibility and runner/SDK language differences. In your (a) (b) (c) can you be more specific about which watermarks you are referring to? Are they the same as in my opening email? If so, then what you describe is what we already have. Yes, we have that for streaming, but it does not work this way in batch. In my understanding we violate (a), we ignore (b) because we fire timers at GC time only and (c) is currently relevant only immediately preceding window GC time, but can be defined more generally. But essentially yes, I was just trying to restate the streaming processing time semantics in the limited batch case. Kenn On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský wrote: I think that before we introduce a possibly somewhat duplicate new feature we should be certain that it is really semantically different. I'll rephrase the two cases: a) need to wait and block data (delay) - the use case is the motivating example of Throttle transform b) act without data, not block Provided we align processing time with local machine clock (or better, because of testing, make current processing time available via context to @ProcessElement) it seems possible to unify both cases under slightly updated semantics of processing time timer in batch: a) processing time timers fire with best-effort, i.e. 
trying to minimize delay between firing timestamp and timer's timestamp b) timer is valid only in the context of current key-window, once watermark passes window GC time for the particular window that created the timer, it is ignored c) if timer has output timestamp, this timestamp holds watermark (but this is currently probably noop, because runners currently do not propagate (per-key) watermark in batch, I assume) In case b) it might be necessary to distinguish cases where the timer has an output timestamp; if so, it probably should be taken into account. Now, such semantics should be quite aligned with what we do in streaming case and what users generally expect. The blocking part can be implemented in @ProcessElement using buffer & timer, once there is need to wait, it can be implemented in user code using plain sleep(). That is due to the alignment between local time and definition of processing time. If we had some reason to be able to run faster-than-wall-clock (as I'm still not in favor of that), we could do that using ProcessContext.sleep(). Delaying processing in the @ProcessElement should result in backpressuring and backpropagation of this backpressure from the Throttle transform to the sources as mentioned (of course this is only for the streaming case). Is there anything missing in such definition that would still require splitting the timers into two distinct features? Jan On 2/26/24 21:22, Kenneth Knowles wrote: Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such. OutputTime is always an event time timestamp so it isn't even allowed to be set outside the window (or you'd end up with an element assigned to a window that it isn't within, since OutputTime essentially represents reserving the right to output an element with that timestamp) Kenn On Mon, Feb 26, 2024 at 3:19 PM Robert Burke wrote: Agreed that a retroactive behavior change would be bad, even if tied to a beam version change. 
I agree that it meshes well with the general theme of State & Timers exposing underlying primitives for implementing Windowing and similar. I'd say the distinction between the two might be additional complexity for users to grok, and would need to be documented well, as both operate in the ProcessingTime domain, but differently. What to call this new timer then? DelayTimer? "A DelayTimer sets an instant in ProcessingTime at which point computations can continue. Runners will prevent the EventTimer watermark from advancing past the set OutputTime until Processing Time has advanced to at least the provided instant to execute the timer's callback. This can be used to allow the runner to constrain pipeline throughput with user guidance." I'd probably add that a timer with an output time outside of the window would not be guaranteed to fire, and that OnWindowExpiry is the correct way to ensure cleanup occurs. No solution to the Looping Timers on Drain problem here, but I think that's ulti
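The three rules (a), (b) and (c) quoted in the message above can be made concrete with a toy model. The sketch below is only an illustration under simplifying assumptions: the `Timer` class and the `step` function are hypothetical names, timestamps are plain numbers, and a single key-window scope per timer is assumed. It is not how any Beam runner is implemented.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Timer:
    fire_at: float                            # processing-time instant the timer is set for
    window_gc_time: float                     # event-time GC instant of the window that set it
    output_timestamp: Optional[float] = None  # event-time watermark hold, if any


def step(timers: List[Timer], processing_time: float, watermark: float
         ) -> Tuple[List[Timer], List[Timer], List[Timer], Optional[float]]:
    """Classify timers and return (fired, dropped, pending, watermark_hold)."""
    fired, dropped, pending = [], [], []
    for t in sorted(timers, key=lambda t: t.fire_at):
        if watermark > t.window_gc_time:
            dropped.append(t)   # (b) the window is past GC, so the timer is ignored
        elif t.fire_at <= processing_time:
            fired.append(t)     # (a) due timers fire, earliest first
        else:
            pending.append(t)
    # (c) the earliest output timestamp among pending timers holds the watermark.
    holds = [t.output_timestamp for t in pending if t.output_timestamp is not None]
    return fired, dropped, pending, (min(holds) if holds else None)
```

For example, with processing time 10 and watermark 50, a timer set for 5 in a window with GC time 100 fires, a timer set for 20 stays pending and contributes its hold, and a timer whose window GC time is 10 is dropped.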
Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)
I very much like the idea of processing time clock as a parameter to @ProcessElement. That will be obviously useful and remove a source of inconsistency, in addition to letting the runner/SDK harness control it. I also like the idea of passing a Sleeper or to @ProcessElement. These are both good practices for testing and flexibility and runner/SDK language differences. In your (a) (b) (c) can you be more specific about which watermarks you are referring to? Are they the same as in my opening email? If so, then what you describe is what we already have. Kenn On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský wrote: > I think that before we introduce a possibly somewhat duplicate new feature > we should be certain that it is really semantically different. I'll > rephrase the two cases: > > a) need to wait and block data (delay) - the use case is the motivating > example of Throttle transform > > b) act without data, not block > > Provided we align processing time with local machine clock (or better, > because of testing, make current processing time available via context to > @ProcessElement) it seems possible to unify both cases under slightly > updated semantics of processing time timer in batch: > > a) processing time timers fire with best-effort, i.e. trying to minimize > delay between firing timestamp and timer's timestamp > b) timer is valid only in the context of current key-window, once > watermark passes window GC time for the particular window that created the > timer, it is ignored > c) if timer has output timestamp, this timestamp holds watermark (but > this is currently probably noop, because runners currently do not propagate > (per-key) watermark in batch, I assume) > > In case b) it might be necessary to distinguish cases where the timer has an > output timestamp; if so, it probably should be taken into account. > > Now, such semantics should be quite aligned with what we do in streaming > case and what users generally expect. 
The blocking part can be implemented > in @ProcessElement using buffer & timer, once there is need to wait, it can > be implemented in user code using plain sleep(). That is due to the > alignment between local time and definition of processing time. If we had > some reason to be able to run faster-than-wall-clock (as I'm still not in > favor of that), we could do that using ProcessContext.sleep(). Delaying > processing in the @ProcessElement should result in backpressuring and > backpropagation of this backpressure from the Throttle transform to the > sources as mentioned (of course this is only for the streaming case). > > Is there anything missing in such definition that would still require > splitting the timers into two distinct features? > > Jan > On 2/26/24 21:22, Kenneth Knowles wrote: > > Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such. > > OutputTime is always an event time timestamp so it isn't even allowed to > be set outside the window (or you'd end up with an element assigned to a > window that it isn't within, since OutputTime essentially represents > reserving the right to output an element with that timestamp) > > Kenn > > On Mon, Feb 26, 2024 at 3:19 PM Robert Burke wrote: > >> Agreed that a retroactive behavior change would be bad, even if tied to a >> beam version change. I agree that it meshes well with the general theme of >> State & Timers exposing underlying primitives for implementing Windowing >> and similar. I'd say the distinction between the two might be additional >> complexity for users to grok, and would need to be documented well, as both >> operate in the ProcessingTime domain, but differently. >> >> What to call this new timer then? DelayTimer? >> >> "A DelayTimer sets an instant in ProcessingTime at which point >> computations can continue. 
Runners will prevent the EventTimer watermark >> from advancing past the set OutputTime until Processing Time has advanced >> to at least the provided instant to execute the timer's callback. This can >> be used to allow the runner to constrain pipeline throughput with user >> guidance." >> >> I'd probably add that a timer with an output time outside of the window >> would not be guaranteed to fire, and that OnWindowExpiry is the correct way >> to ensure cleanup occurs. >> >> No solution to the Looping Timers on Drain problem here, but I think >> that's ultimately an orthogonal discussion, and will restrain my thoughts >> on that for now. >> >> This isn't a proposal, but exploring the solution space within our >> problem. We'd want to break down exactly what is different and what is the same for >> the 3 kinds of timers... >> >> >> >> >> On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles wrote: >> >>> Pulling out focus points: >>> >>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev < >>> dev@beam.apache.org> wrote: >>> > I can't act on something yet [...] but I expect to be able to [...] at >>> some time in the processing-time future. >>> >>> I like this as a clear and internally-consistent
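To illustrate why a processing-time clock and a sleeper passed into the processing function help with testing and with faster-than-wall-time runs, here is a minimal sketch. It does not use the Beam SDK: `WallClock`, `FakeClock`, and `Throttle` are hypothetical names invented for this example, and the `clock` argument stands in for whatever the runner or SDK harness would supply.

```python
import time


class WallClock:
    """Real time: now() reads the machine clock and sleep() really blocks."""

    def now(self) -> float:
        return time.time()

    def sleep(self, seconds: float) -> None:
        time.sleep(seconds)


class FakeClock:
    """Controlled time: sleep() advances the clock instantly instead of blocking."""

    def __init__(self, start: float = 0.0) -> None:
        self._now = start

    def now(self) -> float:
        return self._now

    def sleep(self, seconds: float) -> None:
        self._now += max(0.0, seconds)


class Throttle:
    """Pass through at most one element per min_interval seconds of processing time."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        self._next_allowed = None

    def process(self, element, clock):
        now = clock.now()
        if self._next_allowed is not None and now < self._next_allowed:
            # Wait on the injected clock rather than calling time.sleep() directly.
            clock.sleep(self._next_allowed - now)
        self._next_allowed = clock.now() + self.min_interval
        return element
```

With a `FakeClock`, throttling three elements at a 2-second interval advances the simulated clock by 4 seconds without any real waiting; swapping in `WallClock` gives the ordinary blocking behaviour with no change to `Throttle`.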
Beam High Priority Issue Report (37)
This is your daily summary of Beam's current high priority issues that may need attention. See https://beam.apache.org/contribute/issue-priorities for the meaning and expectations around issue priorities.

Unassigned P0 Issues:

https://github.com/apache/beam/issues/30377 [Failing Test]: 404 opensource.org/licenses cause java container build fail

Unassigned P1 Issues:

https://github.com/apache/beam/issues/29971 [Bug]: FixedWindows not working for large Kafka topic
https://github.com/apache/beam/issues/29926 [Bug]: FileIO: lack of timeouts may cause the pipeline to get stuck indefinitely
https://github.com/apache/beam/issues/29902 [Bug]: Messages are not ACK on Pubsub starting Beam 2.52.0 on Flink Runner in detached mode
https://github.com/apache/beam/issues/29099 [Bug]: FnAPI Java SDK Harness doesn't update user counters in OnTimer callback functions
https://github.com/apache/beam/issues/28760 [Bug]: EFO Kinesis IO reader provided by apache beam does not pick the event time for watermarking
https://github.com/apache/beam/issues/28383 [Failing Test]: org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest.testMaxThreadMetric
https://github.com/apache/beam/issues/28326 Bug: apache_beam.io.gcp.pubsublite.ReadFromPubSubLite not working
https://github.com/apache/beam/issues/27892 [Bug]: ignoreUnknownValues not working when using CreateDisposition.CREATE_IF_NEEDED
https://github.com/apache/beam/issues/27616 [Bug]: Unable to use applyRowMutations() in bigquery IO apache beam java
https://github.com/apache/beam/issues/27486 [Bug]: Read from datastore with inequality filters
https://github.com/apache/beam/issues/27314 [Failing Test]: bigquery.StorageApiSinkCreateIfNeededIT.testCreateManyTables[1]
https://github.com/apache/beam/issues/27238 [Bug]: Window trigger has lag when using Kafka and GroupByKey on Dataflow Runner
https://github.com/apache/beam/issues/26911 [Bug]: UNNEST ARRAY with a nested ROW (described below)
https://github.com/apache/beam/issues/26343 [Bug]: apache_beam.io.gcp.bigquery_read_it_test.ReadAllBQTests.test_read_queries is flaky
https://github.com/apache/beam/issues/26329 [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource
https://github.com/apache/beam/issues/26041 [Bug]: Unable to create exactly-once Flink pipeline with stream source and file sink
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24313 [Flaky]: apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder will drop message id and orderingKey
https://github.com/apache/beam/issues/22913 [Bug]: beam_PostCommit_Java_ValidatesRunner_Flink is flakes in org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit test action StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial (order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21476 WriteToBigQuery Dynamic table destinations returns wrong tableId
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit data at GC time
https://github.com/apache/beam/issues/21121 apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it flakey
https://github.com/apache/beam/issues/21104 Flaky: apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful

P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/29515 [Bug]: WriteToFiles in python leave few records in temp directory when writing to large number (100+) of files
https://github.com/apache/beam/issues/28219 [Bug]: BigQuery IO Batch load using File_load causing the same job id ignoring inserts as the