This is indeed an interesting topic, thanks for starting the discussion,
Jamie!

I've now thought about this for a while, since more and more people seem
to be asking about it lately. At first I thought that per-key watermark
handling would not be necessary because it can be done locally (as Paris
suggested); then I realised that this is not actually the case and
concluded that it wouldn't be possible at all. In the end, I came to see
that it is indeed possible (with some caveats), although with a large
overhead in the amount of state that we have to keep and with changes to
our API. I'll try and walk you through my thought process.

Let's first look at local watermark tracking, that is, tracking the
watermark locally at the operator that needs it, for example a
WindowOperator. I initially thought that this would be sufficient. Assume
we have a pipeline like this:

Source -> KeyBy -> WindowOperator -> ...

If we have parallelism=1, then all elements for a given key k will be read
by the same source operator instance and they will arrive (in-order) at the
WindowOperator. It doesn't matter whether we track the per-key watermarks
at the Source or at the WindowOperator because we see the same elements in
the same order at each operator, per key.
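
To make that concrete, here is a minimal sketch of what I mean by local
per-key tracking (plain Java, not actual Flink code, and all names are made
up): a single operator instance keeps the highest timestamp it has seen per
key and derives a per-key watermark from it with a bounded lateness,
similar to what the timestamp extractors do today.

import java.util.HashMap;
import java.util.Map;

// Hypothetical local per-key watermark tracker for one operator instance.
public class LocalPerKeyWatermarks {

  private final long maxLateness;
  // highest element timestamp seen so far, per key
  private final Map<String, Long> maxTimestampPerKey = new HashMap<>();

  public LocalPerKeyWatermarks(long maxLateness) {
    this.maxLateness = maxLateness;
  }

  // Update with an element and return the current watermark for its key.
  public long onElement(String key, long timestamp) {
    long max = Math.max(
        timestamp, maxTimestampPerKey.getOrDefault(key, Long.MIN_VALUE));
    maxTimestampPerKey.put(key, max);
    // everything with a smaller timestamp is considered late for this key
    return max - maxLateness;
  }
}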

Now, think about this pipeline:

Source1 --+
          |-> Union -> KeyBy -> WindowOperator -> ...
Source2 --+

(you can either think of two sources or of one source that has several
parallel instances, i.e. parallelism > 1)

Here, both Source1 and Source2 can emit elements with our key k. If Source1
is faster than Source2 and the watermarking logic at the WindowOperator
determines the watermark based on the incoming element timestamps (for
example, using the BoundedLatenessTimestampExtractor) then the elements
coming from Source2 will be considered late at the WindowOperator.
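
To put some (invented) numbers on this, assuming the watermark is derived
from the highest timestamp seen so far minus an allowed lateness:

// Invented numbers, just to illustrate the problem described above.
public class LatenessExample {
  public static void main(String[] args) {
    long maxLateness = 5;
    long maxSeenTimestamp = 100;      // Source1 is already at timestamp 100
    long watermark = maxSeenTimestamp - maxLateness; // 95

    long timestampFromSource2 = 60;   // element with key k from the slow Source2
    boolean late = timestampFromSource2 <= watermark;
    System.out.println("late = " + late); // true, even though Source2 is in order
  }
}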

From this we know that our WindowOperator needs to calculate the watermark
similarly to how watermark calculation currently happens in Flink: the
watermark is the minimum of the watermarks of all upstream operations. In
this case it would be the minimum of the upstream watermarks of the
operations that emit elements with key k. For per-partition watermarks this
works because the number of upstream operations is known and we simply keep
an array that holds the current upstream watermark for each input
operation. For per-key watermarks this would mean that we have to keep k*u
upstream watermarks, where u is the number of upstream operations. This can
be quite large.
Another problem is that the observed keys change, i.e. the key space is
evolving and we need to retire keys from our calculations lest we run out
of space.
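
As a rough sketch of the bookkeeping this implies (again not Flink code,
purely illustrative names): every key carries one watermark slot per
upstream channel and the key's watermark is the minimum over those slots.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key, per-upstream-channel watermark bookkeeping (the k*u state).
public class PerKeyUpstreamWatermarks {

  private final int numUpstreamChannels; // u
  // key -> latest watermark received from each upstream channel
  private final Map<String, long[]> upstreamWatermarks = new HashMap<>();

  public PerKeyUpstreamWatermarks(int numUpstreamChannels) {
    this.numUpstreamChannels = numUpstreamChannels;
  }

  // Record a watermark for (key, channel) and return the new per-key watermark.
  public long onWatermark(String key, int channel, long watermark) {
    long[] perChannel = upstreamWatermarks.computeIfAbsent(key, k -> {
      long[] init = new long[numUpstreamChannels];
      Arrays.fill(init, Long.MIN_VALUE);
      return init;
    });
    perChannel[channel] = Math.max(perChannel[channel], watermark);
    long min = Long.MAX_VALUE;
    for (long w : perChannel) {
      min = Math.min(min, w);
    }
    return min; // the key's watermark is the minimum across all upstream channels
  }
}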

We could find a solution based on a feature we recently introduced in
Flink: https://github.com/apache/flink/pull/2801. The sources keep track of
whether they have input and signal to downstream operations whether they
should be included in the watermark calculation logic. A similar thing
could be done per key: each source signals to downstream operations that
there is a new key and that they should start calculating watermarks for
it. When a source determines that no more data will come for a key (which
in itself is a bit of a tricky problem), it should signal to downstream
operations to take the key out of the watermark calculation, that is, that
we can release some space.
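
Purely as an illustration of what such a signal could look like (this type
does not exist anywhere, it's a made-up sketch in the spirit of the idle
source mechanism from the PR above):

// Hypothetical per-key status event that a source could emit. Downstream
// operators would add the key to the watermark calculation on KEY_STARTED
// and drop its state again on KEY_FINISHED.
public final class KeyStatusEvent {

  public enum Status { KEY_STARTED, KEY_FINISHED }

  public final String key;
  public final Status status;
  public final int sourceChannel; // which source instance the signal refers to

  public KeyStatusEvent(String key, Status status, int sourceChannel) {
    this.key = key;
    this.status = status;
    this.sourceChannel = sourceChannel;
  }
}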

The above analyses, on a purely technical level, the feasibility of such a
feature. I think it is feasible but can be very expensive in terms of state
size requirements. Gabor also pointed this out and gave a few suggestions
for reducing that size.

We would also need to change our API to allow tracking the lineage of keys
or to enforce that a key stays the same throughout a pipeline. Consider
this pipeline:

Source -> KeyBy1 -> WindowOperator -> KeyBy2 -> WindowOperator

where KeyBy1 and KeyBy2 each extract a different key. How would watermarks
be tracked across this change of keys? Would we know which of the prior
keys end up as which keys according to KeyBy2, i.e. do we have some kind of
key lineage information?

One approach for solving this would be to introduce a new API that allows
extracting a key at the source and will keep this key on the elements until
the sink. For example:

DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
input
  .map(...)
  .window(...) // notice that we don't need keyBy because it is implicit
  .reduce(...)
  .map(...)
  .window(...)
  ...

The DeluxeKeyedStream (name preliminary ;-) would allow the operations that
we have today on KeyedStream and on DataStream, and it would always
maintain the key that was assigned at the sources. The result of each
operation would again be a DeluxeKeyedStream. This way, we could track
watermarks per key.

I know it's a bit of a (very) lengthy mail, but what do you think?


On Thu, 23 Feb 2017 at 11:14 Gábor Hermann <m...@gaborhermann.com> wrote:

> Hey all,
>
> Let me share some ideas about this.
>
> @Paris: The local-only progress tracking indeed seems easier, we do not
> need to broadcast anything. Implementation-wise it is easier, but
> performance-wise probably not. If one key can come from multiple
> sources, there could be a lot more network overhead with per-key
> tracking than broadcasting, somewhat paradoxically. Say source instance
> S1 sends messages and watermarks to operator instances O1, O2. In the
> broadcasting case, S1 would send one message to O1 and one to O2 per
> watermark (of course it depends on how fast the watermarks arrive),
> total of 2. Although, if we keep track of per-key watermarks, S1 would
> need to send watermarks for every key directed to O1, also for O2. So if
> 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if watermarks
> arrive at the same rate per-key as per-source in the previous case) S1
> would send a total of 20 watermarks.
>
> Another question is how large the state per key is. If it's
> really small (an integer maybe, or state of a small state machine), then
> the overhead of keeping track of a (Long) watermark is large
> memory-wise. E.g. Int state vs. Long watermark results in 3x as large
> state. Also, the checkpointing would be ~3x as slow. Of course, for
> large states a Long watermark would not mean much overhead.
>
> We could resolve the memory issue by using some kind of sketch data
> structure. Right now the granularity of watermark handling is
> per-operator-instance. On the other hand, per-key granularity might be
> costly. What if we increased the granularity of watermarks inside an
> operator by keeping more than one watermark tracker in one operator?
> This could be quite simply done with a hash table. With a hash table of
> size 1, we would yield the current semantics (per-operator-instance
> granularity). With a hash table large enough to have at most one key per
> bucket, we would yield per-key watermark tracking. In between lies the
> trade-off between handling time-skew and a lot of memory overhead. This
> does not seem hard to implement.
>
> Of course, at some point we would still need to take care of watermarks
> per-key. Imagine that keys A and B would go to the same bucket of the
> hash table, and watermarks are coming in like this: (B,20), (A,10),
> (A,15), (A,40). Then the watermark of the bucket should be the minimum
> as time passes, i.e. 0, 10, 15, 20. For this we need to keep track of
> the watermarks of A and B separately. But after we have a correct
> watermark for the bucket, all we need to care about is the bucket
> watermarks. So somewhere (most probably at the source) we would have to
> pay memory overhead of tracking every key, but nowhere else in the
> topology.
>
> Regarding the potentially large network overhead, the same compression
> could be useful. I.e. we would not send watermarks from one operator
> per-key, but rather per-hash. Again, the trade-off between time skew and
> memory consumption is configurable by the size of the hash table used.
>
> Cheers,
> Gabor
>
> On 2017-02-23 08:57, Paris Carbone wrote:
>
> > Hey Jamie!
> >
> > Key-based progress tracking sounds like local-only progress tracking to
> me, there is no need to use a low watermarking mechanism at all since all
> streams of a key are handled by a single partition at a time (per operator).
> > Thus, this could be much easier to implement and support (i.e., no need
> to broadcast the progress state of each partition all the time).
> > State-wise it should be fine too if it is backed by rocksdb, especially
> if we have MapState in the future.
> >
> > Just my quick thoughts on this, to get the discussion going :)
> >
> > cheers
> > Paris
> >
> >> On 23 Feb 2017, at 01:01, Jamie Grier <ja...@data-artisans.com> wrote:
> >>
> >> Hi Flink Devs,
> >>
> >> Use cases that I see quite frequently in the real world would benefit
> >> from a different watermarking / event time model than the one
> >> currently implemented in Flink.
> >>
> >> I would call Flink's current approach partition-based watermarking or
> >> maybe subtask-based watermarking.  In this model the current "event
> >> time" is a property local to each subtask instance in a dataflow
> >> graph.  The event time at any subtask is the minimum of the watermarks
> >> it has received on each of its input streams.
> >>
> >> There are a couple of issues with this model that are not optimal for
> >> some (maybe many) use cases.
> >>
> >> 1) A single slow subtask (or say source partition) anywhere in the
> >> dataflow can mean no progress can be made on the computation at all.
> >>
> >> 2) In many real world scenarios the time skew across keys can be *many*
> >> times greater than the time skew within the data with the same key.
> >>
> >> In this discussion I'll use "time skew" to refer to the
> >> out-of-orderness with respect to the timestamps of the data.
> >> Out-of-orderness is a mouthful ;)
> >>
> >> Anyway, let me provide an example or two.
> >>
> >> In IoT applications the source of events is a particular device out in
> >> the world, let's say a device in a connected car application.  The
> >> data for some particular device may be very bursty and we will
> >> certainly get events from these devices in Flink out-of-order just
> >> because of things like partitions in Kafka, shuffles in Flink, etc.
> >> However, the time skew in the data for a single device should likely
> >> be very small (milliseconds or maybe seconds).
> >>
> >> However, in the same application the time skew across different
> >> devices can be huge (hours or even days).  An obvious example of this,
> >> again using connected cars as a representative example, is the
> >> following:  Car A is recording data locally at 12:00 pm on Saturday
> >> but doesn't currently have a network connection.  Car B is doing the
> >> same thing but does have a network connection.  Car A will transmit
> >> its data when the network comes back on line.  Let's say this is at
> >> 4pm.  Car B was transmitting its data immediately.  This creates a
> >> huge time skew (4 hours) in the observed datastream when looked at as
> >> a whole.  However, the time skew in that data for Car A or Car B alone
> >> could be tiny.  It will be out of order of course but maybe by only
> >> milliseconds or seconds.
> >>
> >> What the above means in the end for Flink is that the watermarks must
> >> be delayed by up to 4 hours or more because we're looking at the data
> >> stream as a whole -- otherwise the data for Car A will be considered
> >> late.  The time skew in the data stream when looked at as a whole is
> >> large even though the time skew for any key may be tiny.
> >>
> >> This is the problem I would like to see a solution for.  The basic
> >> idea of keeping track of watermarks and event time "per-key" rather
> >> than per partition or subtask would solve, I think, both of these
> >> problems stated above, and both of these are real issues for
> >> production applications.
> >>
> >> The obvious downside of trying to do this per-key is that the amount
> >> of state you have to track is much larger and potentially unbounded.
> >> However, I could see this approach working if the keyspace isn't
> >> growing rapidly but is stable or grows slowly.  The saving grace here
> >> is that this may actually be true of the types of applications where
> >> this would be especially useful.  Think IoT use cases.  Another
> >> approach to keeping state size in check would be a configurable TTL
> >> for a key.
> >>
> >> Anyway, I'm throwing this out here on the mailing list in case anyone is
> >> interested in this discussion, has thought about the problem deeply
> >> already, has use cases of their own they've run into or has ideas for a
> >> solution to this problem.
> >>
> >> Thanks for reading..
> >>
> >> -Jamie
> >>
> >>
> >> --
> >>
> >> Jamie Grier
> >> data Artisans, Director of Applications Engineering
> >> @jamiegrier <https://twitter.com/jamiegrier>
> >> ja...@data-artisans.com
> >
>
>
