This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 47bb46c10dc KAFKA-19582 the current assignments shown by
ReassignPartitionsCommand should include the log directories (#20319)
47bb46c10dc is described below
commit 47bb46c10dc82dc6ba584ca1659400d899f92893
Author: Ken Huang <[email protected]>
AuthorDate: Sat Aug 23 02:57:00 2025 +0800
KAFKA-19582 the current assignments shown by ReassignPartitionsCommand
should include the log directories (#20319)
The ReassignPartitionsCommand shows the topic replicas on each broker.
When using the --generate option, it prints the current partition
replica assignment. However, the log directory for each current replica
is always shown as the placeholder "any". This makes it impossible for
users to determine which specific log directory is being used by each
replica. Therefore, we should fix this behavior.
Before this PR:
```
Current partition replica assignment
{
"version": 1,
"partitions": [
{
"topic": "test1",
"partition": 0,
"replicas": [
4,
2
],
"log_dirs": [
"any",
"any"
]
}
]
}
```
After this PR:
```
Current partition replica assignment
{
"version": 1,
"partitions": [
{
"topic": "test1",
"partition": 0,
"replicas": [
4,
2
],
"log_dirs": [
"/tmp/kraft-broker-logs234",
"/tmp/kraft-broker-logs"
]
}
]
}
```
Reviewers: PoAn Yang <[email protected]>, Jhen-Yung Hsu
<[email protected]>, TaiJuWu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../tools/reassign/ReassignPartitionsCommand.java | 36 ++++++--
.../tools/reassign/ReassignPartitionsUnitTest.java | 100 ++++++++++++++++-----
2 files changed, 109 insertions(+), 27 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 4628c34fe18..8e1f6114c24 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -566,10 +566,11 @@ public class ReassignPartitionsCommand {
List<String> topicsToReassign = t0.getValue();
Map<TopicPartition, List<Integer>> currentAssignments =
getReplicaAssignmentForTopics(adminClient, topicsToReassign);
+ Map<TopicPartitionReplica, String> currentReplicaLogDirs =
getReplicaToLogDir(adminClient, currentAssignments);
List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient,
brokersToReassign, enableRackAwareness);
Map<TopicPartition, List<Integer>> proposedAssignments =
calculateAssignment(currentAssignments, usableBrokers);
System.out.printf("Current partition replica assignment%n%s%n%n",
- formatAsReassignmentJson(currentAssignments, Map.of()));
+ formatAsReassignmentJson(currentAssignments,
currentReplicaLogDirs));
System.out.printf("Proposed partition reassignment
configuration%n%s%n",
formatAsReassignmentJson(proposedAssignments, Map.of()));
return Map.entry(proposedAssignments, currentAssignments);
@@ -775,7 +776,7 @@ public class ReassignPartitionsCommand {
verifyBrokerIds(adminClient, brokers);
Map<TopicPartition, List<Integer>> currentParts =
getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet());
-
System.out.println(currentPartitionReplicaAssignmentToString(proposedParts,
currentParts));
+
System.out.println(currentPartitionReplicaAssignmentToString(adminClient,
proposedParts, currentParts));
if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE);
@@ -916,20 +917,23 @@ public class ReassignPartitionsCommand {
/**
* Return the string which we want to print to describe the current
partition assignment.
*
+ * @param adminClient The admin client object to use.
* @param proposedParts The proposed partition assignment.
* @param currentParts The current partition assignment.
*
* @return The string to print. We will only
print information about
* partitions that appear in the
proposed partition assignment.
*/
- static String
currentPartitionReplicaAssignmentToString(Map<TopicPartition, List<Integer>>
proposedParts,
-
Map<TopicPartition, List<Integer>> currentParts) throws JsonProcessingException
{
+ static String currentPartitionReplicaAssignmentToString(Admin adminClient,
+
Map<TopicPartition, List<Integer>> proposedParts,
+
Map<TopicPartition, List<Integer>> currentParts) throws
JsonProcessingException, ExecutionException, InterruptedException {
Map<TopicPartition, List<Integer>> partitionsToBeReassigned =
currentParts.entrySet().stream()
.filter(e -> proposedParts.containsKey(e.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+ Map<TopicPartitionReplica, String> currentReplicaLogDirs =
getReplicaToLogDir(adminClient, partitionsToBeReassigned);
return String.format("Current partition replica
assignment%n%n%s%n%nSave this to use as the %s",
- formatAsReassignmentJson(partitionsToBeReassigned, Map.of()),
+ formatAsReassignmentJson(partitionsToBeReassigned,
currentReplicaLogDirs),
"--reassignment-json-file option during rollback");
}
@@ -1514,4 +1518,26 @@ public class ReassignPartitionsCommand {
}
return results;
}
+
+ static Map<TopicPartitionReplica, String> getReplicaToLogDir(
+ Admin adminClient,
+ Map<TopicPartition, List<Integer>> topicPartitionToReplicas
+ ) throws InterruptedException, ExecutionException {
+ var replicaLogDirs = topicPartitionToReplicas
+ .entrySet()
+ .stream()
+ .flatMap(entry -> entry.getValue()
+ .stream()
+ .map(id -> new
TopicPartitionReplica(entry.getKey().topic(), entry.getKey().partition(), id)))
+ .collect(Collectors.toUnmodifiableSet());
+
+ return adminClient.describeReplicaLogDirs(replicaLogDirs).all().get()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getValue().getCurrentReplicaLogDir() !=
null)
+ .collect(Collectors.toMap(
+ Entry::getKey,
+ entry -> entry.getValue().getCurrentReplicaLogDir()
+ ));
+ }
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
index 949b595a115..22e9011a2c2 100644
---
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -71,6 +71,7 @@ import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generate
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getBrokerMetadata;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForPartitions;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForTopics;
+import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaToLogDir;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyInterBrokerThrottle;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyLogDirThrottle;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyTopicThrottles;
@@ -436,29 +437,50 @@ public class ReassignPartitionsUnitTest {
@Test
public void testCurrentPartitionReplicaAssignmentToString() throws
Exception {
- Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
-
- proposedParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
- proposedParts.put(new TopicPartition("bar", 0), List.of(7, 8, 9));
-
- Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
-
- currentParts.put(new TopicPartition("foo", 0), List.of(1, 2, 3));
- currentParts.put(new TopicPartition("foo", 1), List.of(4, 5, 6));
- currentParts.put(new TopicPartition("bar", 0), List.of(7, 8));
- currentParts.put(new TopicPartition("baz", 0), List.of(10, 11, 12));
+ try (MockAdminClient adminClient = new MockAdminClient.Builder()
+ .numBrokers(6)
+ .brokerLogDirs(List.of(
+ List.of("/tmp/broker0/logs"),
+ List.of("/tmp/broker1/logs"),
+ List.of("/tmp/broker2/logs"),
+ List.of("/tmp/broker3/logs"),
+ List.of("/tmp/broker4/logs"),
+ List.of("/tmp/broker5/logs")
+ ))
+ .build()
+ ) {
+
+ List<Node> brokers = adminClient.brokers();
+ adminClient.addTopic(false, "foo", List.of(
+ new TopicPartitionInfo(1, brokers.get(1),
+ List.of(brokers.get(1), brokers.get(2), brokers.get(3)),
+ List.of(brokers.get(1), brokers.get(2), brokers.get(3)))
+ ), Map.of());
+
+ adminClient.addTopic(false, "bar", List.of(
+ new TopicPartitionInfo(0, brokers.get(4),
+ List.of(brokers.get(4), brokers.get(5)),
+ List.of(brokers.get(4), brokers.get(5)))
+ ), Map.of());
+
+ Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
+ proposedParts.put(new TopicPartition("foo", 1), List.of(0, 1, 2));
+ proposedParts.put(new TopicPartition("bar", 0), List.of(3, 4, 5));
+
+ Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
+ currentParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
+ currentParts.put(new TopicPartition("bar", 0), List.of(4, 5));
- assertEquals(String.join(System.lineSeparator(),
- "Current partition replica assignment",
- "",
- "{\"version\":1,\"partitions\":" +
-
"[{\"topic\":\"bar\",\"partition\":0,\"replicas\":[7,8],\"log_dirs\":[\"any\",\"any\"]},"
+
-
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[4,5,6],\"log_dirs\":[\"any\",\"any\",\"any\"]}]"
+
- "}",
- "",
- "Save this to use as the --reassignment-json-file option during
rollback"),
- currentPartitionReplicaAssignmentToString(proposedParts,
currentParts)
- );
+ assertEquals(String.join(System.lineSeparator(),
+ "Current partition replica assignment",
+ "",
+
"{\"version\":1,\"partitions\":[{\"topic\":\"bar\",\"partition\":0,\"replicas\":[4,5],\"log_dirs\":[\"/tmp/broker4/logs\",\"/tmp/broker4/logs\"]},"
+
+
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}",
+ "",
+ "Save this to use as the --reassignment-json-file option
during rollback"),
+ currentPartitionReplicaAssignmentToString(adminClient,
proposedParts, currentParts)
+ );
+ }
}
@Test
@@ -765,4 +787,38 @@ public class ReassignPartitionsUnitTest {
assertThrows(AdminOperationException.class, () ->
executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L,
Time.SYSTEM, false)).getMessage());
}
}
+
+ @Test
+ public void testGetReplicaToLogDir() throws Exception {
+ try (MockAdminClient adminClient = new MockAdminClient.Builder()
+ .numBrokers(4)
+ .brokerLogDirs(List.of(
+ List.of("/tmp/broker0/logs0"),
+ List.of("/tmp/broker1/logs0"),
+ List.of("/tmp/broker2/logs0"),
+ List.of("/tmp/broker3/logs0")
+ )).build()
+ ) {
+ addTopics(adminClient);
+
+ Map<TopicPartition, List<Integer>> topicPartitionToReplicas =
Map.of(
+ new TopicPartition("foo", 0), List.of(0, 1, 2),
+ new TopicPartition("foo", 1), List.of(1, 2, 3),
+ new TopicPartition("bar", 0), List.of(2, 3, 0)
+ );
+
+ Map<TopicPartitionReplica, String> result =
getReplicaToLogDir(adminClient, topicPartitionToReplicas);
+
+ assertFalse(result.isEmpty());
+ assertEquals("/tmp/broker0/logs0", result.get(new
TopicPartitionReplica("foo", 0, 0)));
+ assertEquals("/tmp/broker0/logs0", result.get(new
TopicPartitionReplica("foo", 0, 1)));
+ assertEquals("/tmp/broker0/logs0", result.get(new
TopicPartitionReplica("foo", 0, 2)));
+ assertEquals("/tmp/broker1/logs0", result.get(new
TopicPartitionReplica("foo", 1, 1)));
+ assertEquals("/tmp/broker1/logs0", result.get(new
TopicPartitionReplica("foo", 1, 2)));
+ assertEquals("/tmp/broker1/logs0", result.get(new
TopicPartitionReplica("foo", 1, 3)));
+ assertEquals("/tmp/broker2/logs0", result.get(new
TopicPartitionReplica("bar", 0, 0)));
+ assertEquals("/tmp/broker2/logs0", result.get(new
TopicPartitionReplica("bar", 0, 2)));
+ assertEquals("/tmp/broker2/logs0", result.get(new
TopicPartitionReplica("bar", 0, 3)));
+ }
+ }
}