We can still do this with a single ConsumerConnector with multiple threads.

Each thread updates its own data in ZooKeeper. Below is our own
implementation of commitOffset:

public void commitOffset(DESMetadata metaData) {
    log.debug("Update offsets only for -> " + metaData.toString());
    String key = metaData.getTopic() + "/" + metaData.getPartitionNumber();
    Long nextOffset = metaData.getOffSet() + 1;
    // Compare with equals(): != on boxed Longs checks reference identity
    // and silently misfires for values outside the small-integer cache.
    if (!nextOffset.equals(checkPointedOffset.get(key))) {
        ZKGroupTopicDirs topicDirs =
                new ZKGroupTopicDirs(metaData.getGroupId(), metaData.getTopic());
        // Persist the next offset under the consumer group's offset path.
        ZkUtils.updatePersistentPath(zkClient,
                topicDirs.consumerOffsetDir() + "/" + metaData.getPartitionNumber(),
                nextOffset.toString());
        // Remember what we checkpointed so we can skip redundant writes.
        checkPointedOffset.put(key, nextOffset);
    }
}
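
For reference, the method above relies on two shared fields on the enclosing
class; a minimal sketch of what they might look like (the field names and
ZooKeeper address here are ours, not from any Kafka API):

// Shared ZkClient (org.I0Itec.zkclient.ZkClient), built with Kafka's own
// string serializer so paths and values match what the built-in high-level
// consumer writes.
private final ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000,
        ZKStringSerializer$.MODULE$);

// Last checkpointed offset per "topic/partition" key; lets commitOffset()
// skip redundant ZooKeeper writes. Threads share it, so use a concurrent map.
private final ConcurrentMap<String, Long> checkPointedOffset =
        new ConcurrentHashMap<String, Long>();

Since each thread only commits the partitions it owns, the keys never collide
across threads.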

-----Original Message-----
From: Gwen Shapira [mailto:gshap...@cloudera.com] 
Sent: Tuesday, September 02, 2014 11:38 PM
To: users@kafka.apache.org; Philip O'Toole
Subject: Re: High Level Consumer and Commit

I believe a simpler solution would be to create multiple ConsumerConnectors,
each with one thread (a single ConsumerStream), and use the commitOffsets API
to commit all partitions managed by each ConsumerConnector after the thread
finishes processing the messages.

Does that solve the problem, Bhavesh?

Gwen

On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole 
<philip.oto...@yahoo.com.invalid> wrote:
> Yeah, from reading that I suspect you need the SimpleConsumer. Try it out and 
> see.
>
> Philip
>
>
> -----------------------------------------
> http://www.philipotoole.com
>
>
> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry 
> <mistry.p.bhav...@gmail.com> wrote:
>
>
>
> Hi Philip,
>
> Yes, we have disabled auto-commit, but we need to be able to read from a 
> particular offset if we manage the offsets ourselves in some storage (DB).
> The high-level consumer does not allow pluggable per-partition offset 
> management.
>
> I'd like to have the high-level consumer's failover and auto-rebalancing.
> We just need pluggable offset management.
>
> Thanks,
>
> Bhavesh
>
>
>
> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole < 
> philip.oto...@yahoo.com.invalid> wrote:
>
>> No, you'll need to write your own failover.
>>
>> I'm not sure I follow your second question, but the high-level 
>> Consumer should be able to do what you want if you disable 
>> auto-commit. I'm not sure what else you're asking.
>>
>>
>> Philip
>>
>>
>> -----------------------------------------
>> http://www.philipotoole.com
>>
>>
>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry < 
>> mistry.p.bhav...@gmail.com> wrote:
>>
>>
>>
>> Hi Philip,
>>
>> Thanks for the update.  With the SimpleConsumer I will not get the 
>> failover and rebalancing that are provided out of the box.  What other 
>> option is there that does not block reading, keeps processing, and 
>> commits only when a batch is done?
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>>
>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole < 
>> philip.oto...@yahoo.com.invalid> wrote:
>>
>> > Either use the SimpleConsumer, which gives you much finer-grained 
>> > control, or (this worked with 0.7) spin up a ConsumerConnector (this 
>> > is a high-level consumer concept) per partition, with auto-commit 
>> > turned off.
>> >
>> > Philip
>> >
>> >
>> > -----------------------------------------
>> > http://www.philipotoole.com
>> >
>> >
>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry < 
>> > mistry.p.bhav...@gmail.com> wrote:
>> >
>> >
>> >
>> > Hi Kafka Group,
>> >
>> > I have to pull data from a topic and index it into Elasticsearch with 
>> > the Bulk API, and I want to commit only a batch that has been 
>> > successfully indexed, while continuing to read further from the same 
>> > topic.  I have auto-commit turned off.
>> >
>> >
>> > List<Message> batch = new ArrayList<Message>();
>> >
>> > while (iterator.hasNext()) {
>> >     batch.add(iterator.next().message());
>> >     if (batch.size() == 50) {
>> >         // ===>>>> Once the bulk API call succeeds, the worker commits
>> >         // the offsets to ZooKeeper...
>> >         executor.submit(new BatchWorker(batch, consumerConnector));
>> >         batch = new ArrayList<Message>();  // start a fresh batch buffer
>> >     }
>> > }
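>> >
>> > Here BatchWorker is a hypothetical Runnable standing in for "process
>> > batch and commit batch"; a minimal sketch:
>> >
>> > class BatchWorker implements Runnable {
>> >     private final List<Message> batch;
>> >     private final ConsumerConnector connector;
>> >     BatchWorker(List<Message> batch, ConsumerConnector connector) {
>> >         this.batch = batch;
>> >         this.connector = connector;
>> >     }
>> >     public void run() {
>> >         indexBatch(batch);          // the Elasticsearch bulk call
>> >         connector.commitOffsets();  // commits everything read so far
>> >     }
>> > }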
>> >
>> > The commitOffsets API commits all messages that have been read so far.
>> > What is the best way to continue reading and commit only once another 
>> > thread finishes processing a batch successfully?  This will lead to 
>> > fragmentation of the consumer offsets, so what is the best way to 
>> > implement a continuous reading stream and commit a range of offsets?
>> >
>> > Is the SimpleConsumer a better approach for this?
>> >
>> >
>> > Thanks,
>> >
>> > Bhavesh
>> >
>>
