This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0c5e5c5d2d3 KAFKA-18329; [2/3] Delete old group coordinator (KIP-848) (#19251)
0c5e5c5d2d3 is described below

commit 0c5e5c5d2d312c15bc4aa54e9933df209cf5cb4b
Author: David Jacot <[email protected]>
AuthorDate: Fri Mar 21 13:54:41 2025 +0100

    KAFKA-18329; [2/3] Delete old group coordinator (KIP-848) (#19251)
    
    This patch is the second of a series of patches to remove the old group
    coordinator. With the release of Apache Kafka 4.0, the so-called new
    group coordinator is the default and only option available now.
    
    The patch removes `group.coordinator.new.enable` (internal config) and
    all its usages (integration tests, unit tests, etc.). It also cleans up
    `KafkaApis` to remove logic only used by the old group coordinator.
    
    Reviewers: Jeff Kim <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../clients/consumer/ConsumerIntegrationTest.java  |   7 +-
 .../group/GroupCoordinatorAdapter.scala            |   2 -
 .../src/main/scala/kafka/server/BrokerServer.scala |  63 +++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  44 +----
 core/src/main/scala/kafka/server/KafkaConfig.scala |  12 +-
 .../kafka/server/DeleteGroupsRequestTest.scala     |  23 +--
 .../kafka/server/DescribeGroupsRequestTest.scala   |  16 +-
 .../server/GroupCoordinatorBaseRequestTest.scala   |   4 -
 .../unit/kafka/server/HeartbeatRequestTest.scala   |  16 +-
 .../unit/kafka/server/JoinGroupRequestTest.scala   |  16 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 218 +--------------------
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   4 -
 .../unit/kafka/server/ListGroupsRequestTest.scala  |  21 +-
 .../kafka/server/OffsetCommitRequestTest.scala     |  23 +--
 .../kafka/server/OffsetDeleteRequestTest.scala     |  19 +-
 .../unit/kafka/server/OffsetFetchRequestTest.scala |  56 +-----
 .../server/ShareGroupHeartbeatRequestTest.scala    |   6 -
 .../unit/kafka/server/SyncGroupRequestTest.scala   |  17 +-
 .../kafka/server/TxnOffsetCommitRequestTest.scala  |  20 +-
 .../kafka/coordinator/group/GroupCoordinator.java  |   5 -
 .../coordinator/group/GroupCoordinatorConfig.java  |   7 -
 .../coordinator/group/GroupCoordinatorService.java |   8 -
 .../group/modern/share/ShareGroupConfigTest.java   |   1 -
 .../test/junit/ClusterTestExtensionsTest.java      |   4 -
 24 files changed, 61 insertions(+), 551 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index e0572340e5f..a56de229318 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -44,18 +44,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class ConsumerIntegrationTest {
 
     @ClusterTests({
-        @ClusterTest(serverProperties = {
-            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value 
= "1"),
-            @ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
-            @ClusterConfigProperty(key = "group.coordinator.new.enable", value 
= "false")
-        }),
         @ClusterTest(serverProperties = {
             @ClusterConfigProperty(key = "offsets.topic.num.partitions", value 
= "1"),
             @ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
             @ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic")
         })
     })
-    public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance 
clusterInstance) throws Exception {
+    public void testAsyncConsumerWithConsumerProtocolDisabled(ClusterInstance 
clusterInstance) throws Exception {
         String topic = "test-topic";
         clusterInstance.createTopic(topic, 1, (short) 1);
         try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(Map.of(
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 22dbc3a3563..fd301d14fa2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -67,8 +67,6 @@ private[group] class GroupCoordinatorAdapter(
   private val time: Time
 ) extends org.apache.kafka.coordinator.group.GroupCoordinator {
 
-  override def isNewGroupCoordinator: Boolean = false
-
   override def consumerGroupHeartbeat(
     context: RequestContext,
     request: ConsumerGroupHeartbeatRequestData
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index e1794bb41f1..f1707f1b234 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.coordinator.group.{CoordinatorLoaderImpl, 
CoordinatorPartitionWriter, GroupCoordinatorAdapter}
+import kafka.coordinator.group.{CoordinatorLoaderImpl, 
CoordinatorPartitionWriter}
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.LogManager
 import kafka.log.remote.RemoteLogManager
@@ -622,41 +622,32 @@ class BrokerServer(
     // Create group coordinator, but don't start it until we've started 
replica manager.
     // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it 
would be good
     // to fix the underlying issue.
-    if (config.isNewGroupCoordinatorEnabled) {
-      val time = Time.SYSTEM
-      val serde = new GroupCoordinatorRecordSerde
-      val timer = new SystemTimerReaper(
-        "group-coordinator-reaper",
-        new SystemTimer("group-coordinator")
-      )
-      val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
-        time,
-        replicaManager,
-        serde,
-        config.groupCoordinatorConfig.offsetsLoadBufferSize
-      )
-      val writer = new CoordinatorPartitionWriter(
-        replicaManager
-      )
-      new GroupCoordinatorService.Builder(config.brokerId, 
config.groupCoordinatorConfig)
-        .withTime(time)
-        .withTimer(timer)
-        .withLoader(loader)
-        .withWriter(writer)
-        .withCoordinatorRuntimeMetrics(new 
GroupCoordinatorRuntimeMetrics(metrics))
-        .withGroupCoordinatorMetrics(new 
GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
-        .withGroupConfigManager(groupConfigManager)
-        .withPersister(persister)
-        .withAuthorizer(authorizer.toJava)
-        .build()
-    } else {
-      GroupCoordinatorAdapter(
-        config,
-        replicaManager,
-        Time.SYSTEM,
-        metrics
-      )
-    }
+    val time = Time.SYSTEM
+    val serde = new GroupCoordinatorRecordSerde
+    val timer = new SystemTimerReaper(
+      "group-coordinator-reaper",
+      new SystemTimer("group-coordinator")
+    )
+    val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
+      time,
+      replicaManager,
+      serde,
+      config.groupCoordinatorConfig.offsetsLoadBufferSize
+    )
+    val writer = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+    new GroupCoordinatorService.Builder(config.brokerId, 
config.groupCoordinatorConfig)
+      .withTime(time)
+      .withTimer(timer)
+      .withLoader(loader)
+      .withWriter(writer)
+      .withCoordinatorRuntimeMetrics(new 
GroupCoordinatorRuntimeMetrics(metrics))
+      .withGroupCoordinatorMetrics(new 
GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
+      .withGroupConfigManager(groupConfigManager)
+      .withPersister(persister)
+      .withAuthorizer(authorizer.toJava)
+      .build()
   }
 
   private def createShareCoordinator(): Option[ShareCoordinator] = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index e2d8e17f950..b6fbce294aa 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1646,40 +1646,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace(s"End transaction marker append for producer id $producerId 
completed with status: $currentErrors")
       updateErrors(producerId, currentErrors)
 
-      def maybeSendResponse(): Unit = {
-        if (numAppends.decrementAndGet() == 0) {
-          requestHelper.sendResponseExemptThrottle(request, new 
WriteTxnMarkersResponse(errors))
-        }
-      }
-
-      // The new group coordinator uses GroupCoordinator#completeTransaction 
so we do
-      // not need to call GroupCoordinator#onTransactionCompleted here.
-      if (config.isNewGroupCoordinatorEnabled) {
-        maybeSendResponse()
-        return
-      }
-
-      val successfulOffsetsPartitions = currentErrors.asScala.filter { case 
(topicPartition, error) =>
-        topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == 
Errors.NONE
-      }.keys
-
-      // If no end transaction marker has been written to a __consumer_offsets 
partition, we do not
-      // need to call GroupCoordinator#onTransactionCompleted.
-      if (successfulOffsetsPartitions.isEmpty) {
-        maybeSendResponse()
-        return
-      }
-
-      // Otherwise, we call GroupCoordinator#onTransactionCompleted to 
materialize the offsets
-      // into the cache and we wait until the meterialization is completed.
-      groupCoordinator.onTransactionCompleted(producerId, 
successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
-        if (exception != null) {
-          error(s"Received an exception while trying to update the offsets 
cache on transaction marker append", exception)
-          val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
-          successfulOffsetsPartitions.foreach(updatedErrors.put(_, 
Errors.UNKNOWN_SERVER_ERROR))
-          updateErrors(producerId, updatedErrors)
-        }
-        maybeSendResponse()
+      if (numAppends.decrementAndGet() == 0) {
+        requestHelper.sendResponseExemptThrottle(request, new 
WriteTxnMarkersResponse(errors))
       }
     }
 
@@ -1727,9 +1695,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
         partitionsWithCompatibleMessageFormat.foreach { partition =>
-          if (groupCoordinator.isNewGroupCoordinator && partition.topic == 
GROUP_METADATA_TOPIC_NAME) {
-            // When the new group coordinator is used, writing the end marker 
is fully delegated
-            // to the group coordinator.
+          if (partition.topic == GROUP_METADATA_TOPIC_NAME) {
             groupCoordinator.completeTransaction(
               partition,
               marker.producerId,
@@ -2525,7 +2491,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def isConsumerGroupProtocolEnabled(): Boolean = {
-    groupCoordinator.isNewGroupCoordinator &&
       
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.CONSUMER) &&
       groupVersion().isConsumerRebalanceProtocolSupported
   }
@@ -2658,7 +2623,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def isStreamsGroupProtocolEnabled(): Boolean = {
-    groupCoordinator.isNewGroupCoordinator &&
       
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
   }
 
@@ -3843,7 +3807,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def isShareGroupProtocolEnabled: Boolean = {
-    groupCoordinator.isNewGroupCoordinator && 
config.shareGroupConfig.isShareGroupEnabled
+    config.shareGroupConfig.isShareGroupEnabled
   }
 
   private def updateRecordConversionStats(request: RequestChannel.Request,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 608e27f25e4..2b8be8518b5 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -388,28 +388,18 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   /** ********* Controlled shutdown configuration ***********/
   val controlledShutdownEnable = 
getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
 
-  /** New group coordinator configs */
-  val isNewGroupCoordinatorEnabled = 
getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG)
+  /** Group coordinator configs */
   val groupCoordinatorRebalanceProtocols = {
     val protocols = 
getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
       .asScala.map(_.toUpperCase).map(GroupType.valueOf).toSet
     if (!protocols.contains(GroupType.CLASSIC)) {
       throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' 
protocol is not supported.")
     }
-    if (protocols.contains(GroupType.CONSUMER) && 
!isNewGroupCoordinatorEnabled) {
-      warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only 
supported with the new group coordinator.")
-    }
     if (protocols.contains(GroupType.SHARE)) {
-      if (!isNewGroupCoordinatorEnabled) {
-        warn(s"The new '${GroupType.SHARE}' rebalance protocol is only 
supported with the new group coordinator.")
-      }
       warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol 
are enabled. " +
         "This is part of the early access of KIP-932 and MUST NOT be used in 
production.")
     }
     if (protocols.contains(GroupType.STREAMS)) {
-      if (!isNewGroupCoordinatorEnabled) {
-        warn(s"The new '${GroupType.STREAMS}' rebalance protocol is only 
supported with the new group coordinator.")
-      }
       warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance 
protocol are enabled. " +
         "This is part of the early access of KIP-1071 and MUST NOT be used in 
production.")
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index f9b9e9c946a..4a6cc43aae3 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
-import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.Assertions.assertEquals
 
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class DeleteGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
@@ -32,7 +32,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
   )
-  def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+  def testDeleteGroupsWithNewConsumerGroupProtocol(): Unit = {
     testDeleteGroups(true)
   }
 
@@ -42,28 +42,11 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
   )
-  def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
-    testDeleteGroups(false)
-  }
-
-  @ClusterTest(
-    types = Array(Type.KRAFT, Type.CO_KRAFT),
-    serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-    )
-  )
-  def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+  def testDeleteGroupsWithOldConsumerGroupProtocol(): Unit = {
     testDeleteGroups(false)
   }
 
   private def testDeleteGroups(useNewProtocol: Boolean): Unit = {
-    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
-      fail("Cannot use the new protocol with the old group coordinator.")
-    }
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 4f1ba4b9b2c..239a382a1dd 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -32,21 +32,7 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinat
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
-  def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
-    testDescribeGroups()
-  }
-
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-  ))
-  def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
-    testDescribeGroups()
-  }
-
-  private def testDescribeGroups(): Unit = {
+  def testDescribeGroupsWithOldConsumerGroupProtocol(): Unit = {
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index eaec283ced7..98124c4377f 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -117,10 +117,6 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     cluster.brokers.values.stream.allMatch(b => 
b.config.unstableApiVersionsEnabled)
   }
 
-  protected def isNewGroupCoordinatorEnabled: Boolean = {
-    cluster.brokers.values.stream.allMatch(b => 
b.config.isNewGroupCoordinatorEnabled)
-  }
-
   protected def getTopicIds: Map[String, Uuid] = {
     
cluster.controllers().get(cluster.controllerIds().iterator().next()).controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
   }
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 332c01aeeb5..35be14db515 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -36,21 +36,7 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBas
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
-  def testHeartbeatWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
-    testHeartbeat()
-  }
-
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-  ))
-  def testHeartbeatWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
-    testHeartbeat()
-  }
-
-  private def testHeartbeat(): Unit = {
+  def testHeartbeatWithOldConsumerGroupProtocol(): Unit = {
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
index f77c2fc1bfa..dbe7a7046a4 100644
--- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
@@ -40,21 +40,7 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBas
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
   ))
-  def testJoinGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
-    testJoinGroup()
-  }
-
-  @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
-  ))
-  def testJoinGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
-    testJoinGroup()
-  }
-
-  private def testJoinGroup(): Unit = {
+  def testJoinGroupWithOldConsumerGroupProtocol(): Unit = {
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9c382d709c4..4750017ecd2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -184,7 +184,6 @@ class KafkaApisTest extends Logging {
       true,
       () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
 
-    
when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled)
     setupFeatures(featureVersions)
 
     new KafkaApis(
@@ -2394,8 +2393,6 @@ class KafkaApisTest extends Logging {
 
     when(replicaManager.onlinePartition(any()))
       .thenReturn(Some(mock(classOf[Partition])))
-    when(groupCoordinator.isNewGroupCoordinator)
-      .thenReturn(true)
     when(groupCoordinator.completeTransaction(
       ArgumentMatchers.eq(topicPartition),
       any(),
@@ -2504,132 +2501,7 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testHandleWriteTxnMarkersRequestWithOldGroupCoordinator(): Unit = {
-    val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
-    val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
-    val foo0 = new TopicPartition("foo", 0)
-    val foo1 = new TopicPartition("foo", 1)
-
-    val allPartitions = List(
-      offset0,
-      offset1,
-      foo0,
-      foo1
-    )
-
-    val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
-      List(
-        new TxnMarkerEntry(
-          1L,
-          1.toShort,
-          0,
-          TransactionResult.COMMIT,
-          List(offset0, foo0).asJava
-        ),
-        new TxnMarkerEntry(
-          2L,
-          1.toShort,
-          0,
-          TransactionResult.ABORT,
-          List(offset1, foo1).asJava
-        )
-      ).asJava
-    ).build()
-
-    val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
-
-    allPartitions.foreach { tp =>
-      when(replicaManager.onlinePartition(tp))
-        .thenReturn(Some(mock(classOf[Partition])))
-    }
-
-    when(groupCoordinator.onTransactionCompleted(
-      ArgumentMatchers.eq(1L),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.eq(TransactionResult.COMMIT)
-    )).thenReturn(CompletableFuture.completedFuture[Void](null))
-
-    when(groupCoordinator.onTransactionCompleted(
-      ArgumentMatchers.eq(2L),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.eq(TransactionResult.ABORT)
-    
)).thenReturn(FutureUtils.failedFuture[Void](Errors.NOT_CONTROLLER.exception))
-
-    val entriesPerPartition: ArgumentCaptor[Map[TopicPartition, 
MemoryRecords]] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
-    val responseCallback: ArgumentCaptor[Map[TopicPartition, 
PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] 
=> Unit])
-
-    when(replicaManager.appendRecords(
-      ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
-      ArgumentMatchers.eq(-1),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
-      entriesPerPartition.capture(),
-      responseCallback.capture(),
-      any(),
-      any(),
-      ArgumentMatchers.eq(RequestLocal.noCaching()),
-      any(),
-      any()
-    )).thenAnswer { _ =>
-      responseCallback.getValue.apply(
-        entriesPerPartition.getValue.keySet.map { tp =>
-          tp -> new PartitionResponse(Errors.NONE)
-        }.toMap
-      )
-    }
-    kafkaApis = createKafkaApis(overrideProperties = Map(
-      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false"
-    ))
-    kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, 
RequestLocal.noCaching())
-
-    val expectedResponse = new WriteTxnMarkersResponseData()
-      .setMarkers(List(
-        new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
-          .setProducerId(1L)
-          .setTopics(List(
-            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
-              .setName(Topic.GROUP_METADATA_TOPIC_NAME)
-              .setPartitions(List(
-                new 
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
-                  .setPartitionIndex(0)
-                  .setErrorCode(Errors.NONE.code)
-              ).asJava),
-            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
-              .setName("foo")
-              .setPartitions(List(
-                new 
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
-                  .setPartitionIndex(0)
-                  .setErrorCode(Errors.NONE.code)
-              ).asJava)
-          ).asJava),
-        new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
-          .setProducerId(2L)
-          .setTopics(List(
-            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
-              .setName(Topic.GROUP_METADATA_TOPIC_NAME)
-              .setPartitions(List(
-                new 
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
-                  .setPartitionIndex(1)
-                  .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
-              ).asJava),
-            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
-              .setName("foo")
-              .setPartitions(List(
-                new 
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
-                  .setPartitionIndex(1)
-                  .setErrorCode(Errors.NONE.code)
-              ).asJava)
-          ).asJava)
-      ).asJava)
-
-    val response = 
verifyNoThrottling[WriteTxnMarkersResponse](requestChannelRequest)
-    assertEquals(normalize(expectedResponse), normalize(response.data))
-  }
-
-  @Test
-  def testHandleWriteTxnMarkersRequestWithNewGroupCoordinator(): Unit = {
+  def testHandleWriteTxnMarkersRequest(): Unit = {
     val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
     val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
     val foo0 = new TopicPartition("foo", 0)
@@ -2764,7 +2636,7 @@ class KafkaApisTest extends Logging {
     "NOT_COORDINATOR",
     "REQUEST_TIMED_OUT"
   ))
-  def 
testHandleWriteTxnMarkersRequestWithNewGroupCoordinatorErrorTranslation(error: 
Errors): Unit = {
+  def testHandleWriteTxnMarkersRequestErrorTranslation(error: Errors): Unit = {
     val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
 
     val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
@@ -6013,49 +5885,6 @@ class KafkaApisTest extends Logging {
     assertArrayEquals(expectedAcquiredRecords(10, 19, 1).toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
   }
 
-  @Test
-  def testHandleShareFetchNewGroupCoordinatorDisabled(): Unit = {
-    val topicId = Uuid.randomUuid()
-    val memberId: Uuid = Uuid.randomUuid()
-    val groupId = "group"
-
-    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-
-    val shareFetchRequestData = new ShareFetchRequestData().
-      setGroupId(groupId).
-      setMemberId(memberId.toString).
-      setShareSessionEpoch(1).
-      setTopics(util.List.of(new ShareFetchRequestData.FetchTopic().
-        setTopicId(topicId).
-        setPartitions(util.List.of(
-          new ShareFetchRequestData.FetchPartition()
-            .setPartitionIndex(0)
-            .setAcknowledgementBatches(util.List.of(
-              new AcknowledgementBatch()
-                .setFirstOffset(0)
-                .setLastOffset(9)
-                .setAcknowledgeTypes(util.List.of(1.toByte))
-            ))
-        ))
-      ))
-
-    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
-    val request = buildRequest(shareFetchRequest)
-
-    kafkaApis = createKafkaApis(
-      overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false",
-        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
-        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      )
-    kafkaApis.handleShareFetchRequest(request)
-
-    val response = verifyNoThrottling[ShareFetchResponse](request)
-    val responseData = response.data()
-
-    assertEquals(Errors.UNSUPPORTED_VERSION.code, responseData.errorCode)
-  }
-
   @Test
   def testHandleShareFetchShareGroupDisabled(): Unit = {
     val topicId = Uuid.randomUuid()
@@ -6285,49 +6114,6 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.NONE.code, 
topicResponses.get(0).partitions.get(0).errorCode)
   }
 
-  @Test
-  def testHandleShareAcknowledgeNewGroupCoordinatorDisabled(): Unit = {
-    val topicId = Uuid.randomUuid()
-    val memberId: Uuid = Uuid.randomUuid()
-    val groupId = "group"
-
-    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-
-    val shareAcknowledgeRequestData = new ShareAcknowledgeRequestData()
-      .setGroupId(groupId)
-      .setMemberId(memberId.toString)
-      .setShareSessionEpoch(1)
-      .setTopics(List(new ShareAcknowledgeRequestData.AcknowledgeTopic()
-        .setTopicId(topicId)
-        .setPartitions(List(
-          new ShareAcknowledgeRequestData.AcknowledgePartition()
-            .setPartitionIndex(0)
-            .setAcknowledgementBatches(List(
-              new ShareAcknowledgeRequestData.AcknowledgementBatch()
-                .setFirstOffset(0)
-                .setLastOffset(9)
-                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
-            ).asJava)
-        ).asJava)
-      ).asJava)
-
-    val shareAcknowledgeRequest = new 
ShareAcknowledgeRequest.Builder(shareAcknowledgeRequestData).build(ApiKeys.SHARE_ACKNOWLEDGE.latestVersion)
-    val request = buildRequest(shareAcknowledgeRequest)
-
-    kafkaApis = createKafkaApis(
-      overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false",
-        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
-        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
-      )
-    kafkaApis.handleShareAcknowledgeRequest(request)
-
-    val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
-    val responseData = response.data()
-
-    assertEquals(Errors.UNSUPPORTED_VERSION.code, responseData.errorCode)
-  }
-
   @Test
   def testHandleShareAcknowledgeShareGroupDisabled(): Unit = {
     val topicId = Uuid.randomUuid()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1a57724d543..88cbe22769f 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1003,7 +1003,6 @@ class KafkaConfigTest {
         case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
 
         /** New group coordinator configs */
-        case GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG => // 
ignore
         case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
 
         /** Consumer groups configs */
@@ -1754,20 +1753,17 @@ class KafkaConfigTest {
     
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
"classic,consumer")
     var config = KafkaConfig.fromProps(props)
     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER), 
config.groupCoordinatorRebalanceProtocols)
-    assertTrue(config.isNewGroupCoordinatorEnabled)
     assertFalse(config.shareGroupConfig.isShareGroupEnabled)
 
     // This is OK.
     
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
"classic,consumer,share")
     config = KafkaConfig.fromProps(props)
     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), 
config.groupCoordinatorRebalanceProtocols)
-    assertTrue(config.isNewGroupCoordinatorEnabled)
     assertTrue(config.shareGroupConfig.isShareGroupEnabled)
 
     
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
"classic,streams")
     val config2 = KafkaConfig.fromProps(props)
     assertEquals(Set(GroupType.CLASSIC, GroupType.STREAMS), 
config2.groupCoordinatorRebalanceProtocols)
-    assertTrue(config2.isNewGroupCoordinatorEnabled)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index 3961c725ed4..37687f67b7b 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
 import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
 import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig}
-import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.Assertions.assertEquals
 
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
@@ -34,7 +34,7 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBa
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
     )
   )
