Kamal,

Say you have n threads in your executor thread pool. You can have
consumer.poll() return at most n records by setting "max.poll.records"
in the consumer config. Then you can maintain a circular bit buffer
indicating which record offsets have completed (this is similar to your
"ack" approach, I think), and when calling commitSync you can pass the
maximum offset below which all offsets have already been "acked".
This should work well even if you scale your thread pool size to a very
large number.
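Here is a minimal sketch of that tracking logic (the class and method
names are my own, not part of the Kafka API, and the actual commitSync
call is elided so the snippet stays self-contained). Worker threads
"ack" offsets as they finish; the polling thread asks for the highest
contiguous position and commits that:

```java
import java.util.concurrent.ConcurrentSkipListSet;

// Tracks "acked" offsets for one partition and yields the position up to
// which everything is contiguously complete, so the polling thread can
// safely call commitSync with that position.
class OffsetTracker {
    private final ConcurrentSkipListSet<Long> acked = new ConcurrentSkipListSet<>();
    private long committed; // next offset we expect to complete

    OffsetTracker(long startOffset) {
        this.committed = startOffset;
    }

    // Called by a worker thread once a record is fully processed.
    void ack(long offset) {
        acked.add(offset);
    }

    // Called from the polling thread: advance past every contiguously
    // acked offset and return the new commit position. Offsets acked
    // out of order stay in the set until the gap before them is filled.
    synchronized long commitPosition() {
        while (acked.remove(committed)) {
            committed++;
        }
        return committed; // e.g. commitSync(singletonMap(tp, new OffsetAndMetadata(committed)))
    }
}
```

So if offsets 0 and 2 are acked but 1 is not, commitPosition() returns 1;
once 1 is acked it jumps to 3. On restart you simply resume from the last
committed position and the un-acked records are re-delivered.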

Guozhang


On Mon, Mar 21, 2016 at 9:55 AM, Kamal C <kamaltar...@gmail.com> wrote:

> Hi All,
>
> I'm using Kafka 0.9.0.1.
>
> I have a requirement in which consumption of records is asynchronous.
>
>
> for (ConsumerRecord record : records) {
>     executor.submit(new Runnable() {
>         public void run() {
>             // process record
>         }
>     });
> }
> consumer.commitSync(); // Shouldn't commit here
>
> The completion of the *for()* loop doesn't mean that the records are
> processed. A record is processed only when its job has executed
> successfully.
>
> On application failure / restart, I have to resubmit the unprocessed
> records. I came up with an ack-based approach, but it's not scalable.
> How can I track the offsets of processed records?
>
>
> --Kamal
>



-- 
-- Guozhang
