gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589956639

##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -389,9 +389,17 @@ class BrokerServer(
       authorizer = config.createNewAuthorizer()
       authorizer.foreach(_.configure(config.originals))
 
-      val fetchManager = new FetchManager(Time.SYSTEM,
-        new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-          KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+      // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+      // for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
+      val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
+      val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
+        .map(shardNum => new FetchSessionCacheShard(
+          config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,

Review Comment:
   That's a great point and it's quite subtle. I reckon this may happen because the cacheShards are picked randomly and it can be avoided by picking shards in round-robin. I'll make this change along with addressing the other comments 👍
   
   Some subtle differences cannot be avoided, particularly around eviction. The [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability#KIP227:IntroduceIncrementalFetchRequeststoIncreasePartitionScalability-FetchSessionCaching) considers all existing sessions when considering a session for eviction while this change would consider only existing sessions **within** a shard for eviction. I'll update the documentation to call out the difference.
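
For illustration only, here is a minimal sketch of the sharding arithmetic from the diff comment together with the round-robin shard selection proposed above. It is not the code in the PR: `ShardSketch`, `ShardedCacheSketch`, `nextShardForNewSession` and `shardFor` are hypothetical names, and the real `FetchSessionCacheShard` also handles session storage and eviction, which are left out here.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for a cache shard: models only its session ID range
// and its share of the cache slots, not the sessions themselves.
final case class ShardSketch(shardNum: Int, sessionIdRange: Int, maxEntries: Int) {
  // Mirrors Math.max(1, shardNum * sessionIdRange) from the diff comment,
  // so session ID 0 is never owned by any shard.
  val minSessionId: Int = math.max(1, shardNum * sessionIdRange)
  val maxSessionIdExclusive: Int = (shardNum + 1) * sessionIdRange

  def owns(sessionId: Int): Boolean =
    sessionId >= minSessionId && sessionId < maxSessionIdExclusive
}

// Hypothetical wrapper that splits the slots evenly and hands out shards in turn.
final class ShardedCacheSketch(numShards: Int, totalSlots: Int) {
  require(numShards > 0, "numShards must be positive")

  val sessionIdRange: Int = Int.MaxValue / numShards

  // Integer division: any remainder of totalSlots / numShards is dropped.
  val shards: IndexedSeq[ShardSketch] =
    (0 until numShards).map(n => ShardSketch(n, sessionIdRange, totalSlots / numShards))

  private val counter = new AtomicLong(0)

  // Round-robin instead of random: consecutive new sessions land on
  // consecutive shards, so shards fill at the same rate.
  def nextShardForNewSession(): ShardSketch =
    shards((counter.getAndIncrement() % numShards).toInt)

  // An existing session is located from its ID alone. The clamp covers the few
  // IDs above numShards * sessionIdRange, which no shard's range includes.
  def shardFor(sessionId: Int): ShardSketch =
    shards(math.min(sessionId / sessionIdRange, numShards - 1))
}
```

With, say, 8 shards and 1000 slots, each shard in this sketch holds 125 entries. A full shard would then have to pick an eviction candidate from its own 125 sessions even while other shards have free slots, which is the difference from the KIP's cache-wide eviction noted above.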