This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8f8dbad564f KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in
java (#14456)
8f8dbad564f is described below
commit 8f8dbad564ffd9be409bb85edadbc40659cd0a56
Author: Nikolay <[email protected]>
AuthorDate: Mon Oct 2 23:22:17 2023 +0300
KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java (#14456)
This PR is part of #13247.
It contains ReassignPartitionsIntegrationTest rewritten in Java.
The goal of this PR is to reduce the size of the changes in the main PR.
Reviewers: Taras Ledkov <[email protected]>, Justine Olshan
<[email protected]>
---
build.gradle | 1 +
checkstyle/import-control.xml | 4 +
core/src/test/java/kafka/test/ClusterConfig.java | 5 +
.../admin/ReassignPartitionsIntegrationTest.scala | 740 -----------------
.../tools/reassign/VerifyAssignmentResult.java | 2 +-
.../org/apache/kafka/tools/ToolsTestUtils.java | 4 +
.../ReassignPartitionsIntegrationTest.java | 924 +++++++++++++++++++++
7 files changed, 939 insertions(+), 741 deletions(-)
diff --git a/build.gradle b/build.gradle
index c73331bb02f..ffdbbf1ace4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1911,6 +1911,7 @@ project(':tools') {
testImplementation project(':connect:api')
testImplementation project(':connect:runtime')
testImplementation project(':connect:runtime').sourceSets.test.output
+ testImplementation project(':storage:api').sourceSets.main.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f4b5decdd88..888e8a41ae8 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -293,7 +293,11 @@
<subpackage name="reassign">
<allow pkg="org.apache.kafka.admin"/>
+ <allow pkg="org.apache.kafka.tools"/>
<allow pkg="kafka.admin" />
+ <allow pkg="kafka.cluster" />
+ <allow pkg="kafka.log" />
+ <allow pkg="kafka.server" />
<allow pkg="scala" />
</subpackage>
</subpackage>
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java
b/core/src/test/java/kafka/test/ClusterConfig.java
index 8e9f7de96ab..c4ea8669d53 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -146,6 +146,11 @@ public class ClusterConfig {
copy.consumerProperties.putAll(consumerProperties);
copy.saslServerProperties.putAll(saslServerProperties);
copy.saslClientProperties.putAll(saslClientProperties);
+ perBrokerOverrideProperties.forEach((brokerId, props) -> {
+ Properties propsCopy = new Properties();
+ propsCopy.putAll(props);
+ copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
+ });
return copy;
}
diff --git
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
deleted file mode 100644
index d3a04da4d13..00000000000
---
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ /dev/null
@@ -1,740 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import kafka.admin.ReassignPartitionsCommand._
-import kafka.server._
-import kafka.utils.Implicits._
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
-import org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{AfterEach, Timeout}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
-
-import java.io.Closeable
-import java.util.{Collections, HashMap, List}
-import scala.collection.{Map, Seq, mutable}
-import scala.jdk.CollectionConverters._
-
-@Timeout(300)
-class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
-
- var cluster: ReassignPartitionsTestCluster = _
-
- @AfterEach
- override def tearDown(): Unit = {
- Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster")
- super.tearDown()
- }
-
- val unthrottledBrokerConfigs =
- 0.to(4).map { brokerId =>
- brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap
- }.toMap
-
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testReassignment(quorum: String): Unit = {
- cluster = new ReassignPartitionsTestCluster()
- cluster.setup()
- executeAndVerifyReassignment()
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
- def testReassignmentWithAlterPartitionDisabled(quorum: String): Unit = {
- // Test reassignment when the IBP is on an older version which does not use
- // the `AlterPartition` API. In this case, the controller will register
individual
- // watches for each reassigning partition so that the reassignment can be
- // completed as soon as the ISR is expanded.
- val configOverrides = Map(KafkaConfig.InterBrokerProtocolVersionProp ->
IBP_2_7_IV1.version)
- cluster = new ReassignPartitionsTestCluster(configOverrides =
configOverrides)
- cluster.setup()
- executeAndVerifyReassignment()
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
- def testReassignmentCompletionDuringPartialUpgrade(quorum: String): Unit = {
- // Test reassignment during a partial upgrade when some brokers are
relying on
- // `AlterPartition` and some rely on the old notification logic through
Zookeeper.
- // In this test case, broker 0 starts up first on the latest IBP and is
typically
- // elected as controller. The three remaining brokers start up on the
older IBP.
- // We want to ensure that reassignment can still complete through the ISR
change
- // notification path even though the controller expects `AlterPartition`.
-
- // Override change notification settings so that test is not delayed by ISR
- // change notification delay
- ZkAlterPartitionManager.DefaultIsrPropagationConfig =
IsrChangePropagationConfig(
- checkIntervalMs = 500,
- lingerMs = 100,
- maxDelayMs = 500
- )
-
- val oldIbpConfig = Map(KafkaConfig.InterBrokerProtocolVersionProp ->
IBP_2_7_IV1.version)
- val brokerConfigOverrides = Map(1 -> oldIbpConfig, 2 -> oldIbpConfig, 3 ->
oldIbpConfig)
-
- cluster = new ReassignPartitionsTestCluster(brokerConfigOverrides =
brokerConfigOverrides)
- cluster.setup()
-
- executeAndVerifyReassignment()
- }
-
- private def executeAndVerifyReassignment(): Unit = {
- val assignment = """{"version":1,"partitions":""" +
-
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},"""
+
-
"""{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}"""
+
- """]}"""
-
- val foo0 = new TopicPartition("foo", 0)
- val bar0 = new TopicPartition("bar", 0)
-
- // Check that the assignment has not yet been started yet.
- val initialAssignment = Map(
- foo0 -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 3), true),
- bar0 -> PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 0), true)
- )
- waitForVerifyAssignment(cluster.adminClient, assignment, false,
- VerifyAssignmentResult(initialAssignment))
-
- // Execute the assignment
- runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L)
- assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
- val finalAssignment = Map(
- foo0 -> PartitionReassignmentState(Seq(0, 1, 3), Seq(0, 1, 3), true),
- bar0 -> PartitionReassignmentState(Seq(3, 2, 0), Seq(3, 2, 0), true)
- )
-
- val verifyAssignmentResult = runVerifyAssignment(cluster.adminClient,
assignment, false)
- assertFalse(verifyAssignmentResult.movesOngoing)
-
- // Wait for the assignment to complete
- waitForVerifyAssignment(cluster.adminClient, assignment, false,
- VerifyAssignmentResult(finalAssignment))
-
- assertEquals(unthrottledBrokerConfigs,
- describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
-
- // Verify that partitions are removed from brokers no longer assigned
- verifyReplicaDeleted(topicPartition = foo0, replicaId = 2)
- verifyReplicaDeleted(topicPartition = bar0, replicaId = 1)
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testHighWaterMarkAfterPartitionReassignment(quorum: String): Unit = {
- cluster = new ReassignPartitionsTestCluster()
- cluster.setup()
- val assignment = """{"version":1,"partitions":""" +
-
"""[{"topic":"foo","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]}"""
+
- """]}"""
-
- // Set the high water mark of foo-0 to 123 on its leader.
- val part = new TopicPartition("foo", 0)
- cluster.servers(0).replicaManager.logManager.truncateFullyAndStartAt(part,
123L, false)
-
- // Execute the assignment
- runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L)
- val finalAssignment = Map(part ->
- PartitionReassignmentState(Seq(3, 1, 2), Seq(3, 1, 2), true))
-
- // Wait for the assignment to complete
- waitForVerifyAssignment(cluster.adminClient, assignment, false,
- VerifyAssignmentResult(finalAssignment))
-
- TestUtils.waitUntilTrue(() => {
- cluster.servers(3).replicaManager.onlinePartition(part).
- flatMap(_.leaderLogIfLocal).isDefined
- }, "broker 3 should be the new leader", pause = 10L)
- assertEquals(123L,
cluster.servers(3).replicaManager.localLogOrException(part).highWatermark,
- s"Expected broker 3 to have the correct high water mark for the
partition.")
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testAlterReassignmentThrottle(quorum: String): Unit = {
- cluster = new ReassignPartitionsTestCluster()
- cluster.setup()
- cluster.produceMessages("foo", 0, 50)
- cluster.produceMessages("baz", 2, 60)
- val assignment = """{"version":1,"partitions":
-
[{"topic":"foo","partition":0,"replicas":[0,3,2],"log_dirs":["any","any","any"]},
-
{"topic":"baz","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]}
- ]}"""
-
- // Execute the assignment with a low throttle
- val initialThrottle = 1L
- runExecuteAssignment(cluster.adminClient, false, assignment,
initialThrottle, -1L)
- waitForInterBrokerThrottle(Set(0, 1, 2, 3), initialThrottle)
-
- // Now update the throttle and verify the reassignment completes
- val updatedThrottle = 300000L
- runExecuteAssignment(cluster.adminClient, additional = true, assignment,
updatedThrottle, -1L)
- waitForInterBrokerThrottle(Set(0, 1, 2, 3), updatedThrottle)
-
- val finalAssignment = Map(
- new TopicPartition("foo", 0) ->
- PartitionReassignmentState(Seq(0, 3, 2), Seq(0, 3, 2), true),
- new TopicPartition("baz", 2) ->
- PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 1), true))
-
- // Now remove the throttles.
- waitForVerifyAssignment(cluster.adminClient, assignment, false,
- VerifyAssignmentResult(finalAssignment))
- waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
- }
-
- /**
- * Test running a reassignment with the interBrokerThrottle set.
- */
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testThrottledReassignment(quorum: String): Unit = {
- cluster = new ReassignPartitionsTestCluster()
- cluster.setup()
- cluster.produceMessages("foo", 0, 50)
- cluster.produceMessages("baz", 2, 60)
- val assignment = """{"version":1,"partitions":""" +
-
"""[{"topic":"foo","partition":0,"replicas":[0,3,2],"log_dirs":["any","any","any"]},"""
+
-
"""{"topic":"baz","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]}"""
+
- """]}"""
-
- // Check that the assignment has not yet been started yet.
- val initialAssignment = Map(
- new TopicPartition("foo", 0) ->
- PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 3, 2), true),
- new TopicPartition("baz", 2) ->
- PartitionReassignmentState(Seq(0, 2, 1), Seq(3, 2, 1), true))
- assertEquals(VerifyAssignmentResult(initialAssignment),
runVerifyAssignment(cluster.adminClient, assignment, false))
- assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
-
- // Execute the assignment
- val interBrokerThrottle = 300000L
- runExecuteAssignment(cluster.adminClient, false, assignment,
interBrokerThrottle, -1L)
- waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
-
- val finalAssignment = Map(
- new TopicPartition("foo", 0) ->
- PartitionReassignmentState(Seq(0, 3, 2), Seq(0, 3, 2), true),
- new TopicPartition("baz", 2) ->
- PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 1), true))
-
- // Wait for the assignment to complete
- TestUtils.waitUntilTrue(
- () => {
- // Check the reassignment status.
- val result = runVerifyAssignment(cluster.adminClient, assignment, true)
- if (!result.partsOngoing) {
- true
- } else {
- assertFalse(result.partStates.forall(_._2.done), s"Expected at least
one partition reassignment to be ongoing when result = $result")
- assertEquals(Seq(0, 3, 2), result.partStates(new
TopicPartition("foo", 0)).targetReplicas)
- assertEquals(Seq(3, 2, 1), result.partStates(new
TopicPartition("baz", 2)).targetReplicas)
- logger.info(s"Current result: ${result}")
- waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
- false
- }
- }, "Expected reassignment to complete.")
- waitForVerifyAssignment(cluster.adminClient, assignment, true,
- VerifyAssignmentResult(finalAssignment))
- // The throttles should still have been preserved, since we ran with
--preserve-throttles
- waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
- // Now remove the throttles.
- waitForVerifyAssignment(cluster.adminClient, assignment, false,
- VerifyAssignmentResult(finalAssignment))
- waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testProduceAndConsumeWithReassignmentInProgress(quorum: String): Unit = {
- cluster = new ReassignPartitionsTestCluster()
- cluster.setup()
- cluster.produceMessages("baz", 2, 60)
- val assignment = """{"version":1,"partitions":""" +
-
"""[{"topic":"baz","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]}"""
+
- """]}"""
- runExecuteAssignment(cluster.adminClient, false, assignment, 300L, -1L)
- cluster.produceMessages("baz", 2, 100)
- val consumer = TestUtils.createConsumer(cluster.brokerList)
- val part = new TopicPartition("baz", 2)
- try {
- consumer.assign(Seq(part).asJava)
- TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 100)
- } finally {
- consumer.close()
- }
- TestUtils.removeReplicationThrottleForPartitions(cluster.adminClient,
Seq(0,1,2,3), Set(part))
- val finalAssignment = Map(part ->
- PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 1), true))
- waitForVerifyAssignment(cluster.adminClient, assignment, false,
- VerifyAssignmentResult(finalAssignment))
- }
-
- /**
- * Test running a reassignment and then cancelling it.
- */
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testCancellation(quorum: String): Unit = {
- val foo0 = new TopicPartition("foo", 0)
- val baz1 = new TopicPartition("baz", 1)
-
- cluster = new ReassignPartitionsTestCluster()
- cluster.setup()
- cluster.produceMessages(foo0.topic, foo0.partition, 200)
- cluster.produceMessages(baz1.topic, baz1.partition, 200)
- val assignment = """{"version":1,"partitions":""" +
-
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},"""
+
-
"""{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}"""
+
- """]}"""
- assertEquals(unthrottledBrokerConfigs,
- describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
- val interBrokerThrottle = 1L
- runExecuteAssignment(cluster.adminClient, false, assignment,
interBrokerThrottle, -1L)
- waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
-
- // Verify that the reassignment is running. The very low throttle should
keep it
- // from completing before this runs.
- waitForVerifyAssignment(cluster.adminClient, assignment, true,
- VerifyAssignmentResult(Map(
- foo0 -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3),
false),
- baz1 -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3),
false)),
- true, Map(), false))
- // Cancel the reassignment.
- assertEquals((Set(foo0, baz1), Set()),
runCancelAssignment(cluster.adminClient, assignment, true))
- // Broker throttles are still active because we passed --preserve-throttles
- waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
- // Cancelling the reassignment again should reveal nothing to cancel.
- assertEquals((Set(), Set()), runCancelAssignment(cluster.adminClient,
assignment, false))
- // This time, the broker throttles were removed.
- waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
- // Verify that there are no ongoing reassignments.
- assertFalse(runVerifyAssignment(cluster.adminClient, assignment,
false).partsOngoing)
- // Verify that the partition is removed from cancelled replicas
- verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
- verifyReplicaDeleted(topicPartition = baz1, replicaId = 3)
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testCancellationWithAddingReplicaInIsr(quorum: String): Unit = {
- val foo0 = new TopicPartition("foo", 0)
-
- cluster = new ReassignPartitionsTestCluster()
- cluster.setup()
- cluster.produceMessages(foo0.topic, foo0.partition, 200)
-
- // The reassignment will bring replicas 3 and 4 into the replica set and
remove 1 and 2.
- val assignment = """{"version":1,"partitions":""" +
-
"""[{"topic":"foo","partition":0,"replicas":[0,3,4],"log_dirs":["any","any","any"]}"""
+
- """]}"""
-
- // We will throttle replica 4 so that only replica 3 joins the ISR
- TestUtils.setReplicationThrottleForPartitions(
- cluster.adminClient,
- brokerIds = Seq(4),
- partitions = Set(foo0),
- throttleBytes = 1
- )
-
- // Execute the assignment and wait for replica 3 (only) to join the ISR
- runExecuteAssignment(
- cluster.adminClient,
- additional = false,
- reassignmentJson = assignment
- )
- TestUtils.waitUntilTrue(
- () => TestUtils.currentIsr(cluster.adminClient, foo0) == Set(0, 1, 2, 3),
- msg = "Timed out while waiting for replica 3 to join the ISR"
- )
-
- // Now cancel the assignment and verify that the partition is removed from
cancelled replicas
- assertEquals((Set(foo0), Set()), runCancelAssignment(cluster.adminClient,
assignment, preserveThrottles = true))
- verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
- verifyReplicaDeleted(topicPartition = foo0, replicaId = 4)
- }
-
- private def verifyReplicaDeleted(
- topicPartition: TopicPartition,
- replicaId: Int
- ): Unit = {
- def isReplicaStoppedAndDeleted(): Boolean = {
- val server = cluster.servers(replicaId)
- val partition = server.replicaManager.getPartition(topicPartition)
- val log = server.logManager.getLog(topicPartition)
- partition == HostedPartition.None && log.isEmpty
- }
- TestUtils.waitUntilTrue(isReplicaStoppedAndDeleted,
- msg = s"Timed out waiting for replica $replicaId of $topicPartition to
be deleted")
- }
-
- private def waitForLogDirThrottle(throttledBrokers: Set[Int],
logDirThrottle: Long): Unit = {
- val throttledConfigMap = Map[String, Long](
- brokerLevelLeaderThrottle -> -1,
- brokerLevelFollowerThrottle -> -1,
- brokerLevelLogDirThrottle -> logDirThrottle)
- waitForBrokerThrottles(throttledBrokers, throttledConfigMap)
- }
-
- private def waitForInterBrokerThrottle(throttledBrokers: Set[Int],
interBrokerThrottle: Long): Unit = {
- val throttledConfigMap = Map[String, Long](
- brokerLevelLeaderThrottle -> interBrokerThrottle,
- brokerLevelFollowerThrottle -> interBrokerThrottle,
- brokerLevelLogDirThrottle -> -1L)
- waitForBrokerThrottles(throttledBrokers, throttledConfigMap)
- }
-
- private def waitForBrokerThrottles(throttledBrokers: Set[Int],
throttleConfig: Map[String, Long]): Unit = {
- val throttledBrokerConfigs = unthrottledBrokerConfigs.map { case
(brokerId, unthrottledConfig) =>
- val expectedThrottleConfig = if (throttledBrokers.contains(brokerId)) {
- throttleConfig
- } else {
- unthrottledConfig
- }
- brokerId -> expectedThrottleConfig
- }
- waitForBrokerLevelThrottles(throttledBrokerConfigs)
- }
-
- private def waitForBrokerLevelThrottles(targetThrottles: Map[Int,
Map[String, Long]]): Unit = {
- var curThrottles: Map[Int, Map[String, Long]] = Map.empty
- TestUtils.waitUntilTrue(() => {
- curThrottles = describeBrokerLevelThrottles(targetThrottles.keySet.toSeq)
- targetThrottles.equals(curThrottles)
- }, s"timed out waiting for broker throttle to become ${targetThrottles}.
" +
- s"Latest throttles were ${curThrottles}", pause = 25)
- }
-
- /**
- * Describe the broker-level throttles in the cluster.
- *
- * @return A map whose keys are broker IDs and whose values
are throttle
- * information. The nested maps are keyed on
throttle name.
- */
- private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int,
Map[String, Long]] = {
- brokerIds.map { brokerId =>
- val brokerResource = new ConfigResource(ConfigResource.Type.BROKER,
brokerId.toString)
- val brokerConfigs =
cluster.adminClient.describeConfigs(Collections.singleton(brokerResource)).values()
- .get(brokerResource)
- .get()
-
- val throttles = brokerLevelThrottles.map { throttleName =>
- val configValue = Option(brokerConfigs.get(throttleName))
- .map(_.value)
- .getOrElse("-1")
- (throttleName, configValue.toLong)
- }.toMap
- brokerId -> throttles
- }.toMap
- }
-
- /**
- * Test moving partitions between directories.
- */
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
- def testLogDirReassignment(quorum: String): Unit = {
- val topicPartition = new TopicPartition("foo", 0)
-
- cluster = new ReassignPartitionsTestCluster()
- cluster.setup()
- cluster.produceMessages(topicPartition.topic, topicPartition.partition,
700)
-
- val targetBrokerId = 0
- val replicas = Seq(0, 1, 2)
- val reassignment = buildLogDirReassignment(topicPartition, targetBrokerId,
replicas)
-
- // Start the replica move, but throttle it to be very slow so that it
can't complete
- // before our next checks happen.
- val logDirThrottle = 1L
- runExecuteAssignment(cluster.adminClient, additional = false,
reassignment.json,
- interBrokerThrottle = -1L, logDirThrottle)
-
- // Check the output of --verify
- waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
- VerifyAssignmentResult(Map(
- topicPartition -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1,
2), true)
- ), false, Map(
- new TopicPartitionReplica(topicPartition.topic,
topicPartition.partition, 0) ->
- ActiveMoveState(reassignment.currentDir, reassignment.targetDir,
reassignment.targetDir)
- ), true))
- waitForLogDirThrottle(Set(0), logDirThrottle)
-
- // Remove the throttle
- cluster.adminClient.incrementalAlterConfigs(Collections.singletonMap(
- new ConfigResource(ConfigResource.Type.BROKER, "0"),
- Collections.singletonList(new AlterConfigOp(
- new ConfigEntry(brokerLevelLogDirThrottle, ""),
AlterConfigOp.OpType.DELETE))))
- .all().get()
- waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
-
- // Wait for the directory movement to complete.
- waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
- VerifyAssignmentResult(Map(
- topicPartition -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1,
2), true)
- ), false, Map(
- new TopicPartitionReplica(topicPartition.topic,
topicPartition.partition, 0) ->
- CompletedMoveState(reassignment.targetDir)
- ), false))
-
- val info1 = new BrokerDirs(cluster.adminClient.describeLogDirs(0.to(4).
- map(_.asInstanceOf[Integer]).asJavaCollection), 0)
- assertEquals(reassignment.targetDir,
info1.curLogDirs.getOrElse(topicPartition, ""))
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
- def testAlterLogDirReassignmentThrottle(quorum: String): Unit = {
- val topicPartition = new TopicPartition("foo", 0)
-
- cluster = new ReassignPartitionsTestCluster()
- cluster.setup()
- cluster.produceMessages(topicPartition.topic, topicPartition.partition,
700)
-
- val targetBrokerId = 0
- val replicas = Seq(0, 1, 2)
- val reassignment = buildLogDirReassignment(topicPartition, targetBrokerId,
replicas)
-
- // Start the replica move with a low throttle so it does not complete
- val initialLogDirThrottle = 1L
- runExecuteAssignment(cluster.adminClient, false, reassignment.json,
- interBrokerThrottle = -1L, initialLogDirThrottle)
- waitForLogDirThrottle(Set(0), initialLogDirThrottle)
-
- // Now increase the throttle and verify that the log dir movement completes
- val updatedLogDirThrottle = 3000000L
- runExecuteAssignment(cluster.adminClient, additional = true,
reassignment.json,
- interBrokerThrottle = -1L, replicaAlterLogDirsThrottle =
updatedLogDirThrottle)
- waitForLogDirThrottle(Set(0), updatedLogDirThrottle)
-
- waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
- VerifyAssignmentResult(Map(
- topicPartition -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1,
2), true)
- ), false, Map(
- new TopicPartitionReplica(topicPartition.topic,
topicPartition.partition, targetBrokerId) ->
- CompletedMoveState(reassignment.targetDir)
- ), false))
- }
-
- case class LogDirReassignment(json: String, currentDir: String, targetDir:
String)
-
- private def buildLogDirReassignment(topicPartition: TopicPartition,
- brokerId: Int,
- replicas: Seq[Int]): LogDirReassignment
= {
-
- val describeLogDirsResult = cluster.adminClient.describeLogDirs(
- 0.to(4).map(_.asInstanceOf[Integer]).asJavaCollection)
-
- val logDirInfo = new BrokerDirs(describeLogDirsResult, brokerId)
- assertTrue(logDirInfo.futureLogDirs.isEmpty)
-
- val currentDir = logDirInfo.curLogDirs(topicPartition)
- val newDir = logDirInfo.logDirs.find(!_.equals(currentDir)).get
-
- val logDirs = replicas.map { replicaId =>
- if (replicaId == brokerId)
- s""""$newDir""""
- else
- "\"any\""
- }
-
- val reassignmentJson =
- s"""
- | { "version": 1,
- | "partitions": [
- | {
- | "topic": "${topicPartition.topic}",
- | "partition": ${topicPartition.partition},
- | "replicas": [${replicas.mkString(",")}],
- | "log_dirs": [${logDirs.mkString(",")}]
- | }
- | ]
- | }
- |""".stripMargin
-
- LogDirReassignment(reassignmentJson, currentDir = currentDir, targetDir =
newDir)
- }
-
- private def runVerifyAssignment(adminClient: Admin, jsonString: String,
- preserveThrottles: Boolean) = {
- println(s"==> verifyAssignment(adminClient, jsonString=${jsonString})")
- verifyAssignment(adminClient, jsonString, preserveThrottles)
- }
-
- private def waitForVerifyAssignment(adminClient: Admin,
- jsonString: String,
- preserveThrottles: Boolean,
- expectedResult: VerifyAssignmentResult):
Unit = {
- var latestResult: VerifyAssignmentResult = null
- TestUtils.waitUntilTrue(
- () => {
- latestResult = runVerifyAssignment(adminClient, jsonString,
preserveThrottles)
- expectedResult.equals(latestResult)
- }, s"Timed out waiting for verifyAssignment result ${expectedResult}. "
+
- s"The latest result was ${latestResult}", pause = 10L)
- }
-
- private def runExecuteAssignment(adminClient: Admin,
- additional: Boolean,
- reassignmentJson: String,
- interBrokerThrottle: Long = -1,
- replicaAlterLogDirsThrottle: Long = -1) = {
- println(s"==> executeAssignment(adminClient, additional=${additional}, " +
- s"reassignmentJson=${reassignmentJson}, " +
- s"interBrokerThrottle=${interBrokerThrottle}, " +
- s"replicaAlterLogDirsThrottle=${replicaAlterLogDirsThrottle}))")
- executeAssignment(adminClient, additional, reassignmentJson,
- interBrokerThrottle, replicaAlterLogDirsThrottle)
- }
-
- private def runCancelAssignment(adminClient: Admin, jsonString: String,
- preserveThrottles: Boolean) = {
- println(s"==> cancelAssignment(adminClient, jsonString=${jsonString})")
- cancelAssignment(adminClient, jsonString, preserveThrottles)
- }
-
- class BrokerDirs(result: DescribeLogDirsResult, val brokerId: Int) {
- val logDirs = new mutable.HashSet[String]
- val curLogDirs = new mutable.HashMap[TopicPartition, String]
- val futureLogDirs = new mutable.HashMap[TopicPartition, String]
- result.descriptions.get(brokerId).get().forEach {
- case (logDirName, logDirInfo) => {
- logDirs.add(logDirName)
- logDirInfo.replicaInfos.forEach {
- case (part, info) =>
- if (info.isFuture) {
- futureLogDirs.put(part, logDirName)
- } else {
- curLogDirs.put(part, logDirName)
- }
- }
- }
- }
- }
-
- class ReassignPartitionsTestCluster(
- configOverrides: Map[String, String] = Map.empty,
- brokerConfigOverrides: Map[Int, Map[String, String]] = Map.empty
- ) extends Closeable {
- val brokers = Map(
- 0 -> "rack0",
- 1 -> "rack0",
- 2 -> "rack1",
- 3 -> "rack1",
- 4 -> "rack1"
- )
-
- val topics = Map(
- "foo" -> Seq(Seq(0, 1, 2), Seq(1, 2, 3)),
- "bar" -> Seq(Seq(3, 2, 1)),
- "baz" -> Seq(Seq(1, 0, 2), Seq(2, 0, 1), Seq(0, 2, 1))
- )
-
- val brokerConfigs = brokers.map {
- case (brokerId, rack) =>
- val config = TestUtils.createBrokerConfig(
- nodeId = brokerId,
- zkConnect = zkConnectOrNull,
- rack = Some(rack),
- enableControlledShutdown = false, // shorten test time
- logDirCount = 3)
- // shorter backoff to reduce test durations when no active partitions
are eligible for fetching due to throttling
- config.setProperty(KafkaConfig.ReplicaFetchBackoffMsProp, "100")
- // Don't move partition leaders automatically.
- config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
- config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp, "1000")
- configOverrides.forKeyValue(config.setProperty)
-
- brokerConfigOverrides.get(brokerId).foreach { overrides =>
- overrides.forKeyValue(config.setProperty)
- }
-
- new KafkaConfig(config)
- }.toBuffer
-
- var servers = new mutable.ArrayBuffer[KafkaBroker]
-
- var brokerList: String = _
-
- var adminClient: Admin = _
-
- def setup(): Unit = {
- createServers()
- createTopics()
- }
-
- def createServers(): Unit = {
- brokers.keySet.foreach { brokerId =>
- servers += createBroker(brokerConfigs(brokerId))
- }
- }
-
- def createTopics(): Unit = {
- TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
- brokerList = TestUtils.plaintextBootstrapServers(servers)
- adminClient = Admin.create(Map[String, Object](
- AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
- ).asJava)
- adminClient.createTopics(topics.map {
- case (topicName, parts) =>
- val partMap = new HashMap[Integer, List[Integer]]()
- parts.zipWithIndex.foreach {
- case (part, index) => partMap.put(index,
part.map(Integer.valueOf).asJava)
- }
- new NewTopic(topicName, partMap)
- }.toList.asJava).all().get()
- topics.foreach {
- case (topicName, parts) =>
- TestUtils.waitForAllPartitionsMetadata(servers, topicName,
parts.size)
- }
-
- if (isKRaftTest()) {
- TestUtils.ensureConsistentKRaftMetadata(
- cluster.servers,
- controllerServer
- )
- }
- }
-
- def produceMessages(topic: String, partition: Int, numMessages: Int): Unit
= {
- val records = (0 until numMessages).map(_ =>
- new ProducerRecord[Array[Byte], Array[Byte]](topic, partition,
- null, new Array[Byte](10000)))
- TestUtils.produceMessages(servers, records, -1)
- }
-
- override def close(): Unit = {
- brokerList = null
- Utils.closeQuietly(adminClient, "adminClient")
- adminClient = null
- try {
- TestUtils.shutdownServers(servers)
- } finally {
- servers.clear()
- }
- }
- }
-}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
index c20cfd1029c..4812f1f19e9 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
@@ -46,7 +46,7 @@ public final class VerifyAssignmentResult {
public VerifyAssignmentResult(
Map<TopicPartition, PartitionReassignmentState> partStates,
boolean partsOngoing,
- Map<org.apache.kafka.common.TopicPartitionReplica, LogDirMoveState>
moveStates,
+ Map<TopicPartitionReplica, LogDirMoveState> moveStates,
boolean movesOngoing
) {
this.partStates = partStates;
diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
index 9e9ec94e968..36e4e12d816 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
@@ -16,12 +16,16 @@
*/
package org.apache.kafka.tools;
+import kafka.utils.TestInfoUtils;
import org.apache.kafka.common.utils.Exit;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
public class ToolsTestUtils {
+ /** @see TestInfoUtils#TestWithParameterizedQuorumName() */
+ public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME =
"{displayName}.quorum={0}";
+
public static String captureStandardOut(Runnable runnable) {
return captureStandardStream(false, runnable);
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java
new file mode 100644
index 00000000000..896e4f74ffd
--- /dev/null
+++
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.reassign;
+
+import kafka.admin.ReassignPartitionsCommand;
+import kafka.cluster.Partition;
+import kafka.log.UnifiedLog;
+import kafka.server.HostedPartition;
+import kafka.server.IsrChangePropagationConfig;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.server.QuorumTestHarness;
+import kafka.server.ZkAlterPartitionManager;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.None$;
+import scala.Option;
+import scala.Some$;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static
kafka.admin.ReassignPartitionsCommand.brokerLevelFollowerThrottle;
+import static kafka.admin.ReassignPartitionsCommand.brokerLevelLeaderThrottle;
+import static kafka.admin.ReassignPartitionsCommand.brokerLevelLogDirThrottle;
+import static kafka.admin.ReassignPartitionsCommand.brokerLevelThrottles;
+import static kafka.admin.ReassignPartitionsCommand.cancelAssignment;
+import static kafka.admin.ReassignPartitionsCommand.executeAssignment;
+import static kafka.admin.ReassignPartitionsCommand.verifyAssignment;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("ClassFanOutComplexity")
+@Timeout(300)
+public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
    // Cluster under test. Created lazily inside each test case (so each test can
    // pick its own config overrides) and closed in tearDown().
    ReassignPartitionsTestCluster cluster;

    /**
     * Closes the test cluster quietly — so a cleanup failure cannot mask the real
     * test failure — then delegates to the quorum harness teardown.
     */
    @AfterEach
    @Override
    public void tearDown() {
        Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster");
        super.tearDown();
    }
+
+ private final Map<Integer, Map<String, Long>> unthrottledBrokerConfigs =
new HashMap<>(); {
+ IntStream.range(0, 4).forEach(brokerId -> {
+ Map<String, Long> brokerConfig = new HashMap<>();
+
+ brokerLevelThrottles().foreach(throttle -> {
+ brokerConfig.put(throttle, -1L);
+ return null;
+ });
+
+ unthrottledBrokerConfigs.put(brokerId, brokerConfig);
+ });
+ }
+
    /**
     * Runs a plain (unthrottled) reassignment against a default cluster and
     * verifies it completes, for both ZK and KRaft quorums.
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = {"zk", "kraft"})
    public void testReassignment(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        executeAndVerifyReassignment();
    }
+
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
    public void testReassignmentWithAlterPartitionDisabled(String quorum) throws Exception {
        // Test reassignment when the IBP is on an older version which does not use
        // the `AlterPartition` API. In this case, the controller will register individual
        // watches for each reassigning partition so that the reassignment can be
        // completed as soon as the ISR is expanded.
        // IBP 2.7-IV1 predates AlterPartition, forcing the ZK-watch code path.
        Map<String, String> configOverrides = Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), IBP_2_7_IV1.version());
        cluster = new ReassignPartitionsTestCluster(configOverrides, Collections.emptyMap());
        cluster.setup();
        executeAndVerifyReassignment();
    }
+
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
    public void testReassignmentCompletionDuringPartialUpgrade(String quorum) throws Exception {
        // Test reassignment during a partial upgrade when some brokers are relying on
        // `AlterPartition` and some rely on the old notification logic through Zookeeper.
        // In this test case, broker 0 starts up first on the latest IBP and is typically
        // elected as controller. The three remaining brokers start up on the older IBP.
        // We want to ensure that reassignment can still complete through the ISR change
        // notification path even though the controller expects `AlterPartition`.

        // Override change notification settings so that test is not delayed by ISR
        // change notification delay.
        // NOTE(review): this mutates global ZkAlterPartitionManager state and is not
        // restored afterwards — relies on each test JVM being short-lived.
        ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new IsrChangePropagationConfig(500, 100, 500));

        Map<String, String> oldIbpConfig = Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), IBP_2_7_IV1.version());
        Map<Integer, Map<String, String>> brokerConfigOverrides = new HashMap<>();
        brokerConfigOverrides.put(1, oldIbpConfig);
        brokerConfigOverrides.put(2, oldIbpConfig);
        brokerConfigOverrides.put(3, oldIbpConfig);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), brokerConfigOverrides);
        cluster.setup();

        executeAndVerifyReassignment();
    }
+
    /**
     * Shared body of the basic reassignment tests: moves foo-0 to [0,1,3] and
     * bar-0 to [3,2,0], waits for completion, and checks that no broker-level
     * throttles were installed and that removed replicas are deleted.
     */
    private void executeAndVerifyReassignment() throws ExecutionException, InterruptedException {
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
            "{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3,2,0],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        TopicPartition foo0 = new TopicPartition("foo", 0);
        TopicPartition bar0 = new TopicPartition("bar", 0);

        // Check that the assignment has not been started yet.
        Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();

        initialAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 3), true));
        initialAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 0), true));

        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(initialAssignment));

        // Execute the assignment
        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
        // No throttles were requested (-1), so broker configs must stay untouched.
        assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
        finalAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3), Arrays.asList(0, 1, 3), true));
        finalAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 0), Arrays.asList(3, 2, 0), true));

        kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, false);
        assertFalse(verifyAssignmentResult.movesOngoing());

        // Wait for the assignment to complete
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));

        assertEquals(unthrottledBrokerConfigs,
            describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));

        // Verify that partitions are removed from brokers no longer assigned
        verifyReplicaDeleted(foo0, 2);
        verifyReplicaDeleted(bar0, 1);
    }
+
    /**
     * Verifies that the high watermark survives a reassignment that moves
     * leadership to a broker that was not previously a replica.
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = {"zk", "kraft"})
    public void testHighWaterMarkAfterPartitionReassignment(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        // Set the high water mark of foo-0 to 123 on its leader.
        TopicPartition part = new TopicPartition("foo", 0);
        cluster.servers.get(0).replicaManager().logManager().truncateFullyAndStartAt(part, 123L, false, None$.empty());

        // Execute the assignment
        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
        Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part,
            new PartitionReassignmentState(Arrays.asList(3, 1, 2), Arrays.asList(3, 1, 2), true));

        // Wait for the assignment to complete
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));

        // Broker 3 is first in the new replica list, so it should take leadership.
        TestUtils.waitUntilTrue(() ->
            cluster.servers.get(3).replicaManager().onlinePartition(part).
                map(Partition::leaderLogIfLocal).isDefined(),
            () -> "broker 3 should be the new leader", DEFAULT_MAX_WAIT_MS, 10L);
        assertEquals(123L, cluster.servers.get(3).replicaManager().localLogOrException(part).highWatermark(),
            "Expected broker 3 to have the correct high water mark for the partition.");
    }
+
    /**
     * Verifies that the inter-broker throttle of an in-flight reassignment can
     * be raised by re-running --execute with --additional, after which the
     * reassignment completes and the throttles are removed.
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = {"zk", "kraft"})
    public void testAlterReassignmentThrottle(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages("foo", 0, 50);
        cluster.produceMessages("baz", 2, 60);
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
            "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        // Execute the assignment with a low throttle
        long initialThrottle = 1L;
        runExecuteAssignment(cluster.adminClient, false, assignment, initialThrottle, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), initialThrottle);

        // Now update the throttle and verify the reassignment completes
        long updatedThrottle = 300000L;
        runExecuteAssignment(cluster.adminClient, true, assignment, updatedThrottle, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), updatedThrottle);

        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
        finalAssignment.put(new TopicPartition("foo", 0),
            new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
        finalAssignment.put(new TopicPartition("baz", 2),
            new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));

        // Now remove the throttles.
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
    }
+
    /**
     * Test running a reassignment with the interBrokerThrottle set.
     * While the reassignment is ongoing (verified with --preserve-throttles),
     * the throttle configs must stay in place; once verified without
     * --preserve-throttles, they must be removed.
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = {"zk", "kraft"})
    public void testThrottledReassignment(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages("foo", 0, 50);
        cluster.produceMessages("baz", 2, 60);
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
            "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        // Check that the assignment has not been started yet.
        Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();
        initialAssignment.put(new TopicPartition("foo", 0),
            new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 3, 2), true));
        initialAssignment.put(new TopicPartition("baz", 2),
            new PartitionReassignmentState(Arrays.asList(0, 2, 1), Arrays.asList(3, 2, 1), true));
        assertEquals(asScala(new VerifyAssignmentResult(initialAssignment)), runVerifyAssignment(cluster.adminClient, assignment, false));
        assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));

        // Execute the assignment
        long interBrokerThrottle = 300000L;
        runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);

        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
        finalAssignment.put(new TopicPartition("foo", 0),
            new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
        finalAssignment.put(new TopicPartition("baz", 2),
            new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));

        // Wait for the assignment to complete
        TestUtils.waitUntilTrue(
            () -> {
                // Check the reassignment status.
                kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult result = runVerifyAssignment(cluster.adminClient, assignment, true);

                if (!result.partsOngoing()) {
                    return true;
                } else {
                    // Still ongoing: sanity-check the intermediate state and keep polling.
                    assertFalse(
                        result.partStates().values().forall(ReassignPartitionsCommand.PartitionReassignmentState::done),
                        "Expected at least one partition reassignment to be ongoing when result = " + result
                    );
                    assertEquals(seq(0, 3, 2), result.partStates().get(new TopicPartition("foo", 0)).get().targetReplicas());
                    assertEquals(seq(3, 2, 1), result.partStates().get(new TopicPartition("baz", 2)).get().targetReplicas());
                    System.out.println("Current result: " + result);
                    waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
                    return false;
                }
            }, () -> "Expected reassignment to complete.", DEFAULT_MAX_WAIT_MS, 100L);
        waitForVerifyAssignment(cluster.adminClient, assignment, true,
            new VerifyAssignmentResult(finalAssignment));
        // The throttles should still have been preserved, since we ran with --preserve-throttles
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
        // Now remove the throttles.
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
    }
+
    /**
     * Verifies that a partition stays fully usable — producible and consumable —
     * while its (slow, throttled) reassignment is still in progress.
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = {"zk", "kraft"})
    public void testProduceAndConsumeWithReassignmentInProgress(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages("baz", 2, 60);
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";
        // Throttle of 300 bytes/s keeps the reassignment running during the checks below.
        runExecuteAssignment(cluster.adminClient, false, assignment, 300L, -1L);
        cluster.produceMessages("baz", 2, 100);
        Consumer<byte[], byte[]> consumer = TestUtils.createConsumer(cluster.brokerList,
            "group",
            "earliest",
            true,
            false,
            500,
            SecurityProtocol.PLAINTEXT,
            None$.empty(),
            None$.empty(),
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer()
        );

        TopicPartition part = new TopicPartition("baz", 2);
        try {
            consumer.assign(Collections.singleton(part));
            TestUtils.pollUntilAtLeastNumRecords(consumer, 100, DEFAULT_MAX_WAIT_MS);
        } finally {
            consumer.close();
        }
        // Lift the throttle so the reassignment can finish.
        TestUtils.removeReplicationThrottleForPartitions(cluster.adminClient, seq(0, 1, 2, 3), set(part));
        Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part,
            new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));
    }
+
    /**
     * Test running a reassignment and then cancelling it.
     * Cancelling with --preserve-throttles keeps broker throttles in place; a
     * second cancel without it removes them and finds nothing left to cancel.
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = {"zk", "kraft"})
    public void testCancellation(String quorum) throws Exception {
        TopicPartition foo0 = new TopicPartition("foo", 0);
        TopicPartition baz1 = new TopicPartition("baz", 1);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(foo0.topic(), foo0.partition(), 200);
        cluster.produceMessages(baz1.topic(), baz1.partition(), 200);
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
            "{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";
        assertEquals(unthrottledBrokerConfigs,
            describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
        long interBrokerThrottle = 1L;
        runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);

        Map<TopicPartition, PartitionReassignmentState> partStates = new HashMap<>();

        partStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3, 2), Arrays.asList(0, 1, 3), false));
        partStates.put(baz1, new PartitionReassignmentState(Arrays.asList(0, 2, 3, 1), Arrays.asList(0, 2, 3), false));

        // Verify that the reassignment is running. The very low throttle should keep it
        // from completing before this runs.
        waitForVerifyAssignment(cluster.adminClient, assignment, true,
            new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false));
        // Cancel the reassignment.
        assertEquals(new Tuple2<>(set(foo0, baz1), set()), runCancelAssignment(cluster.adminClient, assignment, true));
        // Broker throttles are still active because we passed --preserve-throttles
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
        // Cancelling the reassignment again should reveal nothing to cancel.
        assertEquals(new Tuple2<>(set(), set()), runCancelAssignment(cluster.adminClient, assignment, false));
        // This time, the broker throttles were removed.
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
        // Verify that there are no ongoing reassignments.
        assertFalse(runVerifyAssignment(cluster.adminClient, assignment, false).partsOngoing());
        // Verify that the partition is removed from cancelled replicas
        verifyReplicaDeleted(foo0, 3);
        verifyReplicaDeleted(baz1, 3);
    }
+
    /**
     * Cancels a reassignment after one of the adding replicas (3) has already
     * joined the ISR while the other (4) is throttled out, and verifies that
     * both adding replicas are deleted on cancellation.
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = {"zk", "kraft"})
    public void testCancellationWithAddingReplicaInIsr(String quorum) throws Exception {
        TopicPartition foo0 = new TopicPartition("foo", 0);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(foo0.topic(), foo0.partition(), 200);

        // The reassignment will bring replicas 3 and 4 into the replica set and remove 1 and 2.
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        // We will throttle replica 4 so that only replica 3 joins the ISR
        TestUtils.setReplicationThrottleForPartitions(
            cluster.adminClient,
            seq(4),
            set(foo0),
            1
        );

        // Execute the assignment and wait for replica 3 (only) to join the ISR
        runExecuteAssignment(
            cluster.adminClient,
            false,
            assignment,
            -1L,
            -1L
        );
        // ISR becomes {0,1,2,3}: the original replicas plus adding replica 3.
        TestUtils.waitUntilTrue(
            () -> Objects.equals(TestUtils.currentIsr(cluster.adminClient, foo0), set(0, 1, 2, 3)),
            () -> "Timed out while waiting for replica 3 to join the ISR",
            DEFAULT_MAX_WAIT_MS, 100L
        );

        // Now cancel the assignment and verify that the partition is removed from cancelled replicas
        assertEquals(new Tuple2<>(set(foo0), set()), runCancelAssignment(cluster.adminClient, assignment, true));
        verifyReplicaDeleted(foo0, 3);
        verifyReplicaDeleted(foo0, 4);
    }
+
+ private void verifyReplicaDeleted(
+ TopicPartition topicPartition,
+ Integer replicaId
+ ) {
+ TestUtils.waitUntilTrue(
+ () -> {
+ KafkaBroker server = cluster.servers.get(replicaId);
+ HostedPartition partition =
server.replicaManager().getPartition(topicPartition);
+ Option<UnifiedLog> log =
server.logManager().getLog(topicPartition, false);
+ return partition == HostedPartition.None$.MODULE$ &&
log.isEmpty();
+ },
+ () -> "Timed out waiting for replica " + replicaId + " of " +
topicPartition + " to be deleted",
+ DEFAULT_MAX_WAIT_MS,
+ 100L
+ );
+ }
+
+ private void waitForLogDirThrottle(Set<Integer> throttledBrokers, Long
logDirThrottle) {
+ Map<String, Long> throttledConfigMap = new HashMap<>();
+ throttledConfigMap.put(brokerLevelLeaderThrottle(), -1L);
+ throttledConfigMap.put(brokerLevelFollowerThrottle(), -1L);
+ throttledConfigMap.put(brokerLevelLogDirThrottle(), logDirThrottle);
+ waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
+ }
+
+ private void waitForInterBrokerThrottle(List<Integer> throttledBrokers,
Long interBrokerThrottle) {
+ Map<String, Long> throttledConfigMap = new HashMap<>();
+ throttledConfigMap.put(brokerLevelLeaderThrottle(),
interBrokerThrottle);
+ throttledConfigMap.put(brokerLevelFollowerThrottle(),
interBrokerThrottle);
+ throttledConfigMap.put(brokerLevelLogDirThrottle(), -1L);
+ waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
+ }
+
+ private void waitForBrokerThrottles(Collection<Integer> throttledBrokers,
Map<String, Long> throttleConfig) {
+ Map<Integer, Map<String, Long>> throttledBrokerConfigs = new
HashMap<>();
+ unthrottledBrokerConfigs.forEach((brokerId, unthrottledConfig) -> {
+ Map<String, Long> expectedThrottleConfig =
throttledBrokers.contains(brokerId)
+ ? throttleConfig
+ : unthrottledConfig;
+ throttledBrokerConfigs.put(brokerId, expectedThrottleConfig);
+ });
+ waitForBrokerLevelThrottles(throttledBrokerConfigs);
+ }
+
    /**
     * Blocks until the broker-level throttle configs of every broker listed in
     * {@code targetThrottles} match the expected values, failing after the
     * default wait time with a message showing the last observed values.
     */
    private void waitForBrokerLevelThrottles(Map<Integer, Map<String, Long>> targetThrottles) {
        // Holds the latest observation so the timeout message is informative;
        // an AtomicReference because the lambda can only capture finals.
        AtomicReference<Map<Integer, Map<String, Long>>> curThrottles = new AtomicReference<>(new HashMap<>());
        TestUtils.waitUntilTrue(() -> {
            try {
                curThrottles.set(describeBrokerLevelThrottles(targetThrottles.keySet()));
                return targetThrottles.equals(curThrottles.get());
            } catch (ExecutionException | InterruptedException e) {
                // The polling predicate cannot throw checked exceptions; wrap them.
                throw new RuntimeException(e);
            }
        }, () -> "timed out waiting for broker throttle to become " + targetThrottles + ". " +
            "Latest throttles were " + curThrottles.get(), DEFAULT_MAX_WAIT_MS, 25);
    }
+
    /**
     * Describe the broker-level throttles in the cluster.
     *
     * @param brokerIds the brokers whose throttle configs should be described
     * @return A map whose keys are broker IDs and whose values are throttle
     *         information. The nested maps are keyed on throttle name; a
     *         throttle that is not configured is reported as -1.
     * @throws ExecutionException   if the describeConfigs call fails
     * @throws InterruptedException if the admin call is interrupted
     */
    private Map<Integer, Map<String, Long>> describeBrokerLevelThrottles(Collection<Integer> brokerIds) throws ExecutionException, InterruptedException {
        Map<Integer, Map<String, Long>> results = new HashMap<>();
        for (Integer brokerId : brokerIds) {
            ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString());
            Config brokerConfigs = cluster.adminClient.describeConfigs(Collections.singleton(brokerResource)).values()
                .get(brokerResource)
                .get();

            Map<String, Long> throttles = new HashMap<>();
            brokerLevelThrottles().foreach(throttleName -> {
                // Absent config -> "-1" sentinel, matching unthrottledBrokerConfigs.
                String configValue = Optional.ofNullable(brokerConfigs.get(throttleName)).map(ConfigEntry::value).orElse("-1");
                throttles.put(throttleName, Long.parseLong(configValue));
                return null;
            });
            results.put(brokerId, throttles);
        }
        return results;
    }
+
    /**
     * Test moving partitions between directories.
     * The move is started under a tiny log-dir throttle so intermediate state
     * can be asserted, then the throttle is deleted and completion verified.
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    @ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
    public void testLogDirReassignment(String quorum) throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(topicPartition.topic(), topicPartition.partition(), 700);

        int targetBrokerId = 0;
        List<Integer> replicas = Arrays.asList(0, 1, 2);
        LogDirReassignment reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas);

        // Start the replica move, but throttle it to be very slow so that it can't complete
        // before our next checks happen.
        long logDirThrottle = 1L;
        runExecuteAssignment(cluster.adminClient, false, reassignment.json,
            -1L, logDirThrottle);

        // Check the output of --verify
        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
            new VerifyAssignmentResult(Collections.singletonMap(
                topicPartition, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
            ), false, Collections.singletonMap(
                new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), 0),
                new ActiveMoveState(reassignment.currentDir, reassignment.targetDir, reassignment.targetDir)
            ), true));
        waitForLogDirThrottle(Collections.singleton(0), logDirThrottle);

        // Remove the throttle
        cluster.adminClient.incrementalAlterConfigs(Collections.singletonMap(
            new ConfigResource(ConfigResource.Type.BROKER, "0"),
            Collections.singletonList(new AlterConfigOp(
                new ConfigEntry(brokerLevelLogDirThrottle(), ""), AlterConfigOp.OpType.DELETE))))
            .all().get();
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);

        // Wait for the directory movement to complete.
        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
            new VerifyAssignmentResult(Collections.singletonMap(
                topicPartition, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
            ), false, Collections.singletonMap(
                new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), 0),
                new CompletedMoveState(reassignment.targetDir)
            ), false));

        // Confirm broker 0 now reports the replica in the target directory.
        BrokerDirs info1 = new BrokerDirs(cluster.adminClient.describeLogDirs(IntStream.range(0, 4).boxed().collect(Collectors.toList())), 0);
        assertEquals(reassignment.targetDir, info1.curLogDirs.getOrDefault(topicPartition, ""));
    }
+
+ @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+ @ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
+ public void testAlterLogDirReassignmentThrottle(String quorum) throws
Exception {
+ TopicPartition topicPartition = new TopicPartition("foo", 0);
+
+ cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(),
Collections.emptyMap());
+ cluster.setup();
+ cluster.produceMessages(topicPartition.topic(),
topicPartition.partition(), 700);
+
+ int targetBrokerId = 0;
+ List<Integer> replicas = Arrays.asList(0, 1, 2);
+ LogDirReassignment reassignment =
buildLogDirReassignment(topicPartition, targetBrokerId, replicas);
+
+ // Start the replica move with a low throttle so it does not complete
+ long initialLogDirThrottle = 1L;
+ runExecuteAssignment(cluster.adminClient, false, reassignment.json,
+ -1L, initialLogDirThrottle);
+ waitForLogDirThrottle(new HashSet<>(Collections.singletonList(0)),
initialLogDirThrottle);
+
+ // Now increase the throttle and verify that the log dir movement
completes
+ long updatedLogDirThrottle = 3000000L;
+ runExecuteAssignment(cluster.adminClient, true, reassignment.json,
+ -1L, updatedLogDirThrottle);
+ waitForLogDirThrottle(Collections.singleton(0), updatedLogDirThrottle);
+
+ waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
+ new VerifyAssignmentResult(Collections.singletonMap(
+ topicPartition, new
PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
+ ), false, Collections.singletonMap(
+ new TopicPartitionReplica(topicPartition.topic(),
topicPartition.partition(), targetBrokerId),
+ new CompletedMoveState(reassignment.targetDir)
+ ), false));
+ }
+
    /**
     * Immutable description of a log-directory move: the reassignment JSON to
     * submit plus the replica's current and target directories, kept for later
     * verification.
     */
    static class LogDirReassignment {
        final String json;        // reassignment JSON accepted by --execute
        final String currentDir;  // log dir hosting the replica before the move
        final String targetDir;   // log dir the replica should end up in

        public LogDirReassignment(String json, String currentDir, String targetDir) {
            this.json = json;
            this.currentDir = currentDir;
            this.targetDir = targetDir;
        }
    }
+
    /**
     * Builds a reassignment JSON that keeps the replica set unchanged but moves
     * the replica on {@code brokerId} into a different log directory on that
     * broker (all other replicas use "any").
     *
     * @param topicPartition the partition whose replica should move
     * @param brokerId       the broker on which to switch directories
     * @param replicas       the (unchanged) replica assignment of the partition
     * @return the JSON plus the source and destination directories
     */
    private LogDirReassignment buildLogDirReassignment(TopicPartition topicPartition,
                                                       int brokerId,
                                                       List<Integer> replicas) throws ExecutionException, InterruptedException {

        DescribeLogDirsResult describeLogDirsResult = cluster.adminClient.describeLogDirs(
            IntStream.range(0, 4).boxed().collect(Collectors.toList()));

        BrokerDirs logDirInfo = new BrokerDirs(describeLogDirsResult, brokerId);
        assertTrue(logDirInfo.futureLogDirs.isEmpty());

        String currentDir = logDirInfo.curLogDirs.get(topicPartition);
        // Any other directory configured on the broker serves as the target.
        String newDir = logDirInfo.logDirs.stream().filter(dir -> !dir.equals(currentDir)).findFirst().get();

        List<String> logDirs = replicas.stream().map(replicaId -> {
            if (replicaId == brokerId)
                return "\"" + newDir + "\"";
            else
                return "\"any\"";
        }).collect(Collectors.toList());

        String reassignmentJson =
            " { \"version\": 1," +
            "  \"partitions\": [" +
            "    {" +
            "     \"topic\": \"" + topicPartition.topic() + "\"," +
            "     \"partition\": " + topicPartition.partition() + "," +
            "     \"replicas\": [" + replicas.stream().map(Object::toString).collect(Collectors.joining(",")) + "]," +
            "     \"log_dirs\": [" + String.join(",", logDirs) + "]" +
            "    }" +
            "   ]" +
            "  }";

        return new LogDirReassignment(reassignmentJson, currentDir, newDir);
    }
+
+
+
+ private kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult
runVerifyAssignment(Admin adminClient, String jsonString,
+ Boolean
preserveThrottles) {
+ System.out.println("==> verifyAssignment(adminClient, jsonString=" +
jsonString);
+ return verifyAssignment(adminClient, jsonString, preserveThrottles);
+ }
+
+ private void waitForVerifyAssignment(Admin adminClient,
+ String jsonString,
+ Boolean preserveThrottles,
+ VerifyAssignmentResult
expectedResult) {
+ final kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult
expectedResult0 = asScala(expectedResult);
+ final kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult[]
latestResult = {null};
+ TestUtils.waitUntilTrue(
+ () -> {
+ latestResult[0] = runVerifyAssignment(adminClient, jsonString,
preserveThrottles);
+ return expectedResult0.equals(latestResult[0]);
+ }, () -> "Timed out waiting for verifyAssignment result " +
expectedResult + ". " +
+ "The latest result was " + latestResult[0],
DEFAULT_MAX_WAIT_MS, 10L);
+ }
+
+ private void runExecuteAssignment(Admin adminClient,
+ Boolean additional,
+ String reassignmentJson,
+ Long interBrokerThrottle,
+ Long replicaAlterLogDirsThrottle) {
+ System.out.println("==> executeAssignment(adminClient, additional=" +
additional + ", " +
+ "reassignmentJson=" + reassignmentJson + ", " +
+ "interBrokerThrottle=" + interBrokerThrottle + ", " +
+ "replicaAlterLogDirsThrottle=" + replicaAlterLogDirsThrottle +
"))");
+ executeAssignment(adminClient, additional, reassignmentJson,
+ interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L,
Time.SYSTEM);
+ }
+
+ private Tuple2<scala.collection.immutable.Set<TopicPartition>,
scala.collection.immutable.Set<TopicPartitionReplica>> runCancelAssignment(
+ Admin adminClient,
+ String jsonString,
+ Boolean preserveThrottles
+ ) {
+ System.out.println("==> cancelAssignment(adminClient, jsonString=" +
jsonString);
+ return cancelAssignment(adminClient, jsonString, preserveThrottles,
10000L, Time.SYSTEM);
+ }
+
+ static class BrokerDirs {
+ final DescribeLogDirsResult result;
+ final int brokerId;
+
+ final Set<String> logDirs = new HashSet<>();
+ final Map<TopicPartition, String> curLogDirs = new HashMap<>();
+ final Map<TopicPartition, String> futureLogDirs = new HashMap<>();
+
+ public BrokerDirs(DescribeLogDirsResult result, int brokerId) throws
ExecutionException, InterruptedException {
+ this.result = result;
+ this.brokerId = brokerId;
+
+ result.descriptions().get(brokerId).get().forEach((logDirName,
logDirInfo) -> {
+ logDirs.add(logDirName);
+ logDirInfo.replicaInfos().forEach((part, info) -> {
+ if (info.isFuture()) {
+ futureLogDirs.put(part, logDirName);
+ } else {
+ curLogDirs.put(part, logDirName);
+ }
+ });
+ });
+ }
+ }
+
    /**
     * Self-contained multi-broker test cluster used by the reassignment integration tests.
     * Owns broker configs, the running brokers, an admin client, and the initial topics;
     * {@link #close()} tears everything down.
     */
    class ReassignPartitionsTestCluster implements Closeable {
        // One KafkaConfig per broker, populated by the constructor.
        private final List<KafkaConfig> brokerConfigs = new ArrayList<>();

        // Broker id -> rack. Brokers 0-1 share "rack0" and 2-4 share "rack1", so
        // rack-aware reassignments can be exercised.
        private final Map<Integer, String> brokers = new HashMap<>(); {
            brokers.put(0, "rack0");
            brokers.put(1, "rack0");
            brokers.put(2, "rack1");
            brokers.put(3, "rack1");
            brokers.put(4, "rack1");
        }

        // Topic name -> replica assignment per partition (outer list index = partition id).
        private final Map<String, List<List<Integer>>> topics = new HashMap<>(); {
            topics.put("foo", Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 3)));
            topics.put("bar", Arrays.asList(Arrays.asList(3, 2, 1)));
            topics.put("baz", Arrays.asList(Arrays.asList(1, 0, 2), Arrays.asList(2, 0, 1), Arrays.asList(0, 2, 1)));
        }

        private final List<KafkaBroker> servers = new ArrayList<>();

        private String brokerList;

        private Admin adminClient;

        /**
         * Builds the per-broker configs. {@code configOverrides} apply to every broker;
         * {@code brokerConfigOverrides} apply only to the given broker id and win over
         * the cluster-wide overrides.
         */
        public ReassignPartitionsTestCluster(Map<String, String> configOverrides, Map<Integer, Map<String, String>> brokerConfigOverrides) {
            brokers.forEach((brokerId, rack) -> {
                Properties config = TestUtils.createBrokerConfig(
                    brokerId,
                    zkConnectOrNull(),
                    false, // shorten test time
                    true,
                    TestUtils.RandomPort(),
                    scala.None$.empty(),
                    scala.None$.empty(),
                    scala.None$.empty(),
                    true,
                    false,
                    TestUtils.RandomPort(),
                    false,
                    TestUtils.RandomPort(),
                    false,
                    TestUtils.RandomPort(),
                    Some$.MODULE$.apply(rack),
                    3,
                    false,
                    1,
                    (short) 1,
                    false);
                // shorter backoff to reduce test durations when no active partitions are eligible for fetching due to throttling
                config.setProperty(KafkaConfig.ReplicaFetchBackoffMsProp(), "100");
                // Don't move partition leaders automatically.
                config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
                config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp(), "1000");
                configOverrides.forEach(config::setProperty);
                brokerConfigOverrides.getOrDefault(brokerId, Collections.emptyMap()).forEach(config::setProperty);

                brokerConfigs.add(new KafkaConfig(config));
            });
        }

        /** Starts the brokers and creates the initial topics. */
        public void setup() throws ExecutionException, InterruptedException {
            createServers();
            createTopics();
        }

        // NOTE(review): brokerConfigs.get(brokerId) assumes the constructor's forEach added
        // configs in broker-id order (holds for small consecutive Integer keys in HashMap,
        // but is iteration-order dependent) — confirm if broker ids ever become sparse.
        public void createServers() {
            brokers.keySet().forEach(brokerId ->
                servers.add(createBroker(brokerConfigs.get(brokerId), Time.SYSTEM, true, scala.None$.empty()))
            );
        }

        /**
         * Creates the admin client and the topics declared in {@code topics}, then waits
         * until every broker has consistent metadata for them.
         */
        public void createTopics() throws ExecutionException, InterruptedException {
            TestUtils.waitUntilBrokerMetadataIsPropagated(seq(servers), DEFAULT_MAX_WAIT_MS);
            brokerList = TestUtils.plaintextBootstrapServers(seq(servers));

            adminClient = Admin.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList));

            adminClient.createTopics(topics.entrySet().stream().map(e -> {
                // Translate the List<List<Integer>> assignment into the partition-id -> replicas
                // map that NewTopic expects.
                Map<Integer, List<Integer>> partMap = new HashMap<>();

                Iterator<List<Integer>> partsIter = e.getValue().iterator();
                int index = 0;
                while (partsIter.hasNext()) {
                    partMap.put(index, partsIter.next());
                    index++;
                }
                return new NewTopic(e.getKey(), partMap);
            }).collect(Collectors.toList())).all().get();
            topics.forEach((topicName, parts) -> TestUtils.waitForAllPartitionsMetadata(seq(servers), topicName, parts.size()));

            if (isKRaftTest()) {
                // NOTE(review): reads the enclosing test's `cluster.servers` rather than this
                // instance's `servers` — equivalent only when this object IS the test's cluster.
                TestUtils.ensureConsistentKRaftMetadata(
                    seq(cluster.servers),
                    controllerServer(),
                    "Timeout waiting for controller metadata propagating to brokers"
                );
            }
        }

        /** Produces {@code numMessages} 10KB records to the given topic-partition and waits for acks. */
        public void produceMessages(String topic, int partition, int numMessages) {
            List<ProducerRecord<byte[], byte[]>> records = IntStream.range(0, numMessages).mapToObj(i ->
                new ProducerRecord<byte[], byte[]>(topic, partition,
                    null, new byte[10000])).collect(Collectors.toList());
            TestUtils.produceMessages(seq(servers), seq(records), -1);
        }

        /** Closes the admin client and shuts the brokers down; servers are cleared even if shutdown throws. */
        @Override
        public void close() {
            brokerList = null;
            Utils.closeQuietly(adminClient, "adminClient");
            adminClient = null;
            try {
                TestUtils.shutdownServers(seq(servers), true);
            } finally {
                servers.clear();
            }
        }
    }
+
+ private ReassignPartitionsCommand.VerifyAssignmentResult
asScala(VerifyAssignmentResult res) {
+ Map<TopicPartition,
ReassignPartitionsCommand.PartitionReassignmentState> partStates = new
HashMap<>();
+ res.partStates.forEach((tp, state) -> partStates.put(tp,
asScala(state)));
+
+ Map<TopicPartitionReplica, ReassignPartitionsCommand.LogDirMoveState>
moveStates = new HashMap<>();
+ res.moveStates.forEach((tpr, state) -> moveStates.put(tpr,
asScala(state)));
+
+ return new
ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates),
res.partsOngoing, asScala(moveStates), res.movesOngoing);
+ }
+
    // Bridges the Java-side partition state to the Scala command's case class.
    // The raw List casts deliberately erase the element type so seq(...) can hand the
    // Integer lists to the Scala constructor (which presumably takes Seq[Int] — the
    // boxing conversion happens on the Scala side); hence the "unchecked" suppression.
    @SuppressWarnings({"unchecked"})
    private ReassignPartitionsCommand.PartitionReassignmentState asScala(PartitionReassignmentState state) {
        return new ReassignPartitionsCommand.PartitionReassignmentState(
            seq((List) state.currentReplicas),
            seq((List) state.targetReplicas),
            state.done
        );
    }
+
+ private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState
state) {
+ if (state instanceof ActiveMoveState) {
+ ActiveMoveState s = (ActiveMoveState) state;
+ return new
ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir,
s.futureLogDir);
+ } else if (state instanceof CancelledMoveState) {
+ CancelledMoveState s = (CancelledMoveState) state;
+ return new
ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir);
+ } else if (state instanceof CompletedMoveState) {
+ CompletedMoveState s = (CompletedMoveState) state;
+ return new
ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir);
+ } else if (state instanceof MissingLogDirMoveState) {
+ MissingLogDirMoveState s = (MissingLogDirMoveState) state;
+ return new
ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir);
+ } else if (state instanceof MissingReplicaMoveState) {
+ MissingReplicaMoveState s = (MissingReplicaMoveState) state;
+ return new
ReassignPartitionsCommand.MissingReplicaMoveState(s.targetLogDir);
+ }
+
+ throw new IllegalArgumentException("Unknown state " + state);
+ }
+
+ @SuppressWarnings("unchecked")
+ static <T> scala.collection.immutable.Set<T> set(final T... set) {
+ return mutableSet(set).toSet();
+ }
+
+ @SuppressWarnings({"deprecation", "unchecked"})
+ private static <T> scala.collection.mutable.Set<T> mutableSet(final
T...set) {
+ return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(set)));
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private static <T> Seq<T> seq(T... seq) {
+ return seq(Arrays.asList(seq));
+ }
+
+ @SuppressWarnings({"deprecation"})
+ private static <T> Seq<T> seq(Collection<T> seq) {
+ return
JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+ }
+
+ @SuppressWarnings("deprecation")
+ private static <K, V> scala.collection.Map<K, V> asScala(Map<K, V> jmap) {
+ return JavaConverters.mapAsScalaMap(jmap);
+ }
+}