-  def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+  def testListGroupsWithNewConsumerGroupProtocol(): Unit = {
     testListGroups(true)
   }
 
@@ -43,26 +43,11 @@ class ListGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorBa
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
   ))
-  def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
-    testListGroups(false)
-  }
-
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
-  ))
-  def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+  def testListGroupsWithOldConsumerGroupProtocol(): Unit = {
     testListGroups(false)
   }
 
   private def testListGroups(useNewProtocol: Boolean): Unit = {
-    if (!isNewGroupCoordinatorEnabled && useNewProtocol) {
-      fail("Cannot use the new protocol with the old group coordinator.")
-    }
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
diff --git 
a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 57700712379..0c6355e434e 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -20,7 +20,6 @@ import 
org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Clu
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.junit.jupiter.api.Assertions.fail
 
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class OffsetCommitRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
@@ -31,7 +30,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
   )
-  def testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+  def testOffsetCommitWithNewConsumerGroupProtocol(): Unit = {
     testOffsetCommit(true)
   }
 
@@ -41,25 +40,11 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
   )
-  def testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
-    testOffsetCommit(false)
-  }
-
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-  ))
-  def testOffsetCommitWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+  def testOffsetCommitWithOldConsumerGroupProtocol(): Unit = {
     testOffsetCommit(false)
   }
 
   private def testOffsetCommit(useNewProtocol: Boolean): Unit = {
-    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
-      fail("Cannot use the new protocol with the old group coordinator.")
-    }
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
@@ -97,7 +82,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
         partition = 0,
         offset = 100L,
         expectedError =
