Re: [PR] KAFKA-9401 Reduce contention for Fetch requests [kafka]

2024-05-11 Thread via GitHub


gaurav-narula closed pull request #15925: KAFKA-9401 Reduce contention for 
Fetch requests
URL: https://github.com/apache/kafka/pull/15925


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9401 Reduce contention for Fetch requests [kafka]

2024-05-11 Thread via GitHub


gaurav-narula commented on PR #15925:
URL: https://github.com/apache/kafka/pull/15925#issuecomment-2105946195

   Thanks for the feedback! My bad, I wasn't aware of the specifics of the 
backporting policy. Closing this and updating the JIRA.





Re: [PR] KAFKA-9401 Reduce contention for Fetch requests [kafka]

2024-05-11 Thread via GitHub


ijuma commented on PR #15925:
URL: https://github.com/apache/kafka/pull/15925#issuecomment-2105944275

   We don't usually cherry pick changes like this to older branches. It's not a 
regression and it hasn't been released yet (hence the risk is higher).





Re: [PR] KAFKA-9401 Reduce contention for Fetch requests [kafka]

2024-05-11 Thread via GitHub


gaurav-narula commented on PR #15925:
URL: https://github.com/apache/kafka/pull/15925#issuecomment-2105943872

   CC: @chia7712 @soarez 





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-11 Thread via GitHub


chia7712 commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2105931546

   @gaurav-narula Please file a PR for branch 3.7 if you feel this one needs to 
be backported :)





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-11 Thread via GitHub


chia7712 merged PR #15836:
URL: https://github.com/apache/kafka/pull/15836





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-11 Thread via GitHub


gaurav-narula commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2105736540

   Thanks for the review! I'm fairly convinced these failures are unrelated. 
The report on Gradle Enterprise suggests the failed tests are flaky. Please 
refer to the following links for the flakiness report.
   
   ```
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.trogdor.coordinator.CoordinatorTest=testTaskRequestWithOldStartMsGetsUpdated()
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.ConsumerBounceTest=testSeekAndCommitWithBrokerFailures()
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest=testProduceConsumeViaAssign(String)[1]
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest=testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.SaslSslConsumerTest=testCoordinatorFailover(String, String)[1]
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.SaslSslConsumerTest=testCoordinatorFailover(String, String)[2]
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.SaslSslConsumerTest=testCoordinatorFailover(String, String)[3]
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.SaslSslConsumerTest=testCoordinatorFailover(String, String)[4]
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.server.DelegationTokenRequestsTest=testDelegationTokenRequests(String)[1]
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.tools.GetOffsetShellTest=testTopicPartitionsArgWithInternalIncluded()[1]
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.tools.GetOffsetShellTest=testTopicPartitionsArg()[1]
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.tools.TopicCommandIntegrationTest=testDescribeWithDescribeTopicPartitionsApi(String)[2]
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest=testSyncTopicConfigs()
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.PlaintextConsumerAssignorsTest=testRemoteAssignorRange(String%2C%20String)%5B1%5D
   https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.tools.TopicCommandIntegrationTest=testDescribeWithDescribeTopicPartitionsApi(String)[2]
   ```





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-08 Thread via GitHub


chia7712 commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2101862846

   @gaurav-narula Could you take a look at those failed tests? I feel they are 
unrelated to this PR, so +1





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-07 Thread via GitHub


soarez commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2098918354

   `:core:test` timed out only on JDK 8 and Scala 2.12. Restarted the build





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-06 Thread via GitHub


chia7712 commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2097148532

   @gaurav-narula Could you please rebase the code to trigger QA again? It seems 
we have thread leaks in some tests :(





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-06 Thread via GitHub


gaurav-narula commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2096392283

   Resolved conflict with `trunk`





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-05 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1590274476


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int,
 }
   }
 }
+object FetchSessionCache {
+  private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache])
+  private val counter = new AtomicLong(0)
+}
+
+class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) {
+  // Set up metrics.
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum)
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum)
+
+  def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
+    val shard = sessionId / cacheShards.head.sessionIdRange
+    cacheShards(shard)
+  }
+
+  // Returns the shard in round-robin
+  def getNextCacheShard: FetchSessionCacheShard = {
+    val shardNum = (FetchSessionCache.counter.getAndIncrement() % size).toInt

Review Comment:
   I used `AtomicLong` to practically rule out an overflow but found 
`Utils.toPositive`, which is used by `RoundRobinPartitioner` :) Updated to use 
an `AtomicInteger` and also added some tests to ensure round-robin allocation.
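
   The `Utils.toPositive` trick mentioned above can be sketched as follows. 
This is a minimal illustration, not the PR's actual code: the selector class 
and its names are hypothetical; only the masking in `toPositive` mirrors the 
helper in Kafka's `Utils`.

   ```java
   import java.util.List;
   import java.util.concurrent.atomic.AtomicInteger;

   // Illustrative round-robin shard selector; names are hypothetical.
   class RoundRobinSelector<T> {
       private final AtomicInteger counter = new AtomicInteger(0);
       private final List<T> shards;

       RoundRobinSelector(List<T> shards) {
           this.shards = shards;
       }

       // Same trick as Kafka's Utils.toPositive: mask off the sign bit so the
       // value stays non-negative even after the counter overflows. Math.abs
       // would not work here, since Math.abs(Integer.MIN_VALUE) is negative.
       static int toPositive(int n) {
           return n & 0x7fffffff;
       }

       // Cycles through the shards in order, wrapping around safely.
       T next() {
           return shards.get(toPositive(counter.getAndIncrement()) % shards.size());
       }
   }
   ```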






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-04 Thread via GitHub


chia7712 commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1590078150


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -690,6 +702,10 @@ class FetchSessionCache(private val maxEntries: Int,
 * 2. B is considered "stale" because it has been inactive for a long time, 
or
 * 3. A contains more partitions than B, and B is not recently created.
 *
+* Prior to KAFKA-9401, the session cache was not sharded and we looked at 
all

Review Comment:
   These docs are great. Could you please update this as well?
   
   
https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/KafkaConfig.scala#L182



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int,
 }
   }
 }
+object FetchSessionCache {
+  private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache])
+  private val counter = new AtomicLong(0)
+}
+
+class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) {
+  // Set up metrics.
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum)
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum)
+
+  def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
+    val shard = sessionId / cacheShards.head.sessionIdRange

Review Comment:
   It assumes `cacheShards` is sorted by `shardNum`, right? If so, could you 
please add a comment for it?
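
   For illustration, the ordering assumption matters because the lookup is pure 
index arithmetic. A minimal sketch, with an assumed shard count of 8 (the 
constant discussed elsewhere in this thread); the class and method names are 
illustrative, not Kafka's actual code:

   ```java
   // Illustrative sessionId-to-shard arithmetic; constants are assumptions.
   public class ShardMapping {
       static final int NUM_SHARDS = 8;
       static final int SESSION_ID_RANGE = Integer.MAX_VALUE / NUM_SHARDS;

       // Shard i owns sessionIds in [max(1, i * SESSION_ID_RANGE), (i + 1) * SESSION_ID_RANGE).
       // The division only finds the right shard if shard i is stored at index i,
       // which is why the ordering assumption deserves a comment.
       static int shardIndex(int sessionId) {
           return sessionId / SESSION_ID_RANGE;
       }
   }
   ```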



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int,
 }
   }
 }
+object FetchSessionCache {
+  private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache])
+  private val counter = new AtomicLong(0)
+}
+
+class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) {
+  // Set up metrics.
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum)
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum)
+
+  def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
+    val shard = sessionId / cacheShards.head.sessionIdRange
+    cacheShards(shard)
+  }
+
+  // Returns the shard in round-robin
+  def getNextCacheShard: FetchSessionCacheShard = {
+    val shardNum = (FetchSessionCache.counter.getAndIncrement() % size).toInt

Review Comment:
   As an `int` is enough for this case, maybe we can use `AtomicInteger`?






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-04 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589956639


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
+  val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
+  val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
+    .map(shardNum => new FetchSessionCacheShard(
+      config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,

Review Comment:
   That's a great point and it's quite subtle. I reckon this may happen because 
the cacheShards are picked randomly, and it can be avoided by picking shards in 
round-robin. I'll make this change along with addressing the other comments.
   
   Some subtle differences cannot be avoided, particularly around eviction. The 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability#KIP227:IntroduceIncrementalFetchRequeststoIncreasePartitionScalability-FetchSessionCaching)
 considers all existing sessions as eviction candidates, while this change 
would consider only the existing sessions **within** a shard for eviction. I'll 
update the documentation to call out the difference.






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589821796


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -430,7 +438,10 @@ class FullFetchContext(private val time: Time,
   }
   cachedPartitions
 }
-val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
+// We select a shard randomly out of the available options
+val shard = ThreadLocalRandom.current().nextInt(cache.size())

Review Comment:
   It seems both `size` and `apply` are used to randomly pick a 
`FetchSessionCacheShard`. Maybe we can add a method `getRandomCacheShard` to 
`FetchSessionCache`.
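
   A minimal sketch of such a helper, assuming a hypothetical wrapper type 
(this is not the actual `FetchSessionCache` API):

   ```java
   import java.util.List;
   import java.util.concurrent.ThreadLocalRandom;

   // Hypothetical wrapper illustrating the suggested getRandomCacheShard helper.
   class ShardedCache<T> {
       private final List<T> shards;

       ShardedCache(List<T> shards) {
           this.shards = shards;
       }

       // Hides the size()/apply() pair behind a single method, so callers
       // cannot accidentally index out of range.
       T getRandomCacheShard() {
           return shards.get(ThreadLocalRandom.current().nextInt(shards.size()));
       }
   }
   ```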



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -583,10 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
-  */
-class FetchSessionCache(private val maxEntries: Int,
-                        private val evictionMs: Long) extends Logging {
-  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  * @param sessionIdRange The number of sessionIds each cache shard handles. For a given instance, Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange always holds.
+  * @param shardNum Identifier for this shard.
+ */
+class FetchSessionCacheShard(private val maxEntries: Int,
+                             private val evictionMs: Long,
+                             val sessionIdRange: Int = Int.MaxValue,
+                             private val shardNum: Int = 0) extends Logging {

Review Comment:
   Should `FetchSessionCacheShard` define the `logIdent` to help users 
distinguish the source of log messages? For example, `LogCleaner` uses the 
thread id as the `logIdent`
   
https://github.com/apache/kafka/blob/1fd39150aa39f0b3d7a38d6e2ce4e295cf8a3842/core/src/main/scala/kafka/log/LogCleaner.scala#L559



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -430,7 +438,10 @@ class FullFetchContext(private val time: Time,
   }
   cachedPartitions
 }
-val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
+// We select a shard randomly out of the available options
+val shard = ThreadLocalRandom.current().nextInt(cache.size())
+val cacheShard = cache(shard);

Review Comment:
   please remove `;` 



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
+  val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
+  val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
+    .map(shardNum => new FetchSessionCacheShard(
+      config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,

Review Comment:
   The behavior gets changed a bit here. 
`max.incremental.fetch.session.cache.slots` is used to control the "total" 
number of cached sessions. With this change, however, a shard could run out of 
room even though the "total" size has not reached the threshold defined by 
`max.incremental.fetch.session.cache.slots`.
   
   This is not a big issue, and we can update the docs of 
`max.incremental.fetch.session.cache.slots` to describe that behavior change.
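
   To make the behavior change concrete, a small worked example with 
illustrative numbers (the slot count and shard count here are assumptions, not 
defaults): splitting the global budget across shards means a single shard can 
hit its local cap while the broker-wide total is well below the configured 
threshold.

   ```java
   // Illustrative arithmetic only; 1000 stands in for
   // max.incremental.fetch.session.cache.slots and 8 for the shard count.
   public class CapacityExample {
       // Each shard receives an equal slice of the global budget.
       static int perShardSlots(int totalSlots, int numShards) {
           return totalSlots / numShards;
       }

       public static void main(String[] args) {
           int perShard = perShardSlots(1000, 8); // 1000 / 8 = 125 slots per shard
           // A hot shard holding 125 sessions starts evicting/rejecting even if
           // the other seven shards are empty, i.e. at a total of 125, not 1000.
           System.out.println(perShard);
       }
   }
   ```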






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-03 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589418655


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
-  */
+  * @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange).
+  * @param shardNum Identifier for this shard.
+ */
 class FetchSessionCache(private val maxEntries: Int,

Review Comment:
   Thanks for the suggestion! I've renamed the existing type to 
`FetchSessionCacheShard` and `FetchSessionCache` is now essentially a wrapper 
around `Seq[FetchSessionCacheShard]`. This conveys the intention clearly indeed.






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-03 Thread via GitHub


AndrewJSchofield commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589348685


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -395,19 +396,27 @@ object FullFetchContext {
   * The fetch context for a full fetch request.
   *
   * @param time   The clock to use.
-  * @param cache  The fetch session cache.
+  * @param caches The fetch session cache shards.
   * @param reqMetadataThe request metadata.
   * @param fetchData  The partition data from the fetch request.
   * @param usesTopicIds   True if this session should use topic IDs.
   * @param isFromFollower True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
-   private val cache: FetchSessionCache,
+   private val caches: Seq[FetchSessionCache],

Review Comment:
   `cacheShards`?



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
-  */
+  * @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange).

Review Comment:
   I know the `[ , )` notation means what you intend for inclusive/exclusive 
bounds, but the presence of the parentheses makes it a bit hard to read I 
think. Maybe using >= and < would be clearer.



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
-  */
+  * @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange).
+  * @param shardNum Identifier for this shard.
+ */
 class FetchSessionCache(private val maxEntries: Int,

Review Comment:
   Maybe the class name ought to be `FetchSessionCacheShard` too. The cache 
really is the whole set of shards.



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   I would tend not to add a configuration for this. The value you're talking 
about sounds like it's doing the job on a busy workload, and it's small enough 
that there's negligible benefit of configuring it smaller for a tiny cluster. 
Having a configuration kind of crystallizes this aspect of the internal design 
of Kafka, and you might have an even better idea in the future that would make 
this configuration pointless.



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -789,7 +807,15 @@ class FetchSessionCache(private val maxEntries: Int,
 }
 
 class FetchManager(private val time: Time,
-   private val cache: FetchSessionCache) extends Logging {
+   private val caches: Seq[FetchSessionCache]) extends Logging {
+
+  def this(time: Time, cache: FetchSessionCache) = this(time, Seq(cache))
+
+  def getShardedCache(sessionId: Int): FetchSessionCache = {

Review Comment:
   `getCacheShard`?



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int,
   // A map containing sessions which can be evicted by privileged sessions.
  private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession]

+  private val metricTag = Map("shard" -> s"$shardNum").asJava
+
   // Set up metrics.
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size)
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions)
-  

Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-03 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589339685


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -541,9 +541,17 @@ class KafkaServer(
 }.toMap
 }
 
-val fetchManager = new FetchManager(Time.SYSTEM,
-  new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+// for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+val sessionIdRange = Int.MaxValue / config.numIoThreads
+val fetchSessionCaches = Range(0, config.numIoThreads)

Review Comment:
   I've used `until` because I needed an exclusive Range






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-03 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589305094


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   > Out of curiosity are you planning later to make this constant a 
configuration?
   
   I think it's unlikely one would need to tweak this often. My experiment ran 
on a fairly busy cluster with ~50k Fetch rps 
(`kafka.network-RequestMetrics-RequestsPerSec(request=Fetch)`) on the broker 
which is reasonably high.






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


OmniaGM commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586403158


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   +1 on keeping the number of threads out of this. 
   
   > How about we introduce a constant for now and set it to 8?
   
   Out of curiosity, are you planning later to make this constant a 
configuration? 






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


soarez commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586395026


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   Since the number of shards need not depend on the thread pool size, I agree it may be less confusing to keep `config.numIoThreads` out of this.






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586365674


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   Using `config.numIoThreads` is a heuristic, and I reckon it's okay if the shard count isn't modified dynamically along with numIoThreads.
   
   In fact, I realised my "After" benchmark was with 8 shards and not 64, as I had a misconfiguration while running it, which goes to show even some sharding goes a long way. How about we introduce a constant for now and set it to 8?
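The shard arithmetic under discussion can be sketched as follows. This is a minimal, hypothetical model (the constant name `NumShards` and helper `shardFor` are illustrative, not the PR's code), assuming the constant-of-8 proposal above:

```scala
// Hypothetical sketch of the sharded-cache lookup discussed above.
// The positive Int space is partitioned so that shard k owns
// sessionIds in [k * sessionIdRange, (k + 1) * sessionIdRange).
val NumShards = 8
val sessionIdRange = Int.MaxValue / NumShards

// Map a sessionId to the shard responsible for it.
def shardFor(sessionId: Int): Int = sessionId / sessionIdRange

assert(shardFor(0) == 0)
assert(shardFor(sessionIdRange - 1) == 0)   // last id owned by shard 0
assert(shardFor(sessionIdRange) == 1)       // first id owned by shard 1
assert(shardFor(7 * sessionIdRange) == 7)   // first id owned by the last shard
```

Because the lookup is a single integer division, routing a sessionId to its shard adds no measurable cost on the fetch path.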






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586362012


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int,
   // A map containing sessions which can be evicted by privileged sessions.
   private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession]
 
+  private val metricTag = Map("shard" -> s"$shardNum").asJava
+
   // Set up metrics.
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size)
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions)
-  metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
+  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag)
+  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag)

Review Comment:
   That's a good point. I added the labels because of the `removeMetric` calls. On further investigation, it seems that `metricsGroup.removeMetric` isn't really needed and we can have a combined metric for all the shards. The lambda that calculates the value for the gauges needs to synchronize on the cache, but that's a small overhead overall, as it only happens when metrics are gathered.
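A combined, untagged gauge along the lines proposed above might look roughly like this. This is an illustrative sketch only: `FetchSessionCacheShard` and the gauge lambda are assumptions, not the PR's actual code.

```scala
// Sketch: one gauge reporting the total size across all cache shards,
// instead of one tagged gauge per shard.
class FetchSessionCacheShard {
  private var entries = 0
  // Gauge reads take the shard lock briefly, as discussed above.
  def size: Int = synchronized(entries)
  def put(): Unit = synchronized { entries += 1 }
}

val caches = Seq.fill(8)(new FetchSessionCacheShard)
caches.head.put()
caches(3).put()

// The lambda registered with newGauge would sum over the shards,
// keeping the metric name and (lack of) tags unchanged:
val numIncrementalFetchSessions: () => Int = () => caches.map(_.size).sum
assert(numIncrementalFetchSessions() == 2)
```

Summing at read time keeps the hot write path shard-local while leaving the externally visible metric identical to the pre-sharding one.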






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


OmniaGM commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586165970


##
core/src/test/scala/unit/kafka/server/FetchSessionTest.scala:
##
@@ -1932,6 +1932,26 @@ class FetchSessionTest {
 }
 assertEquals(partitions, partitionsInContext.toSeq)
   }
+
+  @Test
+  def testFetchManager_getShardedCache_retrievesCacheFromCorrectSegment(): Unit = {
+// Given
+val time = new MockTime()
+val sessionIdRange = Int.MaxValue / 8
+val caches = Range(0, 8).map(shardNum => new FetchSessionCache(10, 1000, sessionIdRange, shardNum))

Review Comment:
   same as above 






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


OmniaGM commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586165271


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -541,9 +541,17 @@ class KafkaServer(
 }.toMap
 }
 
-val fetchManager = new FetchManager(Time.SYSTEM,
-  new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+// for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+val sessionIdRange = Int.MaxValue / config.numIoThreads
+val fetchSessionCaches = Range(0, config.numIoThreads)

Review Comment:
   same comment about `Range` vs `to`






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


OmniaGM commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586164282


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -789,7 +807,15 @@ class FetchSessionCache(private val maxEntries: Int,
 }
 
 class FetchManager(private val time: Time,
-   private val cache: FetchSessionCache) extends Logging {
+   private val caches: Seq[FetchSessionCache]) extends Logging {
+
+  def this(time: Time, cache: FetchSessionCache) = this(time, Seq(cache))
+
+  def getShardedCache(sessionId: Int): FetchSessionCache = {
+val shard = sessionId / caches.head.sessionIdRange
+caches.apply(shard)

Review Comment:
   same here about `apply`






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


OmniaGM commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586116968


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -430,6 +439,9 @@ class FullFetchContext(private val time: Time,
   }
   cachedPartitions
 }
+// We select a shard randomly out of the available options
+val shard = ThreadLocalRandom.current().nextInt(caches.size)
+val cache = caches.apply(shard);

Review Comment:
   you don't need `apply`, `caches(shard)` is enough
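For reference, the `caches(shard)` form suggested here is plain syntactic sugar: the compiler rewrites parentheses on a value to a call to its `apply` method, so the two spellings are identical calls.

```scala
// Indexing a Seq with parentheses desugars to .apply:
val caches = Seq("shard0", "shard1", "shard2")
assert(caches(1) == caches.apply(1))  // same element, same method call
```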






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


OmniaGM commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586109183


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads
+  val fetchSessionCaches = Range(0, config.numIoThreads)

Review Comment:
   In Scala, `0 until config.numIoThreads` is usually preferred over `Range(0, config.numIoThreads)`.

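For reference, `Range(0, n)` and `0 until n` build the same end-exclusive range, while `0 to n` is end-inclusive and one element longer; a quick sketch:

```scala
// Range(0, n) and 0 until n are the same end-exclusive range.
assert(Range(0, 8) == (0 until 8))
assert(Range(0, 8).size == 8)
// 0 to n is end-inclusive, so it has one extra element.
assert((0 to 8).size == 9)
```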


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads
+  val fetchSessionCaches = Range(0, config.numIoThreads)

Review Comment:
   And same as @chia7712's comment before: `config.numIoThreads` is listed as a broker dynamic config, so what would happen when this changes?



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -430,6 +439,9 @@ class FullFetchContext(private val time: Time,
   }
   cachedPartitions
 }
+// We select a shard randomly out of the available options
+val shard = ThreadLocalRandom.current().nextInt(caches.size)
+val cache = caches.apply(shard);

Review Comment:
   you don't need `apply`; `caches(shard)` is enough
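The random shard selection quoted in this hunk can be sketched in isolation (the `caches` contents here are illustrative; only the `ThreadLocalRandom` pattern comes from the diff):

```scala
import java.util.concurrent.ThreadLocalRandom

// Pick a shard uniformly at random for a newly created full-fetch session.
// ThreadLocalRandom avoids contention on a shared Random instance.
val caches = Vector("shard0", "shard1", "shard2", "shard3")
val shard = ThreadLocalRandom.current().nextInt(caches.size)
assert(shard >= 0 && shard < caches.size)  // always a valid index
```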






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-01 Thread via GitHub


OmniaGM commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1586084377


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int,
   // A map containing sessions which can be evicted by privileged sessions.
   private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession]
 
+  private val metricTag = Map("shard" -> s"$shardNum").asJava
+
   // Set up metrics.
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size)
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions)
-  metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
+  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag)
+  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag)

Review Comment:
   I may be wrong here, but doesn't changing the tags of existing metrics need a KIP?






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-04-30 Thread via GitHub


chia7712 commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1585489570


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int,
   // A map containing sessions which can be evicted by privileged sessions.
   private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession]
 
+  private val metricTag = Map("shard" -> s"$shardNum").asJava
+
   // Set up metrics.
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size)
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions)
-  metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
+  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag)
+  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag)

Review Comment:
   Not sure whether this is allowed. It seems to break the compatibility of the metrics, as it adds new tags. It means Kafka users who monitor these metrics would need to update their queries.



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-  KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+  val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   pardon me. what happens when users update `numIoThreads` dynamically?






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-04-30 Thread via GitHub


gaurav-narula commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2086478984

   The following images show lock profiles collected using async-profiler before and after this change with numCacheShards = numIoThreads = 64, and demonstrate a significant reduction in contention:
   
   **Before**
   https://github.com/apache/kafka/assets/97168911/e2e1edad-7fe2-4260-908d-bc8d4395afca
   
   **After**
   https://github.com/apache/kafka/assets/97168911/8d926c4e-03e6-47e6-9367-cdd2ac89e3da
   
   

