I agree that the temporal join's semantics are clean. I guess in some (most)
cases one can choose/promote one of the inputs to be the "main" one and do
the temporal join against the others. In other cases, though, there might be
no such "main" stream, at least not in a clearly defined way. Do you think
something along the lines of #3 would make sense there?
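
To make the comparison between the three strategies concrete, here is a
minimal sketch in plain Python. It is only a toy simulation, not Flink code:
the Assembler class, its method names, and the device example are all made up
for illustration. It also keeps only the newest part per input, which a real
implementation of #3 would probably want to refine.

```python
from dataclasses import dataclass, field


@dataclass
class Assembler:
    """Toy per-entity assembler. All names are hypothetical, not Flink API."""
    inputs: tuple                                   # names of the input streams
    state: dict = field(default_factory=dict)       # entity id -> {input: (ts, payload)}
    watermarks: dict = field(default_factory=dict)  # input -> highest watermark seen
    last_emitted: int = -1                          # watermark of the last snapshot round

    def on_event(self, source, entity_id, ts, payload):
        # Keep the newest partial update per (entity, input).
        parts = self.state.setdefault(entity_id, {})
        if source not in parts or ts >= parts[source][0]:
            parts[source] = (ts, payload)
        # Strategy #1: max timestamp among the stored parts.
        # Strategy #2: timestamp of the event that was just processed.
        return {"max_ts": max(t for t, _ in parts.values()), "latest_ts": ts}

    def on_watermark(self, source, wm):
        # The combined watermark is the minimum over all inputs, so an input
        # that is lagging (or has not reported yet) holds everything back.
        self.watermarks[source] = max(wm, self.watermarks.get(source, wm))
        if len(self.watermarks) < len(self.inputs):
            return []
        combined = min(self.watermarks.values())
        if combined <= self.last_emitted:
            return []
        self.last_emitted = combined
        # Strategy #3: one snapshot per entity, stamped with the watermark and
        # built only from parts whose timestamp is <= the watermark.
        out = []
        for entity_id, parts in sorted(self.state.items()):
            visible = {s: p for s, (t, p) in parts.items() if t <= combined}
            if visible:
                out.append((combined, entity_id, visible))
        return out


a = Assembler(inputs=("interfaces", "metrics"))
a.on_event("interfaces", "dev-1", 100, {"eth0": "up"})
late = a.on_event("metrics", "dev-1", 40, {"cpu": 0.7})
# late == {"max_ts": 100, "latest_ts": 40}
a.on_watermark("interfaces", 120)        # [] because metrics has no watermark yet
snap = a.on_watermark("metrics", 50)
# snap == [(50, "dev-1", {"metrics": {"cpu": 0.7}})]
```

With #1 the output stays at 100 and hides that metrics is behind, with #2 the
output timestamp goes backwards from 100 to 40, and with #3 the snapshot is
stamped 50 and simply does not contain the interfaces part yet.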

Huge thanks for your answer, David. Much appreciated!

Salva

On Mon, Mar 30, 2026 at 11:22 PM David Anderson <[email protected]>
wrote:

> Interesting question. Maybe it's useful to compare this with what Flink
> SQL does.
>
> A strategy that sometimes works well is the one used by temporal joins in
> Flink SQL. In this scenario there's an event that's being enriched, e.g.,
> an Order being enriched with info about the Customer, Product, Payment,
> Shipment, etc, using the versions of those enrichment tables that were in
> effect at the time of the Order. The timestamp of the output record is the
> timestamp of the Order, and the join will wait until the watermarks of the
> other tables have advanced to that point.
>
> The semantics of this event time temporal join are clean, and the output
> stream is monotonic -- but things get messy when enrichment streams become
> idle. Unfortunately, this is rather common.
>
> Flink SQL's non-temporal joins don't set a timestamp on the output record.
> This makes it impossible to use the output of a regular join as input to a
> temporal operation without first materializing and re-ingesting the stream
> of join results. An alternative that's been discussed would be to use the
> timestamp of the latest processed record (your strategy #2), but I'm not
> aware of any concrete plans to pursue this.
>
> As for per-key watermarks, in my opinion we'll never see this in Flink.
>
> On Wed, Mar 25, 2026 at 9:58 AM Salva Alcántara <[email protected]>
> wrote:
>
>> This is a conceptual question, and the answer will probably depend on the
>> specific use case.
>>
>> But to make the context clear, I'm joining multiple input streams by a
>> common (entity) id. Those inputs represent partial/disjoint updates for the
>> same entity. E.g., consider device interfaces, device VMs, device metrics,
>> device vendor information, etc. I've personally worked on multiple such
>> assemblers and the event time strategy was never super clear. That is, how
>> to propagate the inputs' event times to the output, assembled entity.
>> Again, conceptually those services emit entity snapshots at a given time.
>>
>> The strategies that I've commonly found in practice:
>>
>> 1. Use the max timestamp seen among the inputs (that has some nice
>> properties like being monotonically increasing but it can potentially hide
>> lag/delay in one of the inputs)
>> 2. Use the timestamp of the latest processed event (this is how Flink
>> would propagate the timestamp by default, e.g., when using a
>> CoProcessFunction; this approach does not look semantically consistent plus
>> the output timestamp wouldn't be monotonically increasing)
>> 3. Use the watermark in conjunction with event-time timers to emit
>> periodic snapshots, for example. This would recover the monotonicity at the
>> expense of extra latency, exacerbated by the fact that watermarks are not
>> calculated per key, so to speak. On the flip side, it would immediately show
>> if one source is delayed, which might be desirable (or not)!
>>
>> I'd also like to take the chance to ask whether there is anything on the
>> roadmap for supporting per-key watermarks, along the lines of this:
>> - https://github.com/TawfikYasser/Keyed-Watermarks-in-Apache-Flink
>>
>> Thanks!
>>
>> Salva
>>
>>
