Re: Kafka Streams dynamic partitioning

2016-10-05 Thread Adrienne Kole
Hi Guozhang,

So, in this case I should know the max number of possible keys so that I
can create that number of partitions.

Thanks

Adrienne

On Wed, Oct 5, 2016 at 1:00 AM, Guozhang Wang  wrote:

> By default the partitioner will use murmur hash on the key and mode on
> current num.partitions to determine which partitions to go to, so records
> with the same key will be assigned to the same partition. Would that be OK
> for your case?
>
>
> Guozhang
>
>
> On Tue, Oct 4, 2016 at 3:00 PM, Adrienne Kole 
> wrote:
>
> > Hi,
> >
> > From Streams documentation, I can see that each Streams instance is
> > processing data independently (from other instances), reads from topic
> > partition(s) and writes to specified topic.
> >
> >
> > So here, the partitions of topic should be determined beforehand and
> should
> > remain static.
> > In my usecase I want to create partitioned/keyed (time) windows and
> > aggregate them.
> > I can partition the incoming data to specified topic's partitions and
> each
> > Stream instance can do windowed aggregations.
> >
> > However, if I don't know the number of possible keys (to partition), then
> > what should I do?
> >
> > Thanks
> > Adrienne
> >
>
>
>
> --
> -- Guozhang
>
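
For reference, the default assignment Guozhang describes (a murmur2 hash of the serialized key, modulo the number of partitions) can be sketched roughly like this; this is an illustration only, not the producer's actual DefaultPartitioner class:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.utils.Utils;

    public class PartitionForKey {
        // Records with the same key always land in the same partition,
        // as long as the topic's partition count does not change.
        static int partitionFor(String key, int numPartitions) {
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            // murmur2 hash of the serialized key, sign bit masked off, modulo the partition count
            return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
        }

        public static void main(String[] args) {
            System.out.println(partitionFor("sensor-42", 10));   // same output every run
        }
    }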


0.10.1.0 RC0 release notes fix suggestion

2016-10-05 Thread Enrico Olivelli
Hi Jason,
I see in the release notes that this issue seems to be fixed, but is
marked as 'duplicate'
https://issues.apache.org/jira/browse/KAFKA-156

Maybe you can consider removing this kind of unfixed issue from the
release notes.
In my case I'm waiting for that fix, and I 'was' very happy to see it
resolved, but this is not the case.

Thank you

-- Enrico



2016-10-05 1:13 GMT+02:00 Jason Gustafson :
> One clarification: this is a minor release, not a major one.
>
> -Jason
>
> On Tue, Oct 4, 2016 at 4:01 PM, Jason Gustafson  wrote:
>
>> Hello Kafka users, developers and client-developers,
>>
>> This is the first candidate for release of Apache Kafka 0.10.1.0. This is
>> a major release that includes great new features including throttled
>> replication, secure quotas, time-based log searching, and queryable state
>> for Kafka Streams. A full list of the content can be found here:
>> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1. Since
>> this is a major release, we will give people more time to try it out and
>> give feedback.
>>
>> Release notes for the 0.10.1.0 release:
>> http://home.apache.org/~jgus/kafka-0.10.1.0-rc0/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Monday, Oct 10, 9am PT
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> http://home.apache.org/~jgus/kafka-0.10.1.0-rc0/
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/
>>
>> * Javadoc:
>> http://home.apache.org/~jgus/kafka-0.10.1.0-rc0/javadoc/
>>
>> * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0 tag:
>> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
>> b86130bad1a1a4a3d1dbe5c486977e6968b3ebc6
>>
>> * Documentation:
>> http://kafka.apache.org/0101/documentation.html
>>
>> * Protocol:
>> http://kafka.apache.org/0101/protocol.html
>>
>> Note that integration/system testing on Jenkins has been a major problem
>> this release cycle. In order to validate this RC, we need to get these
>> tests stable again. Any help we can get from the community will be greatly
>> appreciated.
>>
>> Thanks,
>>
>> Jason
>>


Kafka Streams windowed aggregation

2016-10-05 Thread Davood Rafiei
Hi,

I want to do windowed aggregation with the Streams library. However, I get the
output from a particular operator immediately, independent of the window size.
This makes sense for unlimited windows, or sometimes for event-time windows.
However, for ingestion-time or processing-time windows, users may want
exact results (and at the exact time) from the windowed aggregation operator.
For example, if I have a window of 4 minutes with a 2-minute slide, I would
expect to get an output once per 2 minutes. Otherwise I cannot know which
one of the tuples output by the aggregation operator is the "right" one, i.e. the
one that contains the aggregation result for the whole window.
One solution for this is to use queryable state, but pulling state
regularly to get the latest answers is not useful for my use case.

So, is it on your roadmap to integrate a purge/trigger mechanism into windowed
aggregates?

Thanks
Davood


Re: 0.10.1.0 RC0 release notes fix suggestion

2016-10-05 Thread Ismael Juma
Thanks Enrico. In general, we should avoid setting "Fix version" for
duplicate issues for the reason you state. I fixed the case you reported,
but I haven't checked if there are other ones.

Ismael

On Wed, Oct 5, 2016 at 8:35 AM, Enrico Olivelli  wrote:

> Hi Jason,
> I see in the release notes that this issue seems to be fixed, but is
> marked as 'duplicate'
> https://issues.apache.org/jira/browse/KAFKA-156
>
> Maybe you can consider removing this kind of un-fixed issues from the
> release notes.
> In my case I'm waiting for that fix and I 'was' very happy to see in
> resolved, but this is not the case
>
> Thank you
>
> -- Enrico
>
>
>
> 2016-10-05 1:13 GMT+02:00 Jason Gustafson :
> > One clarification: this is a minor release, not a major one.
> >
> > -Jason
> >
> > On Tue, Oct 4, 2016 at 4:01 PM, Jason Gustafson 
> wrote:
> >
> >> Hello Kafka users, developers and client-developers,
> >>
> >> This is the first candidate for release of Apache Kafka 0.10.1.0. This
> is
> >> a major release that includes great new features including throttled
> >> replication, secure quotas, time-based log searching, and queryable
> state
> >> for Kafka Streams. A full list of the content can be found here:
> >> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
> Since
> >> this is a major release, we will give people more time to try it out and
> >> give feedback.
> >>
> >> Release notes for the 0.10.1.0 release:
> >> http://home.apache.org/~jgus/kafka-0.10.1.0-rc0/RELEASE_NOTES.html
> >>
> >> *** Please download, test and vote by Monday, Oct 10, 9am PT
> >>
> >> Kafka's KEYS file containing PGP keys we use to sign the release:
> >> http://kafka.apache.org/KEYS
> >>
> >> * Release artifacts to be voted upon (source and binary):
> >> http://home.apache.org/~jgus/kafka-0.10.1.0-rc0/
> >>
> >> * Maven artifacts to be voted upon:
> >> https://repository.apache.org/content/groups/staging/
> >>
> >> * Javadoc:
> >> http://home.apache.org/~jgus/kafka-0.10.1.0-rc0/javadoc/
> >>
> >> * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0 tag:
> >> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> >> b86130bad1a1a4a3d1dbe5c486977e6968b3ebc6
> >>
> >> * Documentation:
> >> http://kafka.apache.org/0101/documentation.html
> >>
> >> * Protocol:
> >> http://kafka.apache.org/0101/protocol.html
> >>
> >> Note that integration/system testing on Jenkins has been a major problem
> >> this release cycle. In order to validate this RC, we need to get these
> >> tests stable again. Any help we can get from the community will be
> greatly
> >> appreciated.
> >>
> >> Thanks,
> >>
> >> Jason
> >>
>


Re: Consumer offsets reset for _all_ topics after increasing partitions for one topic

2016-10-05 Thread Juho Autio
Does anyone know about this? Altering topic partitions seems to reset
consumer offsets.

On Tue, Sep 27, 2016 at 1:28 PM, Juho Autio  wrote:

> I increased partitions for one existing topic (2->10), but was surprised
> to see that it entirely reset the committed offsets of my consumer group.
>
> All topics & partitions were reset to the earliest offset available, and
> the consumer read everything again.
>
> Documentation doesn't mention anything like this. Is this how it's
> supposed to work, or a bug?
>
> I would've expected the consumer offsets to not decrease at all,
> especially for the topics that I didn't even touch.
>
> For the altered topic I would've expected that consuming the previously
> existing partitions 0 and 1 would've continued from the position where they
> were, and naturally starting to read the new added partitions from 0.
>
> I added partitions according to the "Modifying topics" section of Kafka
> 0.10.0 Documentation:
>
> "To add partitions you can do
>
>  > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic
> altered_topic --partitions 10
> "
>
> Previously this topic had 2 partitions.
>
> For the consumer I'm using:
> kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter()
>
> And version is:
>
> org.apache.kafka
> kafka_2.11
> 0.10.0.1
>
> Kafka cluster itself is kafka_2.11-0.10.0.1.
>


Re: Kafka Streams windowed aggregation

2016-10-05 Thread Eno Thereska
Hi Davood,

The behaviour is indeed as you say. Recently we checked in KIP-63 in trunk (it 
will be part of the 0.10.1 release coming up). That should reduce the amount of 
downstream traffic you see 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams).
 However your app should still be prepared to receive multiple downstream 
records. The optimisation is such that many of them will be de-duped if they 
have the same key.

Triggers are something we are thinking about, but we haven't decided to put them
on the roadmap yet.

Thanks
Eno

> On 5 Oct 2016, at 08:55, Davood Rafiei  wrote:
> 
> Hi,
> 
> I want to do windowed aggregation with streams library. However, I get the
> output from particular operator immediately, independent of window size.
> This makes sense for unlimited windows or sometimes for event time windows.
> However, for ingestion time or processing time windows, users may want to
> exact results (and in exact time) of windowed aggregation operator.
> For example, if I have window of 4 minutes with 2 minutes slide, I would
> expect to get an output once per 2 minutes. Otherwise I cannot know which
> one of the outputted tuples from aggregator operator is the "right" that
> contains aggregation result of whole window.
> One solution for this, is using queryable state, but pulling states
> regularly to get latest answers is not useful for my usecase.
> 
> So, is it on your roadmap to integrate purge/trigger mechanism to windowed
> aggregates?
> 
> Thanks
> Davood



Re: kafka streams with dynamic content and filtering

2016-10-05 Thread Gary Ogden
Thanks Guozhang.

So there's no way we could also use InternalTopicManager to specify the
number of partitions and RF?

https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java



On 4 October 2016 at 19:25, Guozhang Wang  wrote:

> Hello Gary,
>
> What you described should be workable with the lower-level Processor
> interface of Kafka Streams, i.e. dynamic aggregations based on the input
> data indicating changes to the JSON schemas. For detailed examples of how
> the Processor API works please read the corresponding sections on the web
> docs:
>
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#processor-api
>
>
> Guozhang
>
> On Mon, Oct 3, 2016 at 6:51 AM, Gary Ogden  wrote:
>
> > I have a use case, and I'm wondering if it's possible to do this with
> > Kafka.
> >
> > Let's say we will have customers that will be uploading JSON to our
> system,
> > but that JSON layout will be different between each customer. They are
> able
> > to define the schema of the JSON being uploaded.
> >
> > They will then be able to define the fields in that JSON they want to
> > gather metrics on (sum, counts etc).
> >
> > Is there a way with Kafka streaming to dynamically read the configuration
> > for that customer and process the json and do counts and sums for the
> > fields they've defined.
> >
> > It's possible at any time they may want to modify the configuration for
> > their json as well. Stop counting one field, start counting another.
> >
> > They will also want to do some inferences as well. IE, if this particular
> > JSON is uploaded with a field in it, then check to see if another json
> was
> > uploaded within 8 hours.
> >
> > Is it possible for Kafka streaming to be this dynamic?
> >
>
>
>
> --
> -- Guozhang
>
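
To make that concrete, a minimal Processor API skeleton for a per-field counter might look roughly like this (the field handling, topics and the 60-second punctuate interval are hypothetical; only the AbstractProcessor/ProcessorContext usage reflects the 0.10.x API Guozhang points to):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Counts occurrences of an already-extracted field value per key.
    // JSON parsing and per-customer configuration lookup are elided.
    public class FieldCountProcessor extends AbstractProcessor<String, String> {
        private final Map<String, Long> counts = new HashMap<>();

        @Override
        public void init(ProcessorContext context) {
            super.init(context);          // keeps the context available via context()
            context.schedule(60_000L);    // request punctuate() roughly every minute
        }

        @Override
        public void process(String key, String fieldValue) {
            counts.merge(key + ":" + fieldValue, 1L, Long::sum);
        }

        @Override
        public void punctuate(long timestamp) {
            // Forward current counts downstream, e.g. to a sink writing an output topic.
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                context().forward(e.getKey(), e.getValue().toString());
            }
            counts.clear();
        }
    }

Such a processor would then be wired into a topology with TopologyBuilder's addSource/addProcessor/addSink.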


Re: kafka streams with dynamic content and filtering

2016-10-05 Thread Gary Ogden
Sorry. I responded to the wrong message

On 5 October 2016 at 09:40, Gary Ogden  wrote:

> Thanks Guozhang.
>
> So there's no way we could also use InternalTopicManager to specify the
> number of partitions and RF?
>
> https://github.com/apache/kafka/blob/0.10.1/streams/src/
> main/java/org/apache/kafka/streams/processor/internals/
> InternalTopicManager.java
>
>
>
> On 4 October 2016 at 19:25, Guozhang Wang  wrote:
>
>> Hello Gary,
>>
>> What you described should be workable with the lower-level Processor
>> interface of Kafka Streams, i.e. dynamic aggregations based on the input
>> data indicating changes to the JSON schemas. For detailed examples of how
>> the Processor API works please read the corresponding sections on the web
>> docs:
>>
>> http://docs.confluent.io/3.0.1/streams/developer-guide.html#processor-api
>>
>>
>> Guozhang
>>
>> On Mon, Oct 3, 2016 at 6:51 AM, Gary Ogden  wrote:
>>
>> > I have a use case, and I'm wondering if it's possible to do this with
>> > Kafka.
>> >
>> > Let's say we will have customers that will be uploading JSON to our
>> system,
>> > but that JSON layout will be different between each customer. They are
>> able
>> > to define the schema of the JSON being uploaded.
>> >
>> > They will then be able to define the fields in that JSON they want to
>> > gather metrics on (sum, counts etc).
>> >
>> > Is there a way with Kafka streaming to dynamically read the
>> configuration
>> > for that customer and process the json and do counts and sums for the
>> > fields they've defined.
>> >
>> > It's possible at any time they may want to modify the configuration for
>> > their json as well. Stop counting one field, start counting another.
>> >
>> > They will also want to do some inferences as well. IE, if this
>> particular
>> > JSON is uploaded with a field in it, then check to see if another json
>> was
>> > uploaded within 8 hours.
>> >
>> > Is it possible for Kafka streaming to be this dynamic?
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


Re: kafka stream to new topic based on message key

2016-10-05 Thread Gary Ogden
Thanks Guozhang.

So there's no way we could also use InternalTopicManager to specify the
number of partitions and RF?

https://github.com/apache/kafka/blob/0.10.1/streams/src/
main/java/org/apache/kafka/streams/processor/internals/
InternalTopicManager.java

On 4 October 2016 at 19:34, Guozhang Wang  wrote:

> Hello Gary,
>
> This is also doable in the Processor API, you can use the record collector
> from ProcessorContext to send data to arbitrary topics, i.e.:
>
> RecordCollector collector = ((RecordCollector.Supplier)
> context).recordCollector();
> collector.send(new ProducerRecord<>(topic, *...*), keySerializer,
> valSerializer, partitioner);
>
>
> But note that if the new topic, e.g. "123456_lv2" is not created, then
> the send call will thrown an exception unless the borker-side config
> "auto.topic.create.enabled" is set to true; and even in this case, the
> topic will be auto-created with the pre-defined number of partitions,
> i.e. you cannot control how the topics can be created with what
> configs such as compaction policy, num.partitions, segment sizes, etc.
> If that works for you then I think it should be fine.
>
>
> Guozhang
>
>
>
> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden  wrote:
>
> > Is it possible, in a kafka streaming job, to write to another topic based
> > on the key in the messages?
> >
> > For example, say the message is:
> >
> > 123456#{"id":56789,"type":1}
> >
> > where the key is 123456, # is the delimeter, and the {} is the json data.
> >
> > And I want to push the json data to another topic that will have the name
> > 123456_lv2.
> >
> > Is this possible with kafka streaming?
> >
>
>
>
> --
> -- Guozhang
>


Re: kafka stream to new topic based on message key

2016-10-05 Thread Gary Ogden
What if we were to use kafka connect instead of streams? Does it have the
ability to specify partitions, rf, segment size etc?

On 5 October 2016 at 09:42, Gary Ogden  wrote:

> Thanks Guozhang.
>
> So there's no way we could also use InternalTopicManager to specify the
> number of partitions and RF?
>
> https://github.com/apache/kafka/blob/0.10.1/streams/src/main
> /java/org/apache/kafka/streams/processor/internals/InternalT
> opicManager.java
>
> On 4 October 2016 at 19:34, Guozhang Wang  wrote:
>
>> Hello Gary,
>>
>> This is also doable in the Processor API, you can use the record collector
>> from ProcessorContext to send data to arbitrary topics, i.e.:
>>
>> RecordCollector collector = ((RecordCollector.Supplier)
>> context).recordCollector();
>> collector.send(new ProducerRecord<>(topic, *...*), keySerializer,
>> valSerializer, partitioner);
>>
>>
>> But note that if the new topic, e.g. "123456_lv2" is not created, then
>> the send call will thrown an exception unless the borker-side config
>> "auto.topic.create.enabled" is set to true; and even in this case, the
>> topic will be auto-created with the pre-defined number of partitions,
>> i.e. you cannot control how the topics can be created with what
>> configs such as compaction policy, num.partitions, segment sizes, etc.
>> If that works for you then I think it should be fine.
>>
>>
>> Guozhang
>>
>>
>>
>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden  wrote:
>>
>> > Is it possible, in a kafka streaming job, to write to another topic
>> based
>> > on the key in the messages?
>> >
>> > For example, say the message is:
>> >
>> > 123456#{"id":56789,"type":1}
>> >
>> > where the key is 123456, # is the delimeter, and the {} is the json
>> data.
>> >
>> > And I want to push the json data to another topic that will have the
>> name
>> > 123456_lv2.
>> >
>> > Is this possible with kafka streaming?
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


difficulty to delete a topic because of its syntax

2016-10-05 Thread Hamza HACHANI
Hi,

I created a topic called device-connection-invert-key-value-the 
metric-changelog.

I insist that there is a space in it.



Now that I want to delete it because my cluster can no longer work correctly, I
can't do it, as it only reads the first part of it
(device-connection-invert-key-value-the), which obviously it doesn't find.

Does somebody have a solution to delete it?

Thanks in advance.


Hamza



Re: difficulty to delete a topic because of its syntax

2016-10-05 Thread Ali Akhtar
I don't see a space in that topic name

On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI 
wrote:

> Hi,
>
> I created a topic called device-connection-invert-key-value-the
> metric-changelog.
>
> I insit that there is a space in it.
>
>
>
> Now that i want to delete it because my  cluster can no longer work
> correctly i can't do it as it  only reads the first part of it : (
> device-connection-invert-key-value-the) which obviously it doesn't find.
>
> Does some body have a wolution to delete it ?
>
> Thanks in advance.
>
>
> Hamza
>
>


RE: difficulty to delete a topic because of its syntax

2016-10-05 Thread Hamza HACHANI
It's between "the" and "metric"


From: Ali Akhtar 
Sent: Wednesday, 5 October 2016 02:16:33
To: users@kafka.apache.org
Subject: Re: difficulty to delete a topic because of its syntax

I don't see a space in that topic name

On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI 
wrote:

> Hi,
>
> I created a topic called device-connection-invert-key-value-the
> metric-changelog.
>
> I insit that there is a space in it.
>
>
>
> Now that i want to delete it because my  cluster can no longer work
> correctly i can't do it as it  only reads the first part of it : (
> device-connection-invert-key-value-the) which obviously it doesn't find.
>
> Does some body have a wolution to delete it ?
>
> Thanks in advance.
>
>
> Hamza
>
>


Support for Kafka

2016-10-05 Thread Syed Hussaini
Dear Kafka team.
I am in the implementation stage of a Kafka cluster and am looking to find out
whether Apache Kafka is supported on Ubuntu 16.04 LTS (Xenial).

It would be great if you could please let us know.



Syed Hussaini
Infrastructure Engineer

1 Neathouse Place
5th Floor
London, England, SW1V 1LH


syed.hussa...@theexchangelab.com

T 0203 701 3177










kafka new consumer is ready for production setup?

2016-10-05 Thread Gaurav Shaha
Hi,

I want to use the new Kafka consumer. But in the documentation for the 0.10.0
version, it is mentioned that "The code is considered beta quality."
I am using the latest version, which is 0.10.1. I am not able to find
any documentation for this version.

Can you please tell me if the new consumer is ready for production
use, or if it is still in beta?

-- 
Thanks and Regards,
Gaurav Shaha


Re: difficulty to delete a topic because of its syntax

2016-10-05 Thread Ben Davison
Try putting "" or '' around the string when running the command.

On Wed, Oct 5, 2016 at 3:29 PM, Hamza HACHANI 
wrote:

> It's between "the" and "metric"
>
> 
> De : Ali Akhtar 
> Envoyé : mercredi 5 octobre 2016 02:16:33
> À : users@kafka.apache.org
> Objet : Re: difficulty to delete a topic because of its syntax
>
> I don't see a space in that topic name
>
> On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI 
> wrote:
>
> > Hi,
> >
> > I created a topic called device-connection-invert-key-value-the
> > metric-changelog.
> >
> > I insit that there is a space in it.
> >
> >
> >
> > Now that i want to delete it because my  cluster can no longer work
> > correctly i can't do it as it  only reads the first part of it : (
> > device-connection-invert-key-value-the) which obviously it doesn't find.
> >
> > Does some body have a wolution to delete it ?
> >
> > Thanks in advance.
> >
> >
> > Hamza
> >
> >
>



RE: difficulty to delete a topic because of its syntax

2016-10-05 Thread Hamza HACHANI
Well, awkwardly, when I list the topics I find it, but when I try to delete it, it says
that this topic does not exist.


From: Ben Davison 
Sent: Wednesday, 5 October 2016 02:37:14
To: users@kafka.apache.org
Subject: Re: difficulty to delete a topic because of its syntax

Try putting "" or '' around the string when running the command.

On Wed, Oct 5, 2016 at 3:29 PM, Hamza HACHANI 
wrote:

> It's between "the" and "metric"
>
> 
> De : Ali Akhtar 
> Envoyé : mercredi 5 octobre 2016 02:16:33
> À : users@kafka.apache.org
> Objet : Re: difficulty to delete a topic because of its syntax
>
> I don't see a space in that topic name
>
> On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI 
> wrote:
>
> > Hi,
> >
> > I created a topic called device-connection-invert-key-value-the
> > metric-changelog.
> >
> > I insit that there is a space in it.
> >
> >
> >
> > Now that i want to delete it because my  cluster can no longer work
> > correctly i can't do it as it  only reads the first part of it : (
> > device-connection-invert-key-value-the) which obviously it doesn't find.
> >
> > Does some body have a wolution to delete it ?
> >
> > Thanks in advance.
> >
> >
> > Hamza
> >
> >
>



Re: kafka stream to new topic based on message key

2016-10-05 Thread Gary Ogden
Guozhang. I was just looking at the source for this, and it looks like the
RecordCollector.Supplier is part of the internal ProcessorContextImpl
class.  I don't think that's exposed to me, is it?

If I create a processor class that extends AbstractProcessor, it only has
access to the ProcessorContext interface, which doesn't expose the
Supplier.

On 5 October 2016 at 09:42, Gary Ogden  wrote:

> What if we were to use kafka connect instead of streams? Does it have the
> ability to specify partitions, rf, segment size etc?
>
> On 5 October 2016 at 09:42, Gary Ogden  wrote:
>
>> Thanks Guozhang.
>>
>> So there's no way we could also use InternalTopicManager to specify the
>> number of partitions and RF?
>>
>> https://github.com/apache/kafka/blob/0.10.1/streams/src/main
>> /java/org/apache/kafka/streams/processor/internals/InternalT
>> opicManager.java
>>
>> On 4 October 2016 at 19:34, Guozhang Wang  wrote:
>>
>>> Hello Gary,
>>>
>>> This is also doable in the Processor API, you can use the record
>>> collector
>>> from ProcessorContext to send data to arbitrary topics, i.e.:
>>>
>>> RecordCollector collector = ((RecordCollector.Supplier)
>>> context).recordCollector();
>>> collector.send(new ProducerRecord<>(topic, *...*), keySerializer,
>>> valSerializer, partitioner);
>>>
>>>
>>> But note that if the new topic, e.g. "123456_lv2" is not created, then
>>> the send call will thrown an exception unless the borker-side config
>>> "auto.topic.create.enabled" is set to true; and even in this case, the
>>> topic will be auto-created with the pre-defined number of partitions,
>>> i.e. you cannot control how the topics can be created with what
>>> configs such as compaction policy, num.partitions, segment sizes, etc.
>>> If that works for you then I think it should be fine.
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden  wrote:
>>>
>>> > Is it possible, in a kafka streaming job, to write to another topic
>>> based
>>> > on the key in the messages?
>>> >
>>> > For example, say the message is:
>>> >
>>> > 123456#{"id":56789,"type":1}
>>> >
>>> > where the key is 123456, # is the delimeter, and the {} is the json
>>> data.
>>> >
>>> > And I want to push the json data to another topic that will have the
>>> name
>>> > 123456_lv2.
>>> >
>>> > Is this possible with kafka streaming?
>>> >
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>


Re: difficulty to delete a topic because of its syntax

2016-10-05 Thread Manikumar
Kafka doesn't support white space in topic names. Only '.', '_'
and '-' are allowed as special characters.
Not sure how you got a white space into the topic name.
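
For a quick sanity check of a candidate name, the allowed character set is easy to test; a small sketch, assuming the usual [a-zA-Z0-9._-] rule:

    import java.util.regex.Pattern;

    public class TopicNameCheck {
        public static void main(String[] args) {
            // ASCII letters, digits, '.', '_' and '-' only; the embedded space makes this name fail.
            Pattern legal = Pattern.compile("[a-zA-Z0-9._-]+");
            System.out.println(legal.matcher("device-connection-invert-key-value-the metric-changelog").matches());   // false
        }
    }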

On Wed, Oct 5, 2016 at 8:19 PM, Hamza HACHANI 
wrote:

> Well ackwardly when i list the topics i find it but when i do delete it it
> says that this topic does not exist.
>
> 
> De : Ben Davison 
> Envoyé : mercredi 5 octobre 2016 02:37:14
> À : users@kafka.apache.org
> Objet : Re: difficulty to delete a topic because of its syntax
>
> Try putting "" or '' around the string when running the command.
>
> On Wed, Oct 5, 2016 at 3:29 PM, Hamza HACHANI 
> wrote:
>
> > It's between "the" and "metric"
> >
> > 
> > De : Ali Akhtar 
> > Envoyé : mercredi 5 octobre 2016 02:16:33
> > À : users@kafka.apache.org
> > Objet : Re: difficulty to delete a topic because of its syntax
> >
> > I don't see a space in that topic name
> >
> > On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI 
> > wrote:
> >
> > > Hi,
> > >
> > > I created a topic called device-connection-invert-key-value-the
> > > metric-changelog.
> > >
> > > I insit that there is a space in it.
> > >
> > >
> > >
> > > Now that i want to delete it because my  cluster can no longer work
> > > correctly i can't do it as it  only reads the first part of it : (
> > > device-connection-invert-key-value-the) which obviously it doesn't
> find.
> > >
> > > Does some body have a wolution to delete it ?
> > >
> > > Thanks in advance.
> > >
> > >
> > > Hamza
> > >
> > >
> >
>
>


Re: Kafka Streams dynamic partitioning

2016-10-05 Thread Michael Noll
> So, in this case I should know the max number of possible keys so that
> I can create that number of partitions.

Assuming I understand your original question correctly, then you would not
need to do/know this.  Rather, pick the number of partitions in a way that
matches your needs to process the data in parallel (e.g. if you expect that
you require 10 machines in order to process the incoming data, then you'd
need 10 partitions).  Also, as a general recommendation:  It's often a good
idea to over-partition your topics.  For example, even if today 10 machines
(and thus 10 partitions) would be sufficient, pick a higher number of
partitions (say, 50) so you have some wiggle room to add more machines
(11...50) later if need be.



On Wed, Oct 5, 2016 at 9:34 AM, Adrienne Kole 
wrote:

> Hi Guozhang,
>
> So, in this case I should know the max number of possible keys so that I
> can create that number of partitions.
>
> Thanks
>
> Adrienne
>
> On Wed, Oct 5, 2016 at 1:00 AM, Guozhang Wang  wrote:
>
> > By default the partitioner will use murmur hash on the key and mode on
> > current num.partitions to determine which partitions to go to, so records
> > with the same key will be assigned to the same partition. Would that be
> OK
> > for your case?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Oct 4, 2016 at 3:00 PM, Adrienne Kole 
> > wrote:
> >
> > > Hi,
> > >
> > > From Streams documentation, I can see that each Streams instance is
> > > processing data independently (from other instances), reads from topic
> > > partition(s) and writes to specified topic.
> > >
> > >
> > > So here, the partitions of topic should be determined beforehand and
> > should
> > > remain static.
> > > In my usecase I want to create partitioned/keyed (time) windows and
> > > aggregate them.
> > > I can partition the incoming data to specified topic's partitions and
> > each
> > > Stream instance can do windowed aggregations.
> > >
> > > However, if I don't know the number of possible keys (to partition),
> then
> > > what should I do?
> > >
> > > Thanks
> > > Adrienne
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: Kafka Streams dynamic partitioning

2016-10-05 Thread Ali Akhtar
> It's often a good
idea to over-partition your topics.  For example, even if today 10 machines
(and thus 10 partitions) would be sufficient, pick a higher number of
partitions (say, 50) so you have some wiggle room to add more machines
(11...50) later if need be.

If you create e.g 30 partitions, but only have e.g 5 instances of your
program, all on the same consumer group, all using kafka streams to consume
the topic, do you still receive all the data posted to the topic, or will
you need to have the same instances of the program as there are partitions?

(If you have 1 instance, 30 partitions, will the same rules apply, i.e it
will receive all data?)

On Wed, Oct 5, 2016 at 8:52 PM, Michael Noll  wrote:

> > So, in this case I should know the max number of possible keys so that
> > I can create that number of partitions.
>
> Assuming I understand your original question correctly, then you would not
> need to do/know this.  Rather, pick the number of partitions in a way that
> matches your needs to process the data in parallel (e.g. if you expect that
> you require 10 machines in order to process the incoming data, then you'd
> need 10 partitions).  Also, as a general recommendation:  It's often a good
> idea to over-partition your topics.  For example, even if today 10 machines
> (and thus 10 partitions) would be sufficient, pick a higher number of
> partitions (say, 50) so you have some wiggle room to add more machines
> (11...50) later if need be.
>
>
>
> On Wed, Oct 5, 2016 at 9:34 AM, Adrienne Kole 
> wrote:
>
> > Hi Guozhang,
> >
> > So, in this case I should know the max number of possible keys so that I
> > can create that number of partitions.
> >
> > Thanks
> >
> > Adrienne
> >
> > On Wed, Oct 5, 2016 at 1:00 AM, Guozhang Wang 
> wrote:
> >
> > > By default the partitioner will use murmur hash on the key and mode on
> > > current num.partitions to determine which partitions to go to, so
> records
> > > with the same key will be assigned to the same partition. Would that be
> > OK
> > > for your case?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Oct 4, 2016 at 3:00 PM, Adrienne Kole  >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > From Streams documentation, I can see that each Streams instance is
> > > > processing data independently (from other instances), reads from
> topic
> > > > partition(s) and writes to specified topic.
> > > >
> > > >
> > > > So here, the partitions of topic should be determined beforehand and
> > > should
> > > > remain static.
> > > > In my usecase I want to create partitioned/keyed (time) windows and
> > > > aggregate them.
> > > > I can partition the incoming data to specified topic's partitions and
> > > each
> > > > Stream instance can do windowed aggregations.
> > > >
> > > > However, if I don't know the number of possible keys (to partition),
> > then
> > > > what should I do?
> > > >
> > > > Thanks
> > > > Adrienne
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


Kafka 0.8.1.1 and JDK1.8

2016-10-05 Thread Vijayanand Rengarajan
Team,
Currently we have Kafka 0.8.1.1 running with JDK 1.6. We wanted to verify the
compatibility of JDK 1.8 with Kafka version 0.8.1.1.
Is Kafka 0.8.1.1 compatible with JDK 1.8, or do any other patches have to be
applied before upgrading from JDK 1.6 to JDK 1.8?
NOTE: along with the upgrade to JDK 1.8, we are also planning to use the G1
garbage collector.
Thanks, Vijayanand.

Re: difficulty to delete a topic because of its syntax

2016-10-05 Thread Todd S
You *could* go into ZooKeeper and nuke the topic, then delete the files on
disk.

Slightly more risky but it should work

On Wednesday, 5 October 2016, Manikumar  wrote:

> Kafka doesn't support white spaces in topic names.  Only support '.', '_'
> and '-' these are allowed.
> Not sure how you got white space in topic name.
>
> On Wed, Oct 5, 2016 at 8:19 PM, Hamza HACHANI  >
> wrote:
>
> > Well ackwardly when i list the topics i find it but when i do delete it
> it
> > says that this topic does not exist.
> >
> > 
> > De : Ben Davison >
> > Envoyé : mercredi 5 octobre 2016 02:37:14
> > À : users@kafka.apache.org 
> > Objet : Re: difficulty to delete a topic because of its syntax
> >
> > Try putting "" or '' around the string when running the command.
> >
> > On Wed, Oct 5, 2016 at 3:29 PM, Hamza HACHANI  >
> > wrote:
> >
> > > It's between "the" and "metric"
> > >
> > > 
> > > De : Ali Akhtar >
> > > Envoyé : mercredi 5 octobre 2016 02:16:33
> > > À : users@kafka.apache.org 
> > > Objet : Re: difficulty to delete a topic because of its syntax
> > >
> > > I don't see a space in that topic name
> > >
> > > On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI  >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I created a topic called device-connection-invert-key-value-the
> > > > metric-changelog.
> > > >
> > > > I insit that there is a space in it.
> > > >
> > > >
> > > >
> > > > Now that i want to delete it because my  cluster can no longer work
> > > > correctly i can't do it as it  only reads the first part of it : (
> > > > device-connection-invert-key-value-the) which obviously it doesn't
> > find.
> > > >
> > > > Does some body have a wolution to delete it ?
> > > >
> > > > Thanks in advance.
> > > >
> > > >
> > > > Hamza
> > > >
> > > >
> > >
> >
> >
>


Re: kafka stream to new topic based on message key

2016-10-05 Thread Guozhang Wang
Hello Gary,


1. The InternalTopicManager is only used by the Streams-instantiated
PartitionAssignor to create internal topics for auto-repartitioning and
changelog.

2. About "RecordCollector.Supplier": you are right, and as I wrote in the
above email you have to force-cast it to RecordCollector.Supplier;
theoretically this is not safe, but the internal Impl is always used.


If you know beforehand all the possible topics that you would want to
send to based on the key-value pair, you can then use KStream.branch() to
branch the source stream into multiple ones based on the content, and send each
branched stream to a different topic.
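
If the candidate topics are known up front, a branch()-based sketch might look roughly like this (topic names and key prefixes are made up for the example):

    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class BranchByKey {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> source = builder.stream("events");   // hypothetical input topic

            // One predicate per known destination; a record goes to the first branch it matches.
            KStream<String, String>[] branches = source.branch(
                    (key, value) -> key.startsWith("123456"),
                    (key, value) -> key.startsWith("789012"));

            branches[0].to("123456_lv2");
            branches[1].to("789012_lv2");
            // 'builder' would then be passed to new KafkaStreams(builder, config) as usual.
        }
    }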


Guozhang


On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden  wrote:

> Guozhang. I was just looking at the source for this, and it looks like the
> RecordCollector.Supplier is part of the internal ProcessorContextImpl
> class.  I don't think that's exposed to me, is it?
>
> If I create a processor class that extends AbstractProcess, it only has
> access to the ProcessorContext interface, which doesn't expose the
> Supplier.
>
> On 5 October 2016 at 09:42, Gary Ogden  wrote:
>
> > What if we were to use kafka connect instead of streams? Does it have the
> > ability to specify partitions, rf, segment size etc?
> >
> > On 5 October 2016 at 09:42, Gary Ogden  wrote:
> >
> >> Thanks Guozhang.
> >>
> >> So there's no way we could also use InternalTopicManager to specify the
> >> number of partitions and RF?
> >>
> >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main
> >> /java/org/apache/kafka/streams/processor/internals/InternalT
> >> opicManager.java
> >>
> >> On 4 October 2016 at 19:34, Guozhang Wang  wrote:
> >>
> >>> Hello Gary,
> >>>
> >>> This is also doable in the Processor API, you can use the record
> >>> collector
> >>> from ProcessorContext to send data to arbitrary topics, i.e.:
> >>>
> >>> RecordCollector collector = ((RecordCollector.Supplier)
> >>> context).recordCollector();
> >>> collector.send(new ProducerRecord<>(topic, *...*), keySerializer,
> >>> valSerializer, partitioner);
> >>>
> >>>
> >>> But note that if the new topic, e.g. "123456_lv2" is not created, then
> >>> the send call will thrown an exception unless the borker-side config
> >>> "auto.topic.create.enabled" is set to true; and even in this case, the
> >>> topic will be auto-created with the pre-defined number of partitions,
> >>> i.e. you cannot control how the topics can be created with what
> >>> configs such as compaction policy, num.partitions, segment sizes, etc.
> >>> If that works for you then I think it should be fine.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden  wrote:
> >>>
> >>> > Is it possible, in a kafka streaming job, to write to another topic
> >>> based
> >>> > on the key in the messages?
> >>> >
> >>> > For example, say the message is:
> >>> >
> >>> > 123456#{"id":56789,"type":1}
> >>> >
> >>> > where the key is 123456, # is the delimeter, and the {} is the json
> >>> data.
> >>> >
> >>> > And I want to push the json data to another topic that will have the
> >>> name
> >>> > 123456_lv2.
> >>> >
> >>> > Is this possible with kafka streaming?
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >
>



-- 
-- Guozhang


Re: Kafka Streams dynamic partitioning

2016-10-05 Thread Adrienne Kole
Hi,

@Ali IMO, yes. It is the job of the Kafka server to assign partition(s) to Kafka
instances to process. Each instance can process more than one
partition, but one partition cannot be processed by more than one instance.

@Michael, Thanks for the reply.
>Rather, pick the number of partitions in a way that matches your needs to
process the data in parallel
I think this should be 'pick a number of partitions that matches the max number
of possible keys in the stream to be partitioned'.
At least in my use case, in which I am trying to partition the stream by key
and make windowed aggregations, if there are fewer topic
partitions than possible keys, then the application will not work correctly.

That is, if the number of topic partitions is less than the number of possible stream
keys, then differently keyed stream tuples will be assigned to the same partition.
That was the problem that I was trying to solve, and it seems the only
solution is to estimate the max number of possible keys and assign partitions accordingly.

Thanks
Adrienne





On Wed, Oct 5, 2016 at 5:55 PM, Ali Akhtar  wrote:

> > It's often a good
> idea to over-partition your topics.  For example, even if today 10 machines
> (and thus 10 partitions) would be sufficient, pick a higher number of
> partitions (say, 50) so you have some wiggle room to add more machines
> (11...50) later if need be.
>
> If you create e.g 30 partitions, but only have e.g 5 instances of your
> program, all on the same consumer group, all using kafka streams to consume
> the topic, do you still receive all the data posted to the topic, or will
> you need to have the same instances of the program as there are partitions?
>
> (If you have 1 instance, 30 partitions, will the same rules apply, i.e it
> will receive all data?)
>
> On Wed, Oct 5, 2016 at 8:52 PM, Michael Noll  wrote:
>
> > > So, in this case I should know the max number of possible keys so that
> > > I can create that number of partitions.
> >
> > Assuming I understand your original question correctly, then you would
> not
> > need to do/know this.  Rather, pick the number of partitions in a way
> that
> > matches your needs to process the data in parallel (e.g. if you expect
> that
> > you require 10 machines in order to process the incoming data, then you'd
> > need 10 partitions).  Also, as a general recommendation:  It's often a
> good
> > idea to over-partition your topics.  For example, even if today 10
> machines
> > (and thus 10 partitions) would be sufficient, pick a higher number of
> > partitions (say, 50) so you have some wiggle room to add more machines
> > (11...50) later if need be.
> >
> >
> >
> > On Wed, Oct 5, 2016 at 9:34 AM, Adrienne Kole 
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > So, in this case I should know the max number of possible keys so that
> I
> > > can create that number of partitions.
> > >
> > > Thanks
> > >
> > > Adrienne
> > >
> > > On Wed, Oct 5, 2016 at 1:00 AM, Guozhang Wang 
> > wrote:
> > >
> > > > By default the partitioner will use murmur hash on the key and mode
> on
> > > > current num.partitions to determine which partitions to go to, so
> > records
> > > > with the same key will be assigned to the same partition. Would that
> be
> > > OK
> > > > for your case?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Oct 4, 2016 at 3:00 PM, Adrienne Kole <
> adrienneko...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > From Streams documentation, I can see that each Streams instance is
> > > > > processing data independently (from other instances), reads from
> > topic
> > > > > partition(s) and writes to specified topic.
> > > > >
> > > > >
> > > > > So here, the partitions of topic should be determined beforehand
> and
> > > > should
> > > > > remain static.
> > > > > In my usecase I want to create partitioned/keyed (time) windows and
> > > > > aggregate them.
> > > > > I can partition the incoming data to specified topic's partitions
> and
> > > > each
> > > > > Stream instance can do windowed aggregations.
> > > > >
> > > > > However, if I don't know the number of possible keys (to
> partition),
> > > then
> > > > > what should I do?
> > > > >
> > > > > Thanks
> > > > > Adrienne
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


Re: Kafka Streams dynamic partitioning

2016-10-05 Thread Matthias J. Sax
Hi,

even if you have more distinct keys than partitions (i.e., different keys
go to the same partition), if you do "aggregate by key" Streams will
automatically separate the keys and compute an aggregate per key.
Thus, you do not need to worry about which key is hashed to what
partition.

-Matthias
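
As a concrete illustration, a hopping-window count per key could look roughly like this (method names follow the 0.10.1 DSL, which differs slightly from 0.10.0; topic and store names are made up):

    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class WindowedCountExample {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> events = builder.stream("events");   // hypothetical input topic

            // 4-minute windows advancing every 2 minutes; Streams keeps one aggregate
            // per key per window, even when many distinct keys share a partition.
            KTable<Windowed<String>, Long> counts = events
                    .groupByKey()
                    .count(TimeWindows.of(4 * 60 * 1000L).advanceBy(2 * 60 * 1000L), "windowed-counts");
        }
    }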

On 10/5/16 1:37 PM, Adrienne Kole wrote:
> Hi,
> 
> @Ali IMO, Yes. That is the job of kafka server to assign kafka 
> instances partition(s) to process. Each instance can process more
> than one partition but one partition cannot be processed by more
> than one instance.
> 
> @Michael, Thanks for reply.
>> Rather, pick the number of partitions in a way that matches your
>> needs to
> process the data in parallel I think this should be ' pick number
> of partitions that matches max number of possible keys in stream to
> be partitioned '. At least in my usecase , in which I am trying to
> partition stream by key and make windowed aggregations, if there
> are less number of topic partitions than possible keys,  then
> application will not work correctly.
> 
> That is, if the number of topic partitions is less than possible
> stream keys, then different keyed stream tuples will be assigned to
> same topic. That was the problem that I was trying to solve and it
> seems the only solution is to estimate max number of possible keys
> and assign accordingly.
> 
> Thanks Adrienne
> 
> 
> 
> 
> 
> On Wed, Oct 5, 2016 at 5:55 PM, Ali Akhtar 
> wrote:
> 
>>> It's often a good
>> idea to over-partition your topics.  For example, even if today
>> 10 machines (and thus 10 partitions) would be sufficient, pick a
>> higher number of partitions (say, 50) so you have some wiggle
>> room to add more machines (11...50) later if need be.
>> 
>> If you create e.g 30 partitions, but only have e.g 5 instances of
>> your program, all on the same consumer group, all using kafka
>> streams to consume the topic, do you still receive all the data
>> posted to the topic, or will you need to have the same instances
>> of the program as there are partitions?
>> 
>> (If you have 1 instance, 30 partitions, will the same rules
>> apply, i.e it will receive all data?)
>> 
>> On Wed, Oct 5, 2016 at 8:52 PM, Michael Noll
>>  wrote:
>> 
 So, in this case I should know the max number of possible
 keys so that I can create that number of partitions.
>>> 
>>> Assuming I understand your original question correctly, then
>>> you would
>> not
>>> need to do/know this.  Rather, pick the number of partitions in
>>> a way
>> that
>>> matches your needs to process the data in parallel (e.g. if you
>>> expect
>> that
>>> you require 10 machines in order to process the incoming data,
>>> then you'd need 10 partitions).  Also, as a general
>>> recommendation:  It's often a
>> good
>>> idea to over-partition your topics.  For example, even if today
>>> 10
>> machines
>>> (and thus 10 partitions) would be sufficient, pick a higher
>>> number of partitions (say, 50) so you have some wiggle room to
>>> add more machines (11...50) later if need be.
>>> 
>>> 
>>> 
>>> On Wed, Oct 5, 2016 at 9:34 AM, Adrienne Kole
>>>  wrote:
>>> 
 Hi Guozhang,
 
 So, in this case I should know the max number of possible
 keys so that
>> I
 can create that number of partitions.
 
 Thanks
 
 Adrienne
 
 On Wed, Oct 5, 2016 at 1:00 AM, Guozhang Wang
 
>>> wrote:
 
> By default the partitioner will use murmur hash on the key
> and mode
>> on
> current num.partitions to determine which partitions to go
> to, so
>>> records
> with the same key will be assigned to the same partition.
> Would that
>> be
 OK
> for your case?
> 
> 
> Guozhang
> 
> 
> On Tue, Oct 4, 2016 at 3:00 PM, Adrienne Kole <
>> adrienneko...@gmail.com
 
> wrote:
> 
>> Hi,
>> 
>> From Streams documentation, I can see that each Streams
>> instance is processing data independently (from other
>> instances), reads from
>>> topic
>> partition(s) and writes to specified topic.
>> 
>> 
>> So here, the partitions of topic should be determined
>> beforehand
>> and
> should
>> remain static. In my usecase I want to create
>> partitioned/keyed (time) windows and aggregate them. I
>> can partition the incoming data to specified topic's
>> partitions
>> and
> each
>> Stream instance can do windowed aggregations.
>> 
>> However, if I don't know the number of possible keys (to
>> partition),
 then
>> what should I do?
>> 
>> Thanks Adrienne
>> 
> 
> 
> 
> -- -- Guozhang
> 
 
>>> 
>> 
> 

Re: Kafka consumer configuration / performance issues

2016-10-05 Thread Shamik Banerjee
Sorry to bump this up; can anyone provide some input on this? I need to make a
call soon on whether Kafka is a good fit for our requirement.



On Tuesday, October 4, 2016 8:57 PM, Shamik Banerjee 
 wrote:
Hi,

  I'm a newbie trying out kafka as an alternative to AWS SQS. The motivation 
primarily is to improve performance where kafka would eliminate the constraint 
of pulling 10 messages at a time with a cap of 256kb. Here's a high-level 
scenario of my use case. I've a bunch of crawlers which are sending documents 
for indexing. The size of the payload is around 1 mb on average. The crawlers 
call a SOAP end-point which in turn runs a producer code to submit the messages 
to a kafka queue. The consumer app picks up the messages and processes them. 
For my test box, I've configured the topic with 30 partitions with 2 
replication. The two kafka instances are running with 1 zookeeper instance. The 
kafka version is 0.10.0.

For my testing, I published 7 million messages in the queue. I created a 
consumer group with 30 consumer thread , one per partition. I was initially 
under the impression that this would substantially speed up the processing 
power compared to what I was getting via SQS. Unfortunately, that was not to be 
the case. In my case, the processing of data is complex and takes up 1-2 
minutes on average to complete.That lead to a flurry of partition rebalancing 
as the threads were not able to heartbeat on time. I could see a bunch of 
messages in the log citing "Auto offset commit failed for group full_group: 
Commit cannot be completed since the group has already rebalanced and assigned 
the partitions to another member. This means that the time between subsequent 
calls to poll() was longer than the configured session.timeout.ms, which 
typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in the poll() with 
max.poll.records." This lead to the same message being processed multiple 
times. I tried playing around with session timeout, max.poll.records and poll 
time to avoid this, but that slowed down the overall processing bigtime. Here's 
some of the configuration parameter:

metadata.max.age.ms = 30 
max.partition.fetch.bytes = 1048576 
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092] 
enable.auto.commit = true 
max.poll.records = 1 
request.timeout.ms = 31 
heartbeat.interval.ms = 10 
auto.commit.interval.ms = 1000 
receive.buffer.bytes = 65536 
fetch.min.bytes = 1 
send.buffer.bytes = 131072 
value.deserializer = class 
com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer 
group.id = full_group 
retry.backoff.ms = 100 
fetch.max.wait.ms = 500 
connections.max.idle.ms = 54 
session.timeout.ms = 30 
key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer 
metrics.sample.window.ms = 3 
auto.offset.reset = latest

I reduced the consumer poll time to 100 ms. It reduced the rebalancing issues, 
eliminated duplicate processing but slowed down the overall process 
significantly. It ended up taking 35 hours to complete processing all 6 million 
messages compared to 25 hours using the SQS based solution. Each consumer 
thread on average retrieved 50-60 messages per poll, though some of them polled 
0 records at times. I'm not sure about this behavior when there is a huge
amount of messages available in the partition. The same thread was able to pick up
messages during the subsequent iteration. Could this be due to rebalancing ? 

Here's my consumer code:

while (true) {
    try {
        ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
        for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
            if (record.value() != null) {
                TextAnalysisRequest textAnalysisObj = record.value();
                if (textAnalysisObj != null) {
                    // Process record
                    PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
                }
            }
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}

I understand that the record-processing part is one bottleneck in my case. But
I'm sure a few folks here have a similar use case of dealing with long
processing times. I thought of doing the processing asynchronously by spinning up each
processor in its own dedicated thread, or using a thread pool with large capacity,
but I'm not sure if it would create a big load on the system. At the same time,
I've seen a couple of instances where people have used the pause and resume APIs to
perform the processing in order to avoid the rebalancing issue.
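
For the pause/resume pattern, a rough sketch of the poll loop could look like this (it assumes an existing KafkaConsumer<String, TextAnalysisRequest> 'consumer', an ExecutorService 'executor' and a process(...) method for the slow work; all three are placeholders):

    while (true) {
        final ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
        if (records.isEmpty()) {
            continue;
        }
        Set<TopicPartition> assigned = consumer.assignment();
        consumer.pause(assigned);                                   // stop fetching, keep group membership
        Future<?> done = executor.submit(() -> process(records));   // hand the slow work to another thread

        while (!done.isDone()) {
            consumer.poll(100);   // returns no records while paused, but keeps the consumer heartbeating
        }
        consumer.resume(assigned);                                  // resume fetching where we left off
    }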

I'm really looking for some advice / best practice in this circumstance.
Particularly, the recommended configuration settings around heartbeat, request
timeout, max poll records, auto commit interval, poll interval, etc. If Kafka
is not the right tool for my use case, please let me know.

Re: Kafka Streams dynamic partitioning

2016-10-05 Thread Adrienne Kole
Thanks, I got the point. That solves my problem.



On Wed, Oct 5, 2016 at 10:58 PM, Matthias J. Sax 
wrote:

> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA512
>
> Hi,
>
> even if you have more distinct keys than partitions (ie, different key
> go to the same partition), if you do "aggregate by key" Streams will
> automatically separate the keys and compute an aggregate per key.
> Thus, you do not need to worry about which keys is hashed to what
> partition.
>
> - -Matthias
>
> On 10/5/16 1:37 PM, Adrienne Kole wrote:
> > Hi,
> >
> > @Ali IMO, Yes. That is the job of kafka server to assign kafka
> > instances partition(s) to process. Each instance can process more
> > than one partition but one partition cannot be processed by more
> > than one instance.
> >
> > @Michael, Thanks for reply.
> >> Rather, pick the number of partitions in a way that matches your
> >> needs to
> > process the data in parallel I think this should be ' pick number
> > of partitions that matches max number of possible keys in stream to
> > be partitioned '. At least in my usecase , in which I am trying to
> > partition stream by key and make windowed aggregations, if there
> > are less number of topic partitions than possible keys,  then
> > application will not work correctly.
> >
> > That is, if the number of topic partitions is less than possible
> > stream keys, then different keyed stream tuples will be assigned to
> > same topic. That was the problem that I was trying to solve and it
> > seems the only solution is to estimate max number of possible keys
> > and assign accordingly.
> >
> > Thanks Adrienne
> >
> >
> >
> >
> >
> > On Wed, Oct 5, 2016 at 5:55 PM, Ali Akhtar 
> > wrote:
> >
> >>> It's often a good
> >> idea to over-partition your topics.  For example, even if today
> >> 10 machines (and thus 10 partitions) would be sufficient, pick a
> >> higher number of partitions (say, 50) so you have some wiggle
> >> room to add more machines (11...50) later if need be.
> >>
> >> If you create e.g 30 partitions, but only have e.g 5 instances of
> >> your program, all on the same consumer group, all using kafka
> >> streams to consume the topic, do you still receive all the data
> >> posted to the topic, or will you need to have the same instances
> >> of the program as there are partitions?
> >>
> >> (If you have 1 instance, 30 partitions, will the same rules
> >> apply, i.e it will receive all data?)
> >>
> >> On Wed, Oct 5, 2016 at 8:52 PM, Michael Noll
> >>  wrote:
> >>
>  So, in this case I should know the max number of possible
>  keys so that I can create that number of partitions.
> >>>
> >>> Assuming I understand your original question correctly, then
> >>> you would
> >> not
> >>> need to do/know this.  Rather, pick the number of partitions in
> >>> a way
> >> that
> >>> matches your needs to process the data in parallel (e.g. if you
> >>> expect
> >> that
> >>> you require 10 machines in order to process the incoming data,
> >>> then you'd need 10 partitions).  Also, as a general
> >>> recommendation:  It's often a
> >> good
> >>> idea to over-partition your topics.  For example, even if today
> >>> 10
> >> machines
> >>> (and thus 10 partitions) would be sufficient, pick a higher
> >>> number of partitions (say, 50) so you have some wiggle room to
> >>> add more machines (11...50) later if need be.
> >>>
> >>>
> >>>
> >>> On Wed, Oct 5, 2016 at 9:34 AM, Adrienne Kole
> >>>  wrote:
> >>>
>  Hi Guozhang,
> 
>  So, in this case I should know the max number of possible
>  keys so that
> >> I
>  can create that number of partitions.
> 
>  Thanks
> 
>  Adrienne
> 
>  On Wed, Oct 5, 2016 at 1:00 AM, Guozhang Wang
>  
> >>> wrote:
> 
> > By default the partitioner will use murmur hash on the key
> > and mode
> >> on
> > current num.partitions to determine which partitions to go
> > to, so
> >>> records
> > with the same key will be assigned to the same partition.
> > Would that
> >> be
>  OK
> > for your case?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Oct 4, 2016 at 3:00 PM, Adrienne Kole <
> >> adrienneko...@gmail.com
> 
> > wrote:
> >
> >> Hi,
> >>
> >> From Streams documentation, I can see that each Streams
> >> instance is processing data independently (from other
> >> instances), reads from
> >>> topic
> >> partition(s) and writes to specified topic.
> >>
> >>
> >> So here, the partitions of topic should be determined
> >> beforehand
> >> and
> > should
> >> remain static. In my usecase I want to create
> >> partitioned/keyed (time) windows and aggregate them. I
> >> can partition the incoming data to specified topic's
> >> partitions
> >> and
> > each
> >> Stream instance can do windowed aggregations.
> >>
> >> However, if I don't know the number of possible keys (to
> >> partition),
>  then
> >> what sh

Re: kafka new consumer is ready for production setup?

2016-10-05 Thread Hans Jespersen
The new consumer is absolutely ready for production use. I think this is a
leftover warning from older releases and should be updated to better
reflect the current reality.
0.10.1 is currently in RC status (see
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1), so
that could be why the 0.10.1 docs are hard to find.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Tue, Oct 4, 2016 at 11:42 PM, Gaurav Shaha  wrote:

> Hi,
>
> I want to use kafka new consumer. But in the documentation of 0.10.0
> version, it is mentioned that "The code is considered beta quality."
> I am using the latest version which is 0.10.1. I am not able to find
> out any documentation for this version.
>
> Can you please tell me, if the new consumer is ready for production
> use or it is still in the beta version?
>
> --
> Thanks and Regards,
> Gaurav Shaha
>


Delayed Queue usecase

2016-10-05 Thread Akshay Joglekar
Hi,

I have a use case where I need to process certain messages only after a certain
amount of time has elapsed. Does Kafka have any support for time delays?
Currently I am putting messages in different queues based on when the message
should get processed, and at any given time the consumers only poll the queue
whose time is current. However, this does not scale very well, and it's hard to
provide finer second-level or millisecond-level granularity since the number of
queues required becomes huge at that point. So I was wondering if Kafka provides
any built-in mechanism for time delays that can be used.

Thanks,
Akshay


Re: Architecture recommendations for a tricky use case

2016-10-05 Thread Avi Flax

> On Sep 29, 2016, at 16:39, Ali Akhtar  wrote:
> 
> Why did you choose Druid over Postgres / Cassandra / Elasticsearch?

Well, to be clear, we haven’t chosen it yet — we’re evaluating it.

That said, it is looking quite promising for our use case.

The Druid docs say it well:

> Druid is an open source data store designed for OLAP queries on event data.

And that’s exactly what we need. The other options you listed are excellent 
systems, but they’re more general than Druid. Because Druid is specifically 
focused on OLAP queries on event data, it has features and properties that make 
it very well suited to such use cases.

In addition, Druid has built-in support for ingesting events from Kafka topics 
and making those events available for querying with very low latency. This is 
very attractive for my use case.

If you’d like to learn more about Druid I recommend this talk from last month 
at Strange Loop: https://www.youtube.com/watch?v=vbH8E0nH2Nw

HTH!

Avi


Software Architect @ Park Assist
We’re hiring! http://tech.parkassist.com/jobs/