This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new d7d7876989e KAFKA-19274; Group Coordinator Shards are not unloaded when `__consumer_offsets` topic is deleted (#19713)
d7d7876989e is described below

commit d7d7876989e31da54797e96e1960010f27d2c97b
Author: David Jacot <[email protected]>
AuthorDate: Thu May 15 19:04:38 2025 +0200

    KAFKA-19274; Group Coordinator Shards are not unloaded when `__consumer_offsets` topic is deleted (#19713)
    
    Group Coordinator Shards are not unloaded when the `__consumer_offsets`
    topic is deleted. The unloading is scheduled, but it is ignored because
    the epoch passed in is equal to the current epoch:
    
    ```
    [2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1]
    Scheduling  unloading of metadata for __consumer_offsets-0 with epoch
    OptionalInt[0]
    (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
    [2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling
    unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0]
    (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
    [2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
    metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current
    epoch is 0.
    (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
    [2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
    metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current
    epoch is 0.
    (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
    ```
    
    This patch fixes the issue by not setting the leader epoch in this case.
    The coordinator expects the leader epoch to be incremented when the
    resignation code is called, but when the topic is deleted the epoch is
    not incremented, so we must not use it. Note that this is aligned with
    how deleted partitions are handled too.
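
    Below is a minimal, hypothetical Scala sketch of the epoch check that
    produces the "Ignored unloading" log lines above. The names
    `UnloadSketch`, `loaded` and `scheduleUnload` are illustrative, not the
    actual `CoordinatorRuntime` code: an unload whose epoch is not newer
    than the shard's current epoch is ignored, while an unload without an
    epoch is always applied.

    ```
    object UnloadSketch {
      // Hypothetical shard state: partition index -> epoch the loaded shard runs with.
      private val loaded = scala.collection.mutable.Map(0 -> 0, 1 -> 0)

      def scheduleUnload(partition: Int, partitionEpoch: Option[Int]): Unit =
        loaded.get(partition) match {
          case Some(currentEpoch) if partitionEpoch.forall(_ > currentEpoch) =>
            // No epoch given (topic deleted) or a strictly newer epoch: unload the shard.
            loaded.remove(partition)
          case Some(currentEpoch) =>
            // Epoch not newer than the current one: the unload is ignored. This is
            // the pre-patch deletion case, where the stale epoch equals the current one.
            println(s"Ignored unloading metadata for partition $partition in epoch " +
              s"$partitionEpoch since current epoch is $currentEpoch.")
          case None =>
            // Shard not loaded on this broker, nothing to do.
        }

      def main(args: Array[String]): Unit = {
        scheduleUnload(0, Some(0)) // ignored: epoch equals the shard's current epoch
        scheduleUnload(0, None)    // applied: no epoch, which is what this patch passes
      }
    }
    ```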
    
    Reviewers: Dongnuo Lyu <[email protected]>, José Armando García Sancio <[email protected]>
---
 .../server/metadata/BrokerMetadataPublisher.scala  |  7 ++-
 .../api/GroupCoordinatorIntegrationTest.scala      | 59 ++++++++++++++++++-
 .../metadata/BrokerMetadataPublisherTest.scala     | 68 +++++++++++++++++++++-
 3 files changed, 129 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 1985f04348f..4a7ae84c2d5 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -250,6 +250,11 @@ class BrokerMetadataPublisher(
   /**
    * Update the coordinator of local replica changes: election and resignation.
    *
+   * When the topic is deleted or a partition of the topic is deleted, the {@param resignation}
+   * callback must be called with {@code None}. The coordinator expects the leader epoch to be
+   * incremented when the {@param resignation} callback is called, but the leader epoch is not
+   * incremented when a topic is deleted.
+   *
    * @param image latest metadata image
    * @param delta metadata delta from the previous image and the latest image
    * @param topicName name of the topic associated with the coordinator
@@ -270,7 +275,7 @@ class BrokerMetadataPublisher(
       if (topicsDelta.topicWasDeleted(topicName)) {
         topicsDelta.image.getTopic(topicName).partitions.entrySet.forEach { entry =>
           if (entry.getValue.leader == brokerId) {
-            resignation(entry.getKey, Some(entry.getValue.leaderEpoch))
+            resignation(entry.getKey, None)
           }
         }
       }
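
The doc comment in the hunk above states the contract: when the topic is deleted, the
`resignation` callback must receive `None` because the leader epoch was not bumped.
A minimal sketch of the resulting epoch mapping follows (the `toOptionalInt` helper and
the `ResignationEpochSketch` object are hypothetical; the actual callback wiring is not
part of this diff, but the unit test further down verifies that
`groupCoordinator.onResignation` ends up being called with `OptionalInt.empty()`):

```
import java.util.OptionalInt

object ResignationEpochSketch {
  // Hypothetical helper mirroring what the resignation path has to do with the
  // Scala Option[Int] leader epoch: a missing epoch (deleted topic) becomes
  // OptionalInt.empty(), so the coordinator unloads the shard unconditionally
  // instead of comparing it against the shard's current epoch.
  def toOptionalInt(leaderEpoch: Option[Int]): OptionalInt =
    leaderEpoch.map(e => OptionalInt.of(e)).getOrElse(OptionalInt.empty())

  def main(args: Array[String]): Unit = {
    assert(toOptionalInt(None) == OptionalInt.empty())  // topic-deletion path after this patch
    assert(toOptionalInt(Some(5)) == OptionalInt.of(5)) // normal resignation with a bumped epoch
  }
}
```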
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index cbd69baedc7..8d1399a1ebd 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -17,8 +17,8 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Typ
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
 import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, OffsetAndMetadata}
-import org.apache.kafka.common.errors.GroupIdNotFoundException
-import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicPartition}
+import org.apache.kafka.common.errors.{GroupIdNotFoundException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicCollection, TopicPartition}
 import org.junit.jupiter.api.Assertions._
 
 import scala.jdk.CollectionConverters._
@@ -27,11 +27,12 @@ import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.server.config.ServerConfigs
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Timeout
 
 import java.time.Duration
 import java.util.Collections
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutionException, TimeUnit}
 
 @Timeout(120)
 class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
@@ -278,6 +279,58 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
     }
   }
 
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testRecreatingConsumerOffsetsTopic(): Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      withConsumer(groupId = "group", groupProtocol = GroupProtocol.CONSUMER) { consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get a non-empty assignment")
+      }
+
+      admin
+        .deleteTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+        .all()
+        .get()
+
+      TestUtils.waitUntilTrue(() => {
+        try {
+          admin
+            .describeTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+            .topicNameValues()
+            .get(Topic.GROUP_METADATA_TOPIC_NAME)
+            .get(JTestUtils.DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS)
+          false
+        } catch {
+          case e: ExecutionException =>
+            e.getCause.isInstanceOf[UnknownTopicOrPartitionException]
+        }
+      }, msg = s"${Topic.GROUP_METADATA_TOPIC_NAME} was not deleted")
+
+      withConsumer(groupId = "group", groupProtocol = GroupProtocol.CONSUMER) { consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get a non-empty assignment")
+      }
+    }
+  }
+
   private def rollAndCompactConsumerOffsets(): Unit = {
     val tp = new TopicPartition("__consumer_offsets", 0)
     val broker = cluster.brokers.asScala.head._2
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index a166368a5aa..c5f4d3187e3 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -20,15 +20,18 @@ package kafka.server.metadata
 import kafka.coordinator.transaction.TransactionCoordinator
 
 import java.util.Collections.{singleton, singletonList, singletonMap}
-import java.util.Properties
+import java.util.{OptionalInt, Properties}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import kafka.log.LogManager
 import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic}
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.coordinator.group.GroupCoordinator
@@ -179,6 +182,69 @@ class BrokerMetadataPublisherTest {
     }
   }
 
+  @Test
+  def testGroupCoordinatorTopicDeletion(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0))
+    val metadataCache = new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_1)
+    val logManager = mock(classOf[LogManager])
+    val replicaManager = mock(classOf[ReplicaManager])
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val faultHandler = mock(classOf[FaultHandler])
+
+    val metadataPublisher = new BrokerMetadataPublisher(
+      config,
+      metadataCache,
+      logManager,
+      replicaManager,
+      groupCoordinator,
+      mock(classOf[TransactionCoordinator]),
+      Some(mock(classOf[ShareCoordinator])),
+      mock(classOf[DynamicConfigPublisher]),
+      mock(classOf[DynamicClientQuotaPublisher]),
+      mock(classOf[DynamicTopicClusterQuotaPublisher]),
+      mock(classOf[ScramPublisher]),
+      mock(classOf[DelegationTokenPublisher]),
+      mock(classOf[AclPublisher]),
+      faultHandler,
+      faultHandler
+    )
+
+    val topicId = Uuid.randomUuid()
+    var delta = new MetadataDelta(MetadataImage.EMPTY)
+    delta.replay(new TopicRecord()
+      .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+      .setTopicId(topicId)
+    )
+    delta.replay(new PartitionRecord()
+      .setTopicId(topicId)
+      .setPartitionId(0)
+      .setLeader(config.brokerId)
+    )
+    delta.replay(new PartitionRecord()
+      .setTopicId(topicId)
+      .setPartitionId(1)
+      .setLeader(config.brokerId)
+    )
+    val image = delta.apply(MetadataProvenance.EMPTY)
+
+    delta = new MetadataDelta(image)
+    delta.replay(new RemoveTopicRecord()
+      .setTopicId(topicId)
+    )
+
+    metadataPublisher.onMetadataUpdate(delta, delta.apply(MetadataProvenance.EMPTY),
+      LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build())
+
+    verify(groupCoordinator).onResignation(0, OptionalInt.empty())
+    verify(groupCoordinator).onResignation(1, OptionalInt.empty())
+  }
+
   @Test
   def testNewImagePushedToGroupCoordinator(): Unit = {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0))
