This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 28de78bcbad MINOR: Refactor GroupCoordinator write path (#19290)
28de78bcbad is described below
commit 28de78bcbad605a3e906d085d2e59b441ae35212
Author: David Jacot <[email protected]>
AuthorDate: Thu Mar 27 16:58:47 2025 +0100
MINOR: Refactor GroupCoordinator write path (#19290)
This patch addresses a weirdness on the GroupCoordinator write path. The
`CoordinatorPartitionWriter` uses `ReplicaManager#appendRecords` with
`acks=1` and expects it to complete immediately/synchronously. This works
because that is effectively what the method does with `acks=1`, but the
method is fundamentally asynchronous, so the contract is fragile. This
patch introduces a new synchronous method,
`ReplicaManager#appendRecordsToLeader`, and refactors
`ReplicaManager#appendRecords` to use it so that the new path benefits
from all the existing tests.
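For illustration, here is a minimal sketch of the caller-side pattern the
new method enables, based on the `appendRecordsToLeader` signature in the
ReplicaManager.scala hunk below. It is not the committed code: the object
and method names are made up, the coordinator's `directActionQueue` is
omitted (so the default action queue is used), and error handling is
reduced to the essentials.

import kafka.server.ReplicaManager
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.storage.internals.log.{AppendOrigin, VerificationGuard}

object AppendToLeaderSketch {
  // Appends a batch to the leader replica without waiting on replication and
  // returns the next offset, mirroring what CoordinatorPartitionWriter#append
  // can now do without going through a response callback.
  def appendAndReturnNextOffset(
    replicaManager: ReplicaManager,
    tp: TopicPartition,
    verificationGuard: VerificationGuard,
    records: MemoryRecords
  ): Long = {
    val appendResults = replicaManager.appendRecordsToLeader(
      requiredAcks = 1,
      internalTopicsAllowed = true,
      origin = AppendOrigin.COORDINATOR,
      entriesPerPartition = Map(tp -> records),
      requestLocal = RequestLocal.noCaching,
      verificationGuards = Map(tp -> verificationGuard)
    )

    val partitionResult = appendResults.getOrElse(tp,
      throw new IllegalStateException(s"Append status $appendResults should have partition $tp."))

    // Surface any local append error to the caller.
    if (partitionResult.error != Errors.NONE) {
      throw partitionResult.error.exception()
    }

    // The next offset after the appended batch.
    partitionResult.info.lastOffset + 1
  }
}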
Reviewers: Fred Zheng <[email protected]>, Jeff Kim <[email protected]>
---
.../group/CoordinatorPartitionWriter.scala | 10 +---
.../main/scala/kafka/server/ReplicaManager.scala | 60 ++++++++++++++++---
.../group/CoordinatorPartitionWriterTest.scala | 67 +++++++++------------
.../unit/kafka/server/ReplicaManagerTest.scala | 68 +++++++++++++++++++++-
4 files changed, 150 insertions(+), 55 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 08b3c9aa498..64f23c32f52 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -21,7 +21,6 @@ import kafka.server.{AddPartitionsToTxnManager, ReplicaManager}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.server.ActionQueue
import org.apache.kafka.server.common.RequestLocal
@@ -139,17 +138,14 @@ class CoordinatorPartitionWriter(
verificationGuard: VerificationGuard,
records: MemoryRecords
): Long = {
- var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
- replicaManager.appendRecords(
- timeout = 0L,
+ // We write synchronously to the leader replica without waiting on replication.
+ val appendResults = replicaManager.appendRecordsToLeader(
requiredAcks = 1,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = Map(tp -> records),
- responseCallback = results => appendResults = results,
requestLocal = RequestLocal.noCaching,
verificationGuards = Map(tp -> verificationGuard),
- delayedProduceLock = None,
// We can directly complete the purgatories here because we don't hold
// any conflicting locks.
actionQueue = directActionQueue
@@ -163,7 +159,7 @@ class CoordinatorPartitionWriter(
}
// Required offset.
- partitionResult.lastOffset + 1
+ partitionResult.info.lastOffset + 1
}
override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b5ee5629e76..958e4ce9dba 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -623,6 +623,50 @@ class ReplicaManager(val config: KafkaConfig,
def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
+ /**
+ * Append messages to leader replicas of the partition, without waiting on replication.
+ *
+ * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecordsToLeader()
+ * are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
+ * locks.
+ *
+ * @param requiredAcks the required acks -- it is only used to ensure that the append meets the
+ * required acks.
+ * @param internalTopicsAllowed boolean indicating whether internal topics can be appended to
+ * @param origin source of the append request (ie, client, replication, coordinator)
+ * @param entriesPerPartition the records per partition to be appended
+ * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
+ * thread calling this method
+ * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+ * @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
+ */
+ def appendRecordsToLeader(
+ requiredAcks: Short,
+ internalTopicsAllowed: Boolean,
+ origin: AppendOrigin,
+ entriesPerPartition: Map[TopicPartition, MemoryRecords],
+ requestLocal: RequestLocal = RequestLocal.noCaching,
+ actionQueue: ActionQueue = this.defaultActionQueue,
+ verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty
+ ): Map[TopicPartition, LogAppendResult] = {
+ val startTimeMs = time.milliseconds
+ val localProduceResultsWithTopicId = appendToLocalLog(
+ internalTopicsAllowed = internalTopicsAllowed,
+ origin,
+ entriesPerPartition,
+ requiredAcks,
+ requestLocal,
+ verificationGuards.toMap
+ )
+ debug("Produce to local log in %d ms".format(time.milliseconds -
startTimeMs))
+
+ addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
+
+ localProduceResultsWithTopicId.map {
+ case (k, v) => (k.topicPartition, v)
+ }
+ }
+
/**
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied;
@@ -661,16 +705,18 @@ class ReplicaManager(val config: KafkaConfig,
return
}
- val sTime = time.milliseconds
- val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
- origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
- debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
- val localProduceResults : Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
- case(k, v) => (k.topicPartition, v)}
+ val localProduceResults = appendRecordsToLeader(
+ requiredAcks,
+ internalTopicsAllowed,
+ origin,
+ entriesPerPartition,
+ requestLocal,
+ actionQueue,
+ verificationGuards
+ )
val produceStatus = buildProducePartitionStatus(localProduceResults)
- addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
k -> v.info.recordValidationStats
})
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index eaa14832a8d..a56dab4fba7 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -16,16 +16,15 @@
*/
package kafka.coordinator.group
-import kafka.server.ReplicaManager
+import kafka.server.{LogAppendResult, ReplicaManager}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo, LogConfig, VerificationGuard}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
@@ -35,7 +34,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}
import java.nio.charset.Charset
-import java.util.Collections
+import java.util.Optional
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -94,34 +93,31 @@ class CoordinatorPartitionWriterTest {
val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
- val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
- ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
- when(replicaManager.appendRecords(
- ArgumentMatchers.eq(0L),
+ when(replicaManager.appendRecordsToLeader(
ArgumentMatchers.eq(1.toShort),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
recordsCapture.capture(),
- callbackCapture.capture(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
- )).thenAnswer( _ => {
- callbackCapture.getValue.apply(Map(
- tp -> new PartitionResponse(
- Errors.NONE,
- 5,
- 10,
- RecordBatch.NO_TIMESTAMP,
- -1,
- Collections.emptyList(),
- ""
- )
- ))
- })
+ )).thenReturn(Map(tp -> LogAppendResult(
+ new LogAppendInfo(
+ 5L,
+ 10L,
+ Optional.empty,
+ RecordBatch.NO_TIMESTAMP,
+ 0L,
+ 0L,
+ RecordValidationStats.EMPTY,
+ CompressionType.NONE,
+ 100,
+ 10L
+ ),
+ Option.empty,
+ false
+ )))
val batch = MemoryRecords.withRecords(
Compression.NONE,
@@ -140,8 +136,7 @@ class CoordinatorPartitionWriterTest {
assertEquals(
batch,
- recordsCapture.getValue.getOrElse(tp,
- throw new AssertionError(s"No records for $tp"))
+ recordsCapture.getValue.getOrElse(tp, throw new AssertionError(s"No records for $tp"))
)
}
@@ -203,26 +198,20 @@ class CoordinatorPartitionWriterTest {
val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
- val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
- ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
- when(replicaManager.appendRecords(
- ArgumentMatchers.eq(0L),
+ when(replicaManager.appendRecordsToLeader(
ArgumentMatchers.eq(1.toShort),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
recordsCapture.capture(),
- callbackCapture.capture(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
- )).thenAnswer(_ => {
- callbackCapture.getValue.apply(Map(
- tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)
- ))
- })
+ )).thenReturn(Map(tp -> LogAppendResult(
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+ Some(Errors.NOT_LEADER_OR_FOLLOWER.exception),
+ false
+ )))
val batch = MemoryRecords.withRecords(
Compression.NONE,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5f1fb7b5f24..fda7eceab7f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -58,7 +58,7 @@ import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
-import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, StopPartition}
+import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -72,7 +72,7 @@ import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -6134,6 +6134,70 @@ class ReplicaManagerTest {
}
+ @Test
+ def testAppendRecordsToLeader(): Unit = {
+ val localId = 0
+ val foo = new TopicIdPartition(Uuid.randomUuid, 0, "foo")
+ val bar = new TopicIdPartition(Uuid.randomUuid, 0, "bar")
+
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(
+ timer = new MockTimer(time),
+ brokerId = localId
+ )
+
+ try {
+ val topicDelta = new TopicsDelta(TopicsImage.EMPTY)
+ topicDelta.replay(new TopicRecord()
+ .setName(foo.topic)
+ .setTopicId(foo.topicId)
+ )
+ topicDelta.replay(new PartitionRecord()
+ .setTopicId(foo.topicId)
+ .setPartitionId(foo.partition)
+ .setLeader(localId)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(0)
+ .setReplicas(List[Integer](localId).asJava)
+ .setIsr(List[Integer](localId).asJava)
+ )
+
+ val metadataImage = imageFromTopics(topicDelta.apply())
+ replicaManager.applyDelta(topicDelta, metadataImage)
+
+ // Create test records.
+ val records = TestUtils.singletonRecords(
+ value = "test".getBytes,
+ timestamp = time.milliseconds
+ )
+
+ // Append records to both foo and bar.
+ val result = replicaManager.appendRecordsToLeader(
+ requiredAcks = 1,
+ internalTopicsAllowed = true,
+ origin = AppendOrigin.CLIENT,
+ entriesPerPartition = Map(
+ foo.topicPartition -> records,
+ bar.topicPartition -> records
+ ),
+ requestLocal = RequestLocal.noCaching
+ )
+
+ assertNotNull(result)
+ assertEquals(2, result.size)
+
+ val fooResult = result(foo.topicPartition)
+ assertEquals(Errors.NONE, fooResult.error)
+ assertEquals(0, fooResult.info.logStartOffset)
+ assertEquals(0, fooResult.info.firstOffset)
+ assertEquals(0, fooResult.info.lastOffset)
+
+ val barResult = result(bar.topicPartition)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, barResult.error)
+ assertEquals(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, barResult.info)
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
@Test
def testMonitorableReplicaSelector(): Unit = {