This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8fce97b2c35 KAFKA-19144 Move DelayedProduce to server module (#19793)
8fce97b2c35 is described below

commit 8fce97b2c3584435840ec5d8c2ad6377124d3c6f
Author: S.Y. Wang <[email protected]>
AuthorDate: Tue Mar 10 04:48:39 2026 +0900

    KAFKA-19144 Move DelayedProduce to server module (#19793)
    
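    This PR moves `DelayedProduce` from the core module (Scala) to the server
    module (Java, `org.apache.kafka.server.purgatory.DelayedProduce`), folding
    the former `DelayedProduceMetrics` helpers into it. One notable change is
    that the type of the `responseCallback` parameter in
    `ReplicaManager#appendRecords()` has been changed to a Java `Map`; the
    related type changes in callers and tests have been made accordingly.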
    
    Reviewers: Ken Huang <[email protected]>, PoAn Yang
     <[email protected]>, TengYao Chi <[email protected]>, DL1231
     <[email protected]>, Kuan-Po Tseng
     <[email protected]>, Christo Lolov <[email protected]>, Chia-Ping
     Tsai <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 .../transaction/TransactionStateManager.scala      |  11 +-
 .../main/scala/kafka/server/DelayedProduce.scala   | 161 ----------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  59 ++++--
 .../kafka/server/LocalLeaderEndPointTest.scala     |   6 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |   2 +-
 .../AbstractCoordinatorConcurrencyTest.scala       |  19 +-
 .../transaction/TransactionStateManagerTest.scala  |  10 +-
 .../unit/kafka/server/DelayedProduceTest.scala     |   7 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  16 +-
 .../server/ReplicaManagerConcurrencyTest.scala     |   8 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  16 +-
 .../kafka/server/purgatory/DelayedProduce.java     | 214 +++++++++++++++++++++
 14 files changed, 308 insertions(+), 225 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 82a3f233fe2..e7a5b297674 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -44,7 +44,7 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.storage.internals.log.{AppendOrigin, 
AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, 
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, 
LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, 
VerificationGuard}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.partition.{AlterPartitionListener, 
AssignmentState, CommittedPartitionState, OngoingReassignmentState, 
PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange, 
PendingShrinkIsr, SimpleAssignmentState}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
 import org.apache.kafka.server.replica.Replica
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
UnexpectedAppendOffsetException}
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e67e81e1c88..fae1c857654 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -47,6 +47,7 @@ import org.apache.kafka.storage.internals.log.AppendOrigin
 import com.google.re2j.{Pattern, PatternSyntaxException}
 import org.apache.kafka.common.errors.InvalidRegularExpression
 
+import java.util
 import java.util.Optional
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
@@ -258,8 +259,8 @@ class TransactionStateManager(brokerId: Int,
     expiredForPartition: Iterable[TransactionalIdCoordinatorEpochAndMetadata],
     tombstoneRecords: MemoryRecords
   ): Unit = {
-    def removeFromCacheCallback(responses: collection.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
-      responses.foreachEntry { (topicPartition, response) =>
+    def removeFromCacheCallback(responses: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+      responses.forEach { (topicPartition, response) =>
         inReadLock[Exception](stateLock, () => {
           transactionMetadataCache.get(topicPartition.partition).foreach { 
txnMetadataCacheEntry =>
             expiredForPartition.foreach { idCoordinatorEpochAndMetadata =>
@@ -670,13 +671,13 @@ class TransactionStateManager(brokerId: Int,
     val recordsPerPartition = Map(transactionStateTopicIdPartition -> records)
 
     // set the callback function to update transaction status in cache after 
log append completed
-    def updateCacheCallback(responseStatus: collection.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+    def updateCacheCallback(responseStatus: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
       // the append response should only contain the topics partition
-      if (responseStatus.size != 1 || 
!responseStatus.contains(transactionStateTopicIdPartition))
+      if (responseStatus.size != 1 || 
!responseStatus.containsKey(transactionStateTopicIdPartition))
         throw new IllegalStateException("Append status %s should only have one 
partition %s"
           .format(responseStatus, transactionStateTopicPartition))
 
-      val status = responseStatus(transactionStateTopicIdPartition)
+      val status = responseStatus.get(transactionStateTopicIdPartition)
 
       var responseError = if (status.error == Errors.NONE) {
         Errors.NONE
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala 
b/core/src/main/scala/kafka/server/DelayedProduce.scala
deleted file mode 100644
index e2c9c72b4bb..00000000000
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import com.typesafe.scalalogging.Logger
-import com.yammer.metrics.core.Meter
-import kafka.utils.Logging
-import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.purgatory.DelayedOperation
-
-import scala.collection._
-import scala.jdk.CollectionConverters._
-
-case class ProducePartitionStatus(requiredOffset: Long, responseStatus: 
PartitionResponse) {
-  @volatile var acksPending = false
-
-  override def toString: String = s"[acksPending: $acksPending, error: 
${responseStatus.error.code}, " +
-    s"startOffset: ${responseStatus.baseOffset}, requiredOffset: 
$requiredOffset]"
-}
-
-/**
- * The produce metadata maintained by the delayed produce operation
- */
-case class ProduceMetadata(produceRequiredAcks: Short,
-                           produceStatus: Map[TopicIdPartition, 
ProducePartitionStatus]) {
-
-  override def toString = s"[requiredAcks: $produceRequiredAcks, 
partitionStatus: $produceStatus]"
-}
-
-object DelayedProduce {
-  private final val logger = Logger(classOf[DelayedProduce])
-}
-
-/**
- * A delayed produce operation that can be created by the replica manager and 
watched
- * in the produce operation purgatory
- */
-class DelayedProduce(delayMs: Long,
-                     produceMetadata: ProduceMetadata,
-                     replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicIdPartition, 
PartitionResponse] => Unit)
-  extends DelayedOperation(delayMs) with Logging {
-
-  override lazy val logger: Logger = DelayedProduce.logger
-
-  // first update the acks pending variable according to the error code
-  produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
-    if (status.responseStatus.error == Errors.NONE) {
-      // Timeout error state will be cleared when required acks are received
-      status.acksPending = true
-      status.responseStatus.error = Errors.REQUEST_TIMED_OUT
-    } else {
-      status.acksPending = false
-    }
-
-    trace(s"Initial partition status for $topicPartition is $status")
-  }
-
-  /**
-   * The delayed produce operation can be completed if every partition
-   * it produces to is satisfied by one of the following:
-   *
-   * Case A: Replica not assigned to partition
-   * Case B: Replica is no longer the leader of this partition
-   * Case C: This broker is the leader:
-   *   C.1 - If there was a local error thrown while checking if at least 
requiredAcks
-   *         replicas have caught up to this operation: set an error in 
response
-   *   C.2 - Otherwise, set the response with no error.
-   */
-  override def tryComplete(): Boolean = {
-    // check for each partition if it still has pending acks
-    produceMetadata.produceStatus.foreachEntry { (topicIdPartition, status) =>
-      trace(s"Checking produce satisfaction for $topicIdPartition, current 
status $status")
-      // skip those partitions that have already been satisfied
-      if (status.acksPending) {
-        val (hasEnough, error) = 
replicaManager.getPartitionOrError(topicIdPartition.topicPartition()) match {
-          case Left(err) =>
-            // Case A
-            (false, err)
-
-          case Right(partition) =>
-            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
-        }
-
-        // Case B || C.1 || C.2
-        if (error != Errors.NONE || hasEnough) {
-          status.acksPending = false
-          status.responseStatus.error = error
-        }
-      }
-    }
-
-    // check if every partition has satisfied at least one of case A, B or C
-    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
-      forceComplete()
-    else
-      false
-  }
-
-  override def onExpiration(): Unit = {
-    produceMetadata.produceStatus.foreachEntry { (topicIdPartition, status) =>
-      if (status.acksPending) {
-        debug(s"Expiring produce request for partition $topicIdPartition with 
status $status")
-        
DelayedProduceMetrics.recordExpiration(topicIdPartition.topicPartition())
-      }
-    }
-  }
-
-  /**
-   * Upon completion, return the current response status along with the error 
code per partition
-   */
-  override def onComplete(): Unit = {
-    val responseStatus = produceMetadata.produceStatus.map { case (k, status) 
=> k -> status.responseStatus }
-    responseCallback(responseStatus)
-  }
-}
-
-object DelayedProduceMetrics {
-  // Changing the package or class name may cause incompatibility with 
existing code and metrics configuration
-  private val metricsPackage = "kafka.server"
-  private val metricsClassName = "DelayedProduceMetrics"
-  private val metricsGroup = new KafkaMetricsGroup(metricsPackage, 
metricsClassName)
-
-  private val aggregateExpirationMeter = 
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
-
-  private val partitionExpirationMeters = new 
ConcurrentHashMap[TopicPartition, Meter]
-
-  def recordExpiration(partition: TopicPartition): Unit = {
-    aggregateExpirationMeter.mark()
-    partitionExpirationMeters.computeIfAbsent(partition, key => 
metricsGroup.newMeter("ExpiresPerSec",
-      "requests",
-      TimeUnit.SECONDS,
-      Map("topic" -> key.topic, "partition" -> 
key.partition.toString).asJava)).mark()
-  }
-
-  def removePartitionMetrics(partition: TopicPartition): Unit = {
-    if (partitionExpirationMeters.remove(partition) != null) {
-      metricsGroup.removeMetric("ExpiresPerSec",
-        Map("topic" -> partition.topic, "partition" -> 
partition.partition.toString).asJava)
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 00b3b11495a..76a45415acc 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1830,7 +1830,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             entriesPerPartition = controlRecords,
             requestLocal = requestLocal,
             responseCallback = errors => {
-              errors.foreachEntry { (topicIdPartition, partitionResponse) =>
+              errors.forEach { (topicIdPartition, partitionResponse) =>
                 addResultAndMaybeComplete(topicIdPartition.topicPartition(), 
partitionResponse.error)
               }
             },
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 84c0cd593c1..8bccb4dc8e4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -49,6 +49,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, 
MetadataImage, TopicsDelta}
 import org.apache.kafka.logger.StateChangeLogger
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.server.purgatory.DelayedProduce.ProducePartitionStatus
 import org.apache.kafka.server.LogAppendResult.LogAppendSummary
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, 
StopPartition, TransactionVersion}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -57,7 +58,8 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.partition.PartitionListener
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, 
DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, 
TopicPartitionOperationKey}
+import 
org.apache.kafka.server.purgatory.DelayedProduce.PartitionStatusValidator.Result
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch, 
DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, 
ListOffsetsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, 
DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
@@ -79,6 +81,7 @@ import java.util.{Collections, Optional, OptionalInt, 
OptionalLong}
 import java.util.function.Consumer
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
+import scala.jdk.FunctionConverters.enrichAsJavaConsumer
 import scala.jdk.OptionConverters.RichOptional
 
 object ReplicaManager {
@@ -409,7 +412,7 @@ class ReplicaManager(val config: KafkaConfig,
       completeDelayedOperationsWhenNotPartitionLeader(topicPartition, topicId)
       // Clean up per-partition expiration metrics regardless of whether the 
local log
       // is deleted. This covers both partition deletion and reassignment 
(leader -> follower).
-      DelayedProduceMetrics.removePartitionMetrics(topicPartition)
+      DelayedProduce.removePartitionMetrics(topicPartition)
       DelayedRemoteListOffsets.removePartitionMetrics(topicPartition)
     }
 
@@ -636,7 +639,7 @@ class ReplicaManager(val config: KafkaConfig,
                     internalTopicsAllowed: Boolean,
                     origin: AppendOrigin,
                     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
-                    responseCallback: Map[TopicIdPartition, PartitionResponse] 
=> Unit,
+                    responseCallback: util.Map[TopicIdPartition, 
PartitionResponse] => Unit,
                     recordValidationStatsCallback: Map[TopicIdPartition, 
RecordValidationStats] => Unit = _ => (),
                     requestLocal: RequestLocal = RequestLocal.noCaching,
                     verificationGuards: Map[TopicPartition, VerificationGuard] 
= Map.empty,
@@ -756,8 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
 
       val preAppendPartitionResponses = 
buildProducePartitionStatus(errorResults).map { case (k, status) => k -> 
status.responseStatus }
 
-      def newResponseCallback(responses: Map[TopicIdPartition, 
PartitionResponse]): Unit = {
-        responseCallback(preAppendPartitionResponses ++ responses)
+      def newResponseCallback(responses: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+        responseCallback(preAppendPartitionResponses ++ responses.asScala)
       }
 
       appendRecords(
@@ -832,7 +835,7 @@ class ReplicaManager(val config: KafkaConfig,
     results: Map[TopicIdPartition, LogAppendResult]
   ): Map[TopicIdPartition, ProducePartitionStatus] = {
     results.map { case (topicIdPartition, result) =>
-      topicIdPartition -> ProducePartitionStatus(
+      topicIdPartition -> new ProducePartitionStatus(
         result.logAppendSummary.lastOffset + 1, // required offset
         new PartitionResponse(
           result.error,
@@ -877,12 +880,26 @@ class ReplicaManager(val config: KafkaConfig,
     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
     initialAppendResults: Map[TopicIdPartition, LogAppendResult],
     initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus],
-    responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
   ): Unit = {
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
initialAppendResults)) {
-      // create delayed produce operation
-      val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
-      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, 
this, responseCallback)
+      // Create delayed produce operation
+      //
+      // This delegate is invoked by DelayedProduce to verify if the produce 
operation can be completed.
+      // Defined here to provide access to ReplicaManager#getPartitionOrError, 
which is otherwise inaccessible to the caller.
+      def delegate(tp: TopicPartition, requiredOffset: Long) : Result = {
+        val (hasEnough, error) = getPartitionOrError(tp).fold(
+            // Please refer to the documentation in 
`DelayedProduce#tryComplete` for a comprehensive description of these cases.
+            // Case A or Case B
+            err => (false, err),
+
+            // Case B or Case C
+            partition => 
partition.checkEnoughReplicasReachOffset(requiredOffset))
+
+        new Result(hasEnough, error)
+      }
+
+      val delayedProduce = new DelayedProduce(timeoutMs, 
initialProduceStatus.asJava, delegate, responseCallback.asJava)
 
       // create a list of (topic, partition) pairs to use as keys for this 
delayed produce operation
       val producerRequestKeys = entriesPerPartition.keys.map(new 
TopicPartitionOperationKey(_)).toList
@@ -893,23 +910,25 @@ class ReplicaManager(val config: KafkaConfig,
       delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys.asJava)
     } else {
       // we can respond immediately
-      val produceResponseStatus = initialProduceStatus.map { case (k, status) 
=> k -> status.responseStatus }
+      val produceResponseStatus = new util.HashMap[TopicIdPartition, 
PartitionResponse]
+      initialProduceStatus.foreach { case (k, status) => 
produceResponseStatus.put(k, status.responseStatus) }
       responseCallback(produceResponseStatus)
     }
   }
 
   private def sendInvalidRequiredAcksResponse(
     entries: Map[TopicIdPartition, MemoryRecords],
-    responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit): Unit 
= {
+    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit): 
Unit = {
     // If required.acks is outside accepted range, something is wrong with the 
client
     // Just return an error and don't handle the request at all
-    val responseStatus = entries.map { case (topicIdPartition, _) =>
-      topicIdPartition -> new PartitionResponse(
-        Errors.INVALID_REQUIRED_ACKS,
-        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
-        RecordBatch.NO_TIMESTAMP,
-        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
-      )
+    val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]
+    entries.foreach { case(topicIdPartition, _) =>
+        responseStatus.put(topicIdPartition, new PartitionResponse(
+          Errors.INVALID_REQUIRED_ACKS,
+          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+          RecordBatch.NO_TIMESTAMP,
+          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset)
+        )
     }
     responseCallback(responseStatus)
   }
@@ -2519,7 +2538,7 @@ class ReplicaManager(val config: KafkaConfig,
       partitionsToStartFetching.foreach{ case (topicPartition, partition) =>
         completeDelayedOperationsWhenNotPartitionLeader(topicPartition, 
partition.topicId)
         // Clean up per-partition expiration metrics when transitioning from 
leader to follower.
-        DelayedProduceMetrics.removePartitionMetrics(topicPartition)
+        DelayedProduce.removePartitionMetrics(topicPartition)
         DelayedRemoteListOffsets.removePartitionMetrics(topicPartition)
       }
 
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala 
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 70e04c94b20..b25bca7357a 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -427,10 +427,10 @@ class LocalLeaderEndPointTest extends Logging {
                             origin: AppendOrigin = AppendOrigin.CLIENT,
                             requiredAcks: Short = -1): 
CallbackResult[PartitionResponse] = {
     val result = new CallbackResult[PartitionResponse]()
-    def appendCallback(responses: scala.collection.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+    def appendCallback(responses: JMap[TopicIdPartition, PartitionResponse]): 
Unit = {
       val response = responses.get(partition)
-      assertTrue(response.isDefined)
-      result.fire(response.get)
+      assertNotNull(response)
+      result.fire(response)
     }
 
     replicaManager.appendRecords(
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 3aeb3e89b1a..2fdd251992a 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -56,7 +56,7 @@ import 
org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager, RequestLocal}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.partition.{AlterPartitionListener, 
OngoingReassignmentState, PartitionListener, PendingShrinkIsr, 
SimpleAssignmentState}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
UnexpectedAppendOffsetException}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index a2bf4ad3537..28343beeb0d 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -32,8 +32,9 @@ import 
org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.server.purgatory.DelayedProduce.ProducePartitionStatus
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, 
TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch, 
DelayedRemoteListOffsets, TopicPartitionOperationKey}
 import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.util.timer.{MockTimer, Timer}
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
@@ -43,6 +44,7 @@ import org.mockito.Mockito.{mock, when, withSettings}
 
 import scala.collection._
 import scala.jdk.CollectionConverters._
+import scala.jdk.FunctionConverters.enrichAsJavaConsumer
 
 abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] 
extends Logging {
   val nThreads = 5
@@ -214,7 +216,7 @@ object AbstractCoordinatorConcurrencyTest {
                                internalTopicsAllowed: Boolean,
                                origin: AppendOrigin,
                                entriesPerPartition: Map[TopicIdPartition, 
MemoryRecords],
-                               responseCallback: Map[TopicIdPartition, 
PartitionResponse] => Unit,
+                               responseCallback: 
java.util.Map[TopicIdPartition, PartitionResponse] => Unit,
                                processingStatsCallback: Map[TopicIdPartition, 
RecordValidationStats] => Unit = _ => (),
                                requestLocal: RequestLocal = 
RequestLocal.noCaching,
                                verificationGuards: Map[TopicPartition, 
VerificationGuard] = Map.empty,
@@ -222,11 +224,14 @@ object AbstractCoordinatorConcurrencyTest {
 
       if (entriesPerPartition.isEmpty)
         return
-      val produceMetadata = ProduceMetadata(1, entriesPerPartition.map {
+      val produceStatus = entriesPerPartition.map {
         case (tp, _) =>
-          (tp, ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE, 
0L, RecordBatch.NO_TIMESTAMP, 0L)))
-      })
-      val delayedProduce = new DelayedProduce(5, produceMetadata, this, 
responseCallback) {
+          (tp, new ProducePartitionStatus(0L, new 
PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+      }.asJava
+
+      // It is safe to set the third parameter to null because it is only used 
in tryComplete().
+      // In this test, we override the original implementation and do not use 
that parameter at all.
+      val delayedProduce = new DelayedProduce(5, produceStatus, null, 
responseCallback.asJava) {
         // Complete produce requests after a few attempts to trigger delayed 
produce from different threads
         val completeAttempts = new AtomicInteger
         override def tryComplete(): Boolean = {
@@ -239,7 +244,7 @@ object AbstractCoordinatorConcurrencyTest {
           responseCallback(entriesPerPartition.map {
             case (tp, _) =>
               (tp, new PartitionResponse(Errors.NONE, 0L, 
RecordBatch.NO_TIMESTAMP, 0L))
-          })
+          }.asJava)
         }
       }
       val producerRequestKeys = entriesPerPartition.keys.map(new 
TopicPartitionOperationKey(_))
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 3d42de28110..84c58f40c87 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -1106,7 +1106,7 @@ class TransactionStateManagerTest {
     capturedAppends: mutable.Map[TopicIdPartition, 
mutable.Buffer[MemoryRecords]]
   ): Unit = {
     val recordsCapture: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
-    val callbackCapture: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => 
Unit])
+    val callbackCapture: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] 
=> Unit])
 
     when(replicaManager.appendRecords(
       anyLong(),
@@ -1130,7 +1130,7 @@ class TransactionStateManagerTest {
         batches += records
 
         topicPartition -> new PartitionResponse(appendError, 0L, 
RecordBatch.NO_TIMESTAMP, 0L)
-      }.toMap
+      }.toMap.asJava
     ))
   }
 
@@ -1261,7 +1261,7 @@ class TransactionStateManagerTest {
   private def prepareForTxnMessageAppend(error: Errors): Unit = {
     reset(replicaManager)
 
-    val capturedArgument: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => 
Unit])
+    val capturedArgument: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] 
=> Unit])
     when(replicaManager.appendRecords(anyLong(),
       anyShort(),
       internalTopicsAllowed = ArgumentMatchers.eq(true),
@@ -1273,8 +1273,8 @@ class TransactionStateManagerTest {
       any(),
       any()
     )).thenAnswer(_ => capturedArgument.getValue.apply(
-      Map(new TopicIdPartition(transactionTopicId, partitionId, 
TRANSACTION_STATE_TOPIC_NAME) ->
-        new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+      util.Map.of(new TopicIdPartition(transactionTopicId, partitionId, 
TRANSACTION_STATE_TOPIC_NAME),
+              new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
     )
     when(replicaManager.topicIdPartition(new 
TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 0))).thenReturn(new 
TopicIdPartition(transactionTopicId, 0, TRANSACTION_STATE_TOPIC_NAME))
     when(replicaManager.topicIdPartition(new 
TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 1))).thenReturn(new 
TopicIdPartition(transactionTopicId, 1, TRANSACTION_STATE_TOPIC_NAME))
diff --git a/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala 
b/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
index 1afe9b3fecd..25a044c6d08 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.purgatory.DelayedProduce
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.api.Assertions._
 
@@ -37,7 +38,7 @@ class DelayedProduceTest {
     val partition = new TopicPartition("test-topic", 0)
 
     // Record an expiration so the partition metric is created
-    DelayedProduceMetrics.recordExpiration(partition)
+    DelayedProduce.recordExpiration(partition)
 
     // Verify the partition metric exists in the registry
     val metricsBefore = 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@@ -53,7 +54,7 @@ class DelayedProduceTest {
         !name.getMBeanName.contains("topic="))
 
     // Remove the partition metric
-    DelayedProduceMetrics.removePartitionMetrics(partition)
+    DelayedProduce.removePartitionMetrics(partition)
 
     // Verify the partition metric is removed from the registry
     val metricsAfter = 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@@ -77,6 +78,6 @@ class DelayedProduceTest {
     val partition = new TopicPartition("nonexistent-topic", 0)
 
     // Should not throw when removing a partition that was never recorded
-    DelayedProduceMetrics.removePartitionMetrics(partition)
+    DelayedProduce.removePartitionMetrics(partition)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 48268685da2..f6cc60bd46f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3205,7 +3205,7 @@ class KafkaApisTest extends Logging {
     val expectedErrors = util.Map.of(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION, 
tp2, Errors.NONE)
 
     val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = 
ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
-    val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => 
Unit])
+    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] 
=> Unit])
 
     when(replicaManager.onlinePartition(tp1))
       .thenReturn(None)
@@ -3223,7 +3223,7 @@ class KafkaApisTest extends Logging {
       ArgumentMatchers.eq(requestLocal),
       any(),
       any()
-    )).thenAnswer(_ => responseCallback.getValue.apply(Map(new 
TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE))))
+    )).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new 
TopicIdPartition(topicId,tp2), new PartitionResponse(Errors.NONE))))
     kafkaApis = createKafkaApis()
     kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal)
     verify(requestChannel).sendResponse(
@@ -3346,8 +3346,8 @@ class KafkaApisTest extends Logging {
 
     val entriesPerPartition: ArgumentCaptor[Map[TopicIdPartition, 
MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
-    val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] 
=> Unit])
+    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, 
PartitionResponse] => Unit])
 
     when(replicaManager.appendRecords(
       ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
@@ -3364,7 +3364,7 @@ class KafkaApisTest extends Logging {
       responseCallback.getValue.apply(
         entriesPerPartition.getValue.keySet.map { tp =>
           tp -> new PartitionResponse(Errors.NONE)
-        }.toMap
+        }.toMap.asJava
       )
     }
     kafkaApis = createKafkaApis()
@@ -3517,8 +3517,8 @@ class KafkaApisTest extends Logging {
     // Set up appendRecords to simulate epoch validation failure
     val entriesPerPartition: ArgumentCaptor[Map[TopicIdPartition, 
MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
-    val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] 
=> Unit])
+    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, 
PartitionResponse] => Unit])
     
     when(replicaManager.appendRecords(
       ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
@@ -3535,7 +3535,7 @@ class KafkaApisTest extends Logging {
       // Simulate epoch validation failure by calling callback with 
INVALID_PRODUCER_EPOCH error
       val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
       responseCallback.getValue.apply(
-        Map(topicIdPartition -> new 
PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))
+        util.Map.of(topicIdPartition, new 
PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))
       )
     }
     
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 0b699f9ae08..cc532d05e00 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -296,10 +296,14 @@ class ReplicaManagerConcurrencyTest extends Logging {
       val future = new CompletableFuture[ProduceResponse.PartitionResponse]()
       val topicIdPartition: common.TopicIdPartition = 
replicaManager.topicIdPartition(topicPartition)
 
-      def produceCallback(results: collection.Map[common.TopicIdPartition, 
ProduceResponse.PartitionResponse]): Unit = {
+      def produceCallback(results: util.Map[common.TopicIdPartition, 
ProduceResponse.PartitionResponse]): Unit = {
         try {
           assertEquals(1, results.size)
-          val (topicPartition, result) = results.head
+
+          val entry = results.entrySet().iterator().next()
+          val topicPartition = entry.getKey
+          val result = entry.getValue
+
           assertEquals(topicIdPartition, topicPartition)
           assertEquals(Errors.NONE, result.error)
           future.complete(result)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 515cb65ee02..b83495ea3db 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -66,8 +66,8 @@ import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.network.BrokerEndPoint
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch, 
DelayedRemoteListOffsets}
 import org.apache.kafka.server.{HostedPartition, PartitionFetchState}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets}
 import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, 
DelayedShareFetchKey, ShareFetch}
 import org.apache.kafka.server.share.metrics.ShareGroupMetrics
@@ -261,8 +261,8 @@ class ReplicaManagerTest {
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager)
     try {
-      def callback(responseStatus: Map[TopicIdPartition, PartitionResponse]): 
Unit = {
-        assert(responseStatus.values.head.error == 
Errors.INVALID_REQUIRED_ACKS)
+      def callback(responseStatus: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+        assert(responseStatus.values().iterator().next().error == 
Errors.INVALID_REQUIRED_ACKS)
       }
       rm.appendRecords(
         timeout = 0,
@@ -2459,8 +2459,8 @@ class ReplicaManagerTest {
     numOfRecords: Int
   ): AtomicReference[PartitionResponse] = {
     val produceResult = new AtomicReference[PartitionResponse]()
-    def callback(response: Map[TopicIdPartition, PartitionResponse]): Unit = {
-      produceResult.set(response(topicPartition))
+    def callback(response: util.Map[TopicIdPartition, PartitionResponse]): 
Unit = {
+      produceResult.set(response.get(topicPartition))
     }
 
     val records = MemoryRecords.withRecords(
@@ -2706,10 +2706,10 @@ class ReplicaManagerTest {
                             transactionVersion: Short = 
TransactionVersion.TV_UNKNOWN): CallbackResult[PartitionResponse] = {
     val result = new CallbackResult[PartitionResponse]()
     val topicIdPartition = new TopicIdPartition(topicId, partition)
-    def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]): 
Unit = {
+    def appendCallback(responses: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
       val response = responses.get(topicIdPartition)
-      assertTrue(response.isDefined)
-      result.fire(response.get)
+      assertNotNull(response)
+      result.fire(response)
     }
 
     replicaManager.appendRecords(
diff --git 
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java 
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
new file mode 100644
index 00000000000..711c45cff73
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;

import com.yammer.metrics.core.Meter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+
+/**
+ * A delayed produce operation that can be created by the replica manager and 
watched
+ * in the produce operation purgatory
+ */
+public class DelayedProduce extends DelayedOperation {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DelayedProduce.class);
+
+    // Changing the package or class name may cause incompatibility with 
existing code and metrics configuration
+    private static final String METRICS_PACKAGE = "kafka.server";
+    private static final String METRICS_CLASS_NAME = "DelayedProduceMetrics";
+    private static final KafkaMetricsGroup METRICS_GROUP = new 
KafkaMetricsGroup(METRICS_PACKAGE, METRICS_CLASS_NAME);
+    private static final Meter AGGREGATE_EXPIRATION_METER = 
METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS);
+    private static final ConcurrentHashMap<TopicPartition, Meter> 
PARTITION_EXPIRATION_METERS = new ConcurrentHashMap<>();
+
+    public static final class ProducePartitionStatus {
+        private final long requiredOffset;
+        private final PartitionResponse responseStatus;
+
+        private volatile boolean acksPending;
+
+        public ProducePartitionStatus(long requiredOffset, PartitionResponse 
responseStatus) {
+            this.requiredOffset = requiredOffset;
+            this.responseStatus = responseStatus;
+        }
+
+        public PartitionResponse responseStatus() {
+            return responseStatus;
+        }
+
+        private void setAcksPending(boolean acksPending) {
+            this.acksPending = acksPending;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "[acksPending: %s, error: %s, startOffset: %s, 
requiredOffset: %d]",
+                    acksPending,
+                    responseStatus.error.code(),
+                    responseStatus.baseOffset,
+                    requiredOffset
+            );
+        }
+    }
+
+    @FunctionalInterface
+    public interface PartitionStatusValidator {
+        record Result(boolean hasEnough, Errors error) { }
+        /**
+         * Validates the status of a partition and its replicas to determine
+         * if a delayed produce operation can be completed.
+         *
+         * @param topicPartition The partition to check.
+         * @param requiredOffset The offset that replicas must reach.
+         * @return A result with a Boolean (hasEnoughReplicas) and the Error 
code.
+         */
+        Result validate(TopicPartition topicPartition, long requiredOffset);
+    }
+
+    private final Map<TopicIdPartition, ProducePartitionStatus> produceStatus;
+    private final PartitionStatusValidator statusValidator;
+    private final Consumer<Map<TopicIdPartition, PartitionResponse>> 
responseCallback;
+
+    public DelayedProduce(long delayMs,
+                          Map<TopicIdPartition, ProducePartitionStatus> 
produceStatus,
+                          PartitionStatusValidator statusValidator,
+                          Consumer<Map<TopicIdPartition, PartitionResponse>> 
responseCallback) {
+        super(delayMs);
+
+        this.produceStatus = produceStatus;
+        this.statusValidator = statusValidator;
+        this.responseCallback = responseCallback;
+
+        // first update the acks pending variable according to the error code
+        produceStatus.forEach((topicPartition, status) -> {
+            if (status.responseStatus.error == Errors.NONE) {
+                // Timeout error state will be cleared when required acks are 
received
+                status.acksPending = true;
+                status.responseStatus.error = Errors.REQUEST_TIMED_OUT;
+            } else {
+                status.acksPending = false;
+            }
+
+            LOGGER.trace("Initial partition status for {} is {}", 
topicPartition, status);
+        });
+    }
+
+    /**
+     * The delayed produce operation can be completed if every partition
+     * it produces to is satisfied by one of the following:
+     *
+     * Case A: Replica not assigned to partition
+     * Case B: Replica is no longer the leader of this partition
+     * Case C: This broker is the leader:
+     *   C.1 - If there was a local error thrown while checking if at least 
requiredAcks
+     *         replicas have caught up to this operation: set an error in 
response
+     *   C.2 - Otherwise, set the response with no error.
+     *
+     * These cases were originally validated by some methods in the 
ReplicaManager.
+     * However, since DelayedProduce has been moved to the server module, it 
cannot directly access the ReplicaManager.
+     * Therefore, these validations have been delegated to the method within 
`ReplicaManager#maybeAddDelayedProduce()`.
+     */
+    @Override
+    public boolean tryComplete() {
+        // check for each partition if it still has pending acks
+        produceStatus.forEach((topicIdPartition, status) -> {
+            LOGGER.trace("Checking produce satisfaction for {}, current status 
{}", topicIdPartition, status);
+            // skip those partitions that have already been satisfied
+            if (status.acksPending) {
+                // Delegate to `ReplicaManager#maybeAddDelayedProduce`
+                // Validate Cases A, B, or C
+                PartitionStatusValidator.Result result = 
statusValidator.validate(topicIdPartition.topicPartition(), 
status.requiredOffset);
+
+                // Update the partition status to reflect Case A, B, or C:
+                Errors errors = result.error;
+                if (errors != Errors.NONE || result.hasEnough()) {
+                    status.setAcksPending(false);
+                    status.responseStatus.error = errors;
+                }
+            }
+        });
+
+        // check if every partition has satisfied at least one of case A, B or 
C
+        boolean anyPending = false;
+        for (ProducePartitionStatus status : produceStatus.values()) {
+            if (status.acksPending) {
+                anyPending = true;
+                break;
+            }
+        }
+        if (!anyPending) {
+            return forceComplete();
+        }
+
+        return false;
+    }
+
+    @Override
+    public void onExpiration() {
+        produceStatus.forEach((topicIdPartition, status) -> {
+            if (status.acksPending) {
+                LOGGER.debug("Expiring produce request for partition {} with 
status {}", topicIdPartition, status);
+                recordExpiration(topicIdPartition.topicPartition());
+            }
+        });
+    }
+
+    /**
+     * Upon completion, return the current response status along with the 
error code per partition
+     */
+    @Override
+    public void onComplete() {
+        Map<TopicIdPartition, PartitionResponse> responseStatus = new 
HashMap<>();
+
+        for (Map.Entry<TopicIdPartition, ProducePartitionStatus> entry : 
produceStatus.entrySet()) {
+            responseStatus.put(entry.getKey(), 
entry.getValue().responseStatus());
+        }
+
+        responseCallback.accept(responseStatus);
+    }
+
+    public static void recordExpiration(TopicPartition partition) {
+        AGGREGATE_EXPIRATION_METER.mark();
+        PARTITION_EXPIRATION_METERS.computeIfAbsent(partition,
+                key -> METRICS_GROUP.newMeter("ExpiresPerSec",
+                        "requests",
+                        TimeUnit.SECONDS,
+                        Map.of("topic", key.topic(), "partition", 
String.valueOf(key.partition())))
+        ).mark();
+    }
+
+    public static void removePartitionMetrics(TopicPartition partition) {
+        if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
+            METRICS_GROUP.removeMetric("ExpiresPerSec",
+                    Map.of("topic", partition.topic(),
+                            "partition", 
String.valueOf(partition.partition())));
+        }
+    }
+}

Reply via email to