Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-17 Thread Matthias J. Sax
You cannot add a `Processor` to a `KGroupedStream`. You can only use 
`aggregate()`, `reduce()`, or `count()` (each of which adds a 
pre-defined processor).


`groupByKey()` is really just a "meta operation" that checks whether the 
key was changed upstream, and inserts a repartition/shuffle step if necessary.
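
To illustrate the point above, here is a minimal sketch that builds the same aggregation twice, once with the key untouched and once after `selectKey()`, and prints the topology description. The topic name `input`, the class name, and the `describe` helper are made up for illustration; the sketch assumes the `kafka-streams` library is on the classpath.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;

public class GroupByKeyRepartitionDemo {

    // Builds a small topology and returns its textual description.
    // "input" is a hypothetical topic name.
    static String describe(boolean changeKeyUpstream) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> someStream =
                builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));

        if (changeKeyUpstream) {
            // selectKey() flags the stream as "key may have changed".
            someStream = someStream.selectKey((key, value) -> value);
        }

        // groupByKey() itself adds no processor; count() adds the pre-defined one.
        someStream
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count();

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println("--- key unchanged ---");
        System.out.println(describe(false));
        System.out.println("--- key changed upstream ---");
        System.out.println(describe(true));
    }
}
```

Only the second description should mention an internal topic whose name ends in `-repartition`.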


Thus, if you don't change the upstream key, you can just add a processor 
to `someStream` (groupByKey() would be a no-op anyway).


If you did change the key upstream, you can do 
`someStream.repartition().transform()` to repartition explicitly.
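
A rough sketch of the explicit-repartition variant, with a custom processor that simply does not forward selected events. It uses `KStream#process()` with the `org.apache.kafka.streams.processor.api` types instead of `transform()`; this assumes Kafka Streams 3.3 or newer, where `process()` returns a `KStream`. The topic names, the key derivation, and the `DropEmptyValues` class are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class RepartitionThenProcessDemo {

    // Hypothetical processor: forwards a record only if its value is non-empty.
    static class DropEmptyValues implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            String value = record.value();
            if (value != null && !value.isEmpty()) {
                context.forward(record); // not calling forward() drops the event
            }
        }
    }

    static String describe() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> someStream =
                builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));

        // A typed supplier avoids overload ambiguity on process().
        ProcessorSupplier<String, String, String, String> dropEmpty = DropEmptyValues::new;

        someStream
                .selectKey((key, value) -> key.toLowerCase())   // key changed upstream
                .repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
                .process(dropEmpty)                             // runs on co-partitioned data
                .to("output", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

If the key is not changed upstream, the `selectKey()` and `repartition()` calls can be left out and the processor attached to `someStream` directly.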



HTH.


Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-13 Thread Igor Maznitsa
Thanks a lot for the explanation, but could you provide a bit more detail 
about KGroupedStream? It is just an interface and does not extend KStream, 
so how can I add a processor in the case below?

    KStream someStream = ...
    someStream
        .groupByKey()
        // how to add a processor for the resulting grouped stream here ???


Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-12 Thread Matthias J. Sax
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()` 
because keying must be preserved -- if you want to change the keying you 
need to use `KTable#groupBy()` (data needs to be repartitioned if you 
change the key).
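
As a sketch of the two options just described: `KTable#transformValues()` with a value-only transformer that keeps the key, followed by `KTable#groupBy()` to re-key the table before an aggregation. The topic name `input`, the `UpperCaseValue` transformer, and the choice of the value as the new key are assumptions made for illustration only.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class KTableTransformValuesDemo {

    // Hypothetical transformer: may read the key but can only change the value.
    static class UpperCaseValue implements ValueTransformerWithKey<String, String, String> {
        @Override
        public void init(ProcessorContext context) { }

        @Override
        public String transform(String readOnlyKey, String value) {
            return value == null ? null : value.toUpperCase();
        }

        @Override
        public void close() { }
    }

    static String describe() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table =
                builder.table("input", Consumed.with(Serdes.String(), Serdes.String()));

        // Keying is preserved, so no repartitioning is needed for this step.
        ValueTransformerWithKeySupplier<String, String, String> supplier = UpperCaseValue::new;
        KTable<String, String> upper = table.transformValues(supplier);

        // Changing the key goes through groupBy(), which repartitions the data.
        upper.groupBy((key, value) -> KeyValue.pair(value, value),
                      Grouped.with(Serdes.String(), Serdes.String()))
             .count();

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```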


HTH.

-Matthias


[Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-12 Thread Igor Maznitsa

Hello

Is there any way in the Kafka Streams API to define processors for KTable 
and KGroupedStream, like KStream#transform? How can I provide a custom 
processor for a KTable or KGroupedStream that could, for instance, avoid 
forwarding selected events downstream?



--
Igor Maznitsa
email: rrg4...@gmail.com