@Jan I'm +1 on the idea. Just confirming: this would not negate the ability to buffer or otherwise make use of a setting like FlinkPipelineOptions#setMaxBundleSize [1]? The change would simply mean refraining from outputting a watermark change until @FinishBundle is called (across all runners)? Assuming this garners the required support, I'd be interested in collaborating/contributing if development could be parallelized across the various runners.
[1] https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java#L207-L211

- Evan

On Mon, Mar 28, 2022 at 4:07 AM Jan Lukavský <[email protected]> wrote:

> Hi Robert,
>
> I had the same impression that holding the watermark between bundles is
> actually not part of the computational model. Now the question is -
> should it be?
>
> As you said, buffering and emitting in-memory buffered data means
> possibly outputting data that arrived as ON_TIME, but is outputted as
> LATE (or droppable, which is even worse). My understanding is that this
> is why there is the (deprecated) getAllowedTimestampSkew() method of
> DoFn, but that only bypasses the check, does not solve the issue (which
> is why it is deprecated, I suppose). I strongly believe that outputting
> elements that switch from ON_TIME to LATE is a correctness bug, because
> it has the potential to violate causality (which is strongly
> counter-intuitive in our universe :)). For some pipelines it can
> definitely cause incorrect outputs.
>
> If we could ensure the output watermark gets updated only between
> @FinishBundle and @StartBundle call then this problem would go away. I
> looked into the code of FlinkRunner and it seems to me that we could
> quite easily ensure this by not outputting watermark when a bundle is
> open and output it once it finishes. I didn't dig into that too deep, so
> I don't know if there would be any caveats, the question is apparently,
> if we could make these guarantees for other runners as well and if we
> could sensibly create a @ValidatesRunner test.
>
> WDYT?
>
> Jan
>
> On 3/25/22 23:06, Robert Bradshaw wrote:
> > I do not think there is a hard and fast rule about updating watermarks
> > only at bundle boundaries. This seems perfectly legal for a pure 1:1
> > mapping DoFn. The issue is that DoFns are allowed to buffer data and
> > emit them in a later process (or finishBundle).
> > If the watermark has moved on, that may result in late data. We don't
> > really have a way for a DoFn to declare *its* output watermark (i.e.
> > "I promise not to emit any data before this timestamp.")
> >
> > On Thu, Mar 24, 2022 at 8:10 AM Evan Galpin <[email protected]> wrote:
> >> Thanks for starting this thread Jan, I'm keen to hear thoughts and
> >> outcomes! I thought I would mention that answers to the questions
> >> posed here will help to unblock a 2.38.0 release blocker[1].
> >>
> >> [1] https://issues.apache.org/jira/browse/BEAM-14064
> >>
> >> On Thu, Mar 24, 2022 at 5:28 AM Jan Lukavský <[email protected]> wrote:
> >>> Hi,
> >>>
> >>> this is a follow-up thread started from [1]. In that thread it is
> >>> mentioned multiple times that (in stateless ParDo) the output
> >>> watermark is allowed to advance only on bundle boundaries [2].
> >>> Essentially that would mean that anything in between calls to
> >>> @StartBundle and @FinishBundle would be processed in a single
> >>> instant in (output) event-time. This makes perfect sense.
> >>>
> >>> The issue is that it seems that not all runners actually implement
> >>> this behavior. FlinkRunner for instance does not have a "natural"
> >>> concept of bundles and those are created in a more ad-hoc way to
> >>> adhere to the DoFn life-cycle (see [3]). Watermark updates and
> >>> elements are completely interleaved without any synchronization
> >>> with bundle "open" or "close". If watermark updates are allowed to
> >>> happen only on boundaries of bundles, then this seems to break this
> >>> contract.
> >>>
> >>> The question therefore is - should we consider FlinkRunner as
> >>> non-compliant with this aspect of the Apache Beam model or is this
> >>> an "optional" part that runners are free to implement at will? In
> >>> the case of the former, are we missing some @ValidatesRunner tests
> >>> for this?
> >>>
> >>> Jan
> >>>
> >>> [1] https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
> >>>
> >>> [2] https://lists.apache.org/thread/7foy455spg43xo77zhrs62gc1m383t50
> >>>
> >>> [3] https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L786
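
For illustration only, the behaviour discussed in this thread (do not forward a watermark while a bundle is open, forward the latest one once the bundle finishes) could be sketched roughly as below. This is a self-contained toy, not code from Beam or the FlinkRunner: the class BundleWatermarkHold and all of its methods are hypothetical names, watermarks are modelled as plain long values, and the "downstream" is just an in-memory list. Element buffering inside the bundle (e.g. up to a configured maximum bundle size) is unaffected by it; only the watermark is held.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (NOT a Beam or Flink class): holds back watermark
 * updates while a bundle is open and forwards the latest one on close.
 */
final class BundleWatermarkHold {
    private static final long NONE = Long.MIN_VALUE;

    private boolean bundleOpen = false;
    private long pendingWatermark = NONE;
    // Stand-in for "emit downstream": watermarks forwarded so far.
    private final List<Long> emitted = new ArrayList<>();

    /** Assumed to be called where the runner would invoke @StartBundle. */
    void startBundle() {
        bundleOpen = true;
    }

    /** Assumed to be called whenever an upstream watermark arrives. */
    void onWatermark(long watermark) {
        if (bundleOpen) {
            // Hold: remember only the largest watermark seen in this bundle.
            pendingWatermark = Math.max(pendingWatermark, watermark);
        } else {
            emit(watermark);
        }
    }

    /** Assumed to be called after @FinishBundle has flushed buffered output. */
    void finishBundle() {
        bundleOpen = false;
        if (pendingWatermark != NONE) {
            emit(pendingWatermark);
            pendingWatermark = NONE;
        }
    }

    private void emit(long watermark) {
        // Keep the forwarded watermark monotonic; ignore non-advancing values.
        if (emitted.isEmpty() || watermark > emitted.get(emitted.size() - 1)) {
            emitted.add(watermark);
        }
    }

    /** Copy of the watermarks forwarded downstream so far, in order. */
    List<Long> emitted() {
        return new ArrayList<>(emitted);
    }
}
```

With this sketch, a watermark that arrives between startBundle() and finishBundle() only becomes visible downstream after finishBundle(), so anything a DoFn buffers and emits within the bundle cannot be overtaken by it. Whether a real runner can hold the watermark this cheaply is exactly the open question above.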
