Hi Flink Devs,

Use cases that I see quite frequently in the real world would benefit from
a different watermarking / event time model than the one currently
implemented in Flink.

I would call Flink's current approach partition-based watermarking or maybe
subtask-based watermarking.  In this model the current "event time" is a
property local to each subtask instance in a dataflow graph.  The event
time at any subtask is the minimum of the watermarks it has received on
each of its input streams.
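
To make that concrete, here's a rough sketch of the bookkeeping involved.
This is just an illustration of the idea, not Flink's actual implementation:

import java.util.Arrays;

/**
 * Illustration only: a subtask's event time is the minimum of the latest
 * watermark it has seen on each of its input channels.
 */
public class SubtaskWatermarkTracker {

    private final long[] channelWatermarks;
    private long currentEventTime = Long.MIN_VALUE;

    public SubtaskWatermarkTracker(int numInputChannels) {
        channelWatermarks = new long[numInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Called whenever a watermark arrives on one of the input channels. */
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        // Event time only advances to the minimum across all channels, so a
        // single slow channel holds back the entire subtask.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        currentEventTime = Math.max(currentEventTime, min);
    }

    public long getCurrentEventTime() {
        return currentEventTime;
    }
}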

There are a couple of issues with this model that make it suboptimal for
some (maybe many) use cases.

1) A single slow subtask (or, say, a slow source partition) anywhere in the
dataflow can mean that no progress can be made on the computation at all.

2) In many real-world scenarios the time skew across keys can be *many*
times greater than the time skew within the data for any single key.

In this discussion I'll use "time skew" to refer to the out-of-orderness of
the data with respect to its timestamps.  Out-of-orderness is a mouthful ;)

Anyway, let me provide an example or two.

In IoT applications the source of events is a particular device out in the
world, let's say a device in a connected car application.  The data for any
particular device may be very bursty, and we will certainly receive events
from these devices in Flink out of order just because of things like
partitions in Kafka, shuffles in Flink, etc.  However, the time skew in the
data for a single device should be very small (milliseconds or maybe
seconds).

In the same application, though, the time skew across different devices can
be huge (hours or even days).  An obvious illustration, again using
connected cars as a representative example, is the following:  Car A is
recording data locally at 12:00 pm on Saturday but doesn't currently have a
network connection.  Car B is doing the same thing but does have a network
connection.  Car A will transmit its data when the network comes back
online.  Let's say this is at 4 pm.  Car B was transmitting its data
immediately.  This creates a huge time skew (4 hours) in the observed
data stream when looked at as a whole.  However, the time skew in the data
for Car A or Car B alone could be tiny.  It will be out of order, of course,
but maybe only by milliseconds or seconds.

What the above means for Flink is that the watermarks must be delayed by up
to 4 hours or more because we're looking at the data stream as a whole --
otherwise the data for Car A will be considered late.  The time skew in the
data stream when looked at as a whole is large even though the time skew
for any single key may be tiny.
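
For reference, here is roughly what you end up writing today with the
existing API: bounding the out-of-orderness of the whole stream by the
worst-case skew.  The CarEvent type and its fields are made up for this
example:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WholeStreamSkewExample {

    // Hypothetical event type, just for illustration.
    public static class CarEvent {
        public String carId;
        public long eventTimestamp;

        public CarEvent() {}

        public CarEvent(String carId, long eventTimestamp) {
            this.carId = carId;
            this.eventTimestamp = eventTimestamp;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Stand-in for a real source of car events.
        DataStream<CarEvent> carEvents =
            env.fromElements(new CarEvent("car-a", 0L), new CarEvent("car-b", 1L));

        // The whole stream has to tolerate the worst-case skew (Car A being
        // offline for 4 hours), even though the skew per car is tiny.
        DataStream<CarEvent> withTimestamps = carEvents.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<CarEvent>(Time.hours(4)) {
                @Override
                public long extractTimestamp(CarEvent event) {
                    return event.eventTimestamp;
                }
            });

        withTimestamps.print();
        env.execute("whole-stream skew example");
    }
}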

This is the problem I would like to see a solution for.  The basic idea of
keeping track of watermarks and event time per key, rather than per
partition or subtask, would I think solve both of the problems stated
above, and both of these are real issues for production applications.
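
To make the idea a little more concrete, here is a very rough sketch of
per-key event-time tracking built on keyed state, reusing the hypothetical
CarEvent type from the sketch above.  This is not a proposed API, just an
illustration of the per-key bookkeeping, and the 5-second per-key skew
bound is an arbitrary example:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Illustration only: each key advances its own "watermark" from its own data. */
public class PerKeyEventTimeSketch extends RichFlatMapFunction<CarEvent, CarEvent> {

    // Arbitrary example value: the skew we tolerate within a single key.
    private static final long PER_KEY_MAX_SKEW_MS = 5_000;

    private transient ValueState<Long> perKeyWatermark;

    @Override
    public void open(Configuration parameters) {
        perKeyWatermark = getRuntimeContext().getState(
            new ValueStateDescriptor<>("per-key-watermark", Long.class));
    }

    @Override
    public void flatMap(CarEvent event, Collector<CarEvent> out) throws Exception {
        Long watermark = perKeyWatermark.value();
        if (watermark == null) {
            watermark = Long.MIN_VALUE;
        }

        if (event.eventTimestamp < watermark) {
            // Late only relative to *this key's* event time; other keys are
            // completely unaffected by how far behind this key is.
            return;
        }

        // Advance this key's watermark based on its own data only.
        perKeyWatermark.update(Math.max(watermark, event.eventTimestamp - PER_KEY_MAX_SKEW_MS));
        out.collect(event);
    }
}

Used as carEvents.keyBy("carId").flatMap(new PerKeyEventTimeSketch()),
something like this tracks event time per key, so no key's progress depends
on any other key's skew.  Windowing and timers would of course also need to
understand per-key time, which is the harder part.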

The obvious downside of trying to do this per key is that the amount of
state you have to track is much larger and potentially unbounded.  However,
I could see this approach working if the keyspace isn't growing rapidly but
is stable or grows slowly.  The saving grace here is that this may actually
be true of the types of applications where this would be especially
useful -- think IoT use cases.  Another approach to keeping state size in
check would be a configurable TTL per key.
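
As a sketch of what a per-key TTL could look like with today's primitives,
here's the same kind of keyed state cleaned up by a processing-time timer.
Again this is only an illustration (a real implementation would track the
key's last activity and avoid registering one timer per element), and the
one-day TTL is an arbitrary number:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Illustration only: clear a key's watermark state after it has been idle. */
public class PerKeyTtlSketch extends ProcessFunction<CarEvent, CarEvent> {

    // Arbitrary example value: drop a key's state after a day of inactivity.
    private static final long KEY_TTL_MS = 24 * 60 * 60 * 1000L;

    private transient ValueState<Long> perKeyWatermark;

    @Override
    public void open(Configuration parameters) {
        perKeyWatermark = getRuntimeContext().getState(
            new ValueStateDescriptor<>("per-key-watermark", Long.class));
    }

    @Override
    public void processElement(CarEvent event, Context ctx, Collector<CarEvent> out) throws Exception {
        // ... per-key watermark bookkeeping as in the earlier sketch ...
        out.collect(event);

        // Schedule cleanup for when this key has been quiet for KEY_TTL_MS.
        // (Timers require the stream to be keyed, e.g. keyBy("carId").)
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + KEY_TTL_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<CarEvent> out) throws Exception {
        // In a real implementation we'd first check that the key has actually
        // been idle for KEY_TTL_MS; here we simply drop its state.  If data
        // for the key shows up again later it starts over with a fresh
        // per-key watermark.
        perKeyWatermark.clear();
    }
}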

Anyway, I'm throwing this out here on the mailing list in case anyone is
interested in this discussion, has thought about the problem deeply
already, has run into use cases of their own, or has ideas for a solution
to this problem.

Thanks for reading.

-Jamie


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com
