This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8f35f4ea3b KAFKA-19774 Cleanups for KIP-1066 (#21523)
c8f35f4ea3b is described below

commit c8f35f4ea3b50e67a6b4b4773f42539d0b566e28
Author: Mickael Maison <[email protected]>
AuthorDate: Thu Feb 26 08:08:48 2026 +0100

    KAFKA-19774 Cleanups for KIP-1066 (#21523)
    
    Addressing feedback from https://github.com/apache/kafka/pull/21273
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/admin/LogDirDescription.java     |   4 +
 .../kafka/server/DynamicBrokerConfigTest.scala     |  12 +-
 docs/getting-started/upgrade.md                    |   1 +
 docs/operations/basic-kafka-operations.md          |  56 ++++++++-
 .../kafka/server/config/DynamicBrokerConfig.java   |   3 +-
 .../server/CordonedLogDirsIntegrationTest.java     | 126 ++++++++++++++++++---
 6 files changed, 177 insertions(+), 25 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
index be1bb6e8f5b..a541fa7e4ac 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
@@ -39,6 +39,10 @@ public class LogDirDescription {
         this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES, 
false);
     }
 
+    public LogDirDescription(ApiException error, Map<TopicPartition, 
ReplicaInfo> replicaInfos, long totalBytes, long usableBytes) {
+        this(error, replicaInfos, totalBytes, usableBytes, false);
+    }
+
     public LogDirDescription(ApiException error, Map<TopicPartition, 
ReplicaInfo> replicaInfos, long totalBytes, long usableBytes, boolean 
isCordoned) {
         this.error = error;
         this.replicaInfos = replicaInfos;
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 15b9d7cb479..24ed3dedfd9 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -1070,42 +1070,42 @@ class DynamicBrokerConfigTest {
     // Cordoning 1 new log dir, so 1 new handleCordoned invocation
     val props = new Properties()
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, logDirs.get(0))
-    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
     verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
     verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
 
     // When using *, no other entries must be specified, so no new invocations
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*,/invalid/log/dir")
-    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
     verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
     verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
 
     // Invalid log dir, so no new invocations
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "/invalid/log/dir")
-    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
     verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
     verify(ctx.directoryEventHandler, times(0)).handleUncordoned(anySet)
 
     // * cordons the 2nd log dir, so 1 new handleCordoned invocation
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
-    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(logDirs, ctx.config.cordonedLogDirs)
     verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
     verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
 
     // clearing all cordoned log dirs, so 1 new handleUncordoned invocation
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "")
-    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertTrue(ctx.config.cordonedLogDirs.isEmpty)
     verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
     verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
 
     // * cordons all log dirs, so 1 new handleCordoned invocation
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, String.join(",", 
logDirs))
-    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(logDirs, ctx.config.cordonedLogDirs)
     verify(ctx.directoryEventHandler, times(3)).handleCordoned(anySet)
     verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md
index 558b954426b..22a4b0f6524 100644
--- a/docs/getting-started/upgrade.md
+++ b/docs/getting-started/upgrade.md
@@ -37,6 +37,7 @@ type: docs
   * The new config have been introduced: `remote.log.metadata.topic.min.isr` 
