Hi,

have you thought about using connect to put data into a store that is more reasonable for your kind of query requirements?

Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:
On Jun 6, 2017, at 2:52 PM, Damian Guy <damian....@gmail.com> wrote:

Steven,

In practice, data shouldn't be migrating that often. If it is then you
probably have bigger problems.
Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.

You should be able to use the metadata api
to find the instance the key should be on and then when you check that node
you can also check with the metadata api that the key should still be on
this host. If streams is rebalancing while you query an exception will be
raised and you'll need to retry the request once the rebalance has
completed.
Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here as this exception only fires *during a migration*
not *after a migration that may have invalidated your metadata lookup completes*

HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker <sschlans...@opentable.com>
wrote:

On Jun 6, 2017, at 6:16 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting
null() the code will have to retry by refreshing the metadata and going to
the new instance. If you don’t know beforehand if a key exists or not you
might have to check all instances of a store to make sure.
No, I am not presupposing that the key can exist -- this is a user visible
API and will
be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not
truly sufficient,
as querying different all workers at different times in the presence of
migrating data
can still in theory miss it given pessimal execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some
day being
cool like Jepsen :)

Eno


On Jun 5, 2017, at 10:12 PM, Steven Schlansker <
sschlans...@opentable.com> wrote:
Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is
failing
off of that instance, it may get routed there and by the time it gets
the request no longer "owns" the partition -- returns a false 'null'.

You can try re-checking after you get a null value, but that's
susceptible
to the same race -- it's unlikely but possible that the data migrates
*back*
before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.

Thoughts?

Thanks again,
Steven



Reply via email to