Hi! Sorry for not being clear enough. The problem is on the follower side, not on the leader side. It is the follower that allocates replica.fetch.max.bytes-sized buffers for fetch responses, somewhere in the Java client code. (A rough sketch of the adaptive sizing we are proposing is included after the quoted thread below.)
Andrey.

> On 21 Jul 2016, at 16:28, Tom Crayford <tcrayf...@heroku.com> wrote:
>
> Hi there,
>
> From my understanding of the protocol (and from digging in the source code a bunch) I can't see anywhere where Kafka overallocates memory based on the fetch request's max bytes, but maybe I have missed something. If there is such a place, then I'd recommend fixing that issue instead - it seems more pressing and would alleviate your issue (unless I'm misunderstanding something and we *have* to overallocate somewhere).
>
> I looked up and down the fetch request path on the leader, tracing from KafkaApis -> ReplicaManager -> Log -> LogSegment, then to FetchResponse and FetchResponseSend (in case you want some pointers to the code).
>
> I may be missing something, but there seems to be a deeper issue here.
>
> Tom Crayford
> Heroku Kafka
>
> On Thu, Jul 21, 2016 at 10:49 AM, Andrey L. Neporada <anepor...@yandex-team.ru> wrote:
>
>> Hi all!
>>
>> We noticed that our Kafka cluster uses a lot of memory for replication. Our Kafka usage pattern is the following:
>>
>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some (rare) messages can be several megabytes. So we have to set replica.fetch.max.bytes = max.message.bytes = 8MB.
>> 2. Each Kafka broker handles several thousand partitions from multiple topics.
>>
>> In this scenario the total memory required for replication (i.e. replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>>
>> So we would like to propose the following approach to fix this problem:
>>
>> 1. Introduce a new config parameter replica.fetch.base.bytes, which is the initial size of a replication data chunk. By default this parameter is equal to replica.fetch.max.bytes, so replication works exactly as before.
>>
>> 2. If the ReplicaFetcherThread fails when trying to replicate a message bigger than the current replication chunk, we increase the chunk twofold (or to replica.fetch.max.bytes, whichever is smaller) and retry.
>>
>> 3. If the chunk is replicated successfully, we decrease the size of the replication chunk back to replica.fetch.base.bytes.
>>
>> By choosing replica.fetch.base.bytes optimally (in our case ~200K), we were able to significantly decrease memory usage without any noticeable impact on replication efficiency.
>>
>> Here is the JIRA ticket (with PR): https://issues.apache.org/jira/browse/KAFKA-3979
>>
>> Your comments and feedback are highly appreciated!
>>
>> Thanks,
>> Andrey.
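For reference, here is a rough sketch of the grow-on-failure / reset-on-success logic described in steps 2 and 3 of the quoted proposal. Class and method names are illustrative only; this is not the code from the KAFKA-3979 pull request.

// Illustrative sketch only -- not the actual KAFKA-3979 patch.
// Each fetch starts at replica.fetch.base.bytes; a "message too large"
// failure doubles the chunk (capped at replica.fetch.max.bytes) before
// retrying, and a successful fetch resets the chunk to the base size.
public class AdaptiveFetchSize {
    private final int baseBytes;  // replica.fetch.base.bytes
    private final int maxBytes;   // replica.fetch.max.bytes
    private int currentBytes;     // size of the next replication chunk

    public AdaptiveFetchSize(int baseBytes, int maxBytes) {
        this.baseBytes = baseBytes;
        this.maxBytes = maxBytes;
        this.currentBytes = baseBytes;
    }

    // Buffer size to allocate for the next fetch request.
    public int nextFetchSize() {
        return currentBytes;
    }

    // Step 2: the message did not fit into the current chunk -- grow and retry.
    public void onMessageTooLarge() {
        currentBytes = Math.min(currentBytes * 2, maxBytes);
    }

    // Step 3: the chunk was replicated successfully -- shrink back to base.
    public void onSuccess() {
        currentBytes = baseBytes;
    }
}

With the numbers from the original message (base ~200K, max 8MB), the steady-state per-partition buffer shrinks by roughly a factor of 40, and a single oversized message costs at most about six doubling retries before the chunk reaches 8MB.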