Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-08-09 Thread Ivan Ponomarev

> To recap: Could it be that the idea to apply a DISTINCT-aggregation is
> for a different use-case than to remove duplicate messages from a KStream?


OK, imagine the following:

We have 10 thermometers. Each second they transmit the current 
measured temperature. The id of the thermometer is the key, the 
temperature is the value. The temperature measured by a single 
thermometer changes slowly: on average, say once per 30 seconds, so most 
of the messages from a given thermometer are duplicates. But we need to 
react to the change ASAP. And we need a materialized view: a relational 
database table with the columns 'thermometerID' and 'temperature'.


1) We don't want to send 100K updates per second to the database.

2) But it's still OK if some updates are duplicates -- the updates are 
idempotent.


3) We can even afford to lose a fraction of the data -- the data from each 
thermometer will eventually be 'fixed' by its fresh readings.


This is my use case (actually these are not thermometers and 
temperatures, but it doesn't matter :-))
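
For concreteness, here is a minimal sketch of how this use case could 
look with the distinct() operator proposed in this KIP (the operator 
itself, the topic names, and the key format are all illustrative 
assumptions, not the final design):

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;

StreamsBuilder builder = new StreamsBuilder();
builder.<String, Double>stream("thermometer-readings")
    // fold the value into the key so that records with the same id AND the
    // same temperature count as duplicates (hypothetical key format);
    // the repartition triggered here could be canceled via KIP-759
    .selectKey((id, temp) -> id + "#" + temp)
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
    .distinct()   // proposed in this KIP, not part of the released API
    .toStream()
    // restore the thermometer id before writing to the sink topic
    .map((windowedKey, temp) -> KeyValue.pair(windowedKey.key().split("#")[0], temp))
    .to("deduplicated-readings");

With this, at most one update per (id, temperature) pair reaches the sink 
every 30 seconds, while a changed temperature passes through immediately 
because it produces a new key.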


Is it conceptually different from what you were thinking about?

Regards,

Ivan

09.08.2021 3:05, Matthias J. Sax wrote:

Thanks for sharing your thoughts. I guess my first question, about why
to use only the key, boils down to the use case, and maybe you have
something else in mind than I do.


I see it this way: we define the 'distinct' operation as returning a single
record per time window per selected key,


I believe this sentence explains your way of thinking about it. My way
of thinking about it is different though: KStream de-duplication means
to "filter/drop" duplicate records, and a record is by definition a
duplicate if key AND value are the same. --- Or is there a use case for
which there might be a "message ID", and even if the "message ID" is the
same, the content of the message might be different? If this holds, do
we really de-duplicate records (it sounds more like picking a random one)?


Just re-read some of my older replies, and I guess back then I just
commented on your KeyExtractor idea, without considering the
end-to-end picture. Thus, my reply below goes in a different
direction now:

We only need to apply a window because we need to purge state
eventually. To this end, I actually believe that applying a "sliding
window" is the best way to go: for each message we encounter, we start
the de-duplication window when the message arrives, don't emit any
duplicates as long as the window is open, and purge the state afterwards.

Of course, internally, the implementation must be quite different
compared to a regular aggregation: we need to pull the value into the
key, to create a unique window for each message, but this seems to be an
implementation detail. Thus, I am wondering if we should not try to put
a square peg into a round hole: the current windowing/aggregation API
is not designed for a KStream de-duplication use case, because
de-duplication is not an aggregation to begin with. Why not use a different API:

KStream<K, V> KStream#deduplicate(final Duration windowSize);

Including some more overloads to allow configuring the internal state
store (this state store should not be queryable, similar to
KStream-KStream join state stores...).
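
For illustration, usage might look like this (purely hypothetical --
neither the operator nor its overloads exist in the DSL yet):

// Hypothetical: drop any record whose key+value combination was already
// seen within the last 30 seconds, and forward everything else unchanged.
KStream<String, Double> deduplicated =
    readings.deduplicate(Duration.ofSeconds(30));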


To recap: Could it be that the idea to apply a DISTINCT-aggregation is
for a different use-case than to remove duplicate messages from a KStream?



-Matthias


On 8/6/21 4:12 AM, Ivan Ponomarev wrote:

    - Why restrict de-duplication for the key only? Should we not also
consider the value (or make it somehow flexible and let the user choose?)


Wasn't it the first idea that we abandoned (I mean, to provide
'KeyExtractor' and so on)?

In order to keep things simple we decided to make

.selectKey(...)  // here we select anything we need
                 // add markAsPartitioned from KIP-759 to taste
  .groupByKey()
  .windowedBy(...)
  .distinct()
  // the only new operation that we add to the API, reusing
  // all the windowed aggregations' infrastructure


    - I am wondering if the return type should be `KStream` instead of a
`KTable`? If I deduplicate a stream, I might expect a stream back? I
don't really consider a stream de-duplication an aggregation with
"mutable state"...


First, because it's going to be an operation on a
Time/SessionWindowedKStream, and these operations usually return
KTable<Windowed<K>, ...>. Then, it might be useful to know which
time window a deduplicated record actually belongs to. And it is a trivial
task to turn this table back into a stream.
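
For reference, a sketch of that conversion, assuming the distinct()
result is called distinctTable (an illustrative name):

KStream<String, Double> plain = distinctTable
    .toStream()
    // unwrap the Windowed<K> key to get back a plain key
    .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value));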


IMHO, an unordered stream and its ordered "cousin" should
yield the same result? -- Given your example it seems you want to keep
the first record based on offset order. Wondering why?


I see it this way: we define the 'distinct' operation as returning a single
record per time window per selected key, no matter which record. So it's
ok if it yields different results for different orderings, as long as its main
property holds!

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-09 Thread Ivan Ponomarev
...windowed aggregations: The result of a window aggregation is partitioned
by the "plain key": if we write the result into a topic using the same
partitioning function, we would write to different partitions... (I
guess it was never an issue so far, as we don't have KIP-300 in place
yet...)

It's also not an issue for IQ, because we know the plain key, and thus
can route to the right task.


About a solution: I think it might be OK to say we don't need to solve
this problem, but it's the user's responsibility to take IQ into account.
I.e., if they want to use IQ downstream, they need to repartition: for this
case, repartitioning is _NOT_ unnecessary... The same argument seems to
apply for the join case I mentioned above. -- Given that
`markAsPartitioned()` is an advanced feature, it seems OK to leave it to
the user to use correctly (we should of course call it out in the docs!).



-Matthias



On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:

Before I dive into the question of IQ and the approaches you proposed,
can you just elaborate on the problem itself? By definition, the
`markAsPartitioned` flag means that a repartition would be a no-op,
i.e. that the stream (and its partitioning) would be the same whether or
not a repartition is inserted. For this to be true, it then has to be
the case that

Partitioner.partition(key) == Partitioner.partition(map(key))

The left-hand side of the above is precisely how we determine the
partition number that a key belongs to when using IQ. It shouldn't
matter whether the user is querying a key in a store upstream of the
key-changing operation or a mapped key downstream of it -- either way
we just apply the given Partitioner.

See StreamsMetadataState#getKeyQueryMetadataForKey
<https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>
for where this happens


If we're concerned that users might try to abuse the new
`markAsPartitioned` feature, or accidentally misuse it, then we could
add a runtime check that applies the Partitioner associated with that
subtopology to the key being processed and the mapped key result to
assert that they do indeed match. Imo this is probably overkill, just
putting it out there.
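
For what it's worth, such a check might look roughly like this (a sketch
against the existing StreamPartitioner interface; the surrounding
plumbing -- topic, partitioner, numPartitions -- is assumed):

// Hypothetical runtime assertion for markAsPartitioned: the mapped key
// must land in the same partition as the original key.
int before = partitioner.partition(topic, key, value, numPartitions);
int after = partitioner.partition(topic, mappedKey, value, numPartitions);
if (before != after) {
    throw new IllegalStateException(
        "markAsPartitioned violated: key mapping changed the partition");
}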

On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev wrote:


Hi Sophie,

thanks for your reply! So your proposal is:

1). For each key-changing operation, deprecate the existing overloads
that accept a Named, and replace them with overloads that take an
operator-specific config object.
2). Add a `markAsPartitioned` flag to these configs.

IMO, this looks much better than the original proposal; I like it very
much and I think I will rewrite the KIP soon. I absolutely agree with
your points. Repartition logic is not a part of the public contract, and
it's much better to give it correct hints instead of telling it explicitly
what it should do.

...

Since we're generating such bright ideas, maybe we should also
brainstorm the interactive query problem?

The problem is that interactive queries will not work properly when
`markAsPartitioned` is used. Although the original key and the mapped key
will be in the same partition, we will no longer be able to guess this
partition given the mapped key only.

The possible approaches are:

1) Give up and don't use interactive queries together with
`markAsPartitioned`. This is what I propose now. But can we do better?

2) Maybe we should ask the user to provide a 'reverse mapping' that will
allow IQ to restore the original key in order to choose the correct
partition. We can place this mapping in our new configuration object. Of
course, there is no way for KStreams to verify at compile time/startup
time that this function is actually the reverse mapping that extracts
the old key from the new one. Users will forget to provide this
function. Users will provide wrong functions. This all looks too fragile.

3) Maybe there can be a completely different approach. Let's introduce a
new entity -- composite keys, consisting of a "head" and a "tail". The
partition for the composite key is calculated based on its 'head' value
only. If we provide a key mapping in the form key -> CompositeKey(key,
tail), then it's obvious that we do not need a repartition. When an
interactive query needs to guess the partition for a CompositeKey, it just
extracts its head and calculates the correct partition.

We can select a CompositeKey before groupByKey() and aggregation
operations, and this will not involve a repartition. And IQ will work.
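
A rough sketch of what such a composite key could look like -- entirely
hypothetical, all names invented for illustration:

// Partitioning would be derived from 'head' only, so re-keying from K to
// CompositeKey<K, T> never changes the partition.
public final class CompositeKey<H, T> {
    private final H head;
    private final T tail;

    public CompositeKey(H head, T tail) {
        this.head = head;
        this.tail = tail;
    }

    public H head() { return head; }
    public T tail() { return tail; }
}

// An IQ lookup for a CompositeKey would then hash only the serialized
// head, i.e. partition = hash(serialize(head)) % numPartitions.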

Is this too daring an idea, WDYT? My concern: will it cover all the cases
where we want to choose a different key, but also avoid a repartition?

Regards,

Ivan



06.08.2021 23:19, Sophie Blee-Goldman wrote:

Hey Ivan

I completely agree that adding it as a config to Grouped/Joined/etc isn't
much better, I was just listing it for completeness, and that I would
prefer to make it a configuration of the key-changing...

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-07 Thread Ivan Ponomarev
...this same conversation over and over, because we can now stick any
additional operator properties into that same config object.

WDYT? By the way, the above idea (introducing a single config object to
wrap all operator properties) was also
raised by John Roesler a while back. Let's hope he hasn't changed his mind
since then :)


On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev wrote:


Hi Matthias,

Concerning the naming: I like `markAsPartitioned`, because it describes
what this operation is actually doing!

Hi Sophie,

I see the concern about poor code cohesion. We declare the key mapping in
one place of the code, then later in another place we say
"markAsPartitioned()". When we change the code six months later, we
might forget to remove markAsPartitioned(), especially if it's placed in
another method or class. But I don't understand why you propose to
include this config in Grouped/Joined/StreamJoined: from this
point of view, it's not a better solution.

The best approach regarding cohesion might be to add an extra
'preservePartition' flag to every key-changing operation, that is

1) selectKey
2) map
3) flatMap
4) transform
5) flatTransform

in order to tell whether the provided mapping requires a repartition or not.
Indeed, this is a property of the mapping operation, not of the grouping
one! BTW: the idea of adding an extra parameter to `selectKey` was once
coined by John Roesler.

Arguments in favour of this approach: 1) better code cohesion from the
point of view of the user, 2) 'smarter' code (the decision is taken
depending on metadata provided for all the upstream mappings), 3)
overall safer for the user.

Arguments against: an invasive KStreams API change, 5 more method
overloads. Further, when we add a new key-changing operation to
KStream, we must add an overloaded version with 'preservePartition'.
When we add a new overloaded version of an existing operation, we actually
might need to add two or more overloaded versions. This will soon become
a mess.

I thought that since `markAsPartitioned` is intended for advanced users,
they will use it with care. When you're in a position where every
serialization/deserialization round matters for the latency, you're
extremely careful with the topology and you will not thoughtlessly add
new key-changing operations without controlling how it's going to change
the overall topology.

By the way, if we later find a better solution, it's much easier to
deprecate a single `markAsPartitioned` operation than 5 method overloads.

What do you think?




04.08.2021 4:23, Sophie Blee-Goldman wrote:

Do we really need a whole DSL operator for this? I think the original
name for this operator -- `cancelRepartition()` -- is itself a sign that
this is not an operation on the stream itself but rather a
command/request to whichever operator would have otherwise triggered
this repartition.

What about instead adding a new field to the Grouped/Joined/StreamJoined
config objects that signals them to skip the repartitioning?

The one downside to this specific proposal is that you would then need
to specify this for every stateful operation downstream of the
key-changing operation. So a better alternative might be to introduce
this `skipRepartition` field, or whatever we want to call it, to the
config object of the operator that's actually doing the key-changing
operation which is apparently preserving the partitioning.

Imo this would be more "safe" relative to the current proposal, as the
user has to explicitly consider whether every key-changing operation is
indeed preserving the partitioning. Otherwise you could code up a
topology with several key-changing operations at the beginning which do
require repartitioning. Then you get to the end of the topology and
insert one final key-changing operation that doesn't, assume you can
just cancel the repartition, and suddenly you're wondering why your
results are all screwed up.

On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax wrote:


Thanks for the KIP Ivan!

I think it's a good feature to give advanced users more control, and
allow them to build more efficient applications.

Not sure if I like the proposed name though (the good old "naming
things" discussion :))

Did you consider alternatives? What about

   - markAsPartitioned()
   - markAsKeyed()
   - skipRepartition()

Not sure if there are other ideas for a good name?



-Matthias

On 6/24/21 7:45 AM, Ivan Ponomarev wrote:

Hello,

I'd like to start a discussion for KIP-759:




https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling



This is an offshoot of the discussion of KIP-655 for a `distinct`
operator, which turned out to be a separate proposal.

The proposal is quite trivial, however, we still might consider
alternatives (see 'Possible Alternatives' section).

Regards,

Ivan













Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-08-06 Thread Ivan Ponomarev

>   - Why restrict de-duplication for the key only? Should we not also
> consider the value (or make it somehow flexible and let the user choose?)

Wasn't it the first idea that we abandoned (I mean, to provide 
'KeyExtractor' and so on)?


In order to keep things simple we decided to make

.selectKey(...)  // here we select anything we need
                 // add markAsPartitioned from KIP-759 to taste
  .groupByKey()
  .windowedBy(...)
  .distinct()
  // the only new operation that we add to the API, reusing
  // all the windowed aggregations' infrastructure

>   - I am wondering if the return type should be `KStream` instead of a
> `KTable`? If I deduplicate a stream, I might expect a stream back? I
> don't really consider a stream de-duplication an aggregation with
> "mutable state"...

First, because it's going to be an operation on a 
Time/SessionWindowedKStream, and these operations usually return 
KTable<Windowed<K>, ...>. Then, it might be useful to know which 
time window a deduplicated record actually belongs to. And it is a trivial 
task to turn this table back into a stream.


> IMHO, an unordered stream and its ordered "cousin" should
> yield the same result? -- Given your example it seems you want to keep
> the first record based on offset order. Wondering why?

I see it this way: we define the 'distinct' operation as returning a single 
record per time window per selected key, no matter which record. So it's 
ok if it yields different results for different orderings, as long as its 
main property holds!


And since we can select any key we like, we can get any degree of 
'deduplication granularity' and 'determinism'.


> While I agree that deduplication for overlapping windows is questionable,
> I am still wondering if you plan to disallow it (by adding a runtime
> check and throwing an exception), or not?

Thanks for this point! I think the 'fail-fast' approach is good. We 
might need to throw an exception; I will add this to the KIP:


- SessionWindows -- OK
- SlidingWindows -- Exception
- TimeWindows --
 tumbling -- OK
 hopping  -- Exception
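
A sketch of how such a check might look inside the implementation
(hypothetical; it relies on the public sizeMs/advanceMs fields of
TimeWindows):

// Fail fast on window types that produce overlapping windows.
if (windows instanceof SlidingWindows) {
    throw new TopologyException("distinct() is not defined for sliding windows");
}
if (windows instanceof TimeWindows
        && ((TimeWindows) windows).advanceMs < ((TimeWindows) windows).sizeMs) {
    throw new TopologyException("distinct() is not defined for hopping windows");
}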


Regards,

Ivan


04.08.2021 4:22, Matthias J. Sax wrote:

Couple of questions:

  - Why restrict de-duplication for the key only? Should we not also
consider the value (or make it somehow flexible and let the user choose?)

  - I am wondering if the return type should be `KStream` instead of a
`KTable`? If I deduplicate a stream, I might expect a stream back? I
don't really consider a stream de-duplication an aggregation with
"mutable state"...

Also, why would the result key need to be windowed?

Btw: How should out-of-order data be handled? Given that you only want
to consider the key, the value could be different, and thus, if there is
out-of-order data, keeping one or the other value could make a
difference? IMHO, an unordered stream and its ordered "cousin" should
yield the same result? -- Given your example it seems you want to keep
the first record based on offset order. Wondering why?


While I agree that deduplication for overlapping windows is questionable,
I am still wondering if you plan to disallow it (by adding a runtime
check and throwing an exception), or not?


On 8/1/21 6:42 AM, Ivan Ponomarev wrote:

Hi Bruno,

I'm sorry for the delay with the answer. Unfortunately, your messages
were put into the spam folder; that's why I didn't answer them right away.

Concerning your question about comparing serialized values vs. using
equals: I think it must be clear now due to John's explanations.
Distinct is a stateful operation, thus we will need to use
serialization. (Although AFAICS the in-memory storage might be a good
practical solution in many cases).


> I do currently not see why it should not make sense in hopping
> windows... I do not understand the following sentence: "...one record
> can be multiplied instead of deduplication."

Ok, let me explain.

As it's written in the KIP, "The distinct operation returns only a first
record that falls into a new window, and filters out all the other
records that fall into an already existing window."

Also, it's worth remembering that the result of `distinct` is
KTable<Windowed<K>, V>, not KStream<K, V>.

If we have, say, hopping time windows [0, 40], [10, 50], [20, 60] and a
record (key, val) with timestamp 25 arrives, it will be forwarded three
times ('multiplied') since it falls into the intersection of all three
windows. The output will be

(key@[0/40],  val)
(key@[10/50], val)
(key@[20/60], val)

You can reason about the `distinct` operation just like you reason about
`sum` or `count`. When a record arrives that falls into a window, we
update the aggregation on this window. For `distinct`, when extra
records arrive in the same window, we also perform some sort of
aggregation (we may even count them internally!), but, unlike sum or
count, we will not forward anything, since the counter is already
strictly greater than zero.

You may refer to the 'usage examples' section of the KIP to get a clearer idea of how it works.

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-06 Thread Ivan Ponomarev

Hi Matthias,

Concerning the naming: I like `markAsPartitioned`, because it describes 
what this operation is actually doing!


Hi Sophie,

I see the concern about poor code cohesion. We declare the key mapping in 
one place of the code, then later in another place we say 
"markAsPartitioned()". When we change the code six months later, we 
might forget to remove markAsPartitioned(), especially if it's placed in 
another method or class. But I don't understand why you propose to 
include this config in Grouped/Joined/StreamJoined: from this 
point of view, it's not a better solution.


The best approach regarding cohesion might be to add an extra 
'preservePartition' flag to every key-changing operation, that is


1) selectKey
2) map
3) flatMap
4) transform
5) flatTransform

in order to tell whether the provided mapping requires a repartition or not. 
Indeed, this is a property of the mapping operation, not of the grouping 
one! BTW: the idea of adding an extra parameter to `selectKey` was once 
coined by John Roesler.


Arguments in favour of this approach: 1) better code cohesion from the 
point of view of the user, 2) 'smarter' code (the decision is taken 
depending on metadata provided for all the upstream mappings), 3) 
overall safer for the user.


Arguments against: an invasive KStreams API change, 5 more method 
overloads. Further, when we add a new key-changing operation to 
KStream, we must add an overloaded version with 'preservePartition'. 
When we add a new overloaded version of an existing operation, we actually 
might need to add two or more overloaded versions. This will soon become 
a mess.


I thought that since `markAsPartitioned` is intended for advanced users, 
they will use it with care. When you're in a position where every 
serialization/deserialization round matters for the latency, you're 
extremely careful with the topology and you will not thoughtlessly add 
new key-changing operations without controlling how it's going to change 
the overall topology.


By the way, if we later find a better solution, it's much easier to 
deprecate a single `markAsPartitioned` operation than 5 method overloads.


What do you think?




04.08.2021 4:23, Sophie Blee-Goldman wrote:

Do we really need a whole DSL operator for this? I think the original
name for this operator -- `cancelRepartition()` -- is itself a sign that
this is not an operation on the stream itself but rather a
command/request to whichever operator would have otherwise triggered
this repartition.

What about instead adding a new field to the Grouped/Joined/StreamJoined
config objects that signals them to skip the repartitioning?

The one downside to this specific proposal is that you would then need
to specify this for every stateful operation downstream of the
key-changing operation. So a better alternative might be to introduce
this `skipRepartition` field, or whatever we want to call it, to the
config object of the operator that's actually doing the key-changing
operation which is apparently preserving the partitioning.

Imo this would be more "safe" relative to the current proposal, as the
user has to explicitly consider whether every key-changing operation is
indeed preserving the partitioning. Otherwise you could code up a
topology with several key-changing operations at the beginning which do
require repartitioning. Then you get to the end of the topology and
insert one final key-changing operation that doesn't, assume you can
just cancel the repartition, and suddenly you're wondering why your
results are all screwed up.

On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax wrote:


Thanks for the KIP Ivan!

I think it's a good feature to give advanced users more control, and
allow them to build more efficient applications.

Not sure if I like the proposed name though (the good old "naming
things" discussion :))

Did you consider alternatives? What about

  - markAsPartitioned()
  - markAsKeyed()
  - skipRepartition()

Not sure if there are other ideas for a good name?



-Matthias

On 6/24/21 7:45 AM, Ivan Ponomarev wrote:

Hello,

I'd like to start a discussion for KIP-759:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling



This is an offshoot of the discussion of KIP-655 for a `distinct`
operator, which turned out to be a separate proposal.

The proposal is quite trivial, however, we still might consider
alternatives (see 'Possible Alternatives' section).

Regards,

Ivan








Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-08-01 Thread Ivan Ponomarev

Hi Bruno,

I'm sorry for the delay with the answer. Unfortunately, your messages 
were put into the spam folder; that's why I didn't answer them right away.


Concerning your question about comparing serialized values vs. using 
equals: I think it must be clear now due to John's explanations. 
Distinct is a stateful operation, thus we will need to use 
serialization. (Although AFAICS the in-memory storage might be a good 
practical solution in many cases).


> I do currently not see why it should not make sense in hopping
> windows... I do not understand the following sentence: "...one record
> can be multiplied instead of deduplication."


Ok, let me explain.

As it's written in the KIP, "The distinct operation returns only a first 
record that falls into a new window, and filters out all the other 
records that fall into an already existing window."


Also, it's worth remembering that the result of `distinct` is 
KTable<Windowed<K>, V>, not KStream<K, V>.


If we have, say, hopping time windows [0, 40], [10, 50], [20, 60] and a 
record (key, val) with timestamp 25 arrives, it will be forwarded three 
times ('multiplied') since it falls into the intersection of all three 
windows. The output will be
windows. The output will be


(key@[0/40],  val)
(key@[10/50], val)
(key@[20/60], val)

You can reason about the `distinct` operation just like you reason about 
`sum` or `count`. When a record arrives that falls into a window, we 
update the aggregation on this window. For `distinct`, when extra 
records arrive in the same window, we also perform some sort of 
aggregation (we may even count them internally!), but, unlike sum or 
count, we will not forward anything, since the counter is already 
strictly greater than zero.


You may refer to the 'usage examples' of the KIP 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-UsageExamples) 
to get a clearer idea of how it works.


> As I said earlier, I do not think that SQL and the Java Stream API 
are good arguments to not use a verb


This is an important matter. As we all know, naming is hard.

However, the `distinct` name is not used just in SQL and Java Streams. It is 
a kind of standard operation that is used in nearly all data 
processing frameworks; see all the hyperlinked examples in the 'Motivation' 
section of the KIP 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-Motivation)


Please look at it and let me know what you think.

Regards,

Ivan

29.07.2021 4:49, John Roesler wrote:

Hi Bruno,

I had previously been thinking to use equals(), since I
thought that this might be a stateless operation. Comparing
the serialized form requires a serde and a fairly expensive
serialization operation, so while byte equality is superior
to equals(), we shouldn't use it in operations unless they
already require serialization.

I changed my mind when I later realized I had been mistaken,
and this operation is of course stateful.

I hope this helps clarify it.

Thanks,
-John

On Fri, 2021-07-23 at 09:53 +0200, Bruno Cadonna wrote:

Hi Ivan and John,

1. John, could you clarify why comparing serialized values seems the way
to go, now?

2. Ivan, Could you please answer my questions that I posted earlier? I
will repost it here:
Ivan, could you please make this matter a bit clearer in the KIP?
Actually, thinking about it again, I do currently not see why it should
not make sense in hopping windows. Regarding this, I do not understand
the following sentence:

"hopping and sliding windows do not make much sense for distinct()
because they produce multiple intersected windows, so that one record
can be multiplied instead of deduplication."

Ivan, what do you mean with "multiplied"?

3. As I said earlier, I do not think that SQL and the Java Stream API
are good arguments to not use a verb. However, if everybody else is fine
with it, I can get behind it.

John, good catch about the missing overloads!
BTW, the overload with Named should be there regardless of stateful or
stateless.

Best,
Bruno

On 22.07.21 20:58, John Roesler wrote:

Hi Ivan,

Thanks for the reply.

1. I think I might have gotten myself confused. I was
thinking of this operation as stateless, but now I'm not
sure what I was thinking... This operator has to be
stateful, right? In that case, I agree that comparing
serialized values seems to be the way to do it.

2. Thanks for the confirmation

3. I continue to be satisfied to let you all hash it out.

Thanks,
-John

On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:

Hi all,

1. Actually I always thought about the serialized byte array only -- at
least this is what local stores depend upon, and what Kafka itself
depends upon when doing log compaction.

I can imagine a case where two different byte arrays deserialize to
objects which are `equals` to each other. But I think we can ignore this
for now because IMO the risks related to buggy `equals` implementations
outweigh the potential benefits.

Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-07-26 Thread Ivan Ponomarev

Hi!

I have updated the KIP with the definition of what is considered to be a 
duplicated record ("The records are considered to be duplicates iff 
serialized forms of their keys are equal.")


I have also added all the standard overloads for the distinct() method. 
These overloads are the same as for `count()`.


Thank you, John, for noticing this; this is extremely important. In my 
opinion, some users will want to use in-memory stores here. Although 
`distinct` is stateful, it's OK to lose state occasionally, because an 
idempotent handler is expected downstream, which must be able to cope 
with some duplicates.
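
For instance, with the distinct(Materialized) overload proposed here, an
in-memory store could be plugged in like this (a sketch; distinct()
itself is of course still a proposal):

stream.groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
      .distinct(Materialized.as(
          Stores.inMemoryWindowStore(
              "distinct-store",            // store name
              Duration.ofMinutes(5),       // retention period
              Duration.ofSeconds(30),      // window size
              false)));                    // do not retain duplicates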


Concerning the naming. I have also added more examples of `distinct` 
name usage to the KIP (Apache Spark, Apache Flink, Apache Beam, 
Hazelcast Jet -- see 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-Motivation)


But this is, of course, not a principal point for me, I am ready to be 
convinced.


Sophie and Bruno, do you think that this operation needs another name?

What do other people think?

Regards,

Ivan


22.07.2021 22:03, John Roesler wrote:

Hi Ivan,

Sorry for the late amendment, but I was just reviewing the
KIP again for your vote thread, and it struck me that, if
this is a stateful operation, we need a few more overloads.

Following the example from the other stateful windowed
operators, we should have:

distinct()
distinct(Named)
distinct(Materialized)

It's a small update, but an important one, since people will
inevitably need to customize the state store for the
operation.

Thanks,
John

On Thu, 2021-07-22 at 13:58 -0500, John Roesler wrote:

Hi Ivan,

Thanks for the reply.

1. I think I might have gotten myself confused. I was
thinking of this operation as stateless, but now I'm not
sure what I was thinking... This operator has to be
stateful, right? In that case, I agree that comparing
serialized values seems to be the way to do it.

2. Thanks for the confirmation

3. I continue to be satisfied to let you all hash it out.

Thanks,
-John

On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:

Hi all,

1. Actually I always thought about the serialized byte array only -- at
least this is what local stores depend upon, and what Kafka itself
depends upon when doing log compaction.

I can imagine a case where two different byte arrays deserialize to
objects which are `equals` to each other. But I think we can ignore this
for now because IMO the risks related to buggy `equals` implementations
outweigh the potential benefits.

I will mention the duplicate definition in the KIP.

2. I agree with John, he got my point.

3. Let me gently insist on `distinct`. I believe that an exception to
the rule is appropriate here, because the name `distinct()` is ubiquitous.

It's not only about the Java Streams API (or .NET LINQ, which appeared
earlier and also has `Distinct`): Spark's DataFrame has `distinct()`
method, Hazelcast Jet has `distinct()` method, and I bet I can find more
examples if I search. When we teach KStreams, we always say that
KStreams are just like other streaming APIs, and they have roots in SQL
queries. Users know what `distinct` is and they expect it to be in the API.


Regards,

Ivan


13.07.2021 0:10, John Roesler wrote:

Hi all,

Bruno raised some very good points. I’d like to chime in with additional 
context.

1. Great point. We faced a similar problem defining KIP-557. For 557, we chose 
to use the serialized byte array instead of the equals() method, but I think 
the situation in KIP-655 is a bit different. I think it might make sense to use 
the equals() method here, but am curious what Ivan thinks.

2. I figured we'd do nothing. I thought Ivan was just saying that it doesn't 
make a ton of sense to use it, which I agree with, but it doesn't seem like 
that means we should prohibit it.

3. FWIW, I don't have a strong feeling either way.

Thanks,
-John

On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:

Hi Ivan,

Thank you for the KIP!

Some aspects are not clear to me from the KIP and I have a proposal.

1. The KIP does not describe the criteria that define a duplicate. Could
you add a definition of duplicate to the KIP?

2. The KIP does not describe what happens if distinct() is applied on a
hopping window. On the DSL level, I do not see how you can avoid that
users apply distinct() on a hopping window, i.e., you cannot avoid it at
compile time, you need to check it at runtime and throw an exception. Is
this correct or am I missing something?

3. I would also like to back a proposal by Sophie. She proposed to use
deduplicate() instead of distinct(), since the other DSL operations are
also verbs. I do not think that SQL and the Java Stream API are good
arguments to not use a verb.

Best,
Bruno


On 10.07.21 19:11, John Roesler wrote:

Hi Ivan,

Sorry for the silence!

I have just re-read the proposal.

To summarize, you are now only

Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-07-20 Thread Ivan Ponomarev

Hi all,

1. Actually I always thought about the serialized byte array only -- at 
least this is what local stores depend upon, and what Kafka itself 
depends upon when doing log compaction.


I can imagine a case where two different byte arrays deserialize to 
objects which are `equals` to each other. But I think we can ignore this 
for now because IMO the risks related to buggy `equals` implementations 
outweigh the potential benefits.


I will mention the duplicate definition in the KIP.

2. I agree with John, he got my point.

3. Let me gently insist on `distinct`. I believe that an exception to 
the rule is appropriate here, because the name `distinct()` is ubiquitous.


It's not only about the Java Streams API (or .NET LINQ, which appeared 
earlier and also has `Distinct`): Spark's DataFrame has `distinct()` 
method, Hazelcast Jet has `distinct()` method, and I bet I can find more 
examples if I search. When we teach KStreams, we always say that 
KStreams are just like other streaming APIs, and they have roots in SQL 
queries. Users know what `distinct` is and they expect it to be in the API.



Regards,

Ivan


13.07.2021 0:10, John Roesler wrote:

Hi all,

Bruno raised some very good points. I’d like to chime in with additional 
context.

1. Great point. We faced a similar problem defining KIP-557. For 557, we chose 
to use the serialized byte array instead of the equals() method, but I think 
the situation in KIP-655 is a bit different. I think it might make sense to use 
the equals() method here, but am curious what Ivan thinks.

2. I figured we'd do nothing. I thought Ivan was just saying that it doesn't 
make a ton of sense to use it, which I agree with, but it doesn't seem like 
that means we should prohibit it.

3. FWIW, I don't have a strong feeling either way.

Thanks,
-John

On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:

Hi Ivan,

Thank you for the KIP!

Some aspects are not clear to me from the KIP and I have a proposal.

1. The KIP does not describe the criteria that define a duplicate. Could
you add a definition of duplicate to the KIP?

2. The KIP does not describe what happens if distinct() is applied on a
hopping window. On the DSL level, I do not see how you can avoid that
users apply distinct() on a hopping window, i.e., you cannot avoid it at
compile time, you need to check it at runtime and throw an exception. Is
this correct or am I missing something?

3. I would also like to back a proposal by Sophie. She proposed to use
deduplicate() instead of distinct(), since the other DSL operations are
also verbs. I do not think that SQL and the Java Stream API are good
arguments to not use a verb.

Best,
Bruno


On 10.07.21 19:11, John Roesler wrote:

Hi Ivan,

Sorry for the silence!

I have just re-read the proposal.

To summarize, you are now only proposing the zero-arg distinct() method to be 
added to TimeWindowedKStream and SessionWindowedKStream, right?

I’m in favor of this proposal.

Thanks,
John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:

Hello everyone,

I would like to remind you about KIP-655 and KIP-759 just in case they
got lost in your inbox.

Now the initial proposal is split into two independent and smaller ones,
so they should be easier to review. Of course, if you have time.

Regards,

Ivan


24.06.2021 18:11, Ivan Ponomarev wrote:

Hello all,

I have rewritten the KIP-655 summarizing what was agreed upon during
this discussion (now the proposal is much simpler and less invasive).

I have also created KIP-759 (cancelRepartition operation) and started a
discussion for it.

Regards,

Ivan.



04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could
merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

    - https://issues.apache.org/jira/browse/KAFKA-4835
    - https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP, and do a
separate KIP for it (if necessary, we can "pause" this KIP until the
repartition KIP is done). It's a long standing "issue" and we should
resolve it in a general way I guess.

(Did not yet read all responses in detail, so keeping this comment
short.)


-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan!

That sounds like a great plan to me. Two smaller KIPs are easier to
agree on than one big one.

I agree hopping and sliding windows will actually have a duplicating
effect. We can avoid adding distinct() to the sliding window
interface, but hopping windows are just a different parameterization
of epoch-aligned windows. It seems we can’t do much about that except
document the issue.

Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a
lot!

I also felt uncomfortable due to the fact that the proposed
`distinct()`
is not somewhere near `count()` and `reduce(..)`...

[VOTE] KIP-655: Windowed "Distinct" Operation for Kafka Streams

2021-07-12 Thread Ivan Ponomarev

Hello all!

I'd like to start the vote for KIP-655, which proposes the zero-arg 
distinct() method to be added to TimeWindowedKStream and 
SessionWindowedKStream.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API

Regards,

Ivan


Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-07-11 Thread Ivan Ponomarev

Hi, John!

> To summarize, you are now only proposing the zero-arg distinct() 
method to be added to TimeWindowedKStream and SessionWindowedKStream, right?


Right, yes, it's that simple!

Okay, since you and Israel Ekpo both agreed, I'll start the voting thread 
and start the implementation.


But this change alone, without .cancelRepartition(), will not solve my 
problem :-) so I would also like to see your thoughts in the KIP-759 
discussion thread.


Regards,

Ivan


10.07.2021 20:11, John Roesler wrote:

Hi Ivan,

Sorry for the silence!

I have just re-read the proposal.

To summarize, you are now only proposing the zero-arg distinct() method to be 
added to TimeWindowedKStream and SessionWindowedKStream, right?

I’m in favor of this proposal.

Thanks,
John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:

Hello everyone,

I would like to remind you about KIP-655 and KIP-759 just in case they
got lost in your inbox.

Now the initial proposal is split into two independent and smaller ones,
so they should be easier to review. Of course, if you have time.

Regards,

Ivan


24.06.2021 18:11, Ivan Ponomarev wrote:

Hello all,

I have rewritten the KIP-655 summarizing what was agreed upon during
this discussion (now the proposal is much simpler and less invasive).

I have also created KIP-759 (cancelRepartition operation) and started a
discussion for it.

Regards,

Ivan.



04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could
merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

   - https://issues.apache.org/jira/browse/KAFKA-4835
   - https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP, and do a
separate KIP for it (if necessary, we can "pause" this KIP until the
repartition KIP is done). It's a long standing "issue" and we should
resolve it in a general way I guess.

(Did not yet read all responses in detail, so keeping this comment
short.)


-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan!

That sounds like a great plan to me. Two smaller KIPs are easier to
agree on than one big one.

I agree hopping and sliding windows will actually have a duplicating
effect. We can avoid adding distinct() to the sliding window
interface, but hopping windows are just a different parameterization
of epoch-aligned windows. It seems we can’t do much about that except
document the issue.

Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a
lot!

I also felt uncomfortable due to the fact that the proposed
`distinct()`
is not somewhere near `count()` and `reduce(..)`. But
`selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like
a correct option for me because of the issue with the unneeded
repartitioning.

The bold idea that we can just CANCEL the repartitioning didn't come to
my mind.

What seemed to me a single problem is in fact two unrelated problems:
`distinct` operation and cancelling the unneeded repartitioning.

   > what if we introduce a parameter to `selectKey()` that specifies
that
the caller asserts that the new key does _not_ change the data
partitioning?

I think a more elegant solution would be not to add a new parameter to
`selectKey` and all the other key-changing operations (`map`,
`transform`, `flatMap`, ...), but add a new operator
`KStream#cancelRepartitioning()` that resets `keyChangingOperation`
flag
for the upstream node. Of course, "use it only if you know what you're
doing" warning is to be added. Well, it's a topic for a separate KIP!

Concerning `distinct()`. If we use `XXXWindowedKStream` facilities,
then
changes to the API are minimally invasive: we're just adding
`distinct()` to TimeWindowedKStream and SessionWindowedKStream, and
that's all.

We can now define `distinct` as an operation that returns only a first
record that falls into a new window, and filters out all the other
records that fall into an already existing window. BTW, we can mock the
behaviour of such an operation with `TopologyTestDriver` using
`reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`.  ;-)

Consider the following example (record times are in seconds):

//three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
//'late arrivals'
7, 22, 35


1. 'Epoch-aligned deduplication' using tumbling windows:

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()


produces

(key@[0/1], 4)
(key@[2/3], 23)
(key@[3/4], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because
they
produce multiple intersected windows, so that one record can be
multiplied, but we want deduplication.

3. SessionWindows work for 'data-aligned deduplication'.

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()

Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-07-10 Thread Ivan Ponomarev

Hello everyone,

I would like to remind you about KIP-655 and KIP-759 just in case they 
got lost in your inbox.


Now the initial proposal is split into two independent and smaller ones, 
so they should be easier to review. Of course, if you have time.


Regards,

Ivan


24.06.2021 18:11, Ivan Ponomarev wrote:

Hello all,

I have rewritten the KIP-655 summarizing what was agreed upon during 
this discussion (now the proposal is much simpler and less invasive).


I have also created KIP-759 (cancelRepartition operation) and started a 
discussion for it.


Regards,

Ivan.



04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could
merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

  - https://issues.apache.org/jira/browse/KAFKA-4835
  - https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP, and do a
separate KIP for it (if necessary, we can "pause" this KIP until the
repartition KIP is done). It's a long standing "issue" and we should
resolve it in a general way I guess.

(Did not yet read all responses in detail, so keeping this comment
short.)


-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan!

That sounds like a great plan to me. Two smaller KIPs are easier to 
agree on than one big one.


I agree hopping and sliding windows will actually have a duplicating 
effect. We can avoid adding distinct() to the sliding window 
interface, but hopping windows are just a different parameterization 
of epoch-aligned windows. It seems we can’t do much about that except 
document the issue.


Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a 
lot!


I also felt uncomfortable due to the fact that the proposed 
`distinct()`

is not somewhere near `count()` and `reduce(..)`. But
`selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like
a correct option for  me because of the issue with the unneeded
repartitioning.

The bold idea that we can just CANCEL the repartitioning didn't come to
my mind.

What seemed to me a single problem is in fact two unrelated problems:
`distinct` operation and cancelling the unneeded repartitioning.

  > what if we introduce a parameter to `selectKey()` that specifies 
that
the caller asserts that the new key does _not_ change the data 
partitioning?


I think a more elegant solution would be not to add a new parameter to
`selectKey` and all the other key-changing operations (`map`,
`transform`, `flatMap`, ...), but add a new operator
`KStream#cancelRepartitioning()` that resets `keyChangingOperation` 
flag

for the upstream node. Of course, "use it only if you know what you're
doing" warning is to be added. Well, it's a topic for a separate KIP!

Concerning `distinct()`. If we use `XXXWindowedKStream` facilities, 
then

changes to the API are minimally invasive: we're just adding
`distinct()` to TimeWindowedKStream and SessionWindowedKStream, and
that's all.

We can now define `distinct` as an operation that returns only a first
record that falls into a new window, and filters out all the other
records that fall into an already existing window. BTW, we can mock the
behaviour of such an operation with `TopologyTestDriver` using
`reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`.  ;-)

Consider the following example (record times are in seconds):

//three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
//'late arrivals'
7, 22, 35


1. 'Epoch-aligned deduplication' using tumbling windows:

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct() 



produces

(key@[0/1], 4)
(key@[2/3], 23)
(key@[3/4], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because 
they

produce multiple intersected windows, so that one record can be
multiplied, but we want deduplication.

3. SessionWindows work for 'data-aligned deduplication'.

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct() 




produces only

([key@4000/4000], 4)
([key@23000/23000], 23)

because all the records with timestamps later than 7 are stuck together 
in one session.

Setting the inactivity gap to 9 seconds will return three records:

([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)

WDYT? If you like this variant, I will re-write KIP-655 and propose a
separate KIP for `cancelRepartitioning` (or whatever name we will 
choose

for it).

Regards,

Ivan


24.05.2021 22:32, John Roesler wrote:

Hey there, Ivan!

In typical fashion, I'm going to make a somewhat outlandish
proposal. I'm hoping that we can side-step some of the
complications that have arisen. Please bear with me.

It seems like `distinct()` is not fundamentally unlike other windowed

[DISCUSS] KIP-759: Unneeded repartition canceling

2021-06-24 Thread Ivan Ponomarev

Hello,

I'd like to start a discussion for KIP-759: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling


This is an offshoot of the discussion of KIP-655 for a `distinct` 
operator, which turned out to be a separate proposal.


The proposal is quite trivial, however, we still might consider 
alternatives (see 'Possible Alternatives' section).


Regards,

Ivan


Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-06-24 Thread Ivan Ponomarev

Hello all,

I have rewritten the KIP-655 summarizing what was agreed upon during 
this discussion (now the proposal is much simpler and less invasive).


I have also created KIP-759 (cancelRepartition operation) and started a 
discussion for it.


Regards,

Ivan.



04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could
merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

  - https://issues.apache.org/jira/browse/KAFKA-4835
  - https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP, and do a
separate KIP for it (if necessary, we can "pause" this KIP until the
repartition KIP is done). It's a long standing "issue" and we should
resolve it in a general way I guess.

(Did not yet read all responses in detail, so keeping this comment
short.)


-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan!

That sounds like a great plan to me. Two smaller KIPs are easier to agree on 
than one big one.

I agree hopping and sliding windows will actually have a duplicating effect. We 
can avoid adding distinct() to the sliding window interface, but hopping 
windows are just a different parameterization of epoch-aligned windows. It 
seems we can’t do much about that except document the issue.

Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a lot!

I also felt uncomfortable due to the fact that the proposed `distinct()`
is not somewhere near `count()` and `reduce(..)`. But
`selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like
a correct option for me because of the issue with the unneeded
repartitioning.

The bold idea that we can just CANCEL the repartitioning didn't come to
my mind.

What seemed to me a single problem is in fact two unrelated problems:
`distinct` operation and cancelling the unneeded repartitioning.

  > what if we introduce a parameter to `selectKey()` that specifies that
the caller asserts that the new key does _not_ change the data partitioning?

I think a more elegant solution would be not to add a new parameter to
`selectKey` and all the other key-changing operations (`map`,
`transform`, `flatMap`, ...), but add a new operator
`KStream#cancelRepartitioning()` that resets `keyChangingOperation` flag
for the upstream node. Of course, "use it only if you know what you're
doing" warning is to be added. Well, it's a topic for a separate KIP!

Concerning `distinct()`. If we use `XXXWindowedKStream` facilities, then
changes to the API are minimally invasive: we're just adding
`distinct()` to TimeWindowedKStream and SessionWindowedKStream, and
that's all.

We can now define `distinct` as an operation that returns only a first
record that falls into a new window, and filters out all the other
records that fall into an already existing window. BTW, we can mock the
behaviour of such an operation with `TopologyTestDriver` using
`reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`.  ;-)

Consider the following example (record times are in seconds):

//three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
//'late arrivals'
7, 22, 35


1. 'Epoch-aligned deduplication' using tumbling windows:

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()

produces

(key@[0/1], 4)
(key@[2/3], 23)
(key@[3/4], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because they
produce multiple intersected windows, so that one record can be
multiplied, but we want deduplication.

3. SessionWindows work for 'data-aligned deduplication'.

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()


produces only

([key@4000/4000], 4)
([key@23000/23000], 23)

because all the records with timestamps later than 7 are stuck together in one session.
Setting the inactivity gap to 9 seconds will return three records:

([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)

WDYT? If you like this variant, I will re-write KIP-655 and propose a
separate KIP for `cancelRepartitioning` (or whatever name we will choose
for it).

Regards,

Ivan


24.05.2021 22:32, John Roesler wrote:

Hey there, Ivan!

In typical fashion, I'm going to make a somewhat outlandish
proposal. I'm hoping that we can side-step some of the
complications that have arisen. Please bear with me.

It seems like `distinct()` is not fundamentally unlike other windowed
"aggregation" operations. Your concern about unnecessary
repartitioning seems to apply just as well to `count()` as to `distinct()`.
This has come up before, but I don't remember when: what if we
introduce a parameter to `selectKey()` that specifies that the caller
asserts that the new key does _not_ change the data partitioning?

Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-05-26 Thread Ivan Ponomarev

Hi John!

I think that your proposal is just fantastic, it simplifies things a lot!

I also felt uncomfortable due to the fact that the proposed `distinct()` 
is not somewhere near `count()` and `reduce(..)`. But 
`selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like 
a correct option for me because of the issue with the unneeded 
repartitioning.


The bold idea that we can just CANCEL the repartitioning didn't come to 
my mind.


What seemed to me a single problem is in fact two unrelated problems: 
`distinct` operation and cancelling the unneeded repartitioning.


> what if we introduce a parameter to `selectKey()` that specifies that 
the caller asserts that the new key does _not_ change the data partitioning?


I think a more elegant solution would be not to add a new parameter to 
`selectKey` and all the other key-changing operations (`map`, 
`transform`, `flatMap`, ...), but add a new operator 
`KStream#cancelRepartitioning()` that resets `keyChangingOperation` flag 
for the upstream node. Of course, "use it only if you know what you're 
doing" warning is to be added. Well, it's a topic for a separate KIP!


Concerning `distinct()`. If we use `XXXWindowedKStream` facilities, then 
changes to the API are minimally invasive: we're just adding 
`distinct()` to TimeWindowedKStream and SessionWindowedKStream, and 
that's all.


We can now define `distinct` as an operation that returns only the first 
record that falls into a new window, and filters out all the other 
records that fall into an already existing window. BTW, we can mock the 
behaviour of such an operation with `TopologyTestDriver` using 
`reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`.  ;-)
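
Spelled out, the mock looks roughly like this (a sketch only: it assumes 
a String-valued stream and a made-up STOP sentinel, and caching would 
have to be disabled for the first record per window to be forwarded 
immediately):

final String STOP = "~~STOP~~";  // sentinel assumed never to occur as a real value

KStream<Windowed<String>, String> firstPerWindow = input
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .reduce((l, r) -> STOP)               // the first value per window survives,
                                              // every later one collapses to STOP
        .toStream()
        .filterNot((k, v) -> STOP.equals(v)); // drop the STOP updates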


Consider the following example (record times are in seconds):

//three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
//'late arrivals'
7, 22, 35


1. 'Epoch-aligned deduplication' using tumbling windows:

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()

produces

(key@[0/1], 4)
(key@[2/3], 23)
(key@[3/4], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because they 
produce multiple intersecting windows, so that one record can be emitted 
multiple times, but we want deduplication.


3. SessionWindows work for 'data-aligned deduplication'.

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct() 



produces only

([key@4000/4000], 4)
([key@23000/23000], 23)

because all the records with timestamps greater than 7 end up merged into 
one session. Setting the inactivity gap to 9 seconds will return three records:


([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)

WDYT? If you like this variant, I will re-write KIP-655 and propose a 
separate KIP for `cancelRepartitioning` (or whatever name we will choose 
for it).


Regards,

Ivan


24.05.2021 22:32, John Roesler пишет:

Hey there, Ivan!

In typical fashion, I'm going to make a somewhat outlandish
proposal. I'm hoping that we can side-step some of the
complications that have arisen. Please bear with me.

It seems like `distinct()` is not fundamentally unlike other windowed
"aggregation" operations. Your concern about unnecessary
repartitioning seems to apply just as well to `count()` as to `distinct()`.
This has come up before, but I don't remember when: what if we
introduce a parameter to `selectKey()` that specifies that the caller
asserts that the new key does _not_ change the data partitioning?
The docs on that parameter would of course spell out all the "rights
and responsibilities" of setting it.

In that case, we could indeed get back to
`selectKey(A).windowBy(B).distinct(...)`, where we get to compose the
key mapper and the windowing function without having to carve out
a separate domain just for `distinct()`. All the rest of the KStream
operations would also benefit.

What do you think?

Thanks,
John

On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:

Hello everyone,

let me revive the discussion for KIP-655. Now I have some time again and
I'm eager to finalize this.

Based on what was already discussed, I think that we can split the
discussion into three topics for our convenience.

The three topics are:

- idExtractor (how should we extract the deduplication key for the record)

- timeWindows (what time windows should we use)

- miscellaneous (naming etc.)

 idExtractor 

Original proposal: use (k, v) -> f(k, v) mapper, defaulting to (k, v) ->
k. The drawback here is that we must warn the user to choose a
function that assigns different IDs to records from different partitions;
otherwise, identical IDs might not be co-partitioned (and thus not
deduplicated). Additional concern: what should we do when this function
returns null?
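
In code, the original proposal reads roughly like this (a sketch: 
`distinct()` and DistinctParameters are the proposal itself, not an 
existing API, and the Message type with its getEventId() is made up):

KStream<String, Message> messages = ...;        // keyed by source ID
messages.distinct(DistinctParameters.with(
        (k, v) -> v.getEventId(),               // idExtractor, defaulting to (k, v) -> k
        TimeWindows.of(Duration.ofMinutes(15))));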

Matthias proposed key-only deduplication: that is, no idExtractor at
all, and if we want to use `distinct` for a partic

Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-05-23 Thread Ivan Ponomarev
ggregations, a
well-known
limitation of the current API. There is actually a KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
going
on in parallel to fix this
exact issue and make the windowing interface much more flexible. Maybe
instead
of re-implementing this windowing interface in a similarly limited fashion
for the
Distinct operator, we could leverage it here and get all the benefits
coming with
KIP-645.

Specifically, I'm proposing to remove the TimeWindows/etc config from the
DistinctParameters class, and move the distinct() method from the KStream
interface
to the TimeWindowedKStream interface. Since it's semantically similar to a
kind of
windowed aggregation, it makes sense to align it with the existing windowing
framework, ie:

inputStream
 .groupByKey()
 .windowedBy()
 .distinct()

Then we could use data-aligned windows if SlidingWindows is specified in
the
windowedBy(), and epoch-aligned (or some other kind of enumerable window)
if a Windows is specified in windowedBy() (or an EnumerableWindowDefinition
once KIP-645 is implemented to replace Windows).

*SlidingWindows*: should forward a record once when it's first seen, and
then not again
for any identical records that fall into the next N timeUnits. This
includes out-of-order
records, ie if you have a SlidingWindows of size 10s and process records at
time
15s, 20s, 14s then you would just forward the one at 15s. Presumably, if
you're
using SlidingWindows, you don't care about what falls into exact time
boxes, you just
want to deduplicate. If you do care about exact time boxing then you should
use...

*EnumerableWindowDefinition* (eg *TimeWindows*): should forward only one
record
per enumerated time window. If you get records at 15s, 20s, 14s where the
windows
are enumerated at [5,14], [15, 24], etc then you forward the record at 15s
and also
the record at 14s
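
Side by side, the two flavors would read roughly like this (a sketch: 
distinct() is only the operator under discussion, not an existing API):

// data-aligned de-duplication: one record per key per 10 s "cooldown"
inputStream.groupByKey()
           .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(
                   Duration.ofSeconds(10), Duration.ZERO))
           .distinct();   // proposed operator

// epoch-aligned de-duplication: one record per key per enumerated 10 s window
inputStream.groupByKey()
           .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
           .distinct();   // proposed operator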

Just an idea: not sure if the impedance mismatch would throw users off
since the
semantics of the distinct windows are slightly different than in the
aggregations.
But if we don't fit this into the existing windowed framework, then we
shouldn't use
any existing Windows-type classes at all, imo. ie we should create a new
DistinctWindows config class, similar to how stream-stream joins get their
own
JoinWindows class

I also think that non-windowed deduplication could be useful, in which case
we
would want to also have the distinct() operator on the KStream interface.


One quick note regarding the naming: it seems like the Streams DSL operators
are typically named as verbs rather than adjectives, for example #suppress
or
#aggregate. I get that there's some precedent for 'distinct' specifically,
but
maybe something like 'deduplicate' would be more appropriate for the Streams
API.

WDYT?


On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev wrote:


Hi Matthias,

Thanks for your review! It made me think deeper, and indeed I understood
that I was missing some important details.

To simplify, let me explain my particular use case first so I can refer
to it later.

We have a system that collects information about ongoing live sporting
events from different sources. The information sources have their IDs
and these IDs are keys of the stream. Each source emits messages
concerning sporting events, and we can have many messages about each
sporting event from each source. The event ID is extracted from the message.

We need a database of event IDs that were reported at least once by each
source (important: events from different sources are considered to be
different entities). The requirements are:

1) each new event ID should be written to the database as soon as possible

2) although it's ok and sometimes even desirable to repeat the
notification about an already known event ID, we wouldn’t like our
database to be bothered by the same event ID more often than once in a
given period of time (say, 15 minutes).

With this example in mind let me answer your questions

  > (1) Using the `idExtractor` has the issue that data might not be
  > co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
  > might be better to do deduplication only on the key? If one sets a new
  > key upstream (ie, extracts the deduplication id into the key), the
  > `distinct` operator could automatically repartition the data and thus we
  > would avoid user errors.

Of course with 'key-only' deduplication + auto-repartitioning we will
never cause problems with co-partitioning. But in practice, we often
don't need repartitioning even if the 'dedup ID' is different from the key,
like in my example above. So here we have a sort of 'performance vs
safety' tradeoff.

The 'golden middle way' here can be the following: we can form a
deduplication ID as KEY + separator + idExtractor(VALUE). In case the
idExtractor is not provided, we deduplicate by key only (as in the original
proposal). Then the idExtractor transforms only the value (and not the
Re: [VOTE] KIP-418: A method-chaining way to branch KStream

2021-01-23 Thread Ivan Ponomarev

Hello everyone,

this is to warn that actual implementation of KIP-418 differs with the 
approved specification in two points:


1. Instead of multiple overloaded variants of Branched.with we now have 
Branched.withFunction (for functions) and Branched.withConsumer (for 
consumers). This is because of compiler warnings about overloading: 
Function and Consumer are indistinguishable for the Java compiler when 
supplied as lambdas, and thus we need to name the methods differently 
(see the usage sketch below).


2. 'Fully covariant' signatures like Consumer<? super KStream<? super K, ? super V>> don't work as expected. Using Consumer<KStream<K, V>> etc. instead
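
Both methods in use (a sketch of the implemented KIP-418 API; the Order 
type, its methods, and the topic names are made up):

Map<String, KStream<String, Order>> branches = orders
        .split(Named.as("orders-"))
        .branch((k, v) -> v.isPriority(),
                Branched.withFunction(s -> s.mapValues(Order::enrich), "priority"))
        .branch((k, v) -> true,
                Branched.withConsumer(s -> s.to("regular-orders")))
        .noDefaultBranch();
// branches.get("orders-priority") holds the chained sub-stream; the branch
// handled by the Consumer is terminated in place and is not added to the map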


I have updated the KIP.

Does anyone object to these changes?

Regards,

Ivan


06.07.2020 21:43, Matthias J. Sax пишет:

I am late, but I am also +1 (binding).

-Matthias

On 7/6/20 2:16 AM, Ivan Ponomarev wrote:

Wow!

So excited to hear that!

Thanks for your collaboration, now it's my turn to write a PR.

Regards,

Ivan

04.07.2020 20:16, John Roesler пишет:

Hi Ivan,

Congratulations! It looks like you have 3 binding and 2 non-binding
votes, so you can announce this KIP as accepted and follow up with a PR.

Thanks,
John

On Mon, Jun 29, 2020, at 20:46, Bill Bejeck wrote:

Thanks for the KIP Ivan, +1 (binding).

-Bill

On Mon, Jun 29, 2020 at 7:22 PM Guozhang Wang wrote:


+1 (binding). Thanks Ivan!


Guozhang

On Mon, Jun 29, 2020 at 3:55 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:


This will be a great addition. Thanks Ivan!

+1 (non-binding)

On Fri, Jun 26, 2020 at 7:07 PM John Roesler wrote:



Thanks, Ivan!

I’m +1 (binding)

-John

On Thu, May 28, 2020, at 17:24, Ivan Ponomarev wrote:

Hello all!

I'd like to start the vote for KIP-418 which proposes deprecation of
current `branch` method and provides a method-chaining based API for
branching.







https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream



Regards,

Ivan








--
-- Guozhang











[jira] [Resolved] (KAFKA-12230) Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file system

2021-01-21 Thread Ivan Ponomarev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Ponomarev resolved KAFKA-12230.

Resolution: Duplicate

Duplicate of KAFKA-12190

> Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file 
> system
> --
>
> Key: KAFKA-12230
> URL: https://issues.apache.org/jira/browse/KAFKA-12230
> Project: Kafka
>  Issue Type: Bug
>    Reporter: Ivan Ponomarev
>    Assignee: Ivan Ponomarev
>Priority: Minor
>
> While developing Kafka on a Windows machine, I get some false 
> TopologyTestDriver-based test failures because of a 
> `Files.setPosixFilePermissions` failure with UnsupportedOperationException, 
> see e.g. the stack trace below
>  
> Simply catching UnsupportedOperationException together with IOException in 
> StateDirectory solves this issue
>  
>  
> {noformat}
> java.lang.UnsupportedOperationException
>   at 
> java.base/java.nio.file.Files.setPosixFilePermissions(Files.java:2078)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:118)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.setupTopology(TopologyTestDriver.java:431)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:335)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:306)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:265)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImplTest.shouldSupportForeignKeyTableTableJoinWithKTableFromKStream(KStreamImplTest.java:2751)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.gradle.internal.dispatch.R

[jira] [Created] (KAFKA-12230) Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file system

2021-01-21 Thread Ivan Ponomarev (Jira)
Ivan Ponomarev created KAFKA-12230:
--

 Summary: Some Kafka TopologyTestDriver-based unit tests can't be 
run on Windows file system
 Key: KAFKA-12230
 URL: https://issues.apache.org/jira/browse/KAFKA-12230
 Project: Kafka
  Issue Type: Bug
Reporter: Ivan Ponomarev
Assignee: Ivan Ponomarev


While developing Kafka on a Windows machine, I get some false 
TopologyTestDriver-based test failures because of a 
`Files.setPosixFilePermissions` failure with UnsupportedOperationException, see 
e.g. the stack trace below

Simply catching UnsupportedOperationException together with IOException in 
StateDirectory solves this issue
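
In sketch form, the fix amounts to widening the catch (variable names 
here are illustrative, not the actual Kafka code; the real change landed 
via KAFKA-12190):

try {
    Files.setPosixFilePermissions(stateDir.toPath(), permissions);
} catch (final IOException | UnsupportedOperationException e) {
    // POSIX permissions are unsupported on the default Windows file system;
    // log the problem instead of failing TopologyTestDriver-based tests
    log.error("Error changing permissions for the state directory {}", stateDir, e);
}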

 

 
{noformat}
java.lang.UnsupportedOperationException
at 
java.base/java.nio.file.Files.setPosixFilePermissions(Files.java:2078)
at 
org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:118)
at 
org.apache.kafka.streams.TopologyTestDriver.setupTopology(TopologyTestDriver.java:431)
at 
org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:335)
at 
org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:306)
at 
org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:265)
at 
org.apache.kafka.streams.kstream.internals.KStreamImplTest.shouldSupportForeignKeyTableTableJoinWithKTableFromKStream(KStreamImplTest.java:2751)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
at 
java.base

Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2020-09-14 Thread Ivan Ponomarev
> It might be better to have one overload
> with all required parameters, and add optional parameters using the
> builder pattern? This seems to follow the DSL Grammar proposal.

Oh, I can explain. We can't fully rely on the builder pattern because of 
Java type inference limitations. We have to provide type parameters to 
the builder methods or the code won't compile: see e.g. 
https://twitter.com/inponomarev/status/1265053286933159938 and the following 
discussion with Tagir Valeev.


When we came across similar difficulties in KIP-418, we finally 
decided to add all the necessary overloads to the parameter class. So I just 
reproduced that approach here.


> (5) Even if it might be an implementation detail (and maybe the KIP
> itself does not need to mention it), can you give a high level overview
> how you intent to implement it (that would be easier to grog, compared
> to reading the PR).

Well, as with any operation at the KStreamImpl level, I'm building a store and 
a processor node.


The KStreamDistinct class is going to be the ProcessorSupplier, with the 
logic for forwarding/muting records located in 
KStreamDistinct.KStreamDistinctProcessor#process
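
Roughly like this (a sketch, not the actual PR; the store, the 
idExtractor, and the context wiring are assumed):

// forward a record only if its id has not been seen within the window
public void process(final K key, final V value) {
    final ID id = idExtractor.apply(key, value);
    final long now = context.timestamp();
    try (final WindowStoreIterator<V> matches =
                 store.fetch(id, now - windowSizeMs, now)) {
        if (!matches.hasNext()) {        // id unseen in the window: it is distinct
            store.put(id, value, now);
            context.forward(key, value);
        }
    }
}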




Matthias, if you are still reading this :-) a gentle reminder: my PR for 
the already accepted KIP-418 is still waiting for your review. I think it's 
better for me to finalize at least one KIP before proceeding to a new 
one :-)


Regards,

Ivan

03.09.2020 4:20, Matthias J. Sax пишет:

Thanks for the KIP Ivan. Having a built-in deduplication operator is for
sure a good addition.

Couple of questions:

(1) Using the `idExtractor` has the issue that data might not be
co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
might be better to do deduplication only on the key? If one sets a new
key upstream (ie, extracts the deduplication id into the key), the
`distinct` operator could automatically repartition the data and thus we
would avoid user errors.

(2) What is the motivation for allowing the `idExtractor` to return
`null`? Might be good to have some use-case examples for this feature.

(2) Is using a `TimeWindow` really what we want? I was wondering if a
`SlidingWindow` might be better? Or maybe we need a new type of window?

It would be helpful if you could describe potential use cases in more
detail. -- I am mainly wondering about hopping windows? Each record would
always fall into multiple windows and thus would be emitted multiple
times, ie, each time a window closes. Is this really a valid use case?

It seems that for de-duplication, one wants to have some "expiration
time", ie, for each ID, deduplicate all consecutive records with the
same ID and emit the first record after the "expiration time" passed. In
terms of a window, this would mean that the window starts at `r.ts` and
ends at `r.ts + windowSize`, ie, the window is aligned to the data.
TimeWindows are aligned to the epoch though. While `SlidingWindows` also
align to the data, for the aggregation use-case they go backward in
time, while we need a window that goes forward in time. It's an open
question if we can re-purpose `SlidingWindows` -- it might be ok to
make the alignment (into the past vs into the future) an
operator-dependent behavior?

(3) `isPersistent` -- instead of using this flag, it seems better to
allow users to pass in a `Materialized` parameter next to
`DistinctParameters` to configure the state store?

(4) I am wondering if we should really have 4 overloads for
`DistinctParameters.with()`? It might be better to have one overload
with all required parameters, and add optional parameters using the
builder pattern? This seems to follow the DSL Grammar proposal.

(5) Even if it might be an implementation detail (and maybe the KIP
itself does not need to mention it), can you give a high level overview
how you intent to implement it (that would be easier to grog, compared
to reading the PR).



-Matthias

On 8/23/20 4:29 PM, Ivan Ponomarev wrote:

Sorry, I forgot to add [DISCUSS] tag to the topic

24.08.2020 2:27, Ivan Ponomarev пишет:

Hello,

I'd like to start a discussion for KIP-655.

KIP-655:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API


I also opened a proof-of-concept PR for you to experiment with the API:

PR#9210: https://github.com/apache/kafka/pull/9210

Regards,

Ivan Ponomarev








Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2020-08-23 Thread Ivan Ponomarev

Sorry, I forgot to add [DISCUSS] tag to the topic

24.08.2020 2:27, Ivan Ponomarev пишет:

Hello,

I'd like to start a discussion for KIP-655.

KIP-655: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API 



I also opened a proof-of-concept PR for you to experiment with the API:

PR#9210: https://github.com/apache/kafka/pull/9210

Regards,

Ivan Ponomarev




KIP-655: Windowed "Distinct" Operation for KStream

2020-08-23 Thread Ivan Ponomarev

Hello,

I'd like to start a discussion for KIP-655.

KIP-655: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API


I also opened a proof-of-concept PR for you to experiment with the API:

PR#9210: https://github.com/apache/kafka/pull/9210

Regards,

Ivan Ponomarev


[jira] [Created] (KAFKA-10369) Introduce Distinct operation in KStream

2020-08-06 Thread Ivan Ponomarev (Jira)
Ivan Ponomarev created KAFKA-10369:
--

 Summary: Introduce Distinct operation in KStream
 Key: KAFKA-10369
 URL: https://issues.apache.org/jira/browse/KAFKA-10369
 Project: Kafka
  Issue Type: Improvement
Reporter: Ivan Ponomarev
Assignee: Ivan Ponomarev


Message deduplication is a common task.

One example: we might have multiple data sources each reporting its state 
periodically with a relatively high frequency, their current states should be 
stored in a database. In case the actual change of the state occurs with a 
lower frequency than it is reported, in order to reduce the number of writes to 
the database we might want to filter out duplicated messages using Kafka 
Streams.

The 'distinct' operation is common in data processing, e.g.
 * Java Stream has a 
[distinct()|https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#distinct--]
 operation,
 * SQL has the DISTINCT keyword.

 

Hence it is natural to expect the similar functionality from Kafka Streams.

Although Kafka Streams Tutorials contains an 
[example|https://kafka-tutorials.confluent.io/finding-distinct-events/kstreams.html]
 of how distinct can be emulated, this example is complicated: it involves 
low-level coding with a local state store and a custom transformer. It might be 
much more convenient to have distinct as a first-class DSL operation.

Due to the 'infinite' nature of KStream, the distinct operation should be windowed, 
similar to windowed joins and aggregations for KStreams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-418: A method-chaining way to branch KStream

2020-07-06 Thread Ivan Ponomarev

Wow!

So excited to hear that!

Thanks for your collaboration, now it's my turn to write a PR.

Regards,

Ivan

04.07.2020 20:16, John Roesler пишет:

Hi Ivan,

Congratulations! It looks like you have 3 binding and 2 non-binding votes, so 
you can announce this KIP as accepted and follow up with a PR.

Thanks,
John

On Mon, Jun 29, 2020, at 20:46, Bill Bejeck wrote:

Thanks for the KIP Ivan, +1 (binding).

-Bill

On Mon, Jun 29, 2020 at 7:22 PM Guozhang Wang wrote:


+1 (binding). Thanks Ivan!


Guozhang

On Mon, Jun 29, 2020 at 3:55 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:


This will be a great addition. Thanks Ivan!

+1 (non-binding)

On Fri, Jun 26, 2020 at 7:07 PM John Roesler wrote:



Thanks, Ivan!

I’m +1 (binding)

-John

On Thu, May 28, 2020, at 17:24, Ivan Ponomarev wrote:

Hello all!

I'd like to start the vote for KIP-418 which proposes deprecation of
current `branch` method and provides a method-chaining based API for
branching.







https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream


Regards,

Ivan








--
-- Guozhang







[VOTE] KIP-418: A method-chaining way to branch KStream

2020-05-28 Thread Ivan Ponomarev

Hello all!

I'd like to start the vote for KIP-418 which proposes deprecation of 
current `branch` method and provides a method-chaining based API for 
branching.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream

Regards,

Ivan


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-22 Thread Ivan Ponomarev

Hello John,


1.
-

> Perhaps it would be better to stick with "as" for now
> and just file a Jira to switch them all at the same time [for 
compatibility with Kotlin]


Fully agree! BTW it's really not a big problem: in Kotlin they have a 
standard workaround 
(https://kotlinlang.org/docs/reference/java-interop.html#escaping-for-java-identifiers-that-are-keywords-in-kotlin). 
So actually this should be a very low priority issue, if an issue at all.


> I don't understand how your new proposed
> methods would work any differently than the ones you already
> had proposed in the KIP. It seems like you'd still have to provide
> the generic type parameters on the first static factory call. Can you
> explain how your new interface proposal differs from the existing KIP?

In the KIP, I didn't clarify what methods should be static. Now I 
propose the following methods:


non-static: withChain(Function), withName(String).

static: as(String), with(Function), with(Function, String).

The overloaded `with` version that provides both the Function and the name can 
be used without causing the type inference problem!!
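
For instance (a sketch with proposal-era names, assuming a String-valued 
stream; the lambda's types are pinned by the enclosing branch(...) call, 
so no explicit type witnesses are needed):

stream.split()
      .branch((k, v) -> v.startsWith("A"),
              Branched.with(s -> s.mapValues(String::toUpperCase), "a-values"));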


2.


> Regarding making the K,V types covariant also, yes, that would indeed
> be nice, but I'm not sure it will actually work.

What I'm keeping in mind is the following example: imagine

static <K> KStream<K, Integer> func(KStream<K, Number> s) {
    return s.mapValues(n -> (Integer) n + 1);
}

BranchedKStream<K, Number> b =
    s.split().branch((k, v) -> isInteger(v),
        //Won't compile!!
        Branched.with(Me::func));

The simple workaround here is to change `func`'s return type from 
KStream<...Integer> to KStream<...Number>.
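
That is, under the reconstruction above, something like:

// widened return type: KStream<K, Number> now matches the expected chain
// type exactly, so Branched.with(Me::func) compiles
static <K> KStream<K, Number> func(KStream<K, Number> s) {
    return s.mapValues(n -> (Number) (n.intValue() + 1));
}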


[On the other hand, we already agreed to remove `withJavaConsumer` from 
`Branched`, so during code migration I will have to modify my functions' 
return types anyway -- I mean, from `void` to `KStream`!! ]


>  the map you're returning is Map<String, KStream<K, V>>, and of course a K is not the 
same as "? extends K", so it doesn't seem compatible.


I think what you actually meant here is that KStream<? extends K, 
? extends V> is not fit as a value for Map<String, KStream<K, V>>. This 
particularly is not a problem, since KStream<? extends K, ? extends V> 
can be safely explicitly cast to KStream<K, V>, and be put to the map.


BUT, I'm really afraid of the pitfalls of nested wildcard types. So maybe 
for now it's better to just admit that the API is not absolutely perfect and 
accept it as is, that is


Function<? super KStream<K, V>, ? extends KStream<K, V>>

Regards,

Ivan


21.05.2020 17:59, John Roesler пишет:

Hello Ivan,

Thanks for the refinement. Actually, I did not know that "as" would
clash with a Kotlin operator. Maybe we should depart from convention
and just avoid methods named "as" in the future.

The convention is that "as(String name)" is used for the static factory
method, whereas "withName(String name)" is an instance method
inherited from NamedOperation. If you wish to propose to avoid "as"
for compatibility with Kotlin, I might suggest "fromName(String name)",
although it's somewhat dubious, since all the other configuration
classes use "as". Perhaps it would be better to stick with "as" for now
and just file a Jira to switch them all at the same time.

Re. 3:
Regarding the type inference problem, yes, it's a blemish on all of our
configuration objects. The problem is that Java infers the type
based on the _first_ method in the chain. While it does consider what
the recipient of the method result wants, it only considers the _next_
recipient.

Thus, if you call as("foo") and immediately assign it to a
Branched<K, V> variable, Java infers the type correctly. But
when the "next recipient" is a chained method call, like "withChain",
then the chained method doesn't bound the type (by definition,
withChain is defined on Branched<K, V>), so Java will take
the broadest possible inference and bind the type to
Branched<Object, Object>, at which point, it can't be revised anymore.
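
A minimal, self-contained illustration of this inference behavior, with 
a toy Conf class standing in for Branched:

class Conf<K, V> {
    static <K, V> Conf<K, V> as(final String name) { return new Conf<>(); }
    Conf<K, V> withName(final String name) { return this; }
}

class InferenceDemo {
    public static void main(final String[] args) {
        Conf<String, Long> a = Conf.as("x");  // ok: the assignment context drives inference
        // Conf<String, Long> b = Conf.as("x").withName("y");
        // ^ does not compile: as("x") is bound to Conf<Object, Object> first
        Conf<String, Long> c = Conf.<String, Long>as("x").withName("y");  // explicit witness
    }
}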

As a user of Java, this is exceedingly annoying, since it doesn't seem
that hard to recursively consider the entire context when inferring the
generic type parameters, but this is what we have to work with.

To be honest, though, I don't understand how your new proposed
methods would work any differently than the ones you already
had proposed in the KIP. It seems like you'd still have to provide
the generic type parameters on the first static factory call. Can you
explain how your new interface proposal differs from the existing KIP?

Re. 4:
Regarding making the K,V types covariant also, yes, that would indeed
be nice, but I'm not sure it will actually work. You might want to give it a
try. In the past, we've run into some truly strange interactions between the
Java type inferencer and lambdas (and/or anonymous inner classes) in
combination with nested covariant types.

Another issue is that the value type 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-21 Thread Ivan Ponomarev
? IMHO, we should throw an exception for this case to avoid a `key
-> null` entry in the returned Map.

Following this train of thought, and if we want to allow the "return
null" pattern in general, we need `withJavaConsumer` that does not add
an entry to the Map.

Following your proposal, the semantics of `withJavaConsumer` could also
be achieved with `withChain`:

Branched.withChain(s -> {
   s.to();
   return s;
})

Thus, for your proposal `withJavaConsumer` is purely syntactic sugar,
while for the first proposal it adds new functionality (if `return null`
is not allowed, it is not possible to use `withChain()` to "hide" a
sub-stream in the result). Furthermore, we might need to allow `return
null` in your proposal to allow users to "hide" a sub-stream in the Map.



I guess I can be convinced either way. However, if we follow your
proposal, I am wondering if we need `withJavaConsumer` at all? Its
benefit seems to be small? Also, having a reduced API is usually
preferable as it's simpler to learn.



-Matthias




On 5/15/20 3:12 PM, Ivan Ponomarev wrote:

Hello, John, hello Matthias!

Thank you very much for your detailed feedback!

-

John,


It looks like you missed my reply on Apr 23rd.


For some unknown reason it didn't reach my inbox; fortunately, we have
all the emails on the web.


1. Can you propose to deprecate (but not remove) the existing ‘branch’

method?

Done, in "Compatibility, Deprecation, and Migration Plan" section.


2. [Explain why 'branch' operator is superior to branching directly

off of the parent KStream for the needs of dynamic branching]

Done, see an ugly counterexample in 'Dynamic Branching' section.


3. [`withConsumer` is confusing with Kafka Consumer... maybe `withSink`?]


As Matthias noted, `withSink` can also be confusing. I renamed this
method to `withJavaConsumer` per Matthias' suggestion.


4. ...It seems like there are two disjoint use cases: EITHER using

chain and the result map OR using just the sink

This is discussed below.

--

Matthias,


1. [We should rename `KBranchedStream` -> `BranchedKStream`]


Done.


2. [Ambiguous phrase about 'parameterless' version of the `branch`

method]

Fixed.



3. Overview of newly added methods/interfaces


Done in `Proposed Changes` section.



4. [Concerning John's note] > I don't think that using both

`withChain()` and `withConsumer()` is the
issue, as the KIP clearly states that the result of `withChain()` will
be given to the `Consumer`.

Yes, I agree!


The issue is really with the `Consumer` and the returned `Map` of

`defaultBranch()` and `noDefaultBranch()`. Maybe a reasonable
implementation would be to not add the "branch" to the result map if
`withConsumer` is used?

But is it also an issue? With Kafka Streams, we can split the topology
graph at any point. Technically, it's OK to do both: feed the KStream to
a [Java]Consumer AND save it in resulting Map. If one doesn't need the
stream in the Map, one simply does not extract it from there :-)

In the current version of KIP it is assumed that the returned map
contains ALL the branches, either tagged with IDs explicitly set by the
programmer, or with some default auto-generated ids. Dealing with this
map is the user's responsibility.

What seems to me to be an issue is introducing exclusions to this
general rule, like 'swallowing' some streams by provided
[Java]Consumers. This can make things complicated. What if a user
provides both the name of the branch and a [Java]Consumer? What do they
mean in this case? Should we 'swallow' the stream or save it to the map?
There's no point in 'saving the space' in this map, so maybe just leave
it as it is?



I rewrote the KIP and also fixed a couple of typos.

Looking forward to your feedback again!

Regards,

Ivan.



08.05.2020 22:55, Matthias J. Sax пишет:

Thanks for updating the KIP!

I also have some minor comment:



(1) We should rename `KBranchedStream` -> `BranchedKStream`

(Most classes follow this naming pattern now, eg, CoGroupedKStream,
TimeWindowedKStream etc -- there is just the "legacy" `KGroupedStream`
and `KGroupedKTable` that we cannot rename without a breaking change...
so we just keep them.)



(2) Quote:


Both branch and defaultBranch operations also have overloaded
parameterless alternatives.


I think `branch()` always needs to take a `Predicate` and assume you
meant that `Branched` is optional. Can you maybe rephrase it accordingly
as `branch()` would not be "parameterless".



(3) Can you maybe add an overview in the "Public Interface" section) of
newly added and deprecated methods/classes (cf.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup#KIP-150-Kafka-StreamsCogroup-PublicInterfaces)




(4) What is unclear from the KIP is the interaction of `withConsumer()`
and the finally returned `Map`. This rel

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-15 Thread Ivan Ponomarev
the issue that it might be confused with a "sink
node", ie., writing the KStream to a topic.

Maybe `withJavaConsumer` would make it less ambiguous?




-Matthias




On 5/8/20 7:13 AM, John Roesler wrote:

Hi Ivan,

It looks like you missed my reply on Apr 23rd. I think it’s close, but I had a 
few last comments.

Thanks,
John

On Sun, May 3, 2020, at 15:39, Ivan Ponomarev wrote:

Hello everyone,

will someone please take a look at the reworked KIP?

I believe that now it follows design principles and takes into account
all the arguments discussed here.


Regards,

Ivan


23.04.2020 2:45, Ivan Ponomarev пишет:

Hi,

I have read the John's "DSL design principles" and have completely
rewritten the KIP, see
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream



This version includes all the previous discussion results and follows
the design principles, with one exception.

The exception is

branch(Predicate predicate, Branched branched)

which formally violates 'no more than one parameter' rule, but I think
here it is justified.

We must provide a predicate for a branch and don't need to provide one
for the default branch. Thus for both operations we may use a single
Branched parameter class, with an extra method parameter for `branch`.

Since predicate is a natural, necessary part of a branch, no
'proliferation of overloads, deprecations, etc.' is expected here as it
is said in the rationale for the 'single parameter rule'.

WDYT, is this KIP mature enough to begin voting?

Regards,

Ivan

21.04.2020 2:09, Matthias J. Sax пишет:

Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on
this.

Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar

-- might be worth to checkout.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:

Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and it didn't leave me
spare time. But I think I must finish this, because we invested
substantial effort into this discussion and I don't feel entitled to
propose other things before this one is finalized.

During these months I proceeded with writing and reviewing Kafka
Streams-related code. Every time I needed branching, Spring-Kafka's
KafkaStreamBrancher class of my invention (the original idea for this
KIP) worked for me -- that's another reason why I gave up pushing the
KIP forward. When I was coming across the problem with the scope of
branches, I worked around it this way:

AtomicReference<KStream<K, V>> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
  .branch()
  .defaultBranch(result::set)
  .onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from
May, 24th 2019. Let me quote it:

KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
    -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
    -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer)
    -> Map
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
    -> Map
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
    -> Map
// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch()
    -> Map

I believe this would satisfy everyone. Optional names seem to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or,
you might want to add names just for debugging purposes. Or, finally,
you might use the returned Map to have the named branches in the
original scope.
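
In use, the quoted proposal would read roughly like this (a sketch: the 
method names are as proposed here, which differs from what KIP-418 
finally shipped, and the predicates are assumed):

// a named branch ends up in the returned Map; a Consumer branch is handled in place
Map<String, KStream<String, String>> branches = stream.split()
        .branch(isEnglish, s -> s.mapValues(String::toUpperCase), "english")
        .branch(isGerman, s -> s.to("german-topic"))
        .noDefaultBranch();
branches.get("english").to("english-topic");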

There also was an input from John Roesler on June 4th, 2019, who
suggested using Named class. I can't comment on this. The idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with Streams API design principles than me.

Regards,

Ivan



08.10.2019 1:38, Matthias J. Sax пишет:

I am moving this KIP into "inactive status". Feel free to resume the
K

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-03 Thread Ivan Ponomarev

Hello everyone,

will someone please take a look at the reworked KIP?

I believe that now it follows design principles and takes into account 
all the arguments discussed here.



Regards,

Ivan


23.04.2020 2:45, Ivan Ponomarev пишет:

Hi,

I have read the John's "DSL design principles" and have completely 
rewritten the KIP, see 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream 




This version includes all the previous discussion results and follows 
the design principles, with one exception.


The exception is

branch(Predicate predicate, Branched branched)

which formally violates 'no more than one parameter' rule, but I think 
here it is justified.


We must provide a predicate for a branch and don't need to provide one 
for the default branch. Thus for both operations we may use a single 
Branched parameter class, with an extra method parameter for `branch`.


Since predicate is a natural, necessary part of a branch, no 
'proliferation of overloads, deprecations, etc.' is expected here as it 
is said in the rationale for the 'single parameter rule'.


WDYT, is this KIP mature enough to begin voting?

Regards,

Ivan

21.04.2020 2:09, Matthias J. Sax пишет:

Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on 
this.


Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar 


-- might be worth to checkout.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:

Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and it didn't leave me
spare time. But I think I must finish this, because we invested
substantial effort into this discussion and I don't feel entitled to
propose other things before this one is finalized.

During these months I proceeded with writing and reviewing Kafka
Streams-related code. Every time I needed branching, Spring-Kafka's
KafkaStreamBrancher class of my invention (the original idea for this
KIP) worked for me -- that's another reason why I gave up pushing the
KIP forward. When I was coming across the problem with the scope of
branches, I worked around it this way:

AtomicReference<KStream<K, V>> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
 .branch()
 .defaultBranch(result::set)
 .onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from
May, 24th 2019. Let me quote it:

KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
   -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
   -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer)
   -> Map
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
   -> Map
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
   -> Map
// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch()
   -> Map

I believe this would satisfy everyone. Optional names seem to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or,
you might want to add names just for debugging purposes. Or, finally,
you might use the returned Map to have the named branches in the
original scope.

There also was an input from John Roesler on June 4th, 2019, who
suggested using Named class. I can't comment on this. The idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with Streams API design principles than me.

Regards,

Ivan



08.10.2019 1:38, Matthias J. Sax пишет:
I am moving this KIP into "inactive status". Feel free to resume the 
KIP

at any point.

If anybody else is interested in picking up this KIP, feel free to 
do so.




-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:

Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:

Thanks for the input John!


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-04-22 Thread Ivan Ponomarev

Hi,

I have read the John's "DSL design principles" and have completely 
rewritten the KIP, see 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream



This version includes all the previous discussion results and follows 
the design principles, with one exception.


The exception is

branch(Predicate predicate, Branched branched)

which formally violates 'no more than one parameter' rule, but I think 
here it is justified.


We must provide a predicate for a branch and don't need to provide one 
for the default branch. Thus for both operations we may use a single 
Branched parameter class, with an extra method parameter for `branch`.


Since predicate is a natural, necessary part of a branch, no 
'proliferation of overloads, deprecations, etc.' is expected here as it 
is said in the rationale for the 'single parameter rule'.


WDYT, is this KIP mature enough to begin voting?

Regards,

Ivan

21.04.2020 2:09, Matthias J. Sax пишет:

Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on this.

Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
-- might be worth to checkout.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:

Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and it didn't leave me
spare time. But I think I must finish this, because we invested
substantial effort into this discussion and I don't feel entitled to
propose other things before this one is finalized.

During these months I proceeded with writing and reviewing Kafka
Streams-related code. Every time I needed branching, Spring-Kafka's
KafkaStreamBrancher class of my invention (the original idea for this
KIP) worked for me -- that's another reason why I gave up pushing the
KIP forward. When I was coming across the problem with the scope of
branches, I worked around it this way:

AtomicReference<KStream<K, V>> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
     .branch()
     .defaultBranch(result::set)
     .onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from
May, 24th 2019. Let me quote it:

KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
   -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
   -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer)
   -> Map
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
   -> Map
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
   -> Map
// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch()
   -> Map

I believe this would satisfy everyone. Optional names seem to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or,
you might want to add names just for debugging purposes. Or, finally,
you might use the returned Map to have the named branches in the
original scope.

There also was an input from John Roesler on June 4th, 2019, who
suggested using Named class. I can't comment on this. The idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with Streams API design principles than me.

Regards,

Ivan



08.10.2019 1:38, Matthias J. Sax пишет:

I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to do so.



-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:

Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:

Thanks for the input John!


under your suggestion, it seems that the name is required


If you want to get the `KStream` as part of the `Map` back using a
`Function`, yes. If you follow the "embedded chaining" pattern using a
`Consumer`, no.

Allowing for a default 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-04-17 Thread Ivan Ponomarev
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
   -> Map

// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
   -> Map

// return map of all names sub-stream into current scope
KBranchedStream#noDefaultBranch()
   -> Map



Hence, for each sub-stream, the user can choose to add a name and return
the branch "result" to the calling scope or not. The implementation can
also check at runtime that all returned names are unique. The returned
Map can be empty and it's also optional to use the Map.

To me, it seems like a good way to get best of both worlds.

Thoughts?



-Matthias




On 5/6/19 5:15 PM, John Roesler wrote:

Ivan,

That's a very good point about the "start" operator in the dynamic case.
I had no problem with "split()"; I was just questioning the necessity.
Since you've provided a proof of necessity, I'm in favor of the
"split()" start operator. Thanks!

Separately, I'm interested to see where the present discussion leads.
I've written enough Javascript code in my life to be suspicious of
nested closures. You have a good point about using method references (or
indeed function literals also work). It should be validating that this
was also the JS community's first approach to flattening the logic when
their nested closure situation got out of hand. Unfortunately, it's
replacing nesting with redirection, both of which disrupt code
readability (but in different ways for different reasons). In other
words, I agree that function references is *the* first-order solution if
the nested code does indeed become a problem.

However, the history of JS also tells us that function references aren't
the end of the story either, and you can see that by observing that
there have been two follow-on eras, as they continue trying to cope with
the consequences of living in such a callback-heavy language. First, you
have Futures/Promises, which essentially let you convert nested code to
method-chained code (Observables/FP is a popular variation on this).
Most lately, you have async/await, which is an effort to apply language
(not just API) syntax to the problem, and offer the "flattest" possible
programming style to solve the problem (because you get back to just one
code block per functional unit).

Stream-processing is a different domain, and Java+KStreams is nowhere
near as callback heavy as JS, so I don't think we have to take the JS
story for granted, but then again, I think we can derive some valuable
lessons by looking sideways to adjacent domains. I'm just bringing this
up to inspire further/deeper discussion. At the same time, just like JS,
we can afford to take an iterative approach to the problem.

Separately again, I'm interested in the post-branch merge (and I'd also
add join) problem that Paul brought up. We can clearly punt on it, by
terminating the nested branches with sink operators. But is there a DSL
way to do it?

Thanks again for your driving this,
-John

On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com> wrote:

 Ivan, I’ll definitely forfeit my point on the clumsiness of the
 branch(predicate, consumer) solution, I don’t see any real drawbacks
 for the dynamic case.

 IMO the one trade off to consider at this point is the scope
 question. I don’t know if I totally agree that “we rarely need them
 in the same scope” since merging the branches back together later
 seems like a perfectly plausible use case that can be a lot nicer
 when the branched streams are in the same scope. That being said,
 for the reasons Ivan listed, I think it is overall the better
 solution - working around the scope thing is easy enough if you need
 to.

 > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
  wrote:
 >
 > Hello everyone, thank you all for joining the discussion!
 >
 > Well, I don't think the idea of named branches, be it a
 LinkedHashMap (no other Map will do, because the order of definition
 matters) or a `branch` method taking a name and a Consumer, has more
 advantages than drawbacks.
 >
 > In my opinion, the only real positive outcome from Michael's
 proposal is that all the returned branches are in the same scope.
 But 1) we rarely need them in the same scope 2) there is a
 workaround for the scope problem, described in the KIP.
 >
 > 'Inlining the complex logic' is not a problem, because we can use
 method references instead of lambdas. In real-world scenarios you
 tend to split the complex logic into methods anyway, so the code is
 going to be clean.
 >
 > The drawbacks are strong. The cohesion between predicates and
 handlers is lost. We have to define predicates in one place, and
 handlers in another. This 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-02 Thread Ivan Ponomarev
On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:


Thanks for updating the KIP and your answers.


> this is to make the name similar to String#split
> that also returns an array, right?

The intent was to avoid name duplication. The return type should _not_
be an array.

The current proposal is

stream.branch()
  .branch(Predicate)
  .branch(Predicate)
  .defaultBranch();

IMHO, this reads a little odd, because the first `branch()` does not
take any parameters and has different semantics than the later
`branch()` calls. Note that, from the code snippet above, it's hidden
that the first call is `KStream#branch()` while the others are
`KBranchedStream#branch()`, which makes reading the code harder.

Because I suggested to rename `addBranch()` -> `branch()`, I thought it
might be better to also rename `KStream#branch()` to avoid the naming
overlap that seems to be confusing. The following reads much cleaner to
me:

stream.split()
  .branch(Predicate)
  .branch(Predicate)
  .defaultBranch();

Maybe there is a better alternative to `split()` though, to avoid the
naming overlap.


> 'default' is, however, a reserved word, so unfortunately we cannot
> have a method with such a name :-)

Bummer. Didn't consider this. Maybe we can still come up with a short
name?


Can you add the interface `KBranchedStream` to the KIP with all its
methods? It will be part of the public API and should be contained in
the KIP. For example, it's unclear atm what the return type of
`defaultBranch()` is.


You did not comment on the idea to add a `KBranchedStream#get(int index)
-> KStream` method to get the individually branched KStreams. Would be
nice to get your feedback about it. It seems you suggest that users
would need to write custom utility code otherwise, to access them. We
should discuss the pros and cons of both approaches. It feels
"incomplete" to me atm, if the API has no built-in support to get the
branched KStreams directly.



-Matthias



On 4/13/19 2:13 AM, Ivan Ponomarev wrote:

Hi all!

I have updated the KIP-418 according to the new vision.

Matthias, thanks for your comment!

> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split,
which also returns an array, right? But is it worth the loss of backwards
compatibility? We can have an overloaded branch() as well without
affecting the existing code. Maybe the old array-based `branch` method
should be deprecated, but this is a subject for discussion.

> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch' -> 'branch' rename. 'default' is,
however, a reserved word, so unfortunately we cannot have a method with
such a name :-)

> defaultBranch() does take a `Predicate` as argument, but I think
> that is not required?

Absolutely! I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR
(https://github.com/apache/kafka/pull/6512)

Any new suggestions/objections?

Regards,

Ivan


11.04.2019 11:47, Matthias J. Sax wrote:

Thanks for driving the discussion of this KIP. It seems that everybody
agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There
are some minor things we need to consider. I would recommend the
following renaming:

KStream#branch() -> #split()
KBranchedStream#addBranch() -> BranchingKStream#branch()
KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument,
but I think that is not required?

Also, we should consider KIP-307, which was recently accepted and is
currently implemented:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.


For the issue that the created `KStream` objects are in different scopes:
could we extend `KBranchedStream` with a `get(int index)` method that
returns the corresponding "branched" result `KStream` object? Maybe the
second argument of `addBranch()` should not be a `Consumer` but
a `Function`, and `get()` could return whatever the `Function` returns?


Finally, I would also suggest to update the KIP with the current
proposal. That makes it easier to review.


-Matthias




On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to
revise the KIP and continue the discussion.  Obviously we'll need some
buy-in from committers that have actual binding votes on whether the KIP
could be adopted.  It would be great to hear if they think this


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-13 Thread Ivan Ponomarev

Hi all!

I have updated the KIP-418 according to the new vision.

Matthias, thanks for your comment!


Renaming KStream#branch() -> #split()


I can see your point: this is to make the name similar to String#split 
that also returns an array, right? But is it worth the loss of backwards 
compatibility? We can have an overloaded branch() as well without affecting 
the existing code. Maybe the old array-based `branch` method should be 
deprecated, but this is a subject for discussion.


> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(), 
KBranchedStream#defaultBranch() -> BranchingKStream#default()


Totally agree with the 'addBranch' -> 'branch' rename. 'default' is, however, a 
reserved word, so unfortunately we cannot have a method with such a name :-)


> defaultBranch() does take a `Predicate` as argument, but I think 
that is not required?


Absolutely! I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR 
(https://github.com/apache/kafka/pull/6512)


Any new suggestions/objections?

Regards,

Ivan


11.04.2019 11:47, Matthias J. Sax wrote:

Thanks for driving the discussion of this KIP. It seems that everybody
agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There
are some minor things we need to consider. I would recommend the
following renaming:

KStream#branch() -> #split()
KBranchedStream#addBranch() -> BranchingKStream#branch()
KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument,
but I think that is not required?

Also, we should consider KIP-307, which was recently accepted and is
currently implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.


For the issue that the created `KStream` objects are in different scopes:
could we extend `KBranchedStream` with a `get(int index)` method that
returns the corresponding "branched" result `KStream` object? Maybe, the
second argument of `addBranch()` should not be a `Consumer` but
a `Function` and `get()` could return whatever the
`Function` returns?
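(For illustration, the `get(int index)` idea might have looked roughly
like this at a call site -- a purely hypothetical sketch of an API that
was only being discussed here, with assumed predicates:)

KBranchedStream branched = stream.branch()
    .addBranch(isDomestic, ks -> ks.mapValues(...))   // index 0
    .addBranch(isForeign,  ks -> ks.mapValues(...));  // index 1

KStream domestic = branched.get(0);  // result of the first Function
KStream foreign  = branched.get(1);  // result of the second Function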


Finally, I would also suggest to update the KIP with the current
proposal. That makes it easier to review.


-Matthias



On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to
revise the KIP and continue the discussion.  Obviously we'll need some
buy-in from committers that have actual binding votes on whether the KIP
could be adopted.  It would be great to hear if they think this is a good
idea overall.  I'm not sure if that happens just by starting a vote, or if
there is generally some indication of interest beforehand.

That being said, I'll continue the discussion a bit: assuming we do move
forward the solution of "stream.branch() returns KBranchedStream", do we
deprecate "stream.branch(...) returns KStream[]"?  I would favor
deprecating, since having two mutually exclusive APIs that accomplish the
same thing is confusing, especially when they're fairly similar anyway.  We
just need to be sure we're not making something impossible/difficult that
is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a
little sloppy overall in terms of naming and clarity. In particular,
passing in the "predicates" and "children" lists, which get modified in
KBranchedStream but read from all the way down in KStreamLazyBranch, is a
bit complicated to follow.

Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev  wrote:


Hi Paul!

I read your code carefully and now I am fully convinced: your proposal
looks better and should work. We just have to document the crucial fact
that KStream consumers are invoked as they're added. And then it's all
going to be very nice.
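(To make that fact concrete -- a sketch in the API shape discussed in this
thread, hypothetical at the time of writing: the Consumer body runs
eagerly, while the topology is being defined, not at streamsBuilder.build():)

stream.branch()
      .addBranch(isError, ks -> {
          // this body executes immediately when addBranch(...) is called,
          // wiring the sub-topology in place
          ks.to("error-topic");
      })
      .defaultBranch(ks -> ks.to("main-topic"));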

What shall we do now? I should re-write the KIP and resume the
discussion here, right?

Why are you saying that your PR 'should not even be a starting point if
we go in this direction'? To me it looks like a good starting point. But
as a novice in this project I might be missing some important details.

Regards,

Ivan


28.03.2019 17:38, Paul Whalen wrote:

Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution

supports this. The couponIssuer::set* consumers will be invoked as they’re
added, not during streamsBuilder.build(). So the user still ought to be
able to call couponIssuer.coupons() afterward and depend on the branched
streams having been set.

The issue I mean to point out is that it is hard to access the branched

streams in the same scope as the original stream (that is, not inside the
couponIssuer), wh

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-03-30 Thread Ivan Ponomarev

Hi all!

I was about to write another KIP, but found out that KIP-401 addresses 
exactly the problem I faced. So let me jump into your discussion and ask 
you to assess another idea.


I fully agree with KIP-401's motivation part. E.g., in my project I 
had to invent a wrapper class that hides the details of KeyValueStore 
management from the business logic. Of course, this should be done better 
in the KStreams API.
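(For context, a minimal sketch of the kind of wrapper I mean -- all the
names here are invented for illustration and assume the usual Kafka
Streams imports:)

// business logic calls lastValue()/record() and never touches the store API
class ReadingsCache {
    private final KeyValueStore<String, Integer> store;

    @SuppressWarnings("unchecked")
    ReadingsCache(ProcessorContext context) {
        // assumes a store named "readings" was registered with the topology
        this.store = (KeyValueStore<String, Integer>) context.getStateStore("readings");
    }

    int lastValue(String key) {
        return Optional.ofNullable(store.get(key)).orElse(0);
    }

    void record(String key, int value) {
        store.put(key, value);
    }
}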


But I was about to look at this problem from another side and propose a 
simple alternative in the high-level DSL that will not fit all the cases, 
but will fit most of them. Hence my idea does not exclude Paul's proposal.


What if we restrict ourselves to *only one* KeyValueStore and propose a 
method that resembles `aggregate` and `reduce` methods, like this:


stream
   .map(...)
   .filter(...)
   .transform((k, v, s) -> {...}, Transformed.with(...))

where
* k, v -- input key & value
* s -- a KeyValueStore provided as an argument
* return value of the lambda should be KeyValue.pair(...)
* Transformed.with... is a builder, used in order to define the 
Transformer and KeyValueStore building parameters. Some of these 
parameters should be:

** store's KeySerde,
** store's ValueSerde,
** whether the store is persistent or in-memory,
** store's name -- an optional parameter; the system should be able to 
derive the name of the store transparently for the user, if we don't 
want to set it ourselves / share the store between processors.

** scheduled punctuation.

Imagine we have a KStream<String, Integer>, and we need to calculate a 
`derivative` stream, that is, a stream of 'deltas' of the provided 
integer values.


This could be achieved as simply as

stream.transform((key, value, stateStore) -> {
        int previousValue = Optional.ofNullable(stateStore.get(key)).orElse(0);
        stateStore.put(key, value);
        return KeyValue.pair(key, value - previousValue);
    },
    // we do not need to bother with store name, punctuation etc.
    // maybe even the Serde part can be omitted, since we can inherit
    // the serdes from the stream by default
    Transformed.with(Serdes.String(), Serdes.Integer()));

The hard part of it is that the new `transform` method definition would 
have to be parameterized by six type parameters:


* input/output/KeyValueStore key type,
* input/output/KeyValueStore value type.

However, it seems that all these types can be inferred from the provided 
lambda and Transformed.with instances.
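(A hypothetical sketch of where the six types would live -- none of these
names exist in the actual API, and K/V stand for the stream's own
key/value types:)

// K1/V1 are the output types, SK/SV the store's key/value types; together
// with the stream's K/V that makes six, all inferable at the call site.
<K1, V1, SK, SV> KStream<K1, V1> transform(
        StatefulTransformer<K, V, K1, V1, KeyValueStore<SK, SV>> transformer,
        Transformed<SK, SV> transformed);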


What do you think about this?

Regards,

Ivan


27.03.2019 20:45, Guozhang Wang wrote:

Hello Paul,

Thanks for the uploaded PR and the detailed description! I've made a pass
on it and left some comments.

Overall I think I agree with you that passing in the StoreBuilder directly
rather than the store name is more convenient, as it does not require another
`addStore` call, but we just need to spend some more documentation effort
on educating users about the two ways of connecting their stores. I'm
slightly concerned about this learning curve but I can be convinced if
most people feel it is worthwhile.


Guozhang

On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen  wrote:


I'd like to resurrect this discussion with a cursory, proof-of-concept
implementation of the KIP which combines many of our ideas:
https://github.com/apache/kafka/pull/6496.  I tried to keep the diff as
small as possible for now, just using it to convey the main ideas.  But
I'll separately address some of our earlier discussion:

- Will there be a new, separate interface for users to implement for the
new functionality? No, to hopefully keep things simple, all of the
Processor/TransformerSupplier interfaces will just extend
StateStoresSupplier, allowing users to opt in to this functionality by
overriding the default implementation that gives an empty list (see the
sketch after this list).
- Will the interface allow users to specify the store name, or the
entire StoreBuilder? The entire StoreBuilder, so the
Processor/TransformerSupplier can completely encapsulate the name and
implementation of a state store if desired.
- Will the old way of specifying store names alongside the supplier when
calling stream.process/transform() be deprecated? No, this is still a
legitimate way to wire up Processors/Transformers and their stores. But I
would recommend not allowing stream.process/transform() calls that use
both store declaration mechanisms (this restriction is not in the proof
of concept).
- How will we handle adding the same state store to the topology
multiple times because different Processor/TransformerSuppliers declare
it? topology.addStateStore() will be slightly relaxed for convenience, and
will allow adding the same StoreBuilder multiple times as long as the
exact same StoreBuilder instance is being added for the same store name.
This seems to prevent in practice the issue of accidentally making two
state stores one by adding with the same name. For additional safety, if
we wanted to (not in the
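(To make the first point above concrete, a minimal sketch of the opt-in
interface it describes -- `StateStoresSupplier` follows the naming in this
message; everything else, including the example supplier, is illustrative
and not the actual KIP-401 API:)

// Sketch only: suppliers opt in by overriding stateStores().
interface StateStoresSupplier {
    default List<StoreBuilder<?>> stateStores() {
        return Collections.emptyList();   // by default, no stores declared
    }
}

// A TransformerSupplier could then bundle its store with its logic:
class DeltaTransformerSupplier implements
        TransformerSupplier<String, Integer, KeyValue<String, Integer>>,
        StateStoresSupplier {

    @Override
    public Transformer<String, Integer, KeyValue<String, Integer>> get() {
        return new DeltaTransformer();    // transformer implementation omitted
    }

    @Override
    public List<StoreBuilder<?>> stateStores() {
        return Collections.singletonList(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("delta-store"),
                Serdes.String(), Serdes.Integer()));
    }
}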

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-29 Thread Ivan Ponomarev

Hi Paul!

I read your code carefully and now I am fully convinced: your proposal 
looks better and should work. We just have to document the crucial fact 
that KStream consumers are invoked as they're added. And then it's all 
going to be very nice.


What shall we do now? I should re-write the KIP and resume the 
discussion here, right?


Why are you saying that your PR 'should not even be a starting point if 
we go in this direction'? To me it looks like a good starting point. But 
as a novice in this project I might be missing some important details.


Regards,

Ivan


28.03.2019 17:38, Paul Whalen wrote:

Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution 
supports this. The couponIssuer::set* consumers will be invoked as they’re 
added, not during streamsBuilder.build(). So the user still ought to be able to 
call couponIssuer.coupons() afterward and depend on the branched streams having 
been set.

The issue I mean to point out is that it is hard to access the branched streams 
in the same scope as the original stream (that is, not inside the 
couponIssuer), which is a problem with both proposed solutions. It can be 
worked around though.

[Also, great to hear additional interest in 401, I’m excited to hear your 
thoughts!]

Paul


On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev  wrote:

Hi Paul!

The idea to postpone the wiring of branches to the streamsBuilder.build() also 
looked great to me at first glance, but ---


the newly branched streams are not available in the same scope as each other.  
That is, if we wanted to merge them back together again I don't see a way to do 
that.


You just took the words right out of my mouth, I was just going to write in 
details about this issue.

Consider the example from Bill's book, p. 101: say we need to identify 
customers who have bought coffee and made a purchase in the electronics store 
to give them coupons.

This is the code I usually write under these circumstances using my 'brancher' 
class:

@Setter
class CouponIssuer{
   private KStream<...> coffeePurchases;
   private KStream<...> electronicsPurchases;

   KStream<...> coupons(){
       return coffeePurchases.join(electronicsPurchases...)...whatever

       /*In the real world the code here can be complex, so the creation of a
separate CouponIssuer class is fully justified, in order to separate the
classes' responsibilities.*/
   }
}

CouponIssuer couponIssuer = new CouponIssuer();

new KafkaStreamsBrancher<>()
 .branch(predicate1, couponIssuer::setCoffeePurchases)
 .branch(predicate2, couponIssuer::setElectronicsPurchases)
 .onTopOf(transactionStream);

/*Alas, this won't work if we're going to wire up everything later, without the 
terminal operation!!!*/
couponIssuer.coupons()...

Does this make sense?  In order to properly initialize the CouponIssuer we need 
the terminal operation to be called before streamsBuilder.build() is called.


[BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was 
going to write here. I have some thoughts based on my experience, so I will 
join the discussion on KIP-401 soon.]

Regards,

Ivan

28.03.2019 6:29, Paul Whalen wrote:

Ivan,
I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.
- Compatibility: I was incorrect earlier about compatibility issues,
there aren't any direct ones.  I was unaware that Java is smart enough to
distinguish between a branch(varargs...) returning one thing and branch()
with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it.  We can just
build up the branches in the KBranchedStream who shares its state with the
ProcessorSupplier that will actually do the branching.  It's not terribly
pretty in its current form, but I think it demonstrates its feasibility.
To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.
I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.
Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:
OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that branch API needs
improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

new KafkaStreamsBrancher<..>

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-28 Thread Ivan Ponomarev

Hi Paul!

The idea to postpone the wiring of branches to the 
streamsBuilder.build() also looked great to me at first glance, but ---


> the newly branched streams are not available in the same scope as 
each other.  That is, if we wanted to merge them back together again I 
don't see a way to do that.


You just took the words right out of my mouth, I was just going to write 
in details about this issue.


Consider the example from Bill's book, p. 101: say we need to identify 
customers who have bought coffee and made a purchase in the electronics 
store to give them coupons.


This is the code I usually write under these circumstances using my 
'brancher' class:


@Setter
class CouponIssuer{
   private KStream<...> coffeePurchases;
   private KStream<...> electronicsPurchases;

   KStream<...> coupons(){
       return coffeePurchases.join(electronicsPurchases...)...whatever

       /*In the real world the code here can be complex, so the creation of
a separate CouponIssuer class is fully justified, in order to separate the
classes' responsibilities.*/
   }
}

CouponIssuer couponIssuer = new CouponIssuer();

new KafkaStreamsBrancher<>()
 .branch(predicate1, couponIssuer::setCoffeePurchases)
 .branch(predicate2, couponIssuer::setElectronicsPurchases)
 .onTopOf(transactionStream);

/*Alas, this won't work if we're going to wire up everything later, 
without the terminal operation!!!*/

couponIssuer.coupons()...

Does this make sense?  In order to properly initialize the CouponIssuer 
we need the terminal operation to be called before 
streamsBuilder.build() is called.



[BTW Paul, I just found out that your KIP-401 is essentially the next 
KIP I was going to write here. I have some thoughts based on my 
experience, so I will join the discussion on KIP-401 soon.]


Regards,

Ivan

28.03.2019 6:29, Paul Whalen wrote:

Ivan,

I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.

- Compatibility: I was incorrect earlier about compatibility issues,
there aren't any direct ones.  I was unaware that Java is smart enough to
distinguish between a branch(varargs...) returning one thing and branch()
with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it.  We can just
build up the branches in the KBranchedStream who shares its state with the
ProcessorSupplier that will actually do the branching.  It's not terribly
pretty in its current form, but I think it demonstrates its feasibility.

To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.

I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:


OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that branch API needs
improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

new KafkaStreamsBrancher<..>()
.branch(predicate1, ks ->..)
.branch(predicate2, ks->..)
.defaultBranch(ks->..) //optional
.onTopOf(stream).mapValues(...) //onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until
all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts the
fluency of other KStream methods.

2. (as Paul proposes)

stream
.branch(predicate1, ks ->...)
.branch(predicate2, ks->...)
.defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
noDefault() return void

PROS: Generally follows the way KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and
noDefault()). And for a user it is very easy to miss the fact that one
of the terminal methods should be called. If these methods are not
called, we can throw an exception at runtime.

Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

27.03.2019 18:46, Ivan Ponomarev wrote:



25.03.2019 17:43, Ivan Ponomarev wrote:

Paul,

I see your point when you are talking about
stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way.
Maybe we all should think further.

Let me comment on two of your idea

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-27 Thread Ivan Ponomarev

OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that branch API needs 
improvement. Motivation is given in the KIP.


There are two potential ways to do it:

1. (as originally proposed)

new KafkaStreamsBrancher<..>()
  .branch(predicate1, ks ->..)
  .branch(predicate2, ks->..)
  .defaultBranch(ks->..) //optional
  .onTopOf(stream).mapValues(...) //onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until 
all the necessary ingredients are provided.


CONS: The need to create a KafkaStreamsBrancher instance contrasts the 
fluency of other KStream methods.


2. (as Paul proposes)

stream
  .branch(predicate1, ks ->...)
  .branch(predicate2, ks->...)
  .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and 
noDefault() return void


PROS: Generally follows the way KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and 
noDefault()). And for a user it is very easy to miss the fact that one 
of the terminal methods should be called. If these methods are not 
called, we can throw an exception at runtime.


Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

27.03.2019 18:46, Ivan Ponomarev wrote:



25.03.2019 17:43, Ivan Ponomarev wrote:

Paul,

I see your point when you are talking about 
stream..branch..branch...default..


Still, I believe that this cannot be implemented the easy way. 
Maybe we all should think further.


Let me comment on two of your ideas.

user could specify a terminal method that assumes nothing will reach
the default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides 
`default`, because there are scenarios when we want to just silently 
drop the messages that didn't match any predicate. 2) Throwing an 
exception in the middle of data flow processing looks like a bad idea. 
In the stream processing paradigm, I would prefer to emit a special 
message to a dedicated stream. This is exactly where `default` can be 
used.


it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.

You mean a runtime exception, when the program is compiled and run? 
Well, I'd prefer an API that simply won't compile if used 
incorrectly. Can we build such an API as a method chain starting from a 
KStream object? There is a huge cost difference between runtime and 
compile-time errors. Even if a failure shows up instantly in unit 
tests, it costs more for the project than a compilation failure.


Regards,

Ivan


25.03.2019 0:38, Paul Whalen wrote:

Ivan,

 Good point about the terminal operation being required.  But is that really
 such a bad thing?  If the user doesn't want a defaultBranch they can call
 some other terminal method (noDefaultBranch()?) just as easily.  In fact I
 think it creates an opportunity for a nicer API - a user could specify a
 terminal method that assumes nothing will reach the default branch,
 throwing an exception if such a case occurs.  That seems like an
 improvement over the current branch() API, which allows for the more subtle
 behavior of records unexpectedly getting dropped.

 The need for a terminal operation certainly has to be well documented, but
 it would be fairly easy for the InternalTopologyBuilder to track dangling
 branches that haven't been terminated and raise a clear error before it
 becomes an issue.  Especially now that there is a "build step" where the
 topology is actually wired up, when StreamsBuilder.build() is called.

 Regarding onTopOf() returning its argument, I agree that it's critical to
 allow users to do other operations on the input stream.  With the fluent
 solution, it ought to work the same way all other operations do - if you
 want to process off the original KStream multiple times, you just need the
 stream as a variable so you can call as many operations on it as you desire.


Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev  
wrote:



Hello Paul,

 I'm afraid this won't work because we do not always need the
defaultBranch. And without a terminal operation we don't know when to
finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something
more with the original branch after branching.

 I understand your point that the need for special object construction
contrasts the fluency of most KStream methods. But here we have a
special case: we build the switch to split the flow, so I think this is
still idiomatic.

Regards,

Ivan



 24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the 
onTopOff()

mechanism a little confusing since it contrasts the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the

s

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-25 Thread Ivan Ponomarev

Paul,

I see your point when you are talking about 
stream..branch..branch...default..


Still, I believe that this cannot be implemented the easy way. Maybe 
we all should think further.


Let me comment on two of your ideas.


user could specify a terminal method that assumes nothing will reach the
default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`, 
because there are scenarios when we want to just silently drop the 
messages that didn't match any predicate. 2) Throwing an exception in 
the middle of data flow processing looks like a bad idea. In the stream 
processing paradigm, I would prefer to emit a special message to a 
dedicated stream. This is exactly where `default` can be used.
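(To illustrate the point with the second API variant summarized earlier in
this thread -- hypothetical at the time of writing:)

stream
   .branch(isOrder, ks -> ks.to("orders"))
   .branch(isPayment, ks -> ks.to("payments"))
   // instead of throwing, everything that matched no predicate goes
   // to a dedicated stream for later inspection:
   .defaultBranch(ks -> ks.to("unmatched-events"));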



it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.

You mean a runtime exception, when the program is compiled and run? 
Well, I'd prefer an API that simply won't compile if used incorrectly. 
Can we build such an API as a method chain starting from a KStream object? 
There is a huge cost difference between runtime and compile-time errors. 
Even if a failure shows up instantly in unit tests, it costs more for 
the project than a compilation failure.


Regards,

Ivan


25.03.2019 0:38, Paul Whalen wrote:

Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev  wrote:


Hello Paul,

I'm afraid this won't work because we do not always need the
defaultBranch. And without a terminal operation we don't know when to
finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something
more with the original branch after branching.

I understand your point that the need for special object construction
contrasts the fluency of most KStream methods. But here we have a
special case: we build the switch to split the flow, so I think this is
still idiomatic.

Regards,

Ivan



24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream.

Like:

stream.branch()
  .addBranch(predicate1, this::handle1)
  .addBranch(predicate2, this::handle2)
  .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build of branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev wrote:


Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream ks){
  ks.filter().mapValues(...)
}


void handleSecondCase(KStream ks){
  ks.selectKey(...).groupByKey()...
}

..
new KafkaStreamsBrancher()
 .addBranch(

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-24 Thread Ivan Ponomarev

Hello Paul,

I'm afraid this won't work because we do not always need the 
defaultBranch. And without a terminal operation we don't know when to 
finalize and build the 'branch switch'.


In my proposal, onTopOf returns its argument, so we can do something 
more with the original branch after branching.


I understand your point that the need for special object construction 
contrasts the fluency of most KStream methods. But here we have a 
special case: we build the switch to split the flow, so I think this is 
still idiomatic.


Regards,

Ivan



24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream.

Like:

stream.branch()
 .addBranch(predicate1, this::handle1)
 .addBranch(predicate2, this::handle2)
 .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build of branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev 
wrote:


Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream ks){
 ks.filter().mapValues(...)
}


void handleSecondCase(KStream ks){
 ks.selectKey(...).groupByKey()...
}

..
new KafkaStreamsBrancher()
.addBranch(predicate1, this::handleFirstCase)
.addBranch(predicate2, this::handleSecondCase)
.onTopOf()

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as a second
argument which returns nothing, and the example in the KIP shows each
stream from the branch using a terminal node (KStream#to() in this
case).

Maybe I've missed something, but how would we handle the case where the
user has created a branch but wants to continue processing and not
necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is if we had something like this:

KStream[] branches = originalStream.branch(predicate1,
predicate2);
branches[0].filter().mapValues(...)..
branches[1].selectKey(...).groupByKey().


Thanks!
Bill



On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck  wrote:


All,

I'd like to jump-start the discussion for KIP- 418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the
KIP if you can, I would appreciate any feedback :)

KIP-418:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream

JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488

PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev








Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-23 Thread Ivan Ponomarev

Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream ks){
ks.filter().mapValues(...)
}


void handleSecondCase(KStream ks){
ks.selectKey(...).groupByKey()...
}

..
new KafkaStreamsBrancher()
  .addBranch(predicate1, this::handleFirstCase)
  .addBranch(predicate2, this::handleSecondCase)
  .onTopOf()

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as a second
argument which returns nothing, and the example in the KIP shows each
stream from the branch using a terminal node (KStream#to() in this
case).

Maybe I've missed something, but how would we handle the case where the
user has created a branch but wants to continue processing and not
necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is if we had something like this:

KStream[] branches = originalStream.branch(predicate1,
predicate2);
branches[0].filter().mapValues(...)..
branches[1].selectKey(...).groupByKey().


Thanks!
Bill



On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck  wrote:


All,

I'd like to jump-start the discussion for KIP- 418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the
KIP if you can, I would appreciate any feedback :)

KIP-418: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream

JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488

PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev








Re: [ANNOUNCE] New Committer: Bill Bejeck

2019-02-14 Thread Ivan Ponomarev

Congratulations, Bill!

Your 'Kafka Streams in Action' is a great book. These past months it has 
always been travelling with me in my backpack, along with my laptop ))


Regards,

Ivan

14.02.2019 3:56, Guozhang Wang wrote:

Hello all,

The PMC of Apache Kafka is happy to announce that we've added Bill Bejeck
as our newest project committer.

Bill has been active in the Kafka community since 2015. He has made
significant contributions to the Kafka Streams project with more than 100
PRs and 4 authored KIPs, including the streams topology optimization
framework. Bill's also very keen on tightening Kafka's unit test / system
tests coverage, which is a great value to our project codebase.

In addition, Bill has been very active in evangelizing Kafka for stream
processing in the community. He has given several Kafka meetup talks in the
past year, including a presentation at Kafka Summit SF. He's also authored
a book about Kafka Streams (
https://www.manning.com/books/kafka-streams-in-action), as well as various
posts in public venues like DZone and on his personal blog (
http://codingjunkie.net/).

We really appreciate the contributions and are looking forward to seeing more
from him. Congratulations, Bill !


Guozhang, on behalf of the Apache Kafka PMC





[DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-01-18 Thread Ivan Ponomarev

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the 
KIP if you can, I would appreciate any feedback :)


KIP-418: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream


JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488

PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev