@Tzu-Li Yes, the deluxe stream would not allow another keyBy(). Or we could
allow it but then we would exit the world of the deluxe stream and per-key
watermarks and go back to the realm of normal streams and keyed streams.

On Tue, 28 Feb 2017 at 10:08 Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Throwing in some thoughts:
>
> When a source determines that no more data will come for a key (which
> in itself is a bit of a tricky problem) then it should signal to
> downstream operations to take the key out of watermark calculations, that
> is that we can release some space.
> I don’t think this is possible without exposing an API for the UDF to signal
> there will be no more data for a specific key. We could detect idleness of
> a key at the source operator, but without any help from user logic,
> essentially it can only be seen as "temporarily idle", which is not helpful
> in reducing the state as the watermark state for that key still needs to be
> kept downstream.
>
> So to achieve this, I think the only option would be to expose new APIs
> here too.
>
> It’s like how we recently exposed a new `markAsTemporarilyIdle` method in
> the SourceFunction.SourceContext interface, but here it would be a
> `markKeyTerminated` method that the source UDF must call for us to be able
> to save state space, since there is no feasible fallback detection strategy.
>
> DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> input
> .map()
> .window(...) // notice that we don't need keyBy because it is implicit
> .reduce(...)
> .map(...)
> .window(...)
> ...
>
> Would this mean that another `keyBy` isn’t allowed downstream? Or still
> allowed, but we’re using the keys in `DeluxeKeyedStream` as the “meta key”
> to track key lineage?
>
> On February 27, 2017 at 9:37:27 PM, Aljoscha Krettek (aljos...@apache.org)
> wrote:
>
> This is indeed an interesting topic, thanks for starting the discussion,
> Jamie!
>
> I now thought about this for a while, since more and more people seem to be
> asking about it lately. First, I thought that per-key watermark handling
> would not be necessary because it can be done locally (as Paris suggested),
> then I realised that that's not actually the case and thought that this
> wouldn't be possible. In the end, I came to realise that it is indeed
> possible (with some caveats), although with a huge overhead in the amount
> of state that we have to keep and with changes to our API. I'll try and
> walk you through my thought process.
>
> Let's first look at local watermark tracking, that is, tracking the
> watermark locally at the operator that needs it, for example a
> WindowOperator. I initially thought that this would be sufficient. Assume
> we have a pipeline like this:
>
> Source -> KeyBy -> WindowOperator -> ...
>
> If we have parallelism=1, then all elements for a given key k will be read
> by the same source operator instance and they will arrive (in-order) at the
> WindowOperator. It doesn't matter whether we track the per-key watermarks
> at the Source or at the WindowOperator because we see the same elements in
> the same order at each operator, per key.
>
> Now, think about this pipeline:
>
> Source1 --+
> |-> Union -> KeyBy -> WindowOperator -> ...
> Source2 --+
>
> (you can either think about two sources or one source that has several
> parallel instances, i.e. parallelism > 1)
>
> Here, both Source1 and Source2 can emit elements with our key k. If Source1
> is faster than Source2 and the watermarking logic at the WindowOperator
> determines the watermark based on the incoming element timestamps (for
> example, using the BoundedOutOfOrdernessTimestampExtractor) then the elements
> coming from Source2 will be considered late at the WindowOperator.
>
> From this we know that our WindowOperator needs to calculate the watermark
> similarly to how watermark calculation currently happens in Flink: the
> watermark is the minimum of the watermarks of all upstream operations. In
> this case it would be: the minimum of the upstream watermarks of operations
> that emit elements with key k. For per-partition watermarks this works
> because the number of upstream operations is known and we simply keep an
> array that has the current upstream watermark for each input operation. For
> per-key watermarks this would mean that we have to keep k*u upstream
> watermarks, where k is the number of keys and u is the number of upstream
> operations. This can be quite large.
> Another problem is that the observed keys change, i.e. the key space is
> evolving and we need to retire keys from our calculations lest we run out
> of space.
>
> We could find a solution based on a feature we recently introduced in
> Flink: https://github.com/apache/flink/pull/2801. The sources keep track of
> whether they have input and signal to downstream operations whether they
> should be included in the watermark calculation logic. A similar thing
> could be done per-key, where each source signals to downstream operations
> that there is a new key and that we should start calculating watermarks for
> this. When a source determines that no more data will come for a key (which
> in itself is a bit of a tricky problem) then it should signal to downstream
> operations to take the key out of watermark calculations, that is that we
> can release some space.
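
To make the bookkeeping described above concrete, here is a minimal sketch in plain Java with no Flink dependencies. The class name `PerKeyWatermarkTracker` and the methods `keyStarted`, `keyTerminated`, `onWatermark` and `currentWatermark` are hypothetical names invented for illustration, not existing Flink API. It assumes each upstream operation is identified by an integer index and announces when it starts and stops emitting a key.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: per-key watermark = minimum over the upstreams active for that key. */
final class PerKeyWatermarkTracker<K> {
    // key -> (upstream index -> last watermark that upstream reported for the key)
    private final Map<K, Map<Integer, Long>> active = new HashMap<>();

    /** An upstream signals that it will emit elements for this key. */
    void keyStarted(K key, int upstream) {
        active.computeIfAbsent(key, k -> new HashMap<>())
              .putIfAbsent(upstream, Long.MIN_VALUE);
    }

    /** An upstream signals that no more data will come for this key; its slot is freed. */
    void keyTerminated(K key, int upstream) {
        Map<Integer, Long> perUpstream = active.get(key);
        if (perUpstream != null) {
            perUpstream.remove(upstream);
            if (perUpstream.isEmpty()) {
                active.remove(key);
            }
        }
    }

    /** Records a watermark for a key from one upstream; watermarks never move backwards. */
    void onWatermark(K key, int upstream, long watermark) {
        keyStarted(key, upstream);
        active.get(key).merge(upstream, watermark, Long::max);
    }

    /** Minimum over active upstreams; Long.MAX_VALUE (an assumption) if none is active. */
    long currentWatermark(K key) {
        Map<Integer, Long> perUpstream = active.get(key);
        if (perUpstream == null) {
            return Long.MAX_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long wm : perUpstream.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /** Number of (key, upstream) watermark slots currently held. */
    int trackedEntries() {
        int n = 0;
        for (Map<Integer, Long> perUpstream : active.values()) {
            n += perUpstream.size();
        }
        return n;
    }
}
```

In this sketch the state grows with the number of active (key, upstream) pairs, and `keyTerminated` is the only call that shrinks it, which is why the termination signal matters so much for state size.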
>
> The above is analysing, on a purely technical level, the feasibility of
> such a feature. I think it is feasible but can be very expensive in terms
> of state size requirements. Gabor also pointed this out above and gave a
> few suggestions on reducing that size.
>
> We would also need to change our API to allow tracking the lineage of keys
> or to enforce that a key stays the same throughout a pipeline. Consider
> this pipeline:
>
> Source -> KeyBy1 -> WindowOperator -> KeyBy2 -> WindowOperator
>
> where KeyBy1 and KeyBy2 extract a different key, respectively. How would
> watermarks be tracked across this change of keys? Would we know which of
> the prior keys end up being keys according to KeyBy2, i.e. do we have some
> kind of key lineage information?
>
> One approach for solving this would be to introduce a new API that allows
> extracting a key at the source and will keep this key on the elements until
> the sink. For example:
>
> DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> input
> .map()
> .window(...) // notice that we don't need keyBy because it is implicit
> .reduce(...)
> .map(...)
> .window(...)
> ...
>
> The DeluxeKeyedStream (name preliminary ;-) would allow the operations that
> we today have on KeyedStream and on DataStream and it would always maintain
> the key that was assigned at the sources. The result of each operation
> would again be a DeluxeKeyedStream. This way, we could track watermarks per
> key.
>
> I know it's a bit of a (very) lengthy mail, but what do you think?
>
>
> On Thu, 23 Feb 2017 at 11:14 Gábor Hermann <m...@gaborhermann.com> wrote:
>
> > Hey all,
> >
> > Let me share some ideas about this.
> >
> > @Paris: The local-only progress tracking indeed seems easier, we do not
> > need to broadcast anything. Implementation-wise it is easier, but
> > performance-wise probably not. If one key can come from multiple
> > sources, there could be a lot more network overhead with per-key
> > tracking than broadcasting, somewhat paradoxically. Say source instance
> > S1 sends messages and watermarks to operator instances O1, O2. In the
> > broadcasting case, S1 would send one message to O1 and one to O2 per
> > watermark (of course it depends on how fast the watermarks arrive),
> > total of 2. Although, if we keep track of per-key watermarks, S1 would
> > need to send watermarks for every key directed to O1, also for O2. So if
> > 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if watermarks
> > arrive at the same rate per-key as per-source in the previous case)
> > S1 would send a total of 20 watermarks.
> >
> > Another question is how large the state-per-key is. If it's
> > really small (an integer maybe, or state of a small state machine), then
> > the overhead of keeping track of a (Long) watermark is large
> > memory-wise. E.g. Int state vs. Long watermark results in 3x as large
> > state. Also, the checkpointing would be ~3x as slow. Of course, for
> > large states a Long watermark would not mean much overhead.
> >
> > We could resolve the memory issue by using some kind of sketch data
> > structure. Right now the granularity of watermark handling is
> > per-operator-instance. On the other hand, per-key granularity might be
> > costly. What if we increased the granularity of watermarks inside an
> > operator by keeping more than one watermark tracker in one operator?
> > This could be quite simply done with a hash table. With a hash table of
> > size 1, we would yield the current semantics (per-operator-instance
> > granularity). With a hash table large enough to have at most one key per
> > bucket, we would yield per-key watermark tracking. In between lies the
> > trade-off between handling time-skew and a lot of memory overhead. This
> > does not seem hard to implement.
> >
> > Of course, at some point we would still need to take care of watermarks
> > per-key. Imagine that keys A and B would go to the same bucket of the
> > hash table, and watermarks are coming in like this: (B,20), (A,10),
> > (A,15), (A,40). Then the watermark of the bucket should be the minimum
> > as time passes, i.e. 0, 10, 15, 20. For this we need to keep track of
> > the watermarks of A and B separately. But after we have a correct
> > watermark for the bucket, all we need to care about is the bucket
> > watermarks. So somewhere (most probably at the source) we would have to
> > pay memory overhead of tracking every key, but nowhere else in the
> > topology.
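
The bucket example above can be replayed with a small sketch. Everything here is an assumption for illustration: the class name `BucketedWatermarks`, string keys, keys registered up front with an initial watermark of 0, and bucket assignment by `hashCode` modulo the table size.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: per-key watermarks collapsed into one watermark per hash bucket. */
final class BucketedWatermarks {
    private final int numBuckets;
    // Per-key watermarks; in the proposal this full map would only live at the source.
    private final Map<String, Long> perKey = new HashMap<>();

    BucketedWatermarks(int numBuckets, String... knownKeys) {
        this.numBuckets = numBuckets;
        for (String key : knownKeys) {
            perKey.put(key, 0L); // assumption: every key starts at watermark 0
        }
    }

    int bucketOf(String key) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    /** Updates one key's watermark and returns the watermark of the bucket it falls into. */
    long onWatermark(String key, long watermark) {
        perKey.merge(key, watermark, Long::max);
        return bucketWatermark(bucketOf(key));
    }

    /** Bucket watermark = minimum over all keys in the bucket (linear scan, for brevity). */
    long bucketWatermark(int bucket) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : perKey.entrySet()) {
            if (bucketOf(e.getKey()) == bucket) {
                min = Math.min(min, e.getValue());
            }
        }
        return min;
    }
}
```

With a table of size 1 and keys A and B registered, feeding (B,20), (A,10), (A,15), (A,40) returns 0, 10, 15, 20, the sequence given in the text. Only these bucket values would need to travel downstream.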
> >
> > Regarding the potentially large network overhead, the same compression
> > could be useful. I.e. we would not send watermarks from one operator
> > per-key, but rather per-hash. Again, the trade-off between time skew and
> > memory consumption is configurable by the size of the hash table used.
> >
> > Cheers,
> > Gabor
> >
> > On 2017-02-23 08:57, Paris Carbone wrote:
> >
> > > Hey Jamie!
> > >
> > > Key-based progress tracking sounds like local-only progress tracking to
> > > me, there is no need to use a low watermarking mechanism at all since all
> > > streams of a key are handled by a single partition at a time (per
> > > operator).
> > > Thus, this could be much easier to implement and support (i.e., no need
> > > to broadcast the progress state of each partition all the time).
> > > State-wise it should be fine too if it is backed by rocksdb, especially
> > > if we have MapState in the future.
> > >
> > > Just my quick thoughts on this, to get the discussion going :)
> > >
> > > cheers
> > > Paris
> > >
> > >> On 23 Feb 2017, at 01:01, Jamie Grier <ja...@data-artisans.com>
> > >> wrote:
> > >>
> > >> Hi Flink Devs,
> > >>
> > >> Use cases that I see quite frequently in the real world would benefit
> > >> from a different watermarking / event time model than the one currently
> > >> implemented in Flink.
> > >>
> > >> I would call Flink's current approach partition-based watermarking or
> > >> maybe subtask-based watermarking. In this model the current "event time"
> > >> is a property local to each subtask instance in a dataflow graph. The
> > >> event time at any subtask is the minimum of the watermarks it has
> > >> received on each of its input streams.
> > >>
> > >> There are a couple of issues with this model that are not optimal for
> > >> some (maybe many) use cases.
> > >>
> > >> 1) A single slow subtask (or say source partition) anywhere in the
> > >> dataflow can mean no progress can be made on the computation at all.
> > >>
> > >> 2) In many real world scenarios the time skew across keys can be
> > >> *many* times greater than the time skew within the data with the same
> > >> key.
> > >>
> > >> In this discussion I'll use "time skew" to refer to the
> > >> out-of-orderness with respect to timestamp of the data. Out-of-orderness
> > >> is a mouthful ;)
> > >>
> > >> Anyway, let me provide an example or two.
> > >>
> > >> In IoT applications the source of events is a particular device out in
> > >> the world, let's say a device in a connected car application. The data
> > >> for some particular device may be very bursty and we will certainly get
> > >> events from these devices in Flink out-of-order just because of things
> > >> like partitions in Kafka, shuffles in Flink, etc. However, the time skew
> > >> in the data for a single device should likely be very small
> > >> (milliseconds or maybe seconds).
> > >>
> > >> However, in the same application the time skew across different devices
> > >> can be huge (hours or even days). An obvious example of this, again
> > >> using connected cars as a representative example, is the following: Car
> > >> A is recording data locally at 12:00 pm on Saturday but doesn't
> > >> currently have a network connection. Car B is doing the same thing but
> > >> does have a network connection. Car A will transmit its data when the
> > >> network comes back online. Let's say this is at 4 pm. Car B was
> > >> transmitting its data immediately. This creates a huge time skew (4
> > >> hours) in the observed datastream when looked at as a whole. However,
> > >> the time skew in that data for Car A or Car B alone could be tiny. It
> > >> will be out of order of course but maybe by only milliseconds or
> > >> seconds.
> > >>
> > >> What the above means in the end for Flink is that the watermarks must be
> > >> delayed by up to 4 hours or more because we're looking at the data
> > >> stream as a whole -- otherwise the data for Car A will be considered
> > >> late. The time skew in the data stream when looked at as a whole is
> > >> large even though the time skew for any key may be tiny.
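
A rough way to see the effect described above is to count late events under both models. The sketch below is plain Java with invented names (`LatenessComparison`, `Event`, `lateWithGlobalWatermark`, `lateWithPerKeyWatermarks`). It assumes a watermark that trails the largest timestamp seen so far by a fixed bound, and it treats an event as late when its timestamp is at or below the watermark at arrival. Neither assumption is taken from Flink's actual implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch comparing one shared watermark against one watermark per key. */
final class LatenessComparison {
    static final class Event {
        final String key;
        final long timestamp; // event time in milliseconds

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Counts late events when a single watermark is shared by all keys. */
    static int lateWithGlobalWatermark(List<Event> arrivals, long maxOutOfOrderness) {
        long maxSeen = Long.MIN_VALUE;
        int late = 0;
        for (Event e : arrivals) {
            if (maxSeen != Long.MIN_VALUE && e.timestamp <= maxSeen - maxOutOfOrderness) {
                late++;
            }
            maxSeen = Math.max(maxSeen, e.timestamp);
        }
        return late;
    }

    /** Counts late events when each key has its own watermark. */
    static int lateWithPerKeyWatermarks(List<Event> arrivals, long maxOutOfOrderness) {
        Map<String, Long> maxSeenPerKey = new HashMap<>();
        int late = 0;
        for (Event e : arrivals) {
            Long maxSeen = maxSeenPerKey.get(e.key);
            if (maxSeen != null && e.timestamp <= maxSeen - maxOutOfOrderness) {
                late++;
            }
            maxSeenPerKey.merge(e.key, e.timestamp, Long::max);
        }
        return late;
    }
}
```

Feeding Car B's events at 12:00 and 16:00 followed by Car A's buffered events from around 12:00, with a 5 second bound, marks both of Car A's events late under the shared watermark and none under per-key watermarks.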
> > >>
> > >> This is the problem I would like to see a solution for. The basic idea
> > >> of keeping track of watermarks and event time "per-key" rather than per
> > >> partition or subtask would, I think, solve both of the problems stated
> > >> above, and both of these are real issues for production applications.
> > >>
> > >> The obvious downside of trying to do this per-key is that the amount of
> > >> state you have to track is much larger and potentially unbounded.
> > >> However, I could see this approach working if the keyspace isn't growing
> > >> rapidly but is stable or grows slowly. The saving grace here is that
> > >> this may actually be true of the types of applications where this would
> > >> be especially useful. Think IoT use cases. Another approach to keeping
> > >> state size in check would be a configurable TTL for a key.
> > >>
> > >> Anyway, I'm throwing this out here on the mailing list in case anyone is
> > >> interested in this discussion, has thought about the problem deeply
> > >> already, has use cases of their own they've run into or has ideas for a
> > >> solution to this problem.
> > >>
> > >> Thanks for reading..
> > >>
> > >> -Jamie
> > >>
> > >>
> > >> --
> > >>
> > >> Jamie Grier
> > >> data Artisans, Director of Applications Engineering
> > >> @jamiegrier <https://twitter.com/jamiegrier>
> > >> ja...@data-artisans.com
> > >
> >
> >
>
