Re: [PR] KAFKA-9401 Reduce contention for Fetch requests [kafka]
gaurav-narula closed pull request #15925: KAFKA-9401 Reduce contention for Fetch requests URL: https://github.com/apache/kafka/pull/15925 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-9401 Reduce contention for Fetch requests [kafka]
gaurav-narula commented on PR #15925: URL: https://github.com/apache/kafka/pull/15925#issuecomment-2105946195 Thanks for the feedback! My bad, I wasn't aware of the specifics of the backporting policy. Closing this and updating the JIRA.
Re: [PR] KAFKA-9401 Reduce contention for Fetch requests [kafka]
ijuma commented on PR #15925: URL: https://github.com/apache/kafka/pull/15925#issuecomment-2105944275 We don't usually cherry-pick changes like this to older branches. It's not a regression and it hasn't been released yet (hence the risk is higher).
Re: [PR] KAFKA-9401 Reduce contention for Fetch requests [kafka]
gaurav-narula commented on PR #15925: URL: https://github.com/apache/kafka/pull/15925#issuecomment-2105943872 CC: @chia7712 @soarez
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2105931546 @gaurav-narula Please file a PR for branch 3.7 if you feel this one needs to be backported :)
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 merged PR #15836: URL: https://github.com/apache/kafka/pull/15836
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2105736540 Thanks for the review! I'm fairly convinced these failures are unrelated. The report on GitHub Enterprise suggests the failed tests are flaky. Please refer to the following links for the flakiness report.
```
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.trogdor.coordinator.CoordinatorTest=testTaskRequestWithOldStartMsGetsUpdated()
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.ConsumerBounceTest=testSeekAndCommitWithBrokerFailures()
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest=testProduceConsumeViaAssign(String)[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest=testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.SaslSslConsumerTest=testCoordinatorFailover(String, String)[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.SaslSslConsumerTest=testCoordinatorFailover(String, String)[2]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.SaslSslConsumerTest=testCoordinatorFailover(String, String)[3]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.SaslSslConsumerTest=testCoordinatorFailover(String, String)[4]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.server.DelegationTokenRequestsTest=testDelegationTokenRequests(String)[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.tools.GetOffsetShellTest=testTopicPartitionsArgWithInternalIncluded()[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.tools.GetOffsetShellTest=testTopicPartitionsArg()[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.tools.TopicCommandIntegrationTest=testDescribeWithDescribeTopicPartitionsApi(String)[2]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest=testSyncTopicConfigs()
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=kafka.api.PlaintextConsumerAssignorsTest=testRemoteAssignorRange(String%2C%20String)%5B1%5D
```
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2101862846 @gaurav-narula Could you take a look at those failed tests? I feel they are unrelated to this PR, so +1
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
soarez commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2098918354 `:core:test` timed out only on JDK 8 and Scala 2.12. Restarted the build
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2097148532 @gaurav-narula Could you please rebase the code to trigger QA again? It seems we have thread leaks in some tests :(
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2096392283 Resolved conflict with `trunk`
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1590274476

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int,
     }
   }
 }
+object FetchSessionCache {
+  private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache])
+  private val counter = new AtomicLong(0)
+}
+
+class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) {
+  // Set up metrics.
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum)
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum)
+
+  def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
+    val shard = sessionId / cacheShards.head.sessionIdRange
+    cacheShards(shard)
+  }
+
+  // Returns the shard in round-robin
+  def getNextCacheShard: FetchSessionCacheShard = {
+    val shardNum = (FetchSessionCache.counter.getAndIncrement() % size).toInt
```
Review Comment: I used `AtomicLong` to practically rule out an overflow, but found `Utils.toPositive`, which is used by `RoundRobinPartitioner` :) Updated to use an `AtomicInteger` and also added some tests to ensure round-robin allocation.
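The overflow-safe round-robin selection discussed above can be sketched as follows. This is an illustrative Java sketch, not the PR's code; the class name is hypothetical, and `toPositive` mirrors the sign-bit mask used by Kafka's `Utils.toPositive`:

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinShardPicker {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int shardCount;

    RoundRobinShardPicker(int shardCount) {
        this.shardCount = shardCount;
    }

    // Mirrors Utils.toPositive: masks off the sign bit so that counter
    // wrap-around (including Integer.MIN_VALUE) still yields a non-negative value.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Picks shard indices 0, 1, ..., shardCount - 1, 0, 1, ... in order,
    // remaining well-defined even after the counter overflows.
    int nextShardIndex() {
        return toPositive(counter.getAndIncrement()) % shardCount;
    }
}
```

The mask avoids the pitfall of `Math.abs`, which returns a negative number for `Integer.MIN_VALUE`.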
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1590078150

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -690,6 +702,10 @@
    * 2. B is considered "stale" because it has been inactive for a long time, or
    * 3. A contains more partitions than B, and B is not recently created.
    *
+   * Prior to KAFKA-9401, the session cache was not sharded and we looked at all
```
Review Comment: These docs are great. Could you please update this as well? https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/KafkaConfig.scala#L182

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -787,9 +803,37 @@
+object FetchSessionCache {
+  private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache])
+  private val counter = new AtomicLong(0)
+}
+
+class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) {
+  // Set up metrics.
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum)
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum)
+
+  def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
+    val shard = sessionId / cacheShards.head.sessionIdRange
+    cacheShards(shard)
+  }
```
Review Comment: It assumes the `cacheShards` is sorted by the `shardNum`, right? If so, could you please add a comment for it?

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
+  // Returns the shard in round-robin
+  def getNextCacheShard: FetchSessionCacheShard = {
+    val shardNum = (FetchSessionCache.counter.getAndIncrement() % size).toInt
```
Review Comment: As `int` is enough for this case, maybe we can use `AtomicInteger`?
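The session-id-to-shard mapping under review can be sketched in Java; the class name is hypothetical, and the sketch assumes (as the review comment notes must hold) that shards are ordered by shard number, with shard `i` owning ids in `[max(1, i * sessionIdRange), (i + 1) * sessionIdRange)`:

```java
class SessionIdShardLookup {
    private final int sessionIdRange;

    // sessionIdRange is the width of the id block each shard owns.
    SessionIdShardLookup(int shardCount) {
        this.sessionIdRange = Integer.MAX_VALUE / shardCount;
    }

    // Integer division recovers the owning shard's index. This only works
    // because each shard hands out ids strictly within its own block and the
    // shard sequence is ordered by shard number.
    int shardFor(int sessionId) {
        return sessionId / sessionIdRange;
    }
}
```

With 8 shards, `sessionIdRange` is 268435455, so id 1 maps to shard 0 and id 268435455 maps to shard 1.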
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1589956639

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
```
@@ -389,9 +389,17 @@
     authorizer = config.createNewAuthorizer()
     authorizer.foreach(_.configure(config.originals))

-    val fetchManager = new FetchManager(Time.SYSTEM,
-      new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-        KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+    // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+    // for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
+    val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
+    val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
+      .map(shardNum => new FetchSessionCacheShard(
+        config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,
```
Review Comment: That's a great point and it's quite subtle. I reckon this may happen because the cache shards are picked randomly, and it can be avoided by picking shards in round-robin. I'll make this change along with addressing the other comments. Some subtle differences cannot be avoided, particularly around eviction. The [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability#KIP227:IntroduceIncrementalFetchRequeststoIncreasePartitionScalability-FetchSessionCaching) considers all existing sessions when choosing a session for eviction, while this change considers only existing sessions **within** a shard. I'll update the documentation to call out the difference.
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1589821796

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -430,7 +438,10 @@
       }
       cachedPartitions
     }
-    val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
+    // We select a shard randomly out of the available options
+    val shard = ThreadLocalRandom.current().nextInt(cache.size())
```
Review Comment: It seems both `size` and `apply` are used to randomly pick a `FetchSessionCacheShard`. Maybe we can add a method `getRandomCacheShard` to `FetchSessionCache`.

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -583,10 +595,13 @@
  *
  * @param maxEntries The maximum number of entries that can be in the cache.
  * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
- */
-class FetchSessionCache(private val maxEntries: Int,
-                        private val evictionMs: Long) extends Logging {
-  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+ * @param sessionIdRange The number of sessionIds each cache shard handles. For a given instance, Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange always holds.
+ * @param shardNum Identifier for this shard.
+ */
+class FetchSessionCacheShard(private val maxEntries: Int,
+                             private val evictionMs: Long,
+                             val sessionIdRange: Int = Int.MaxValue,
+                             private val shardNum: Int = 0) extends Logging {
```
Review Comment: Should `FetchSessionCacheShard` define the `logIdent` to help users distinguish the source of log messages? For example, `LogCleaner` uses the thread id as the `logIdent`: https://github.com/apache/kafka/blob/1fd39150aa39f0b3d7a38d6e2ce4e295cf8a3842/core/src/main/scala/kafka/log/LogCleaner.scala#L559

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
+    // We select a shard randomly out of the available options
+    val shard = ThreadLocalRandom.current().nextInt(cache.size())
+    val cacheShard = cache(shard);
```
Review Comment: please remove `;`

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
```
+    // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+    // for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
+    val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
+    val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
+      .map(shardNum => new FetchSessionCacheShard(
+        config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,
```
Review Comment: The behavior gets changed a bit here. `max.incremental.fetch.session.cache.slots` is used to control the "total" number of cached sessions. With this change, however, the session room could be full even though the "total" size does not reach the threshold defined by `max.incremental.fetch.session.cache.slots`. This is not a big issue, and we can update the docs of `max.incremental.fetch.session.cache.slots` for that behavior change.
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1589418655

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -583,9 +595,13 @@
  *
  * @param maxEntries The maximum number of entries that can be in the cache.
  * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
- */
+ * @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange).
+ * @param shardNum Identifier for this shard.
+ */
 class FetchSessionCache(private val maxEntries: Int,
```
Review Comment: Thanks for the suggestion! I've renamed the existing type to `FetchSessionCacheShard`, and `FetchSessionCache` is now essentially a wrapper around `Seq[FetchSessionCacheShard]`. This conveys the intention clearly indeed.
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
AndrewJSchofield commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1589348685

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -395,19 +396,27 @@
  * The fetch context for a full fetch request.
  *
  * @param time The clock to use.
- * @param cache The fetch session cache.
+ * @param caches The fetch session cache shards.
  * @param reqMetadata The request metadata.
  * @param fetchData The partition data from the fetch request.
  * @param usesTopicIds True if this session should use topic IDs.
  * @param isFromFollower True if this fetch request came from a follower.
  */
 class FullFetchContext(private val time: Time,
-                       private val cache: FetchSessionCache,
+                       private val caches: Seq[FetchSessionCache],
```
Review Comment: `cacheShards`?

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -583,9 +595,13 @@
  * @param maxEntries The maximum number of entries that can be in the cache.
  * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
- */
+ * @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange).
+ * @param shardNum Identifier for this shard.
+ */
```
Review Comment: I know the `[ , )` notation means what you intend for inclusive/exclusive bounds, but the presence of the parentheses makes it a bit hard to read, I think. Maybe using >= and < would be clearer.

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
 class FetchSessionCache(private val maxEntries: Int,
```
Review Comment: Maybe the class name ought to be `FetchSessionCacheShard` too. The cache really is the whole set of shards.

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
```
-    val fetchManager = new FetchManager(Time.SYSTEM,
-      new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-        KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+    // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+    // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+    val sessionIdRange = Int.MaxValue / config.numIoThreads
```
Review Comment: I would tend not to add a configuration for this. The value you're talking about sounds like it's doing the job on a busy workload, and it's small enough that there's negligible benefit of configuring it smaller for a tiny cluster. Having a configuration kind of crystallizes this aspect of the internal design of Kafka, and you might have an even better idea in the future that would make this configuration pointless.

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -789,7 +807,15 @@
 }

 class FetchManager(private val time: Time,
-                   private val cache: FetchSessionCache) extends Logging {
+                   private val caches: Seq[FetchSessionCache]) extends Logging {
+
+  def this(time: Time, cache: FetchSessionCache) = this(time, Seq(cache))
+
+  def getShardedCache(sessionId: Int): FetchSessionCache = {
```
Review Comment: `getCacheShard`?

## core/src/main/scala/kafka/server/FetchSession.scala: ##
```
@@ -603,14 +619,16 @@
   // A map containing sessions which can be evicted by privileged sessions.
   private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession]

+  private val metricTag = Map("shard" -> s"$shardNum").asJava
+
   // Set up metrics.
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size)
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions)
-
```
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1589339685

## core/src/main/scala/kafka/server/KafkaServer.scala: ##
```
@@ -541,9 +541,17 @@
       }.toMap
     }

-    val fetchManager = new FetchManager(Time.SYSTEM,
-      new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-        KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+    // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+    // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+    val sessionIdRange = Int.MaxValue / config.numIoThreads
+    val fetchSessionCaches = Range(0, config.numIoThreads)
```
Review Comment: I've used `until` because I needed an exclusive Range.
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1589305094

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
```
+    // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+    // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+    val sessionIdRange = Int.MaxValue / config.numIoThreads
```
Review Comment:
> Out of curiosity are you planning later to make this constant a configuration?

I think it's unlikely one would need to tweak this often. My experiment ran on a fairly busy cluster with ~50k Fetch rps (`kafka.network-RequestMetrics-RequestsPerSec(request=Fetch)`) on the broker, which is reasonably high.
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
OmniaGM commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586403158

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
```
+    // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+    // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+    val sessionIdRange = Int.MaxValue / config.numIoThreads
```
Review Comment: +1 on keeping the number of threads out of this.
> How about we introduce a constant for now and set it to 8?

Out of curiosity, are you planning later to make this constant a configuration?
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
soarez commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586395026

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
```
+    // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+    // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+    val sessionIdRange = Int.MaxValue / config.numIoThreads
```
Review Comment: Since the number of shards need not depend on the thread pool size, I agree it may be less confusing to keep `config.numIoThreads` out of this.
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586365674

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
```
+    // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+    // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+    val sessionIdRange = Int.MaxValue / config.numIoThreads
```
Review Comment: Using `config.numIoThreads` is a heuristic, and I reckon it's okay if the shard count isn't modified dynamically along with `numIoThreads`. In fact, I realised my "After" benchmark was with 8 shards and not 64, as I had a misconfiguration while running it, which goes to show even some sharding goes a long way. How about we introduce a constant for now and set it to 8?
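The fixed-constant wiring proposed above can be sketched in Java; the class and constant names are hypothetical, but the arithmetic follows the range comment quoted in the diffs, where each shard owns session ids in [max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange):

```java
class FetchSessionShardRanges {
    // Fixed shard count, per the "constant set to 8" suggestion above.
    static final int NUM_FETCH_SESSION_CACHE_SHARDS = 8;

    static final int SESSION_ID_RANGE = Integer.MAX_VALUE / NUM_FETCH_SESSION_CACHE_SHARDS;

    // Lower bound (inclusive) of the session id range owned by shardNum.
    // Shard 0 starts at 1 because 0 is not a valid session id.
    static int lowerBound(int shardNum) {
        return Math.max(1, shardNum * SESSION_ID_RANGE);
    }

    // Upper bound (exclusive) of the session id range owned by shardNum.
    static int upperBound(int shardNum) {
        return (shardNum + 1) * SESSION_ID_RANGE;
    }
}
```

Note that with 8 shards the last range ends at 8 * (Integer.MAX_VALUE / 8), slightly below Integer.MAX_VALUE, so the computation never overflows.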
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586362012 ## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int, // A map containing sessions which can be evicted by privileged sessions. private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession] + private val metricTag = Map("shard" -> s"$shardNum").asJava + // Set up metrics. - metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS) - metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size) - metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED) - metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions) - metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC) + metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag) + metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag) Review Comment: That's a good point. I added the labels because of the `removeMetric` calls. On further investigation, it seems that `metricsGroup.removeMetric` isn't really needed and we can have a combined metric for all the shards. The lambda that calculates the value for the gauges needs to synchronize on the cache, but that's a small overhead overall, as it happens only when metrics are gathered.
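The combined-metric idea can be sketched like this (a simplified model, assuming a hypothetical `CacheShard` class with a synchronized `size`, not the PR's actual types):

```scala
import scala.collection.mutable

// Hypothetical stand-in for one FetchSessionCache shard.
class CacheShard {
  private val sessions = mutable.Map.empty[Int, String]
  def put(id: Int, session: String): Unit = synchronized { sessions(id) = session }
  // Synchronizing here is the small overhead mentioned above: it is paid
  // only when the gauge is read, i.e. when metrics are gathered.
  def size: Int = synchronized { sessions.size }
}

object CombinedGauge {
  val shards: Seq[CacheShard] = Seq.fill(8)(new CacheShard)

  // One combined gauge summing over all shards, instead of a per-shard
  // tagged gauge, so the metric name and tags stay unchanged.
  val numIncrementalFetchSessions: () => Int = () => shards.map(_.size).sum

  def main(args: Array[String]): Unit = {
    shards(0).put(1, "session-a")
    shards(3).put(2, "session-b")
    println(numIncrementalFetchSessions())
  }
}
```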
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
OmniaGM commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586165970 ## core/src/test/scala/unit/kafka/server/FetchSessionTest.scala: ## @@ -1932,6 +1932,26 @@ class FetchSessionTest { } assertEquals(partitions, partitionsInContext.toSeq) } + + @Test + def testFetchManager_getShardedCache_retrievesCacheFromCorrectSegment(): Unit = { +// Given +val time = new MockTime() +val sessionIdRange = Int.MaxValue / 8 +val caches = Range(0, 8).map(shardNum => new FetchSessionCache(10, 1000, sessionIdRange, shardNum)) Review Comment: same as above
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
OmniaGM commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586165271 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -541,9 +541,17 @@ class KafkaServer( }.toMap } -val fetchManager = new FetchManager(Time.SYSTEM, - new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, -KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) +// The FetchSessionCache is divided into config.numIoThreads shards, each responsible +// for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange) +val sessionIdRange = Int.MaxValue / config.numIoThreads +val fetchSessionCaches = Range(0, config.numIoThreads) Review Comment: same comment about `Range` vs `to`
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
OmniaGM commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586164282 ## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -789,7 +807,15 @@ class FetchSessionCache(private val maxEntries: Int, } class FetchManager(private val time: Time, - private val cache: FetchSessionCache) extends Logging { + private val caches: Seq[FetchSessionCache]) extends Logging { + + def this(time: Time, cache: FetchSessionCache) = this(time, Seq(cache)) + + def getShardedCache(sessionId: Int): FetchSessionCache = { +val shard = sessionId / caches.head.sessionIdRange +caches.apply(shard) Review Comment: same here about `apply`
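The shard lookup in `getShardedCache` is plain integer division; the mapping can be illustrated in isolation like this (a sketch mirroring `sessionId / caches.head.sessionIdRange` from the diff above):

```scala
object ShardLookup {
  // Recover the owning shard of a sessionId by integer division over the
  // per-shard id range, as in the diff above.
  def shardFor(sessionId: Int, sessionIdRange: Int): Int = sessionId / sessionIdRange

  def main(args: Array[String]): Unit = {
    val sessionIdRange = Int.MaxValue / 8
    // An id in the middle of shard 3's range maps back to shard 3.
    val id = 3 * sessionIdRange + sessionIdRange / 2
    println(shardFor(id, sessionIdRange)) // prints 3
  }
}
```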
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
OmniaGM commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586116968 ## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -430,6 +439,9 @@ class FullFetchContext(private val time: Time, } cachedPartitions } +// We select a shard randomly out of the available options +val shard = ThreadLocalRandom.current().nextInt(caches.size) +val cache = caches.apply(shard); Review Comment: you don't need `apply`; `caches(shard)` is enough
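The random shard pick for new sessions looks like this in isolation (a sketch; `ThreadLocalRandom` gives each IO thread its own generator, so the pick itself adds no cross-thread contention):

```scala
import java.util.concurrent.ThreadLocalRandom

object RandomShardPick {
  // Stand-in for the Seq[FetchSessionCache] of shards.
  val caches: Vector[String] = Vector.tabulate(8)(shardNum => s"shard-$shardNum")

  // Each thread draws from its own thread-local generator.
  def pick(): Int = ThreadLocalRandom.current().nextInt(caches.size)

  def main(args: Array[String]): Unit = {
    val shard = pick()
    val cache = caches(shard) // idiomatic indexing, no explicit .apply
    println(cache)
  }
}
```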
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
OmniaGM commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586109183 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -389,9 +389,17 @@ class BrokerServer( authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - val fetchManager = new FetchManager(Time.SYSTEM, -new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, - KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) + // The FetchSessionCache is divided into config.numIoThreads shards, each responsible + // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange) + val sessionIdRange = Int.MaxValue / config.numIoThreads + val fetchSessionCaches = Range(0, config.numIoThreads) Review Comment: `0 to config.numIoThreads` is usually preferred in Scala over `Range` ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -389,9 +389,17 @@ class BrokerServer( authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - val fetchManager = new FetchManager(Time.SYSTEM, -new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, - KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) + // The FetchSessionCache is divided into config.numIoThreads shards, each responsible + // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange) + val sessionIdRange = Int.MaxValue / config.numIoThreads + val fetchSessionCaches = Range(0, config.numIoThreads) Review Comment: And same as @chia7712's comment before: `config.numIoThreads` is listed as a broker dynamic config, so what would happen when this changes?
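As background for the `Range` suggestion: `Range(0, n)` and `0 until n` denote the same half-open range, while `0 to n` is inclusive and contains one extra element, so the half-open form is the drop-in equivalent of `Range(0, n)`:

```scala
object RangeForms {
  def main(args: Array[String]): Unit = {
    val n = 8
    // Range(0, n) and 0 until n are identical half-open ranges: 0, 1, ..., n - 1.
    assert(Range(0, n) == (0 until n))
    // 0 to n is inclusive and includes n itself, giving n + 1 elements.
    assert((0 to n).size == n + 1)
    println((0 until n).mkString(",")) // prints 0,1,2,3,4,5,6,7
  }
}
```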
## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -430,6 +439,9 @@ class FullFetchContext(private val time: Time, } cachedPartitions } +// We select a shard randomly out of the available options +val shard = ThreadLocalRandom.current().nextInt(caches.size) +val cache = caches.apply(shard); Review Comment: you don't need `apply`; `caches(shard)` is enough
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
OmniaGM commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1586084377 ## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int, // A map containing sessions which can be evicted by privileged sessions. private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession] + private val metricTag = Map("shard" -> s"$shardNum").asJava + // Set up metrics. - metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS) - metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size) - metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED) - metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions) - metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC) + metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag) + metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag) Review Comment: I may be wrong here, but doesn't changing the tags of an existing metric need a KIP?
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1585489570 ## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int, // A map containing sessions which can be evicted by privileged sessions. private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession] + private val metricTag = Map("shard" -> s"$shardNum").asJava + // Set up metrics. - metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS) - metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size) - metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED) - metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions) - metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC) + metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag) + metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag) Review Comment: Not sure whether this is allowed. It seems to break the compatibility of the metrics, as it adds new tags. It means Kafka users who monitor these metrics would need to update their queries. ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -389,9 +389,17 @@ class BrokerServer( authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - val fetchManager = new FetchManager(Time.SYSTEM, -new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, - KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) + // The FetchSessionCache is divided into config.numIoThreads shards, each responsible + // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange) + val sessionIdRange = Int.MaxValue / config.numIoThreads Review Comment: pardon me. 
what happens when users update `numIoThreads` dynamically?
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2086478984 The following images show lock profiles collected using async-profiler before and after this change, with numCacheShards = numIoThreads = 64, and demonstrate a significant reduction in contention. **Before** https://github.com/apache/kafka/assets/97168911/e2e1edad-7fe2-4260-908d-bc8d4395afca **After** https://github.com/apache/kafka/assets/97168911/8d926c4e-03e6-47e6-9367-cdd2ac89e3da