>   - Why restrict de-duplication for the key only? Should we not also
> consider the value (or make it somehow flexible and let the user choose?)

Wasn't that the first idea we abandoned (I mean, providing a 'KeyExtractor' and so on)?

In order to keep things simple, we decided to go with

.selectKey(...)     // here we select anything we need
                    // add markAsPartitioned from KIP-759 to taste
  .groupByKey()
  .windowedBy(...)
  .distinct()       // the only new operation that we add to the API, reusing
                    // all the windowed aggregations' infrastructure

>   - I am wondering if the return type should be `KStream` instead of a
> `KTable`? If I deduplicate a stream, I might expect a stream back? I
> don't really consider a stream de-duplication an aggregation with
> "mutable state"...

First, because it's going to be an operation on a Time/SessionWindowedKStream, and these operations usually return KTable<Windowed<...>, ...>. Second, it might be useful to know to which time window a deduplicated record actually belongs. And it is a trivial task to turn this table back into a stream.

> IMHO, an unordered stream and its ordered "cousin" should
> yield the same result? -- Given your example it seems you want to keep
> the first record based on offset order. Wondering why?

I see it this way: we define the 'distinct' operation as returning a single record per time window per selected key, no matter which record. So it's OK if it yields different results for different orderings, as long as its main property holds!

And since we can select any key we like, we can get any degree of 'deduplication granularity' and 'determinism'.
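
For example (the field names here are hypothetical fragments, not part of the KIP), keying by source plus event id deduplicates per source, while keying by event id alone deduplicates across all sources:

    // per-source deduplication: the same event from two sources is kept twice
    stream.selectKey((sourceId, event) -> sourceId + "|" + event.getEventId())

    // global deduplication: the event is kept only once, whichever source reported it first
    stream.selectKey((sourceId, event) -> event.getEventId())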

> While I agree that deduplication for overlapping window is questionable,
> I am still wondering if you plan to disallow it (by adding a runtime
> check and throwing an exception), or not?

Thanks for this point! I think the 'fail-fast' approach is good here. We will need to throw an exception; I will add this to the KIP (a rough sketch of the check follows the list below):

- SessionWindows -- OK
- SlidingWindows -- Exception
- TimeWindows --
     tumbling -- OK
     hopping  -- Exception
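
A minimal sketch of what the runtime check could look like (names and placement are illustrative only; TimeWindows exposes sizeMs/advanceMs as public fields, and a hopping window is simply one whose advance is smaller than its size):

    // Assumed to run when distinct() is added to the topology.
    if (windows instanceof SlidingWindows) {
        throw new TopologyException("distinct() is not supported for sliding windows");
    }
    if (windows instanceof TimeWindows) {
        final TimeWindows tw = (TimeWindows) windows;
        if (tw.advanceMs < tw.sizeMs) {
            throw new TopologyException("distinct() is not supported for hopping windows");
        }
    }
    // SessionWindows and tumbling TimeWindows (advance == size) pass through.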


Regards,

Ivan


On 04.08.2021 4:22, Matthias J. Sax wrote:
Couple of questions:

  - Why restrict de-duplication for the key only? Should we not also
consider the value (or make it somehow flexible and let the user choose?)

  - I am wondering if the return type should be `KStream` instead of a
`KTable`? If I deduplicate a stream, I might expect a stream back? I
don't really consider a stream de-duplication an aggregation with
"mutable state"...

Also, why would the result key need to be windowed?

Btw: How should out-of-order data be handled? Given that you only want
to consider the key, the value could be different, and thus, if there is
out-of-order data, keeping one or the other value could make a
difference? IMHO, an unordered stream and its ordered "cousin" should
yield the same result? -- Given your example it seems you want to keep
the first record based on offset order. Wondering why?


While I agree that deduplication for overlapping window is questionable,
I am still wondering if you plan to disallow it (by adding a runtime
check and throwing an exception), or not?


On 8/1/21 6:42 AM, Ivan Ponomarev wrote:
Hi Bruno,

I'm sorry for the delay with the answer. Unfortunately your messages
were put to spam folder, that's why I didn't answer them right away.

Concerning your question about comparing serialized values vs. using
equals: I think it must be clear now due to John's explanations.
Distinct is a stateful operation, thus we will need to use
serialization. (Although AFAICS the in-memory storage might be a good
practical solution in many cases).

I do currently not see why it should not make sense in hopping
windows... I do not understand  the following sentence: "...one record
can be multiplied instead of deduplication."

Ok, let me explain.

As it's written in the KIP, "The distinct operation returns only a first
record that falls into a new window, and filters out all the other
records that fall into an already existing window."

It's also worth remembering that the result of `distinct` is
KTable<Windowed<K>, V>, not KStream<K, V>.

If we have, say, hopping time windows [0, 40], [10, 50], [20, 60] and a
record (key, val) with timestamp 25 arrives, it will be forwarded three
times ('multiplied'), since it falls into all three windows. The output
will be

(key@[0/40],  val)
(key@[10/50], val)
(key@[20/60], val)

You can reason about the `distinct` operation just like you reason about
`sum` or `count`. When a record arrives that falls into a window, we
update the aggregation on this window. For `distinct`, when extra
records arrive into the same window, we also perform some sort of
aggregation (we may even count them internally!), but, unlike sum or
count, we will not forward anything, since the counter is already
greater than zero.
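
The analogy can be made concrete with operators that already exist. A rough sketch (illustrative only; it forwards the count rather than the original value, which is exactly why a dedicated operator is nicer):

    KTable<Windowed<String>, Long> counts = stream        // stream: a KStream<String, String>
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
            .count();

    // keep only the 0 -> 1 transition, i.e. the first record seen in each window
    // (assumes caching is disabled, so every intermediate update is emitted)
    KStream<Windowed<String>, Long> firstOnly = counts
            .toStream()
            .filter((windowedKey, count) -> count == 1L);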

You may refer to the 'usage examples' of the KIP
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-UsageExamples)
to get a clearer idea of how it works.

As I said earlier, I do not think that SQL and the Java Stream API are
good arguments to not use a verb

This is an important matter. As we all know, naming is hard.

However, the `distinct` name is not used just in SQL and Java Streams. It is
a de-facto standard operation found in nearly all data
processing frameworks; see the hyperlinked examples in the 'Motivation'
section of the KIP
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-Motivation)


Please look at it and let me know what you think.

Regards,

Ivan

On 29.07.2021 4:49, John Roesler wrote:
Hi Bruno,

I had previously been thinking of using equals(), since I
thought that this might be a stateless operation. Comparing
the serialized form requires a serde and a fairly expensive
serialization operation, so while byte equality is superior
to equals(), we shouldn't use it in operations unless they
already require serialization.

I changed my mind when I later realized I had been mistaken,
and this operation is of course stateful.

I hope this helps clarify it.

Thanks,
-John

On Fri, 2021-07-23 at 09:53 +0200, Bruno Cadonna wrote:
Hi Ivan and John,

1. John, could you clarify why comparing serialized values seems the way
to go, now?

2. Ivan, Could you please answer my questions that I posted earlier? I
will repost it here:
Ivan, could you please make this matter a bit clearer in the KIP?
Actually, thinking about it again, I do currently not see why it should
not make sense in hopping windows. Regarding this, I do not understand
the following sentence:

"hopping and sliding windows do not make much sense for distinct()
because they produce multiple intersected windows, so that one record
can be multiplied instead of deduplication."

Ivan, what do you mean with "multiplied"?

3. As I said earlier, I do not think that SQL and the Java Stream API
are good arguments to not use a verb. However, if everybody else is fine
with it, I can get behind it.

John, good catch about the missing overloads!
BTW, the overload with Named should be there regardless of stateful or
stateless.

Best,
Bruno

On 22.07.21 20:58, John Roesler wrote:
Hi Ivan,

Thanks for the reply.

1. I think I might have gotten myself confused. I was
thinking of this operation as stateless, but now I'm not
sure what I was thinking... This operator has to be
stateful, right? In that case, I agree that comparing
serialized values seems to be the way to do it.

2. Thanks for the confirmation

3. I continue to be satisfied to let you all hash it out.

Thanks,
-John

On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:
Hi all,

1. Actually I always thought about the serialized byte array only --
at least this is what local stores depend upon, and what Kafka itself
depends upon when doing log compaction.

I can imagine a case where two different byte arrays deserialize to
objects which are `equals` to each other. But I think we can ignore
this for now because IMO the risks related to buggy `equals`
implementations outweigh the potential benefits.

I will mention the duplicate definition in the KIP.

2. I agree with John, he got my point.

3. Let me gently insist on `distinct`. I believe that an exception to
the rule is appropriate here, because the name `distinct()` is
ubiquitous.

It's not only about the Java Streams API (or .NET LINQ, which appeared
earlier and also has `Distinct`): Spark's DataFrame has a `distinct()`
method, Hazelcast Jet has a `distinct()` method, and I bet I can find
more examples if I search. When we teach KStreams, we always say that
KStreams are just like other streaming APIs, and they have roots in
SQL queries. Users know what `distinct` is and they expect it to be in
the API.


Regards,

Ivan


13.07.2021 0:10, John Roesler пишет:
Hi all,

Bruno raised some very good points. I’d like to chime in with
additional context.

1. Great point. We faced a similar problem defining KIP-557. For
557, we chose to use the serialized byte array instead of the
equals() method, but I think the situation in KIP-655 is a bit
different. I think it might make sense to use the equals() method
here, but am curious what Ivan thinks.

2. I figured we'd do nothing. I thought Ivan was just saying that
it doesn't make a ton of sense to use it, which I agree with, but
it doesn't seem like that means we should prohibit it.

3. FWIW, I don't have a strong feeling either way.

Thanks,
-John

On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:
Hi Ivan,

Thank you for the KIP!

Some aspects are not clear to me from the KIP and I have a proposal.

1. The KIP does not describe the criteria that define a duplicate.
Could you add a definition of duplicate to the KIP?

2. The KIP does not describe what happens if distinct() is applied on
a hopping window. On the DSL level, I do not see how you can avoid
that users apply distinct() on a hopping window, i.e., you cannot
avoid it at compile time, you need to check it at runtime and throw
an exception. Is this correct or am I missing something?

3. I would also like to back a proposal by Sophie. She proposed to use
deduplicate() instead of distinct(), since the other DSL operations
are also verbs. I do not think that SQL and the Java Stream API are
good arguments to not use a verb.

Best,
Bruno


On 10.07.21 19:11, John Roesler wrote:
Hi Ivan,

Sorry for the silence!

I have just re-read the proposal.

To summarize, you are now only proposing the zero-arg distinct()
method to be added to TimeWindowedKStream and
SessionWindowedKStream, right?

I’m in favor of this proposal.

Thanks,
John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:
Hello everyone,

I would like to remind you about KIP-655 and KIP-759 just in case
they got lost in your inbox.

Now the initial proposal is split into two independent and smaller
ones, so it must be easier to review them. Of course, if you have time.

Regards,

Ivan


On 24.06.2021 18:11, Ivan Ponomarev wrote:
Hello all,

I have rewritten the KIP-655 summarizing what was agreed upon during
this discussion (now the proposal is much simpler and less invasive).

I have also created KIP-759 (cancelRepartition operation) and started
a discussion for it.

Regards,

Ivan.



On 04.06.2021 8:15, Matthias J. Sax wrote:
Just skimmed over the thread -- first of all, I am glad that we could
merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

      - https://issues.apache.org/jira/browse/KAFKA-4835
      - https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP, and do a
separate KIP for it (if necessary, we can "pause" this KIP until the
repartition KIP is done). It's a long-standing "issue" and we should
resolve it in a general way, I guess.

(Have not yet read all responses in detail, so keeping this comment
short.)


-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:
Thanks, Ivan!

That sounds like a great plan to me. Two smaller KIPs are easier to
agree on than one big one.

I agree hopping and sliding windows will actually have a duplicating
effect. We can avoid adding distinct() to the sliding window
interface, but hopping windows are just a different parameterization
of epoch-aligned windows. It seems we can’t do much about that except
document the issue.

Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
Hi John!

I think that your proposal is just fantastic, it simplifies things a
lot!

I also felt uncomfortable due to the fact that the proposed
`distinct()` is not somewhere near `count()` and `reduce(..)`. But
`selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look
like a correct option for me because of the issue with the unneeded
repartitioning.

The bold idea that we can just CANCEL the repartitioning didn't come
to my mind.

What seemed to me a single problem is in fact two unrelated problems:
the `distinct` operation and cancelling the unneeded repartitioning.

 > what if we introduce a parameter to `selectKey()` that specifies
 > that the caller asserts that the new key does _not_ change the data
 > partitioning?

I think a more elegant solution would be not to add a new parameter
to `selectKey` and all the other key-changing operations (`map`,
`transform`, `flatMap`, ...), but to add a new operator
`KStream#cancelRepartitioning()` that resets the
`keyChangingOperation` flag for the upstream node. Of course, a "use
it only if you know what you're doing" warning is to be added. Well,
it's a topic for a separate KIP!

Concerning `distinct()`. If we use `XXXWindowedKStream` facilities,
then changes to the API are minimally invasive: we're just adding
`distinct()` to TimeWindowedKStream and SessionWindowedKStream, and
that's all.

We can now define `distinct` as an operation that returns only a
first record that falls into a new window, and filters out all the
other records that fall into an already existing window. BTW, we can
mock the behaviour of such an operation with `TopologyTestDriver`
using `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`.  ;-)
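
For a String-valued stream, the mock spelled out might look roughly like this (STOP is just a sentinel value; the first value per window survives, every later one collapses into the sentinel and is filtered away):

    final String STOP = "<STOP>";

    KTable<Windowed<String>, String> distinctMock = stream   // stream: a KStream<String, String>
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
            .reduce((first, next) -> STOP)                    // keep the first value, poison every later one
            .filterNot((windowedKey, value) -> STOP.equals(value));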

Consider the following example (record times are in seconds):

//three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
//'late arrivals'
7, 22, 35


1. 'Epoch-aligned deduplication' using tumbling windows:

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()



produces

(key@[00000/10000], 4)
(key@[20000/30000], 23)
(key@[30000/40000], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because
they produce multiple intersected windows, so that one record can be
multiplied, but we want deduplication.

3. SessionWindows work for 'data-aligned deduplication'.

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()




produces only

([key@4000/4000], 4)
([key@23000/23000], 23)

because all the records with timestamps greater than 7 are stuck
together in one session.
Setting the inactivity gap to 9 seconds will return three records:

([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)

WDYT? If you like this variant, I will re-write KIP-655 and propose
a separate KIP for `cancelRepartitioning` (or whatever name we will
choose for it).

Regards,

Ivan


On 24.05.2021 22:32, John Roesler wrote:
Hey there, Ivan!

In typical fashion, I'm going to make a somewhat outlandish
proposal. I'm hoping that we can side-step some of the
complications that have arisen. Please bear with me.

It seems like `distinct()` is not fundamentally unlike other windowed
"aggregation" operations. Your concern about unnecessary
repartitioning seems to apply just as well to `count()` as to
`distinct()`.
This has come up before, but I don't remember when: what if we
introduce a parameter to `selectKey()` that specifies that the caller
asserts that the new key does _not_ change the data partitioning?
The docs on that parameter would of course spell out all the "rights
and responsibilities" of setting it.

In that case, we could indeed get back to
`selectKey(A).windowBy(B).distinct(...)`, where we get to compose the
key mapper and the windowing function without having to carve out a
separate domain just for `distinct()`. All the rest of the KStream
operations would also benefit.

What do you think?

Thanks,
John

On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
Hello everyone,

let me revive the discussion for KIP-655. Now I have some time again
and I'm eager to finalize this.

Based on what was already discussed, I think that we can split the
discussion into three topics for our convenience.

The three topics are:

- idExtractor (how should we extract the deduplication key for the
record)

- timeWindows (what time windows should we use)

- miscellaneous (naming etc.)

---- idExtractor ----

Original proposal: use a (k, v) -> f(k, v) mapper, defaulting to
(k, v) -> k. The drawback here is that we must warn the user to
choose such a function that sets different IDs for records from
different partitions, otherwise the same IDs might not be
co-partitioned (and not deduplicated as a result). Additional
concern: what should we do when this function returns null?

Matthias proposed key-only deduplication: that is, no idExtractor at
all, and if we want to use `distinct` for a particular identifier, we
must `selectKey()` before. The drawback of this approach is that we
will always have repartitioning after the key selection, while in
practice repartitioning will not always be necessary (for example,
when the data stream is such that different values infer different
keys).

So here we have a 'safety vs. performance' trade-off. But the 'safe'
variant is also not very convenient for developers, since we're
forcing them to change the structure of their records.

A 'golden mean' here might be using a composite ID with its first
component equal to k and its second component equal to some f(v) (f
defaults to v -> null, and a null value returned by f(v) means
'deduplicate by the key only'). The nuance here is that we will have
serializers only for the types of k and f(v), and we must correctly
serialize the tuple (k, f(v)), but of course this is doable.
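
Just to illustrate the point (this is not in the KIP, and the helper below is purely hypothetical), a length prefix on the key part is enough to keep the pair (k, f(v)) unambiguous in serialized form:

    import java.nio.ByteBuffer;

    final class CompositeId {
        // keyBytes: serialized k;  idBytes: serialized f(v), or null for key-only deduplication
        static byte[] serialize(byte[] keyBytes, byte[] idBytes) {
            final int idLen = idBytes == null ? 0 : idBytes.length;
            final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + keyBytes.length + idLen);
            buf.putInt(keyBytes.length);   // length prefix: ("ab", "c") must differ from ("a", "bc")
            buf.put(keyBytes);
            if (idBytes != null) {
                buf.put(idBytes);
            }
            return buf.array();
        }
    }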

What do you think?

---- timeWindows ----

Originally I proposed TimeWindows only just because they solved my
particular case :-) but I agree with Matthias' and Sophie's
objections.

I like Sophie's point: we need both epoch-aligned and data-aligned
windows. IMO this is absolutely correct: "data-aligned is useful for
example when you know that a large number of updates to a single key
will occur in short bursts, and epoch-aligned when you specifically
want to get just a single update per discrete time interval."

I just cannot agree right away with Sophie's
.groupByKey().windowedBy(...).distinct() proposal, as it implies
key-only deduplication -- see the previous topic.

Epoch-aligned windows are very simple: they should forward only one
record per enumerated time window. TimeWindows are exactly what we
want here. I mentioned in the KIP both tumbling and hopping windows
just because both are possible for TimeWindows, but indeed I don't
see any real use case for hopping windows, only tumbling windows make
sense IMO.

For data-aligned windows the SlidingWindow interface seems to be a
nearly valid choice. Nearly. It should forward a record once when
it's first seen, and then not again for any identical records that
fall into the next N timeUnits. However, we cannot reuse
SlidingWindow as is, because, just as Matthias noted, SlidingWindows
go backward in time, while we need windows that go forward in time
and are not opened while records fall into an already existing
window. We definitely should make our own implementation; maybe we
should call it ExpirationWindow? WDYT?


---- miscellaneous ----

Persistent/in-memory stores. Matthias proposed to pass a
Materialized parameter next to DistinctParameters (and this is
necessary, because we will need to provide a serializer for the
extracted id). This is an absolutely valid point, I agree and I will
fix it in the KIP.

Naming. Sophie noted that the Streams DSL operators are typically
named as verbs, so she proposes `deduplicate` in favour of
`distinct`. I think that while it's important to stick to the naming
conventions, it is also important to think of the experience of those
who come from different stacks/technologies. People who are familiar
with SQL and the Java Streams API know for sure what 'distinct'
means, while data deduplication in general is a more complex task and
thus `deduplicate` might be misleading. But I'm ready to be convinced
if the majority thinks otherwise.


Regards,

Ivan



On 14.09.2020 21:31, Sophie Blee-Goldman wrote:
Hey all,

I'm not convinced either epoch-aligned or data-aligned will fit all
possible use cases. Both seem totally reasonable to me: data-aligned
is useful for example when you know that a large number of updates to
a single key will occur in short bursts, and epoch-aligned when you
specifically want to get just a single update per discrete time
interval.

Going a step further, though, what if you want just a single update
per calendar month, or per year with accounting for leap years?
Neither of those are serviced that well by the existing Windows
specification for windowed aggregations, a well-known limitation of
the current API. There is actually a KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
going on in parallel to fix this exact issue and make the windowing
interface much more flexible. Maybe instead of re-implementing this
windowing interface in a similarly limited fashion for the Distinct
operator, we could leverage it here and get all the benefits coming
with KIP-645.

Specifically, I'm proposing to remove the TimeWindows/etc config
from the DistinctParameters class, and move the distinct() method
from the KStream interface to the TimeWindowedKStream interface.
Since it's semantically similar to a kind of windowed aggregation, it
makes sense to align it with the existing windowing framework, ie:

inputStream
    .groupByKey()
    .windowedBy()
    .distinct()

Then we could use data-aligned windows if SlidingWindows is
specified in the windowedBy(), and epoch-aligned (or some other kind
of enumerable window) if a Windows is specified in windowedBy() (or
an EnumerableWindowDefinition once KIP-645 is implemented to replace
Windows).

*SlidingWindows*: should forward a record once when it's first seen,
and then not again for any identical records that fall into the next
N timeUnits. This includes out-of-order records, ie if you have a
SlidingWindows of size 10s and process records at time 15s, 20s, 14s
then you would just forward the one at 15s. Presumably, if you're
using SlidingWindows, you don't care about what falls into exact time
boxes, you just want to deduplicate. If you do care about exact time
boxing then you should use...

*EnumerableWindowDefinition* (eg *TimeWindows*): should forward only
one record per enumerated time window. If you get records at 15s,
20s, 14s where the windows are enumerated at [5,14], [15,24], etc
then you forward the record at 15s and also the record at 14s.

Just an idea: not sure if the impedance mismatch would throw users
off since the semantics of the distinct windows are slightly
different than in the aggregations. But if we don't fit this into the
existing windowed framework, then we shouldn't use any existing
Windows-type classes at all, imo. ie we should create a new
DistinctWindows config class, similar to how stream-stream joins get
their own JoinWindows class.

I also think that non-windowed deduplication could be useful, in
which case we would want to also have the distinct() operator on the
KStream interface.


One quick note regarding the naming: it seems like the Streams DSL
operators are typically named as verbs rather than adjectives, for
example #suppress or #aggregate. I get that there's some precedent
for 'distinct' specifically, but maybe something like 'deduplicate'
would be more appropriate for the Streams API.

WDYT?


On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev
<iponoma...@mail.ru.invalid>
wrote:

Hi Matthias,

Thanks for your review! It made me think deeper, and indeed I
understood that I was missing some important details.

To simplify, let me explain my particular use case first so I can
refer to it later.

We have a system that collects information about ongoing live
sporting events from different sources. The information sources have
their IDs, and these IDs are the keys of the stream. Each source
emits messages concerning sporting events, and we can have many
messages about each sporting event from each source. The event ID is
extracted from the message.

We need a database of event IDs that were reported at least once by
each source (important: events from different sources are considered
to be different entities). The requirements are:

1) each new event ID should be written to the database as soon as
possible;

2) although it's ok and sometimes even desired to repeat the
notification about an already known event ID, we wouldn’t like our
database to be bothered by the same event ID more often than once in
a given period of time (say, 15 minutes).

With this example in mind let me answer your questions

         > (1) Using the `idExtractor` has the issue that data might
         > not be co-partitioned as you mentioned in the KIP. Thus, I
         > am wondering if it might be better to do deduplication only
         > on the key? If one sets a new key upstream (ie, extracts
         > the deduplication id into the key), the `distinct` operator
         > could automatically repartition the data and thus we would
         > avoid user errors.

Of course with 'key-only' deduplication + autorepartitioning we will
never cause problems with co-partitioning. But in practice, we often
don't need repartitioning even if the 'dedup ID' is different from
the key, like in my example above. So here we have a sort of
'performance vs security' tradeoff.

The 'golden middle way' here can be the following: we can form a
deduplication ID as KEY + separator + idExtractor(VALUE). In case
idExtractor is not provided, we deduplicate by key only (as in the
original proposal). Then idExtractor transforms only the value (and
not the key) and its result is appended to the key. Records from
different partitions will inherently have different deduplication IDs
and all the data will be co-partitioned. As with any stateful
operation, we will repartition the topic in case the key was changed
upstream, but only in this case, thus avoiding unnecessary
repartitioning. My example above fits this perfectly.

         > (2) What is the motivation for allowing the `idExtractor`
         > to return `null`? Might be good to have some use-case
         > examples for this feature.

Can't think of any use-cases. As it often happens, it just came with
a copy-paste from StackOverflow -- see Michael Noll's answer here:

https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions



But, jokes aside, we'll have to decide what to do with nulls. If we
accept the above proposal of having the deduplication ID as KEY +
postfix, then null can be treated as no postfix at all. If we don't
accept this approach, then treating nulls as 'no-deduplication' seems
to be a reasonable assumption (we can't get or put null as a key to a
KV store, so a record with a null ID is always going to look 'new' to
us).


         > (2) Is using a `TimeWindow` really what we want? I was
         > wondering if a `SlidingWindow` might be better? Or maybe we
         > need a new type of window?

Agree. It's probably not what we want. Once I thought that reusing
TimeWindow was a clever idea; now I don't.

Do we need epoch alignment in our use case? No, we don't, and I don't
know if anyone is going to need this. Epoch alignment is good for
aggregation, but deduplication is a different story.

Let me describe the semantics the way I see them now and tell me what
you think (a rough sketch of this logic follows the list):

- the only parameter that defines the deduplication logic is the
'expiration period'

- when a deduplication ID arrives and we cannot find it in the store,
we forward the message downstream and store the ID + its timestamp.

- when an out-of-order ID arrives with an older timestamp and we find
a 'fresher' record, we do nothing and don't forward the message
(??? OR NOT? In what case would we want to forward an out-of-order
message?)

- when an ID with a fresher timestamp arrives, we check if it falls
into the expiration period and either forward it or not, but in both
cases we update the timestamp of the message in the store

- the WindowStore retention mechanism should clean up very old
records in order not to run out of space.
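
Roughly, the per-record logic above could look like this (just a sketch of the semantics, not the proposed implementation; `store`, `context`, `idExtractor` and `expirationPeriodMs` are assumed fields of the processor, with `store` being a KeyValueStore<ID, Long> of the last stored timestamp per id):

    public void process(final K key, final V value) {
        final ID id = idExtractor.apply(key, value);
        final long ts = context.timestamp();
        final Long lastSeen = store.get(id);          // last stored timestamp for this id, or null

        if (lastSeen == null || ts >= lastSeen + expirationPeriodMs) {
            context.forward(key, value);              // first occurrence, or the expiration period has passed
            store.put(id, ts);
        } else if (ts > lastSeen) {
            store.put(id, ts);                        // within the period: refresh the timestamp, do not forward
        }
        // ts <= lastSeen: an out-of-order record, do nothing
    }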

         > (3) `isPersistent` -- instead of using this flag, it seems
         > better to allow users to pass in a `Materialized` parameter
         > next to `DistinctParameters` to configure the state store?

Fully agree! Users might also want to change the retention time.

         > (4) I am wondering if we should really have 4 overloads
         > for `DistinctParameters.with()`? It might be better to have
         > one overload with all required parameters, and add optional
         > parameters using the builder pattern? This seems to follow
         > the DSL Grammar proposal.

Oh, I can explain. We can't fully rely on the builder pattern because
of Java type inference limitations. We have to provide type
parameters to the builder methods or the code won't compile: see e.g.
this https://twitter.com/inponomarev/status/1265053286933159938 and
the following discussion with Tagir Valeev.

When we came across similar difficulties in KIP-418, we finally
decided to add all the necessary overloads to the parameter class.
So I just reproduced that approach here.

         > (5) Even if it might be an implementation detail (and
         > maybe the KIP itself does not need to mention it), can you
         > give a high-level overview of how you intend to implement
         > it (that would be easier to grok, compared to reading the
         > PR).

Well, as with any operation on the KStreamImpl level, I'm building a
store and a processor node.

The KStreamDistinct class is going to be the ProcessorSupplier, with
the logic regarding the forwarding/muting of the records located in
KStreamDistinct.KStreamDistinctProcessor#process

----

Matthias, if you are still reading this :-) a gentle reminder: my PR
for the already accepted KIP-418 is still waiting for your review. I
think it's better for me to finalize at least one KIP before
proceeding to a new one :-)

Regards,

Ivan

On 03.09.2020 4:20, Matthias J. Sax wrote:
Thanks for the KIP Ivan. Having a built-in deduplication operator is
for sure a good addition.

Couple of questions:

(1) Using the `idExtractor` has the issue that data might not be
co-partitioned as you mentioned in the KIP. Thus, I am wondering if
it might be better to do deduplication only on the key? If one sets a
new key upstream (ie, extracts the deduplication id into the key),
the `distinct` operator could automatically repartition the data and
thus we would avoid user errors.

(2) What is the motivation for allowing the `idExtractor` to return
`null`? Might be good to have some use-case examples for this
feature.

(2) Is using a `TimeWindow` really what we want? I was wondering if a
`SlidingWindow` might be better? Or maybe we need a new type of
window?

It would be helpful if you could describe potential use cases in more
detail. -- I am mainly wondering about hopping windows? Each record
would always fall into multiple windows and thus would be emitted
multiple times, ie, each time the window closes. Is this really a
valid use case?

It seems that for de-duplication, one wants to have some "expiration
time", ie, for each ID, deduplicate all consecutive records with the
same ID and emit the first record after the "expiration time" passed.
In terms of a window, this would mean that the window starts at
`r.ts` and ends at `r.ts + windowSize`, ie, the window is aligned to
the data. TimeWindows are aligned to the epoch though. While
`SlidingWindows` also align to the data, for the aggregation use-case
they go backward in time, while we need a window that goes forward in
time. It's an open question if we can re-purpose `SlidingWindows` --
it might be ok to make the alignment (into the past vs into the
future) an operator-dependent behavior?

(3) `isPersistent` -- instead of using this flag, it seems better to
allow users to pass in a `Materialized` parameter next to
`DistinctParameters` to configure the state store?

(4) I am wondering if we should really have 4 overloads for
`DistinctParameters.with()`? It might be better to have one overload
with all required parameters, and add optional parameters using the
builder pattern? This seems to follow the DSL Grammar proposal.

(5) Even if it might be an implementation detail (and maybe the KIP
itself does not need to mention it), can you give a high-level
overview of how you intend to implement it (that would be easier to
grok, compared to reading the PR).



-Matthias

On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
Sorry, I forgot to add [DISCUSS] tag to the topic

On 24.08.2020 2:27, Ivan Ponomarev wrote:
Hello,

I'd like to start a discussion for KIP-655.

KIP-655:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API




I also opened a proof-of-concept PR for you to experiment with the
API:

PR#9210: https://github.com/apache/kafka/pull/9210

Regards,

Ivan Ponomarev





