with 2 as default value. You can correct the min.insync.replicas for the 
existed __remote_log_metadata topic via kafka-configs.sh if needed. For further 
details, please refer to 
[KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
   * The new config prefix `remote.log.metadata.admin.` has been introduced. It 
allows independent configuration of the admin client used by 
`TopicBasedRemoteLogMetadataManager`. For further details, please refer to 
[KIP-1208](https://cwiki.apache.org/confluence/x/vYqhFg).
   * The `kafka-streams-scala` library is deprecated as of Kafka 4.3 and will 
be removed in Kafka 5.0. For further details, please refer to the [migration 
guide](/{version}/streams/developer-guide/scala-migration).
+  * Support for cordoning log directories has been added. For further details, 
please refer to [KIP-1066](https://cwiki.apache.org/confluence/x/Lg_TEg).
 
 ## Upgrading to 4.2.0
 
diff --git a/docs/operations/basic-kafka-operations.md 
b/docs/operations/basic-kafka-operations.md
index 4c3d3961a27..dd042d0fcab 100644
--- a/docs/operations/basic-kafka-operations.md
+++ b/docs/operations/basic-kafka-operations.md
@@ -427,9 +427,61 @@ The --verify option can be used with the tool to check the 
status of the partiti
     Reassignment of partition [foo1,0] is completed
     Reassignment of partition [foo2,1] is completed
 
-## Decommissioning brokers
+## Decommissioning brokers and log directories
 
-The partition reassignment tool does not have the ability to automatically 
generate a reassignment plan for decommissioning brokers yet. As such, the 
admin has to come up with a reassignment plan to move the replica for all 
partitions hosted on the broker to be decommissioned, to the rest of the 
brokers. This can be relatively tedious as the reassignment needs to ensure 
that all the replicas are not moved from the decommissioned broker to only one 
other broker. To make this process effor [...]
+### Decommissioning brokers
+
+The first step to decommission brokers is to mark them as cordoned via the 
Admin API.
+
+For example, to cordon broker 1:
+
+    $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
--add-config cordoned.log.dirs="*" --entity-type brokers --entity-name 1
+    Completed updating config for broker 1.
+
+Then reassign all the partitions from that broker to other brokers in the 
cluster.
+The partition reassignment tool does not have the ability to automatically 
generate a reassignment plan for decommissioning brokers yet.
+As such, the admin has to come up with a reassignment plan to move the replica 
for all partitions hosted on the broker to be decommissioned, 
+to the rest of the brokers.
+
+Once all the reassignment is done, shut down the broker and unregister it to 
remove it from the cluster.
+
+For example, to unregister broker 1:
+
+    $ bin/kafka-cluster.sh unregister --bootstrap-server localhost:9092 --id 1
+
+### Decommissioning log directories
+
+The first step to decommission log directories is to mark them as cordoned via 
the Admin API.
+
+For example, to cordon /data/dir1 from broker 1:
+
+    $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
--add-config cordoned.log.dirs=/data/dir1 --entity-type brokers --entity-name 1
+    Completed updating config for broker 1.
+
+Then reassign all the partitions from the log directory to decommission to 
other log directories or brokers in the cluster.
+The partition reassignment tool does not have the ability to automatically 
generate a reassignment plan for decommissioning a log directory yet.
+As such, the admin has to come up with a reassignment plan to move the replica 
for all partitions hosted on the log directory to be decommissioned.
+
+Once all the reassignment is done, shut down the broker.
+
+Then uncordon the log directory. Since the broker hosting that directory is 
offline, use --bootstrap-controller to do so.
+
+For example:
+
+    $ bin/kafka-configs.sh --bootstrap-controller localhost:9093 --alter 
--delete-config cordoned.log.dirs --entity-type brokers --entity-name 1
+    Completed updating config for broker 1.
+
+Update the broker configuration and remove the decommissioned log directory 
from log.dir or log.dirs.
+
+For example if the broker configuration contained:
+
+    log.dirs=/data/dir1,/data/dir2
+
+Update it to:
+
+    log.dirs=/data/dir2
+
+Finally restart the broker.
 
 ## Increasing replication factor
 
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
index f7b557cf503..9c0d8f24175 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
@@ -61,7 +61,8 @@ public class DynamicBrokerConfig {
 
     private static final Set<String> PER_BROKER_CONFIGS = Stream.of(
             DYNAMIC_SECURITY_CONFIGS,
-            DynamicListenerConfig.RECONFIGURABLE_CONFIGS)
+            DynamicListenerConfig.RECONFIGURABLE_CONFIGS,
+            Set.of(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG))
         .flatMap(Collection::stream)
         .filter(c -> !CLUSTER_LEVEL_LISTENER_CONFIGS.contains(c))
         .collect(Collectors.toUnmodifiableSet());
diff --git 
a/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
 
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
index ce2eededede..def1cbbe191 100644
--- 
a/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
@@ -24,10 +24,12 @@ import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
 import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.clients.admin.FinalizedVersionRange;
 import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
 import org.apache.kafka.clients.admin.UpdateFeaturesResult;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
@@ -43,8 +45,11 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -100,7 +105,7 @@ public class CordonedLogDirsIntegrationTest {
             assertCordonedLogDirs(admin, List.of());
             // 3. we can't dynamically configure cordoned.log.dirs
             Throwable ee = assertThrows(ExecutionException.class, () ->
-                
admin.incrementalAlterConfigs(cordonedDirsConfig("")).all().get());
+                admin.incrementalAlterConfigs(cordonedDirsConfig("", 
BROKER_0)).all().get());
             assertInstanceOf(InvalidConfigurationException.class, 
ee.getCause());
 
             // Update the metadata version to support cordoning log dirs
@@ -118,7 +123,7 @@ public class CordonedLogDirsIntegrationTest {
             if (initialCordonedLogDirs.isEmpty()) {
                 // if no initial cordoned log dirs, this has not changed, and 
we can cordon log dirs
                 assertCordonedLogDirs(admin, List.of());
-                setCordonedLogDirs(admin, logDirsBroker0);
+                setCordonedLogDirs(admin, logDirsBroker0, BROKER_0);
                 initialCordonedLogDirs = logDirsBroker0;
             }
             // The statically or dynamically configured log dirs are now 
marked as cordoned
@@ -136,7 +141,7 @@ public class CordonedLogDirsIntegrationTest {
             assertInstanceOf(InvalidReplicationFactorException.class, 
ee.getCause());
 
             // After uncordoning log dirs, we can create topics and partitions 
again
-            setCordonedLogDirs(admin, List.of());
+            setCordonedLogDirs(admin, List.of(), BROKER_0);
             admin.createTopics(newTopics).all().get();
             admin.createPartitions(newPartitions).all().get();
         }
@@ -152,7 +157,7 @@ public class CordonedLogDirsIntegrationTest {
             admin.createTopics(newTopic(TOPIC1)).all().get();
 
             // Cordon all log dirs
-            setCordonedLogDirs(admin, logDirsBroker0);
+            setCordonedLogDirs(admin, logDirsBroker0, BROKER_0);
             assertCordonedLogDirs(admin, logDirsBroker0);
 
             // We can't create new topics or partitions
@@ -168,7 +173,7 @@ public class CordonedLogDirsIntegrationTest {
             assertInstanceOf(InvalidReplicationFactorException.class, 
ee.getCause());
 
             // Uncordon all log dirs
-            setCordonedLogDirs(admin, List.of(logDirsBroker0.get(0)));
+            setCordonedLogDirs(admin, List.of(logDirsBroker0.get(0)), 
BROKER_0);
             assertCordonedLogDirs(admin, List.of(logDirsBroker0.get(0)));
 
             // We can create topics and partitions again
@@ -192,7 +197,7 @@ public class CordonedLogDirsIntegrationTest {
             assertInstanceOf(InvalidReplicationFactorException.class, 
ee.getCause());
 
             // Uncordon log dirs
-            setCordonedLogDirs(admin, List.of());
+            setCordonedLogDirs(admin, List.of(), BROKER_0);
 
             // We can't create topics again
             admin.createTopics(newTopics).all().get();
@@ -214,7 +219,7 @@ public class CordonedLogDirsIntegrationTest {
             }, 10_000, "Unable to find logdir for topic " + replica.topic());
             assertNotNull(logDir.get());
             String otherLogDir = logDirsBroker0.stream().filter(dir -> 
!dir.equals(logDir.get())).findFirst().get();
-            setCordonedLogDirs(admin, List.of(otherLogDir));
+            setCordonedLogDirs(admin, List.of(otherLogDir), BROKER_0);
 
             // We can't move the replica to the now cordoned log dir
             Throwable ee = assertThrows(ExecutionException.class, () ->
@@ -223,30 +228,119 @@ public class CordonedLogDirsIntegrationTest {
             assertInstanceOf(InvalidReplicaAssignmentException.class, 
ee.getCause());
 
             // After uncordoning the log dir, we can move the replica on it
-            setCordonedLogDirs(admin, List.of());
+            setCordonedLogDirs(admin, List.of(), BROKER_0);
             admin.alterReplicaLogDirs(Map.of(replica, 
otherLogDir)).all().get();
         }
     }
 
-    private Map<ConfigResource, Collection<AlterConfigOp>> 
cordonedDirsConfig(String value) {
+    @ClusterTest(
+            brokers = 2,
+            controllers = 1
+    )
+    public void testDecommissionBroker() throws ExecutionException, 
InterruptedException {
+        // Make sure we don't try to decommission the controller
+        int brokerId = clusterInstance.brokerIds().stream().filter(id -> 
!clusterInstance.controllerIds().contains(id)).findFirst().get();
+        try (Admin admin = clusterInstance.admin()) {
+            // Create 10 topics
+            for (int i = 0; i < 10; i++) {
+                admin.createTopics(newTopic("topic" + i, (short) 
1)).all().get();
+            }
+
+            // Check the 10 topics have been created and find the partitions 
on brokerId
+            Set<TopicPartition> partitionsToMove = new HashSet<>();
+            TestUtils.waitForCondition(() -> {
+                int found = 0;
+                Map<Integer, Map<String, LogDirDescription>> 
logDescriptionsPerBroker = 
admin.describeLogDirs(clusterInstance.brokerIds()).allDescriptions().get();
+                for (Map.Entry<Integer, Map<String, LogDirDescription>> entry 
: logDescriptionsPerBroker.entrySet()) {
+                    for (LogDirDescription logDirDescription : 
entry.getValue().values()) {
+                        
assertFalse(logDirDescription.replicaInfos().isEmpty());
+                        found += logDirDescription.replicaInfos().size();
+                        if (entry.getKey() == brokerId) {
+                            logDirDescription.replicaInfos().forEach((tp, 
replicaInfo) ->
+                                partitionsToMove.add(tp)
+                            );
+                        }
+                    }
+                }
+                return found == 10;
+            }, 10_000, "Unable to find 10 partitions");
+
+            // Cordon brokerId and move all its partitions to the other broker
+            List<String> logDirs = 
clusterInstance.brokers().get(brokerId).config().logDirs();
+            setCordonedLogDirs(admin, logDirs, new 
ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)));
+            int target = clusterInstance.brokerIds().stream().filter(id -> id 
!= brokerId).findFirst().get();
+            movePartitions(admin, partitionsToMove, brokerId, 
Optional.empty(), target);
+
+            // Create another 10 topics
+            for (int i = 10; i < 20; i++) {
+                admin.createTopics(newTopic("topic" + i, (short) 
1)).all().get();
+            }
+            TestUtils.waitForCondition(() -> 
admin.listTopics().names().get().size() == 20, 10_000, "Topics 10-19 were not 
created");
+
+            // Check only the other broker has replicas
+            Map<Integer, Map<String, LogDirDescription>> 
logDescriptionsPerBroker = 
admin.describeLogDirs(clusterInstance.brokerIds()).allDescriptions().get();
+            for (Map.Entry<Integer, Map<String, LogDirDescription>> entry : 
logDescriptionsPerBroker.entrySet()) {
+                entry.getValue().forEach((logDir, logDirDescription) -> {
+                    if (entry.getKey() == brokerId) {
+                        assertTrue(logDirDescription.replicaInfos().isEmpty());
+                    } else {
+                        
assertFalse(logDirDescription.replicaInfos().isEmpty());
+                    }
+                });
+            }
+
+            // Decommission brokerId
+            clusterInstance.brokers().get(brokerId).shutdown();
+            clusterInstance.brokers().get(brokerId).awaitShutdown();
+            admin.unregisterBroker(brokerId).all().get();
+            TestUtils.waitForCondition(() -> 
admin.describeCluster().nodes().get().size() == 1, 10_000, "Unable to 
unregister " + brokerId);
+        }
+    }
+
+    private void movePartitions(Admin admin, Set<TopicPartition> partitions, 
int source, Optional<String> logDir, int target) throws ExecutionException, 
InterruptedException {
+        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments 
= new HashMap<>();
+        for (TopicPartition partition : partitions) {
+            reassignments.put(partition, Optional.of(new 
NewPartitionReassignment(List.of(target))));
+        }
+        admin.alterPartitionReassignments(reassignments).all().get();
+        TestUtils.waitForCondition(() ->
+                
admin.listPartitionReassignments().reassignments().get().isEmpty(), 10_000, 
"Unable to complete partition reassignments");
+
+        TestUtils.waitForCondition(() -> {
+            int replicas = 0;
+            Map<Integer, Map<String, LogDirDescription>> 
logDescriptionsPerBroker = 
admin.describeLogDirs(List.of(source)).allDescriptions().get();
+            for (Map.Entry<String, LogDirDescription> entry : 
logDescriptionsPerBroker.get(source).entrySet()) {
+                if (logDir.isEmpty() || entry.getKey().equals(logDir.get())) {
+                    replicas += entry.getValue().replicaInfos().size();
+                }
+            }
+            return replicas == 0;
+        }, 10_000, "Some replicas were not moved from " + source + " to " + 
target);
+    }
+
+    private Map<ConfigResource, Collection<AlterConfigOp>> 
cordonedDirsConfig(String value, ConfigResource cr) {
         return Map.of(
-                BROKER_0,
+                cr,
                 Set.of(new AlterConfigOp(new 
ConfigEntry(CORDONED_LOG_DIRS_CONFIG, value), AlterConfigOp.OpType.SET))
         );
     }
 
-    private void setCordonedLogDirs(Admin admin, List<String> logDirs) throws 
ExecutionException, InterruptedException {
+    private void setCordonedLogDirs(Admin admin, List<String> logDirs, 
ConfigResource cr) throws ExecutionException, InterruptedException {
         String logDirsStr = String.join(",", logDirs);
-        
admin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr)).all().get();
+        admin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr, 
cr)).all().get();
         TestUtils.waitForCondition(() -> {
-            Map<ConfigResource, Config> describeConfigs = 
admin.describeConfigs(Set.of(BROKER_0)).all().get();
-            Config config = describeConfigs.get(BROKER_0);
+            Map<ConfigResource, Config> describeConfigs = 
admin.describeConfigs(Set.of(cr)).all().get();
+            Config config = describeConfigs.get(cr);
             return 
logDirsStr.equals(config.get(CORDONED_LOG_DIRS_CONFIG).value());
-        }, 10_000, "Unable to set the " + CORDONED_LOG_DIRS_CONFIG + " 
configuration.");
+        }, 10_000, "Unable to set the " + CORDONED_LOG_DIRS_CONFIG + " 
configuration on " + cr + ".");
     }
 
     private Set<NewTopic> newTopic(String name) {
-        return Set.of(new NewTopic(name, 1, (short) 
clusterInstance.brokers().size()));
+        return newTopic(name, (short) clusterInstance.brokers().size());
+    }
+
+    private Set<NewTopic> newTopic(String name, short replicationFactor) {
+        return Set.of(new NewTopic(name, 1, replicationFactor));
     }
 
     private void assertCordonedLogDirs(Admin admin, List<String> 
expectedCordoned) throws ExecutionException, InterruptedException {

Reply via email to