Re: Low-latency, high message size variance

2015-12-17 Thread Jason Gustafson
Hi Jens,

Gut feeling is that it's not a trivial patch, but you're more than welcome
to take a shot. We should probably do a KIP also since it's a change to a
public API (even if we only just released it). That's also a good way to
get feedback and make sure we're not missing a better approach. Here's a
link to the KIP guide:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.
Let me know if I can help out.

-Jason

On Wed, Dec 16, 2015 at 2:07 PM, Jens Rantil  wrote:

> Hi again Jason,
>
> Once again thanks for your answer.
>
> Yes, the more I think/read about this, the more it sounds like the "max
> records" approach is the more viable one. Without knowing the code, I guess
> it would make more sense to create a "max.partition.fetch.messages" property.
> That way a consumer could set it once at startup instead of on every poll()
> call. And if a consumer really wanted to change the number of messages at
> runtime, it could simply close the consumer and restart it.
>
> I spent 45 minutes trying to set up a development environment to have a
> look at the Kafka code and maybe submit a pull request for this. Do you
> think this would be hard to implement? Would introducing this need a larger
> consensus/discussion in KAFKA-2986?
>
> Last, but not least, I'm happy to hear that this case is something that
> Kafka should handle. I've reviewed many queueing solutions and Kafka really
> seems like the absolute best solution to our problem as long as we can
> overcome this issue.
>
> Thanks,
> Jens
>
> On Tuesday, December 15, 2015, Jason Gustafson wrote:
>
> > I was talking with Jay this afternoon about this use case. The tricky
> thing
> > about adding a ping() or heartbeat() API is that you have to deal with
> the
> > potential for rebalancing. This means either allowing it to block while a
> > rebalance completes or having it raise an exception indicating that a
> > rebalance is needed. In code, the latter might look like this:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   try {
> > for (ConsumerRecord record : records) {
> >   process(record);
> >   consumer.heartbeat();
> > }
> >   } catch (RebalanceException e){
> > continue;
> >   }
> > }
> >
> > Unfortunately, this wouldn't work with auto-commit since it would tend to
> > break message processing early which would let the committed position get
> > ahead of the last offset processed. The alternative blocking approach
> > wouldn't be any better in this regard. Overall, it seems like this might
> > introduce a bigger problem than it solves.
> >
> > Perhaps the simpler solution is to provide a way to set the maximum
> number
> > of messages returned. This could either be a new configuration option or
> a
> > second argument in poll, but it would let you handle messages one-by-one
> if
> > you needed to. You'd then be able to set the session timeout according to
> > the expected time to handle a single message. It'd be a bit more work to
> > implement this, but if the use case is common enough, it might be
> > worthwhile.
> >
> > -Jason
> >
> > On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson 
> > wrote:
> >
> > > Hey Jens,
> > >
> > > The purpose of pause() is to stop fetches for a set of partitions. This
> > > lets you continue calling poll() to send heartbeats. Also note that
> > poll()
> > > generally only blocks for rebalances. In code, something like this is
> > what
> > > I was thinking:
> > >
> > > while (running) {
> > >   ConsumerRecords records = consumer.poll(1000);
> > >   if (queue.offer(records))
> > > continue;
> > >
> > >   TopicPartition[] assignment = toArray(consumer.assignment());
> > >   consumer.pause(assignment);
> > >   while (!queue.offer(records, heartbeatIntervalMs,
> > TimeUnit.MILLISECONDS))
> > > consumer.poll(0);
> > >   consumer.resume(assignment);
> > > }
> > >
> > > The tricky thing is handling rebalances since they might occur in
> either
> > > call to poll(). In a rebalance, you have to 1) drain the queue, 2)
> commit
> > > current offsets, and 3) maybe break from the inner poll loop. If the
> > > processing thread is busy when the rebalance is triggered, then you may
> > > have to discard the results when it's finished. It's also a little
> > > difficult communicating completion to the poll loop, which is where the
> > > offset commit needs to take place. I suppose another queue would work,
> > > sigh.
> > >
> > > Well, I think you can make that work, but I tend to agree that it's
> > pretty
> > > complicated. Perhaps instead of a queue, you should just submit the
> > > processor to an executor service for each record set returned and await
> > its
> > > completion directly. For example:
> > >
> > > while (running) {
> > >   ConsumerRecords records = consumer.poll(1000);
> > >   Future future = executor.submit(new 

Low-latency, high message size variance

2015-12-16 Thread Jens Rantil
Hi again Jason,

Once again thanks for your answer.

Yes, the more I think/read about this, the more it sounds like the "max
records" approach is the more viable one. Without knowing the code, I guess
it would make more sense to create a "max.partition.fetch.messages" property.
That way a consumer could set it once at startup instead of on every poll()
call. And if a consumer really wanted to change the number of messages at
runtime, it could simply close the consumer and restart it.
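
To make the idea concrete, here is a rough sketch of how such a property might
be used. Note that "max.partition.fetch.messages" is purely hypothetical (no
such setting exists in the 0.9.0.0 consumer), and the bootstrap/group/
deserializer values are just placeholders:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "task-workers");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
// Hypothetical property: cap how many records a single poll() may return
// per partition.
props.put("max.partition.fetch.messages", "1");
// With such a cap, the session timeout only needs to cover the processing
// time of a handful of records instead of a whole 10 MB fetch.
props.put("session.timeout.ms", "30000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);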

I spent 45 minutes trying to set up a development environment to have a
look at the Kafka code and maybe submit a pull request for this. Do you
think this would be hard to implement? Would introducing this need a larger
consensus/discussion in KAFKA-2986?

Last, but not least, I'm happy to hear that this case is something that
Kafka should handle. I've reviewed many queueing solutions and Kafka really
seems like the absolute best solution to our problem as long as we can
overcome this issue.

Thanks,
Jens

On Tuesday, December 15, 2015, Jason Gustafson wrote:

> I was talking with Jay this afternoon about this use case. The tricky thing
> about adding a ping() or heartbeat() API is that you have to deal with the
> potential for rebalancing. This means either allowing it to block while a
> rebalance completes or having it raise an exception indicating that a
> rebalance is needed. In code, the latter might look like this:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   try {
> for (ConsumerRecord record : records) {
>   process(record);
>   consumer.heartbeat();
> }
>   } catch (RebalanceException e){
> continue;
>   }
> }
>
> Unfortunately, this wouldn't work with auto-commit since it would tend to
> break message processing early which would let the committed position get
> ahead of the last offset processed. The alternative blocking approach
> wouldn't be any better in this regard. Overall, it seems like this might
> introduce a bigger problem than it solves.
>
> Perhaps the simpler solution is to provide a way to set the maximum number
> of messages returned. This could either be a new configuration option or a
> second argument in poll, but it would let you handle messages one-by-one if
> you needed to. You'd then be able to set the session timeout according to
> the expected time to handle a single message. It'd be a bit more work to
> implement this, but if the use case is common enough, it might be
> worthwhile.
>
> -Jason
>
> On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson 
> wrote:
>
> > Hey Jens,
> >
> > The purpose of pause() is to stop fetches for a set of partitions. This
> > lets you continue calling poll() to send heartbeats. Also note that
> poll()
> > generally only blocks for rebalances. In code, something like this is
> what
> > I was thinking:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   if (queue.offer(records))
> > continue;
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!queue.offer(records, heartbeatIntervalMs,
> TimeUnit.MILLISECONDS))
> > consumer.poll(0);
> >   consumer.resume(assignment);
> > }
> >
> > The tricky thing is handling rebalances since they might occur in either
> > call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
> > current offsets, and 3) maybe break from the inner poll loop. If the
> > processing thread is busy when the rebalance is triggered, then you may
> > have to discard the results when it's finished. It's also a little
> > difficult communicating completion to the poll loop, which is where the
> > offset commit needs to take place. I suppose another queue would work,
> > sigh.
> >
> > Well, I think you can make that work, but I tend to agree that it's
> pretty
> > complicated. Perhaps instead of a queue, you should just submit the
> > processor to an executor service for each record set returned and await
> its
> > completion directly. For example:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   Future future = executor.submit(new Processor(records));
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > consumer.poll(0);
> >   consumer.resume(assignment);
> >   consumer.commitSync();
> > }
> >
> > This seems closer to the spirit of the poll loop, and it makes handling
> > commits a lot easier. You still have to deal with the rebalance problem,
> > but at least you don't have to deal with the queue. It's still a little
> > complex though. Maybe the consumer needs a ping() API which does the same
> > thing as poll() but doesn't send or return any fetches. That would
> simplify
> > things a little more:
> >
> > while 

Re: Low-latency, high message size variance

2015-12-16 Thread Jens Rantil
Hi again Jason,

Sorry for a bit of a late response - I'm travelling and check my e-mail
sporadically.

I have a specific question regarding the pause solution quoted below:

On Tuesday, December 15, 2015, Jason Gustafson  wrote:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   if (queue.offer(records))
> continue;
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> consumer.poll(0);
>   consumer.resume(assignment);
> }
>

As far as I've understood, the `KafkaConsumer` has a background thread that
fetches records, right? If so, isn't there a race condition between the
`consumer.poll(1000);` call and `consumer.pause(assignment);` where the
fetcher might fetch, and commit, messages that I then collect on my first
`consumer.poll(0);` call? Since `consumer.poll(0);` then would return a
non-empty list, I would essentially be ignoring messages? Or is the pause()
call both 1) making sure consumer#poll never returns anything _and_ 2)
pausing the background fetcher?
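
One way to settle this empirically rather than by guessing (a quick test
sketch; it assumes an already-subscribed KafkaConsumer<String, String> named
consumer and a topic with pending messages, and it only observes the
behaviour instead of presuming an answer):

ConsumerRecords<String, String> first = consumer.poll(1000);
System.out.println("first poll returned " + first.count() + " records");
// assignment() is only populated once the group has rebalanced, i.e. after poll()
TopicPartition[] assignment = consumer.assignment().toArray(new TopicPartition[0]);
consumer.pause(assignment);
ConsumerRecords<String, String> afterPause = consumer.poll(0);
System.out.println("poll(0) right after pause() returned "
    + afterPause.count() + " records");
consumer.resume(assignment);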

Cheers,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se



Re: Low-latency, high message size variance

2015-12-16 Thread Ismael Juma
Jason, maybe useful to update KAFKA-2986 with this information if we ever
decide to do this?

Ismael
On 16 Dec 2015 04:42, "Jason Gustafson"  wrote:

> I was talking with Jay this afternoon about this use case. The tricky thing
> about adding a ping() or heartbeat() API is that you have to deal with the
> potential for rebalancing. This means either allowing it to block while a
> rebalance completes or having it raise an exception indicating that a
> rebalance is needed. In code, the latter might look like this:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   try {
> for (ConsumerRecord record : records) {
>   process(record);
>   consumer.heartbeat();
> }
>   } catch (RebalanceException e){
> continue;
>   }
> }
>
> Unfortunately, this wouldn't work with auto-commit since it would tend to
> break message processing early which would let the committed position get
> ahead of the last offset processed. The alternative blocking approach
> wouldn't be any better in this regard. Overall, it seems like this might
> introduce a bigger problem than it solves.
>
> Perhaps the simpler solution is to provide a way to set the maximum number
> of messages returned. This could either be a new configuration option or a
> second argument in poll, but it would let you handle messages one-by-one if
> you needed to. You'd then be able to set the session timeout according to
> the expected time to handle a single message. It'd be a bit more work to
> implement this, but if the use case is common enough, it might be
> worthwhile.
>
> -Jason
>
> On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson 
> wrote:
>
> > Hey Jens,
> >
> > The purpose of pause() is to stop fetches for a set of partitions. This
> > lets you continue calling poll() to send heartbeats. Also note that
> poll()
> > generally only blocks for rebalances. In code, something like this is
> what
> > I was thinking:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   if (queue.offer(records))
> > continue;
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!queue.offer(records, heartbeatIntervalMs,
> TimeUnit.MILLISECONDS))
> > consumer.poll(0);
> >   consumer.resume(assignment);
> > }
> >
> > The tricky thing is handling rebalances since they might occur in either
> > call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
> > current offsets, and 3) maybe break from the inner poll loop. If the
> > processing thread is busy when the rebalance is triggered, then you may
> > have to discard the results when it's finished. It's also a little
> > difficult communicating completion to the poll loop, which is where the
> > offset commit needs to take place. I suppose another queue would work,
> > sigh.
> >
> > Well, I think you can make that work, but I tend to agree that it's
> pretty
> > complicated. Perhaps instead of a queue, you should just submit the
> > processor to an executor service for each record set returned and await
> its
> > completion directly. For example:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   Future future = executor.submit(new Processor(records));
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > consumer.poll(0);
> >   consumer.resume(assignment);
> >   consumer.commitSync();
> > }
> >
> > This seems closer to the spirit of the poll loop, and it makes handling
> > commits a lot easier. You still have to deal with the rebalance problem,
> > but at least you don't have to deal with the queue. It's still a little
> > complex though. Maybe the consumer needs a ping() API which does the same
> > thing as poll() but doesn't send or return any fetches. That would
> simplify
> > things a little more:
> >
> > while (running) {
> >   ConsumerRecords records = consumer.poll(1000);
> >   Future future = executor.submit(new Processor(records));
> >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > consumer.ping();
> >   consumer.commitSync();
> > }
> >
> > Anyway, I'll think about it a little more and see if any other approaches
> > come to mind. I do agree that we should have a way to handle this case
> > without too much extra work.
> >
> >
> > -Jason
> >
> >
> > On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil 
> wrote:
> >
> >> Hi Jason,
> >>
> >> Thanks for your response. See replies inline:
> >>
> >> On Tuesday, December 15, 2015, Jason Gustafson 
> >> wrote:
> >>
> >> > Hey Jens,
> >> >
> >> > I'm not sure I understand why increasing the session timeout is not an
> >> > option. Is the issue that there's too much uncertainty about
> processing
> >> > time to set an upper bound 

Re: Low-latency, high message size variance

2015-12-15 Thread Jason Gustafson
Hey Jens,

The purpose of pause() is to stop fetches for a set of partitions. This
lets you continue calling poll() to send heartbeats. Also note that poll()
generally only blocks for rebalances. In code, something like this is what
I was thinking:

while (running) {
  ConsumerRecords records = consumer.poll(1000);
  if (queue.offer(records))
continue;

  TopicPartition[] assignment = toArray(consumer.assignment());
  consumer.pause(assignment);
  while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
consumer.poll(0);
  consumer.resume(assignment);
}

The tricky thing is handling rebalances since they might occur in either
call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
current offsets, and 3) maybe break from the inner poll loop. If the
processing thread is busy when the rebalance is triggered, then you may
have to discard the results when it's finished. It's also a little
difficult communicating completion to the poll loop, which is where the
offset commit needs to take place. I suppose another queue would work,
sigh.
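
For what it's worth, the natural place to hook those three steps in is a
ConsumerRebalanceListener passed to subscribe(). A rough sketch, where topics,
queue and lastProcessedOffsets are assumed to exist elsewhere and the exact
drain/commit policy is very much a judgment call:

consumer.subscribe(topics, new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 1) drain the queue so the processor stops picking up work for the old assignment
    queue.clear();
    // 2) commit the offsets the processing thread has actually finished
    //    (lastProcessedOffsets is a Map<TopicPartition, OffsetAndMetadata> it maintains)
    consumer.commitSync(lastProcessedOffsets);
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // nothing special needed; fetching continues with the new assignment
  }
});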

Well, I think you can make that work, but I tend to agree that it's pretty
complicated. Perhaps instead of a queue, you should just submit the
processor to an executor service for each record set returned and await its
completion directly. For example:

while (running) {
  ConsumerRecords records = consumer.poll(1000);
  Future future = executor.submit(new Processor(records));

  TopicPartition[] assignment = toArray(consumer.assignment());
  consumer.pause(assignment);
  while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
consumer.poll(0);
  consumer.resume(assignment);
  consumer.commitSync();
}
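
The complete(...) helper above isn't spelled out anywhere in this thread;
something along these lines would match the way it's called, i.e. wait on the
future for at most the given time and report whether it finished:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

static boolean complete(Future<?> future, long timeout, TimeUnit unit) {
  try {
    future.get(timeout, unit);   // block until the processor finishes or the timeout hits
    return true;
  } catch (TimeoutException e) {
    return false;                // not done yet: caller polls once more for heartbeats
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return true;                 // stop waiting; let the outer loop notice the interrupt
  } catch (ExecutionException e) {
    throw new RuntimeException("record processing failed", e.getCause());
  }
}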

This seems closer to the spirit of the poll loop, and it makes handling
commits a lot easier. You still have to deal with the rebalance problem,
but at least you don't have to deal with the queue. It's still a little
complex though. Maybe the consumer needs a ping() API which does the same
thing as poll() but doesn't send or return any fetches. That would simplify
things a little more:

while (running) {
  ConsumerRecords records = consumer.poll(1000);
  Future future = executor.submit(new Processor(records));
  while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
consumer.ping();
  consumer.commitSync();
}

Anyway, I'll think about it a little more and see if any other approaches
come to mind. I do agree that we should have a way to handle this case
without too much extra work.


-Jason


On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil  wrote:

> Hi Jason,
>
> Thanks for your response. See replies inline:
>
> On Tuesday, December 15, 2015, Jason Gustafson  wrote:
>
> > Hey Jens,
> >
> > I'm not sure I understand why increasing the session timeout is not an
> > option. Is the issue that there's too much uncertainty about processing
> > time to set an upper bound for each round of the poll loop?
> >
>
> Yes, that's the issue.
>
>
> > One general workaround would be to move the processing into another
> thread.
> > For example, you could add messages to a blocking queue and have the
> > processor poll them. We have a pause() API which can be used to prevent
> > additional fetches when the queue is full. This would allow you to
> continue
> > the poll loop instead of blocking in offer(). The tricky thing would be
> > handling rebalances since you'd need to clear the queue and get the last
> > offset from the processors. Enough people probably will end up doing
> > something like this that we should probably work through an example on
> how
> > to do it properly.
> >
>
> Hm, as far as I've understood the consumer will only send heartbeats to the
> broker when poll() is being called. If I would call pause() on a consumer
> (from a separate thread) I understand poll() will block undefinitely. Will
> the polling consumer still send heartbeats when blocked? Or would a pause
> for too long (while my records are being processed) eventually lead to
> session timeout? If the latter, that would sort of defeat the purpose since
> I am trying to avoid unnecessary rebalancing of consumers when there is
> high pressure on the consumers.
>
> Regarding handling of rebalancing for a queue solution you describe; It
> really sounds very complicated. It's probably doable, but doesn't this sort
> of defeat the purpose of the high level consumer API? I mean, it sounds
> like it should gracefully handle slow consumption of varying size. I might
> be wrong.
>
> Thanks,
> Jens
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
>


Re: Low-latency, high message size variance

2015-12-15 Thread Jason Gustafson
I was talking with Jay this afternoon about this use case. The tricky thing
about adding a ping() or heartbeat() API is that you have to deal with the
potential for rebalancing. This means either allowing it to block while a
rebalance completes or having it raise an exception indicating that a
rebalance is needed. In code, the latter might look like this:

while (running) {
  ConsumerRecords records = consumer.poll(1000);
  try {
for (ConsumerRecord record : records) {
  process(record);
  consumer.heartbeat();
}
  } catch (RebalanceException e){
continue;
  }
}

Unfortunately, this wouldn't work with auto-commit since it would tend to
break message processing early which would let the committed position get
ahead of the last offset processed. The alternative blocking approach
wouldn't be any better in this regard. Overall, it seems like this might
introduce a bigger problem than it solves.
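
For what it's worth, the committed-position-running-ahead part can be avoided
today by turning auto-commit off and committing only the offsets of records
that were actually processed. A sketch (enable.auto.commit is a real 0.9
consumer setting; props, running, consumer and process are assumed from the
surrounding examples), though it doesn't help with the session timeout itself:

// With auto-commit off, only the offsets recorded below ever get committed,
// so the committed position cannot run ahead of what this loop has processed.
props.put("enable.auto.commit", "false");

Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();
while (running) {
  ConsumerRecords<String, String> records = consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records) {
    process(record);
    // the offset to commit is the offset of the *next* record to read
    processed.put(new TopicPartition(record.topic(), record.partition()),
                  new OffsetAndMetadata(record.offset() + 1));
  }
  if (!processed.isEmpty()) {
    consumer.commitSync(processed);
    processed.clear();
  }
}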

Perhaps the simpler solution is to provide a way to set the maximum number
of messages returned. This could either be a new configuration option or a
second argument in poll, but it would let you handle messages one-by-one if
you needed to. You'd then be able to set the session timeout according to
the expected time to handle a single message. It'd be a bit more work to
implement this, but if the use case is common enough, it might be
worthwhile.
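
Until something like that exists, a client-side approximation is possible:
buffer whatever poll() returns, drain it a few records at a time, and keep the
partitions paused while there's a backlog so that the in-between poll() calls
serve only as heartbeats. A sketch, reusing the assumed toArray() and
process() helpers from the earlier snippets:

// With ~20 s per record and a 30 s session timeout, one record per iteration
// is about all that safely fits.
int maxRecordsPerIteration = 1;
Deque<ConsumerRecord<String, String>> buffer = new ArrayDeque<>();

while (running) {
  TopicPartition[] assignment = toArray(consumer.assignment());
  if (buffer.isEmpty())
    consumer.resume(assignment);   // backlog cleared: allow fetching again
  else
    consumer.pause(assignment);    // backlog left: poll() below only heartbeats

  for (ConsumerRecord<String, String> record : consumer.poll(buffer.isEmpty() ? 1000 : 0))
    buffer.add(record);

  for (int i = 0; i < maxRecordsPerIteration && !buffer.isEmpty(); i++)
    process(buffer.removeFirst());
}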

-Jason

On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson 
wrote:

> Hey Jens,
>
> The purpose of pause() is to stop fetches for a set of partitions. This
> lets you continue calling poll() to send heartbeats. Also note that poll()
> generally only blocks for rebalances. In code, something like this is what
> I was thinking:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   if (queue.offer(records))
> continue;
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> consumer.poll(0);
>   consumer.resume(assignment);
> }
>
> The tricky thing is handling rebalances since they might occur in either
> call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
> current offsets, and 3) maybe break from the inner poll loop. If the
> processing thread is busy when the rebalance is triggered, then you may
> have to discard the results when it's finished. It's also a little
> difficult communicating completion to the poll loop, which is where the
> offset commit needs to take place. I suppose another queue would work,
> sigh.
>
> Well, I think you can make that work, but I tend to agree that it's pretty
> complicated. Perhaps instead of a queue, you should just submit the
> processor to an executor service for each record set returned and await its
> completion directly. For example:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   Future future = executor.submit(new Processor(records));
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> consumer.poll(0);
>   consumer.resume(assignment);
>   consumer.commitSync();
> }
>
> This seems closer to the spirit of the poll loop, and it makes handling
> commits a lot easier. You still have to deal with the rebalance problem,
> but at least you don't have to deal with the queue. It's still a little
> complex though. Maybe the consumer needs a ping() API which does the same
> thing as poll() but doesn't send or return any fetches. That would simplify
> things a little more:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   Future future = executor.submit(new Processor(records));
>   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> consumer.ping();
>   consumer.commitSync();
> }
>
> Anyway, I'll think about it a little more and see if any other approaches
> come to mind. I do agree that we should have a way to handle this case
> without too much extra work.
>
>
> -Jason
>
>
> On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil  wrote:
>
>> Hi Jason,
>>
>> Thanks for your response. See replies inline:
>>
>> On Tuesday, December 15, 2015, Jason Gustafson 
>> wrote:
>>
>> > Hey Jens,
>> >
>> > I'm not sure I understand why increasing the session timeout is not an
>> > option. Is the issue that there's too much uncertainty about processing
>> > time to set an upper bound for each round of the poll loop?
>> >
>>
>> Yes, that's the issue.
>>
>>
>> > One general workaround would be to move the processing into another
>> thread.
>> > For example, you could add messages to a blocking queue and have the
>> > processor poll them. We have a pause() API which can be used to prevent
>> > additional fetches when the queue is full. This would allow you to
>> continue
>> > the poll loop 

Re: Low-latency, high message size variance

2015-12-15 Thread Jens Rantil
Hi Jason,

Thanks for your response. See replies inline:

On Tuesday, December 15, 2015, Jason Gustafson  wrote:

> Hey Jens,
>
> I'm not sure I understand why increasing the session timeout is not an
> option. Is the issue that there's too much uncertainty about processing
> time to set an upper bound for each round of the poll loop?
>

Yes, that's the issue.


> One general workaround would be to move the processing into another thread.
> For example, you could add messages to a blocking queue and have the
> processor poll them. We have a pause() API which can be used to prevent
> additional fetches when the queue is full. This would allow you to continue
> the poll loop instead of blocking in offer(). The tricky thing would be
> handling rebalances since you'd need to clear the queue and get the last
> offset from the processors. Enough people probably will end up doing
> something like this that we should probably work through an example on how
> to do it properly.
>

Hm, as far as I've understood the consumer will only send heartbeats to the
broker when poll() is being called. If I would call pause() on a consumer
(from a separate thread) I understand poll() will block indefinitely. Will
the polling consumer still send heartbeats when blocked? Or would a pause
for too long (while my records are being processed) eventually lead to
session timeout? If the latter, that would sort of defeat the purpose since
I am trying to avoid unnecessary rebalancing of consumers when there is
high pressure on the consumers.

Regarding handling of rebalancing for a queue solution you describe; It
really sounds very complicated. It's probably doable, but doesn't this sort
of defeat the purpose of the high level consumer API? I mean, it sounds
like it should gracefully handle slow consumption of varying size. I might
be wrong.

Thanks,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se



Re: Low-latency, high message size variance

2015-12-13 Thread Jens Rantil
Hi again,


For the record I filed an issue about this here: 
https://issues.apache.org/jira/browse/KAFKA-2986




Cheers,

Jens





–
Sent from Mailbox

On Fri, Dec 11, 2015 at 7:56 PM, Jens Rantil  wrote:

> Hi,
> We've been experimenting a little with running Kafka internally for better
> handling temporary throughput peaks of asynchronous tasks. However, we've
> had big issues making Kafka work for us and I am starting to question
> whether it's a good fit.
> Our usecase:
>- High latency. At peaks, each consumer requires ~20 seconds to handle a
>single message/task.
>- Extreme variation in message size: Serialized tasks are in the range
>of ~300 bytes up to ~3 MB.
>- Generally, it is processed in 20 seconds independent of message size.
>Most messages are small.
> Our setup:
>- Kafka 0.9.0.
>- Using the new Java consumer API (consumer groups etc.).
>- To occasionally handle large (3 MB) messages we've had to set the
>following configuration parameters:
>   - max.partition.fetch.bytes=10485760=10MB on consumer to handle
>   larger messages.
>   - session.timeout.ms=30s to handle our high latency processing.
>   - replica.fetch.max.bytes=10485760=10MB on broker.
>   - message.max.bytes=10485760=10MB on broker.
> Sample code:
> while (isRunning()) {
>   ConsumerRecords records = consumer.poll(100);
>   for (final ConsumerRecord record : records) {
>     // Handle record...
>   }
> }
> AFAIK this seems like very basic consumer code.
> Initial problem: When doing load testing to simulate peaks our consumer
> started spinning infinitely in similar fashion to [1]. We also noticed that
> we consistently were seeing [2] in our broker log.
> [1] http://bit.ly/1Q7zxgh
> [2] https://gist.github.com/JensRantil/2d1e7db3bd919eb35f9b
> Root cause analysis: AFAIK, health checks are only submitted to Kafka when
> calling `consumer.poll(...)`. To handle larger messages, we needed to
> increase max.partition.fetch.bytes. However, due to our high latency
> consumer, a large number of small messages could be prefetched, which made
> our inner for loop run long enough for the broker to consider our consumer
> dead.
> Two questions:
>- Is there any workaround to avoid the broker thinking our consumer is
>dead? Increasing session timeout to handle the polling interval for small
>messages is not an option since we simply prefetch too many messages. Can we
>set a limit on how many messages Kafka prefetches? Or is there a way to send
>health checks to the broker out of band without invoking the
>`KafkaConsumer#poll` method?
>- Is Kafka a bad tool for our usecase?
> Thanks and have a nice weekend,
> Jens
> -- 
> Jens Rantil
> Backend engineer
> Tink AB
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se

Low-latency, high message size variance

2015-12-11 Thread Jens Rantil
Hi,

We've been experimenting a little with running Kafka internally for better
handling temporary throughput peaks of asynchronous tasks. However, we've
had big issues making Kafka work for us and I am starting to question
whether it's a good fit.

Our usecase:

   - High latency. At peaks, each consumer requires ~20 seconds to handle a
   single message/task.
   - Extreme variation in message size: Serialized tasks are in the range
   of ~300 bytes up to ~3 MB.
   - Generally, it is processed in 20 seconds independent of message size.
   Most messages are small.

Our setup:

   - Kafka 0.9.0.
   - Using the new Java consumer API (consumer groups etc.).
   - To occasionally handle large (3 MB) messages we've had to set the
   following configuration parameters:
  - max.partition.fetch.bytes=10485760=10MB on consumer to handle
  larger messages.
  - session.timeout.ms=30s to handle our high latency processing.
  - replica.fetch.max.bytes=10485760=10MB on broker.
  - message.max.bytes=10485760=10MB on broker.
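
For concreteness, the consumer-side settings above would be wired up roughly
like this (a sketch only: the bootstrap/group/deserializer values are
placeholders, and the broker-side settings replica.fetch.max.bytes and
message.max.bytes live in the broker's server.properties rather than here):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder
props.put("group.id", "task-workers");              // placeholder
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("max.partition.fetch.bytes", "10485760"); // 10 MB, to fit the largest tasks
props.put("session.timeout.ms", "30000");           // 30 s, to cover ~20 s per message
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);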

Sample code:

while (isRunning()) {
  ConsumerRecords records = consumer.poll(100);
  for (final ConsumerRecord record : records) {
    // Handle record...
  }
}

AFAIK this seems like very basic consumer code.

Initial problem: When doing load testing to simulate peaks our consumer
started spinning infinitely in similar fashion to [1]. We also noticed that
we consistently were seeing [2] in our broker log.

[1] http://bit.ly/1Q7zxgh
[2] https://gist.github.com/JensRantil/2d1e7db3bd919eb35f9b

Root cause analysis: AFAIK, health checks are only submitted to Kafka when
calling `consumer.poll(...)`. To handle larger messages, we needed to
increase max.partition.fetch.bytes. However, due to our high latency
consumer, a large number of small messages could be prefetched, which made
our inner for loop run long enough for the broker to consider our consumer
dead.
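
(For a rough sense of scale, an illustrative back-of-the-envelope rather than
a measured figure: a 10 MB fetch of ~300-byte messages is on the order of
30,000 records from a single partition, and 30,000 records at ~20 seconds
each is roughly a week of processing between poll() calls, against a session
timeout of 30 seconds.)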

Two questions:

   - Is there any workaround to avoid the broker thinking our consumer is
   dead? Increasing session timeout to handle the polling interval for small
   messages is not an option since we simply prefetch too many messages. Can we
   set a limit on how many messages Kafka prefetches? Or is there a way to send
   health checks to the broker out of band without invoking the
   `KafkaConsumer#poll` method?
   - Is Kafka a bad tool for our usecase?

Thanks and have a nice weekend,
Jens

-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se
