[ https://issues.apache.org/jira/browse/KAFKA-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gaurav Narula reassigned KAFKA-9401:
------------------------------------

    Assignee: Gaurav Narula

> High lock contention for kafka.server.FetchManager.newContext
> -------------------------------------------------------------
>
>                 Key: KAFKA-9401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9401
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Lucas Bradstreet
>            Assignee: Gaurav Narula
>            Priority: Major
>
> kafka.server.FetchManager.newContext takes out what is essentially a global
> fetch lock on kafka.server.FetchSessionCache, not only for updates to the
> FetchSessionCache itself but also for updates to the fetch sessions stored
> within it. This causes heavy lock contention for fetches, as every fetch
> request must go through this lock.
>
> I have taken an async-profiler lock profile on a high-throughput cluster, and
> I see around 25 s of waiting on this lock in a 60-second profile.
> {noformat}
> --- 25818577497 ns (20.84%), 5805 samples
>   [ 0] kafka.server.FetchSessionCache
>   [ 1] kafka.server.FetchManager.newContext
>   [ 2] kafka.server.KafkaApis.handleFetchRequest
>   [ 3] kafka.server.KafkaApis.handle
>   [ 4] kafka.server.KafkaRequestHandler.run
>   [ 5] java.lang.Thread.run
> {noformat}
> FetchSession.scala:
> {code:java}
> cache.synchronized {
>   cache.get(reqMetadata.sessionId) match {
>     case None => {
>       debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
>       new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
>     }
>     case Some(session) => session.synchronized {
>       if (session.epoch != reqMetadata.epoch) {
>         debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
>           s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
>         new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
>       } else {
>         val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
>         if (session.isEmpty) {
>           debug(s"Created a new sessionless FetchContext and closing session id ${session.id}, " +
>             s"epoch ${session.epoch}: after removing ${partitionsToLogString(removed)}, " +
>             s"there are no more partitions left.")
>           cache.remove(session)
>           new SessionlessFetchContext(fetchData)
>         } else {
>           cache.touch(session, time.milliseconds())
>           session.epoch = JFetchMetadata.nextEpoch(session.epoch)
>           debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
>             s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
>             s"updated ${partitionsToLogString(updated)}, " +
>             s"removed ${partitionsToLogString(removed)}")
>           new IncrementalFetchContext(time, reqMetadata, session)
>         }
>       }
>     }
>   }
> }
> {code}
> Contention has been made worse by the solution for "KAFKA-9137: Fix incorrect
> FetchSessionCache eviction logic"
> ([https://github.com/apache/kafka/pull/7640]), as the cache is now correctly
> touched on every fetch, whereas previously the touch was being skipped.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
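
To make the contention pattern in the description concrete, here is a minimal
sketch of one way the lock scope could be narrowed: lock striping, where
sessions are partitioned by session id into independently locked shards so that
fetches for different sessions do not all queue on a single monitor. This is an
illustration under stated assumptions, not the change made for this issue.
Session, CacheShard and ShardedSessionCache are hypothetical names that do not
come from the Kafka code base, and the sketch deliberately ignores the eviction
ordering that the real cache maintains through cache.touch.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a fetch session; not Kafka's FetchSession class.
final class Session(val id: Int) {
  var epoch: Int = 0
}

// One shard: a plain map guarded by its own monitor.
final class CacheShard {
  private val sessions = mutable.HashMap.empty[Int, Session]

  def get(id: Int): Option[Session] = synchronized { sessions.get(id) }

  def put(session: Session): Unit = synchronized {
    sessions.update(session.id, session)
  }

  // Bump the epoch while holding only this shard's lock.
  def advanceEpoch(id: Int): Option[Int] = synchronized {
    sessions.get(id).map { s =>
      s.epoch += 1
      s.epoch
    }
  }
}

// Routes each session id to one of `numShards` independently locked shards.
final class ShardedSessionCache(numShards: Int) {
  require(numShards > 0, "numShards must be positive")

  private val shards = Array.fill(numShards)(new CacheShard)

  // floorMod keeps the index non-negative even for negative ids.
  def shardFor(id: Int): CacheShard = shards(Math.floorMod(id, numShards))

  def put(session: Session): Unit = shardFor(session.id).put(session)
  def get(id: Int): Option[Session] = shardFor(id).get(id)
  def advanceEpoch(id: Int): Option[Int] = shardFor(id).advanceEpoch(id)
}
```

With this layout, two request handler threads only block each other when their
session ids map to the same shard. The trade-off is that any cache-wide
invariant, such as evicting the least recently used session across the whole
cache, can no longer be enforced under a single lock and would need separate
treatment.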