Hi Flink Devs,

Use cases that I see quite frequently in the real world would benefit from a different watermarking / event time model than the one currently implemented in Flink.
I would call Flink's current approach partition-based or maybe subtask-based watermarking. In this model the current "event time" is a property local to each subtask instance in a dataflow graph: the event time at any subtask is the minimum of the watermarks it has received on each of its input streams. This model is not optimal for some (maybe many) use cases:

1) A single slow subtask (or say source partition) anywhere in the dataflow can mean no progress can be made on the computation at all.

2) In many real-world scenarios the time skew across keys can be *many* times greater than the time skew within the data for a single key.

In this discussion I'll use "time skew" to refer to the out-of-orderness of the data with respect to timestamp. Out-of-orderness is a mouthful ;)

Anyway, let me provide an example or two. In IoT applications the source of events is a particular device out in the world, let's say a device in a connected-car application. The data for some particular device may be very bursty, and we will certainly get events from these devices in Flink out of order, just because of things like partitions in Kafka, shuffles in Flink, etc. However, the time skew in the data for a single device should likely be very small (milliseconds or maybe seconds), while in the same application the time skew across different devices can be huge (hours or even days).

An obvious example of this, again using connected cars as a representative case: Car A is recording data locally at 12:00 pm on Saturday but doesn't currently have a network connection. Car B is doing the same thing but does have a connection. Car B transmits its data immediately; Car A transmits its data only when the network comes back online, let's say at 4:00 pm. This creates a huge time skew (4 hours) in the observed data stream when looked at as a whole. However, the time skew in the data for Car A or Car B alone could be tiny -- out of order, of course, but maybe only by milliseconds or seconds.

What the above means in the end for Flink is that the watermarks must be delayed by up to 4 hours or more, because we're looking at the data stream as a whole -- otherwise the data for Car A will be considered late. The time skew in the stream as a whole is large even though the time skew for any single key may be tiny. This is the problem I would like to see a solution for.

The basic idea of keeping track of watermarks and event time per key, rather than per partition or subtask, would, I think, solve both of the problems stated above -- and both are real issues for production applications. The obvious downside of trying to do this per key is that the amount of state you have to track is much larger and potentially unbounded. However, I could see this approach working if the keyspace isn't growing rapidly but is stable or grows slowly. The saving grace here is that this may actually be true of exactly the types of applications where this would be especially useful -- think IoT use cases. Another approach to keeping the state size in check would be a configurable TTL per key.

Anyway, I'm throwing this out here on the mailing list in case anyone is interested in this discussion, has thought about the problem deeply already, has use cases of their own they've run into, or has ideas for a solution to this problem. I've appended a couple of rough sketches below to make the ideas concrete. Thanks for reading.
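First, to make the connected-car example concrete, here's a minimal sketch of what the existing per-subtask model forces us to do today, using Flink's BoundedOutOfOrdernessTimestampExtractor. The CarEvent type and its fields are just placeholders I made up for this discussion:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class GlobalSkewExample {

    // Hypothetical event type, just for these sketches.
    public static class CarEvent {
        public String deviceId;
        public long timestampMillis;
    }

    // Because event time is tracked per subtask, tolerating Car A's 4-hour
    // outage means holding watermarks back by 4 hours for the WHOLE stream,
    // even though the skew within any single device's data is tiny.
    public static DataStream<CarEvent> assignWatermarks(DataStream<CarEvent> events) {
        return events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<CarEvent>(Time.hours(4)) {
                    @Override
                    public long extractTimestamp(CarEvent event) {
                        return event.timestampMillis;
                    }
                });
    }
}

Every event-time window downstream of this now waits an extra 4 hours before firing, just to accommodate the one disconnected car.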
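Second, here's a very rough sketch of the per-key idea with a TTL, written as if it were a user function on keyed state and timers, reusing the hypothetical CarEvent type from above. To be clear: PerKeyWatermarkFunction is not an existing API, and a real solution would live in the runtime rather than in user code. This is only meant to show the shape of the per-key state involved:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Rough sketch only: tracks a per-key watermark in keyed state and clears
// it after a configurable idle TTL so state for dead keys doesn't grow
// without bound. All names here are hypothetical.
public class PerKeyWatermarkFunction
        extends KeyedProcessFunction<String, GlobalSkewExample.CarEvent, GlobalSkewExample.CarEvent> {

    private final long maxOutOfOrdernessMillis; // per-key skew bound (small!)
    private final long keyTtlMillis;            // clear state after this idle gap

    private transient ValueState<Long> perKeyWatermark; // per-key event time clock
    private transient ValueState<Long> lastSeen;        // for the idle TTL

    public PerKeyWatermarkFunction(long maxOutOfOrdernessMillis, long keyTtlMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.keyTtlMillis = keyTtlMillis;
    }

    @Override
    public void open(Configuration parameters) {
        perKeyWatermark = getRuntimeContext().getState(
                new ValueStateDescriptor<>("perKeyWatermark", Long.class));
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(GlobalSkewExample.CarEvent event, Context ctx,
                               Collector<GlobalSkewExample.CarEvent> out) throws Exception {
        // Advance this key's watermark using only this key's (small) skew bound.
        Long wm = perKeyWatermark.value();
        long candidate = event.timestampMillis - maxOutOfOrdernessMillis;
        if (wm == null || candidate > wm) {
            perKeyWatermark.update(candidate);
        }

        // Refresh the idle TTL for this key.
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        ctx.timerService().registerProcessingTimeTimer(now + keyTtlMillis);

        out.collect(event);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<GlobalSkewExample.CarEvent> out) throws Exception {
        // Only clear state if the key has really been idle for the full TTL.
        Long seen = lastSeen.value();
        if (seen != null && timestamp >= seen + keyTtlMillis) {
            perKeyWatermark.clear();
            lastSeen.clear();
        }
    }
}

The point of the sketch is that Car A's 4-hour outage only delays progress for Car A's key; every other key keeps a watermark bound of milliseconds or seconds, and the TTL keeps the state from growing without bound as keys come and go.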
-Jamie

--
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com