Re: Updated Kafka Roadmap?

2014-09-03 Thread Joe Stein
Hey Jonathan, I just sent an email on the dev list to discuss this.

Not to duplicate effort, but for posterity and good thread communication, it
would be great if you could voice your opinion there (please).

I would volunteer to release 0.8.1.2, not a problem.

My concern with 0.8.2 is that, as with any new release, there are corner cases, and
some features (like KAFKA-1419) are important enough for a dot release.  The
source jar in Maven too!

The community needs to speak up if they want this!

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/


On Wed, Sep 3, 2014 at 9:40 PM, Jonathan Weeks 
wrote:

> Hi,
>
> I was wondering whether 0.8.2 is on track for being released this month,
> as I haven’t read much about betas or release candidates in any of the
> kafka groups (although I haven’t been following the IRC)?
>
> Although many of the new features targeted for 0.8.2, including topic
> deletion, the new producer client API and offset mgmt outside ZK would be
> great, we are primarily looking for scala 2.11.x support, and have an
> in-house build that we are currently managing, but would appreciate an
> update on the timing of the official road map.
>
> In case it is looking like stabilizing the new features might push the
> full 0.8.2 release out a while, an 0.8.1.2 build with just incremental
> scala 2.11.x support would be highly appreciated by many of us!
>
> https://issues.apache.org/jira/browse/KAFKA-1419
>
> If it is going to be a while, we will likely do additional sustainment
> work with our internal Kafka 2.11 build and begin working with other
> dependencies as well (e.g. akka-kafka) - just trying to get all the
> information before making that investment on something that might have a
> short lifetime.
>
> Best Regards,
>
> -Jonathan
>
>
> On Aug 3, 2014, at 9:21 PM, Jun Rao  wrote:
>
> > I just updated the wiki with some rough timelines.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Aug 1, 2014 at 11:52 AM, Jonathan Weeks <
> jonathanbwe...@gmail.com>
> > wrote:
> >
> >> Howdy,
> >>
> >> I was wondering if it would be possible to update the release plan:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
> >>
> >> aligned with the feature roadmap:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/Index
> >>
> >> We have several active projects actively and planning to use Kafka, and
> >> any current guidance on the new releases related to ZK dependence,
> producer
> >> and consumer API/client timing would be very helpful. For example, is
> 0.8.2
> >> possible in August, or is September likely?
> >>
> >> Also, any chance something like:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
> >>
> >> …might make it into 0.9?
> >>
> >> Thanks!
>
>


Re: Updated Kafka Roadmap?

2014-09-03 Thread Jonathan Weeks
Hi,

I was wondering whether 0.8.2 is on track for being released this month, as I 
haven’t read much about betas or release candidates in any of the Kafka groups 
(although I haven’t been following the IRC)?

Although many of the new features targeted for 0.8.2 (including topic deletion, 
the new producer client API, and offset management outside ZK) would be great, we are 
primarily looking for Scala 2.11.x support. We have an in-house build that we 
are currently managing, but would appreciate an update on the timing of the 
official road map.

In case it looks like stabilizing the new features might push the full 
0.8.2 release out a while, a 0.8.1.2 build with just incremental Scala 2.11.x 
support would be highly appreciated by many of us!

https://issues.apache.org/jira/browse/KAFKA-1419

If it is going to be a while, we will likely do additional sustainment work 
with our internal Kafka 2.11 build and begin working with other dependencies as 
well (e.g. akka-kafka) - just trying to get all the information before making 
that investment on something that might have a short lifetime.

Best Regards,

-Jonathan


On Aug 3, 2014, at 9:21 PM, Jun Rao  wrote:

> I just updated the wiki with some rough timelines.
> 
> Thanks,
> 
> Jun
> 
> 
> On Fri, Aug 1, 2014 at 11:52 AM, Jonathan Weeks 
> wrote:
> 
>> Howdy,
>> 
>> I was wondering if it would be possible to update the release plan:
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>> 
>> aligned with the feature roadmap:
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Index
>> 
>> We have several active projects actively and planning to use Kafka, and
>> any current guidance on the new releases related to ZK dependence, producer
>> and consumer API/client timing would be very helpful. For example, is 0.8.2
>> possible in August, or is September likely?
>> 
>> Also, any chance something like:
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
>> 
>> …might make it into 0.9?
>> 
>> Thanks!



Re: High Level Consumer and Commit

2014-09-03 Thread Gwen Shapira
This is not how I'd expect this to work.
Offsets are per-partition and each thread reads its own partition
(Assuming you use Balaji's solution).

So:
Thread 1 reads messages 1..50 from partition 1, processes, indexes,
whatever and commits.
Thread 2 reads messages 1..55 from partition 2, processes, indexes,
whatever and commits.

Next iteration thread 1 reads messages 51...103 from partition 1,
thread 2 reads messages 56...160 from partition 2, etc.

No fragmentation :)

Gwen
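
A minimal sketch of the pattern described above, assuming the 0.8 high-level
consumer Java API: one ConsumerConnector, one thread per stream, auto-commit
disabled, and a per-partition commit step (something like Balaji's ZK-based
commitOffset) after each processed batch. The topic, group id, batch size, and
the indexBatch/commitBatchOffsets helpers are placeholders, not code from this
thread.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class PerPartitionBatchConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder
        props.put("group.id", "es-indexer");               // placeholder
        props.put("auto.commit.enable", "false");          // offsets committed per batch instead

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        String topic = "events";                           // placeholder
        int numStreams = 4;                                // ideally equals the partition count
        List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreams(Collections.singletonMap(topic, numStreams)).get(topic);

        // one thread per stream: each thread only ever sees (and commits) its own partitions
        ExecutorService pool = Executors.newFixedThreadPool(numStreams);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            pool.submit(new Runnable() {
                public void run() {
                    List<MessageAndMetadata<byte[], byte[]>> batch =
                            new ArrayList<MessageAndMetadata<byte[], byte[]>>();
                    for (MessageAndMetadata<byte[], byte[]> m : stream) {
                        batch.add(m);
                        if (batch.size() == 50) {
                            indexBatch(batch);          // e.g. an Elasticsearch bulk request
                            commitBatchOffsets(batch);  // per-partition commit, e.g. Balaji's ZK update
                            batch.clear();
                        }
                    }
                }
            });
        }
    }

    static void indexBatch(List<MessageAndMetadata<byte[], byte[]>> batch) { /* placeholder */ }
    static void commitBatchOffsets(List<MessageAndMetadata<byte[], byte[]>> batch) { /* placeholder */ }
}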



On Wed, Sep 3, 2014 at 3:42 PM, Bhavesh Mistry
 wrote:
> Thanks got the idea !!  But it will create a fragments for example
>
> Main Thread  reads 0-50 messages give to Thread 1 for bulk index and commit
> 0 to 50 offset...
> Main Thread  reads 51-100 message give to Thread 2 for bulk index and
> commit 51 100 offset...
>
> So Zookeeper might have offset that will overridden by thread that finish
> first Indexing (Thread 2 will finish first and Thread 1 will commit offset
> then ZK will have older offset).  I guess it does not matter in our case
> since indexing same document will over write.
>
> Thanks,
>
> Bhavesh
>
>
>
> On Wed, Sep 3, 2014 at 3:20 PM, Gwen Shapira  wrote:
>
>> Thanks, Balaji!
>>
>> It looks like your approach depends on specific implementation
>> details, such as the directory structure in ZK.
>> In this case it doesn't matter much since the APIs are not stable yet,
>> but in general, wouldn't you prefer to use public APIs, even if it
>> means multiple consumers without threads?
>>
>> Gwen
>>
>> On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji
>>  wrote:
>> > We can still do with single ConsumerConnector with multiple threads.
>> >
>> > Each thread updates its own data in zookeeper.The below one is our own
>> implementation of commitOffset.
>> >
>> > public void commitOffset(DESMetadata metaData) {
>> > log.debug("Update offsets only for ->"+
>> metaData.toString());
>> > String key =
>> metaData.getTopic()+"/"+metaData.getPartitionNumber();
>> > Long nextOffset = metaData.getOffSet()+1;
>> > if(nextOffset!=checkPointedOffset.get(key)){
>> > ZKGroupTopicDirs topicDirs = new
>> ZKGroupTopicDirs(metaData.getGroupId(),metaData.getTopic());
>> > ZkUtils.updatePersistentPath(zkClient,
>> topicDirs.consumerOffsetDir()+"/"+metaData.getPartitionNumber(),nextOffset+"");
>> > checkPointedOffset.put(key,nextOffset);
>> > }
>> > }
>> >
>> > -Original Message-
>> > From: Gwen Shapira [mailto:gshap...@cloudera.com]
>> > Sent: Tuesday, September 02, 2014 11:38 PM
>> > To: users@kafka.apache.org; Philip O'Toole
>> > Subject: Re: High Level Consumer and Commit
>> >
>> > I believe a simpler solution would be to create multiple
>> ConsumerConnector, each with 1 thread (single ConsumerStream) and use
>> commitOffset API to commit all partitions managed by each ConsumerConnector
>> after the thread finished processing the messages.
>> >
>> > Does that solve the problem, Bhavesh?
>> >
>> > Gwen
>> >
>> > On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole
>>  wrote:
>> >> Yeah, from reading that I suspect you need the SimpleConsumer. Try it
>> out and see.
>> >>
>> >> Philip
>> >>
>> >>
>> >> -
>> >> http://www.philipotoole.com
>> >>
>> >>
>> >> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry <
>> mistry.p.bhav...@gmail.com> wrote:
>> >>
>> >>
>> >>
>> >> Hi Philip,
>> >>
>> >> Yes, We have disabled auto commit but, we need to be able to read from
>> >> particular offset if we manage the offset ourself in some storage(DB).
>> >> High Level consumer does not allow per partition management
>> plug-ability.
>> >>
>> >> I like to have the High Level consumers Failover and auto rebalancing.
>> >> We just need plug ability of offset management.
>> >>
>> >> Thanks,
>> >>
>> >> Bhavesh
>> >>
>> >>
>> >>
>> >> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
>> >> philip.oto...@yahoo.com.invalid> wrote:
>> >>
>> >>> No, you'll need to write your own failover.
>> >>>
>> >>> I'm not sure I follow your second question, but the high-level
>> >>> Consumer should be able to do what you want if you disable
>> >>> auto-commit. I'm not sure what else you're asking.
>> >>>
>> >>>
>> >>> Philip
>> >>>
>> >>>
>> >>> -
>> >>> http://www.philipotoole.com
>> >>>
>> >>>
>> >>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
>> >>> mistry.p.bhav...@gmail.com> wrote:
>> >>>
>> >>>
>> >>>
>> >>> Hi Philip,
>> >>>
>> >>> Thanks for the update.  With Simple Consumer I will not get failover
>> >>> and rebalance that is provided out of box.  what is other option not
>> >>> to block reading and keep processing and commit only when batch is
>> done.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Bhavesh
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
>> >>> philip.oto...@yahoo.com.invalid> wrote:
>> >

Re: message size limit

2014-09-03 Thread Alexis Midon
Are you referring to socket.request.max.bytes?
It looks like it could indeed limit the size of a batch accepted by a
broker.
So you're right: batch.num.messages * message.max.bytes must be smaller
than socket.request.max.bytes.

It looks like this case has been addressed in the new producer.
See max.request.size and batch.size at
https://kafka.apache.org/documentation.html#newproducerconfigs

Alexis
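
For anyone mapping this to configuration, a rough sketch of those two
new-producer settings (names taken from the documentation linked above; the
broker address and byte values are placeholders, and other required settings
are omitted):

import java.util.Properties;

public class NewProducerSizing {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        // batch.size: upper bound (in bytes) on what gets batched per partition
        props.put("batch.size", "200000");
        // max.request.size: hard cap on a single request; keep it at or below the
        // broker's message.max.bytes / socket.request.max.bytes so requests aren't rejected
        props.put("max.request.size", "1000000");
        return props;
    }
}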


On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry 
wrote:

> I am referring to wiki http://kafka.apache.org/08/configuration.html and
> following parameter control max batch message bytes as far as I know.
> Kafka Community, please correct me if I am wrong.  I do not want to create
> confusion for Kafka User Community here.   Also, if you increase this limit
> than you have to set the corresponding limit increase on consumer side as
> well (fetch.message.max.bytes).
>
> Since we are using batch async mode, our messages are getting drop sometime
> if the entire batch bytes  exceed this limit so I was asking Kafka
> Developers if any optimal way to determine the batch size based on this
> limit to minimize the data loss. Because, entire batch is rejected by
> brokers.
>
> message.max.bytes 100 The maximum size of a message that the server can
> receive. It is important that this property be in sync with the maximum
> fetch size your consumers use or else an unruly producer will be able to
> publish messages too large for consumers to consume.
>
> Thanks,
>
> Bhavesh
>
>
> On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> alexis.mi...@airbedandbreakfast.com> wrote:
>
> > Hi Bhavesh
> >
> > can you explain what limit you're referring to?
> > I'm asking because `message.max.bytes` is applied per message not per
> > batch.
> > is there another limit I should be aware of?
> >
> > thanks
> >
> >
> > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com
> > >
> > wrote:
> >
> > > Hi Jun,
> > >
> > > We have similar problem.  We have variable length of messages.  So when
> > we
> > > have fixed size of Batch sometime the batch exceed the limit set on the
> > > brokers (2MB).
> > >
> > > So can Producer have some extra logic to determine the optimal batch
> size
> > > by looking at configured message.max.bytes  value.
> > >
> > > During the metadata update, Producer will get this value from the
> Broker
> > > for each topic and Producer will check if current batch size reach this
> > > limit than break batch into smaller chunk such way that It would not
> > exceed
> > > limit (unless single message exceed the limit). Basically try to avoid
> > data
> > > loss as much as possible.
> > >
> > > Please let me know what is your opinion on this...
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > >
> > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > alexis.mi...@airbedandbreakfast.com> wrote:
> > >
> > > > Thanks Jun.
> > > >
> > > > I'll create a jira and try to provide a patch. I think this is pretty
> > > > serious.
> > > >
> > > > On Friday, August 29, 2014, Jun Rao  wrote:
> > > >
> > > > > The goal of batching is mostly to reduce the # RPC calls to the
> > broker.
> > > > If
> > > > > compression is enabled, a larger batch typically implies better
> > > > compression
> > > > > ratio.
> > > > >
> > > > > The reason that we have to fail the whole batch is that the error
> > code
> > > in
> > > > > the produce response is per partition, instead of per message.
> > > > >
> > > > > Retrying individual messages on MessageSizeTooLarge seems
> reasonable.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > alexis.mi...@airbedandbreakfast.com > wrote:
> > > > >
> > > > > > Could you explain the goals of batches? I was assuming this was
> > > simply
> > > > a
> > > > > > performance optimization, but this behavior makes me think I'm
> > > missing
> > > > > > something.
> > > > > > is a batch more than a list of *independent* messages?
> > > > > >
> > > > > > Why would you reject the whole batch? One invalid message causes
> > the
> > > > loss
> > > > > > of batch.num.messages-1 messages :(
> > > > > > It seems pretty critical to me.
> > > > > >
> > > > > > If ack=0, the producer will never know about it.
> > > > > > If ack !=0, the producer will retry the whole batch. If the issue
> > was
> > > > > > related to data corruption (etc), retries might work. But in the
> > case
> > > > of
> > > > > > "big message", the batch will always be rejected and the producer
> > > will
> > > > > give
> > > > > > up.
> > > > > >
> > > > > > If the messages are indeed considered independent, I think this
> is
> > a
> > > > > pretty
> > > > > > serious issue.
> > > > > >
> > > > > > I see 2 possible fix approaches:
> > > > > > - the broker could reject only the invalid messages
> > > > > > - the broker could reject the whole batch (like today) but the
> > > producer
> > > > > (if
> > > > > > ack!=0) could retry messages one at a

Re: High Level Consumer and Commit

2014-09-03 Thread Philip O'Toole

>>Only problem is no of connections to Kafka is increased.

*Why* is it a problem?


Philip

Re: High Level Consumer and Commit

2014-09-03 Thread Bhavesh Mistry
Thanks, got the idea!!  But it will create fragments, for example:

Main thread reads messages 0-50, gives them to Thread 1 for bulk indexing, and
commits offsets 0 to 50...
Main thread reads messages 51-100, gives them to Thread 2 for bulk indexing, and
commits offsets 51 to 100...

So Zookeeper might have an offset that gets overridden by whichever thread finishes
indexing first (if Thread 2 finishes first and Thread 1 then commits its offset,
ZK will hold the older offset).  I guess it does not matter in our case
since indexing the same document will overwrite it.

Thanks,

Bhavesh



On Wed, Sep 3, 2014 at 3:20 PM, Gwen Shapira  wrote:

> Thanks, Balaji!
>
> It looks like your approach depends on specific implementation
> details, such as the directory structure in ZK.
> In this case it doesn't matter much since the APIs are not stable yet,
> but in general, wouldn't you prefer to use public APIs, even if it
> means multiple consumers without threads?
>
> Gwen
>
> On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji
>  wrote:
> > We can still do with single ConsumerConnector with multiple threads.
> >
> > Each thread updates its own data in zookeeper.The below one is our own
> implementation of commitOffset.
> >
> > public void commitOffset(DESMetadata metaData) {
> > log.debug("Update offsets only for ->"+
> metaData.toString());
> > String key =
> metaData.getTopic()+"/"+metaData.getPartitionNumber();
> > Long nextOffset = metaData.getOffSet()+1;
> > if(nextOffset!=checkPointedOffset.get(key)){
> > ZKGroupTopicDirs topicDirs = new
> ZKGroupTopicDirs(metaData.getGroupId(),metaData.getTopic());
> > ZkUtils.updatePersistentPath(zkClient,
> topicDirs.consumerOffsetDir()+"/"+metaData.getPartitionNumber(),nextOffset+"");
> > checkPointedOffset.put(key,nextOffset);
> > }
> > }
> >
> > -Original Message-
> > From: Gwen Shapira [mailto:gshap...@cloudera.com]
> > Sent: Tuesday, September 02, 2014 11:38 PM
> > To: users@kafka.apache.org; Philip O'Toole
> > Subject: Re: High Level Consumer and Commit
> >
> > I believe a simpler solution would be to create multiple
> ConsumerConnector, each with 1 thread (single ConsumerStream) and use
> commitOffset API to commit all partitions managed by each ConsumerConnector
> after the thread finished processing the messages.
> >
> > Does that solve the problem, Bhavesh?
> >
> > Gwen
> >
> > On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole
>  wrote:
> >> Yeah, from reading that I suspect you need the SimpleConsumer. Try it
> out and see.
> >>
> >> Philip
> >>
> >>
> >> -
> >> http://www.philipotoole.com
> >>
> >>
> >> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com> wrote:
> >>
> >>
> >>
> >> Hi Philip,
> >>
> >> Yes, We have disabled auto commit but, we need to be able to read from
> >> particular offset if we manage the offset ourself in some storage(DB).
> >> High Level consumer does not allow per partition management
> plug-ability.
> >>
> >> I like to have the High Level consumers Failover and auto rebalancing.
> >> We just need plug ability of offset management.
> >>
> >> Thanks,
> >>
> >> Bhavesh
> >>
> >>
> >>
> >> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
> >> philip.oto...@yahoo.com.invalid> wrote:
> >>
> >>> No, you'll need to write your own failover.
> >>>
> >>> I'm not sure I follow your second question, but the high-level
> >>> Consumer should be able to do what you want if you disable
> >>> auto-commit. I'm not sure what else you're asking.
> >>>
> >>>
> >>> Philip
> >>>
> >>>
> >>> -
> >>> http://www.philipotoole.com
> >>>
> >>>
> >>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
> >>> mistry.p.bhav...@gmail.com> wrote:
> >>>
> >>>
> >>>
> >>> Hi Philip,
> >>>
> >>> Thanks for the update.  With Simple Consumer I will not get failover
> >>> and rebalance that is provided out of box.  what is other option not
> >>> to block reading and keep processing and commit only when batch is
> done.
> >>>
> >>> Thanks,
> >>>
> >>> Bhavesh
> >>>
> >>>
> >>>
> >>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
> >>> philip.oto...@yahoo.com.invalid> wrote:
> >>>
> >>> > Either use the SimpleConsumer which gives you much finer-grained
> >>> > control, or (this worked with 0.7) spin up a ConsumerConnection
> >>> > (this is a
> >>> HighLevel
> >>> > consumer concept) per partition, turn off auto-commit.
> >>> >
> >>> > Philip
> >>> >
> >>> >
> >>> > -
> >>> > http://www.philipotoole.com
> >>> >
> >>> >
> >>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry <
> >>> > mistry.p.bhav...@gmail.com> wrote:
> >>> >
> >>> >
> >>> >
> >>> > Hi Kafka Group,
> >>> >
> >>> > I have to pull the data from the Topic and index into Elastic
> >>> > Search with Bulk API and wanted to commit only batch that has be

RE: High Level Consumer and Commit

2014-09-03 Thread Seshadri, Balaji
Multiple consumers with a single thread each will also work.

The only problem is that the number of connections to Kafka is increased.

-Original Message-
From: Gwen Shapira [mailto:gshap...@cloudera.com] 
Sent: Wednesday, September 03, 2014 4:20 PM
To: users@kafka.apache.org
Cc: Philip O'Toole
Subject: Re: High Level Consumer and Commit

Thanks, Balaji!

It looks like your approach depends on specific implementation details, such as 
the directory structure in ZK.
In this case it doesn't matter much since the APIs are not stable yet, but in 
general, wouldn't you prefer to use public APIs, even if it means multiple 
consumers without threads?

Gwen

On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji  
wrote:
> We can still do with single ConsumerConnector with multiple threads.
>
> Each thread updates its own data in zookeeper.The below one is our own 
> implementation of commitOffset.
>
> public void commitOffset(DESMetadata metaData) {
> log.debug("Update offsets only for ->"+ metaData.toString());
> String key = 
> metaData.getTopic()+"/"+metaData.getPartitionNumber();
> Long nextOffset = metaData.getOffSet()+1;
> if(nextOffset!=checkPointedOffset.get(key)){
> ZKGroupTopicDirs topicDirs = new 
> ZKGroupTopicDirs(metaData.getGroupId(),metaData.getTopic());
> ZkUtils.updatePersistentPath(zkClient, 
> topicDirs.consumerOffsetDir()+"/"+metaData.getPartitionNumber(),nextOffset+"");
> checkPointedOffset.put(key,nextOffset);
> }
> }
>
> -Original Message-
> From: Gwen Shapira [mailto:gshap...@cloudera.com]
> Sent: Tuesday, September 02, 2014 11:38 PM
> To: users@kafka.apache.org; Philip O'Toole
> Subject: Re: High Level Consumer and Commit
>
> I believe a simpler solution would be to create multiple ConsumerConnector, 
> each with 1 thread (single ConsumerStream) and use commitOffset API to commit 
> all partitions managed by each ConsumerConnector after the thread finished 
> processing the messages.
>
> Does that solve the problem, Bhavesh?
>
> Gwen
>
> On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole 
>  wrote:
>> Yeah, from reading that I suspect you need the SimpleConsumer. Try it out 
>> and see.
>>
>> Philip
>>
>>
>> -
>> http://www.philipotoole.com
>>
>>
>> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry 
>>  wrote:
>>
>>
>>
>> Hi Philip,
>>
>> Yes, We have disabled auto commit but, we need to be able to read 
>> from particular offset if we manage the offset ourself in some storage(DB).
>> High Level consumer does not allow per partition management plug-ability.
>>
>> I like to have the High Level consumers Failover and auto rebalancing.
>> We just need plug ability of offset management.
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>>
>> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole < 
>> philip.oto...@yahoo.com.invalid> wrote:
>>
>>> No, you'll need to write your own failover.
>>>
>>> I'm not sure I follow your second question, but the high-level 
>>> Consumer should be able to do what you want if you disable 
>>> auto-commit. I'm not sure what else you're asking.
>>>
>>>
>>> Philip
>>>
>>>
>>> -
>>> http://www.philipotoole.com
>>>
>>>
>>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry < 
>>> mistry.p.bhav...@gmail.com> wrote:
>>>
>>>
>>>
>>> Hi Philip,
>>>
>>> Thanks for the update.  With Simple Consumer I will not get failover 
>>> and rebalance that is provided out of box.  what is other option not 
>>> to block reading and keep processing and commit only when batch is done.
>>>
>>> Thanks,
>>>
>>> Bhavesh
>>>
>>>
>>>
>>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole < 
>>> philip.oto...@yahoo.com.invalid> wrote:
>>>
>>> > Either use the SimpleConsumer which gives you much finer-grained 
>>> > control, or (this worked with 0.7) spin up a ConsumerConnection 
>>> > (this is a
>>> HighLevel
>>> > consumer concept) per partition, turn off auto-commit.
>>> >
>>> > Philip
>>> >
>>> >
>>> > -
>>> > http://www.philipotoole.com
>>> >
>>> >
>>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry < 
>>> > mistry.p.bhav...@gmail.com> wrote:
>>> >
>>> >
>>> >
>>> > Hi Kafka Group,
>>> >
>>> > I have to pull the data from the Topic and index into Elastic 
>>> > Search with Bulk API and wanted to commit only batch that has been 
>>> > committed and
>>> still
>>> > continue to read from topic further on same topic.  I have auto 
>>> > commit to be off.
>>> >
>>> >
>>> > List  batch .
>>> >
>>> > while (iterator.hasNext()) {
>>> > batch.add(iterator.next().message());
>>> > if(batch size is 50 ){
>>> >   //===  Once the bulk API is successful it will commit 
>>> > the
>>> offset
>>> > to zookeeper...
>>> >   executor.submit(new Thread() process batch and commit batch,
>>> > cconsumerConnector)
>>> >   batch = new batch buffe

Re: message size limit

2014-09-03 Thread Bhavesh Mistry
I am referring to the wiki http://kafka.apache.org/08/configuration.html, and the
following parameter controls the max message bytes as far as I know.
Kafka community, please correct me if I am wrong; I do not want to create
confusion for the Kafka user community here.   Also, if you increase this limit,
then you have to set the corresponding limit increase on the consumer side as
well (fetch.message.max.bytes).

Since we are using batch async mode, our messages sometimes get dropped
if the total batch bytes exceed this limit, so I was asking the Kafka
developers whether there is an optimal way to determine the batch size based on this
limit to minimize the data loss, because the entire batch is rejected by the
brokers.

message.max.bytes 100 The maximum size of a message that the server can
receive. It is important that this property be in sync with the maximum
fetch size your consumers use or else an unruly producer will be able to
publish messages too large for consumers to consume.

Thanks,

Bhavesh
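
A small sketch of the alignment mentioned above, assuming the 0.8 high-level
consumer: if the broker's message.max.bytes is raised (say to 2MB), every
consumer needs fetch.message.max.bytes at least that large or it will not be
able to fetch those messages. The connection strings and sizes below are
placeholders.

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerFetchSizing {
    public static ConsumerConnector create() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder
        props.put("group.id", "my-group");                 // placeholder
        // must be >= the broker's message.max.bytes (2MB in this example)
        props.put("fetch.message.max.bytes", "2097152");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
}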


On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
alexis.mi...@airbedandbreakfast.com> wrote:

> Hi Bhavesh
>
> can you explain what limit you're referring to?
> I'm asking because `message.max.bytes` is applied per message not per
> batch.
> is there another limit I should be aware of?
>
> thanks
>
>
> On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry  >
> wrote:
>
> > Hi Jun,
> >
> > We have similar problem.  We have variable length of messages.  So when
> we
> > have fixed size of Batch sometime the batch exceed the limit set on the
> > brokers (2MB).
> >
> > So can Producer have some extra logic to determine the optimal batch size
> > by looking at configured message.max.bytes  value.
> >
> > During the metadata update, Producer will get this value from the Broker
> > for each topic and Producer will check if current batch size reach this
> > limit than break batch into smaller chunk such way that It would not
> exceed
> > limit (unless single message exceed the limit). Basically try to avoid
> data
> > loss as much as possible.
> >
> > Please let me know what is your opinion on this...
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > alexis.mi...@airbedandbreakfast.com> wrote:
> >
> > > Thanks Jun.
> > >
> > > I'll create a jira and try to provide a patch. I think this is pretty
> > > serious.
> > >
> > > On Friday, August 29, 2014, Jun Rao  wrote:
> > >
> > > > The goal of batching is mostly to reduce the # RPC calls to the
> broker.
> > > If
> > > > compression is enabled, a larger batch typically implies better
> > > compression
> > > > ratio.
> > > >
> > > > The reason that we have to fail the whole batch is that the error
> code
> > in
> > > > the produce response is per partition, instead of per message.
> > > >
> > > > Retrying individual messages on MessageSizeTooLarge seems reasonable.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com > wrote:
> > > >
> > > > > Could you explain the goals of batches? I was assuming this was
> > simply
> > > a
> > > > > performance optimization, but this behavior makes me think I'm
> > missing
> > > > > something.
> > > > > is a batch more than a list of *independent* messages?
> > > > >
> > > > > Why would you reject the whole batch? One invalid message causes
> the
> > > loss
> > > > > of batch.num.messages-1 messages :(
> > > > > It seems pretty critical to me.
> > > > >
> > > > > If ack=0, the producer will never know about it.
> > > > > If ack !=0, the producer will retry the whole batch. If the issue
> was
> > > > > related to data corruption (etc), retries might work. But in the
> case
> > > of
> > > > > "big message", the batch will always be rejected and the producer
> > will
> > > > give
> > > > > up.
> > > > >
> > > > > If the messages are indeed considered independent, I think this is
> a
> > > > pretty
> > > > > serious issue.
> > > > >
> > > > > I see 2 possible fix approaches:
> > > > > - the broker could reject only the invalid messages
> > > > > - the broker could reject the whole batch (like today) but the
> > producer
> > > > (if
> > > > > ack!=0) could retry messages one at a time on exception like
> > > > > "MessageSizeTooLarge".
> > > > >
> > > > > opinions?
> > > > >
> > > > > Alexis
> > > > >
> > > > > ```
> > > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id
> 46
> > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id
> 51
> > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id
> 56
> > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEvent

Re: High Level Consumer and Commit

2014-09-03 Thread Gwen Shapira
Thanks, Balaji!

It looks like your approach depends on specific implementation
details, such as the directory structure in ZK.
In this case it doesn't matter much since the APIs are not stable yet,
but in general, wouldn't you prefer to use public APIs, even if it
means multiple consumers without threads?

Gwen
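
A minimal sketch of that public-API-only alternative, assuming the 0.8
high-level consumer: one single-stream ConsumerConnector per worker, so the
public commitOffsets() call only ever covers the partitions that worker owns.
The topic, group id, ZooKeeper address, and the indexing step are placeholders.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class SingleStreamWorker implements Runnable {
    private final ConsumerConnector connector;
    private final KafkaStream<byte[], byte[]> stream;

    SingleStreamWorker(String zookeeper, String group, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", group);
        props.put("auto.commit.enable", "false");  // commit manually after each batch
        connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        stream = connector.createMessageStreams(
                Collections.singletonMap(topic, 1)).get(topic).get(0);
    }

    public void run() {
        List<byte[]> batch = new ArrayList<byte[]>();
        for (MessageAndMetadata<byte[], byte[]> m : stream) {
            batch.add(m.message());
            if (batch.size() == 50) {
                // index the batch (e.g. Elasticsearch bulk request), then commit;
                // commitOffsets() here only touches this connector's partitions
                connector.commitOffsets();
                batch.clear();
            }
        }
    }

    public static void main(String[] args) {
        // one connector (and one thread) per desired unit of parallelism
        for (int i = 0; i < 4; i++) {
            new Thread(new SingleStreamWorker("localhost:2181", "es-indexer", "events")).start();
        }
    }
}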

On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji
 wrote:
> We can still do with single ConsumerConnector with multiple threads.
>
> Each thread updates its own data in zookeeper.The below one is our own 
> implementation of commitOffset.
>
> public void commitOffset(DESMetadata metaData) {
> log.debug("Update offsets only for ->"+ metaData.toString());
> String key = 
> metaData.getTopic()+"/"+metaData.getPartitionNumber();
> Long nextOffset = metaData.getOffSet()+1;
> if(nextOffset!=checkPointedOffset.get(key)){
> ZKGroupTopicDirs topicDirs = new 
> ZKGroupTopicDirs(metaData.getGroupId(),metaData.getTopic());
> ZkUtils.updatePersistentPath(zkClient, 
> topicDirs.consumerOffsetDir()+"/"+metaData.getPartitionNumber(),nextOffset+"");
> checkPointedOffset.put(key,nextOffset);
> }
> }
>
> -Original Message-
> From: Gwen Shapira [mailto:gshap...@cloudera.com]
> Sent: Tuesday, September 02, 2014 11:38 PM
> To: users@kafka.apache.org; Philip O'Toole
> Subject: Re: High Level Consumer and Commit
>
> I believe a simpler solution would be to create multiple ConsumerConnector, 
> each with 1 thread (single ConsumerStream) and use commitOffset API to commit 
> all partitions managed by each ConsumerConnector after the thread finished 
> processing the messages.
>
> Does that solve the problem, Bhavesh?
>
> Gwen
>
> On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole 
>  wrote:
>> Yeah, from reading that I suspect you need the SimpleConsumer. Try it out 
>> and see.
>>
>> Philip
>>
>>
>> -
>> http://www.philipotoole.com
>>
>>
>> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry 
>>  wrote:
>>
>>
>>
>> Hi Philip,
>>
>> Yes, We have disabled auto commit but, we need to be able to read from
>> particular offset if we manage the offset ourself in some storage(DB).
>> High Level consumer does not allow per partition management plug-ability.
>>
>> I like to have the High Level consumers Failover and auto rebalancing.
>> We just need plug ability of offset management.
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>>
>> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
>> philip.oto...@yahoo.com.invalid> wrote:
>>
>>> No, you'll need to write your own failover.
>>>
>>> I'm not sure I follow your second question, but the high-level
>>> Consumer should be able to do what you want if you disable
>>> auto-commit. I'm not sure what else you're asking.
>>>
>>>
>>> Philip
>>>
>>>
>>> -
>>> http://www.philipotoole.com
>>>
>>>
>>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
>>> mistry.p.bhav...@gmail.com> wrote:
>>>
>>>
>>>
>>> Hi Philip,
>>>
>>> Thanks for the update.  With Simple Consumer I will not get failover
>>> and rebalance that is provided out of box.  what is other option not
>>> to block reading and keep processing and commit only when batch is done.
>>>
>>> Thanks,
>>>
>>> Bhavesh
>>>
>>>
>>>
>>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
>>> philip.oto...@yahoo.com.invalid> wrote:
>>>
>>> > Either use the SimpleConsumer which gives you much finer-grained
>>> > control, or (this worked with 0.7) spin up a ConsumerConnection
>>> > (this is a
>>> HighLevel
>>> > consumer concept) per partition, turn off auto-commit.
>>> >
>>> > Philip
>>> >
>>> >
>>> > -
>>> > http://www.philipotoole.com
>>> >
>>> >
>>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry <
>>> > mistry.p.bhav...@gmail.com> wrote:
>>> >
>>> >
>>> >
>>> > Hi Kafka Group,
>>> >
>>> > I have to pull the data from the Topic and index into Elastic
>>> > Search with Bulk API and wanted to commit only batch that has been
>>> > committed and
>>> still
>>> > continue to read from topic further on same topic.  I have auto
>>> > commit to be off.
>>> >
>>> >
>>> > List  batch .
>>> >
>>> > while (iterator.hasNext()) {
>>> > batch.add(iterator.next().message());
>>> > if(batch size is 50 ){
>>> >   //===  Once the bulk API is successful it will commit the
>>> offset
>>> > to zookeeper...
>>> >   executor.submit(new Thread() process batch and commit batch,
>>> > cconsumerConnector)
>>> >   batch = new batch buffer
>>> >}
>>> > }
>>> >
>>> > This commitOffset API commits all messages that have been read so far.
>>> > What is best way to continue reading and only commit another thread
>>> finish
>>> > batch process is successful.  This will lead to fragmentation of
>>> > the Consumer offset so what is best way to implement continuous
>>> > reading
>>> stre

Re: message size limit

2014-09-03 Thread Alexis Midon
Hi Bhavesh

Can you explain what limit you're referring to?
I'm asking because `message.max.bytes` is applied per message, not per batch.
Is there another limit I should be aware of?

thanks


On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry 
wrote:

> Hi Jun,
>
> We have similar problem.  We have variable length of messages.  So when we
> have fixed size of Batch sometime the batch exceed the limit set on the
> brokers (2MB).
>
> So can Producer have some extra logic to determine the optimal batch size
> by looking at configured message.max.bytes  value.
>
> During the metadata update, Producer will get this value from the Broker
> for each topic and Producer will check if current batch size reach this
> limit than break batch into smaller chunk such way that It would not exceed
> limit (unless single message exceed the limit). Basically try to avoid data
> loss as much as possible.
>
> Please let me know what is your opinion on this...
>
> Thanks,
>
> Bhavesh
>
>
> On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> alexis.mi...@airbedandbreakfast.com> wrote:
>
> > Thanks Jun.
> >
> > I'll create a jira and try to provide a patch. I think this is pretty
> > serious.
> >
> > On Friday, August 29, 2014, Jun Rao  wrote:
> >
> > > The goal of batching is mostly to reduce the # RPC calls to the broker.
> > If
> > > compression is enabled, a larger batch typically implies better
> > compression
> > > ratio.
> > >
> > > The reason that we have to fail the whole batch is that the error code
> in
> > > the produce response is per partition, instead of per message.
> > >
> > > Retrying individual messages on MessageSizeTooLarge seems reasonable.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > alexis.mi...@airbedandbreakfast.com > wrote:
> > >
> > > > Could you explain the goals of batches? I was assuming this was
> simply
> > a
> > > > performance optimization, but this behavior makes me think I'm
> missing
> > > > something.
> > > > is a batch more than a list of *independent* messages?
> > > >
> > > > Why would you reject the whole batch? One invalid message causes the
> > loss
> > > > of batch.num.messages-1 messages :(
> > > > It seems pretty critical to me.
> > > >
> > > > If ack=0, the producer will never know about it.
> > > > If ack !=0, the producer will retry the whole batch. If the issue was
> > > > related to data corruption (etc), retries might work. But in the case
> > of
> > > > "big message", the batch will always be rejected and the producer
> will
> > > give
> > > > up.
> > > >
> > > > If the messages are indeed considered independent, I think this is a
> > > pretty
> > > > serious issue.
> > > >
> > > > I see 2 possible fix approaches:
> > > > - the broker could reject only the invalid messages
> > > > - the broker could reject the whole batch (like today) but the
> producer
> > > (if
> > > > ack!=0) could retry messages one at a time on exception like
> > > > "MessageSizeTooLarge".
> > > >
> > > > opinions?
> > > >
> > > > Alexis
> > > >
> > > > ```
> > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics
> test
> > > > with correlation ids in [43,62]
> > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > > > (kafka.producer.async.ProducerSendThread)
> > > > kafka.common.FailedToSendMessageException: Failed to send messages
> > after
> > > 3
> > > > tries.
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > ```
> > > >
> > > >
> > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao  > > > wrote:
> > > >
> > > > > That's right. If one message in a batch exceeds the size limit, the
> > > whole
> > >

RE: High Level Consumer and Commit

2014-09-03 Thread Seshadri, Balaji
We can still do this with a single ConsumerConnector with multiple threads.

Each thread updates its own data in ZooKeeper. The one below is our own 
implementation of commitOffset.

public void commitOffset(DESMetadata metaData) {
    log.debug("Update offsets only for ->" + metaData.toString());
    String key = metaData.getTopic() + "/" + metaData.getPartitionNumber();
    Long nextOffset = metaData.getOffSet() + 1;
    // skip the ZK write if this offset was already checkpointed
    // (use equals() here: != on boxed Longs compares references)
    if (!nextOffset.equals(checkPointedOffset.get(key))) {
        ZKGroupTopicDirs topicDirs =
                new ZKGroupTopicDirs(metaData.getGroupId(), metaData.getTopic());
        // same path the high-level consumer uses:
        // /consumers/<group>/offsets/<topic>/<partition>
        ZkUtils.updatePersistentPath(zkClient,
                topicDirs.consumerOffsetDir() + "/" + metaData.getPartitionNumber(),
                nextOffset + "");
        checkPointedOffset.put(key, nextOffset);
    }
}

-Original Message-
From: Gwen Shapira [mailto:gshap...@cloudera.com] 
Sent: Tuesday, September 02, 2014 11:38 PM
To: users@kafka.apache.org; Philip O'Toole
Subject: Re: High Level Consumer and Commit

I believe a simpler solution would be to create multiple ConsumerConnector, 
each with 1 thread (single ConsumerStream) and use commitOffset API to commit 
all partitions managed by each ConsumerConnector after the thread finished 
processing the messages.

Does that solve the problem, Bhavesh?

Gwen

On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole 
 wrote:
> Yeah, from reading that I suspect you need the SimpleConsumer. Try it out and 
> see.
>
> Philip
>
>
> -
> http://www.philipotoole.com
>
>
> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry 
>  wrote:
>
>
>
> Hi Philip,
>
> Yes, We have disabled auto commit but, we need to be able to read from 
> particular offset if we manage the offset ourself in some storage(DB).
> High Level consumer does not allow per partition management plug-ability.
>
> I like to have the High Level consumers Failover and auto rebalancing.  
> We just need plug ability of offset management.
>
> Thanks,
>
> Bhavesh
>
>
>
> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole < 
> philip.oto...@yahoo.com.invalid> wrote:
>
>> No, you'll need to write your own failover.
>>
>> I'm not sure I follow your second question, but the high-level 
>> Consumer should be able to do what you want if you disable 
>> auto-commit. I'm not sure what else you're asking.
>>
>>
>> Philip
>>
>>
>> -
>> http://www.philipotoole.com
>>
>>
>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry < 
>> mistry.p.bhav...@gmail.com> wrote:
>>
>>
>>
>> Hi Philip,
>>
>> Thanks for the update.  With Simple Consumer I will not get failover 
>> and rebalance that is provided out of box.  what is other option not 
>> to block reading and keep processing and commit only when batch is done.
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>>
>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole < 
>> philip.oto...@yahoo.com.invalid> wrote:
>>
>> > Either use the SimpleConsumer which gives you much finer-grained 
>> > control, or (this worked with 0.7) spin up a ConsumerConnection 
>> > (this is a
>> HighLevel
>> > consumer concept) per partition, turn off auto-commit.
>> >
>> > Philip
>> >
>> >
>> > -
>> > http://www.philipotoole.com
>> >
>> >
>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry < 
>> > mistry.p.bhav...@gmail.com> wrote:
>> >
>> >
>> >
>> > Hi Kafka Group,
>> >
>> > I have to pull the data from the Topic and index into Elastic 
>> > Search with Bulk API and wanted to commit only batch that has been 
>> > committed and
>> still
>> > continue to read from topic further on same topic.  I have auto 
>> > commit to be off.
>> >
>> >
>> > List  batch .
>> >
>> > while (iterator.hasNext()) {
>> > batch.add(iterator.next().message());
>> > if(batch size is 50 ){
>> >   //===  Once the bulk API is successful it will commit the
>> offset
>> > to zookeeper...
>> >   executor.submit(new Thread() process batch and commit batch,
>> > cconsumerConnector)
>> >   batch = new batch buffer
>> >}
>> > }
>> >
>> > This commitOffset API commits all messages that have been read so far.
>> > What is best way to continue reading and only commit another thread
>> finish
>> > batch process is successful.  This will lead to fragmentation of 
>> > the Consumer offset so what is best way to implement continuous 
>> > reading
>> stream
>> > and commit the rage offset.
>> >
>> > Is Simple Consumer a better approach for this.
>> >
>> >
>> > Thanks,
>> >
>> > Bhavesh
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > Thanks,
>> > Bhavesh
>> >
>>


Re: message size limit

2014-09-03 Thread Bhavesh Mistry
Hi Jun,

We have a similar problem.  We have variable-length messages, so when we
have a fixed batch size, the batch sometimes exceeds the limit set on the
brokers (2MB).

So could the producer have some extra logic to determine the optimal batch size
by looking at the configured message.max.bytes value?

During the metadata update, the producer would get this value from the broker
for each topic, and the producer would check whether the current batch size reaches this
limit; if so, it would break the batch into smaller chunks such that none exceeds the
limit (unless a single message exceeds the limit). Basically, try to avoid data
loss as much as possible.

Please let me know what your opinion is on this...

Thanks,

Bhavesh
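
A sketch of the kind of client-side splitting described above (this is not an
existing Kafka feature): given a byte budget learned from the broker, break a
pending batch into chunks that stay under it. The budget value and the message
sizes in main() are placeholders, and per-message protocol overhead is ignored
for simplicity.

import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    /** Split messages into sub-batches whose total payload size is <= maxBatchBytes. */
    public static List<List<byte[]>> split(List<byte[]> messages, int maxBatchBytes) {
        List<List<byte[]>> chunks = new ArrayList<List<byte[]>>();
        List<byte[]> current = new ArrayList<byte[]>();
        int currentBytes = 0;
        for (byte[] msg : messages) {
            // an over-sized single message can't be helped by splitting;
            // here it is emitted alone (the broker will still reject it)
            if (currentBytes + msg.length > maxBatchBytes && !current.isEmpty()) {
                chunks.add(current);
                current = new ArrayList<byte[]>();
                currentBytes = 0;
            }
            current.add(msg);
            currentBytes += msg.length;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<byte[]> msgs = new ArrayList<byte[]>();
        for (int i = 0; i < 10; i++) msgs.add(new byte[300 * 1024]);  // ~300KB each
        // with a 2MB budget, about 6 messages fit per chunk, so this prints "2 chunks"
        System.out.println(split(msgs, 2 * 1024 * 1024).size() + " chunks");
    }
}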


On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
alexis.mi...@airbedandbreakfast.com> wrote:

> Thanks Jun.
>
> I'll create a jira and try to provide a patch. I think this is pretty
> serious.
>
> On Friday, August 29, 2014, Jun Rao  wrote:
>
> > The goal of batching is mostly to reduce the # RPC calls to the broker.
> If
> > compression is enabled, a larger batch typically implies better
> compression
> > ratio.
> >
> > The reason that we have to fail the whole batch is that the error code in
> > the produce response is per partition, instead of per message.
> >
> > Retrying individual messages on MessageSizeTooLarge seems reasonable.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > alexis.mi...@airbedandbreakfast.com > wrote:
> >
> > > Could you explain the goals of batches? I was assuming this was simply
> a
> > > performance optimization, but this behavior makes me think I'm missing
> > > something.
> > > is a batch more than a list of *independent* messages?
> > >
> > > Why would you reject the whole batch? One invalid message causes the
> loss
> > > of batch.num.messages-1 messages :(
> > > It seems pretty critical to me.
> > >
> > > If ack=0, the producer will never know about it.
> > > If ack !=0, the producer will retry the whole batch. If the issue was
> > > related to data corruption (etc), retries might work. But in the case
> of
> > > "big message", the batch will always be rejected and the producer will
> > give
> > > up.
> > >
> > > If the messages are indeed considered independent, I think this is a
> > pretty
> > > serious issue.
> > >
> > > I see 2 possible fix approaches:
> > > - the broker could reject only the invalid messages
> > > - the broker could reject the whole batch (like today) but the producer
> > (if
> > > ack!=0) could retry messages one at a time on exception like
> > > "MessageSizeTooLarge".
> > >
> > > opinions?
> > >
> > > Alexis
> > >
> > > ```
> > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
> > > with correlation ids in [43,62]
> > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > > (kafka.producer.async.ProducerSendThread)
> > > kafka.common.FailedToSendMessageException: Failed to send messages
> after
> > 3
> > > tries.
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > >  at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > >  at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > ```
> > >
> > >
> > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao  > > wrote:
> > >
> > > > That's right. If one message in a batch exceeds the size limit, the
> > whole
> > > > batch is rejected.
> > > >
> > > > When determining message.max.bytes, the most important thing to
> > consider
> > > is
> > > > probably memory since currently we need to allocate memory for a full
> > > > message in the broker and the producer and the consumer client.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com > wrote:
> > > >
> > > > > am I miss reading this loop:
> > > > >
> > > > >
> > > >
> > >
> >
>

Error in acceptor (kafka.network.Acceptor)

2014-09-03 Thread Shlomi Hazan
Hi,

I am trying to load a cluster with more than 10K connections, and bumped
into the error in the subject.
Is there any limitation on Kafka's side? If so, is it configurable? How?
At first look, it seems like the selector accepting the connections is
overflowing...

Thanks.
-- 
Shlomi


Manual Leader Assignment

2014-09-03 Thread Andrew Otto
Hiya,

During leader changes, we see short periods of message loss on some of our 
higher volume producers.  I suspect that this is because it takes a couple of 
seconds for Zookeeper to notice and notify the producers of the metadata 
change.  During this time, producer buffers can fill up and end up dropping 
some messages.

I’d like to do some troubleshooting.  Is it possible to manually change the 
leadership of a single partition?  I see here[1] that I can start a leadership 
election for a particular partition, but the JSON doesn’t show a way to choose 
the new leader of the partition.

Thanks!
-Andrew Otto

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-Howtousethetool?.1
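
For reference, the election JSON that tool takes only names topic/partition
pairs, roughly like the following (topic and partition number are placeholders):

{"partitions": [{"topic": "mytopic", "partition": 0}]}

As far as I know, which broker wins that election is determined by the replica
ordering: the preferred leader is the first replica in the partition's
assignment. So manually choosing a leader for a single partition generally
means reordering its replica list with the partition reassignment tool and then
triggering a preferred-replica election, with a reassignment JSON along these
lines (broker ids are placeholders):

{"version": 1,
 "partitions": [{"topic": "mytopic", "partition": 0, "replicas": [3, 1]}]}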

Re: Subscription

2014-09-03 Thread Massimiliano Tomassi
Sorry I picked the wrong address


2014-09-03 18:45 GMT+01:00 François Langelier :

> You should follow those instructions
>
> http://kafka.apache.org/contact.html
>
>
> François Langelier
> Étudiant en génie Logiciel - École de Technologie Supérieure
> 
> Capitaine Club Capra 
> VP-Communication - CS Games  2014
> Jeux de Génie  2011 à 2014
> Magistrat Fraternité du Piranha 
> Comité Organisateur Olympiades ÉTS 2012
> Compétition Québécoise d'Ingénierie 2012 - Compétition Senior
>
>
> On Wed, Sep 3, 2014 at 11:05 AM, Massimiliano Tomassi <
> max.toma...@gmail.com> wrote:
>
>> Subscription
>>
>> --
>> 
>> Massimiliano Tomassi
>> 
>> web: http://about.me/maxtomassi
>> e-mail: max.toma...@gmail.com
>> mobile: +447751193667
>> 
>>
>
>


-- 

Massimiliano Tomassi

e-mail: max.toma...@gmail.com



Re: Subscription

2014-09-03 Thread François Langelier
You should follow those instructions

http://kafka.apache.org/contact.html


François Langelier
Software Engineering student - École de Technologie Supérieure

Captain, Club Capra 
VP-Communication - CS Games  2014
Jeux de Génie  2011 to 2014
Magistrate, Fraternité du Piranha 
Organizing Committee, Olympiades ÉTS 2012
Compétition Québécoise d'Ingénierie 2012 - Senior Competition


On Wed, Sep 3, 2014 at 11:05 AM, Massimiliano Tomassi  wrote:

> Subscription
>
> --
> 
> Massimiliano Tomassi
> 
> web: http://about.me/maxtomassi
> e-mail: max.toma...@gmail.com
> mobile: +447751193667
> 
>


Subscription

2014-09-03 Thread Massimiliano Tomassi
Subscription

-- 

Massimiliano Tomassi

web: http://about.me/maxtomassi
e-mail: max.toma...@gmail.com
mobile: +447751193667



Re: High Level Consumer and Commit

2014-09-03 Thread Gwen Shapira
Sorry, I guess I missed that. The followup discussion was around the
simple consumer :)

I'm not sure why the OP didn't find this solution acceptable.

On Wed, Sep 3, 2014 at 8:29 AM, Philip O'Toole  wrote:
> That's what I said in my first reply. :-)
>
> -
> http://www.philipotoole.com
>
>
> On Tuesday, September 2, 2014 10:37 PM, Gwen Shapira 
> wrote:
>
>
> I believe a simpler solution would be to create multiple
> ConsumerConnector, each with 1 thread (single ConsumerStream) and use
> commitOffset API to commit all partitions managed by each
> ConsumerConnector after the thread finished processing the messages.
>
> Does that solve the problem, Bhavesh?
>
> Gwen
>
> On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole
>  wrote:
>> Yeah, from reading that I suspect you need the SimpleConsumer. Try it out
>> and see.
>>
>> Philip
>>
>>
>> -
>> http://www.philipotoole.com
>>
>>
>> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry
>>  wrote:
>>
>>
>>
>> Hi Philip,
>>
>> Yes, We have disabled auto commit but, we need to be able to read from
>> particular offset if we manage the offset ourself in some storage(DB).
>> High Level consumer does not allow per partition management plug-ability.
>>
>> I like to have the High Level consumers Failover and auto rebalancing.  We
>> just need plug ability of offset management.
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>>
>> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
>> philip.oto...@yahoo.com.invalid> wrote:
>>
>>> No, you'll need to write your own failover.
>>>
>>> I'm not sure I follow your second question, but the high-level Consumer
>>> should be able to do what you want if you disable auto-commit. I'm not
>>> sure
>>> what else you're asking.
>>>
>>>
>>> Philip
>>>
>>>
>>> -
>>> http://www.philipotoole.com
>>>
>>>
>>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
>>> mistry.p.bhav...@gmail.com> wrote:
>>>
>>>
>>>
>>> Hi Philip,
>>>
>>> Thanks for the update.  With Simple Consumer I will not get failover and
>>> rebalance that is provided out of box.  what is other option not to block
>>> reading and keep processing and commit only when batch is done.
>>>
>>> Thanks,
>>>
>>> Bhavesh
>>>
>>>
>>>
>>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
>>> philip.oto...@yahoo.com.invalid> wrote:
>>>
>>> > Either use the SimpleConsumer which gives you much finer-grained
>>> > control,
>>> > or (this worked with 0.7) spin up a ConsumerConnection (this is a
>>> HighLevel
>>> > consumer concept) per partition, turn off auto-commit.
>>> >
>>> > Philip
>>> >
>>> >
>>> > -
>>> > http://www.philipotoole.com
>>> >
>>> >
>>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry <
>>> > mistry.p.bhav...@gmail.com> wrote:
>>> >
>>> >
>>> >
>>> > Hi Kafka Group,
>>> >
>>> > I have to pull the data from the Topic and index into Elastic Search
>>> > with
>>> > Bulk API and wanted to commit only batch that has been committed and
>>> still
>>> > continue to read from topic further on same topic.  I have auto commit
>>> > to
>>> > be off.
>>> >
>>> >
>>> > List  batch .
>>> >
>>> > while (iterator.hasNext()) {
>>> > batch.add(iterator.next().message());
>>> > if(batch size is 50 ){
>>> >  //===  Once the bulk API is successful it will commit the
>>> offset
>>> > to zookeeper...
>>> >  executor.submit(new Thread() process batch and commit batch,
>>> > cconsumerConnector)
>>> >  batch = new batch buffer
>>> >}
>>> > }
>>> >
>>> > This commitOffset API commits all messages that have been read so far.
>>> > What is best way to continue reading and only commit another thread
>>> finish
>>> > batch process is successful.  This will lead to fragmentation of the
>>> > Consumer offset so what is best way to implement continuous reading
>>> stream
>>> > and commit the rage offset.
>>> >
>>> > Is Simple Consumer a better approach for this.
>>> >
>>> >
>>> > Thanks,
>>> >
>>> > Bhavesh
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > Thanks,
>>> > Bhavesh
>>> >
>>>
>
>


Re: High Level Consumer and Commit

2014-09-03 Thread Philip O'Toole
That's what I said in my first reply. :-)
 

-
http://www.philipotoole.com 


On Tuesday, September 2, 2014 10:37 PM, Gwen Shapira  
wrote:
 


I believe a simpler solution would be to create multiple
ConsumerConnector, each with 1 thread (single ConsumerStream) and use
commitOffset API to commit all partitions managed by each
ConsumerConnector after the thread finished processing the messages.

Does that solve the problem, Bhavesh?

Gwen


On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole
 wrote:
> Yeah, from reading that I suspect you need the SimpleConsumer. Try it out and 
> see.
>
> Philip
>
>
> -
> http://www.philipotoole.com
>
>
> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry 
>  wrote:
>
>
>
> Hi Philip,
>
> Yes, we have disabled auto commit, but we need to be able to read from a
> particular offset if we manage the offsets ourselves in some storage (DB).
> The high-level consumer does not allow per-partition offset management to be
> plugged in.
>
> I would like to keep the high-level consumer's failover and auto-rebalancing.  We
> just need pluggability of offset management.
>
> Thanks,
>
> Bhavesh
>
>
>
> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
> philip.oto...@yahoo.com.invalid> wrote:
>
>> No, you'll need to write your own failover.
>>
>> I'm not sure I follow your second question, but the high-level Consumer
>> should be able to do what you want if you disable auto-commit. I'm not sure
>> what else you're asking.
>>
>>
>> Philip
>>
>>
>> -
>> http://www.philipotoole.com
>>
>>
>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
>> mistry.p.bhav...@gmail.com> wrote:
>>
>>
>>
>> Hi Philip,
>>
>> Thanks for the update.  With SimpleConsumer I will not get the failover and
>> rebalancing that are provided out of the box.  What other option is there that
>> does not block reading, keeps processing, and commits only when a batch is done?
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>>
>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
>> philip.oto...@yahoo.com.invalid> wrote:
>>
>> > Either use the SimpleConsumer which gives you much finer-grained control,
>> > or (this worked with 0.7) spin up a ConsumerConnection (this is a
>> HighLevel
>> > consumer concept) per partition, turn off auto-commit.
>> >
>> > Philip
>> >
>> >
>> > -
>> > http://www.philipotoole.com
>> >
>> >
>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry <
>> > mistry.p.bhav...@gmail.com> wrote:
>> >
>> >
>> >
>> > Hi Kafka Group,
>> >
>> > I have to pull data from the topic and index it into Elasticsearch with the
>> > Bulk API, and I want to commit offsets only for a batch that has been
>> > successfully indexed, while still continuing to read from the same topic.
>> > I have auto commit turned off.
>> >
>> >
>> > List  batch .
>> >
>> > while (iterator.hasNext()) {
>> > batch.add(iterator.next().message());
>> > if(batch size is 50 ){
>> >   //===  Once the bulk API is successful it will commit the
>> offset
>> > to zookeeper...
>> >   executor.submit(new Thread() process batch and commit batch,
>> > consumerConnector)
>> >   batch = new batch buffer
>> >}
>> > }
>> >
>> > This commitOffset API commits all messages that have been read so far.
>> > What is the best way to continue reading and only commit once another
>> > thread has processed a batch successfully?  This will lead to
>> > fragmentation of the consumer offsets, so what is the best way to
>> > implement continuous reading of the stream and commit the right offset
>> > ranges?
>> >
>> > Is SimpleConsumer a better approach for this?
>> >
>> >
>> > Thanks,
>> >
>> > Bhavesh
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > Thanks,
>> > Bhavesh
>> >
>>

Re: message size limit

2014-09-03 Thread Alexis Midon
Thanks Jun.

I'll create a JIRA and try to provide a patch. I think this is pretty
serious.
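
A rough sketch of that retry-one-at-a-time fallback on the producer side,
assuming the 0.8 sync producer (producer.type=sync) so the batch failure
actually surfaces as an exception; the class name, logging, and drop policy
are placeholders, not the eventual fix:

```
import java.util.List;
import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class BatchSender {
    private final Producer<String, byte[]> producer;

    public BatchSender(Producer<String, byte[]> producer) {
        this.producer = producer;
    }

    public void send(List<KeyedMessage<String, byte[]>> batch) {
        try {
            producer.send(batch);  // normal batched path
        } catch (FailedToSendMessageException wholeBatchRejected) {
            // The broker rejects the entire batch, so fall back to individual
            // sends: one oversized message no longer drags the others down.
            for (KeyedMessage<String, byte[]> message : batch) {
                try {
                    producer.send(message);
                } catch (FailedToSendMessageException stillFailing) {
                    // most likely MessageSizeTooLarge; log and drop just this one
                    System.err.println("dropping unsendable message for topic "
                            + message.topic());
                }
            }
        }
    }
}
```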

On Friday, August 29, 2014, Jun Rao  wrote:

> The goal of batching is mostly to reduce the # RPC calls to the broker. If
> compression is enabled, a larger batch typically implies better compression
> ratio.
>
> The reason that we have to fail the whole batch is that the error code in
> the produce response is per partition, instead of per message.
>
> Retrying individual messages on MessageSizeTooLarge seems reasonable.
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> alexis.mi...@airbedandbreakfast.com > wrote:
>
> > Could you explain the goals of batches? I was assuming this was simply a
> > performance optimization, but this behavior makes me think I'm missing
> > something.
> > is a batch more than a list of *independent* messages?
> >
> > Why would you reject the whole batch? One invalid message causes the loss
> > of batch.num.messages-1 messages :(
> > It seems pretty critical to me.
> >
> > If ack=0, the producer will never know about it.
> > If ack !=0, the producer will retry the whole batch. If the issue was
> > related to data corruption (etc), retries might work. But in the case of
> > "big message", the batch will always be rejected and the producer will
> give
> > up.
> >
> > If the messages are indeed considered independent, I think this is a
> pretty
> > serious issue.
> >
> > I see 2 possible fix approaches:
> > - the broker could reject only the invalid messages
> > - the broker could reject the whole batch (like today) but the producer
> (if
> > ack!=0) could retry messages one at a time on exception like
> > "MessageSizeTooLarge".
> >
> > opinions?
> >
> > Alexis
> >
> > ```
> > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
> > with correlation ids in [43,62]
> (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > (kafka.producer.async.ProducerSendThread)
> > kafka.common.FailedToSendMessageException: Failed to send messages after
> 3
> > tries.
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> >  at
> >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >  at
> >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > ```
> >
> >
> > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao  > wrote:
> >
> > > That's right. If one message in a batch exceeds the size limit, the
> whole
> > > batch is rejected.
> > >
> > > When determining message.max.bytes, the most important thing to
> consider
> > is
> > > probably memory since currently we need to allocate memory for a full
> > > message in the broker and the producer and the consumer client.
> > >
> > > Thanks,
> > >
> > > Jun
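
As a concrete illustration of the memory/size coupling Jun describes above,
the broker limit usually has to move together with the replica and consumer
fetch sizes; the 2 MB figure below is arbitrary:

```
# broker (server.properties) -- arbitrary 2 MB example
message.max.bytes=2097152
# followers must be able to fetch the largest accepted message
replica.fetch.max.bytes=2097152

# high-level consumer properties -- must also cover the largest message
fetch.message.max.bytes=2097152
```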
> > >
> > >
> > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > alexis.mi...@airbedandbreakfast.com > wrote:
> > >
> > > > am I miss reading this loop:
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > >
> > > > it seems like all messages from `validMessages` (which is
> > > > ByteBufferMessageSet) are NOT appended if one of the message size
> > exceeds
> > > > the limit.
> > > >
> > > > I hope I'm missing something.
> > > >
> > > >
> > > >
> > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > thanks for you answer.
> > > > > Unfortunately the size won't help much, I'd like to see the actual
> > > > message
> > > > > data.
> > > > >
> > > > > By the way what are the things to consider when deciding on
> > > > > `message.max.bytes` value?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao  > wrote:
> > > > >
> > > > >> The message size check is currently only done on the broker. If
> you
> > > > enable
> > > > >> trace level logging in RequestChannel, you will see the produce

Re: Replication bandwidth double what is expected

2014-09-03 Thread Theo Hultberg
That makes sense. If the size of each fetch is small then compression won't
do much, and that could very well explain the increase in bandwidth.

We will try to change these settings and see what happens.

Thanks a lot for your help.

T#
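
For reference, the knobs Guozhang suggests below are broker settings in
server.properties; the values here are only illustrative, not recommendations:

```
# server.properties -- illustrative values only
num.replica.fetchers=4
# let each replica fetch wait for ~1 MB (or for the timeout) instead of
# pulling many tiny packets across the wire
replica.fetch.min.bytes=1048576
replica.fetch.wait.max.ms=1000
```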


On Tue, Sep 2, 2014 at 10:44 PM, Guozhang Wang  wrote:

> Hi Theo,
>
> You can try to set replica.fetch.min.bytes to some large number (default to
> 1) and increase replica.fetch.wait.max.ms (default to 500) and see if that
> helps. In general, with 4 fetchers and min.bytes to 1 the replicas would
> effectively exchange many small packets over the wire.
>
> Guozhang
>
>
> On Mon, Sep 1, 2014 at 11:06 PM, Theo Hultberg  wrote:
>
> > Hi Guozhang,
> >
> > We're using the default on all of those, except num.replica.fetchers
> which
> > is set to 4.
> >
> > T#
> >
> >
> > On Mon, Sep 1, 2014 at 9:41 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Theo,
> > >
> > > What are the values for your "replica.fetch.max.bytes",
> > > "replica.fetch.min.bytes", "replica.fetch.wait.max.ms" and
> > > "num.replica.fetchers" configs?
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Sep 1, 2014 at 2:52 AM, Theo Hultberg 
> wrote:
> > >
> > > > Hi,
> > > >
> > > > We're evaluating Kafka, and have a problem with it using more
> bandwidth
> > > > than we can explain. From what we can tell the replication uses at
> > least
> > > > twice the bandwidth it should.
> > > >
> > > > We have four producer nodes and three broker nodes. We have enabled
> 3x
> > > > replication, so each node will get a copy of all data in this setup.
> > The
> > > > producers have Snappy compression enabled and send batches of 200
> > > messages.
> > > > The messages are around 1 KiB each. The cluster runs using mostly
> > default
> > > > configuration, and the Kafka version is 0.8.1.1.
> > > >
> > > > When we run iftop on the broker nodes we see that each Kafka node
> > > receives
> > > > around 6-7 Mbit from each producer node (or around 25-30 Mbit in
> > total),
> > > > but then sends around 50 Mbit to each other Kafka node (or 100 Mbit
> in
> > > > total). This is twice what we expected to see, and it seems to
> saturate
> > > the
> > > > bandwidth on our m1.xlarge machines. In other words, we expected the
> > > > incoming 25 Mbit to be amplified to 50 Mbit, not 100.
> > > >
> > > > One thing that could explain it, and that we don't really know how to
> > > > verify, is that the inter-node communication is not compressed. We
> > aren't
> > > > sure about what compression ratio we get on the incoming data, but
> 50%
> > > > sounds reasonable. Could this explain what we're seeing? Is there a
> > > > configuration property to enable compression on the replication
> traffic
> > > > that we've missed?
> > > >
> > > > yours
> > > > Theo
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Migrating data from old brokers to new brokers question

2014-09-03 Thread Alexis Midon
Hi Marcin,

A few weeks ago, I did an upgrade to 0.8.1.1 and then augmented the cluster
from 3 to 9 brokers. All went smoothly.
In a dev environment, we found that the biggest pain point is having to deal
with the JSON file and the error-prone command-line interface.
So to make our lives easier, my teammate Nelson [1] came up with kafkat:
https://github.com/airbnb/kafkat

We now install kafkat on every broker. Note that kafkat does NOT connect to
a broker, but to ZooKeeper, so you can actually use it from any machine.

For reassignment, please see:
`kafkat reassign [topic] [--brokers <ids>] [--replicas <n>]`
It will transparently generate and kick off a balanced assignment.
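
For the old-brokers-to-four-new-brokers scenario in this thread, a call might
look something like this (the broker ids and topic name are made up; check the
kafkat README for the exact flag syntax):

```
kafkat reassign my_topic --brokers 100,101,102,103 --replicas 3
```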

feedback and contributions welcome! Enjoy!

Alexis

[1] https://github.com/nelgau



On Tue, Aug 26, 2014 at 10:27 AM, Marcin Michalski 
wrote:

> I am running on 0.8.1.1 and I thought that the partition reassignment tools
> can do this job. Just was not sure if this is the best way to do this.
> I will try this out in stage env first and will perform the same in prod.
>
> Thanks,
> marcin
>
>
> On Mon, Aug 25, 2014 at 7:23 PM, Joe Stein  wrote:
>
> > Marcin, that is a typical task now.  What version of Kafka are you
> running?
> >
> > Take a look at
> > https://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
> > and
> >
> >
> https://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> >
> > Basically you can do a --generate to get existing JSON topology and with
> > that take the results of "Current partition replica assignment" (the
> first
> > JSON that outputs) and make whatever changes (like sed old node for new
> > node and add more replicas, which increases the replication factor,
> whatever
> > you want) and then --execute.
> >
> > With lots of data this takes time so you will want to run --verify to see
> > what is in progress... good thing do a node at a time (even topic at a
> > time) however you want to manage and wait for it as such.
> >
> > The "preferred" replica is simply the first one in the list of replicas.
> >  The kafka-preferred-replica-election.sh just makes that replica the
> leader
> > as this is not automatic yet.
> >
> > If you are running a version prior to 0.8.1.1 it might make sense to
> > upgrade the old nodes first then run reassign to the new servers.
> >
> >
> > /***
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop 
> > /
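
Spelled out with the stock tooling, Joe's --generate / --execute / --verify
workflow above looks roughly like this; the ZooKeeper address, file names, and
broker ids are placeholders:

```
# topics-to-move.json: {"version":1,"topics":[{"topic":"my_topic"}]}

# 1. ask the tool for a candidate assignment onto the new brokers
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "100,101,102,103" --generate

# 2. save the proposed JSON (edit replica lists if needed), then kick it off
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassignment.json --execute

# 3. re-run with --verify until every partition reports completed
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassignment.json --verify
```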
> >
> >
> > On Mon, Aug 25, 2014 at 8:59 PM, Marcin Michalski  >
> > wrote:
> >
> > > Hi, I would like to migrate my Kafka setup from old servers to new
> > servers.
> > > Let say I have 8 really old servers that have the kafka
> topics/partitions
> > > replicated 4 ways and want to migrate the data to 4 brand new servers
> and
> > > want the replication factor be 3. I wonder if anyone has ever performed
> > > this type of migration?
> > >
> > > Will auto rebalancing take care of this automatically if I do the
> > > following?
> > >
> > > Let say I bring down old broker id 1 down and startup new server broker
> > id
> > > 100 up, is there a way to migrate all of the data of the topic that had
> > the
> > > topic (where borker id 1 was the leader) over to the new broker 100?
> > >
> > > Or do I need to use *bin/kafka-preferred-replica-election.sh *to
> reassign
> > > the topics/partitions from old broker 1 to broker 100? And then just
> keep
> > > doing the same thing until all of the old brokers are decommissioned?
> > >
> > > Also, would kafka-preferred-replica-election.sh let me actually lower
> the
> > > number of replicas as well, if I just simply make sure that given
> > > topic/partition was only elected 3 times versus 4?
> > >
> > > Thanks for your insight,
> > > Marcin
> > >
> >
>