-          if (isNewGroupCoordinatorEnabled && version >= 9) 
Errors.GROUP_ID_NOT_FOUND
+          if (version >= 9) Errors.GROUP_ID_NOT_FOUND
           else Errors.ILLEGAL_GENERATION,
         version = version.toShort
       )
@@ -111,7 +96,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
         partition = 0,
         offset = 100L,
         expectedError =
-          if (isNewGroupCoordinatorEnabled && version >= 9) 
Errors.GROUP_ID_NOT_FOUND
+          if (version >= 9) Errors.GROUP_ID_NOT_FOUND
           else Errors.ILLEGAL_GENERATION,
         version = version.toShort
       )
diff --git 
a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index 0a808f6c868..edb40c99e6a 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -20,7 +20,6 @@ import 
org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Clu
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.junit.jupiter.api.Assertions.fail
 
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class OffsetDeleteRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
@@ -30,7 +29,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
   )
-  def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+  def testOffsetDeleteWithNewConsumerGroupProtocol(): Unit = {
     testOffsetDelete(true)
   }
 
@@ -38,25 +37,11 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
-  def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
-    testOffsetDelete(false)
-  }
-
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-  ))
-  def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+  def testOffsetDeleteWithOldConsumerGroupProtocol(): Unit = {
     testOffsetDelete(false)
   }
 
   private def testOffsetDelete(useNewProtocol: Boolean): Unit = {
-    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
-      fail("Cannot use the new protocol with the old group coordinator.")
-    }
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index a504ecdeea0..5286c173625 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.message.OffsetFetchResponseData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.Assertions.assertEquals
 
 import scala.jdk.CollectionConverters._
 
