I agree that the temporal join's semantics are clean. I guess that in some (most) cases one can promote one of the inputs to be the "main" one and do the temporal join against the others. In other cases, though, there might be no such "main" stream, at least not in a clearly defined way. For those cases, do you think something along the lines of #3 would make sense?
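To make the "main stream plus temporal join" idea concrete, here is a minimal, framework-free sketch. It is not Flink's implementation or API: `VersionedTable`, `TemporalJoin`, and the `customer`/`product` names are hypothetical, watermarks are fed in by hand, and enrichment versions are assumed to arrive in timestamp order.

```python
import bisect


class VersionedTable:
    """History of (timestamp, value) versions for a single key (hypothetical helper)."""

    def __init__(self):
        self._times = []
        self._values = []
        self.watermark = float("-inf")

    def upsert(self, timestamp, value):
        # Assumption: versions arrive in increasing timestamp order.
        self._times.append(timestamp)
        self._values.append(value)

    def as_of(self, timestamp):
        # Latest version whose timestamp is <= the requested one, or None.
        i = bisect.bisect_right(self._times, timestamp)
        return self._values[i - 1] if i else None


class TemporalJoin:
    """Enriches 'main' events with the table versions in effect at their timestamp."""

    def __init__(self, tables):
        self.tables = tables   # name -> VersionedTable
        self.pending = []      # buffered main events: (timestamp, payload)

    def on_main(self, timestamp, payload):
        self.pending.append((timestamp, payload))
        return self._drain()

    def on_enrichment(self, name, timestamp, value):
        self.tables[name].upsert(timestamp, value)

    def on_watermark(self, name, watermark):
        table = self.tables[name]
        table.watermark = max(table.watermark, watermark)
        return self._drain()

    def _drain(self):
        # A main event is released only once every table's watermark has
        # reached its timestamp; one idle table therefore blocks all output.
        ready_until = min(t.watermark for t in self.tables.values())
        self.pending.sort(key=lambda p: p[0])
        out = []
        while self.pending and self.pending[0][0] <= ready_until:
            ts, payload = self.pending.pop(0)
            enriched = {name: t.as_of(ts) for name, t in self.tables.items()}
            out.append((ts, payload, enriched))  # output keeps the main event's timestamp
        return out


join = TemporalJoin({"customer": VersionedTable(), "product": VersionedTable()})
join.on_enrichment("customer", 10, "acme")
join.on_enrichment("product", 20, "widget-v1")
r1 = join.on_main(50, "order-1")            # buffered: no watermark has advanced yet
join.on_enrichment("customer", 60, "globex")
r2 = join.on_watermark("customer", 60)      # still blocked by the "product" table
r3 = join.on_watermark("product", 55)       # both watermarks >= 50, so order-1 is emitted
```

Note that `order-1` is joined with the `customer` version from timestamp 10, not the one from 60, and that it stays buffered for as long as either table's watermark lags behind, which is the idleness problem mentioned in the quoted reply.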
Huge thanks for your answer David. Much appreciated!

Salva

On Mon, Mar 30, 2026 at 11:22 PM David Anderson <[email protected]> wrote:

> Interesting question. Maybe it's useful to compare this with what Flink
> SQL does.
>
> A strategy that sometimes works well is the one used by temporal joins in
> Flink SQL. In this scenario there's an event that's being enriched, e.g.,
> an Order being enriched with info about the Customer, Product, Payment,
> Shipment, etc, using the versions of those enrichment tables that were in
> effect at the time of the Order. The timestamp of the output record is the
> timestamp of the Order, and the join will wait until the watermarks of the
> other tables have advanced to that point.
>
> The semantics of this event time temporal join are clean, and the output
> stream is monotonic -- but things get messy when enrichment streams become
> idle. Unfortunately, this is rather common.
>
> Flink SQL's non-temporal joins don't set a timestamp on the output record.
> This makes it impossible to use the output of a regular join as input to a
> temporal operation without first materializing and re-ingesting the stream
> of join results. An alternative that's been discussed would be to use the
> timestamp of the latest processed record (your strategy #2), but I'm not
> aware of any concrete plans to pursue this.
>
> As for per-key watermarks, in my opinion we'll never see this in Flink.
>
> On Wed, Mar 25, 2026 at 9:58 AM Salva Alcántara <[email protected]>
> wrote:
>
>> This is a conceptual question, and the answer will probably depend on the
>> specific use case.
>>
>> But to make the context clear, I'm joining multiple input streams by a
>> common (entity) id. Those inputs represent partial/disjoint updates for the
>> same entity. E.g., consider device interfaces, device vms, device metrics,
>> device vendor information, etc. I've personally worked on multiple such
>> assemblers and the event time strategy was never super clear. That is, how
>> to propagate the inputs' event times to the output, assembled entity.
>> Again, conceptually those services emit entity snapshots at a given time.
>>
>> The strategies that I've commonly found in practice:
>>
>> 1. Use the max timestamp seen among the inputs (that has some nice
>> properties like being monotonically increasing but it can potentially hide
>> lag/delay in one of the inputs)
>> 2. Use the timestamp of the latest processed event (this is how Flink
>> would propagate the timestamp by default, e.g., when using a
>> CoProcessFunction; this approach does not look semantically consistent plus
>> the output timestamp wouldn't be monotonically increasing)
>> 3. Use the watermark in conjunction with event time timers to emit
>> periodic snapshots for example. This would recover the monotonicity at the
>> expense of extra latency, exacerbated by the fact that watermarks are not
>> calculated per-key so to say. On the flip side, it would immediately show
>> if one source is delayed, which might be desirable -- or not!
>>
>> Taking the chance for asking if there is something on the roadmap for
>> supporting per-key watermarks along the lines of this:
>> - https://github.com/TawfikYasser/Keyed-Watermarks-in-Apache-Flink
>>
>> Thanks!
>>
>> Salva
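The three strategies from the original message can be compared side by side with a small simulation for a single entity. This is only a sketch under stated assumptions, not Flink code: the `Event` type, the function names, and the sample data are hypothetical, each source's watermark is taken to be its max timestamp so far (zero out-of-orderness), and strategy 3 emits whenever the combined watermark advances instead of registering real event-time timers.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    source: str      # hypothetical input name, e.g. "interfaces"
    timestamp: int   # event time in milliseconds


def max_timestamp(events):
    """Strategy 1: stamp each output with the max timestamp seen so far."""
    out, seen = [], None
    for e in events:
        seen = e.timestamp if seen is None else max(seen, e.timestamp)
        out.append(seen)
    return out


def latest_processed(events):
    """Strategy 2: stamp each output with the triggering event's timestamp."""
    return [e.timestamp for e in events]


def watermark_snapshots(events, sources):
    """Strategy 3 (simplified): emit a snapshot when the combined watermark advances.

    The combined watermark is the minimum of the per-source watermarks and
    is undefined until every source has produced at least one event.
    """
    per_source = {}
    out, current = [], None
    for e in events:
        per_source[e.source] = max(per_source.get(e.source, e.timestamp), e.timestamp)
        if len(per_source) < len(sources):
            continue  # some source has not reported yet: no watermark
        wm = min(per_source.values())
        if current is None or wm > current:
            current = wm
            out.append(wm)
    return out


events = [
    Event("interfaces", 100),
    Event("metrics", 300),
    Event("interfaces", 200),
    Event("metrics", 400),
    Event("vendor", 150),
    Event("vendor", 500),
]
sources = {"interfaces", "metrics", "vendor"}

s1 = max_timestamp(events)                  # [100, 300, 300, 400, 400, 500]
s2 = latest_processed(events)               # [100, 300, 200, 400, 150, 500]
s3 = watermark_snapshots(events, sources)   # [150, 200]
```

Strategy 1 ends at 500 even though "interfaces" is stuck at 200, strategy 2 jumps backwards (300 to 200, 400 to 150), and strategy 3 stays monotonic but emits far fewer, later snapshots that are held back by the slowest source.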
