Re: [Kafka Streams], Processor API for KTable and KGroupedStream
Thanks a lot for explanation but could you provide a bit more details about KGroupedStream? It is just interface and not extends KStream so how I can add processor in the case below? / KStream someStream = / / someStream / / .groupByKey() / */how to add processor for resulted grouped stream here ???/* On 2024-Jan-13 01:22, Matthias J. Sax wrote: `KGroupedStream` is just an "intermediate representation" to get a better flow in the DSL. It's not a "top level" abstraction like KStream/KTable. For `KTable` there is `transformValue()` -- there is no `transform()` because keying must be preserved -- if you want to change the keying you need to use `KTable#groupBy()` (data needs to be repartitioned if you change the key). HTH. -Matthias On 1/12/24 11:47 AM, Igor Maznitsa wrote: Hello Is there any way in Kafka Streams API to define processors for KTable and KGroupedStream like KStream#transform? How to provide a custom processor for KTable or KGroupedStream which could for instance provide way to not downstream selected events?
[Kafka Streams], Processor API for KTable and KGroupedStream
Hello Is there any way in Kafka Streams API to define processors for KTable and KGroupedStream like KStream#transform? How to provide a custom processor for KTable or KGroupedStream which could for instance provide way to not downstream selected events? -- Igor Maznitsa email: rrg4...@gmail.com
Re: Kafka Streams, read standby time window store
Hi Bruno Looks like that I have found error in my code. The error was that I split create of StoreQueryParameters and my code looked like snippet below /*var query = StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType);*/ /*if (useStale) {*/ /* query.enableStaleStore(); */ */}/ * It was my mistake because I expected that the already existing instance will be changed but it is immutable one and generates new instance. I don't know why they have not made it as Builder pattern which would be more appropriate one in the case imho. I've made fix in my code */var query = StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType);/* */if (useStale) {/* */ query = query.enableStaleStore(); /* */}/* and now it works well! thanks a lot for your help! On 9/6/23 16:05, Bruno Cadonna wrote: Hi Igor, Sorry to hear you have issues with querying standbys! I have two questions to clarify the situation: 1. Did you enable querying stale stores with StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores() as described in the blog post? 2. Since you are querying a stale store, could it be the the standby hasn't caught up yet? Best, Bruno On 9/6/23 8:32 AM, Igor Maznitsa wrote: Hello 1. I am starting two Kafka Streams applications worked in same group with num.standby.replicas=1 2. Application A has active TimeWindow data store and application B has the standby version of the data store Is there any way to read the standby store time window data in bounds of B application? I found some examples (like https://www.confluent.io/blog/kafka-streams-ksqldb-interactive-queries-go-prime-time/) and tests but all they use KeyValue stores. I made some experiments but got empty fetch on standby node for same key and time interval which returns windows on the active node. Is there any restriction and such mechanism works only for KeyValue stores?
Kafka Streams, read standby time window store
Hello 1. I am starting two Kafka Streams applications worked in same group with num.standby.replicas=1 2. Application A has active TimeWindow data store and application B has the standby version of the data store Is there any way to read the standby store time window data in bounds of B application? I found some examples (like https://www.confluent.io/blog/kafka-streams-ksqldb-interactive-queries-go-prime-time/) and tests but all they use KeyValue stores. I made some experiments but got empty fetch on standby node for same key and time interval which returns windows on the active node. Is there any restriction and such mechanism works only for KeyValue stores? -- Igor Maznitsa
Re: KafkaStreams - InteractiveQuery and num.standby.replicas
yes, exactly it is from spring cloud kafka streams binder, thank you! On 3/9/21 21:59, Matthias J. Sax wrote: Please keep the discussion on the mailing list. Thanks. There is no such thing as `InteractiveQueryService` in Kafka Streams. Seems it's a Spring thing? Not sure how Spring work. Maybe somebody else can help out? -Matthias On 3/9/21 3:15 AM, Igor Maznitsa wrote: thanks for advice! does standby.replicas play role during search of HostInfo through InteractiveQueryService#getHostInfo ? will it return the local host info (instead of remote instance) if it contains local store replica for required key? On 3/9/21 12:49, Matthias J. Sax wrote: You can control it: By default only active stores are queried. If you want to query standby stores, you can use `StoreQueryParameters#enableStaleStores()` -Matthias On 3/9/21 1:30 AM, Igor Maznitsa wrote: Hello If I have started two KafkaStream application instances, and they have parameter 'num.standby.replicas=1'. It means that each instance has shadow copy of local state stores, so the question: How InteractionQuery behaves in activated stand by replica case? does it aware about replicated data and will be returning data from local replicated store or it knows nothing about standby mode and will return information that data situated on another host? -- Igor Maznitsa email: rrg4...@gmail.com http://www.igormaznitsa.com
Re: KafkaStreams - InteractiveQuery and num.standby.replicas
thanks for advice! does standby.replicas play role during search of HostInfo through InteractiveQueryService#getHostInfo ? will it return the local host info (instead of remote instance) if it contains local store replica for required key? On 3/9/21 12:49, Matthias J. Sax wrote: You can control it: By default only active stores are queried. If you want to query standby stores, you can use `StoreQueryParameters#enableStaleStores()` -Matthias On 3/9/21 1:30 AM, Igor Maznitsa wrote: Hello If I have started two KafkaStream application instances, and they have parameter 'num.standby.replicas=1'. It means that each instance has shadow copy of local state stores, so the question: How InteractionQuery behaves in activated stand by replica case? does it aware about replicated data and will be returning data from local replicated store or it knows nothing about standby mode and will return information that data situated on another host? -- Igor Maznitsa email: rrg4...@gmail.com http://www.igormaznitsa.com
KafkaStreams - InteractiveQuery and num.standby.replicas
Hello If I have started two KafkaStream application instances, and they have parameter 'num.standby.replicas=1'. It means that each instance has shadow copy of local state stores, so the question: How InteractionQuery behaves in activated stand by replica case? does it aware about replicated data and will be returning data from local replicated store or it knows nothing about standby mode and will return information that data situated on another host?