@@ -35,7 +35,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
   )
-  def 
testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+  def testSingleGroupOffsetFetchWithNewConsumerGroupProtocol(): Unit = {
     testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
   }
 
@@ -45,27 +45,17 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
   )
-  def 
testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+  def testSingleGroupOffsetFetchWithOldConsumerGroupProtocol(): Unit = {
     testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
   }
 
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-  ))
-  def 
testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
-    testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = true)
-  }
-
   @ClusterTest(
     serverProperties = Array(
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
   )
-  def 
testSingleGroupAllOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator():
 Unit = {
+  def testSingleGroupAllOffsetFetchWithNewConsumerGroupProtocol(): Unit = {
     testSingleGroupAllOffsetFetch(useNewProtocol = true, requireStable = true)
   }
 
@@ -73,27 +63,17 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
-  def 
testSingleGroupAllOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator():
 Unit = {
+  def testSingleGroupAllOffsetFetchWithOldConsumerGroupProtocol(): Unit = {
     testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = 
false)
   }
 
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-  ))
-  def 
testSingleGroupAllOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator():
 Unit = {
-    testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = true)
-  }
-
   @ClusterTest(
     serverProperties = Array(
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
   )
-  def 
testMultiGroupsOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+  def testMultiGroupsOffsetFetchWithNewConsumerGroupProtocol(): Unit = {
     testMultipleGroupsOffsetFetch(useNewProtocol = true, requireStable = true)
   }
 
@@ -101,25 +81,11 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
-  def 
testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+  def testMultiGroupsOffsetFetchWithOldConsumerGroupProtocol(): Unit = {
     testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = 
false)
   }
 
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-  ))
-  def 
testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
-    testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = true)
-  }
-
   private def testSingleGroupOffsetFetch(useNewProtocol: Boolean, 
requireStable: Boolean): Unit = {
-    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
-      fail("Cannot use the new protocol with the old group coordinator.")
-    }
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
@@ -288,10 +254,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
   }
 
   private def testSingleGroupAllOffsetFetch(useNewProtocol: Boolean, 
requireStable: Boolean): Unit = {
-    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
-      fail("Cannot use the new protocol with the old group coordinator.")
-    }
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
@@ -401,10 +363,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
   }
 
   private def testMultipleGroupsOffsetFetch(useNewProtocol: Boolean, 
requireStable: Boolean): Unit = {
-    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
-      fail("Cannot use the new protocol with the old group coordinator.")
-    }
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index 448b6897ede..7aaa163a2de 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -54,7 +54,6 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
       new ClusterConfigProperty(key = "group.share.enable", value = "true"),
       new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
       new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
