There's another interesting API, which is being discussed for the
internal variant of Dataflow, which is that rather than allowing one
to fabricate timestamps (or windows) ex nihilo one would instead need
ot ask for a "timestamped" or "windowed" element in the Process
method, from which one could construct a new timestamped/windowed
element (with a new value, but the same timestamp/window/paneinfo)
that could then be safely emitted. I'm curious how constraining this
would be.

I'm not sure I follow. Do you suggest that - for the case of in-memory batching - one would store a TimestampedValue in the buffer and when flushing the buffer one would say "I'm emitting this value, that was created based on this input element"? That seems to work fine, though I suppose this is probably not the main motivation for such API. :)

On 3/28/22 20:54, Robert Bradshaw wrote:
On Mon, Mar 28, 2022 at 11:45 AM Jan Lukavský <> wrote:
On 3/28/22 20:17, Reuven Lax wrote:

On Mon, Mar 28, 2022 at 11:08 AM Robert Bradshaw <> wrote:
On Mon, Mar 28, 2022 at 11:04 AM Reuven Lax <> wrote:
On Mon, Mar 28, 2022 at 10:59 AM Evan Galpin <> wrote:
I don't believe that the issue is Flink specific but rather that Flink is one 
example of many potential examples.  Enforcing that watermark updates can only 
happen at bundle boundaries would ensure that any data buffered while 
processing a single bundle in a DoFn could be output ON_TIME, especially 
without any need for a TimerSpec to explicitly hold the watermark for that 
purpose.  This is in reference to data buffered within a single bundle, and not 
cross-bundle buffering such as in the case of GroupIntoBatches.

Any in-flight data (i.e. data being processed that is not yet committed back to 
the runner) must hold up the output watermark. Since in the Beam model all 
records in a bundle are somewhat atomic (e.g. if the bundle succeeds, none of 
of them should be replayed in a proper exactly-once runner), I think this 
implicitly means that any elements in an in-flight bundle must hold up the 
watermark. This doesn't mean that the watermark can't advance while the bundle 
is in flight -just that it can't advance past any of the timestamps outstanding 
in the bundle.
Yes. The difficulty is that we don't have much visibility into
"timestamps outstanding in the bundle" so we have to take
min(timestamps of input elements in the bundle) which is not that
different from only having watermark updates at bundle boundaries.


Agree, this works exactly the same. The requirement is not to not update the 
watermark, but not to update it past any on-time element in the bundle. Not 
updating the watermark at all is one solution, computing min(timestamps in 
bundle) works the same. Unfortunately, Flink does not construct bundles in 
advance, it is more an ad-hoc concept. Therefore the only way to hold the 
watermark is not to update it, because the timestamps of elements that will be 
part of the bundle are not known.

Two more questions:

  a) it seems that we are missing some @ValidatesRunner tests for this, right?

  b) should we relax the restriction of not allowing outputWithTimestamp() output element before the current 
element? I think it should be "before lowest element in the current bundle" or "before output 
watermark, if not already late, or not droppable if late (uh, this gets a little complicated :))". Not 
allowing outputting element with timestamp lower than the current element seems to be just a 
"safety-first" solution to the problem discussed here and is too restrictive. It could be 
worked-around using getAllowedTimestampSkew(), but that can cause errors.
Yes, outputWithTimestamp should likely be restricted to min(elements
seen so far).

Take for example a PCollection with 1 second Fixed windowing.  The PCollection 
holds payload bodies for an external API to which requests will be made.  A 
hypothetical runner creates a bundle with Element A and Element B where Element 
A belongs to the window [0:00:01, 0:00:02) and Element B belongs to the window 
[0:00:02, 0:00:03).  Assume that the DoFn is going to buffer all elements in a 
bundle so as to generate fewer round-trip requests to the external API, and 
then output the corresponding responses. The following is a high-level order of 
events that could result in data being labelled as LATE:

