This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
     new d7d7876989e KAFKA-19274; Group Coordinator Shards are not unloaded when `__consumer_offsets` topic is deleted (#19713)
d7d7876989e is described below
commit d7d7876989e31da54797e96e1960010f27d2c97b
Author: David Jacot <[email protected]>
AuthorDate: Thu May 15 19:04:38 2025 +0200
    KAFKA-19274; Group Coordinator Shards are not unloaded when `__consumer_offsets` topic is deleted (#19713)
Group Coordinator Shards are not unloaded when the `__consumer_offsets`
topic is deleted. The unloading is scheduled, but it is ignored because
the provided epoch is equal to the current epoch:
```
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-0 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
```
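
The "Ignored unloading" lines come from a guard that compares the epoch passed to the unload operation with the epoch the shard is currently loaded with. A minimal Scala model of that check (the real CoordinatorRuntime is Java; its internals are paraphrased here for illustration) shows why an equal epoch is rejected while an absent one is not:

```scala
import java.util.OptionalInt

// Minimal model of the unload guard in CoordinatorRuntime; names are
// paraphrased, not the actual implementation.
object UnloadGuardSketch {
  def shouldUnload(currentEpoch: Int, partitionEpoch: OptionalInt): Boolean =
    // An empty epoch always unloads; otherwise the given epoch must be
    // strictly greater than the epoch the shard was loaded with.
    !partitionEpoch.isPresent || partitionEpoch.getAsInt > currentEpoch

  def main(args: Array[String]): Unit = {
    // Topic deletion before the fix: epoch 0 vs current epoch 0 -> ignored.
    println(shouldUnload(currentEpoch = 0, partitionEpoch = OptionalInt.of(0)))   // false
    // Topic deletion after the fix: no epoch -> unload proceeds.
    println(shouldUnload(currentEpoch = 0, partitionEpoch = OptionalInt.empty())) // true
  }
}
```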
This patch fixes the issue by not setting the leader epoch in this case.
The coordinator expects the leader epoch to be incremented when the
resignation callback is called, but when the topic is deleted the epoch
is not incremented. Therefore, we must not use it. Note that this is
aligned with how deleted partitions are handled too.
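
As a rough sketch of how that `None` reaches the coordinator (the wiring below is paraphrased for illustration, not the verbatim call site): the publisher's resignation callback carries an `Option[Int]` leader epoch, which is converted to the `java.util.OptionalInt` that `GroupCoordinator#onResignation` takes, so a deleted topic resigns with `OptionalInt.empty()`:

```scala
import java.util.OptionalInt

// Paraphrased sketch of the resignation wiring: the Option[Int] epoch from
// the metadata publisher becomes an OptionalInt before it reaches the
// group coordinator.
object ResignationWiringSketch {
  // Stand-in for GroupCoordinator#onResignation.
  def onResignation(partitionIndex: Int, leaderEpoch: OptionalInt): Unit =
    println(s"resigned partition $partitionIndex with epoch $leaderEpoch")

  val resignation: (Int, Option[Int]) => Unit =
    (partitionIndex, leaderEpochOpt) =>
      onResignation(
        partitionIndex,
        leaderEpochOpt.map(e => OptionalInt.of(e)).getOrElse(OptionalInt.empty())
      )

  def main(args: Array[String]): Unit = {
    resignation(0, Some(5)) // normal leadership change: epoch is present
    resignation(0, None)    // topic deleted: no epoch, unload is unconditional
  }
}
```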
Reviewers: Dongnuo Lyu <[email protected]>, José Armando García Sancio
<[email protected]>
---
.../server/metadata/BrokerMetadataPublisher.scala | 7 ++-
.../api/GroupCoordinatorIntegrationTest.scala | 59 ++++++++++++++++++-
.../metadata/BrokerMetadataPublisherTest.scala | 68 +++++++++++++++++++++-
3 files changed, 129 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 1985f04348f..4a7ae84c2d5 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -250,6 +250,11 @@ class BrokerMetadataPublisher(
   /**
    * Update the coordinator of local replica changes: election and resignation.
    *
+   * When the topic is deleted or a partition of the topic is deleted, {@param resignation}
+   * callback must be called with {@code None}. The coordinator expects the leader epoch to be
+   * incremented when the {@param resignation} callback is called but the leader epoch
+   * is not incremented when a topic is deleted.
+   *
    * @param image latest metadata image
    * @param delta metadata delta from the previous image and the latest image
    * @param topicName name of the topic associated with the coordinator
@@ -270,7 +275,7 @@ class BrokerMetadataPublisher(
     if (topicsDelta.topicWasDeleted(topicName)) {
       topicsDelta.image.getTopic(topicName).partitions.entrySet.forEach { entry =>
         if (entry.getValue.leader == brokerId) {
-          resignation(entry.getKey, Some(entry.getValue.leaderEpoch))
+          resignation(entry.getKey, None)
         }
       }
     }
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index cbd69baedc7..8d1399a1ebd 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -17,8 +17,8 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Typ
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
 import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, OffsetAndMetadata}
-import org.apache.kafka.common.errors.GroupIdNotFoundException
-import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicPartition}
+import org.apache.kafka.common.errors.{GroupIdNotFoundException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicCollection, TopicPartition}
 import org.junit.jupiter.api.Assertions._

 import scala.jdk.CollectionConverters._
@@ -27,11 +27,12 @@ import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.server.config.ServerConfigs
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Timeout

 import java.time.Duration
 import java.util.Collections
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutionException, TimeUnit}

 @Timeout(120)
 class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
@@ -278,6 +279,58 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
     }
   }
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testRecreatingConsumerOffsetsTopic(): Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      withConsumer(groupId = "group", groupProtocol = GroupProtocol.CONSUMER) { consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get a non-empty assignment")
+      }
+
+      admin
+        .deleteTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+        .all()
+        .get()
+
+      TestUtils.waitUntilTrue(() => {
+        try {
+          admin
+            .describeTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+            .topicNameValues()
+            .get(Topic.GROUP_METADATA_TOPIC_NAME)
+            .get(JTestUtils.DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS)
+          false
+        } catch {
+          case e: ExecutionException =>
+            e.getCause.isInstanceOf[UnknownTopicOrPartitionException]
+        }
+      }, msg = s"${Topic.GROUP_METADATA_TOPIC_NAME} was not deleted")
+
+      withConsumer(groupId = "group", groupProtocol = GroupProtocol.CONSUMER) { consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get a non-empty assignment")
+      }
+    }
+  }
+
   private def rollAndCompactConsumerOffsets(): Unit = {
     val tp = new TopicPartition("__consumer_offsets", 0)
     val broker = cluster.brokers.asScala.head._2
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index a166368a5aa..c5f4d3187e3 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -20,15 +20,18 @@ package kafka.server.metadata
 import kafka.coordinator.transaction.TransactionCoordinator

 import java.util.Collections.{singleton, singletonList, singletonMap}
-import java.util.Properties
+import java.util.{OptionalInt, Properties}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import kafka.log.LogManager
 import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic}
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.coordinator.group.GroupCoordinator
@@ -179,6 +182,69 @@ class BrokerMetadataPublisherTest {
     }
   }

+  @Test
+  def testGroupCoordinatorTopicDeletion(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0))
+    val metadataCache = new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_1)
+    val logManager = mock(classOf[LogManager])
+    val replicaManager = mock(classOf[ReplicaManager])
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val faultHandler = mock(classOf[FaultHandler])
+
+    val metadataPublisher = new BrokerMetadataPublisher(
+      config,
+      metadataCache,
+      logManager,
+      replicaManager,
+      groupCoordinator,
+      mock(classOf[TransactionCoordinator]),
+      Some(mock(classOf[ShareCoordinator])),
+      mock(classOf[DynamicConfigPublisher]),
+      mock(classOf[DynamicClientQuotaPublisher]),
+      mock(classOf[DynamicTopicClusterQuotaPublisher]),
+      mock(classOf[ScramPublisher]),
+      mock(classOf[DelegationTokenPublisher]),
+      mock(classOf[AclPublisher]),
+      faultHandler,
+      faultHandler
+    )
+
+    val topicId = Uuid.randomUuid()
+    var delta = new MetadataDelta(MetadataImage.EMPTY)
+    delta.replay(new TopicRecord()
+      .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+      .setTopicId(topicId)
+    )
+    delta.replay(new PartitionRecord()
+      .setTopicId(topicId)
+      .setPartitionId(0)
+      .setLeader(config.brokerId)
+    )
+    delta.replay(new PartitionRecord()
+      .setTopicId(topicId)
+      .setPartitionId(1)
+      .setLeader(config.brokerId)
+    )
+    val image = delta.apply(MetadataProvenance.EMPTY)
+
+    delta = new MetadataDelta(image)
+    delta.replay(new RemoveTopicRecord()
+      .setTopicId(topicId)
+    )
+
+    metadataPublisher.onMetadataUpdate(delta, delta.apply(MetadataProvenance.EMPTY),
+      LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build())
+
+    verify(groupCoordinator).onResignation(0, OptionalInt.empty())
+    verify(groupCoordinator).onResignation(1, OptionalInt.empty())
+  }
+
   @Test
   def testNewImagePushedToGroupCoordinator(): Unit = {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0))