@@ -150,7 +149,6 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
       new ClusterConfigProperty(key = "group.share.enable", value = "true"),
       new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
       new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
@@ -301,7 +299,6 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
       new ClusterConfigProperty(key = "group.share.enable", value = "true"),
       new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
       new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
@@ -417,7 +414,6 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
       new ClusterConfigProperty(key = "group.share.enable", value = "true"),
       new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
       new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
@@ -606,7 +602,6 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
       new ClusterConfigProperty(key = "group.share.enable", value = "true"),
       new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
       new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
@@ -783,7 +778,6 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
       new ClusterConfigProperty(key = "group.share.enable", value = "true"),
       new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
       new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index 3a53fbf144a..f949dec3ddc 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -38,22 +38,7 @@ class SyncGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBas
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
   ))
-  def testSyncGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
-    testSyncGroup()
-  }
-
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
-  ))
-  def testSyncGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
-    testSyncGroup()
-  }
-
-  private def testSyncGroup(): Unit = {
+  def testSyncGroupWithOldConsumerGroupProtocol(): Unit = {
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
diff --git 
a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index b2cd44bbd92..122e91e95bb 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.ProducerIdAndEpoch
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
 
 import scala.jdk.CollectionConverters.IterableHasAsScala
 
@@ -40,30 +40,16 @@ import scala.jdk.CollectionConverters.IterableHasAsScala
 class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
 
   @ClusterTest
-  def testTxnOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+  def testTxnOffsetCommitWithNewConsumerGroupProtocol(): Unit = {
     testTxnOffsetCommit(true)
   }
 
   @ClusterTest
-  def testTxnOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
-    testTxnOffsetCommit(false)
-  }
-
-  @ClusterTest(
-    serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    )
-  )
-  def testTxnOffsetCommitWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+  def testTxnOffsetCommitWithOldConsumerGroupProtocol(): Unit = {
     testTxnOffsetCommit(false)
   }
 
   private def testTxnOffsetCommit(useNewProtocol: Boolean): Unit = {
-    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
-      fail("Cannot use the new protocol with the old group coordinator.")
-    }
-
     val topic = "topic"
     val partition = 0
     val transactionalId = "txn"
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 47269bad3a3..9be99e6e1a9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -68,11 +68,6 @@ import java.util.function.IntSupplier;
  */
 public interface GroupCoordinator {
 
-    /**
-     * @return True if the new coordinator; False otherwise.
-     */
-    boolean isNewGroupCoordinator();
-
     /**
      * Heartbeat to a Consumer Group.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 81010c65f14..89131d61b6e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -40,7 +40,6 @@ import static 
org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
 import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@@ -59,10 +58,6 @@ public class GroupCoordinatorConfig {
     ///
     /// Group coordinator configs
     ///
-    public static final String NEW_GROUP_COORDINATOR_ENABLE_CONFIG = 
"group.coordinator.new.enable";
-    public static final String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the 
new group coordinator.";
-    public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = true;
-
     public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = 
"group.coordinator.rebalance.protocols";
     public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = 
"The list of enabled rebalance protocols." +
             "The " + Group.GroupType.SHARE + " rebalance protocol is in early 
access and therefore must not be used in production.";
@@ -288,8 +283,6 @@ public class GroupCoordinatorConfig {
         .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, 
OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
OFFSETS_TOPIC_PARTITIONS_DOC)
         .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, 
OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, 
OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
         .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) 
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, 
OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
-        // Internal configuration used by integration and system tests.
-        .defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, 
NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, 
NEW_GROUP_COORDINATOR_ENABLE_DOC)
 
         // Offset configs
         .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, 
OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index cdf43f793d3..602499c6e49 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -337,14 +337,6 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         return new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
partitionFor(groupId));
     }
 
-    /**
-     * See {@link GroupCoordinator#isNewGroupCoordinator()}
-     */
-    @Override
-    public boolean isNewGroupCoordinator() {
-        return true;
-    }
-
     /**
      * See {@link GroupCoordinator#partitionFor(String)}
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 56d2a58bd63..36b48d307a8 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -136,7 +136,6 @@ public class ShareGroupConfigTest {
         
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupMinRecordLockDurationMs);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupMaxRecordLockDurationMs);
         
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,share");
-        
configs.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, true);
         
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1);
         
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 
10);
 
diff --git 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
index c013d155977..79593a317e2 100644
--- 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
+++ 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
@@ -83,7 +83,6 @@ import static 
org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -216,9 +215,6 @@ public class ClusterTestExtensionsTest {
     @ClusterTests({
         @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
-        }),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "false"),
         })
     })
     public void testNotSupportedNewGroupProtocols(ClusterInstance 
clusterInstance) {

Reply via email to