Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-13 Thread Igor Maznitsa
Thanks a lot for the explanation, but could you provide a bit more detail 
about KGroupedStream? It is just an interface and does not extend KStream, 
so how can I add a processor in the case below?

  KStream someStream = ...
  someStream
     .groupByKey()
     // how to add a processor for the resulting grouped stream here ???

On 2024-Jan-13 01:22, Matthias J. Sax wrote:
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()` 
because keying must be preserved -- if you want to change the keying 
you need to use `KTable#groupBy()` (data needs to be repartitioned if 
you change the key).
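
Since `KGroupedStream` is only an intermediate step, one possible workaround is to attach the custom processor to the `KStream` before `groupByKey()`. The following is a minimal sketch, not a definitive answer from the thread: it assumes Kafka Streams 3.3 or newer (where `KStream#process` returns a `KStream`) with kafka-streams on the classpath, and the class names, topic name and filtering rule are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

final class FilterBeforeGrouping {

    /** Hypothetical processor: forwards a record only when its value is non-empty. */
    static final class DropEmptyValues implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            // Not calling forward() is how a record is kept from going downstream.
            if (record.value() != null && !record.value().isEmpty()) {
                context.forward(record);
            }
        }
    }

    /** Builds a topology that filters with the processor first, then groups and counts. */
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> someStream =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        someStream
            .process(DropEmptyValues::new)                              // custom processor goes here
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) // grouping sees only forwarded records
            .count();

        return builder.build();
    }
}
```

Because `process()` is allowed to change the key, Kafka Streams will likely add a repartition topic before the grouping. If the key is left untouched, `KStream#processValues()` (also 3.3+) should avoid that extra step.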


HTH.

-Matthias

On 1/12/24 11:47 AM, Igor Maznitsa wrote:

Hello

Is there any way in the Kafka Streams API to define processors for KTable 
and KGroupedStream, like KStream#transform? How can I provide a custom 
processor for KTable or KGroupedStream that could, for instance, 
avoid forwarding selected events downstream?







[Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-12 Thread Igor Maznitsa

Hello

Is there any way in the Kafka Streams API to define processors for KTable 
and KGroupedStream, like KStream#transform? How can I provide a custom 
processor for KTable or KGroupedStream that could, for instance, avoid 
forwarding selected events downstream?



--
Igor Maznitsa
email: rrg4...@gmail.com



Re: Kafka Streams, read standby time window store

2023-09-06 Thread Igor Maznitsa

Hi Bruno

It looks like I have found the error in my code. I had split the creation 
of StoreQueryParameters across several statements, and my code looked like 
the snippet below:


var query = StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType);

if (useStale) {
    query.enableStaleStores(); // returned instance is discarded here
}

It was my mistake: I expected the existing instance to be modified, but it 
is immutable and the call returns a new instance. I don't know why it was 
not designed with the Builder pattern, which would be more appropriate in 
this case, imho.

I've made the fix in my code:

var query = StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType);

if (useStale) {
    query = query.enableStaleStores();
}

And now it works well! Thanks a lot for your help!
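
The pitfall described in this message can be reproduced without Kafka. The sketch below uses a hypothetical `QueryParams` class that only imitates the "returns a new instance" behaviour; it is not the source of `StoreQueryParameters`, and all names in it are made up for illustration.

```java
/** Hypothetical immutable parameter object; a stand-in, not the Kafka class. */
final class QueryParams {
    private final String storeName;
    private final boolean staleStores;

    private QueryParams(String storeName, boolean staleStores) {
        this.storeName = storeName;
        this.staleStores = staleStores;
    }

    static QueryParams forStore(String storeName) {
        return new QueryParams(storeName, false);
    }

    /** Returns a NEW instance with the flag set; the receiver stays unchanged. */
    QueryParams enableStaleStores() {
        return new QueryParams(storeName, true);
    }

    boolean staleStoresEnabled() {
        return staleStores;
    }
}

final class ImmutableParamsDemo {

    /** Drops the returned instance, so the flag is never set on 'query'. */
    static boolean discardResult() {
        QueryParams query = QueryParams.forStore("table");
        query.enableStaleStores();
        return query.staleStoresEnabled();
    }

    /** Reassigns the returned instance, so the flag is visible afterwards. */
    static boolean reassignResult() {
        QueryParams query = QueryParams.forStore("table");
        query = query.enableStaleStores();
        return query.staleStoresEnabled();
    }
}
```

`discardResult()` yields `false` and `reassignResult()` yields `true`, which mirrors the difference between the two snippets in the message.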


On 9/6/23 16:05, Bruno Cadonna wrote:

Hi Igor,

Sorry to hear you have issues with querying standbys!

I have two questions to clarify the situation:

1. Did you enable querying stale stores with

StoreQueryParameters.fromNameAndType(TABLE_NAME, 
queryableStoreType).enableStaleStores()


as described in the blog post?


2. Since you are querying a stale store, could it be that the standby 
hasn't caught up yet?
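
For a window store the same opt-in applies. A minimal sketch of the two points above, assuming kafka-streams is on the classpath and assuming `String` keys and `Long` values (the thread does not state the actual types); the class and method names are hypothetical helpers:

```java
import java.time.Instant;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

final class StandbyWindowQuery {

    /** Builds query parameters for a window store; stale (standby) reads are opt-in. */
    static StoreQueryParameters<ReadOnlyWindowStore<String, Long>> params(
            String storeName, boolean useStale) {
        StoreQueryParameters<ReadOnlyWindowStore<String, Long>> query =
            StoreQueryParameters.fromNameAndType(
                storeName, QueryableStoreTypes.<String, Long>windowStore());
        // StoreQueryParameters is immutable, so the returned instance must be kept.
        return useStale ? query.enableStaleStores() : query;
    }

    /** Counts the windows found for a key between 'from' and 'to'. */
    static int countWindows(
            ReadOnlyWindowStore<String, Long> store, String key, Instant from, Instant to) {
        int windows = 0;
        // The iterator holds store resources, so it is closed via try-with-resources.
        try (WindowStoreIterator<Long> it = store.fetch(key, from, to)) {
            while (it.hasNext()) {
                it.next();
                windows++;
            }
        }
        return windows;
    }
}
```

On the standby instance the store would then be obtained with something like `streams.store(StandbyWindowQuery.params(TABLE_NAME, true))`, where `streams` is the running `KafkaStreams` object. An empty result may still simply mean the standby has not caught up yet.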


Best,
Bruno

On 9/6/23 8:32 AM, Igor Maznitsa wrote:

Hello

1. I am starting two Kafka Streams applications running in the same group 
with num.standby.replicas=1


2. Application A has the active TimeWindow data store and application B 
has the standby version of that data store


Is there any way to read the standby store's time window data from 
within application B?
I found some examples (like 
https://www.confluent.io/blog/kafka-streams-ksqldb-interactive-queries-go-prime-time/) 
and tests, but they all use KeyValue stores. I ran some experiments 
but got an empty fetch on the standby node for the same key and time 
interval that return windows on the active node. Is there a restriction 
such that this mechanism works only for KeyValue stores?






Kafka Streams, read standby time window store

2023-09-06 Thread Igor Maznitsa

Hello

1. I am starting two Kafka Streams applications running in the same group 
with num.standby.replicas=1


2. Application A has the active TimeWindow data store and application B has 
the standby version of that data store


Is there any way to read the standby store's time window data from within 
application B?
I found some examples (like 
https://www.confluent.io/blog/kafka-streams-ksqldb-interactive-queries-go-prime-time/) 
and tests, but they all use KeyValue stores. I ran some experiments but 
got an empty fetch on the standby node for the same key and time interval 
that return windows on the active node. Is there a restriction such that 
this mechanism works only for KeyValue stores?


--

Igor Maznitsa



Re: KafkaStreams - InteractiveQuery and num.standby.replicas

2021-03-09 Thread Igor Maznitsa

Yes, exactly, it is from the Spring Cloud Kafka Streams binder, thank you!

On 3/9/21 21:59, Matthias J. Sax wrote:

Please keep the discussion on the mailing list. Thanks.

There is no such thing as `InteractiveQueryService` in Kafka Streams.
Seems it's a Spring thing? Not sure how Spring works. Maybe somebody else
can help out?


-Matthias

On 3/9/21 3:15 AM, Igor Maznitsa wrote:

Thanks for the advice!

Does num.standby.replicas play a role when looking up HostInfo through
InteractiveQueryService#getHostInfo? Will it return the local host info
(instead of the remote instance) if the local instance holds a store
replica for the required key?

On 3/9/21 12:49, Matthias J. Sax wrote:

You can control it:

By default only active stores are queried. If you want to query standby
stores, you can use `StoreQueryParameters#enableStaleStores()`
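
In plain Kafka Streams (without the Spring wrapper), the routing decision could be sketched from the `KeyQueryMetadata` returned by `KafkaStreams#queryMetadataForKey`. This is only an illustration under assumptions: kafka-streams 2.7 or newer is on the classpath, and the helper class, method name and the `allowStale` flag are hypothetical.

```java
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

final class LocalOrRemote {

    /**
     * Returns true when this instance may answer the query itself: always if it
     * hosts the active task for the key, and additionally if it hosts a standby
     * and the caller accepts possibly stale data.
     */
    static boolean canServeLocally(KeyQueryMetadata metadata, HostInfo self, boolean allowStale) {
        if (self.equals(metadata.activeHost())) {
            return true;
        }
        return allowStale && metadata.standbyHosts().contains(self);
    }
}
```

When the method returns true for a standby host, the local store still has to be opened with `enableStaleStores()`; otherwise the request would be forwarded to `metadata.activeHost()`.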


-Matthias

On 3/9/21 1:30 AM, Igor Maznitsa wrote:

Hello

Suppose I have started two Kafka Streams application instances with the
parameter 'num.standby.replicas=1'. This means that each instance has a
shadow copy of the local state stores, so my question is:

How does Interactive Query behave when standby replicas are enabled? Is it
aware of the replicated data and will it return data from the local
replicated store, or does it know nothing about standby mode and report
that the data is located on another host?



--
Igor Maznitsa

email: rrg4...@gmail.com
http://www.igormaznitsa.com



Re: KafkaStreams - InteractiveQuery and num.standby.replicas

2021-03-09 Thread Igor Maznitsa

Thanks for the advice!

Does num.standby.replicas play a role when looking up HostInfo through 
InteractiveQueryService#getHostInfo? Will it return the local host info 
(instead of the remote instance) if the local instance holds a store 
replica for the required key?


On 3/9/21 12:49, Matthias J. Sax wrote:

You can control it:

By default only active stores are queried. If you want to query standby
stores, you can use `StoreQueryParameters#enableStaleStores()`


-Matthias

On 3/9/21 1:30 AM, Igor Maznitsa wrote:

Hello

Suppose I have started two Kafka Streams application instances with the
parameter 'num.standby.replicas=1'. This means that each instance has a
shadow copy of the local state stores, so my question is:

How does Interactive Query behave when standby replicas are enabled? Is it
aware of the replicated data and will it return data from the local
replicated store, or does it know nothing about standby mode and report
that the data is located on another host?



--
Igor Maznitsa

email: rrg4...@gmail.com
http://www.igormaznitsa.com



KafkaStreams - InteractiveQuery and num.standby.replicas

2021-03-09 Thread Igor Maznitsa

Hello

Suppose I have started two Kafka Streams application instances with the 
parameter 'num.standby.replicas=1'. This means that each instance has a 
shadow copy of the local state stores, so my question is:


How does Interactive Query behave when standby replicas are enabled? Is it 
aware of the replicated data and will it return data from the local 
replicated store, or does it know nothing about standby mode and report 
that the data is located on another host?