Sorry for the late reply, Matthias. I have been busy with other work recently.
I'll restart the discussion and update the KIP accordingly.

Lei

On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Any update on this KIP?
>
> On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> > Thanks for following up. Very nice examples!
> >
> > I think that the window definition for Flink is semantically
> > questionable. If there is only a single record, why is the window
> > defined as [ts, ts+gap]? To me, this definition is not sound and seems
> > to be arbitrary. To define the windows as [ts-gap,ts+gap] as you mention
> > would be semantically more useful -- still, I think that defining the
> > window as [ts,ts] as we do currently in Kafka Streams is semantically
> > the best.
> >
> > I have the impression that Flink only defines them differently because
> > it solves the issues in the implementation. (I.e., an implementation
> > detail leaks into the semantics, which is usually not desired.)
> >
> > However, I believe that we could change the implementation accordingly.
> > We could store the windowed keys, as [ts-gap,ts+gap] (or [ts,ts+gap]) in
> > RocksDB, but at API level we return [ts,ts]. This way, we can still find
> > all windows we need and provide the same deterministic behavior and keep
> > the current window boundaries on the semantic level (there is no need to
> > store the window start and/or end time). With this technique, we can
> > also implement dynamic session gaps. I think, we would need to store the
> > used "gap" for each window, too. But again, this would be an
> > implementation detail.
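
A minimal sketch of the storage idea described above, not Kafka Streams code. The names `StoredSession` and `add_record` are hypothetical, and the in-memory list stands in for RocksDB. One assumption differs from the message: the sketch keeps the first and last event timestamps explicitly next to the search range, purely to make the API-level window easy to report.

```python
from dataclasses import dataclass


@dataclass
class StoredSession:
    # Hypothetical stored form: the search range is the union of
    # [ts - gap, ts + gap] over all records in the session.
    search_start: int
    search_end: int
    # Assumption of this sketch: event-time bounds are kept explicitly.
    first_ts: int
    last_ts: int

    def api_window(self):
        """Window reported at API level, i.e. [first event ts, last event ts]."""
        return (self.first_ts, self.last_ts)


def add_record(store, ts, gap):
    """Return a new store after merging record (ts, gap) into overlapping sessions."""
    merged = StoredSession(ts - gap, ts + gap, ts, ts)
    kept = []
    for s in store:
        # Closed search ranges intersect, so the sessions belong together.
        if s.search_start <= merged.search_end and merged.search_start <= s.search_end:
            merged = StoredSession(
                min(s.search_start, merged.search_start),
                max(s.search_end, merged.search_end),
                min(s.first_ts, merged.first_ts),
                max(s.last_ts, merged.last_ts),
            )
        else:
            kept.append(s)
    kept.append(merged)
    kept.sort(key=lambda s: s.search_start)
    return kept


store = []
for ts, gap in [(10, 10), (15, 3)]:
    store = add_record(store, ts, gap)
print([s.api_window() for s in store])
```

With the records (10,10) and (15,3), the stored search range is [0,20] while the reported window is [10,15].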
> >
> > Let's see what others think.
> >
> > One tricky question we would need to address is, how we can be backward
> > compatible. I am currently working on KIP-258 that should help to
> > address this backward compatibility issue though.
> >
> >
> > -Matthias
> >
> >
> >
> > On 9/19/18 5:17 PM, Lei Chen wrote:
> >> Thanks Matthias. That makes sense.
> >>
> >> You're right that symmetric merge is necessary to ensure consistency. On
> >> the other hand, I kinda feel it defeats the purpose of dynamic gap,
> which
> >> is to update the gap from old value to new value. The symmetric merge
> >> always honors the larger gap in both directions, rather than honoring the gap
> >> carried by the record with the larger timestamp. I wasn't able to find any
> semantic
> >> definitions w.r.t this particular aspect online, but spent some time
> >> looking into other streaming engines like Apache Flink.
> >>
> >> Apache Flink defines the window differently: it uses (start time,
> start
> >> time + gap).
> >>
> >> so our previous example (10, 10), (19,5),(15,3) in Flink's case will be:
> >> [10,20]
> >> [19,24] => merged to [10,24]
> >> [15,18] => merged to [10,24]
> >>
> >> while example (15,3)(19,5)(10,10) will be
> >> [15,18]
> >> [19,24] => no merge
> >> [10,20] => merged to [10,24]
> >>
> >> However, since it only records the gap in the future direction, not the past, a late
> >> record might not trigger any merge where a symmetric merge would.
> >> (7,2),(10, 10), (19,5),(15,3)
> >> [7,9]
> >> [10,20]
> >> [19,24] => merged to [10,24]
> >> [15,18] => merged to [10,24]
> >> so at the end
> >> two windows [7,9][10,24] are there.
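
The forward-merge examples above can be reproduced with a small simulation. This is only a sketch of the semantics as described in this message, not Flink's implementation: `forward_merge_sessions` is a hypothetical helper, each record (ts, gap) is assumed to open the closed window [ts, ts + gap], and windows are merged whenever they intersect.

```python
def forward_merge_sessions(records):
    """Merge windows [ts, ts + gap] that intersect (closed intervals)."""
    windows = []  # disjoint (start, end) tuples
    for ts, gap in records:
        start, end = ts, ts + gap
        remaining = []
        for w_start, w_end in windows:
            if w_start <= end and start <= w_end:
                # The windows intersect, so grow the new window to cover both.
                start, end = min(start, w_start), max(end, w_end)
            else:
                remaining.append((w_start, w_end))
        remaining.append((start, end))
        windows = sorted(remaining)
    return windows


print(forward_merge_sessions([(10, 10), (19, 5), (15, 3)]))
print(forward_merge_sessions([(15, 3), (19, 5), (10, 10)]))
print(forward_merge_sessions([(7, 2), (10, 10), (19, 5), (15, 3)]))
```

The third call keeps [7,9] separate from [10,24], because the window opened by (7,2) only reaches forward to 9.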
> >>
> >> As you can see, in Flink the gap semantics are such that a gap carried by
> >> one record only affects how this record merges with future records. E.g.,
> >> a later event (T2, G2) will only be merged with (T1, G1) if T2 is less
> >> than T1+G1, but not when T1 is less than T2 - G2. Let's call this the
> >> "forward-merge" way of handling this. I just went through some source
> >> code, so if my understanding of Flink's implementation is incorrect,
> >> please correct me.
> >>
> >> On the other hand, if we want to do symmetric merge in Kafka Streams, we
> >> can change the window definition to [start time - gap, start time +
> gap].
> >> This way the example (7,2),(10, 10), (19,5),(15,3) will be
> >> [5,9]
> >> [0,20] => merged to [0,20]
> >> [14,24] => merged to [0,24]
> >> [12,18] => merged to [0,24]
> >>
> >> (19,5),(15,3),(7,2),(10,10) will generate the same result:
> >> [14,24]
> >> [12,18] => merged to [12,24]
> >> [5,9] => no merge
> >> [0,20] => merged to [0,24]
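
As a rough check of the symmetric definition, the sketch below builds the window [ts - gap, ts + gap] for every record and tries all processing orders of the four example records. `symmetric_merge_sessions` is a hypothetical helper for illustration, with closed intervals assumed as in the examples; it is not how Kafka Streams stores sessions.

```python
from itertools import permutations


def symmetric_merge_sessions(records):
    """Merge windows [ts - gap, ts + gap] that intersect (closed intervals)."""
    windows = []  # disjoint (start, end) tuples
    for ts, gap in records:
        start, end = ts - gap, ts + gap
        remaining = []
        for w_start, w_end in windows:
            if w_start <= end and start <= w_end:
                # The windows intersect, so grow the new window to cover both.
                start, end = min(start, w_start), max(end, w_end)
            else:
                remaining.append((w_start, w_end))
        remaining.append((start, end))
        windows = sorted(remaining)
    return windows


records = [(7, 2), (10, 10), (19, 5), (15, 3)]
# Collect the distinct final results over every possible arrival order.
results = {tuple(symmetric_merge_sessions(order)) for order in permutations(records)}
print(results)
```

Every one of the 24 orderings ends with the single window [0,24], which matches the two orderings worked through above.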
> >>
> >> Note that symmetric merge would require us to change the way Kafka
> >> Streams fetches windows now: instead of fetching the range from
> >> timestamp-gap to timestamp+gap, we will need to fetch all windows that
> >> are not expired yet. On the other hand, I'm not sure how this will
> >> impact the current logic of how a window is considered closed, because
> >> the window doesn't carry the end timestamp anymore, but end timestamp + gap.
> >>
> >> So do you guys think the forward-merge approach used by Flink makes more
> >> sense in Kafka Streams, or does symmetric merge make more sense? It seems
> >> to me that both of them can give deterministic results.
> >>
> >> BTW I'll add the use case to the original KIP.
> >>
> >> Lei
> >>
> >>
> >> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Thanks for explaining your understanding. And thanks for providing more
> >>> details about the use-case. Maybe you can add this to the KIP?
> >>>
> >>>
> >>> First one general comment. I guess that my and Guozhang's understanding
> >>> about gap/close/gracePeriod is the same as yours -- we might not have
> >>> used the terms precisely in previous emails.
> >>>
> >>>
> >>> To your semantics of gap in detail:
> >>>
> >>>> I thought when (15,3) is received, kafka streams look up for neighbor
> >>>> record/window that is within the gap
> >>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
> its
> >>> own
> >>>> window [10, 10], which is
> >>>> out of the gap, so nothing will be found and no merge occurs. Hence we
> >>> have
> >>>> two windows now in session store,
> >>>> [10, 10] and [15, 15] respectively.
> >>>
> >>> If you have record (10,10), we currently create a window of size
> >>> [10,10]. When record (15,3) arrives, your observation is that the gap 3 is
> >>> too small for it to be merged into the [10,10] window. However, merging is a
> >>> symmetric operation and the existing window of [10,10] has a gap of 10
> >>> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
> >>> merged into the existing window resulting in window [10,15].
> >>>
> >>> If we don't respect the gap both ways, we end up with inconsistencies
> if
> >>> data is out-of-order. For example, if we use the same input record
> >>> (10,10) and (15,3) from above, and it happens that (15,3) is processed
> >>> first, when processing out-of-order record (10,10) we would want to
> >>> merge both into a single window, too?
> >>>
> >>> Does this make sense?
> >>>
> >>> Now the question remains: if two records with different gap parameters
> >>> are merged, which gap should we apply for processing/merging future
> >>> records into the window? It seems that we should use the gap parameter
> >>> from the record with the largest timestamp, in the example above (15,3).
> >>> We would use gap 3 after merging, independent of the order of processing.
> >>>
> >>>
> >>>> Also another thing worth mentioning is that, the session window object
> >>>> created in current kafka streams
> >>>> implementation doesn't have gap info, it has start and end, which is
> the
> >>>> earliest and latest event timestamp
> >>>> in that window interval, i.e for (10,10), the session window gets
> created
> >>>> is [10,10], rather than [10,20]. Just to clarify
> >>>> so that it's clear why (10,10) cannot be fetched when looking for gap
> of
> >>>> (15,3), it's because the end boundary 10 of
> >>>> [10,10] is smaller than search boundary [12,18].
> >>>
> >>> We don't need to store the gap, because the gap is known from the window
> >>> definition. The created window size depends on the data that is
> >>> contained in the window. I guess one could define it differently, too,
> >>> ie, for the (10,10) record, we create a window [0,20] -- not sure if it
> >>> makes a big difference in practice though. Note, that creating window
> >>> [10,20] would not be correct, because the gap must be applied in both
> >>> directions, not just into the future.
> >>>
> >>> About the second part: the search would not be applied from (15,3) in
> >>> range [12,18], but from existing window [10,10] into range [0,20] and
> 15
> >>> is contained there. This example also shows, that we would need to come
> >>> up with a clever way, to identify window [10,10] when processing (15,3)
> >>> -- not sure atm how to do this. However, only considering (15,3) would
> >>> result in inconsistencies for out-of-order data as pointed out above
> and
> >>> would not be sufficient.
> >>>
> >>>
> >>> Does this make sense?
> >>>
> >>>
> >>> Or is there another way to define dynamic session gap semantics in a
> >>> deterministic way with regard to out-of-order data?
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 9/11/18 4:28 PM, Lei Chen wrote:
> >>>> Thanks Matthias and Guozhang for the response.
> >>>>
> >>>> Seems like our understanding mainly differs in the semantics of gap in
> >>>> session windows.
> >>>>
> >>>> My understanding is that the gap is used to merge nearby records together
> >>>> such that no record in the merged window has a distance larger than the gap.
> >>>> In Kafka Streams's implementation it's mainly used to find neighbor
> >>>> records/windows in the session store so that nearby records can be merged.
> >>>> It is NOT used to determine when a window should be closed, which is in
> >>>> fact determined by the window's grace period.
> >>>>
> >>>> Guozhang you said "b. When later we received (15, 3), it means that
> this
> >>>> record ** changed **
> >>>> the window gap interval from 10 to 3, and hence we received a new
> record
> >>> at
> >>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 +
> 3)
> >>> if
> >>>> we have not received any new data, the window should be closed, i.e.
> the
> >>>> window is now [10, 18) which includes two records at 10 and 15."
> >>>>
> >>>> This is different from what I thought would happen.
> >>>>
> >>>> I thought when (15,3) is received, kafka streams look up for neighbor
> >>>> record/window that is within the gap
> >>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
> its
> >>> own
> >>>> window [10, 10], which is
> >>>> out of the gap, so nothing will be found and no merge occurs. Hence we
> >>> have
> >>>> two windows now in session store,
> >>>> [10, 10] and [15, 15] respectively.
> >>>>
> >>>> Also another thing worth mentioning is that, the session window object
> >>>> created in current kafka streams
> >>>> implementation doesn't have gap info, it has start and end, which is
> the
> >>>> earliest and latest event timestamp
> >>>> in that window interval, i.e for (10,10), the session window gets
> created
> >>>> is [10,10], rather than [10,20]. Just to clarify
> >>>> so that it's clear why (10,10) cannot be fetched when looking for gap
> of
> >>>> (15,3), it's because the end boundary 10 of
> >>>> [10,10] is smaller than search boundary [12,18].
> >>>>
> >>>> Please correct me if my understanding is wrong here.
> >>>>
> >>>> @Matthias, to answer your use case question, we have a use case where
> >>>> asynchronous time series data are received in the stream from different
> >>>> contributors, with different quality and at different paces.
> >>>> Inside Kafka Streams, we use state to maintain statistical aggregations
> >>>> and other mathematical models to track the liquidity and calculate the
> >>>> time decay rate and dynamic gap, so that at runtime, for each
> >>>> contributor, we can
> >>>> 1. determine how many historical records we should maintain in state.
> >>>> 2. for each incoming record, output a record using aggregations from
> >>>> *nearby* records from that contributor.
> >>>> Why doesn't a fixed-gap session window work here? Because the definition
> >>>> of "nearby" here is determined by several very dynamic factors in our
> >>>> case: it changes not only with the hour of the day, but also in relation
> >>>> to other contributors.
> >>>> The purpose of this KIP is to suggest a dynamic session window
> >>>> implementation so that we can embed such
> >>>> dynamic "nearby" calculation capability into kafka streams session
> >>> windows
> >>>> semantics. Hope it makes sense to you.
> >>>>
> >>>> Lei
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hello Lei,
> >>>>>
> >>>>> As Matthias mentioned, the key question here is that because of the late
> >>>>> arrival of records which may indicate a shorter session gap interval,
> >>>>> some session windows may be "mistakenly" merged, and hence the merge
> >>>>> needs to be undone, i.e. the windows need to be split again.
> >>>>>
> >>>>> Back to my example, you are right that the processing result of
> >>>>>
> >>>>> (10, 10), (19, 5), (15, 3) ..
> >>>>>
> >>>>> should be the same as the processing result of
> >>>>>
> >>>>> (10, 10), (15, 3), (19, 5) ..
> >>>>>
> >>>>> Note that the second value is NOT the window end time, but the
> extracted
> >>>>> window gap interval, as you suggested in the KIP this value can be
> >>>>> dynamically changed
> >>>>>
> >>>>> a. If you take a look at the second ordering, when we receive (10,
> 10)
> >>> it
> >>>>> means a window starting at 10 is created, and its gap interval is 10,
> >>> which
> >>>>> means that if by the timestamp of 20 we do not receive any new data,
> >>> then
> >>>>> the window should be closed, i.e. the window [10, 20).
> >>>>>
> >>>>> b. When later we received (15, 3), it means that this record **
> changed
> >>> **
> >>>>> the window gap interval from 10 to 3, and hence we received a new
> >>> record at
> >>>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 +
> >>> 3) if
> >>>>> we have not received any new data, the window should be closed, i.e.
> the
> >>>>> window is now [10, 18) which includes two records at 10 and 15.
> >>>>>
> >>>>> c. The third record is received at 19, which is after the window
> close
> >>> time
> >>>>> 18, it means that we should now start a new window starting at 19,
> i.e.
> >>> the
> >>>>> window is [19, 24),
> >>>>>
> >>>>>
> >>>>> BUT, because of the out of ordering, we did not receive (15, 3) in
> time,
> >>>>> but received (19, 5), it will cause us to mistakenly merge the
> window of
> >>>>> [10, 20) with [19, 24) to [10, 24), and only when later we received
> >>> (15, 3)
> >>>>> we realized that the previous window should have been ended at 18.
> >>>>>
> >>>>> Does that make sense to you?
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <
> matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> I cannot follow the example:
> >>>>>>
> >>>>>>>> (10, 10), (15, 3), (19, 5) ...
> >>>>>>
> >>>>>> First, [10,10] is created, second the window is extended to [10,15],
> >>> and
> >>>>>> third [19,19] is created. Why would there be a [15,15]? And why
> would
> >>>>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
> >>>>>> thus [19,19] should be its own window?
> >>>>>>
> >>>>>>> Take a look at another example,
> >>>>>>> (13, 3),  (19, 5), (15, 3) ...
> >>>>>>>
> >>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved
> and
> >>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
> >>> [13,
> >>>>>>> 19]. Correct?
> >>>>>>
> >>>>>> This example makes sense. However, Guozhang's example was different.
> >>> The
> >>>>>> late event _reduces_ the gap and this can lead to a window split.
> >>>>>> Guozhang's example was
> >>>>>>
> >>>>>>>>> (10, 10), (19, 5), (15, 3) ...
> >>>>>>
> >>>>>> First [10,10] is created, second [10,19] is created (gap is 10, so 10
> >>> and
> >>>>>> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15]
> and
> >>>>>> [19,19] must be two windows, ie, original window [10,19] must be
> split.
> >>>>>>
> >>>>>>
> >>>>>> Or maybe you have different semantics in mind for how gaps are
> >>>>>> dynamically modified? It's a little unclear from the KIP itself what
> >>>>>> semantics dynamic session windows should have.
> >>>>>>
> >>>>>>
> >>>>>> What is also unclear to me atm is, what use cases you have in mind?
> The
> >>>>>> KIP only says
> >>>>>>
> >>>>>>> the statistical aggregation result, liquidity of the records,
> >>>>>>
> >>>>>>
> >>>>>> I am not sure what this means. Can you elaborate?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
> >>>>>>> Hi Guozhang,
> >>>>>>>
> >>>>>>> Thanks for reviewing the proposal. I didn't think of out of order
> >>>>> events
> >>>>>>> and glad that you brought it up.
> >>>>>>>
> >>>>>>> In the example you gave,
> >>>>>>>
> >>>>>>> (10, 10), (19, 5), (15, 3) ...
> >>>>>>>
> >>>>>>> my understanding is that the correct result window should be the
> same
> >>>>> as
> >>>>>> in
> >>>>>>> order events
> >>>>>>>
> >>>>>>> (10, 10), (15, 3), (19, 5) ...
> >>>>>>>
> >>>>>>> when (15, 3) is received, [15,15] is created
> >>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15,
> >>> 19]
> >>>>>> is
> >>>>>>> created, meanwhile [15,15] is removed
> >>>>>>>
> >>>>>>> back to out of order case,
> >>>>>>>
> >>>>>>> when (19 ,5) is received, [19, 19] is created
> >>>>>>> when (15, 3) is received, in order to generate the same result,
> >>>>>>> 1. if late event is later than retention period, it will be dropped
> >>>>>>> 2. otherwise, adjacent session windows within gap should be
> retrieved
> >>>>> and
> >>>>>>> merged accordingly, in this case [19, 19], and create a new session
> >>>>> [15,
> >>>>>> 19]
> >>>>>>> I'm a little confused when you said "the window [15, 15] SHOULD
> actually
> >>>>> be
> >>>>>>> expired at 18 and hence the next record (19, 5) should be for a new
> >>>>>> session
> >>>>>>> already.". If I understand it correctly, the expiration of the
> window
> >>>>> is
> >>>>>>> only checked when next event (19,5) comes and then it should be
> merged
> >>>>> to
> >>>>>>> it. [15, 15] will then be closed. Is that also what you meant?
> >>>>>>> I cannot think of a case where a window will be split by a late
> event,
> >>>>>>> because if event A and C fall into the same session window, a late
> >>>>> event
> >>>>>> B
> >>>>>>> in middle will definitely fall into C's gap as well. IOW, late
> event
> >>>>> will
> >>>>>>> only cause window extension, not split.
> >>>>>>>
> >>>>>>> Take a look at another example,
> >>>>>>> (13, 3),  (19, 5), (15, 3) ...
> >>>>>>>
> >>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved
> and
> >>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
> >>> [13,
> >>>>>>> 19]. Correct?
> >>>>>>>
> >>>>>>> To be able to achieve that, like you said, the gap needs to be
> stored
> >>>>> for
> >>>>>>> sessions. We don't need to save the gap with each event, but only
> for
> >>>>>> each
> >>>>>>> session window. To avoid upgrading existing session window, how
> about
> >>>>>>> create a new Window type extended from SessionWindow along with a
> new
> >>>>>>> KeySchema?
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Lei
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wangg...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello Lei,
> >>>>>>>>
> >>>>>>>> Thanks for the proposal. I've just made a quick pass over it and
> >>> there
> >>>>>> is a
> >>>>>>>> question I have:
> >>>>>>>>
> >>>>>>>> The session windows are defined per key, i.e. does that mean that
> >>> each
> >>>>>>>> incoming record of the key can dynamically change the gap of the
> >>>>> window?
> >>>>>>>> For example, say you have the following record for the same key
> >>> coming
> >>>>>> in
> >>>>>>>> order, where the first time is the timestamp of the record, and
> the
> >>>>>> second
> >>>>>>>> value is the extracted gap value:
> >>>>>>>>
> >>>>>>>> (10, 10), (19, 5), ...
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> When we receive the first record at time 10, the gap is extracted
> as
> >>>>> 10,
> >>>>>>>> and hence the window will be expired at 20 if no other record is
> >>>>>> received.
> >>>>>>>> When we receive the second record at time 19, the gap is modified
> to
> >>>>> 5,
> >>>>>> and
> >>>>>>>> hence the window will be expired at 24 if no other record is
> >>> received.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> If that's the case, I'm wondering how out-of-order data can be
> >>> handled
> >>>>>>>> then, consider this stream:
> >>>>>>>>
> >>>>>>>> (10, 10), (19, 5), (15, 3) ...
> >>>>>>>>
> >>>>>>>> I.e. you received a late record at timestamp 15, which shortens
> >>>>>>>> the gap to 3. It means that the window SHOULD actually be expired at 18,
> >>>>>>>> and hence the next record (19, 5) should be for a new session
> >>> already.
> >>>>>>>> Today Streams session window implementation does not do "window
> >>>>> split",
> >>>>>> so
> >>>>>>>> have you thought about how this can be extended?
> >>>>>>>>
> >>>>>>>> Also since in your proposal each session window's gap value would
> be
> >>>>>>>> different, we need to store this value along with each record
> then,
> >>>>> how
> >>>>>>>> would we store it, and what would be the upgrade path if it is
> not a
> >>>>>>>> compatible change on disk storage etc?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <ley...@gmail.com>
> wrote:
> >>>>>>>>
> >>>>>>>>> Hi All,
> >>>>>>>>>
> >>>>>>>>> I created a KIP to add dynamic gap session window support to
> Kafka
> >>>>>>>> Streams
> >>>>>>>>> DSL.
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-362%3A+Support+dynamic+gap+session+window
> >>>>>>>>>
> >>>>>>>>> Please take a look,
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Lei
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>
