Hi!

Sorry for not being clear enough.
The problem is on follower side, not on leader side.
It is the follower that allocates replica.fetch.max.bytes-sized buffers for
fetch responses somewhere in the Java client code.

Andrey.



> On 21 Jul 2016, at 16:28, Tom Crayford <tcrayf...@heroku.com> wrote:
> 
> Hi there,
> 
> From my understanding of the protocol (and from digging in the source code
> a bunch) I can't see anywhere where Kafka overallocates memory based on the
> fetch request's max bytes, but maybe I have missed something. If there is
> such a place, then I'd recommend fixing that issue instead - it seems more
> pressing and will alleviate your issue (unless I'm misunderstanding
> something and we *have* to overallocate somewhere).
> 
> I looked up and down the fetch request path on the leader, tracing from
> KafkaApis -> ReplicaManager -> Log -> LogSegment, then to FetchResponse and
> FetchResponseSend (in case you want some pointers to the code).
> 
> I may be missing something, but there seems to be a deeper issue here.
> 
> Tom Crayford
> Heroku Kafka
> 
> On Thu, Jul 21, 2016 at 10:49 AM, Andrey L. Neporada <
> anepor...@yandex-team.ru> wrote:
> 
>> Hi all!
>> 
>> We noticed that our Kafka cluster uses a lot of memory for replication.
>> Our Kafka usage pattern is as follows:
>> 
>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
>> (rare) messages can be several megabytes. So we have to set
>> replica.fetch.max.bytes = max.message.bytes = 8MB (example broker settings are
>> shown after this list).
>> 2. Each Kafka broker handles several thousand partitions from multiple
>> topics.
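>> 
>> For reference, the relevant broker settings look roughly like this (illustrative
>> values; message.max.bytes is the broker-level counterpart of the topic-level
>> max.message.bytes):
>> 
>>   # 8MB = 8388608 bytes
>>   message.max.bytes=8388608
>>   replica.fetch.max.bytes=8388608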
>> 
>> In this scenario, the total memory required for replication (i.e.
>> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
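>> 
>> For illustration (hypothetical numbers): with 8MB fetch buffers and 5000
>> partitions per broker, that is roughly 8MB * 5000 ≈ 40GB of buffer space just
>> for replication fetches.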
>> 
>> So we would like to propose the following approach to fix this problem:
>> 
>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
>> initial size of a replication data chunk. By default this parameter is equal to
>> replica.fetch.max.bytes, so the replication process works as before.
>> 
>> 2. If the ReplicaFetcherThread fails when trying to replicate a message bigger
>> than the current replication chunk, we double the chunk size (capped at
>> replica.fetch.max.bytes) and retry.
>> 
>> 3. If the chunk is replicated successfully, we decrease the replication chunk
>> size back to replica.fetch.base.bytes (a rough sketch of this retry/shrink loop
>> follows this list).
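>> 
>> The sketch below is a minimal, self-contained illustration of steps 2-3, not
>> the actual ReplicaFetcherThread code; the fetchOnce callback and the
>> MessageTooLarge error are made-up placeholders:
>> 
>>   // Sketch only: double the fetch size when a message does not fit, retry,
>>   // and let the caller start from replica.fetch.base.bytes again after success.
>>   object AdaptiveFetchSketch {
>>     final case class MessageTooLarge(neededBytes: Int) extends RuntimeException
>> 
>>     @annotation.tailrec
>>     def fetchWithGrowth(chunkBytes: Int, maxBytes: Int,
>>                         fetchOnce: Int => Either[MessageTooLarge, Seq[Array[Byte]]]
>>                        ): Seq[Array[Byte]] =
>>       fetchOnce(chunkBytes) match {
>>         case Right(messages) =>
>>           messages          // step 3: the next call starts from the base size again
>>         case Left(_) if chunkBytes < maxBytes =>
>>           fetchWithGrowth(math.min(chunkBytes * 2, maxBytes), maxBytes, fetchOnce)  // step 2
>>         case Left(tooLarge) =>
>>           throw tooLarge    // already at replica.fetch.max.bytes, give up
>>       }
>>   }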
>> 
>> 
>> By choosing replica.fetch.base.bytes appropriately (in our case ~200K), we
>> were able to significantly decrease memory usage without any noticeable impact
>> on replication efficiency.
>> 
>> Here is JIRA ticket (with PR):
>> https://issues.apache.org/jira/browse/KAFKA-3979
>> 
>> Your comments and feedback are highly appreciated!
>> 
>> 
>> Thanks,
>> Andrey.
