chia7712 commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589821796


##########
core/src/main/scala/kafka/server/FetchSession.scala:
##########
@@ -430,7 +438,10 @@ class FullFetchContext(private val time: Time,
       }
       cachedPartitions
     }
-    val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
+    // We select a shard randomly out of the available options
+    val shard = ThreadLocalRandom.current().nextInt(cache.size())

Review Comment:
   It seems both `size` and `apply` are used only to pick a random
`FetchSessionCacheShard`. Maybe we can add a method `getRandomCacheShard` to
`FetchSessionCache` instead.
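
   A minimal sketch of what such a helper might look like. It uses simplified
stand-in classes, not the real Kafka ones: the `Shard` type and the
constructor shape of `FetchSessionCache` below are assumptions made purely
for illustration.

```scala
import java.util.concurrent.ThreadLocalRandom

// Simplified stand-in for FetchSessionCacheShard (hypothetical, not the real class).
final case class Shard(shardNum: Int)

// Hypothetical wrapper that owns the shards and hides the size/apply pair.
final class FetchSessionCache(shards: IndexedSeq[Shard]) {
  require(shards.nonEmpty, "at least one shard is required")

  // Picks a shard uniformly at random, so callers no longer need
  // to combine size() and apply() themselves.
  def getRandomCacheShard: Shard =
    shards(ThreadLocalRandom.current().nextInt(shards.size))
}
```

   The call site would then reduce to something like
`val cacheShard = cache.getRandomCacheShard`.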



##########
core/src/main/scala/kafka/server/FetchSession.scala:
##########
@@ -583,10 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
-  */
-class FetchSessionCache(private val maxEntries: Int,
-                        private val evictionMs: Long) extends Logging {
-  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  * @param sessionIdRange The number of sessionIds each cache shard handles. For a given instance, Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange always holds.
+  * @param shardNum Identifier for this shard.
+ */
+class FetchSessionCacheShard(private val maxEntries: Int,
+                             private val evictionMs: Long,
+                             val sessionIdRange: Int = Int.MaxValue,
+                             private val shardNum: Int = 0) extends Logging {

Review Comment:
   Should `FetchSessionCacheShard` define `logIdent` to help users
distinguish the source of each log message? For example, `LogCleaner` uses the
thread id as its `logIdent`:
   
https://github.com/apache/kafka/blob/1fd39150aa39f0b3d7a38d6e2ce4e295cf8a3842/core/src/main/scala/kafka/log/LogCleaner.scala#L559
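
   A rough sketch of the idea, assuming a `logIdent` prefix built from the
shard number. The `Logging` trait below is a minimal stand-in that models
only the prefix behavior, not the real `kafka.utils.Logging`, and the
`[Shard N]` format and `format` method are hypothetical.

```scala
// Minimal stand-in for kafka.utils.Logging (assumption: only the
// logIdent prefix is modelled here).
trait Logging {
  protected var logIdent: String = ""
  protected def msgWithLogIdent(msg: String): String = logIdent + msg
}

// Hypothetical shard that tags every message with its shard number.
class FetchSessionCacheShard(private val shardNum: Int) extends Logging {
  this.logIdent = s"[Shard $shardNum] "

  // Exposed only so the prefix can be inspected in this sketch.
  def format(msg: String): String = msgWithLogIdent(msg)
}
```

   With a prefix like this, messages from different shards can be told apart
in the broker log.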



##########
core/src/main/scala/kafka/server/FetchSession.scala:
##########
@@ -430,7 +438,10 @@ class FullFetchContext(private val time: Time,
       }
       cachedPartitions
     }
-    val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
+    // We select a shard randomly out of the available options
+    val shard = ThreadLocalRandom.current().nextInt(cache.size())
+    val cacheShard = cache(shard);

Review Comment:
   Please remove the trailing `;`.



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -389,9 +389,17 @@ class BrokerServer(
       authorizer = config.createNewAuthorizer()
       authorizer.foreach(_.configure(config.originals))
 
-      val fetchManager = new FetchManager(Time.SYSTEM,
-        new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-          KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+      // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+      // for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
+      val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
+      val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
+        .map(shardNum => new FetchSessionCacheShard(
+          config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,

Review Comment:
   This changes the behavior slightly.
`max.incremental.fetch.session.cache.slots` controls the "total" number of
cached sessions. With this change, however, a single shard can be full even
though the "total" number of cached sessions has not reached the threshold
defined by `max.incremental.fetch.session.cache.slots`.
   
   This is not a big issue, and we can update the docs of
`max.incremental.fetch.session.cache.slots` to describe the behavior change.
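
   To make the arithmetic concrete, here is a small sketch with made-up
numbers. The values 1000 and 8 are illustrative assumptions, not taken from
the PR or from any default config, and `ShardCapacityExample` is a
hypothetical name.

```scala
object ShardCapacityExample {
  // Illustrative values only; neither is taken from the PR.
  val maxSlots = 1000 // stands in for max.incremental.fetch.session.cache.slots
  val numShards = 8   // stands in for NumFetchSessionCacheShards

  // Integer division, as in the diff: each shard gets an equal share.
  val slotsPerShard: Int = maxSlots / numShards

  // A single shard is full once it holds slotsPerShard sessions,
  // even if every other shard is still empty.
  def shardIsFull(sessionsInShard: Int): Boolean =
    sessionsInShard >= slotsPerShard
}
```

   With these numbers each shard holds 125 sessions, so one shard can be full
while the broker caches only 125 of the 1000 configured slots.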



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.