Re: State store guarantees - KStreams 2.8.1

2022-01-26 Thread Matthias J. Sax

Glad you figured it out.

It's not easy to perform checks for this case. The problem is that the 
logic checking topics/partitions does not know anything about the 
operators that are used -- the DSL semantics are not available at runtime.


And the `merge()` operator itself does not have access to input topic 
partition information.
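
To illustrate why a partition count mismatch produces this symptom, here is a minimal Scala sketch. It assumes a simplified partitioner (`String.hashCode` modulo the partition count) as a stand-in for Kafka's default one, which hashes the serialized key bytes with murmur2. `CoPartitioningSketch`, `partitionFor`, and `misalignedKeys` are hypothetical names. Kafka Streams assigns partition N of every input topic of a sub-topology to the same task, so a key that lands in different partition numbers in two merged topics is processed by different tasks, each with its own state store shard.

```scala
object CoPartitioningSketch {
  // Simplified stand-in for Kafka's default partitioner (assumption):
  // the real one applies murmur2 to the serialized key bytes.
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Returns the sample keys that map to different partition numbers in two
  // topics with the given partition counts. Such keys would be handled by
  // different tasks after the topics are merged.
  def misalignedKeys(keys: Seq[String], partitionsA: Int, partitionsB: Int): Seq[String] =
    keys.filter(k => partitionFor(k, partitionsA) != partitionFor(k, partitionsB))
}
```

With 2 and 3 partitions, `CoPartitioningSketch.misalignedKeys(Seq("a", "b", "c"), 2, 3)` returns the keys "b" and "c"; with equal partition counts it returns an empty sequence.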



-Matthias












Re: State store guarantees - KStreams 2.8.1

2022-01-26 Thread Jiří Syrový
It's actually my own mistake. The whole problem was caused by a topic
partition count mismatch.
I would have expected that at least the *merge* operator lets you know when
you are merging two inputs with mismatching numbers of partitions.
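
A check like the one wished for here can be run by the application itself before the topology is started. The following is a minimal sketch, not a Kafka Streams feature: `PartitionCountCheck` and `check` are hypothetical names, and the topic-to-partition-count map is assumed to be obtained elsewhere (for example from the Kafka Admin client, not shown).

```scala
object PartitionCountCheck {
  // Input: topic name -> partition count for every topic that is merged.
  // Returns Right(count) if all topics agree, otherwise Left(description).
  def check(counts: Map[String, Int]): Either[String, Int] = {
    val distinct = counts.values.toSet
    if (distinct.isEmpty) Left("no input topics given")
    else if (distinct.size == 1) Right(distinct.head)
    else {
      // List topics in a stable order so the message is reproducible.
      val details = counts.toSeq.sortBy(_._1).map { case (t, n) => s"$t=$n" }
      Left("partition count mismatch: " + details.mkString(", "))
    }
  }
}
```

Failing fast on a `Left` at startup turns a silent mis-routing of keys into an explicit error.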



Re: State store guarantees - KStreams 2.8.1

2022-01-22 Thread Jiří Syrový
It's built in-house, but the samples I've posted here are from the local
store with remote lookup disabled. The API just adds the metadata and hostname.




Re: State store guarantees - KStreams 2.8.1

2022-01-21 Thread Matthias J. Sax
Well, it's unclear what the remote lookup does... As Kafka Streams does
not implement this part, my best guess at the moment is to blame it on a
bug in the remote request implementation.

Are you using some off-the-shelf implementation for the remote lookup
part or are you using something built in-house?



-Matthias










Re: State store guarantees - KStreams 2.8.1

2022-01-21 Thread Jiří Syrový
I agree it sounds a bit off, but it seems that even a host that is not
marked as active allows me to query its store and gives me a result that
is not null.

This application has an API that queries either the local or a remote
store (basically via the HTTP API of the active host), but the weird part is
that I get the local response from both instances instead of the expected
one remote (on the non-active and non-standby host) and one local.
In principle the code to query stores looks like this:

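// Imports needed by the lookup below
import org.apache.kafka.streams.StoreQueryParameters
import org.apache.kafka.streams.state.QueryableStoreTypes

// Query the store on this instance for `key`
streams
  .store(
    StoreQueryParameters
      .fromNameAndType(
        storeName,
        QueryableStoreTypes.keyValueStore[K, V]()
      )
  )
  .get(key)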


And responses look like this:
$ curl //123   (response from instance A)
{"meta":"KeyQueryMetadata {activeHost=HostInfo{host='ip-1-2-3-4.eu-west-1.compute.internal', port=8080}, standbyHosts=[], partition=0}","value":{"timestamp":"2022-01-21T00:00:02.433Z","enabled":true},"hostname":"ip-1-2-3-4.eu-west-1.compute.internal"}

$ curl //123   (response from instance B)
{"meta":"KeyQueryMetadata {activeHost=HostInfo{host='ip-1-2-3-4.eu-west-1.compute.internal', port=8080}, standbyHosts=[], partition=0}","value":{"timestamp":"2022-01-21T15:55:27.807Z","enabled":false},"hostname":"ip-9-8-7-6.eu-west-1.compute.internal"}

This behaviour is not random and is 100% reproducible. I can try to create
a minimal code example that will demonstrate it.
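
The local-or-remote decision described above could be sketched as follows. `Host` is a hypothetical stand-in for Kafka Streams' `HostInfo`, and `route` is an illustrative helper, not part of the library. In a real application the active host would come from the `KeyQueryMetadata` returned by `KafkaStreams#queryMetadataForKey`.

```scala
object QueryRouting {
  // Hypothetical stand-in for org.apache.kafka.streams.state.HostInfo.
  final case class Host(host: String, port: Int)

  sealed trait Route
  case object ServeLocally extends Route
  final case class ForwardTo(active: Host) extends Route

  // `activeHost` is the host reported by the key's metadata;
  // `self` is the endpoint this instance advertises for itself.
  def route(activeHost: Host, self: Host): Route =
    if (activeHost == self) ServeLocally else ForwardTo(activeHost)
}
```

In the responses above both instances report the same active host, so only the instance whose own endpoint equals that host would be expected to answer from its local store.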



Re: State store guarantees - KStreams 2.8.1

2022-01-21 Thread Matthias J. Sax

> but instance A returns result X for a partition I and instance B returns
> result Y for the same partition I.


This sounds a little off. As you stated, if both instances agree on the
active host, the active host must either be instance A or instance B,
and thus you can query partition I only on instance A or instance B. The
non-active instance should not return any data for a partition it does
not host.


Can you elaborate?

-Matthias

On 1/21/22 4:47 AM, Jiří Syrový wrote:

> Hi everyone,
>
> I have been trying for a while to work out what consistency guarantees
> state stores actually provide when connected to transformers.
>
> I have an application where a single (persistent, RocksDB-backed) state
> store is connected to multiple transformers. Each transformer might both
> read (get) and write (put) data into the state store. All transformers
> receive data from multiple input topics in the same way (the same key, same
> number of partitions); the inputs are merged together before being sent
> to the transformers.
>
> All transformers are located in the same sub-topology.
>
> What I observed is that even with 0 standby replicas I might get
> inconsistent results when querying this state store connected to multiple
> transformers. I have 2 instances, and the metadata on both instances agrees
> on the active host for this state store and partition, but instance A
> returns result X for a partition I and instance B returns result Y for the
> same partition I.
>
> Is this a bug, or is my assumption incorrect that the
> same state store should give the same result for the same key (same
> partition) in 2 distinct transformers fed from the same input?
>
> Thanks,
> Jiri