1. Watermark is 0:00:00
2. DoFn receives the bundle containing both Element A and Element B
3. Element A is processed by the DoFn, buffering in-memory and 
4. Watermark is (maybe) updated after having processed the element; let's 
assume in this example it is in fact updated to 0:00:02
5. Element B (from the same bundle) is processed by the DoFn
6. It's the end of the bundle, so now the in-memory buffered entities are used 
to make a request to external API
7. The API responses are gathered and intended to be output to the same window 
from which the corresponding element with request data originated (Element A 
and Element B carried this data)
8. The response data associated with the request payload found in Element A is 
output with the timestamp of Element A i.e. something in the range of Element 
A's window [0:00:01, 0:00:02)
9. The data in the prior step is considered LATE, strictly as a result of 
updating the watermark to 0:00:02 in Step 4 above

If Step 4 was moved to be the last step in the process (i.e. at the bundle 
boundary) this issue would be avoided.  I would also argue that updating the 
watermark only after receiving a response for an input Element is a more 
accurate depiction of having completed processing for the element.  All that 
said, I could buy the argument that the above description might represent an 
anti-pattern of sorts where response data should actually be output with a 
timestamp corresponding to its receipt rather than the timestamp of its 
corresponding input element carrying the request body.

- Evan

On Mon, Mar 28, 2022 at 11:07 AM Reuven Lax <> wrote:
I agree with you that changing on-time elements to late elements is incorrect, 
however I don't quite understand why doing things on bundle boundaries helps. 
Is this specific to Flink?

On Mon, Mar 28, 2022 at 1:07 AM Jan Lukavský <> wrote:
Hi Robert,

I had the same impression that holding the watermark between bundles is
actually not part of the computational model. Now the question is -
should it be?

As you said, buffering and emitting in-memory buffered data means
possibly outputting data that arrived as ON_TIME, but is outputted as
LATE (or droppable, which is even worse). My understanding is that this
is why there is the (deprecated) getAllowedTimestampSkew() method of
DoFn, but that only bypasses the check, does not solve the issue (which
is why it is deprecated, I suppose). I strongly believe that outputting
elements that switch from ON_TIME to LATE is a correctness bug, because
it has the potential to violate causality (which is strongly
counter-intuitive in our universe :)). For some pipelines it can
definitely cause incorrect outputs.

If we could ensure the output watermark gets updated only between
@FinishBundle and @StartBundle call then this problem would go away. I
looked into the code of FlinkRunner and it seems to me that we could
quite easily ensure this by not outputting watermark when a bundle is
open and output it once it finishes. I didn't dig into that too deep, so
I don't know if there would be any caveats, the question is apparently,
if we could make these guarantees for other runners as well and if we
could sensibly create a @ValidatesRunner test.



On 3/25/22 23:06, Robert Bradshaw wrote:
I do not think there is a hard and fast rule about updating watermarks
only at bundle boundaries. This seems perfectly legal for a pure 1:1
mapping DoFn. The issue is that DoFns are allowed to buffer data and
emit them in a later process (or finishBundle). If the watermark has
moved on, that may result in late data. We don't really have a way for
a DoFn to declare *it's* output watermark (i.e. "I promise not to emit
any data before this timestamp.")

On Thu, Mar 24, 2022 at 8:10 AM Evan Galpin <> wrote:
Thanks for starting this thread Jan, I'm keen to hear thoughts and outcomes!  I 
thought I would mention that answers to the questions posed here will help to 
unblock a 2.38.0 release blocker[1].


On Thu, Mar 24, 2022 at 5:28 AM Jan Lukavský <> wrote:

this is follow-up thread started from [1]. In the thread there is mentioned 
multiple times that (in stateless ParDo), the output watermark is allowed to 
advance only on bundle boundaries [2]. Essentially that would mean that 
anything in between calls to @StartBundle and @FinishBundle would be processed 
in single instant in (output) event-time. This makes perfect sense.

The issue is that it seems that not all runners actually implement this behavior. FlinkRunner for instance 
does not have a "natural" concept of bundles and those are created in a more ad-hoc way to adhere 
with the DoFn life-cycle (see [3]). Watermark updates and elements are completely interleaved without any 
synchronization with bundle "open" or "close". If watermark updates are allowed to happen 
only on boundaries of bundles, then this seems to break this contract.

The question therefore is - should we consider FlinkRunner as non-compliant with this 
aspect of the Apache Beam model or is this an "optional" part that runners are 
free to implement at will? In the case of the former, do we miss some @ValidatesRunner 
tests for this?





