Kafka logo on website

2017-06-06 Thread Maria Angelella
Hello,

My company, Dattell, is a consulting agency that helps businesses with big
data messaging, storage and analysis using open source tools. I am writing
to let you know that we listed your logo on our website, here.

Please let us know if you would like us to remove it.


Sincerely,

Maria

Maria Angelella, PhD
Managing Partner

Mobile: 570-704-7493
Website: dattell.com


Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-06 Thread Guozhang Wang
Thanks Steven, interesting use case.

The current Streams state store metadata discovery assumes the
`DefaultStreamPartitioner` is used, which is a limitation for cases like this.

Another workaround I can think of: you can first partition on D in the first
stage to let the workers do the "real" work, then pipe it to a second stage
where you re-partition on K, with the second processor only materializing the
store for querying. I'm not sure it would be better, since it may require
doubling the store space (one store on the first processor and one on the
second); and since you can hold the whole K -> D map in a global state, that
map is presumably small enough that the repartitioning may not be worth it.
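
A rough sketch of that two-stage shape, for concreteness (the topic names, the
store name, and the minimal Message type are placeholders, and default serdes
are assumed rather than configured explicitly):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class TwoStageTopologySketch {

    // Placeholder for the (possibly large) value; only the primary key K is needed here.
    public interface Message {
        String getKey();
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Stage 1: input topic custom-partitioned on destination D; the ordering /
        // rate-limiting work stays here, keyed by destination.
        KStream<String, Message> byDestination = builder.stream("messages-by-destination");

        // Stage 2: re-key on the primary key K and write through a topic that uses the
        // default partitioner, so the ordinary "by key" metadata lookup applies again.
        byDestination
            .selectKey((destination, message) -> message.getKey())
            .to("messages-by-key");

        // Materialize the re-keyed copy purely for interactive queries. This is a
        // second copy of the data, hence the doubled store space mentioned above.
        builder.table("messages-by-key",
            Materialized.<String, Message, KeyValueStore<Bytes, byte[]>>as("lookup-store"));

        return builder.build();
    }
}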


Guozhang






On Tue, Jun 6, 2017 at 8:36 AM, Michael Noll  wrote:

> Happy to hear you found a working solution, Steven!
>
> -Michael
>
>
>
> On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker <
> sschlans...@opentable.com> wrote:
>
> > >
> > > On Jun 2, 2017, at 3:32 PM, Matthias J. Sax 
> > wrote:
> > >
> > > Thanks. That helps to understand the use case better.
> > >
> > > Rephrase to make sure I understood it correctly:
> > >
> > > 1) you are providing a custom partitioner to Streams that is based on
> one
> > > field in your value (that's fine with regard to fault-tolerance :))
> > > 2) you want to use interactive queries to query the store
> > > 3) because of your custom partitioning schema, you need to manually
> > > figure out the right application instance that hosts a key
> > > 4) thus, you use a GlobalKTable to maintain the information from K to D
> > > and thus to the partition, i.e., the streams instance that hosts K
> > >
> > > If this is correct, then you cannot use the "by key" metadata
> interface.
> > > It's designed to find the streams instance based on the key only -- but
> > > your partitioner is based on the value. Internally, we call
> > >
> > >> final Integer partition = partitioner.partition(key, null,
> > sourceTopicsInfo.maxPartitions);
> > >
> > > Note, that `value==null` -- at this point, we don't have any value
> > > available and can't provide it to the partitioner.
> > >
> > > Thus, your approach to get all metadata is the only way you can go.
> >
> > Thanks for confirming this.  The code is a little ugly but I've done
> worse
> > :)
> >
> > >
> > >
> > > Very interesting (and quite special) use case. :)
> > >
> > >
> > > -Matthias
> > >
> > > On 6/2/17 2:32 PM, Steven Schlansker wrote:
> > >>
> > >>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax 
> > wrote:
> > >>>
> > >>> I am not sure if I understand the use case correctly. Could you give
> > >>> some more context?
> > >>
> > >> Happily, thanks for thinking about this!
> > >>
> > >>>
> >  backing store whose partitioning is value dependent
> > >>>
> > >>> I infer that you are using a custom store and not the default RocksDB?
> If
> > >>> yes, what do you use? What does "value dependent" mean in this
> context?
> > >>
> > >> We're currently using the base in-memory store.  We tried to use
> RocksDB
> > >> but the tuning to get it running appropriately in a Linux container
> > without
> > >> tripping the cgroups OOM killer is nontrivial.
> > >>
> > >>
> > >>> Right now, I am wondering why you don't just set a new key to get your
> > >>> data grouped by the field you are interested in? Also, if you don't
> > >>> partition your data by key, you might break your streams
> application
> > >>> with regard to fault-tolerance -- or does your custom store not rely
> on
> > >>> changelog backup for fault-tolerance?
> > >>>
> > >>
> > >> That's an interesting point about making a transformed key.  But I don't
> > think
> > >> it simplifies my problem too much.  Essentially, I have a list of
> > messages
> > >> that should get delivered to destinations.  Each message has a primary
> > key K
> > >> and a destination D.
> > >>
> > >> We partition over D so that all messages to the same destination are
> > handled by
> > >> the same worker, to preserve ordering and implement local rate limits
> > etc.
> > >>
> > >> I want to preserve the illusion to the client that they can look up a
> > key with
> > >> only K.  So, as an intermediate step, we use the GlobalKTable to look
> > up D.  Once
> > >> we have K,D we can then compute the partition and execute a lookup.
> > >>
> > >> Transforming the key to be a composite K,D isn't helpful because the
> > end user still
> > >> only knows K -- D's relevance is an implementation detail I wish to
> > hide -- so you still
> > >> need some sort of secondary lookup.
> > >>
> > >> We do use the changelog backup for fault tolerance -- how would having
> > the partition
> > >> based on the value break this?  Is the changelog implicitly
> partitioned
> > by a partitioner
> > >> other than the one we give to the topology?
> > >>
> > >> Hopefully that explains my situation a bit more?  Thanks!
> > >>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>>
> > >>> On 6/2/17 10:34 AM, Steven Schlansker wrote:

Re: Reliably implementing global KeyValueStore#get

2017-06-06 Thread Steven Schlansker

> On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:
> 
> Steven,
> 
> In practice, data shouldn't be migrating that often. If it is then you
> probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.

> You should be able to use the metadata api
> to find the instance the key should be on and then when you check that node
> you can also check with the metadata api that the key should still be on
> this host. If streams is rebalancing while you query an exception will be
> raised and you'll need to retry the request once the rebalance has
> completed.

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here, as this exception only fires *during* a migration,
not *after* a migration that may have invalidated your metadata lookup has completed.

> 
> HTH,
> Damian
> 
> On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
> wrote:
> 
>> 
>>> On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:
>>> 
>>> Hi Steven,
>>> 
>>> Do you know beforehand if a key exists? If you know that and are getting
>> null() the code will have to retry by refreshing the metadata and going to
>> the new instance. If you don’t know beforehand if a key exists or not you
>> might have to check all instances of a store to make sure.
>>> 
>> 
>> No, I am not presupposing that the key can exist -- this is a user visible
>> API and will
>> be prone to "accidents" :)
>> 
>> Thanks for the insight.  I worry that even checking all stores is not
>> truly sufficient,
>> as querying all the workers at different times in the presence of
>> migrating data
>> can still in theory miss it given pessimal execution.
>> 
>> I'm sure I've long wandered off into the hypothetical, but I dream of some
>> day being
>> cool like Jepsen :)
>> 
>>> Eno
>>> 
>>> 
 On Jun 5, 2017, at 10:12 PM, Steven Schlansker <
>> sschlans...@opentable.com> wrote:
 
 Hi everyone, me again :)
 
 I'm still trying to implement my "remoting" layer that allows
 my clients to see the partitioned Kafka Streams state
 regardless of which instance they hit.  Roughly, my lookup is:
 
 Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
 }
 
 RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
 }
 
 However, the mapping of partitions to instances may change.
 If you call KeyValueStore.get(K) where K is on a partition you
 don't own, it returns null.  This is indistinguishable from a
 successful get on a key that doesn't exist.
 
 If one instance selects a sibling instance right as the partition is
>> failing
 off of that instance, it may get routed there and by the time it gets
 the request no longer "owns" the partition -- returns a false 'null'.
 
 You can try re-checking after you get a null value, but that's
>> susceptible
 to the same race -- it's unlikely but possible that the data migrates
>> *back*
 before you do this re-check.
 
 Is there any way to correctly implement this without races?  I'd imagine
 you need a new primitive like KeyValueStore#get that atomically finds
 the key or throws an exception if it is not in an owned partition
 at the time of lookup so you know to recheck the partition and retry.
 
 Thoughts?
 
 Thanks again,
 Steven
 
>>> 
>> 
>> 





Re: Reliably implementing global KeyValueStore#get

2017-06-06 Thread Damian Guy
Steven,

In practice, data shouldn't be migrating that often; if it is, you probably
have bigger problems. You should be able to use the metadata API to find the
instance the key should be on, and then, when you query that node, also check
with the metadata API that the key should still be on that host. If Streams is
rebalancing while you query, an exception will be raised and you'll need to
retry the request once the rebalance has completed.
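
In code, that check-then-query pattern looks roughly like the sketch below (the
store name, the String key/value types, and the exception used to signal
"retry" are placeholders; the HTTP remoting around it is omitted):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;

public class OwnershipCheckedLookup {

    // Runs on the instance that received the remoted request. It answers only if it
    // still appears to own the key; otherwise the caller should refresh its metadata
    // and retry, just as it should on InvalidStateStoreException during a rebalance.
    public static String getIfOwned(KafkaStreams streams, String myHost, int myPort, String key) {
        StreamsMetadata owner =
            streams.metadataForKey("lookup-store", key, Serdes.String().serializer());
        if (!owner.host().equals(myHost) || owner.port() != myPort) {
            throw new IllegalStateException("key has moved; refresh metadata and retry");
        }
        try {
            ReadOnlyKeyValueStore<String, String> store =
                streams.store("lookup-store", QueryableStoreTypes.<String, String>keyValueStore());
            return store.get(key);
        } catch (InvalidStateStoreException rebalancing) {
            // Streams is rebalancing; the caller should retry once it has completed.
            throw new IllegalStateException("rebalance in progress; retry the request", rebalancing);
        }
    }
}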

HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
wrote:

>
> > On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:
> >
> > Hi Steven,
> >
> > Do you know beforehand if a key exists? If you know that and are getting
> null() the code will have to retry by refreshing the metadata and going to
> the new instance. If you don’t know beforehand if a key exists or not you
> might have to check all instances of a store to make sure.
> >
>
> No, I am not presupposing that the key can exist -- this is a user visible
> API and will
> be prone to "accidents" :)
>
> Thanks for the insight.  I worry that even checking all stores is not
> truly sufficient,
> as querying all the workers at different times in the presence of
> migrating data
> can still in theory miss it given pessimal execution.
>
> I'm sure I've long wandered off into the hypothetical, but I dream of some
> day being
> cool like Jepsen :)
>
> > Eno
> >
> >
> >> On Jun 5, 2017, at 10:12 PM, Steven Schlansker <
> sschlans...@opentable.com> wrote:
> >>
> >> Hi everyone, me again :)
> >>
> >> I'm still trying to implement my "remoting" layer that allows
> >> my clients to see the partitioned Kafka Streams state
> >> regardless of which instance they hit.  Roughly, my lookup is:
> >>
> >> Message get(Key key) {
> >>   RemoteInstance instance = selectPartition(key);
> >>   return instance.get(key); // http remoting
> >> }
> >>
> >> RemoteInstance.get(Key key) { // http endpoint
> >>   return readOnlyKeyValueStore.get(key);
> >> }
> >>
> >> However, the mapping of partitions to instances may change.
> >> If you call KeyValueStore.get(K) where K is on a partition you
> >> don't own, it returns null.  This is indistinguishable from a
> >> successful get on a key that doesn't exist.
> >>
> >> If one instance selects a sibling instance right as the partition is
> failing
> >> off of that instance, it may get routed there and by the time it gets
> >> the request no longer "owns" the partition -- returns a false 'null'.
> >>
> >> You can try re-checking after you get a null value, but that's
> susceptible
> >> to the same race -- it's unlikely but possible that the data migrates
> *back*
> >> before you do this re-check.
> >>
> >> Is there any way to correctly implement this without races?  I'd imagine
> >> you need a new primitive like KeyValueStore#get that atomically finds
> >> the key or throws an exception if it is not in an owned partition
> >> at the time of lookup so you know to recheck the partition and retry.
> >>
> >> Thoughts?
> >>
> >> Thanks again,
> >> Steven
> >>
> >
>
>


Can a SourceTask run out of things to do?

2017-06-06 Thread Gautam Pulla
Hi,

I'm creating a Kafka source connector that loads data that arrives as
individual files, created continuously. I was initially planning to create one
task per file -- that would allow the framework to balance the work across all
workers in a straightforward way. In the poll() method of the source task, I
would read and return all records in the file, and when poll() reached the end
of the file, it would terminate and the task would be "finished".

This notion of a task being "finished" and running out of things to do is where
I ran into a problem: it doesn't seem to fit Connect's model. The worker thread
calls poll() continuously on a source task, and there's no simple way in the
framework to finish a task (for example, returning null from poll() just causes
the worker thread to call poll() again after a short pause).

From this, I believe that source tasks are supposed to produce an *infinite*
stream of data, and that I should allocate the work between tasks in some other
fashion than making each individual file a task.

Is this correct?
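
For reference, a minimal sketch of a task written to that "infinite stream"
model (the class name, config keys, topic handling, and one-record-per-line
format are illustrative choices, not anything prescribed by Connect): it owns a
directory, emits each new file it finds, and simply returns null from poll()
when there is nothing new.

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class DirectorySourceTask extends SourceTask {

    private File inputDir;
    private String topic;
    private final Set<String> processed = new HashSet<>();

    @Override
    public String version() {
        return "sketch";
    }

    @Override
    public void start(Map<String, String> props) {
        inputDir = new File(props.get("input.dir"));  // hypothetical config keys
        topic = props.get("topic");
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        File[] candidates = inputDir.listFiles();
        if (candidates != null) {
            for (File f : candidates) {
                if (f.isFile() && processed.add(f.getName())) {  // first time we've seen this file
                    return readWholeFile(f);
                }
            }
        }
        // Nothing new: the task never "finishes", it just reports no data for now.
        Thread.sleep(1000);
        return null;   // the worker will call poll() again shortly
    }

    private List<SourceRecord> readWholeFile(File f) {
        try {
            List<SourceRecord> records = new ArrayList<>();
            for (String line : Files.readAllLines(f.toPath(), StandardCharsets.UTF_8)) {
                records.add(new SourceRecord(
                        Collections.singletonMap("file", f.getName()),            // source partition
                        Collections.singletonMap("line", (long) records.size()),  // source offset (simplified)
                        topic, Schema.STRING_SCHEMA, line));
            }
            return records;
        } catch (IOException e) {
            throw new ConnectException("failed to read " + f, e);
        }
    }

    @Override
    public void stop() {
        // nothing to clean up in this sketch
    }
}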

Thanks,
Gautam


Re: Reliably implementing global KeyValueStore#get

2017-06-06 Thread Steven Schlansker

> On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:
> 
> Hi Steven,
> 
> Do you know beforehand if a key exists? If you know that and are getting 
> null() the code will have to retry by refreshing the metadata and going to 
> the new instance. If you don’t know beforehand if a key exists or not you 
> might have to check all instances of a store to make sure.
> 

No, I am not presupposing that the key exists -- this is a user-visible API and
will be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not truly
sufficient, as querying all the workers at different times, in the presence of
migrating data, can still in theory miss it given pessimal execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some day 
being
cool like Jepsen :)

> Eno
> 
> 
>> On Jun 5, 2017, at 10:12 PM, Steven Schlansker  
>> wrote:
>> 
>> Hi everyone, me again :)
>> 
>> I'm still trying to implement my "remoting" layer that allows
>> my clients to see the partitioned Kafka Streams state
>> regardless of which instance they hit.  Roughly, my lookup is:
>> 
>> Message get(Key key) {
>>   RemoteInstance instance = selectPartition(key);
>>   return instance.get(key); // http remoting
>> }
>> 
>> RemoteInstance.get(Key key) { // http endpoint
>>   return readOnlyKeyValueStore.get(key);
>> }
>> 
>> However, the mapping of partitions to instances may change.
>> If you call KeyValueStore.get(K) where K is on a partition you
>> don't own, it returns null.  This is indistinguishable from a
>> successful get on a key that doesn't exist.
>> 
>> If one instance selects a sibling instance right as the partition is failing
>> off of that instance, it may get routed there and by the time it gets
>> the request no longer "owns" the partition -- returns a false 'null'.
>> 
>> You can try re-checking after you get a null value, but that's susceptible
>> to the same race -- it's unlikely but possible that the data migrates *back*
>> before you do this re-check.
>> 
>> Is there any way to correctly implement this without races?  I'd imagine
>> you need a new primitive like KeyValueStore#get that atomically finds
>> the key or throws an exception if it is not in an owned partition
>> at the time of lookup so you know to recheck the partition and retry.
>> 
>> Thoughts?
>> 
>> Thanks again,
>> Steven
>> 
> 





Turning on rack awareness

2017-06-06 Thread Neil Moore
I am practicing turning on rack awareness on a test cluster of 3 nodes that
currently does not use rack awareness (broker.rack is not set). During a
rolling restart of the brokers, in which broker.rack was filled in one by one,
I got the following error on each node:


[2017-06-06 15:41:08,780] ERROR kafka.admin.AdminOperationException: Not all
brokers have rack information. Add --disable-rack-aware in command line to make
replica assignment without rack information.
        at kafka.admin.AdminUtils$.getBrokerMetadatas(AdminUtils.scala:395)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:413)
        at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:107)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)


Kafka clients then fail to connect to the brokers.


I do another rolling restart, and because all brokers now know their rack,
things seem to sort themselves out.


Can anyone recommend how I should turn on rack awareness without downtime? Is
that possible?


Thanks,


Neil



Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-06 Thread Michael Noll
Happy to hear you found a working solution, Steven!

-Michael



On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker <
sschlans...@opentable.com> wrote:

> >
> > On Jun 2, 2017, at 3:32 PM, Matthias J. Sax 
> wrote:
> >
> > Thanks. That helps to understand the use case better.
> >
> > Rephrase to make sure I understood it correctly:
> >
> > 1) you are providing a custom partitioner to Streams that is based on one
> > field in your value (that's fine with regard to fault-tolerance :))
> > 2) you want to use interactive queries to query the store
> > 3) because of your custom partitioning schema, you need to manually
> > figure out the right application instance that hosts a key
> > 4) thus, you use a GlobalKTable to maintain the information from K to D
> > and thus to the partition, i.e., the streams instance that hosts K
> >
> > If this is correct, then you cannot use the "by key" metadata interface.
> > It's designed to find the streams instance based on the key only -- but
> > your partitioner is based on the value. Internally, we call
> >
> >> final Integer partition = partitioner.partition(key, null,
> sourceTopicsInfo.maxPartitions);
> >
> > Note, that `value==null` -- at this point, we don't have any value
> > available and can't provide it to the partitioner.
> >
> > Thus, your approach to get all metadata is the only way you can go.
>
> Thanks for confirming this.  The code is a little ugly but I've done worse
> :)
>
> >
> >
> > Very interesting (and quite special) use case. :)
> >
> >
> > -Matthias
> >
> > On 6/2/17 2:32 PM, Steven Schlansker wrote:
> >>
> >>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax 
> wrote:
> >>>
> >>> I am not sure if I understand the use case correctly. Could you give
> >>> some more context?
> >>
> >> Happily, thanks for thinking about this!
> >>
> >>>
>  backing store whose partitioning is value dependent
> >>>
> >>> I infer that you are using a custom store and not the default RocksDB? If
> >>> yes, what do you use? What does "value dependent" mean in this context?
> >>
> >> We're currently using the base in-memory store.  We tried to use RocksDB
> >> but the tuning to get it running appropriately in a Linux container
> without
> >> tripping the cgroups OOM killer is nontrivial.
> >>
> >>
> >>> Right now, I am wondering why you don't just set a new key to get your
> >>> data grouped by the field you are interested in? Also, if you don't
> >>> partition your data by key, you might break your streams application
> >>> with regard to fault-tolerance -- or does your custom store not rely on
> >>> changelog backup for fault-tolerance?
> >>>
> >>
> >> That's an interesting point about making a transformed key.  But I don't
> think
> >> it simplifies my problem too much.  Essentially, I have a list of
> messages
> >> that should get delivered to destinations.  Each message has a primary
> key K
> >> and a destination D.
> >>
> >> We partition over D so that all messages to the same destination are
> handled by
> >> the same worker, to preserve ordering and implement local rate limits
> etc.
> >>
> >> I want to preserve the illusion to the client that they can look up a
> key with
> >> only K.  So, as an intermediate step, we use the GlobalKTable to look
> up D.  Once
> >> we have K,D we can then compute the partition and execute a lookup.
> >>
> >> Transforming the key to be a composite K,D isn't helpful because the
> end user still
> >> only knows K -- D's relevance is an implementation detail I wish to
> hide -- so you still
> >> need some sort of secondary lookup.
> >>
> >> We do use the changelog backup for fault tolerance -- how would having
> the partition
> >> based on the value break this?  Is the changelog implicitly partitioned
> by a partitioner
> >> other than the one we give to the topology?
> >>
> >> Hopefully that explains my situation a bit more?  Thanks!
> >>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
>  I have a KTable and backing store whose partitioning is value
> dependent.
>  I want certain groups of messages to be ordered and that grouping is
> determined
>  by one field (D) of the (possibly large) value.
> 
>  When I lookup by only K, obviously you don't know the partition it
> should be on.
>  So I will build a GlobalKTable of K -> D.  This gives me enough
> information
>  to determine the partition.
> 
>  Unfortunately, the KafkaStreams metadata API doesn't fit this use
> case well.
>  It allows you to either get all metadata, or by key -- but if you
> lookup by key
>  it just substitutes a null value (causing a downstream NPE)
> 
>  I can iterate over all metadata and compute the mapping of K -> K,D
> -> P
>  and then iterate over all metadata looking for P.  It's not difficult
> but ends
>  up being somewhat ugly code that I feel I shouldn't have to write.
> 
>  Am I missing something here?  Is there a better way that I've
> missed?

Re: Reliably implementing global KeyValueStore#get

2017-06-06 Thread Eno Thereska
Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting null,
the code will have to retry by refreshing the metadata and going to the new
instance. If you don't know beforehand whether a key exists, you might have to
check all instances of a store to make sure.
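
The "check all instances" fallback might look roughly like the sketch below
(the store name and the remoteGet hook, standing in for the application's own
remoting layer, are placeholders); note it still cannot rule out the migration
race discussed elsewhere in this thread:

import java.util.function.BiFunction;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;

public class FanOutLookup {

    // Ask every instance that hosts the store before concluding the key is absent.
    // remoteGet stands in for whatever remoting (HTTP, etc.) the application uses.
    public static String findAnywhere(KafkaStreams streams,
                                      String key,
                                      BiFunction<HostInfo, String, String> remoteGet) {
        for (StreamsMetadata meta : streams.allMetadataForStore("lookup-store")) {
            String value = remoteGet.apply(meta.hostInfo(), key);
            if (value != null) {
                return value;
            }
        }
        return null;   // either genuinely absent, or it migrated between checks
    }
}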

Eno


> On Jun 5, 2017, at 10:12 PM, Steven Schlansker  
> wrote:
> 
> Hi everyone, me again :)
> 
> I'm still trying to implement my "remoting" layer that allows
> my clients to see the partitioned Kafka Streams state
> regardless of which instance they hit.  Roughly, my lookup is:
> 
> Message get(Key key) {
>RemoteInstance instance = selectPartition(key);
>return instance.get(key); // http remoting
> }
> 
> RemoteInstance.get(Key key) { // http endpoint
>return readOnlyKeyValueStore.get(key);
> }
> 
> However, the mapping of partitions to instances may change.
> If you call KeyValueStore.get(K) where K is on a partition you
> don't own, it returns null.  This is indistinguishable from a
> successful get on a key that doesn't exist.
> 
> If one instance selects a sibling instance right as the partition is failing
> off of that instance, it may get routed there and by the time it gets
> the request no longer "owns" the partition -- returns a false 'null'.
> 
> You can try re-checking after you get a null value, but that's susceptible
> to the same race -- it's unlikely but possible that the data migrates *back*
> before you do this re-check.
> 
> Is there any way to correctly implement this without races?  I'd imagine
> you need a new primitive like KeyValueStore#get that atomically finds
> the key or throws an exception if it is not in an owned partition
> at the time of lookup so you know to recheck the partition and retry.
> 
> Thoughts?
> 
> Thanks again,
> Steven
>