This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0c5e5c5d2d3 KAFKA-18329; [2/3] Delete old group coordinator (KIP-848) (#19251)
0c5e5c5d2d3 is described below
commit 0c5e5c5d2d312c15bc4aa54e9933df209cf5cb4b
Author: David Jacot <[email protected]>
AuthorDate: Fri Mar 21 13:54:41 2025 +0100
KAFKA-18329; [2/3] Delete old group coordinator (KIP-848) (#19251)
This patch is the second in a series of patches to remove the old group
coordinator. As of Apache Kafka 4.0, the so-called new group coordinator
is the default and the only available option.
The patch removes `group.coordinator.new.enable` (internal config) and
all its usages (integration tests, unit tests, etc.). It also cleans up
`KafkaApis` to remove logic only used by the old group coordinator.
Reviewers: Jeff Kim <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../clients/consumer/ConsumerIntegrationTest.java | 7 +-
.../group/GroupCoordinatorAdapter.scala | 2 -
.../src/main/scala/kafka/server/BrokerServer.scala | 63 +++---
core/src/main/scala/kafka/server/KafkaApis.scala | 44 +----
core/src/main/scala/kafka/server/KafkaConfig.scala | 12 +-
.../kafka/server/DeleteGroupsRequestTest.scala | 23 +--
.../kafka/server/DescribeGroupsRequestTest.scala | 16 +-
.../server/GroupCoordinatorBaseRequestTest.scala | 4 -
.../unit/kafka/server/HeartbeatRequestTest.scala | 16 +-
.../unit/kafka/server/JoinGroupRequestTest.scala | 16 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 218 +--------------------
.../scala/unit/kafka/server/KafkaConfigTest.scala | 4 -
.../unit/kafka/server/ListGroupsRequestTest.scala | 21 +-
.../kafka/server/OffsetCommitRequestTest.scala | 23 +--
.../kafka/server/OffsetDeleteRequestTest.scala | 19 +-
.../unit/kafka/server/OffsetFetchRequestTest.scala | 56 +-----
.../server/ShareGroupHeartbeatRequestTest.scala | 6 -
.../unit/kafka/server/SyncGroupRequestTest.scala | 17 +-
.../kafka/server/TxnOffsetCommitRequestTest.scala | 20 +-
.../kafka/coordinator/group/GroupCoordinator.java | 5 -
.../coordinator/group/GroupCoordinatorConfig.java | 7 -
.../coordinator/group/GroupCoordinatorService.java | 8 -
.../group/modern/share/ShareGroupConfigTest.java | 1 -
.../test/junit/ClusterTestExtensionsTest.java | 4 -
24 files changed, 61 insertions(+), 551 deletions(-)
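For tests or tooling that previously forced the old coordinator through the
internal flag, the only remaining knob after this change is the rebalance
protocol list. The sketch below is illustrative only: the ClassicProtocolTestProps
helper is hypothetical and not part of this patch, while the property keys are
the ones that appear in the diff.

import java.util.Properties;

// Hypothetical helper, not part of this patch: builds the broker overrides a test
// would use now that "group.coordinator.new.enable" is removed. Restricting
// "group.coordinator.rebalance.protocols" to "classic" replaces the removed flag
// when only the classic rebalance path should be exercised.
public final class ClassicProtocolTestProps {

    public static Properties brokerOverrides() {
        Properties props = new Properties();
        props.setProperty("offsets.topic.num.partitions", "1");
        props.setProperty("offsets.topic.replication.factor", "1");
        props.setProperty("group.coordinator.rebalance.protocols", "classic");
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides, e.g. to paste into a server.properties file.
        brokerOverrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}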
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index e0572340e5f..a56de229318 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -44,18 +44,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConsumerIntegrationTest {
@ClusterTests({
- @ClusterTest(serverProperties = {
- @ClusterConfigProperty(key = "offsets.topic.num.partitions", value
= "1"),
- @ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
- @ClusterConfigProperty(key = "group.coordinator.new.enable", value
= "false")
- }),
@ClusterTest(serverProperties = {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value
= "1"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
@ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic")
})
})
- public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance
clusterInstance) throws Exception {
+ public void testAsyncConsumerWithConsumerProtocolDisabled(ClusterInstance
clusterInstance) throws Exception {
String topic = "test-topic";
clusterInstance.createTopic(topic, 1, (short) 1);
try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(Map.of(
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 22dbc3a3563..fd301d14fa2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -67,8 +67,6 @@ private[group] class GroupCoordinatorAdapter(
private val time: Time
) extends org.apache.kafka.coordinator.group.GroupCoordinator {
- override def isNewGroupCoordinator: Boolean = false
-
override def consumerGroupHeartbeat(
context: RequestContext,
request: ConsumerGroupHeartbeatRequestData
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index e1794bb41f1..f1707f1b234 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -17,7 +17,7 @@
package kafka.server
-import kafka.coordinator.group.{CoordinatorLoaderImpl,
CoordinatorPartitionWriter, GroupCoordinatorAdapter}
+import kafka.coordinator.group.{CoordinatorLoaderImpl,
CoordinatorPartitionWriter}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
@@ -622,41 +622,32 @@ class BrokerServer(
// Create group coordinator, but don't start it until we've started
replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it
would be good
// to fix the underlying issue.
- if (config.isNewGroupCoordinatorEnabled) {
- val time = Time.SYSTEM
- val serde = new GroupCoordinatorRecordSerde
- val timer = new SystemTimerReaper(
- "group-coordinator-reaper",
- new SystemTimer("group-coordinator")
- )
- val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
- time,
- replicaManager,
- serde,
- config.groupCoordinatorConfig.offsetsLoadBufferSize
- )
- val writer = new CoordinatorPartitionWriter(
- replicaManager
- )
- new GroupCoordinatorService.Builder(config.brokerId,
config.groupCoordinatorConfig)
- .withTime(time)
- .withTimer(timer)
- .withLoader(loader)
- .withWriter(writer)
- .withCoordinatorRuntimeMetrics(new
GroupCoordinatorRuntimeMetrics(metrics))
- .withGroupCoordinatorMetrics(new
GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
- .withGroupConfigManager(groupConfigManager)
- .withPersister(persister)
- .withAuthorizer(authorizer.toJava)
- .build()
- } else {
- GroupCoordinatorAdapter(
- config,
- replicaManager,
- Time.SYSTEM,
- metrics
- )
- }
+ val time = Time.SYSTEM
+ val serde = new GroupCoordinatorRecordSerde
+ val timer = new SystemTimerReaper(
+ "group-coordinator-reaper",
+ new SystemTimer("group-coordinator")
+ )
+ val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
+ time,
+ replicaManager,
+ serde,
+ config.groupCoordinatorConfig.offsetsLoadBufferSize
+ )
+ val writer = new CoordinatorPartitionWriter(
+ replicaManager
+ )
+ new GroupCoordinatorService.Builder(config.brokerId,
config.groupCoordinatorConfig)
+ .withTime(time)
+ .withTimer(timer)
+ .withLoader(loader)
+ .withWriter(writer)
+ .withCoordinatorRuntimeMetrics(new
GroupCoordinatorRuntimeMetrics(metrics))
+ .withGroupCoordinatorMetrics(new
GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
+ .withGroupConfigManager(groupConfigManager)
+ .withPersister(persister)
+ .withAuthorizer(authorizer.toJava)
+ .build()
}
private def createShareCoordinator(): Option[ShareCoordinator] = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e2d8e17f950..b6fbce294aa 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1646,40 +1646,8 @@ class KafkaApis(val requestChannel: RequestChannel,
trace(s"End transaction marker append for producer id $producerId
completed with status: $currentErrors")
updateErrors(producerId, currentErrors)
- def maybeSendResponse(): Unit = {
- if (numAppends.decrementAndGet() == 0) {
- requestHelper.sendResponseExemptThrottle(request, new
WriteTxnMarkersResponse(errors))
- }
- }
-
- // The new group coordinator uses GroupCoordinator#completeTransaction
so we do
- // not need to call GroupCoordinator#onTransactionCompleted here.
- if (config.isNewGroupCoordinatorEnabled) {
- maybeSendResponse()
- return
- }
-
- val successfulOffsetsPartitions = currentErrors.asScala.filter { case
(topicPartition, error) =>
- topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error ==
Errors.NONE
- }.keys
-
- // If no end transaction marker has been written to a __consumer_offsets
partition, we do not
- // need to call GroupCoordinator#onTransactionCompleted.
- if (successfulOffsetsPartitions.isEmpty) {
- maybeSendResponse()
- return
- }
-
- // Otherwise, we call GroupCoordinator#onTransactionCompleted to
materialize the offsets
- // into the cache and we wait until the meterialization is completed.
- groupCoordinator.onTransactionCompleted(producerId,
successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
- if (exception != null) {
- error(s"Received an exception while trying to update the offsets
cache on transaction marker append", exception)
- val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
- successfulOffsetsPartitions.foreach(updatedErrors.put(_,
Errors.UNKNOWN_SERVER_ERROR))
- updateErrors(producerId, updatedErrors)
- }
- maybeSendResponse()
+ if (numAppends.decrementAndGet() == 0) {
+ requestHelper.sendResponseExemptThrottle(request, new
WriteTxnMarkersResponse(errors))
}
}
@@ -1727,9 +1695,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
partitionsWithCompatibleMessageFormat.foreach { partition =>
- if (groupCoordinator.isNewGroupCoordinator && partition.topic ==
GROUP_METADATA_TOPIC_NAME) {
- // When the new group coordinator is used, writing the end marker
is fully delegated
- // to the group coordinator.
+ if (partition.topic == GROUP_METADATA_TOPIC_NAME) {
groupCoordinator.completeTransaction(
partition,
marker.producerId,
@@ -2525,7 +2491,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def isConsumerGroupProtocolEnabled(): Boolean = {
- groupCoordinator.isNewGroupCoordinator &&
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.CONSUMER) &&
groupVersion().isConsumerRebalanceProtocolSupported
}
@@ -2658,7 +2623,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def isStreamsGroupProtocolEnabled(): Boolean = {
- groupCoordinator.isNewGroupCoordinator &&
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
}
@@ -3843,7 +3807,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def isShareGroupProtocolEnabled: Boolean = {
- groupCoordinator.isNewGroupCoordinator &&
config.shareGroupConfig.isShareGroupEnabled
+ config.shareGroupConfig.isShareGroupEnabled
}
private def updateRecordConversionStats(request: RequestChannel.Request,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 608e27f25e4..2b8be8518b5 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -388,28 +388,18 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/** ********* Controlled shutdown configuration ***********/
val controlledShutdownEnable =
getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
- /** New group coordinator configs */
- val isNewGroupCoordinatorEnabled =
getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG)
+ /** Group coordinator configs */
val groupCoordinatorRebalanceProtocols = {
val protocols =
getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
.asScala.map(_.toUpperCase).map(GroupType.valueOf).toSet
if (!protocols.contains(GroupType.CLASSIC)) {
throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}'
protocol is not supported.")
}
- if (protocols.contains(GroupType.CONSUMER) &&
!isNewGroupCoordinatorEnabled) {
- warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only
supported with the new group coordinator.")
- }
if (protocols.contains(GroupType.SHARE)) {
- if (!isNewGroupCoordinatorEnabled) {
- warn(s"The new '${GroupType.SHARE}' rebalance protocol is only
supported with the new group coordinator.")
- }
warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol
are enabled. " +
"This is part of the early access of KIP-932 and MUST NOT be used in
production.")
}
if (protocols.contains(GroupType.STREAMS)) {
- if (!isNewGroupCoordinatorEnabled) {
- warn(s"The new '${GroupType.STREAMS}' rebalance protocol is only
supported with the new group coordinator.")
- }
warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance
protocol are enabled. " +
"This is part of the early access of KIP-1071 and MUST NOT be used in
production.")
}
diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index f9b9e9c946a..4a6cc43aae3 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
-import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.Assertions.assertEquals
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@@ -32,7 +32,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
- def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testDeleteGroupsWithNewConsumerGroupProtocol(): Unit = {
testDeleteGroups(true)
}
@@ -42,28 +42,11 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
- def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
- testDeleteGroups(false)
- }
-
- @ClusterTest(
- types = Array(Type.KRAFT, Type.CO_KRAFT),
- serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- )
- )
- def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
+ def testDeleteGroupsWithOldConsumerGroupProtocol(): Unit = {
testDeleteGroups(false)
}
private def testDeleteGroups(useNewProtocol: Boolean): Unit = {
- if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
- fail("Cannot use the new protocol with the old group coordinator.")
- }
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 4f1ba4b9b2c..239a382a1dd 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -32,21 +32,7 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
- def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
- testDescribeGroups()
- }
-
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- ))
- def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
- testDescribeGroups()
- }
-
- private def testDescribeGroups(): Unit = {
+ def testDescribeGroupsWithOldConsumerGroupProtocol(): Unit = {
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index eaec283ced7..98124c4377f 100644
--- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -117,10 +117,6 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
cluster.brokers.values.stream.allMatch(b =>
b.config.unstableApiVersionsEnabled)
}
- protected def isNewGroupCoordinatorEnabled: Boolean = {
- cluster.brokers.values.stream.allMatch(b =>
b.config.isNewGroupCoordinatorEnabled)
- }
-
protected def getTopicIds: Map[String, Uuid] = {
cluster.controllers().get(cluster.controllerIds().iterator().next()).controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
}
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 332c01aeeb5..35be14db515 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -36,21 +36,7 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
- def testHeartbeatWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit
= {
- testHeartbeat()
- }
-
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- ))
- def testHeartbeatWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit
= {
- testHeartbeat()
- }
-
- private def testHeartbeat(): Unit = {
+ def testHeartbeatWithOldConsumerGroupProtocol(): Unit = {
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
index f77c2fc1bfa..dbe7a7046a4 100644
--- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
@@ -40,21 +40,7 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
))
- def testJoinGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit
= {
- testJoinGroup()
- }
-
- @ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
- ))
- def testJoinGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit
= {
- testJoinGroup()
- }
-
- private def testJoinGroup(): Unit = {
+ def testJoinGroupWithOldConsumerGroupProtocol(): Unit = {
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9c382d709c4..4750017ecd2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -184,7 +184,6 @@ class KafkaApisTest extends Logging {
true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(),
Collections.emptyMap[String, java.lang.Short], 0))
-
when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled)
setupFeatures(featureVersions)
new KafkaApis(
@@ -2394,8 +2393,6 @@ class KafkaApisTest extends Logging {
when(replicaManager.onlinePartition(any()))
.thenReturn(Some(mock(classOf[Partition])))
- when(groupCoordinator.isNewGroupCoordinator)
- .thenReturn(true)
when(groupCoordinator.completeTransaction(
ArgumentMatchers.eq(topicPartition),
any(),
@@ -2504,132 +2501,7 @@ class KafkaApisTest extends Logging {
}
@Test
- def testHandleWriteTxnMarkersRequestWithOldGroupCoordinator(): Unit = {
- val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
- val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
- val foo0 = new TopicPartition("foo", 0)
- val foo1 = new TopicPartition("foo", 1)
-
- val allPartitions = List(
- offset0,
- offset1,
- foo0,
- foo1
- )
-
- val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
- List(
- new TxnMarkerEntry(
- 1L,
- 1.toShort,
- 0,
- TransactionResult.COMMIT,
- List(offset0, foo0).asJava
- ),
- new TxnMarkerEntry(
- 2L,
- 1.toShort,
- 0,
- TransactionResult.ABORT,
- List(offset1, foo1).asJava
- )
- ).asJava
- ).build()
-
- val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
-
- allPartitions.foreach { tp =>
- when(replicaManager.onlinePartition(tp))
- .thenReturn(Some(mock(classOf[Partition])))
- }
-
- when(groupCoordinator.onTransactionCompleted(
- ArgumentMatchers.eq(1L),
- ArgumentMatchers.any(),
- ArgumentMatchers.eq(TransactionResult.COMMIT)
- )).thenReturn(CompletableFuture.completedFuture[Void](null))
-
- when(groupCoordinator.onTransactionCompleted(
- ArgumentMatchers.eq(2L),
- ArgumentMatchers.any(),
- ArgumentMatchers.eq(TransactionResult.ABORT)
-
)).thenReturn(FutureUtils.failedFuture[Void](Errors.NOT_CONTROLLER.exception))
-
- val entriesPerPartition: ArgumentCaptor[Map[TopicPartition,
MemoryRecords]] =
- ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
- val responseCallback: ArgumentCaptor[Map[TopicPartition,
PartitionResponse] => Unit] =
- ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse]
=> Unit])
-
- when(replicaManager.appendRecords(
- ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
- ArgumentMatchers.eq(-1),
- ArgumentMatchers.eq(true),
- ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
- entriesPerPartition.capture(),
- responseCallback.capture(),
- any(),
- any(),
- ArgumentMatchers.eq(RequestLocal.noCaching()),
- any(),
- any()
- )).thenAnswer { _ =>
- responseCallback.getValue.apply(
- entriesPerPartition.getValue.keySet.map { tp =>
- tp -> new PartitionResponse(Errors.NONE)
- }.toMap
- )
- }
- kafkaApis = createKafkaApis(overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false"
- ))
- kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest,
RequestLocal.noCaching())
-
- val expectedResponse = new WriteTxnMarkersResponseData()
- .setMarkers(List(
- new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
- .setProducerId(1L)
- .setTopics(List(
- new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
- .setName(Topic.GROUP_METADATA_TOPIC_NAME)
- .setPartitions(List(
- new
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
- .setPartitionIndex(0)
- .setErrorCode(Errors.NONE.code)
- ).asJava),
- new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
- .setName("foo")
- .setPartitions(List(
- new
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
- .setPartitionIndex(0)
- .setErrorCode(Errors.NONE.code)
- ).asJava)
- ).asJava),
- new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
- .setProducerId(2L)
- .setTopics(List(
- new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
- .setName(Topic.GROUP_METADATA_TOPIC_NAME)
- .setPartitions(List(
- new
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
- .setPartitionIndex(1)
- .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
- ).asJava),
- new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
- .setName("foo")
- .setPartitions(List(
- new
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
- .setPartitionIndex(1)
- .setErrorCode(Errors.NONE.code)
- ).asJava)
- ).asJava)
- ).asJava)
-
- val response =
verifyNoThrottling[WriteTxnMarkersResponse](requestChannelRequest)
- assertEquals(normalize(expectedResponse), normalize(response.data))
- }
-
- @Test
- def testHandleWriteTxnMarkersRequestWithNewGroupCoordinator(): Unit = {
+ def testHandleWriteTxnMarkersRequest(): Unit = {
val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
val foo0 = new TopicPartition("foo", 0)
@@ -2764,7 +2636,7 @@ class KafkaApisTest extends Logging {
"NOT_COORDINATOR",
"REQUEST_TIMED_OUT"
))
- def
testHandleWriteTxnMarkersRequestWithNewGroupCoordinatorErrorTranslation(error:
Errors): Unit = {
+ def testHandleWriteTxnMarkersRequestErrorTranslation(error: Errors): Unit = {
val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
@@ -6013,49 +5885,6 @@ class KafkaApisTest extends Logging {
assertArrayEquals(expectedAcquiredRecords(10, 19, 1).toArray(),
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
}
- @Test
- def testHandleShareFetchNewGroupCoordinatorDisabled(): Unit = {
- val topicId = Uuid.randomUuid()
- val memberId: Uuid = Uuid.randomUuid()
- val groupId = "group"
-
- metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
-
- val shareFetchRequestData = new ShareFetchRequestData().
- setGroupId(groupId).
- setMemberId(memberId.toString).
- setShareSessionEpoch(1).
- setTopics(util.List.of(new ShareFetchRequestData.FetchTopic().
- setTopicId(topicId).
- setPartitions(util.List.of(
- new ShareFetchRequestData.FetchPartition()
- .setPartitionIndex(0)
- .setAcknowledgementBatches(util.List.of(
- new AcknowledgementBatch()
- .setFirstOffset(0)
- .setLastOffset(9)
- .setAcknowledgeTypes(util.List.of(1.toByte))
- ))
- ))
- ))
-
- val shareFetchRequest = new
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
- val request = buildRequest(shareFetchRequest)
-
- kafkaApis = createKafkaApis(
- overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false",
- ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
- ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
- )
- kafkaApis.handleShareFetchRequest(request)
-
- val response = verifyNoThrottling[ShareFetchResponse](request)
- val responseData = response.data()
-
- assertEquals(Errors.UNSUPPORTED_VERSION.code, responseData.errorCode)
- }
-
@Test
def testHandleShareFetchShareGroupDisabled(): Unit = {
val topicId = Uuid.randomUuid()
@@ -6285,49 +6114,6 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.NONE.code,
topicResponses.get(0).partitions.get(0).errorCode)
}
- @Test
- def testHandleShareAcknowledgeNewGroupCoordinatorDisabled(): Unit = {
- val topicId = Uuid.randomUuid()
- val memberId: Uuid = Uuid.randomUuid()
- val groupId = "group"
-
- metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
-
- val shareAcknowledgeRequestData = new ShareAcknowledgeRequestData()
- .setGroupId(groupId)
- .setMemberId(memberId.toString)
- .setShareSessionEpoch(1)
- .setTopics(List(new ShareAcknowledgeRequestData.AcknowledgeTopic()
- .setTopicId(topicId)
- .setPartitions(List(
- new ShareAcknowledgeRequestData.AcknowledgePartition()
- .setPartitionIndex(0)
- .setAcknowledgementBatches(List(
- new ShareAcknowledgeRequestData.AcknowledgementBatch()
- .setFirstOffset(0)
- .setLastOffset(9)
- .setAcknowledgeTypes(Collections.singletonList(1.toByte))
- ).asJava)
- ).asJava)
- ).asJava)
-
- val shareAcknowledgeRequest = new
ShareAcknowledgeRequest.Builder(shareAcknowledgeRequestData).build(ApiKeys.SHARE_ACKNOWLEDGE.latestVersion)
- val request = buildRequest(shareAcknowledgeRequest)
-
- kafkaApis = createKafkaApis(
- overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false",
- ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
- ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
- )
- kafkaApis.handleShareAcknowledgeRequest(request)
-
- val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
- val responseData = response.data()
-
- assertEquals(Errors.UNSUPPORTED_VERSION.code, responseData.errorCode)
- }
-
@Test
def testHandleShareAcknowledgeShareGroupDisabled(): Unit = {
val topicId = Uuid.randomUuid()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1a57724d543..88cbe22769f 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1003,7 +1003,6 @@ class KafkaConfigTest {
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
/** New group coordinator configs */
- case GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG => //
ignore
case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
/** Consumer groups configs */
@@ -1754,20 +1753,17 @@ class KafkaConfigTest {
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer")
var config = KafkaConfig.fromProps(props)
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER),
config.groupCoordinatorRebalanceProtocols)
- assertTrue(config.isNewGroupCoordinatorEnabled)
assertFalse(config.shareGroupConfig.isShareGroupEnabled)
// This is OK.
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer,share")
config = KafkaConfig.fromProps(props)
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE),
config.groupCoordinatorRebalanceProtocols)
- assertTrue(config.isNewGroupCoordinatorEnabled)
assertTrue(config.shareGroupConfig.isShareGroupEnabled)
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,streams")
val config2 = KafkaConfig.fromProps(props)
assertEquals(Set(GroupType.CLASSIC, GroupType.STREAMS),
config2.groupCoordinatorRebalanceProtocols)
- assertTrue(config2.isNewGroupCoordinatorEnabled)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index 3961c725ed4..37687f67b7b 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig}
-import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.Assertions.assertEquals
@ClusterTestDefaults(types = Array(Type.KRAFT))
class ListGroupsRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@@ -34,7 +34,7 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
)
)
- def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit
= {
+ def testListGroupsWithNewConsumerGroupProtocol(): Unit = {
testListGroups(true)
}
@@ -43,26 +43,11 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
))
- def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit
= {
- testListGroups(false)
- }
-
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
- ))
- def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit
= {
+ def testListGroupsWithOldConsumerGroupProtocol(): Unit = {
testListGroups(false)
}
private def testListGroups(useNewProtocol: Boolean): Unit = {
- if (!isNewGroupCoordinatorEnabled && useNewProtocol) {
- fail("Cannot use the new protocol with the old group coordinator.")
- }
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 57700712379..0c6355e434e 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -20,7 +20,6 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Clu
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.junit.jupiter.api.Assertions.fail
@ClusterTestDefaults(types = Array(Type.KRAFT))
class OffsetCommitRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@@ -31,7 +30,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
- def testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testOffsetCommitWithNewConsumerGroupProtocol(): Unit = {
testOffsetCommit(true)
}
@@ -41,25 +40,11 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
- def testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
- testOffsetCommit(false)
- }
-
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- ))
- def testOffsetCommitWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
+ def testOffsetCommitWithOldConsumerGroupProtocol(): Unit = {
testOffsetCommit(false)
}
private def testOffsetCommit(useNewProtocol: Boolean): Unit = {
- if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
- fail("Cannot use the new protocol with the old group coordinator.")
- }
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
@@ -97,7 +82,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
partition = 0,
offset = 100L,
expectedError =
- if (isNewGroupCoordinatorEnabled && version >= 9)
Errors.GROUP_ID_NOT_FOUND
+ if (version >= 9) Errors.GROUP_ID_NOT_FOUND
else Errors.ILLEGAL_GENERATION,
version = version.toShort
)
@@ -111,7 +96,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
partition = 0,
offset = 100L,
expectedError =
- if (isNewGroupCoordinatorEnabled && version >= 9)
Errors.GROUP_ID_NOT_FOUND
+ if (version >= 9) Errors.GROUP_ID_NOT_FOUND
else Errors.ILLEGAL_GENERATION,
version = version.toShort
)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index 0a808f6c868..edb40c99e6a 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -20,7 +20,6 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Clu
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.junit.jupiter.api.Assertions.fail
@ClusterTestDefaults(types = Array(Type.KRAFT))
class OffsetDeleteRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@@ -30,7 +29,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
- def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testOffsetDeleteWithNewConsumerGroupProtocol(): Unit = {
testOffsetDelete(true)
}
@@ -38,25 +37,11 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
- def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
- testOffsetDelete(false)
- }
-
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- ))
- def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
+ def testOffsetDeleteWithOldConsumerGroupProtocol(): Unit = {
testOffsetDelete(false)
}
private def testOffsetDelete(useNewProtocol: Boolean): Unit = {
- if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
- fail("Cannot use the new protocol with the old group coordinator.")
- }
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index a504ecdeea0..5286c173625 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.message.OffsetFetchResponseData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.Assertions.assertEquals
import scala.jdk.CollectionConverters._
@@ -35,7 +35,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
- def
testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testSingleGroupOffsetFetchWithNewConsumerGroupProtocol(): Unit = {
testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
}
@@ -45,27 +45,17 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
- def
testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testSingleGroupOffsetFetchWithOldConsumerGroupProtocol(): Unit = {
testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
}
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- ))
- def
testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
- testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = true)
- }
-
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
- def
testSingleGroupAllOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testSingleGroupAllOffsetFetchWithNewConsumerGroupProtocol(): Unit = {
testSingleGroupAllOffsetFetch(useNewProtocol = true, requireStable = true)
}
@@ -73,27 +63,17 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
- def
testSingleGroupAllOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testSingleGroupAllOffsetFetchWithOldConsumerGroupProtocol(): Unit = {
testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable =
false)
}
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- ))
- def
testSingleGroupAllOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
- testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = true)
- }
-
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
- def
testMultiGroupsOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testMultiGroupsOffsetFetchWithNewConsumerGroupProtocol(): Unit = {
testMultipleGroupsOffsetFetch(useNewProtocol = true, requireStable = true)
}
@@ -101,25 +81,11 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
- def
testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testMultiGroupsOffsetFetchWithOldConsumerGroupProtocol(): Unit = {
testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable =
false)
}
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
- ))
- def
testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
- testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = true)
- }
-
private def testSingleGroupOffsetFetch(useNewProtocol: Boolean,
requireStable: Boolean): Unit = {
- if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
- fail("Cannot use the new protocol with the old group coordinator.")
- }
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
@@ -288,10 +254,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
}
private def testSingleGroupAllOffsetFetch(useNewProtocol: Boolean,
requireStable: Boolean): Unit = {
- if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
- fail("Cannot use the new protocol with the old group coordinator.")
- }
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
@@ -401,10 +363,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
}
private def testMultipleGroupsOffsetFetch(useNewProtocol: Boolean,
requireStable: Boolean): Unit = {
- if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
- fail("Cannot use the new protocol with the old group coordinator.")
- }
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index 448b6897ede..7aaa163a2de 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -54,7 +54,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
@@ -150,7 +149,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
@@ -301,7 +299,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
@@ -417,7 +414,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
@@ -606,7 +602,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
@@ -783,7 +778,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index 3a53fbf144a..f949dec3ddc 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -38,22 +38,7 @@ class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
))
- def testSyncGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit
= {
- testSyncGroup()
- }
-
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties =
Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
- ))
- def testSyncGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit
= {
- testSyncGroup()
- }
-
- private def testSyncGroup(): Unit = {
+ def testSyncGroupWithOldConsumerGroupProtocol(): Unit = {
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
diff --git a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index b2cd44bbd92..122e91e95bb 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
import scala.jdk.CollectionConverters.IterableHasAsScala
@@ -40,30 +40,16 @@ import scala.jdk.CollectionConverters.IterableHasAsScala
class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
- def testTxnOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ def testTxnOffsetCommitWithNewConsumerGroupProtocol(): Unit = {
testTxnOffsetCommit(true)
}
@ClusterTest
- def testTxnOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
- testTxnOffsetCommit(false)
- }
-
- @ClusterTest(
- serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
- )
- )
- def testTxnOffsetCommitWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
+ def testTxnOffsetCommitWithOldConsumerGroupProtocol(): Unit = {
testTxnOffsetCommit(false)
}
private def testTxnOffsetCommit(useNewProtocol: Boolean): Unit = {
- if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
- fail("Cannot use the new protocol with the old group coordinator.")
- }
-
val topic = "topic"
val partition = 0
val transactionalId = "txn"
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 47269bad3a3..9be99e6e1a9 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -68,11 +68,6 @@ import java.util.function.IntSupplier;
*/
public interface GroupCoordinator {
- /**
- * @return True if the new coordinator; False otherwise.
- */
- boolean isNewGroupCoordinator();
-
/**
* Heartbeat to a Consumer Group.
*
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 81010c65f14..89131d61b6e 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -40,7 +40,6 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@@ -59,10 +58,6 @@ public class GroupCoordinatorConfig {
///
/// Group coordinator configs
///
- public static final String NEW_GROUP_COORDINATOR_ENABLE_CONFIG =
"group.coordinator.new.enable";
- public static final String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the
new group coordinator.";
- public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = true;
-
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG =
"group.coordinator.rebalance.protocols";
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC =
"The list of enabled rebalance protocols." +
"The " + Group.GroupType.SHARE + " rebalance protocol is in early
access and therefore must not be used in production.";
@@ -288,8 +283,6 @@ public class GroupCoordinatorConfig {
.define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT,
OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH,
OFFSETS_TOPIC_PARTITIONS_DOC)
.define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT,
OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH,
OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
.define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int)
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH,
OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
- // Internal configuration used by integration and system tests.
- .defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN,
NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM,
NEW_GROUP_COORDINATOR_ENABLE_DOC)
// Offset configs
.define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT,
OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index cdf43f793d3..602499c6e49 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -337,14 +337,6 @@ public class GroupCoordinatorService implements GroupCoordinator {
return new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
partitionFor(groupId));
}
- /**
- * See {@link GroupCoordinator#isNewGroupCoordinator()}
- */
- @Override
- public boolean isNewGroupCoordinator() {
- return true;
- }
-
/**
* See {@link GroupCoordinator#partitionFor(String)}
*/
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 56d2a58bd63..36b48d307a8 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -136,7 +136,6 @@ public class ShareGroupConfigTest {
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupMinRecordLockDurationMs);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupMaxRecordLockDurationMs);
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer,share");
-
configs.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, true);
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1);
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG,
10);
diff --git a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
index c013d155977..79593a317e2 100644
--- a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
+++ b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
@@ -83,7 +83,6 @@ import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -216,9 +215,6 @@ public class ClusterTestExtensionsTest {
@ClusterTests({
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key =
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
- }),
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
- @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
value = "false"),
})
})
public void testNotSupportedNewGroupProtocols(ClusterInstance
clusterInstance) {