This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8f8dbad564f KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in 
java (#14456)
8f8dbad564f is described below

commit 8f8dbad564ffd9be409bb85edadbc40659cd0a56
Author: Nikolay <[email protected]>
AuthorDate: Mon Oct 2 23:22:17 2023 +0300

    KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java (#14456)
    
    This PR is part of #13247
    It contains ReassignPartitionsIntegrationTest rewritten in java.
    Goal of PR is reduce changes size in main PR.
    
    Reviewers: Taras Ledkov  <[email protected]>, Justine Olshan 
<[email protected]>
---
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   4 +
 core/src/test/java/kafka/test/ClusterConfig.java   |   5 +
 .../admin/ReassignPartitionsIntegrationTest.scala  | 740 -----------------
 .../tools/reassign/VerifyAssignmentResult.java     |   2 +-
 .../org/apache/kafka/tools/ToolsTestUtils.java     |   4 +
 .../ReassignPartitionsIntegrationTest.java         | 924 +++++++++++++++++++++
 7 files changed, 939 insertions(+), 741 deletions(-)

diff --git a/build.gradle b/build.gradle
index c73331bb02f..ffdbbf1ace4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1911,6 +1911,7 @@ project(':tools') {
     testImplementation project(':connect:api')
     testImplementation project(':connect:runtime')
     testImplementation project(':connect:runtime').sourceSets.test.output
+    testImplementation project(':storage:api').sourceSets.main.output
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
     testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f4b5decdd88..888e8a41ae8 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -293,7 +293,11 @@
 
     <subpackage name="reassign">
       <allow pkg="org.apache.kafka.admin"/>
+      <allow pkg="org.apache.kafka.tools"/>
       <allow pkg="kafka.admin" />
+      <allow pkg="kafka.cluster" />
+      <allow pkg="kafka.log" />
+      <allow pkg="kafka.server" />
       <allow pkg="scala" />
     </subpackage>
   </subpackage>
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java 
b/core/src/test/java/kafka/test/ClusterConfig.java
index 8e9f7de96ab..c4ea8669d53 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -146,6 +146,11 @@ public class ClusterConfig {
         copy.consumerProperties.putAll(consumerProperties);
         copy.saslServerProperties.putAll(saslServerProperties);
         copy.saslClientProperties.putAll(saslClientProperties);
+        perBrokerOverrideProperties.forEach((brokerId, props) -> {
+            Properties propsCopy = new Properties();
+            propsCopy.putAll(props);
+            copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
+        });
         return copy;
     }
 
diff --git 
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
deleted file mode 100644
index d3a04da4d13..00000000000
--- 
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ /dev/null
@@ -1,740 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import kafka.admin.ReassignPartitionsCommand._
-import kafka.server._
-import kafka.utils.Implicits._
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
-import org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{AfterEach, Timeout}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
-
-import java.io.Closeable
-import java.util.{Collections, HashMap, List}
-import scala.collection.{Map, Seq, mutable}
-import scala.jdk.CollectionConverters._
-
-@Timeout(300)
-class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
-
-  var cluster: ReassignPartitionsTestCluster = _
-
-  @AfterEach
-  override def tearDown(): Unit = {
-    Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster")
-    super.tearDown()
-  }
-
-  val unthrottledBrokerConfigs =
-    0.to(4).map { brokerId =>
-      brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap
-    }.toMap
-
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testReassignment(quorum: String): Unit = {
-    cluster = new ReassignPartitionsTestCluster()
-    cluster.setup()
-    executeAndVerifyReassignment()
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
-  def testReassignmentWithAlterPartitionDisabled(quorum: String): Unit = {
-    // Test reassignment when the IBP is on an older version which does not use
-    // the `AlterPartition` API. In this case, the controller will register 
individual
-    // watches for each reassigning partition so that the reassignment can be
-    // completed as soon as the ISR is expanded.
-    val configOverrides = Map(KafkaConfig.InterBrokerProtocolVersionProp -> 
IBP_2_7_IV1.version)
-    cluster = new ReassignPartitionsTestCluster(configOverrides = 
configOverrides)
-    cluster.setup()
-    executeAndVerifyReassignment()
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
-  def testReassignmentCompletionDuringPartialUpgrade(quorum: String): Unit = {
-    // Test reassignment during a partial upgrade when some brokers are 
relying on
-    // `AlterPartition` and some rely on the old notification logic through 
Zookeeper.
-    // In this test case, broker 0 starts up first on the latest IBP and is 
typically
-    // elected as controller. The three remaining brokers start up on the 
older IBP.
-    // We want to ensure that reassignment can still complete through the ISR 
change
-    // notification path even though the controller expects `AlterPartition`.
-
-    // Override change notification settings so that test is not delayed by ISR
-    // change notification delay
-    ZkAlterPartitionManager.DefaultIsrPropagationConfig = 
IsrChangePropagationConfig(
-      checkIntervalMs = 500,
-      lingerMs = 100,
-      maxDelayMs = 500
-    )
-
-    val oldIbpConfig = Map(KafkaConfig.InterBrokerProtocolVersionProp -> 
IBP_2_7_IV1.version)
-    val brokerConfigOverrides = Map(1 -> oldIbpConfig, 2 -> oldIbpConfig, 3 -> 
oldIbpConfig)
-
-    cluster = new ReassignPartitionsTestCluster(brokerConfigOverrides = 
brokerConfigOverrides)
-    cluster.setup()
-
-    executeAndVerifyReassignment()
-  }
-
-  private def executeAndVerifyReassignment(): Unit = {
-    val assignment = """{"version":1,"partitions":""" +
-      
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},"""
 +
-      
"""{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}"""
 +
-      """]}"""
-
-    val foo0 = new TopicPartition("foo", 0)
-    val bar0 = new TopicPartition("bar", 0)
-
-    // Check that the assignment has not yet been started yet.
-    val initialAssignment = Map(
-      foo0 -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 3), true),
-      bar0 -> PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 0), true)
-    )
-    waitForVerifyAssignment(cluster.adminClient, assignment, false,
-      VerifyAssignmentResult(initialAssignment))
-
-    // Execute the assignment
-    runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L)
-    assertEquals(unthrottledBrokerConfigs, 
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
-    val finalAssignment = Map(
-      foo0 -> PartitionReassignmentState(Seq(0, 1, 3), Seq(0, 1, 3), true),
-      bar0 -> PartitionReassignmentState(Seq(3, 2, 0), Seq(3, 2, 0), true)
-    )
-
-    val verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, 
assignment, false)
-    assertFalse(verifyAssignmentResult.movesOngoing)
-
-    // Wait for the assignment to complete
-    waitForVerifyAssignment(cluster.adminClient, assignment, false,
-      VerifyAssignmentResult(finalAssignment))
-
-    assertEquals(unthrottledBrokerConfigs,
-      describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
-
-    // Verify that partitions are removed from brokers no longer assigned
-    verifyReplicaDeleted(topicPartition = foo0, replicaId = 2)
-    verifyReplicaDeleted(topicPartition = bar0, replicaId = 1)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testHighWaterMarkAfterPartitionReassignment(quorum: String): Unit = {
-    cluster = new ReassignPartitionsTestCluster()
-    cluster.setup()
-    val assignment = """{"version":1,"partitions":""" +
-      
"""[{"topic":"foo","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]}"""
 +
-      """]}"""
-
-    // Set the high water mark of foo-0 to 123 on its leader.
-    val part = new TopicPartition("foo", 0)
-    cluster.servers(0).replicaManager.logManager.truncateFullyAndStartAt(part, 
123L, false)
-
-    // Execute the assignment
-    runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L)
-    val finalAssignment = Map(part ->
-      PartitionReassignmentState(Seq(3, 1, 2), Seq(3, 1, 2), true))
-
-    // Wait for the assignment to complete
-    waitForVerifyAssignment(cluster.adminClient, assignment, false,
-      VerifyAssignmentResult(finalAssignment))
-
-    TestUtils.waitUntilTrue(() => {
-      cluster.servers(3).replicaManager.onlinePartition(part).
-        flatMap(_.leaderLogIfLocal).isDefined
-    }, "broker 3 should be the new leader", pause = 10L)
-    assertEquals(123L, 
cluster.servers(3).replicaManager.localLogOrException(part).highWatermark,
-      s"Expected broker 3 to have the correct high water mark for the 
partition.")
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testAlterReassignmentThrottle(quorum: String): Unit = {
-    cluster = new ReassignPartitionsTestCluster()
-    cluster.setup()
-    cluster.produceMessages("foo", 0, 50)
-    cluster.produceMessages("baz", 2, 60)
-    val assignment = """{"version":1,"partitions":
-      
[{"topic":"foo","partition":0,"replicas":[0,3,2],"log_dirs":["any","any","any"]},
-      
{"topic":"baz","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]}
-      ]}"""
-
-    // Execute the assignment with a low throttle
-    val initialThrottle = 1L
-    runExecuteAssignment(cluster.adminClient, false, assignment, 
initialThrottle, -1L)
-    waitForInterBrokerThrottle(Set(0, 1, 2, 3), initialThrottle)
-
-    // Now update the throttle and verify the reassignment completes
-    val updatedThrottle = 300000L
-    runExecuteAssignment(cluster.adminClient, additional = true, assignment, 
updatedThrottle, -1L)
-    waitForInterBrokerThrottle(Set(0, 1, 2, 3), updatedThrottle)
-
-    val finalAssignment = Map(
-      new TopicPartition("foo", 0) ->
-        PartitionReassignmentState(Seq(0, 3, 2), Seq(0, 3, 2), true),
-      new TopicPartition("baz", 2) ->
-        PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 1), true))
-
-    // Now remove the throttles.
-    waitForVerifyAssignment(cluster.adminClient, assignment, false,
-      VerifyAssignmentResult(finalAssignment))
-    waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
-  }
-
-  /**
-   * Test running a reassignment with the interBrokerThrottle set.
-   */
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThrottledReassignment(quorum: String): Unit = {
-    cluster = new ReassignPartitionsTestCluster()
-    cluster.setup()
-    cluster.produceMessages("foo", 0, 50)
-    cluster.produceMessages("baz", 2, 60)
-    val assignment = """{"version":1,"partitions":""" +
-      
"""[{"topic":"foo","partition":0,"replicas":[0,3,2],"log_dirs":["any","any","any"]},"""
 +
-      
"""{"topic":"baz","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]}"""
 +
-      """]}"""
-
-    // Check that the assignment has not yet been started yet.
-    val initialAssignment = Map(
-      new TopicPartition("foo", 0) ->
-        PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 3, 2), true),
-      new TopicPartition("baz", 2) ->
-        PartitionReassignmentState(Seq(0, 2, 1), Seq(3, 2, 1), true))
-    assertEquals(VerifyAssignmentResult(initialAssignment), 
runVerifyAssignment(cluster.adminClient, assignment, false))
-    assertEquals(unthrottledBrokerConfigs, 
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
-
-    // Execute the assignment
-    val interBrokerThrottle = 300000L
-    runExecuteAssignment(cluster.adminClient, false, assignment, 
interBrokerThrottle, -1L)
-    waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
-
-    val finalAssignment = Map(
-      new TopicPartition("foo", 0) ->
-        PartitionReassignmentState(Seq(0, 3, 2), Seq(0, 3, 2), true),
-      new TopicPartition("baz", 2) ->
-        PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 1), true))
-
-    // Wait for the assignment to complete
-    TestUtils.waitUntilTrue(
-      () => {
-        // Check the reassignment status.
-        val result = runVerifyAssignment(cluster.adminClient, assignment, true)
-        if (!result.partsOngoing) {
-          true
-        } else {
-          assertFalse(result.partStates.forall(_._2.done), s"Expected at least 
one partition reassignment to be ongoing when result = $result")
-          assertEquals(Seq(0, 3, 2), result.partStates(new 
TopicPartition("foo", 0)).targetReplicas)
-          assertEquals(Seq(3, 2, 1), result.partStates(new 
TopicPartition("baz", 2)).targetReplicas)
-          logger.info(s"Current result: ${result}")
-          waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
-          false
-        }
-      }, "Expected reassignment to complete.")
-    waitForVerifyAssignment(cluster.adminClient, assignment, true,
-      VerifyAssignmentResult(finalAssignment))
-    // The throttles should still have been preserved, since we ran with 
--preserve-throttles
-    waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
-    // Now remove the throttles.
-    waitForVerifyAssignment(cluster.adminClient, assignment, false,
-      VerifyAssignmentResult(finalAssignment))
-    waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testProduceAndConsumeWithReassignmentInProgress(quorum: String): Unit = {
-    cluster = new ReassignPartitionsTestCluster()
-    cluster.setup()
-    cluster.produceMessages("baz", 2, 60)
-    val assignment = """{"version":1,"partitions":""" +
-      
"""[{"topic":"baz","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]}"""
 +
-      """]}"""
-    runExecuteAssignment(cluster.adminClient, false, assignment, 300L, -1L)
-    cluster.produceMessages("baz", 2, 100)
-    val consumer = TestUtils.createConsumer(cluster.brokerList)
-    val part = new TopicPartition("baz", 2)
-    try {
-      consumer.assign(Seq(part).asJava)
-      TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 100)
-    } finally {
-      consumer.close()
-    }
-    TestUtils.removeReplicationThrottleForPartitions(cluster.adminClient, 
Seq(0,1,2,3), Set(part))
-    val finalAssignment = Map(part ->
-      PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 1), true))
-    waitForVerifyAssignment(cluster.adminClient, assignment, false,
-      VerifyAssignmentResult(finalAssignment))
-  }
-
-  /**
-   * Test running a reassignment and then cancelling it.
-   */
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testCancellation(quorum: String): Unit = {
-    val foo0 = new TopicPartition("foo", 0)
-    val baz1 = new TopicPartition("baz", 1)
-
-    cluster = new ReassignPartitionsTestCluster()
-    cluster.setup()
-    cluster.produceMessages(foo0.topic, foo0.partition, 200)
-    cluster.produceMessages(baz1.topic, baz1.partition, 200)
-    val assignment = """{"version":1,"partitions":""" +
-      
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},"""
 +
-      
"""{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}"""
 +
-      """]}"""
-    assertEquals(unthrottledBrokerConfigs,
-      describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
-    val interBrokerThrottle = 1L
-    runExecuteAssignment(cluster.adminClient, false, assignment, 
interBrokerThrottle, -1L)
-    waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
-
-    // Verify that the reassignment is running.  The very low throttle should 
keep it
-    // from completing before this runs.
-    waitForVerifyAssignment(cluster.adminClient, assignment, true,
-      VerifyAssignmentResult(Map(
-        foo0 -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), 
false),
-        baz1 -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), 
false)),
-        true, Map(), false))
-    // Cancel the reassignment.
-    assertEquals((Set(foo0, baz1), Set()), 
runCancelAssignment(cluster.adminClient, assignment, true))
-    // Broker throttles are still active because we passed --preserve-throttles
-    waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
-    // Cancelling the reassignment again should reveal nothing to cancel.
-    assertEquals((Set(), Set()), runCancelAssignment(cluster.adminClient, 
assignment, false))
-    // This time, the broker throttles were removed.
-    waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
-    // Verify that there are no ongoing reassignments.
-    assertFalse(runVerifyAssignment(cluster.adminClient, assignment, 
false).partsOngoing)
-    // Verify that the partition is removed from cancelled replicas
-    verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
-    verifyReplicaDeleted(topicPartition = baz1, replicaId = 3)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testCancellationWithAddingReplicaInIsr(quorum: String): Unit = {
-    val foo0 = new TopicPartition("foo", 0)
-
-    cluster = new ReassignPartitionsTestCluster()
-    cluster.setup()
-    cluster.produceMessages(foo0.topic, foo0.partition, 200)
-
-    // The reassignment will bring replicas 3 and 4 into the replica set and 
remove 1 and 2.
-    val assignment = """{"version":1,"partitions":""" +
-      
"""[{"topic":"foo","partition":0,"replicas":[0,3,4],"log_dirs":["any","any","any"]}"""
 +
-      """]}"""
-
-    // We will throttle replica 4 so that only replica 3 joins the ISR
-    TestUtils.setReplicationThrottleForPartitions(
-      cluster.adminClient,
-      brokerIds = Seq(4),
-      partitions = Set(foo0),
-      throttleBytes = 1
-    )
-
-    // Execute the assignment and wait for replica 3 (only) to join the ISR
-    runExecuteAssignment(
-      cluster.adminClient,
-      additional = false,
-      reassignmentJson = assignment
-    )
-    TestUtils.waitUntilTrue(
-      () => TestUtils.currentIsr(cluster.adminClient, foo0) == Set(0, 1, 2, 3),
-      msg = "Timed out while waiting for replica 3 to join the ISR"
-    )
-
-    // Now cancel the assignment and verify that the partition is removed from 
cancelled replicas
-    assertEquals((Set(foo0), Set()), runCancelAssignment(cluster.adminClient, 
assignment, preserveThrottles = true))
-    verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
-    verifyReplicaDeleted(topicPartition = foo0, replicaId = 4)
-  }
-
-  private def verifyReplicaDeleted(
-    topicPartition: TopicPartition,
-    replicaId: Int
-  ): Unit = {
-    def isReplicaStoppedAndDeleted(): Boolean = {
-      val server = cluster.servers(replicaId)
-      val partition = server.replicaManager.getPartition(topicPartition)
-      val log = server.logManager.getLog(topicPartition)
-      partition == HostedPartition.None && log.isEmpty
-    }
-    TestUtils.waitUntilTrue(isReplicaStoppedAndDeleted,
-      msg = s"Timed out waiting for replica $replicaId of $topicPartition to 
be deleted")
-  }
-
-  private def waitForLogDirThrottle(throttledBrokers: Set[Int], 
logDirThrottle: Long): Unit = {
-    val throttledConfigMap = Map[String, Long](
-      brokerLevelLeaderThrottle -> -1,
-      brokerLevelFollowerThrottle -> -1,
-      brokerLevelLogDirThrottle -> logDirThrottle)
-    waitForBrokerThrottles(throttledBrokers, throttledConfigMap)
-  }
-
-  private def waitForInterBrokerThrottle(throttledBrokers: Set[Int], 
interBrokerThrottle: Long): Unit = {
-    val throttledConfigMap = Map[String, Long](
-      brokerLevelLeaderThrottle -> interBrokerThrottle,
-      brokerLevelFollowerThrottle -> interBrokerThrottle,
-      brokerLevelLogDirThrottle -> -1L)
-    waitForBrokerThrottles(throttledBrokers, throttledConfigMap)
-  }
-
-  private def waitForBrokerThrottles(throttledBrokers: Set[Int], 
throttleConfig: Map[String, Long]): Unit = {
-    val throttledBrokerConfigs = unthrottledBrokerConfigs.map { case 
(brokerId, unthrottledConfig) =>
-      val expectedThrottleConfig = if (throttledBrokers.contains(brokerId)) {
-        throttleConfig
-      } else {
-        unthrottledConfig
-      }
-      brokerId -> expectedThrottleConfig
-    }
-    waitForBrokerLevelThrottles(throttledBrokerConfigs)
-  }
-
-  private def waitForBrokerLevelThrottles(targetThrottles: Map[Int, 
Map[String, Long]]): Unit = {
-    var curThrottles: Map[Int, Map[String, Long]] = Map.empty
-    TestUtils.waitUntilTrue(() => {
-      curThrottles = describeBrokerLevelThrottles(targetThrottles.keySet.toSeq)
-      targetThrottles.equals(curThrottles)
-    }, s"timed out waiting for broker throttle to become ${targetThrottles}.  
" +
-      s"Latest throttles were ${curThrottles}", pause = 25)
-  }
-
-  /**
-   * Describe the broker-level throttles in the cluster.
-   *
-   * @return                A map whose keys are broker IDs and whose values 
are throttle
-   *                        information.  The nested maps are keyed on 
throttle name.
-   */
-  private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int, 
Map[String, Long]] = {
-    brokerIds.map { brokerId =>
-      val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
brokerId.toString)
-      val brokerConfigs = 
cluster.adminClient.describeConfigs(Collections.singleton(brokerResource)).values()
-        .get(brokerResource)
-        .get()
-
-      val throttles = brokerLevelThrottles.map { throttleName =>
-        val configValue = Option(brokerConfigs.get(throttleName))
-          .map(_.value)
-          .getOrElse("-1")
-        (throttleName, configValue.toLong)
-      }.toMap
-      brokerId -> throttles
-    }.toMap
-  }
-
-  /**
-   * Test moving partitions between directories.
-   */
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
-  def testLogDirReassignment(quorum: String): Unit = {
-    val topicPartition = new TopicPartition("foo", 0)
-
-    cluster = new ReassignPartitionsTestCluster()
-    cluster.setup()
-    cluster.produceMessages(topicPartition.topic, topicPartition.partition, 
700)
-
-    val targetBrokerId = 0
-    val replicas = Seq(0, 1, 2)
-    val reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, 
replicas)
-
-    // Start the replica move, but throttle it to be very slow so that it 
can't complete
-    // before our next checks happen.
-    val logDirThrottle = 1L
-    runExecuteAssignment(cluster.adminClient, additional = false, 
reassignment.json,
-      interBrokerThrottle = -1L, logDirThrottle)
-
-    // Check the output of --verify
-    waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
-      VerifyAssignmentResult(Map(
-        topicPartition -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 
2), true)
-      ), false, Map(
-        new TopicPartitionReplica(topicPartition.topic, 
topicPartition.partition, 0) ->
-          ActiveMoveState(reassignment.currentDir, reassignment.targetDir, 
reassignment.targetDir)
-      ), true))
-    waitForLogDirThrottle(Set(0), logDirThrottle)
-
-    // Remove the throttle
-    cluster.adminClient.incrementalAlterConfigs(Collections.singletonMap(
-      new ConfigResource(ConfigResource.Type.BROKER, "0"),
-      Collections.singletonList(new AlterConfigOp(
-        new ConfigEntry(brokerLevelLogDirThrottle, ""), 
AlterConfigOp.OpType.DELETE))))
-      .all().get()
-    waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
-
-    // Wait for the directory movement to complete.
-    waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
-      VerifyAssignmentResult(Map(
-        topicPartition -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 
2), true)
-      ), false, Map(
-        new TopicPartitionReplica(topicPartition.topic, 
topicPartition.partition, 0) ->
-          CompletedMoveState(reassignment.targetDir)
-      ), false))
-
-    val info1 = new BrokerDirs(cluster.adminClient.describeLogDirs(0.to(4).
-      map(_.asInstanceOf[Integer]).asJavaCollection), 0)
-    assertEquals(reassignment.targetDir, 
info1.curLogDirs.getOrElse(topicPartition, ""))
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
-  def testAlterLogDirReassignmentThrottle(quorum: String): Unit = {
-    val topicPartition = new TopicPartition("foo", 0)
-
-    cluster = new ReassignPartitionsTestCluster()
-    cluster.setup()
-    cluster.produceMessages(topicPartition.topic, topicPartition.partition, 
700)
-
-    val targetBrokerId = 0
-    val replicas = Seq(0, 1, 2)
-    val reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, 
replicas)
-
-    // Start the replica move with a low throttle so it does not complete
-    val initialLogDirThrottle = 1L
-    runExecuteAssignment(cluster.adminClient, false, reassignment.json,
-      interBrokerThrottle = -1L, initialLogDirThrottle)
-    waitForLogDirThrottle(Set(0), initialLogDirThrottle)
-
-    // Now increase the throttle and verify that the log dir movement completes
-    val updatedLogDirThrottle = 3000000L
-    runExecuteAssignment(cluster.adminClient, additional = true, 
reassignment.json,
-      interBrokerThrottle = -1L, replicaAlterLogDirsThrottle = 
updatedLogDirThrottle)
-    waitForLogDirThrottle(Set(0), updatedLogDirThrottle)
-
-    waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
-      VerifyAssignmentResult(Map(
-        topicPartition -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 
2), true)
-      ), false, Map(
-        new TopicPartitionReplica(topicPartition.topic, 
topicPartition.partition, targetBrokerId) ->
-          CompletedMoveState(reassignment.targetDir)
-      ), false))
-  }
-
-  case class LogDirReassignment(json: String, currentDir: String, targetDir: 
String)
-
-  private def buildLogDirReassignment(topicPartition: TopicPartition,
-                                      brokerId: Int,
-                                      replicas: Seq[Int]): LogDirReassignment 
= {
-
-    val describeLogDirsResult = cluster.adminClient.describeLogDirs(
-      0.to(4).map(_.asInstanceOf[Integer]).asJavaCollection)
-
-    val logDirInfo = new BrokerDirs(describeLogDirsResult, brokerId)
-    assertTrue(logDirInfo.futureLogDirs.isEmpty)
-
-    val currentDir = logDirInfo.curLogDirs(topicPartition)
-    val newDir = logDirInfo.logDirs.find(!_.equals(currentDir)).get
-
-    val logDirs = replicas.map { replicaId =>
-      if (replicaId == brokerId)
-        s""""$newDir""""
-      else
-        "\"any\""
-    }
-
-    val reassignmentJson =
-      s"""
-         | { "version": 1,
-         |  "partitions": [
-         |    {
-         |     "topic": "${topicPartition.topic}",
-         |     "partition": ${topicPartition.partition},
-         |     "replicas": [${replicas.mkString(",")}],
-         |     "log_dirs": [${logDirs.mkString(",")}]
-         |    }
-         |   ]
-         |  }
-         |""".stripMargin
-
-    LogDirReassignment(reassignmentJson, currentDir = currentDir, targetDir = 
newDir)
-  }
-
-  private def runVerifyAssignment(adminClient: Admin, jsonString: String,
-                                  preserveThrottles: Boolean) = {
-    println(s"==> verifyAssignment(adminClient, jsonString=${jsonString})")
-    verifyAssignment(adminClient, jsonString, preserveThrottles)
-  }
-
-  private def waitForVerifyAssignment(adminClient: Admin,
-                                      jsonString: String,
-                                      preserveThrottles: Boolean,
-                                      expectedResult: VerifyAssignmentResult): 
Unit = {
-    var latestResult: VerifyAssignmentResult = null
-    TestUtils.waitUntilTrue(
-      () => {
-        latestResult = runVerifyAssignment(adminClient, jsonString, 
preserveThrottles)
-        expectedResult.equals(latestResult)
-      }, s"Timed out waiting for verifyAssignment result ${expectedResult}.  " 
+
-        s"The latest result was ${latestResult}", pause = 10L)
-  }
-
-  private def runExecuteAssignment(adminClient: Admin,
-                                   additional: Boolean,
-                                   reassignmentJson: String,
-                                   interBrokerThrottle: Long = -1,
-                                   replicaAlterLogDirsThrottle: Long = -1) = {
-    println(s"==> executeAssignment(adminClient, additional=${additional}, " +
-      s"reassignmentJson=${reassignmentJson}, " +
-      s"interBrokerThrottle=${interBrokerThrottle}, " +
-      s"replicaAlterLogDirsThrottle=${replicaAlterLogDirsThrottle}))")
-    executeAssignment(adminClient, additional, reassignmentJson,
-      interBrokerThrottle, replicaAlterLogDirsThrottle)
-  }
-
-  private def runCancelAssignment(adminClient: Admin, jsonString: String,
-                                  preserveThrottles: Boolean) = {
-    println(s"==> cancelAssignment(adminClient, jsonString=${jsonString})")
-    cancelAssignment(adminClient, jsonString, preserveThrottles)
-  }
-
-  class BrokerDirs(result: DescribeLogDirsResult, val brokerId: Int) {
-    val logDirs = new mutable.HashSet[String]
-    val curLogDirs = new mutable.HashMap[TopicPartition, String]
-    val futureLogDirs = new mutable.HashMap[TopicPartition, String]
-    result.descriptions.get(brokerId).get().forEach {
-      case (logDirName, logDirInfo) => {
-        logDirs.add(logDirName)
-        logDirInfo.replicaInfos.forEach {
-          case (part, info) =>
-            if (info.isFuture) {
-              futureLogDirs.put(part, logDirName)
-            } else {
-              curLogDirs.put(part, logDirName)
-            }
-        }
-      }
-    }
-  }
-
-  class ReassignPartitionsTestCluster(
-    configOverrides: Map[String, String] = Map.empty,
-    brokerConfigOverrides: Map[Int, Map[String, String]] = Map.empty
-  ) extends Closeable {
-    val brokers = Map(
-      0 -> "rack0",
-      1 -> "rack0",
-      2 -> "rack1",
-      3 -> "rack1",
-      4 -> "rack1"
-    )
-
-    val topics = Map(
-      "foo" -> Seq(Seq(0, 1, 2), Seq(1, 2, 3)),
-      "bar" -> Seq(Seq(3, 2, 1)),
-      "baz" -> Seq(Seq(1, 0, 2), Seq(2, 0, 1), Seq(0, 2, 1))
-    )
-
-    val brokerConfigs = brokers.map {
-      case (brokerId, rack) =>
-        val config = TestUtils.createBrokerConfig(
-          nodeId = brokerId,
-          zkConnect = zkConnectOrNull,
-          rack = Some(rack),
-          enableControlledShutdown = false, // shorten test time
-          logDirCount = 3)
-        // shorter backoff to reduce test durations when no active partitions 
are eligible for fetching due to throttling
-        config.setProperty(KafkaConfig.ReplicaFetchBackoffMsProp, "100")
-        // Don't move partition leaders automatically.
-        config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
-        config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp, "1000")
-        configOverrides.forKeyValue(config.setProperty)
-
-        brokerConfigOverrides.get(brokerId).foreach { overrides =>
-          overrides.forKeyValue(config.setProperty)
-        }
-
-        new KafkaConfig(config)
-    }.toBuffer
-
-    var servers = new mutable.ArrayBuffer[KafkaBroker]
-
-    var brokerList: String = _
-
-    var adminClient: Admin = _
-
-    def setup(): Unit = {
-      createServers()
-      createTopics()
-    }
-
-    def createServers(): Unit = {
-      brokers.keySet.foreach { brokerId =>
-        servers += createBroker(brokerConfigs(brokerId))
-      }
-    }
-
-    def createTopics(): Unit = {
-      TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
-      brokerList = TestUtils.plaintextBootstrapServers(servers)
-      adminClient = Admin.create(Map[String, Object](
-        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
-      ).asJava)
-      adminClient.createTopics(topics.map {
-        case (topicName, parts) =>
-          val partMap = new HashMap[Integer, List[Integer]]()
-          parts.zipWithIndex.foreach {
-            case (part, index) => partMap.put(index, 
part.map(Integer.valueOf).asJava)
-          }
-          new NewTopic(topicName, partMap)
-      }.toList.asJava).all().get()
-      topics.foreach {
-        case (topicName, parts) =>
-          TestUtils.waitForAllPartitionsMetadata(servers, topicName, 
parts.size)
-      }
-
-      if (isKRaftTest()) {
-        TestUtils.ensureConsistentKRaftMetadata(
-          cluster.servers,
-          controllerServer
-        )
-      }
-    }
-
-    def produceMessages(topic: String, partition: Int, numMessages: Int): Unit 
= {
-      val records = (0 until numMessages).map(_ =>
-        new ProducerRecord[Array[Byte], Array[Byte]](topic, partition,
-          null, new Array[Byte](10000)))
-      TestUtils.produceMessages(servers, records, -1)
-    }
-
-    override def close(): Unit = {
-      brokerList = null
-      Utils.closeQuietly(adminClient, "adminClient")
-      adminClient = null
-      try {
-        TestUtils.shutdownServers(servers)
-      } finally {
-        servers.clear()
-      }
-    }
-  }
-}
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
index c20cfd1029c..4812f1f19e9 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
@@ -46,7 +46,7 @@ public final class VerifyAssignmentResult {
     public VerifyAssignmentResult(
         Map<TopicPartition, PartitionReassignmentState> partStates,
         boolean partsOngoing,
-        Map<org.apache.kafka.common.TopicPartitionReplica, LogDirMoveState> 
moveStates,
+        Map<TopicPartitionReplica, LogDirMoveState> moveStates,
         boolean movesOngoing
     ) {
         this.partStates = partStates;
diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java 
b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
index 9e9ec94e968..36e4e12d816 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.tools;
 
+import kafka.utils.TestInfoUtils;
 import org.apache.kafka.common.utils.Exit;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 
 public class ToolsTestUtils {
+    /** @see TestInfoUtils#TestWithParameterizedQuorumName()  */
+    public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = 
"{displayName}.quorum={0}";
+
     public static String captureStandardOut(Runnable runnable) {
         return captureStandardStream(false, runnable);
     }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java
new file mode 100644
index 00000000000..896e4f74ffd
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.reassign;
+
+import kafka.admin.ReassignPartitionsCommand;
+import kafka.cluster.Partition;
+import kafka.log.UnifiedLog;
+import kafka.server.HostedPartition;
+import kafka.server.IsrChangePropagationConfig;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.server.QuorumTestHarness;
+import kafka.server.ZkAlterPartitionManager;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.None$;
+import scala.Option;
+import scala.Some$;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
kafka.admin.ReassignPartitionsCommand.brokerLevelFollowerThrottle;
+import static kafka.admin.ReassignPartitionsCommand.brokerLevelLeaderThrottle;
+import static kafka.admin.ReassignPartitionsCommand.brokerLevelLogDirThrottle;
+import static kafka.admin.ReassignPartitionsCommand.brokerLevelThrottles;
+import static kafka.admin.ReassignPartitionsCommand.cancelAssignment;
+import static kafka.admin.ReassignPartitionsCommand.executeAssignment;
+import static kafka.admin.ReassignPartitionsCommand.verifyAssignment;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("ClassFanOutComplexity")
+@Timeout(300)
+public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
+    ReassignPartitionsTestCluster cluster;
+
+    @AfterEach
+    @Override
+    public void tearDown() {
+        Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster");
+        super.tearDown();
+    }
+
+    private final Map<Integer, Map<String, Long>> unthrottledBrokerConfigs = 
new HashMap<>(); {
+        IntStream.range(0, 4).forEach(brokerId -> {
+            Map<String, Long> brokerConfig = new HashMap<>();
+
+            brokerLevelThrottles().foreach(throttle -> {
+                brokerConfig.put(throttle, -1L);
+                return null;
+            });
+
+            unthrottledBrokerConfigs.put(brokerId, brokerConfig);
+        });
+    }
+
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testReassignment(String quorum) throws Exception {
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
Collections.emptyMap());
+        cluster.setup();
+        executeAndVerifyReassignment();
+    }
+
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
+    public void testReassignmentWithAlterPartitionDisabled(String quorum) 
throws Exception {
+        // Test reassignment when the IBP is on an older version which does 
not use
+        // the `AlterPartition` API. In this case, the controller will 
register individual
+        // watches for each reassigning partition so that the reassignment can 
be
+        // completed as soon as the ISR is expanded.
+        Map<String, String> configOverrides = 
Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), 
IBP_2_7_IV1.version());
+        cluster = new ReassignPartitionsTestCluster(configOverrides, 
Collections.emptyMap());
+        cluster.setup();
+        executeAndVerifyReassignment();
+    }
+
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
+    public void testReassignmentCompletionDuringPartialUpgrade(String quorum) 
throws Exception {
+        // Test reassignment during a partial upgrade when some brokers are 
relying on
+        // `AlterPartition` and some rely on the old notification logic 
through Zookeeper.
+        // In this test case, broker 0 starts up first on the latest IBP and 
is typically
+        // elected as controller. The three remaining brokers start up on the 
older IBP.
+        // We want to ensure that reassignment can still complete through the 
ISR change
+        // notification path even though the controller expects 
`AlterPartition`.
+
+        // Override change notification settings so that test is not delayed 
by ISR
+        // change notification delay
+        ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new 
IsrChangePropagationConfig(500, 100, 500));
+
+        Map<String, String> oldIbpConfig = 
Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), 
IBP_2_7_IV1.version());
+        Map<Integer, Map<String, String>> brokerConfigOverrides = new 
HashMap<>();
+        brokerConfigOverrides.put(1, oldIbpConfig);
+        brokerConfigOverrides.put(2, oldIbpConfig);
+        brokerConfigOverrides.put(3, oldIbpConfig);
+
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
brokerConfigOverrides);
+        cluster.setup();
+
+        executeAndVerifyReassignment();
+    }
+
+    private void executeAndVerifyReassignment() throws ExecutionException, 
InterruptedException {
+        String assignment = "{\"version\":1,\"partitions\":" +
+            
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]},"
 +
+            
"{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3,2,0],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+            "]}";
+
+        TopicPartition foo0 = new TopicPartition("foo", 0);
+        TopicPartition bar0 = new TopicPartition("bar", 0);
+
+        // Check that the assignment has not yet been started yet.
+        Map<TopicPartition, PartitionReassignmentState> initialAssignment = 
new HashMap<>();
+
+        initialAssignment.put(foo0, new 
PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 3), 
true));
+        initialAssignment.put(bar0, new 
PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 0), 
true));
+
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(initialAssignment));
+
+        // Execute the assignment
+        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
+        assertEquals(unthrottledBrokerConfigs, 
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
+        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new 
HashMap<>();
+        finalAssignment.put(foo0, new 
PartitionReassignmentState(Arrays.asList(0, 1, 3), Arrays.asList(0, 1, 3), 
true));
+        finalAssignment.put(bar0, new 
PartitionReassignmentState(Arrays.asList(3, 2, 0), Arrays.asList(3, 2, 0), 
true));
+
+        kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult 
verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, 
false);
+        assertFalse(verifyAssignmentResult.movesOngoing());
+
+        // Wait for the assignment to complete
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(finalAssignment));
+
+        assertEquals(unthrottledBrokerConfigs,
+            describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
+
+        // Verify that partitions are removed from brokers no longer assigned
+        verifyReplicaDeleted(foo0, 2);
+        verifyReplicaDeleted(bar0, 1);
+    }
+
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testHighWaterMarkAfterPartitionReassignment(String quorum) 
throws Exception {
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
Collections.emptyMap());
+        cluster.setup();
+        String assignment = "{\"version\":1,\"partitions\":" +
+            
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+            "]}";
+
+        // Set the high water mark of foo-0 to 123 on its leader.
+        TopicPartition part = new TopicPartition("foo", 0);
+        
cluster.servers.get(0).replicaManager().logManager().truncateFullyAndStartAt(part,
 123L, false, None$.empty());
+
+        // Execute the assignment
+        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
+        Map<TopicPartition, PartitionReassignmentState> finalAssignment = 
Collections.singletonMap(part,
+            new PartitionReassignmentState(Arrays.asList(3, 1, 2), 
Arrays.asList(3, 1, 2), true));
+
+        // Wait for the assignment to complete
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(finalAssignment));
+
+        TestUtils.waitUntilTrue(() ->
+                cluster.servers.get(3).replicaManager().onlinePartition(part).
+                    map(Partition::leaderLogIfLocal).isDefined(),
+            () -> "broker 3 should be the new leader", DEFAULT_MAX_WAIT_MS, 
10L);
+        assertEquals(123L, 
cluster.servers.get(3).replicaManager().localLogOrException(part).highWatermark(),
+            "Expected broker 3 to have the correct high water mark for the 
partition.");
+    }
+
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testAlterReassignmentThrottle(String quorum) throws Exception {
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
Collections.emptyMap());
+        cluster.setup();
+        cluster.produceMessages("foo", 0, 50);
+        cluster.produceMessages("baz", 2, 60);
+        String assignment = "{\"version\":1,\"partitions\":" +
+            
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]},"
 +
+            
"{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+            "]}";
+
+        // Execute the assignment with a low throttle
+        long initialThrottle = 1L;
+        runExecuteAssignment(cluster.adminClient, false, assignment, 
initialThrottle, -1L);
+        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), initialThrottle);
+
+        // Now update the throttle and verify the reassignment completes
+        long updatedThrottle = 300000L;
+        runExecuteAssignment(cluster.adminClient, true, assignment, 
updatedThrottle, -1L);
+        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), updatedThrottle);
+
+        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new 
HashMap<>();
+        finalAssignment.put(new TopicPartition("foo", 0),
+            new PartitionReassignmentState(Arrays.asList(0, 3, 2), 
Arrays.asList(0, 3, 2), true));
+        finalAssignment.put(new TopicPartition("baz", 2),
+            new PartitionReassignmentState(Arrays.asList(3, 2, 1), 
Arrays.asList(3, 2, 1), true));
+
+        // Now remove the throttles.
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(finalAssignment));
+        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
+    }
+
+    /**
+     * Test running a reassignment with the interBrokerThrottle set.
+     */
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testThrottledReassignment(String quorum) throws Exception {
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
Collections.emptyMap());
+        cluster.setup();
+        cluster.produceMessages("foo", 0, 50);
+        cluster.produceMessages("baz", 2, 60);
+        String assignment = "{\"version\":1,\"partitions\":" +
+            
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]},"
 +
+            
"{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+            "]}";
+
+        // Check that the assignment has not yet been started yet.
+        Map<TopicPartition, PartitionReassignmentState> initialAssignment = 
new HashMap<>();
+        initialAssignment.put(new TopicPartition("foo", 0),
+            new PartitionReassignmentState(Arrays.asList(0, 1, 2), 
Arrays.asList(0, 3, 2), true));
+        initialAssignment.put(new TopicPartition("baz", 2),
+            new PartitionReassignmentState(Arrays.asList(0, 2, 1), 
Arrays.asList(3, 2, 1), true));
+        assertEquals(asScala(new VerifyAssignmentResult(initialAssignment)), 
runVerifyAssignment(cluster.adminClient, assignment, false));
+        assertEquals(unthrottledBrokerConfigs, 
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
+
+        // Execute the assignment
+        long interBrokerThrottle = 300000L;
+        runExecuteAssignment(cluster.adminClient, false, assignment, 
interBrokerThrottle, -1L);
+        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), 
interBrokerThrottle);
+
+        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new 
HashMap<>();
+        finalAssignment.put(new TopicPartition("foo", 0),
+            new PartitionReassignmentState(Arrays.asList(0, 3, 2), 
Arrays.asList(0, 3, 2), true));
+        finalAssignment.put(new TopicPartition("baz", 2),
+            new PartitionReassignmentState(Arrays.asList(3, 2, 1), 
Arrays.asList(3, 2, 1), true));
+
+        // Wait for the assignment to complete
+        TestUtils.waitUntilTrue(
+            () -> {
+                // Check the reassignment status.
+                kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult 
result = runVerifyAssignment(cluster.adminClient, assignment, true);
+
+                if (!result.partsOngoing()) {
+                    return true;
+                } else {
+                    assertFalse(
+                        
result.partStates().values().forall(ReassignPartitionsCommand.PartitionReassignmentState::done),
+                        "Expected at least one partition reassignment to be 
ongoing when result = " + result
+                    );
+                    assertEquals(seq(0, 3, 2), result.partStates().get(new 
TopicPartition("foo", 0)).get().targetReplicas());
+                    assertEquals(seq(3, 2, 1), result.partStates().get(new 
TopicPartition("baz", 2)).get().targetReplicas());
+                    System.out.println("Current result: " + result);
+                    waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), 
interBrokerThrottle);
+                    return false;
+                }
+            }, () -> "Expected reassignment to complete.", 
DEFAULT_MAX_WAIT_MS, 100L);
+        waitForVerifyAssignment(cluster.adminClient, assignment, true,
+            new VerifyAssignmentResult(finalAssignment));
+        // The throttles should still have been preserved, since we ran with 
--preserve-throttles
+        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), 
interBrokerThrottle);
+        // Now remove the throttles.
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(finalAssignment));
+        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
+    }
+
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testProduceAndConsumeWithReassignmentInProgress(String quorum) 
throws Exception {
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
Collections.emptyMap());
+        cluster.setup();
+        cluster.produceMessages("baz", 2, 60);
+        String assignment = "{\"version\":1,\"partitions\":" +
+            
"[{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+            "]}";
+        runExecuteAssignment(cluster.adminClient, false, assignment, 300L, 
-1L);
+        cluster.produceMessages("baz", 2, 100);
+        Consumer<byte[], byte[]> consumer = 
TestUtils.createConsumer(cluster.brokerList,
+            "group",
+            "earliest",
+            true,
+            false,
+            500,
+            SecurityProtocol.PLAINTEXT,
+            None$.empty(),
+            None$.empty(),
+            new ByteArrayDeserializer(),
+            new ByteArrayDeserializer()
+        );
+
+        TopicPartition part = new TopicPartition("baz", 2);
+        try {
+            consumer.assign(Collections.singleton(part));
+            TestUtils.pollUntilAtLeastNumRecords(consumer, 100, 
DEFAULT_MAX_WAIT_MS);
+        } finally {
+            consumer.close();
+        }
+        TestUtils.removeReplicationThrottleForPartitions(cluster.adminClient, 
seq(0, 1, 2, 3), set(part));
+        Map<TopicPartition, PartitionReassignmentState> finalAssignment = 
Collections.singletonMap(part,
+            new PartitionReassignmentState(Arrays.asList(3, 2, 1), 
Arrays.asList(3, 2, 1), true));
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(finalAssignment));
+    }
+
+    /**
+     * Test running a reassignment and then cancelling it.
+     */
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testCancellation(String quorum) throws Exception {
+        TopicPartition foo0 = new TopicPartition("foo", 0);
+        TopicPartition baz1 = new TopicPartition("baz", 1);
+
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
Collections.emptyMap());
+        cluster.setup();
+        cluster.produceMessages(foo0.topic(), foo0.partition(), 200);
+        cluster.produceMessages(baz1.topic(), baz1.partition(), 200);
+        String assignment = "{\"version\":1,\"partitions\":" +
+            
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]},"
 +
+            
"{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+            "]}";
+        assertEquals(unthrottledBrokerConfigs,
+            describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
+        long interBrokerThrottle = 1L;
+        runExecuteAssignment(cluster.adminClient, false, assignment, 
interBrokerThrottle, -1L);
+        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), 
interBrokerThrottle);
+
+        Map<TopicPartition, PartitionReassignmentState> partStates = new 
HashMap<>();
+
+        partStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 
1, 3, 2), Arrays.asList(0, 1, 3), false));
+        partStates.put(baz1, new PartitionReassignmentState(Arrays.asList(0, 
2, 3, 1), Arrays.asList(0, 2, 3), false));
+
+        // Verify that the reassignment is running.  The very low throttle 
should keep it
+        // from completing before this runs.
+        waitForVerifyAssignment(cluster.adminClient, assignment, true,
+            new VerifyAssignmentResult(partStates, true, 
Collections.emptyMap(), false));
+        // Cancel the reassignment.
+        assertEquals(new Tuple2<>(set(foo0, baz1), set()), 
runCancelAssignment(cluster.adminClient, assignment, true));
+        // Broker throttles are still active because we passed 
--preserve-throttles
+        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), 
interBrokerThrottle);
+        // Cancelling the reassignment again should reveal nothing to cancel.
+        assertEquals(new Tuple2<>(set(), set()), 
runCancelAssignment(cluster.adminClient, assignment, false));
+        // This time, the broker throttles were removed.
+        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
+        // Verify that there are no ongoing reassignments.
+        assertFalse(runVerifyAssignment(cluster.adminClient, assignment, 
false).partsOngoing());
+        // Verify that the partition is removed from cancelled replicas
+        verifyReplicaDeleted(foo0, 3);
+        verifyReplicaDeleted(baz1, 3);
+    }
+
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testCancellationWithAddingReplicaInIsr(String quorum) throws 
Exception {
+        TopicPartition foo0 = new TopicPartition("foo", 0);
+
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
Collections.emptyMap());
+        cluster.setup();
+        cluster.produceMessages(foo0.topic(), foo0.partition(), 200);
+
+        // The reassignment will bring replicas 3 and 4 into the replica set 
and remove 1 and 2.
+        String assignment = "{\"version\":1,\"partitions\":" +
+            
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+            "]}";
+
+        // We will throttle replica 4 so that only replica 3 joins the ISR
+        TestUtils.setReplicationThrottleForPartitions(
+            cluster.adminClient,
+            seq(4),
+            set(foo0),
+            1
+        );
+
+        // Execute the assignment and wait for replica 3 (only) to join the ISR
+        runExecuteAssignment(
+            cluster.adminClient,
+            false,
+            assignment,
+            -1L,
+            -1L
+        );
+        TestUtils.waitUntilTrue(
+            () -> Objects.equals(TestUtils.currentIsr(cluster.adminClient, 
foo0), set(0, 1, 2, 3)),
+            () -> "Timed out while waiting for replica 3 to join the ISR",
+            DEFAULT_MAX_WAIT_MS, 100L
+        );
+
+        // Now cancel the assignment and verify that the partition is removed 
from cancelled replicas
+        assertEquals(new Tuple2<>(set(foo0), set()), 
runCancelAssignment(cluster.adminClient, assignment, true));
+        verifyReplicaDeleted(foo0, 3);
+        verifyReplicaDeleted(foo0, 4);
+    }
+
+    private void verifyReplicaDeleted(
+        TopicPartition topicPartition,
+        Integer replicaId
+    ) {
+        TestUtils.waitUntilTrue(
+            () -> {
+                KafkaBroker server = cluster.servers.get(replicaId);
+                HostedPartition partition = 
server.replicaManager().getPartition(topicPartition);
+                Option<UnifiedLog> log = 
server.logManager().getLog(topicPartition, false);
+                return partition == HostedPartition.None$.MODULE$ && 
log.isEmpty();
+            },
+            () -> "Timed out waiting for replica " + replicaId + " of " + 
topicPartition + " to be deleted",
+            DEFAULT_MAX_WAIT_MS,
+            100L
+        );
+    }
+
+    private void waitForLogDirThrottle(Set<Integer> throttledBrokers, Long 
logDirThrottle) {
+        Map<String, Long> throttledConfigMap = new HashMap<>();
+        throttledConfigMap.put(brokerLevelLeaderThrottle(), -1L);
+        throttledConfigMap.put(brokerLevelFollowerThrottle(), -1L);
+        throttledConfigMap.put(brokerLevelLogDirThrottle(), logDirThrottle);
+        waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
+    }
+
+    private void waitForInterBrokerThrottle(List<Integer> throttledBrokers, 
Long interBrokerThrottle) {
+        Map<String, Long> throttledConfigMap = new HashMap<>();
+        throttledConfigMap.put(brokerLevelLeaderThrottle(), 
interBrokerThrottle);
+        throttledConfigMap.put(brokerLevelFollowerThrottle(), 
interBrokerThrottle);
+        throttledConfigMap.put(brokerLevelLogDirThrottle(), -1L);
+        waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
+    }
+
+    private void waitForBrokerThrottles(Collection<Integer> throttledBrokers, 
Map<String, Long> throttleConfig) {
+        Map<Integer, Map<String, Long>> throttledBrokerConfigs = new 
HashMap<>();
+        unthrottledBrokerConfigs.forEach((brokerId, unthrottledConfig) -> {
+            Map<String, Long> expectedThrottleConfig = 
throttledBrokers.contains(brokerId)
+                ? throttleConfig
+                : unthrottledConfig;
+            throttledBrokerConfigs.put(brokerId, expectedThrottleConfig);
+        });
+        waitForBrokerLevelThrottles(throttledBrokerConfigs);
+    }
+
+    private void waitForBrokerLevelThrottles(Map<Integer, Map<String, Long>> 
targetThrottles) {
+        AtomicReference<Map<Integer, Map<String, Long>>> curThrottles = new 
AtomicReference<>(new HashMap<>());
+        TestUtils.waitUntilTrue(() -> {
+            try {
+                
curThrottles.set(describeBrokerLevelThrottles(targetThrottles.keySet()));
+                return targetThrottles.equals(curThrottles.get());
+            } catch (ExecutionException | InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }, () -> "timed out waiting for broker throttle to become " + 
targetThrottles + ".  " +
+            "Latest throttles were " + curThrottles.get(), 
DEFAULT_MAX_WAIT_MS, 25);
+    }
+
+    /**
+     * Describe the broker-level throttles in the cluster.
+     *
+     * @return                A map whose keys are broker IDs and whose values 
are throttle
+     *                        information.  The nested maps are keyed on 
throttle name.
+     */
+    private Map<Integer, Map<String, Long>> 
describeBrokerLevelThrottles(Collection<Integer> brokerIds) throws 
ExecutionException, InterruptedException {
+        Map<Integer, Map<String, Long>> results = new HashMap<>();
+        for (Integer brokerId : brokerIds) {
+            ConfigResource brokerResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.toString());
+            Config brokerConfigs = 
cluster.adminClient.describeConfigs(Collections.singleton(brokerResource)).values()
+                .get(brokerResource)
+                .get();
+
+            Map<String, Long> throttles = new HashMap<>();
+            brokerLevelThrottles().foreach(throttleName -> {
+                String configValue = 
Optional.ofNullable(brokerConfigs.get(throttleName)).map(ConfigEntry::value).orElse("-1");
+                throttles.put(throttleName, Long.parseLong(configValue));
+                return null;
+            });
+            results.put(brokerId, throttles);
+        }
+        return results;
+    }
+
+    /**
+     * Test moving partitions between directories.
+     */
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
+    public void testLogDirReassignment(String quorum) throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 0);
+
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
Collections.emptyMap());
+        cluster.setup();
+        cluster.produceMessages(topicPartition.topic(), 
topicPartition.partition(), 700);
+
+        int targetBrokerId = 0;
+        List<Integer> replicas = Arrays.asList(0, 1, 2);
+        LogDirReassignment reassignment = 
buildLogDirReassignment(topicPartition, targetBrokerId, replicas);
+
+        // Start the replica move, but throttle it to be very slow so that it 
can't complete
+        // before our next checks happen.
+        long logDirThrottle = 1L;
+        runExecuteAssignment(cluster.adminClient, false, reassignment.json,
+            -1L, logDirThrottle);
+
+        // Check the output of --verify
+        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
+            new VerifyAssignmentResult(Collections.singletonMap(
+                topicPartition, new 
PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
+            ), false, Collections.singletonMap(
+                new TopicPartitionReplica(topicPartition.topic(), 
topicPartition.partition(), 0),
+                new ActiveMoveState(reassignment.currentDir, 
reassignment.targetDir, reassignment.targetDir)
+            ), true));
+        waitForLogDirThrottle(Collections.singleton(0), logDirThrottle);
+
+        // Remove the throttle
+        cluster.adminClient.incrementalAlterConfigs(Collections.singletonMap(
+                new ConfigResource(ConfigResource.Type.BROKER, "0"),
+                Collections.singletonList(new AlterConfigOp(
+                    new ConfigEntry(brokerLevelLogDirThrottle(), ""), 
AlterConfigOp.OpType.DELETE))))
+            .all().get();
+        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
+
+        // Wait for the directory movement to complete.
+        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
+            new VerifyAssignmentResult(Collections.singletonMap(
+                topicPartition, new 
PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
+            ), false, Collections.singletonMap(
+                new TopicPartitionReplica(topicPartition.topic(), 
topicPartition.partition(), 0),
+                new CompletedMoveState(reassignment.targetDir)
+            ), false));
+
+        BrokerDirs info1 = new 
BrokerDirs(cluster.adminClient.describeLogDirs(IntStream.range(0, 
4).boxed().collect(Collectors.toList())), 0);
+        assertEquals(reassignment.targetDir, 
info1.curLogDirs.getOrDefault(topicPartition, ""));
+    }
+
+    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
+    public void testAlterLogDirReassignmentThrottle(String quorum) throws 
Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 0);
+
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), 
Collections.emptyMap());
+        cluster.setup();
+        cluster.produceMessages(topicPartition.topic(), 
topicPartition.partition(), 700);
+
+        int targetBrokerId = 0;
+        List<Integer> replicas = Arrays.asList(0, 1, 2);
+        LogDirReassignment reassignment = 
buildLogDirReassignment(topicPartition, targetBrokerId, replicas);
+
+        // Start the replica move with a low throttle so it does not complete
+        long initialLogDirThrottle = 1L;
+        runExecuteAssignment(cluster.adminClient, false, reassignment.json,
+            -1L, initialLogDirThrottle);
+        waitForLogDirThrottle(new HashSet<>(Collections.singletonList(0)), 
initialLogDirThrottle);
+
+        // Now increase the throttle and verify that the log dir movement 
completes
+        long updatedLogDirThrottle = 3000000L;
+        runExecuteAssignment(cluster.adminClient, true, reassignment.json,
+            -1L, updatedLogDirThrottle);
+        waitForLogDirThrottle(Collections.singleton(0), updatedLogDirThrottle);
+
+        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
+            new VerifyAssignmentResult(Collections.singletonMap(
+                topicPartition, new 
PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
+            ), false, Collections.singletonMap(
+                new TopicPartitionReplica(topicPartition.topic(), 
topicPartition.partition(), targetBrokerId),
+                new CompletedMoveState(reassignment.targetDir)
+            ), false));
+    }
+
+    static class LogDirReassignment {
+        final String json;
+        final String currentDir;
+        final String targetDir;
+
+        public LogDirReassignment(String json, String currentDir, String 
targetDir) {
+            this.json = json;
+            this.currentDir = currentDir;
+            this.targetDir = targetDir;
+        }
+    }
+
+    private LogDirReassignment buildLogDirReassignment(TopicPartition 
topicPartition,
+                                                       int brokerId,
+                                                       List<Integer> replicas) 
throws ExecutionException, InterruptedException {
+
+        DescribeLogDirsResult describeLogDirsResult = 
cluster.adminClient.describeLogDirs(
+            IntStream.range(0, 4).boxed().collect(Collectors.toList()));
+
+        BrokerDirs logDirInfo = new BrokerDirs(describeLogDirsResult, 
brokerId);
+        assertTrue(logDirInfo.futureLogDirs.isEmpty());
+
+        String currentDir = logDirInfo.curLogDirs.get(topicPartition);
+        String newDir = logDirInfo.logDirs.stream().filter(dir -> 
!dir.equals(currentDir)).findFirst().get();
+
+        List<String> logDirs = replicas.stream().map(replicaId -> {
+            if (replicaId == brokerId)
+                return "\"" + newDir + "\"";
+            else
+                return "\"any\"";
+        }).collect(Collectors.toList());
+
+        String reassignmentJson =
+            " { \"version\": 1," +
+                "  \"partitions\": [" +
+                "    {" +
+                "     \"topic\": \"" + topicPartition.topic() + "\"," +
+                "     \"partition\": " + topicPartition.partition() + "," +
+                "     \"replicas\": [" + 
replicas.stream().map(Object::toString).collect(Collectors.joining(",")) + "]," 
+
+                "     \"log_dirs\": [" + String.join(",", logDirs) + "]" +
+                "    }" +
+                "   ]" +
+                "  }";
+
+        return new LogDirReassignment(reassignmentJson, currentDir, newDir);
+    }
+
+
+
+    private kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult 
runVerifyAssignment(Admin adminClient, String jsonString,
+                                                       Boolean 
preserveThrottles) {
+        System.out.println("==> verifyAssignment(adminClient, jsonString=" + 
jsonString);
+        return verifyAssignment(adminClient, jsonString, preserveThrottles);
+    }
+
+    private void waitForVerifyAssignment(Admin adminClient,
+                                         String jsonString,
+                                         Boolean preserveThrottles,
+                                         VerifyAssignmentResult 
expectedResult) {
+        final kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult 
expectedResult0 = asScala(expectedResult);
+        final kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult[] 
latestResult = {null};
+        TestUtils.waitUntilTrue(
+            () -> {
+                latestResult[0] = runVerifyAssignment(adminClient, jsonString, 
preserveThrottles);
+                return expectedResult0.equals(latestResult[0]);
+            }, () -> "Timed out waiting for verifyAssignment result " + 
expectedResult + ".  " +
+                "The latest result was " + latestResult[0], 
DEFAULT_MAX_WAIT_MS, 10L);
+    }
+
+    private void runExecuteAssignment(Admin adminClient,
+                                      Boolean additional,
+                                      String reassignmentJson,
+                                      Long interBrokerThrottle,
+                                      Long replicaAlterLogDirsThrottle) {
+        System.out.println("==> executeAssignment(adminClient, additional=" + 
additional + ", " +
+            "reassignmentJson=" + reassignmentJson + ", " +
+            "interBrokerThrottle=" + interBrokerThrottle + ", " +
+            "replicaAlterLogDirsThrottle=" + replicaAlterLogDirsThrottle + 
"))");
+        executeAssignment(adminClient, additional, reassignmentJson,
+            interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, 
Time.SYSTEM);
+    }
+
+    private Tuple2<scala.collection.immutable.Set<TopicPartition>, 
scala.collection.immutable.Set<TopicPartitionReplica>> runCancelAssignment(
+        Admin adminClient,
+        String jsonString,
+        Boolean preserveThrottles
+    ) {
+        System.out.println("==> cancelAssignment(adminClient, jsonString=" + 
jsonString);
+        return cancelAssignment(adminClient, jsonString, preserveThrottles, 
10000L, Time.SYSTEM);
+    }
+
+    static class BrokerDirs {
+        final DescribeLogDirsResult result;
+        final int brokerId;
+
+        final Set<String> logDirs = new HashSet<>();
+        final Map<TopicPartition, String> curLogDirs = new HashMap<>();
+        final Map<TopicPartition, String> futureLogDirs = new HashMap<>();
+
+        public BrokerDirs(DescribeLogDirsResult result, int brokerId) throws 
ExecutionException, InterruptedException {
+            this.result = result;
+            this.brokerId = brokerId;
+
+            result.descriptions().get(brokerId).get().forEach((logDirName, 
logDirInfo) -> {
+                logDirs.add(logDirName);
+                logDirInfo.replicaInfos().forEach((part, info) -> {
+                    if (info.isFuture()) {
+                        futureLogDirs.put(part, logDirName);
+                    } else {
+                        curLogDirs.put(part, logDirName);
+                    }
+                });
+            });
+        }
+    }
+
+    class ReassignPartitionsTestCluster implements Closeable {
+        private final List<KafkaConfig> brokerConfigs = new ArrayList<>();
+
+        private final Map<Integer, String> brokers = new HashMap<>(); {
+            brokers.put(0, "rack0");
+            brokers.put(1, "rack0");
+            brokers.put(2, "rack1");
+            brokers.put(3, "rack1");
+            brokers.put(4, "rack1");
+        }
+
+        private final Map<String, List<List<Integer>>> topics = new 
HashMap<>(); {
+            topics.put("foo", Arrays.asList(Arrays.asList(0, 1, 2), 
Arrays.asList(1, 2, 3)));
+            topics.put("bar", Arrays.asList(Arrays.asList(3, 2, 1)));
+            topics.put("baz", Arrays.asList(Arrays.asList(1, 0, 2), 
Arrays.asList(2, 0, 1), Arrays.asList(0, 2, 1)));
+        }
+
+        private final List<KafkaBroker> servers = new ArrayList<>();
+
+        private String brokerList;
+
+        private Admin adminClient;
+
+        public ReassignPartitionsTestCluster(Map<String, String> 
configOverrides, Map<Integer, Map<String, String>> brokerConfigOverrides) {
+            brokers.forEach((brokerId, rack) -> {
+                Properties config = TestUtils.createBrokerConfig(
+                    brokerId,
+                    zkConnectOrNull(),
+                    false, // shorten test time
+                    true,
+                    TestUtils.RandomPort(),
+                    scala.None$.empty(),
+                    scala.None$.empty(),
+                    scala.None$.empty(),
+                    true,
+                    false,
+                    TestUtils.RandomPort(),
+                    false,
+                    TestUtils.RandomPort(),
+                    false,
+                    TestUtils.RandomPort(),
+                    Some$.MODULE$.apply(rack),
+                    3,
+                    false,
+                    1,
+                    (short) 1,
+                    false);
+                // shorter backoff to reduce test durations when no active 
partitions are eligible for fetching due to throttling
+                config.setProperty(KafkaConfig.ReplicaFetchBackoffMsProp(), 
"100");
+                // Don't move partition leaders automatically.
+                
config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+                config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp(), 
"1000");
+                configOverrides.forEach(config::setProperty);
+                brokerConfigOverrides.getOrDefault(brokerId, 
Collections.emptyMap()).forEach(config::setProperty);
+
+                brokerConfigs.add(new KafkaConfig(config));
+            });
+        }
+
+        public void setup() throws ExecutionException, InterruptedException {
+            createServers();
+            createTopics();
+        }
+
+        public void createServers() {
+            brokers.keySet().forEach(brokerId ->
+                servers.add(createBroker(brokerConfigs.get(brokerId), 
Time.SYSTEM, true, scala.None$.empty()))
+            );
+        }
+
+        public void createTopics() throws ExecutionException, 
InterruptedException {
+            TestUtils.waitUntilBrokerMetadataIsPropagated(seq(servers), 
DEFAULT_MAX_WAIT_MS);
+            brokerList = TestUtils.plaintextBootstrapServers(seq(servers));
+
+            adminClient = 
Admin.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
 brokerList));
+
+            adminClient.createTopics(topics.entrySet().stream().map(e -> {
+                Map<Integer, List<Integer>> partMap = new HashMap<>();
+
+                Iterator<List<Integer>> partsIter = e.getValue().iterator();
+                int index = 0;
+                while (partsIter.hasNext()) {
+                    partMap.put(index, partsIter.next());
+                    index++;
+                }
+                return new NewTopic(e.getKey(), partMap);
+            }).collect(Collectors.toList())).all().get();
+            topics.forEach((topicName, parts) -> 
TestUtils.waitForAllPartitionsMetadata(seq(servers), topicName, parts.size()));
+
+            if (isKRaftTest()) {
+                TestUtils.ensureConsistentKRaftMetadata(
+                    seq(cluster.servers),
+                    controllerServer(),
+                    "Timeout waiting for controller metadata propagating to 
brokers"
+                );
+            }
+        }
+
+        public void produceMessages(String topic, int partition, int 
numMessages) {
+            List<ProducerRecord<byte[], byte[]>> records = IntStream.range(0, 
numMessages).mapToObj(i ->
+                new ProducerRecord<byte[], byte[]>(topic, partition,
+                    null, new byte[10000])).collect(Collectors.toList());
+            TestUtils.produceMessages(seq(servers), seq(records), -1);
+        }
+
+        @Override
+        public void close() {
+            brokerList = null;
+            Utils.closeQuietly(adminClient, "adminClient");
+            adminClient = null;
+            try {
+                TestUtils.shutdownServers(seq(servers), true);
+            } finally {
+                servers.clear();
+            }
+        }
+    }
+
+    private ReassignPartitionsCommand.VerifyAssignmentResult 
asScala(VerifyAssignmentResult res) {
+        Map<TopicPartition, 
ReassignPartitionsCommand.PartitionReassignmentState> partStates = new 
HashMap<>();
+        res.partStates.forEach((tp, state) -> partStates.put(tp, 
asScala(state)));
+
+        Map<TopicPartitionReplica, ReassignPartitionsCommand.LogDirMoveState> 
moveStates = new HashMap<>();
+        res.moveStates.forEach((tpr, state) -> moveStates.put(tpr, 
asScala(state)));
+
+        return new 
ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates), 
res.partsOngoing, asScala(moveStates), res.movesOngoing);
+    }
+
+    @SuppressWarnings({"unchecked"})
+    private ReassignPartitionsCommand.PartitionReassignmentState 
asScala(PartitionReassignmentState state) {
+        return new ReassignPartitionsCommand.PartitionReassignmentState(
+            seq((List) state.currentReplicas),
+            seq((List) state.targetReplicas),
+            state.done
+        );
+    }
+
+    private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState 
state) {
+        if (state instanceof ActiveMoveState) {
+            ActiveMoveState s = (ActiveMoveState) state;
+            return new 
ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir, 
s.futureLogDir);
+        } else if (state instanceof CancelledMoveState) {
+            CancelledMoveState s = (CancelledMoveState) state;
+            return new 
ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir);
+        } else if (state instanceof CompletedMoveState) {
+            CompletedMoveState s = (CompletedMoveState) state;
+            return new 
ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir);
+        } else if (state instanceof MissingLogDirMoveState) {
+            MissingLogDirMoveState s = (MissingLogDirMoveState) state;
+            return new 
ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir);
+        } else if (state instanceof MissingReplicaMoveState) {
+            MissingReplicaMoveState s = (MissingReplicaMoveState) state;
+            return new 
ReassignPartitionsCommand.MissingReplicaMoveState(s.targetLogDir);
+        }
+
+        throw new IllegalArgumentException("Unknown state " + state);
+    }
+
+    @SuppressWarnings("unchecked")
+    static <T> scala.collection.immutable.Set<T> set(final T... set) {
+        return mutableSet(set).toSet();
+    }
+
+    @SuppressWarnings({"deprecation", "unchecked"})
+    private static <T> scala.collection.mutable.Set<T> mutableSet(final 
T...set) {
+        return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(set)));
+    }
+
+    @SuppressWarnings({"unchecked"})
+    private static <T> Seq<T> seq(T... seq) {
+        return seq(Arrays.asList(seq));
+    }
+
+    @SuppressWarnings({"deprecation"})
+    private static <T> Seq<T> seq(Collection<T> seq) {
+        return 
JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+    }
+
+    @SuppressWarnings("deprecation")
+    private static <K, V> scala.collection.Map<K, V> asScala(Map<K, V> jmap) {
+        return JavaConverters.mapAsScalaMap(jmap);
+    }
+}

Reply via email to