Re: State store guarantees - KStreams 2.8.1
Glad you figured it out.

It's not easy to perform checks for this case. The problem is that the logic checking topics/partitions does not know anything about the operators that are used -- the DSL semantics are not available at runtime. And the `merge()` operator itself does not have access to input topic partition information.

-Matthias

On 1/26/22 12:25 PM, Jiří Syrový wrote:
> It was actually my mistake. The whole problem was caused by a topic
> partition count mismatch. I would have expected that at least the *merge*
> operator would let you know when merging two inputs with a mismatching
> number of partitions.
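A partition count mismatch is enough to explain the symptom. A producer picks the partition for a keyed record roughly as hash(key) modulo the partition count, so the same key can land on different partition numbers in two topics whose counts differ. Kafka Streams assigns work per partition number, so after `merge()` the records for that key may be handled by different tasks, each with its own local shard of the state store. Below is a minimal sketch in plain Java with no Kafka dependency. Everything in it is an assumption for illustration: `partitionFor` is a hypothetical helper, `String.hashCode()` stands in for Kafka's murmur2-based hash, and the partition counts 4 and 6 are made up (the thread does not state the real ones).

```java
// Illustration only: a stand-in partitioner, NOT Kafka's real implementation.
final class PartitionMismatchDemo {

    // Hypothetical helper: maps a key to a partition as hash(key) mod partitionCount.
    // Kafka's default partitioner uses murmur2 on the serialized key instead of hashCode().
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        String key = "123";
        int inTopicA = partitionFor(key, 4); // assumed: input topic A has 4 partitions
        int inTopicB = partitionFor(key, 6); // assumed: input topic B has 6 partitions

        System.out.println("topic A -> partition " + inTopicA);
        System.out.println("topic B -> partition " + inTopicB);
        // If the partition numbers differ, the two records go to different tasks,
        // and possibly to different instances with different local store contents.
        System.out.println("same partition number? " + (inTopicA == inTopicB));
    }
}
```

With equal partition counts (and the same partitioner on the producer side) the two calls always agree, which is the co-partitioning assumption the original question relied on.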
Re: State store guarantees - KStreams 2.8.1
It was actually my mistake. The whole problem was caused by a topic partition count mismatch. I would have expected that at least the *merge* operator would let you know when merging two inputs with a mismatching number of partitions.

On Sat, 22 Jan 2022 at 03:46, Matthias J. Sax wrote:
> Well, it's unclear what the remote lookup does... As Kafka Streams does
> not implement this part, my best guess at the moment is to blame it on a
> bug in the remote request implementation.
>
> Are you using some off-the-shelf implementation for the remote lookup
> part or are you using something built in-house?
>
> -Matthias
Re: State store guarantees - KStreams 2.8.1
It's built in-house, but the samples I've posted here are from the local store with remote lookup disabled. The API just adds the metadata and hostname.

On Sat, 22 Jan 2022 at 03:46, Matthias J. Sax wrote:
> Well, it's unclear what the remote lookup does... As Kafka Streams does
> not implement this part, my best guess at the moment is to blame it on a
> bug in the remote request implementation.
>
> Are you using some off-the-shelf implementation for the remote lookup
> part or are you using something built in-house?
>
> -Matthias
Re: State store guarantees - KStreams 2.8.1
Well, it's unclear what the remote lookup does... As Kafka Streams does not implement this part, my best guess at the moment is to blame it on a bug in the remote request implementation.

Are you using some off-the-shelf implementation for the remote lookup part or are you using something built in-house?

-Matthias

On 1/21/22 13:13, Jiří Syrový wrote:
> I agree it sounds a bit off, but it seems that even a host that is not
> marked as active allows me to query its store and gives me a result that
> is not null.
>
> This application has an API that queries either the local or a remote
> store (basically via the HTTP API of the active host), but the weird part
> is that I get the local response from both instances instead of the
> expected one remote response (on the non-active, non-standby host) and
> one local response.
>
> In principle the code to query stores looks like this:
>
>     import org.apache.kafka.streams.StoreQueryParameters
>     import org.apache.kafka.streams.state.QueryableStoreTypes
>
>     // Look up the key in this instance's local view of the store.
>     streams
>       .store(
>         StoreQueryParameters
>           .fromNameAndType(
>             storeName,
>             QueryableStoreTypes.keyValueStore[K, V]()
>           )
>       )
>       .get(key)
>
> And responses look like this:
>
> $ curl //123 (response from instance A)
> {"meta":"KeyQueryMetadata {activeHost=HostInfo{host='ip-1-2-3-4.eu-west-1.compute.internal', port=8080}, standbyHosts=[], partition=0}","value":{"timestamp":"2022-01-21T00:00:02.433Z","enabled":true},"hostname":"ip-1-2-3-4.eu-west-1.compute.internal"}
>
> $ curl //123 (response from instance B)
> {"meta":"KeyQueryMetadata {activeHost=HostInfo{host='ip-1-2-3-4.eu-west-1.compute.internal', port=8080}, standbyHosts=[], partition=0}","value":{"timestamp":"2022-01-21T15:55:27.807Z","enabled":false},"hostname":"ip-9-8-7-6.eu-west-1.compute.internal"}
>
> This behaviour is not random and is 100% reproducible. I can try to
> create a minimal code example that will demonstrate it.
Re: State store guarantees - KStreams 2.8.1
I agree it sounds a bit off, but it seems that even a host that is not marked as active allows me to query its store and gives me a result that is not null.

This application has an API that queries either the local or a remote store (basically via the HTTP API of the active host), but the weird part is that I get the local response from both instances instead of the expected one remote response (on the non-active, non-standby host) and one local response.

In principle the code to query stores looks like this:

    import org.apache.kafka.streams.StoreQueryParameters
    import org.apache.kafka.streams.state.QueryableStoreTypes

    // Look up the key in this instance's local view of the store.
    streams
      .store(
        StoreQueryParameters
          .fromNameAndType(
            storeName,
            QueryableStoreTypes.keyValueStore[K, V]()
          )
      )
      .get(key)

And responses look like this:

$ curl //123 (response from instance A)
{"meta":"KeyQueryMetadata {activeHost=HostInfo{host='ip-1-2-3-4.eu-west-1.compute.internal', port=8080}, standbyHosts=[], partition=0}","value":{"timestamp":"2022-01-21T00:00:02.433Z","enabled":true},"hostname":"ip-1-2-3-4.eu-west-1.compute.internal"}

$ curl //123 (response from instance B)
{"meta":"KeyQueryMetadata {activeHost=HostInfo{host='ip-1-2-3-4.eu-west-1.compute.internal', port=8080}, standbyHosts=[], partition=0}","value":{"timestamp":"2022-01-21T15:55:27.807Z","enabled":false},"hostname":"ip-9-8-7-6.eu-west-1.compute.internal"}

This behaviour is not random and is 100% reproducible. I can try to create a minimal code example that will demonstrate it.

On Fri, 21 Jan 2022 at 18:20, Matthias J. Sax wrote:
>> but instance A returns result X for a partition I and instance B returns
>> result Y for the same partition I.
>
> This sounds a little off. As you stated, if both instances agree on the
> active host, the active host must either be instance A or instance B,
> and thus you can query partition I only on instance A or instance B. The
> non-active instance should not return any data for a partition it does
> not host.
>
> Can you elaborate?
>
> -Matthias
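The two responses show the pattern being discussed: both instances report the same `activeHost`, yet each answers from its own local store. Below is a minimal sketch, in plain Java with no Kafka dependency, of the routing decision a lookup layer would be expected to make from that metadata. `QueryRoutingDemo`, `isActiveHost` and `route` are hypothetical names, not part of Kafka Streams or of the in-house API; the host names and port 8080 are taken from the responses above, and instance B's own port is assumed to be 8080 as well.

```java
// Illustration only: decides whether a key lookup should be served locally
// or forwarded to the active host named in the key's metadata.
final class QueryRoutingDemo {

    // Hypothetical helper: true only when this instance is the reported active host.
    static boolean isActiveHost(String activeHost, int activePort,
                                String localHost, int localPort) {
        return activeHost.equals(localHost) && activePort == localPort;
    }

    // Hypothetical helper: returns "local" or the address the request should be forwarded to.
    static String route(String activeHost, int activePort,
                        String localHost, int localPort) {
        return isActiveHost(activeHost, activePort, localHost, localPort)
                ? "local"
                : "remote:" + activeHost + ":" + activePort;
    }

    public static void main(String[] args) {
        String active = "ip-1-2-3-4.eu-west-1.compute.internal"; // activeHost from both responses
        // Instance A is the active host, so it may answer from its local store.
        System.out.println(route(active, 8080, "ip-1-2-3-4.eu-west-1.compute.internal", 8080));
        // Instance B is not the active host, so it would be expected to forward the request.
        System.out.println(route(active, 8080, "ip-9-8-7-6.eu-west-1.compute.internal", 8080));
    }
}
```

In the thread the samples were taken with remote lookup disabled, so instance B answering locally with a non-null value is the surprising part, not the routing itself.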
Re: State store guarantees - KStreams 2.8.1
> but instance A returns result X for a partition I and instance B returns
> result Y for the same partition I.

This sounds a little off. As you stated, if both instances agree on the active host, the active host must either be instance A or instance B, and thus you can query partition I only on instance A or instance B. The non-active instance should not return any data for a partition it does not host.

Can you elaborate?

-Matthias

On 1/21/22 4:47 AM, Jiří Syrový wrote:
> Hi everyone,
>
> I've been trying for a while to answer a question for myself: what are
> the actual consistency guarantees for state stores when they are
> connected to transformers?
>
> I have an application where a single (persistent, RocksDB-backed) state
> store is connected to multiple transformers. Each transformer might both
> read (get) and write (put) data into the state store. All transformers
> receive data from multiple input topics in the same way (the same key,
> same number of partitions), which are merged together before being sent
> to the transformers.
>
> All transformers are located in the same sub-topology.
>
> What I observed is that even with 0 standby replicas I might get
> inconsistent results when querying this state store connected to multiple
> transformers. I have 2 instances, and the metadata on both instances
> agrees on the active host for this state store and partition, but
> instance A returns result X for a partition I and instance B returns
> result Y for the same partition I.
>
> Any suggestions whether this is a bug, or is my assumption incorrect that
> the same state store should give the same result for the same key (same
> partition) in 2 distinct transformers fed from the same input?
>
> Thanks,
> Jiri