If you write to remote DB, keep in mind that this will impact you
Streams app, as you loose data locality.

Thus, populating a DB from the changelog might be better. It also
decouples both systems what give you the advantage that your Streams app
can still run if DB has an issues. If you write directly into DB and DB
is not available Streams App is doomed to fail too.


-Matthias


On 6/7/17 2:54 PM, Jan Filipiak wrote:
> Depends, embedded postgress puts you into the same spot.
> 
> But if you use your state store change log to materialize into a
> postgress; that might work out decently.
> Current JDBC doesn't support delete which is an issue but writing a
> custom sink is not to hard.
> 
> Best Jan
> 
> 
> On 07.06.2017 23:47, Steven Schlansker wrote:
>> I was actually considering writing my own KeyValueStore backed
>> by e.g. a Postgres or the like.
>>
>> Is there some feature Connect gains me that would make it better
>> than such an approach?
>>
>> thanks
>>
>>> On Jun 7, 2017, at 2:20 PM, Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> have you thought about using connect to put data into a store that is
>>> more reasonable for your kind of query requirements?
>>>
>>> Best Jan
>>>
>>> On 07.06.2017 00:29, Steven Schlansker wrote:
>>>>> On Jun 6, 2017, at 2:52 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>
>>>>> Steven,
>>>>>
>>>>> In practice, data shouldn't be migrating that often. If it is then you
>>>>> probably have bigger problems.
>>>> Understood and agreed, but when designing distributed systems, it
>>>> usually
>>>> helps to model for the worst case rather than the "well that should
>>>> never
>>>> happen" case, lest you find yourself fixing those bugs at 3am
>>>> instead :)
>>>>
>>>> I'd like to be able to induce extreme pain at the Kafka layer
>>>> (change leader
>>>> every 3 seconds and migrate all partitions around randomly) and
>>>> still have
>>>> my app behave correctly.
>>>>
>>>>> You should be able to use the metadata api
>>>>> to find the instance the key should be on and then when you check
>>>>> that node
>>>>> you can also check with the metadata api that the key should still
>>>>> be on
>>>>> this host. If streams is rebalancing while you query an exception
>>>>> will be
>>>>> raised and you'll need to retry the request once the rebalance has
>>>>> completed.
>>>> Agreed here as well.  But let's assume I have a very fast replication
>>>> setup (assume it takes zero time, for the sake of argument) -- I'm
>>>> fairly
>>>> sure there's still a race here as this exception only fires *during
>>>> a migration*
>>>> not *after a migration that may have invalidated your metadata
>>>> lookup completes*
>>>>
>>>>> HTH,
>>>>> Damian
>>>>>
>>>>> On Tue, 6 Jun 2017 at 18:11 Steven Schlansker
>>>>> <sschlans...@opentable.com>
>>>>> wrote:
>>>>>
>>>>>>> On Jun 6, 2017, at 6:16 AM, Eno Thereska <eno.there...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Steven,
>>>>>>>
>>>>>>> Do you know beforehand if a key exists? If you know that and are
>>>>>>> getting
>>>>>> null() the code will have to retry by refreshing the metadata and
>>>>>> going to
>>>>>> the new instance. If you don’t know beforehand if a key exists or
>>>>>> not you
>>>>>> might have to check all instances of a store to make sure.
>>>>>> No, I am not presupposing that the key can exist -- this is a user
>>>>>> visible
>>>>>> API and will
>>>>>> be prone to "accidents" :)
>>>>>>
>>>>>> Thanks for the insight.  I worry that even checking all stores is not
>>>>>> truly sufficient,
>>>>>> as querying different all workers at different times in the
>>>>>> presence of
>>>>>> migrating data
>>>>>> can still in theory miss it given pessimal execution.
>>>>>>
>>>>>> I'm sure I've long wandered off into the hypothetical, but I dream
>>>>>> of some
>>>>>> day being
>>>>>> cool like Jepsen :)
>>>>>>
>>>>>>> Eno
>>>>>>>
>>>>>>>
>>>>>>>> On Jun 5, 2017, at 10:12 PM, Steven Schlansker <
>>>>>> sschlans...@opentable.com> wrote:
>>>>>>>> Hi everyone, me again :)
>>>>>>>>
>>>>>>>> I'm still trying to implement my "remoting" layer that allows
>>>>>>>> my clients to see the partitioned Kafka Streams state
>>>>>>>> regardless of which instance they hit.  Roughly, my lookup is:
>>>>>>>>
>>>>>>>> Message get(Key key) {
>>>>>>>>   RemoteInstance instance = selectPartition(key);
>>>>>>>>   return instance.get(key); // http remoting
>>>>>>>> }
>>>>>>>>
>>>>>>>> RemoteInstance.get(Key key) { // http endpoint
>>>>>>>>   return readOnlyKeyValueStore.get(key);
>>>>>>>> }
>>>>>>>>
>>>>>>>> However, the mapping of partitions to instances may change.
>>>>>>>> If you call KeyValueStore.get(K) where K is on a partition you
>>>>>>>> don't own, it returns null.  This is indistinguishable from a
>>>>>>>> successful get on a key that doesn't exist.
>>>>>>>>
>>>>>>>> If one instance selects a sibling instance right as the
>>>>>>>> partition is
>>>>>> failing
>>>>>>>> off of that instance, it may get routed there and by the time it
>>>>>>>> gets
>>>>>>>> the request no longer "owns" the partition -- returns a false
>>>>>>>> 'null'.
>>>>>>>>
>>>>>>>> You can try re-checking after you get a null value, but that's
>>>>>> susceptible
>>>>>>>> to the same race -- it's unlikely but possible that the data
>>>>>>>> migrates
>>>>>> *back*
>>>>>>>> before you do this re-check.
>>>>>>>>
>>>>>>>> Is there any way to correctly implement this without races?  I'd
>>>>>>>> imagine
>>>>>>>> you need a new primitive like KeyValueStore#get that atomically
>>>>>>>> finds
>>>>>>>> the key or throws an exception if it is not in an owned partition
>>>>>>>> at the time of lookup so you know to recheck the partition and
>>>>>>>> retry.
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>> Thanks again,
>>>>>>>> Steven
>>>>>>>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to