This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a45d36ca5d5 KAFKA-19774: Mechanism to cordon log dirs (KIP-1066) 
(#21273)
a45d36ca5d5 is described below

commit a45d36ca5d5b6f48d4313d50d1f7b6c5e6c905a4
Author: Mickael Maison <[email protected]>
AuthorDate: Thu Feb 19 15:18:28 2026 +0100

    KAFKA-19774: Mechanism to cordon log dirs (KIP-1066) (#21273)
    
    Implements [KIP-1066](https://cwiki.apache.org/confluence/x/Lg_TEg)
    
    Reviewers: Christo Lolov <[email protected]>, PoAn Yang
     <[email protected]>
---
 .../clients/CreateTopicsRequestWithPolicyTest.java |   3 +-
 .../kafka/clients/admin/KafkaAdminClient.java      |   3 +-
 .../kafka/clients/admin/LogDirDescription.java     |  14 +-
 .../common/message/BrokerHeartbeatRequest.json     |   8 +-
 .../common/message/BrokerHeartbeatResponse.json    |   4 +-
 .../common/message/BrokerRegistrationRequest.json  |  11 +-
 .../common/message/BrokerRegistrationResponse.json |   6 +-
 .../common/message/DescribeLogDirsRequest.json     |   3 +-
 .../common/message/DescribeLogDirsResponse.json    |   6 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  44 +++-
 .../kafka/clients/admin/MockAdminClient.java       |   3 +-
 core/src/main/scala/kafka/log/LogManager.scala     |  33 ++-
 .../src/main/scala/kafka/server/BrokerServer.scala |  21 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  28 ++-
 core/src/main/scala/kafka/server/KafkaBroker.scala |   2 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |  11 +
 .../main/scala/kafka/server/ReplicaManager.scala   |  10 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  84 +++++++
 .../kafka/server/BrokerLifecycleManagerTest.scala  |  73 +++++-
 .../kafka/server/DynamicBrokerConfigTest.scala     |  72 +++++-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  43 +++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  28 ++-
 .../kafka/controller/BrokerHeartbeatManager.java   |  13 +-
 .../kafka/controller/ClusterControlManager.java    |  36 ++-
 .../controller/ConfigurationControlManager.java    |  18 ++
 .../controller/ReplicationControlManager.java      |  33 +++
 .../java/org/apache/kafka/image/ClusterDelta.java  |   6 +-
 .../apache/kafka/metadata/BrokerRegistration.java  |  54 ++++-
 .../metadata/placement/StripedReplicaPlacer.java   |   4 +-
 .../metadata/BrokerRegistrationChangeRecord.json   |   7 +-
 .../common/metadata/RegisterBrokerRecord.json      |   7 +-
 .../controller/BrokerHeartbeatManagerTest.java     |   5 +-
 .../ConfigurationControlManagerTest.java           |  25 ++
 .../kafka/controller/QuorumControllerTest.java     |   2 +-
 .../controller/ReplicationControlManagerTest.java  |  25 +-
 .../image/node/ClusterImageBrokersNodeTest.java    |   3 +-
 .../kafka/metadata/BrokerRegistrationTest.java     |  44 +++-
 .../placement/StripedReplicaPlacerTest.java        |   8 +-
 .../kafka/server/common/DirectoryEventHandler.java |  16 ++
 .../kafka/server/common/MetadataVersion.java       |  18 +-
 .../kafka/server/config/ServerLogConfigs.java      |   7 +
 .../kafka/server/common/MetadataVersionTest.java   |  11 +-
 .../kafka/server/BrokerLifecycleManager.java       |  87 ++++++-
 .../kafka/server/config/AbstractKafkaConfig.java   |   8 +
 .../kafka/server/config/DynamicBrokerConfig.java   |   7 +-
 .../server/CordonedLogDirsIntegrationTest.java     | 264 +++++++++++++++++++++
 .../kafka/storage/internals/log/LogConfig.java     |   1 +
 .../apache/kafka/common/test/api/ClusterTest.java  |   2 +-
 .../org/apache/kafka/tools/FeatureCommandTest.java |  12 +-
 49 files changed, 1120 insertions(+), 113 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestWithPolicyTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestWithPolicyTest.java
index bfbc4794da5..da1d7f60535 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestWithPolicyTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestWithPolicyTest.java
@@ -216,7 +216,8 @@ public class CreateTopicsRequestWithPolicyTest {
                 admin,
                 true,
                 InvalidReplicationFactorException.class,
-                "Unable to replicate the partition 4 time(s): The target 
replication factor of 4 cannot be reached because only 3 broker(s) are 
registered."
+                "Unable to replicate the partition 4 time(s): The target 
replication factor of 4 cannot be "
+                    + "reached because only 3 broker(s) are registered or some 
brokers have all their log directories cordoned."
             );
 
             validateErrorCreateTopicsRequests(
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index a80c860de5e..85f1e87459f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3054,7 +3054,8 @@ public class KafkaAdminClient extends AdminClient {
                 Errors.forCode(logDirResult.errorCode()).exception(),
                 replicaInfoMap,
                 logDirResult.totalBytes(),
-                logDirResult.usableBytes()));
+                logDirResult.usableBytes(),
+                logDirResult.isCordoned()));
         }
         return result;
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
index 340e88db160..be1bb6e8f5b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
@@ -33,16 +33,18 @@ public class LogDirDescription {
     private final ApiException error;
     private final OptionalLong totalBytes;
     private final OptionalLong usableBytes;
+    private final boolean isCordoned;
 
     public LogDirDescription(ApiException error, Map<TopicPartition, 
ReplicaInfo> replicaInfos) {
-        this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES);
+        this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES, 
false);
     }
 
-    public LogDirDescription(ApiException error, Map<TopicPartition, 
ReplicaInfo> replicaInfos, long totalBytes, long usableBytes) {
+    public LogDirDescription(ApiException error, Map<TopicPartition, 
ReplicaInfo> replicaInfos, long totalBytes, long usableBytes, boolean 
isCordoned) {
         this.error = error;
         this.replicaInfos = replicaInfos;
         this.totalBytes = (totalBytes == UNKNOWN_VOLUME_BYTES) ? 
OptionalLong.empty() : OptionalLong.of(totalBytes);
         this.usableBytes = (usableBytes == UNKNOWN_VOLUME_BYTES) ? 
OptionalLong.empty() : OptionalLong.of(usableBytes);
+        this.isCordoned = isCordoned;
     }
 
     /**
@@ -82,6 +84,13 @@ public class LogDirDescription {
         return usableBytes;
     }
 
+    /**
+     * Whether this log directory is cordoned or not.
+     */
+    public boolean isCordoned() {
+        return isCordoned;
+    }
+
     @Override
     public String toString() {
         return "LogDirDescription(" +
@@ -89,6 +98,7 @@ public class LogDirDescription {
                 ", error=" + error +
                 ", totalBytes=" + totalBytes +
                 ", usableBytes=" + usableBytes +
+                ", isCordoned=" + isCordoned +
                 ')';
     }
 }
diff --git 
a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json 
b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
index 8f574b41fc4..41d5fa0a150 100644
--- a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
@@ -13,12 +13,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 1 adds the OfflineLogDirs flexible field
+// Version 2 adds the CordonedLogDirs flexible field
 {
   "apiKey": 63,
   "type": "request",
   "listeners": ["controller"],
   "name": "BrokerHeartbeatRequest",
-  "validVersions": "0-1",
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
@@ -32,6 +34,8 @@
     { "name": "WantShutDown", "type": "bool", "versions": "0+",
       "about": "True if the broker wants to be shut down, false otherwise." },
     { "name": "OfflineLogDirs", "type":  "[]uuid", "versions": "1+", 
"taggedVersions": "1+", "tag": 0,
-      "about": "Log directories that failed and went offline." }
+      "about": "Log directories that failed and went offline." },
+    { "name": "CordonedLogDirs", "type":  "[]uuid", "versions": "2+", 
"taggedVersions": "2+",
+      "tag": "1", "about": "Log directories that are cordoned." }
   ]
 }
diff --git 
a/clients/src/main/resources/common/message/BrokerHeartbeatResponse.json 
b/clients/src/main/resources/common/message/BrokerHeartbeatResponse.json
index 9956daf1ff0..157fbfac68c 100644
--- a/clients/src/main/resources/common/message/BrokerHeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/BrokerHeartbeatResponse.json
@@ -13,11 +13,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Version 1 is the same as version 0 (new field in request).
+// Version 2 is the same as version 0 (new field in request).
 {
   "apiKey": 63,
   "type": "response",
   "name": "BrokerHeartbeatResponse",
-  "validVersions": "0-1",
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json 
b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 8a9348596ed..53e37f21d5a 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -14,19 +14,16 @@
 // limitations under the License.
 
 // Version 1 adds Zk broker epoch to the request if the broker is migrating 
from Zk mode to KRaft mode.
-
 // Version 2 adds LogDirs for KIP-858
-
 // Version 3 adds the PreviousBrokerEpoch for the KIP-966
-
 // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in 
the response from being 0.
-
+// Version 5 adds the CordonedLogDirs flexible field
 {
   "apiKey":62,
   "type": "request",
   "listeners": ["controller"],
   "name": "BrokerRegistrationRequest",
-  "validVersions": "0-4",
+  "validVersions": "0-5",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
@@ -64,6 +61,8 @@
     { "name": "LogDirs", "type":  "[]uuid", "versions":  "2+",
       "about": "Log directories configured in this broker which are 
available.", "ignorable": true },
     { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "3+", 
"default": "-1", "ignorable": true,
-      "about": "The epoch before a clean shutdown." }
+      "about": "The epoch before a clean shutdown." },
+    { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "5+", 
"taggedVersions": "5+",
+      "tag": "0", "about": "Log directories that are cordoned." }
   ]
 }
diff --git 
a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json 
b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
index dbc15cc4003..956e945df40 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
@@ -14,13 +14,15 @@
 // limitations under the License.
 
 // Version 1 adds Zk broker epoch to the request if the broker is migrating 
from Zk mode to KRaft mode.
-//
 // Version 2 adds the PreviousBrokerEpoch to the request for the KIP-966
+// Version 3 is the same as version 2 (new field in request).
+// Version 4 is the same as version 2 (new field in request).
+// Version 5 is the same as version 2 (new field in request).
 {
   "apiKey": 62,
   "type": "response",
   "name": "BrokerRegistrationResponse",
-  "validVersions": "0-4",
+  "validVersions": "0-5",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json 
b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
index 4f3bfa2c58c..c21a9cf906e 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
@@ -23,7 +23,8 @@
   // Version 2 is the first flexible version.
   // Version 3 is the same as version 2 (new field in response).
   // Version 4 is the same as version 2 (new fields in response).
-  "validVersions": "1-4",
+  // Version 5 is the same as version 2 (new fields in response).
+  "validVersions": "1-5",
   "flexibleVersions": "2+",
   "fields": [
     { "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", 
"nullableVersions": "0+",
diff --git 
a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json 
b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
index 725d1ad337b..71d7fbde9ae 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
@@ -22,7 +22,8 @@
   // Version 2 is the first flexible version.
   // Version 3 adds the top-level ErrorCode field
   // Version 4 adds the TotalBytes and UsableBytes fields
-  "validVersions": "1-4",
+  // Version 5 adds IsCordoned field
+  "validVersions": "1-5",
   "flexibleVersions": "2+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -55,6 +56,9 @@
       },
       { "name": "UsableBytes", "type": "int64", "versions": "4+", "ignorable": 
true, "default": "-1",
         "about": "The usable size in bytes of the volume the log directory is 
in. This value does not include the size of data stored in remote storage."
+      },
+      { "name": "IsCordoned", "type": "bool", "versions": "5+", "ignorable": 
true, "default": false,
+        "about": "True if this log directory is cordoned."
       }
     ]}
   ]
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 1b274c95add..89d1890ab68 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2252,7 +2252,12 @@ public class KafkaAdminClientTest {
 
     private static DescribeLogDirsResponse 
prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, 
long partitionSize, long offsetLag, long totalBytes, long usableBytes) {
         return prepareDescribeLogDirsResponse(error, logDir,
-                prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false), totalBytes, usableBytes);
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false), totalBytes, usableBytes, false);
+    }
+
+    private static DescribeLogDirsResponse 
prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, 
long partitionSize, long offsetLag, long totalBytes, long usableBytes, boolean 
isCordoned) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false), totalBytes, usableBytes, isCordoned);
     }
 
     private static List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
@@ -2278,7 +2283,8 @@ public class KafkaAdminClientTest {
 
     private static DescribeLogDirsResponse 
prepareDescribeLogDirsResponse(Errors error, String logDir,
                                                                           
List<DescribeLogDirsTopic> topics,
-                                                                          long 
totalBytes, long usableBytes) {
+                                                                          long 
totalBytes, long usableBytes,
+                                                                          
boolean isCordoned) {
         return new DescribeLogDirsResponse(
                 new DescribeLogDirsResponseData().setResults(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsResult()
                         .setErrorCode(error.code())
@@ -2286,6 +2292,7 @@ public class KafkaAdminClientTest {
                         .setTopics(topics)
                         .setTotalBytes(totalBytes)
                         .setUsableBytes(usableBytes)
+                        .setIsCordoned(isCordoned)
                 )));
     }
 
@@ -2355,6 +2362,7 @@ public class KafkaAdminClientTest {
         assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
         assertEquals(totalBytes, descriptionsMap.get(logDir).totalBytes());
         assertEquals(usableBytes, descriptionsMap.get(logDir).usableBytes());
+        assertFalse(descriptionsMap.get(logDir).isCordoned());
     }
 
     @Test
@@ -2434,6 +2442,38 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDescribeLogDirsWithCordonedDir() throws 
ExecutionException, InterruptedException {
+        Set<Integer> brokers = singleton(0);
+        String logDir = "/var/data/kafka";
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, 
123, -1, -1, -1, true),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = 
env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> 
descriptions = result.descriptions();
+            assertEquals(brokers, descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = 
descriptions.get(0).get();
+            assertEquals(singleton(logDir), descriptionsMap.keySet());
+            assertTrue(descriptionsMap.get(logDir).isCordoned());
+            assertEquals(Set.of(tp), 
descriptionsMap.get(logDir).replicaInfos().keySet());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = 
result.allDescriptions().get();
+            assertEquals(brokers, allDescriptions.keySet());
+            Map<String, LogDirDescription> allMap = allDescriptions.get(0);
+            assertNotNull(allMap);
+            assertEquals(singleton(logDir), allMap.keySet());
+            assertTrue(allMap.get(logDir).isCordoned());
+            assertEquals(Set.of(tp), 
allMap.get(logDir).replicaInfos().keySet());
+        }
+    }
+
     @Test
     public void testDescribeReplicaLogDirs() throws ExecutionException, 
InterruptedException {
         TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 12, 1);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 48874f1a1b2..bb460855df0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1087,7 +1087,8 @@ public class MockAdminClient extends AdminClient {
                         logDirDescription.error(),
                         topicPartitionReplicaInfoMap,
                         
logDirDescription.totalBytes().orElse(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES),
-                        
logDirDescription.usableBytes().orElse(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES)));
+                        
logDirDescription.usableBytes().orElse(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES),
+                        logDirDescription.isCordoned()));
                 }
             }
         }
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index efdab2a0f31..85c56672aaa 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -100,6 +100,7 @@ class LogManager(logDirs: Seq[File],
   private val strayLogs = new ConcurrentHashMap[TopicPartition, UnifiedLog]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = 
createAndValidateLogDirs(logDirs, initialOfflineDirs)
+  @volatile private var _cordonedLogDirs: Set[String] = Set()
   @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = 
recoveryThreadsPerDataDir
 
@@ -128,6 +129,14 @@ class LogManager(logDirs: Seq[File],
   private val directoryIds: mutable.Map[String, Uuid] = 
loadDirectoryIds(liveLogDirs)
   def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
 
+  def updateCordonedLogDirs(newCordonedLogDirs: Set[String]): Unit = {
+    _cordonedLogDirs = newCordonedLogDirs
+  }
+
+  def cordonedLogDirs(): Set[String] = {
+    _cordonedLogDirs
+  }
+
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, 
JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
@@ -154,11 +163,15 @@ class LogManager(logDirs: Seq[File],
   private[kafka] def cleaner: LogCleaner = _cleaner
 
   metricsGroup.newGauge("OfflineLogDirectoryCount", () => offlineLogDirs.size)
+  metricsGroup.newGauge("CordonedLogDirectoryCount", () => 
cordonedLogDirs().size)
 
   for (dir <- logDirs) {
     metricsGroup.newGauge("LogDirectoryOffline",
       () => if (_liveLogDirs.contains(dir)) 0 else 1,
       Map("logDirectory" -> dir.getAbsolutePath).asJava)
+    metricsGroup.newGauge("LogDirectoryCordoned",
+      () => if (cordonedLogDirs().contains(dir.getAbsolutePath)) 1 else 0,
+      Map("logDirectory" -> dir.getAbsolutePath).asJava)
   }
 
   /**
@@ -1365,20 +1378,28 @@ class LogManager(logDirs: Seq[File],
    * Provides the full ordered list of suggested directories for the next 
partition.
    * Currently this is done by calculating the number of partitions in each 
directory and then sorting the
    * data directories by fewest partitions.
+   *
+   * It's possible replicas are assigned to this broker right before all its 
log directories are cordoned.
+   * In that case, pick the first available directory
    */
-  private def nextLogDirs(): List[File] = {
+  def nextLogDirs(): List[File] = {
     if (_liveLogDirs.size == 1) {
       List(_liveLogDirs.peek())
     } else {
       // count the number of logs in each parent directory (including 0 for 
empty directories
       val logCounts = allLogs.groupBy(_.parentDir).map { case (parent, logs) 
=> parent -> logs.size }
       val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
-      val dirCounts = (zeros ++ logCounts).toBuffer
+      val dirCounts = (zeros ++ logCounts).filter(d => 
!cordonedLogDirs().contains(d._1)).toBuffer
 
-      // choose the directory with the least logs in it
-      dirCounts.sortBy(_._2).map {
-        case (path: String, _: Int) => new File(path)
-      }.toList
+      if (dirCounts.isEmpty) {
+        // all log directories are cordoned, choose the first live directory
+        List(_liveLogDirs.peek())
+      } else {
+        // choose the directory with the least logs in it
+        dirCounts.sortBy(_._2).map {
+          case (path: String, _: Int) => new File(path)
+        }.toList
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 977eca37a80..698f8857881 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -65,6 +65,7 @@ import java.util
 import java.util.Optional
 import java.util.concurrent.locks.{Condition, ReentrantLock}
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, 
TimeoutException}
+import java.util.stream.Collectors
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.RichOption
@@ -218,11 +219,13 @@ class BrokerServer(
         brokerTopicStats,
         logDirFailureChannel)
 
-      lifecycleManager = new BrokerLifecycleManager(config,
+      lifecycleManager = new BrokerLifecycleManager(
+        config,
         time,
         s"broker-${config.nodeId}-",
         logManager.directoryIdsSet.asJava,
-        () => new Thread(() => shutdown(), "kafka-shutdown-thread").start())
+        () => new Thread(() => shutdown(), "kafka-shutdown-thread").start(),
+        () => metadataCache.metadataVersion().isCordonedLogDirsSupported)
 
       // Enable delegation token cache for all SCRAM mechanisms to simplify 
dynamic update.
       // This keeps the cache up-to-date if new SCRAM mechanisms are enabled 
dynamically.
@@ -332,7 +335,13 @@ class BrokerServer(
 
         override def handleFailure(directoryId: Uuid): Unit =
           lifecycleManager.propagateDirectoryFailure(directoryId, 
config.logDirFailureTimeoutMs)
-      }
+
+        override def handleCordoned(directoryIds: util.Set[Uuid]): Unit =
+          lifecycleManager.propagateDirectoryCordoned(directoryIds)
+
+        override def handleUncordoned(directoryIds: util.Set[Uuid]): Unit =
+          lifecycleManager.propagateDirectoryUncordoned(directoryIds)
+}
 
       /**
        * TODO: move this action queue to handle thread so we can simplify 
concurrency handling
@@ -411,13 +420,17 @@ class BrokerServer(
         s"broker-${config.nodeId}-",
         config.brokerHeartbeatIntervalMs
       )
+      val initialCordonedLogDirs: util.Set[Uuid] = 
config.cordonedLogDirs().stream()
+        .map(dir => logManager.directoryId(dir).get)
+        .collect(Collectors.toSet())
       lifecycleManager.start(
         () => sharedServer.loader.lastAppliedOffset(),
         brokerLifecycleChannelManager,
         clusterId,
         listenerInfo.toBrokerRegistrationRequest,
         featuresRemapped,
-        logManager.readBrokerEpochFromCleanShutdownFiles()
+        logManager.readBrokerEpochFromCleanShutdownFiles(),
+        initialCordonedLogDirs
       )
 
       // The FetchSessionCache is divided into config.numIoThreads shards, 
each responsible
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index a467f18ace7..80c5a1235f0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -39,7 +39,7 @@ import org.apache.kafka.config
 import org.apache.kafka.network.SocketServer
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler}
 import org.apache.kafka.server.config.{DynamicConfig, 
DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, 
DynamicBrokerConfig => JDynamicBrokerConfig}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, 
MetricConfigs}
@@ -195,7 +195,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     addReconfigurable(new 
DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config))
 
     addBrokerReconfigurable(new BrokerDynamicThreadPool(kafkaServer))
-    addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
+    addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, 
kafkaServer.replicaManager.directoryEventHandler))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
     addBrokerReconfigurable(kafkaServer.socketServer)
     addBrokerReconfigurable(new 
DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
@@ -536,7 +536,7 @@ trait BrokerReconfigurable {
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
 }
 
-class DynamicLogConfig(logManager: LogManager) extends BrokerReconfigurable 
with Logging {
+class DynamicLogConfig(logManager: LogManager, directoryEventHandler: 
DirectoryEventHandler) extends BrokerReconfigurable with Logging {
 
   override def reconfigurableConfigs: util.Set[String] = {
     JDynamicBrokerConfig.DynamicLogConfig.RECONFIGURABLE_CONFIGS
@@ -577,8 +577,20 @@ class DynamicLogConfig(logManager: LogManager) extends 
BrokerReconfigurable with
       }
     }
 
+    def validateCordonedLogDirs(): Unit = {
+      val logDirs = newConfig.logDirs()
+      val cordonedLogDirs = newConfig.cordonedLogDirs()
+      cordonedLogDirs.asScala.foreach(dir =>
+        if (!logDirs.contains(dir)) {
+          throw new ConfigException(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, 
cordonedLogDirs, s"Invalid entry in 
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG}: $dir. " +
+            s"All cordoned log dirs must be entries of 
${ServerLogConfigs.LOG_DIRS_CONFIG} or ${ServerLogConfigs.LOG_DIR_CONFIG}.")
+        }
+      )
+    }
+
     validateLogLocalRetentionMs()
     validateLogLocalRetentionBytes()
+    validateCordonedLogDirs()
   }
 
   private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit = 
{
@@ -598,6 +610,16 @@ class DynamicLogConfig(logManager: LogManager) extends 
BrokerReconfigurable with
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
     val newBrokerDefaults = new util.HashMap[String, 
Object](newConfig.extractLogConfigMap)
 
+    logManager.updateCordonedLogDirs(newConfig.cordonedLogDirs.asScala.toSet)
+    val newCordoned: Set[String] = newConfig.cordonedLogDirs.asScala.toSet -- 
oldConfig.cordonedLogDirs.asScala.toSet
+    val newUncordoned: Set[String] = oldConfig.cordonedLogDirs.asScala.toSet 
-- newConfig.cordonedLogDirs.asScala.toSet
+    if (newCordoned.nonEmpty) {
+      directoryEventHandler.handleCordoned(newCordoned.map(dir => 
logManager.directoryId(dir).get).toSet.asJava)
+    }
+    if (newUncordoned.nonEmpty) {
+      directoryEventHandler.handleUncordoned(newUncordoned.map(dir => 
logManager.directoryId(dir).get).toSet.asJava)
+    }
+
     logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
 
     updateLogsConfig(newBrokerDefaults.asScala)
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala 
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 46576d97d33..4ac473ad4bc 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.metadata.{BrokerState, MetadataCache}
 import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.BrokerLifecycleManager
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.NodeToControllerChannelManager
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
@@ -105,6 +106,7 @@ trait KafkaBroker extends Logging {
   def credentialProvider: CredentialProvider
   def clientToControllerChannelManager: NodeToControllerChannelManager
   def tokenCache: DelegationTokenCache
+  def lifecycleManager: BrokerLifecycleManager
 
   // For backwards compatibility, we need to keep older metrics tied
   // to their original name when this class was named `KafkaServer`
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d05ef90e6fb..c7beff0504b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -484,6 +484,16 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     advertisedListeners.filterNot(l => 
controllerListenerNames.contains(l.listener))
   }
 
+  def validateCordonedLogDirs(): Unit = {
+    val cordonedLogDirs = getList(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG)
+    if (cordonedLogDirs.contains(ServerLogConfigs.CORDONED_LOG_DIRS_ALL)) {
+      require(cordonedLogDirs.size == 1, s"When 
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} is set to 
${ServerLogConfigs.CORDONED_LOG_DIRS_ALL}, it must not contain other values")
+    } else {
+      val unknownLogDirs = 
cordonedLogDirs.asScala.filter(!logDirs().contains(_))
+      require(unknownLogDirs.isEmpty, s"All entries in 
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} must be present in 
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} or 
${ServerLogConfigs.LOG_DIR_CONFIG}. Missing entries : 
${unknownLogDirs.mkString(", ")}")
+    }
+  }
+
   validateValues()
 
   private def validateValues(): Unit = {
@@ -611,6 +621,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
           s"Found ${advertisedBrokerListenerNames.map(_.value).mkString(",")}. 
The valid options based on the current configuration " +
           s"are ${listenerNames.map(_.value).mkString(",")}"
       )
+      validateCordonedLogDirs()
     }
 
     require(!effectiveAdvertisedBrokerListeners.exists(endpoint => 
endpoint.host=="0.0.0.0"),
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d2471b2ecd2..d6ed8f66efe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1117,6 +1117,8 @@ class ReplicaManager(val config: KafkaConfig,
             throw new InvalidTopicException("The topic name is too long.")
           if (!logManager.isLogDirOnline(destinationDir))
             throw new KafkaStorageException(s"Log directory $destinationDir is 
offline")
+          if (logManager.cordonedLogDirs().contains(destinationDir))
+            throw new InvalidReplicaAssignmentException(s"Log directory 
$destinationDir is cordoned")
 
           getPartition(topicPartition) match {
             case online: HostedPartition.Online[Partition] =>
@@ -1171,7 +1173,8 @@ class ReplicaManager(val config: KafkaConfig,
           case e@(_: InvalidTopicException |
                   _: LogDirNotFoundException |
                   _: ReplicaNotAvailableException |
-                  _: KafkaStorageException) =>
+                  _: KafkaStorageException |
+                  _: InvalidReplicaAssignmentException) =>
             warn(s"Unable to alter log dirs for $topicPartition", e)
             (topicPartition, Errors.forException(e))
           case e: NotLeaderOrFollowerException =>
@@ -1225,12 +1228,17 @@ class ReplicaManager(val config: KafkaConfig,
             Collections.emptyList[DescribeLogDirsTopic]()
         }
 
+        val isCordoned = if 
(metadataCache.metadataVersion().isCordonedLogDirsSupported)
+          logManager.cordonedLogDirs().contains(absolutePath)
+        else
+          false
         val describeLogDirsResult = new 
DescribeLogDirsResponseData.DescribeLogDirsResult()
           .setLogDir(absolutePath)
           .setTopics(topicInfos)
           .setErrorCode(Errors.NONE.code)
           .setTotalBytes(totalBytes)
           .setUsableBytes(usableBytes)
+          .setIsCordoned(isCordoned)
         describeLogDirsResult
 
       } catch {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 7736f6736e9..e4b26070728 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -65,6 +65,7 @@ class LogManagerTest {
   logProps.put(TopicConfig.RETENTION_MS_CONFIG, maxLogAgeMs: java.lang.Integer)
   val logConfig = new LogConfig(logProps)
   var logDir: File = _
+  var logDir2: File = _
   var logManager: LogManager = _
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
@@ -73,6 +74,7 @@ class LogManagerTest {
   @BeforeEach
   def setUp(): Unit = {
     logDir = TestUtils.tempDir()
+    logDir2 = TestUtils.tempDir()
     logManager = createLogManager()
     logManager.startup(Set.empty)
     assertEquals(initialTaskDelayMs, logManager.initialTaskDelayMs)
@@ -83,6 +85,7 @@ class LogManagerTest {
     if (logManager != null)
       logManager.shutdown()
     Utils.delete(logDir)
+    Utils.delete(logDir2)
     // Some tests assign a new LogManager
     if (logManager != null)
       logManager.liveLogDirs.foreach(Utils.delete)
@@ -1053,6 +1056,58 @@ class LogManagerTest {
     verifyMetrics()
   }
 
+  @Test
+  def testLogManagerMetrics(): Unit = {
+    KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.forEach((metricName: 
MetricName) =>
+      KafkaYammerMetrics.defaultRegistry.removeMetric(metricName))
+    logManager.shutdown()
+    logManager = createLogManager(Seq(logDir, logDir2))
+    def allMetrics(): Set[MetricName] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
+      .filter(metric => metric.getType.contains("LogManager"))
+      .toSet
+    def metric(filter: String): Gauge[Int] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter(entry => entry._1.getName.contains(filter))
+      .values.head.asInstanceOf[Gauge[Int]]
+    def logDirMetric(filter: String, logDir: File): Gauge[Int] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter(entry => entry._1.getName.contains(filter))
+      .filter(entry => entry._1.getScope.contains(logDir.getAbsolutePath))
+      .values.head.asInstanceOf[Gauge[Int]]
+    // expecting 6 metrics:
+    // - OfflineLogDirectoryCount
+    // - CordonedLogDirectoryCount
+    // - LogDirectoryOffline per log dir
+    // - LogDirectoryCordoned per log dir
+    assertEquals(6, allMetrics().size)
+    assertEquals(0, metric("OfflineLogDirectoryCount").value)
+    assertEquals(0, metric("CordonedLogDirectoryCount").value)
+    assertEquals(0, logDirMetric("LogDirectoryOffline", logDir).value)
+    assertEquals(0, logDirMetric("LogDirectoryOffline", logDir2).value)
+    assertEquals(0, logDirMetric("LogDirectoryCordoned", logDir).value)
+    assertEquals(0, logDirMetric("LogDirectoryCordoned", logDir2).value)
+
+    logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath))
+    assertEquals(1, metric("CordonedLogDirectoryCount").value)
+    assertEquals(1, logDirMetric("LogDirectoryCordoned", logDir).value)
+    assertEquals(0, logDirMetric("LogDirectoryCordoned", logDir2).value)
+    logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath, 
logDir2.getAbsolutePath))
+    assertEquals(2, metric("CordonedLogDirectoryCount").value)
+    assertEquals(1, logDirMetric("LogDirectoryCordoned", logDir).value)
+    assertEquals(1, logDirMetric("LogDirectoryCordoned", logDir2).value)
+
+    logManager.handleLogDirFailure(logDir.getAbsolutePath)
+    assertEquals(1, metric("OfflineLogDirectoryCount").value)
+    assertEquals(1, logDirMetric("LogDirectoryOffline", logDir).value)
+    assertEquals(0, logDirMetric("LogDirectoryOffline", logDir2).value)
+
+    try {
+      logManager.shutdown()
+    } catch {
+      case _: Throwable => // ignore
+    } finally {
+      logManager = null
+    }
+  }
+
   @Test
   def testMetricsAreRemovedWhenMovingCurrentToFutureLog(): Unit = {
     val dir1 = TestUtils.tempDir()
@@ -1325,4 +1380,33 @@ class LogManagerTest {
       Files.setPosixFilePermissions(logDir.toPath, permissions)
     }
   }
+
+  @Test
+  def testUpdateCordonedLogDirs(): Unit = {
+    logManager.shutdown()
+    logManager = createLogManager(Seq(this.logDir, this.logDir2))
+    assertTrue(logManager.cordonedLogDirs().isEmpty)
+
+    val cordonedDirs = Set(logDir.getAbsolutePath)
+    logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath))
+    assertEquals(cordonedDirs, logManager.cordonedLogDirs())
+  }
+
+  @Test
+  def testNextLogDirs(): Unit = {
+    logManager.shutdown()
+    logManager = createLogManager(Seq(this.logDir, this.logDir2))
+    val nextLogDirs = logManager.nextLogDirs()
+    assertTrue(nextLogDirs.contains(logDir))
+    assertTrue(nextLogDirs.contains(logDir2))
+
+    logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath))
+    val nextLogDirs2 = logManager.nextLogDirs()
+    assertFalse(nextLogDirs2.contains(logDir))
+    assertTrue(nextLogDirs2.contains(logDir2))
+
+    logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath, 
logDir2.getAbsolutePath))
+    val nextLogDirs3 = logManager.nextLogDirs()
+    assertFalse(nextLogDirs3.isEmpty)
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 86d587eae0f..d64d9f00b9c 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -72,7 +72,7 @@ class BrokerLifecycleManagerTest {
     assertEquals(BrokerState.NOT_RUNNING, manager.state)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty())
+      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
     TestUtils.retry(60000) {
       assertEquals(BrokerState.STARTING, manager.state)
     }
@@ -88,7 +88,7 @@ class BrokerLifecycleManagerTest {
     context.controllerNodeProvider.node.set(controllerNode)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.of(10L))
+      Collections.emptyMap(), OptionalLong.of(10L), util.Set.of())
     TestUtils.retry(60000) {
       assertEquals(1, context.mockChannelManager.unsentQueue.size)
       assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
@@ -117,7 +117,7 @@ class BrokerLifecycleManagerTest {
     assertEquals(1, context.mockClient.futureResponses().size)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty())
+      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
     // We should send the first registration request and get a failure 
immediately
     TestUtils.retry(60000) {
       context.poll()
@@ -154,7 +154,7 @@ class BrokerLifecycleManagerTest {
       new BrokerHeartbeatResponseData().setIsCaughtUp(true)), controllerNode)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty())
+      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
     TestUtils.retry(10000) {
       context.poll()
       manager.eventQueue.wakeup()
@@ -233,7 +233,7 @@ class BrokerLifecycleManagerTest {
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty())
+      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
     poll(ctx, manager, registration)
 
     def nextHeartbeatDirs(): Set[String] =
@@ -260,7 +260,7 @@ class BrokerLifecycleManagerTest {
 
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty())
+      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
     val request = poll(ctx, manager, 
registration).asInstanceOf[BrokerRegistrationRequest]
 
     assertEquals(logDirs, new util.HashSet(request.data.logDirs()))
@@ -276,7 +276,7 @@ class BrokerLifecycleManagerTest {
 
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.of(10L))
+      Collections.emptyMap(), OptionalLong.of(10L), util.Set.of())
 
     def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
     def nextHeartbeatRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
@@ -298,4 +298,63 @@ class BrokerLifecycleManagerTest {
     nextHeartbeatRequest()
     assertEquals(1200L, manager.brokerEpoch)
   }
+
+  @Test
+  def testRegistrationIncludesCordonedDirs(): Unit = {
+    val logDirs = util.Set.of(Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"), 
Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
+    val ctx = new RegistrationTestContext(configProperties)
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"registration-includes-cordoned-dirs-", logDirs)
+    val controllerNode = new Node(3000, "localhost", 8021)
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
+      Collections.emptyMap(), OptionalLong.empty(),
+      logDirs
+    )
+    val request = poll(ctx, manager, 
registration).asInstanceOf[BrokerRegistrationRequest]
+
+    assertEquals(logDirs, new util.HashSet(request.data.cordonedLogDirs()))
+  }
+
+  @Test
+  def testAlwaysSendsAccumulatedCordonedDirs(): Unit = {
+    val ctx = new RegistrationTestContext(configProperties)
+    var enabled = false
+    def cordonedLogDirsEnabled(): Boolean  = {
+      enabled
+    }
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"cordoned-dirs-sent-in-heartbeat-", 
util.Set.of(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")),
+      () => {}, () => cordonedLogDirsEnabled())
+    val controllerNode = new Node(3000, "localhost", 8021)
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
+      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+    poll(ctx, manager, registration)
+
+    def nextHeartbeatDirs(): Set[Uuid] =
+      poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
+        .data().cordonedLogDirs().asScala.toSet
+    assertEquals(Set(), nextHeartbeatDirs())
+
+    val dir1 = Uuid.randomUuid()
+    val dir2 = Uuid.randomUuid()
+    manager.propagateDirectoryCordoned(util.Set.of(dir1))
+    assertEquals(Set(), nextHeartbeatDirs())
+
+    enabled = true
+    manager.propagateDirectoryCordoned(util.Set.of(dir1))
+    assertEquals(Set(dir1), nextHeartbeatDirs())
+    manager.propagateDirectoryCordoned(util.Set.of(dir2))
+    assertEquals(Set(dir1, dir2), nextHeartbeatDirs())
+    manager.propagateDirectoryUncordoned(util.Set.of(dir1))
+    assertEquals(Set(dir2), nextHeartbeatDirs())
+    manager.propagateDirectoryUncordoned(util.Set.of(dir2))
+    assertEquals(Set(), nextHeartbeatDirs())
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index ea69a599a15..15b9d7cb479 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicReference
 import kafka.log.LogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.utils.TestUtils
-import org.apache.kafka.common.{Endpoint, Reconfigurable}
+import org.apache.kafka.common.{Endpoint, Reconfigurable, Uuid}
 import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}
 import org.apache.kafka.common.internals.Plugin
@@ -37,6 +37,7 @@ import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
 import org.apache.kafka.network.{SocketServer => JSocketServer, 
SocketServerConfigs}
 import org.apache.kafka.server.DynamicThreadPool
 import org.apache.kafka.server.authorizer._
+import org.apache.kafka.server.common.DirectoryEventHandler
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, 
RemoteLogManagerConfig}
 import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, 
KafkaYammerMetrics, MetricConfigs}
@@ -46,9 +47,9 @@ import org.apache.kafka.storage.internals.log.{CleanerConfig, 
LogConfig, Produce
 import org.apache.kafka.test.MockMetricsReporter
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers.anyString
+import org.mockito.ArgumentMatchers.{anyString, anySet}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
-import org.mockito.Mockito.{mock, verify, verifyNoMoreInteractions, when}
+import org.mockito.Mockito.{mock, never, times, verify, 
verifyNoMoreInteractions, when}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Set
@@ -516,6 +517,8 @@ class DynamicBrokerConfigTest {
     val producerStateManagerConfig: ProducerStateManagerConfig = 
mock(classOf[ProducerStateManagerConfig])
     
when(logManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
     when(kafkaServer.logManager).thenReturn(logManager)
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    when(kafkaServer.replicaManager).thenReturn(replicaManager)
 
     val authorizer = new TestAuthorizer
     val authorizerPlugin: Plugin[Authorizer] = Plugin.wrapInstance(authorizer, 
null, "authorizer.class.name")
@@ -701,7 +704,7 @@ class DynamicBrokerConfigTest {
     val props = TestUtils.createBrokerConfig(0, port = 8181)
     props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000")
     val config = KafkaConfig(props)
-    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]))
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[DirectoryEventHandler]))
     config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
@@ -724,7 +727,7 @@ class DynamicBrokerConfigTest {
     val props = TestUtils.createBrokerConfig(0, port = 8181)
     props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296")
     val config = KafkaConfig(props)
-    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]))
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[DirectoryEventHandler]))
     config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
@@ -1009,7 +1012,7 @@ class DynamicBrokerConfigTest {
     props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 
retentionMs.toString)
     props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, 
retentionBytes.toString)
     val config = KafkaConfig(props)
-    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]))
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[DirectoryEventHandler]))
     config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
@@ -1026,10 +1029,12 @@ class DynamicBrokerConfigTest {
     val config = KafkaConfig(origProps)
     val serverMock = Mockito.mock(classOf[BrokerServer])
     val logManagerMock = Mockito.mock(classOf[LogManager])
+    val directoryEventHandler = Mockito.mock(classOf[DirectoryEventHandler])
 
     Mockito.when(serverMock.config).thenReturn(config)
     Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
     Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
+    
Mockito.when(logManagerMock.directoryId(ArgumentMatchers.anyString())).thenAnswer(_
 => Some(Uuid.randomUuid()))
 
     val currentDefaultLogConfig = new AtomicReference(new LogConfig(new 
Properties))
     Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => 
currentDefaultLogConfig.get())
@@ -1037,7 +1042,7 @@ class DynamicBrokerConfigTest {
       .thenAnswer(invocation => 
currentDefaultLogConfig.set(invocation.getArgument(0)))
 
     config.dynamicConfig.initialize(None)
-    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicLogConfig(logManagerMock))
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicLogConfig(logManagerMock, directoryEventHandler))
   }
 
   @Test
@@ -1053,6 +1058,59 @@ class DynamicBrokerConfigTest {
     assertEquals(TimeUnit.MINUTES.toMillis(1), 
ctx.currentDefaultLogConfig.get().retentionMs)
   }
 
+  @Test
+  def testDynamicLogConfigCordonedLogDirs(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, logDirCount = 2)
+    val ctx = new DynamicLogConfigContext(origProps)
+    assertTrue(ctx.config.cordonedLogDirs.isEmpty)
+    val logDirs = ctx.config.logDirs()
+    verify(ctx.directoryEventHandler, never()).handleCordoned(anySet)
+    verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
+
+    // Cordoning 1 new log dir, so 1 new handleCordoned invocation
+    val props = new Properties()
+    props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, logDirs.get(0))
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
+    verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
+    verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
+
+    // When using *, no other entries must be specified, so no new invocations
+    props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*,/invalid/log/dir")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
+    verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
+    verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
+
+    // Invalid log dir, so no new invocations
+    props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "/invalid/log/dir")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
+    verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
+    verify(ctx.directoryEventHandler, times(0)).handleUncordoned(anySet)
+
+    // * cordons the 2nd log dir, so 1 new handleCordoned invocation
+    props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(logDirs, ctx.config.cordonedLogDirs)
+    verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
+    verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
+
+    // clearing all cordoned log dirs, so 1 new handleUncordoned invocation
+    props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertTrue(ctx.config.cordonedLogDirs.isEmpty)
+    verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
+    verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
+
+    // * cordons all log dirs, so 1 new handleCordoned invocation
+    props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, String.join(",", 
logDirs))
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(logDirs, ctx.config.cordonedLogDirs)
+    verify(ctx.directoryEventHandler, times(3)).handleCordoned(anySet)
+    verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
+  }
+
   @Test
   def testLogRetentionTimeMinutesIsNotDynamicallyReconfigurable(): Unit = {
     val origProps = TestUtils.createBrokerConfig(0, port = 8181)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index c44a4934b35..cf27943ca03 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1549,7 +1549,6 @@ class KafkaConfigTest {
     props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, dataDir)
     props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
-    KafkaConfig.fromProps(props)
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(metadataDir, config.metadataLogDir)
@@ -1567,13 +1566,53 @@ class KafkaConfigTest {
     props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, s"$dataDir1,$dataDir2")
     props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
-    KafkaConfig.fromProps(props)
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(dataDir1, config.metadataLogDir)
     assertEquals(util.List.of(dataDir1, dataDir2), config.logDirs)
   }
 
+  @Test
+  def testCordonAllLogDirs(): Unit = {
+    val dataDir1 = "/path/to/data/dir/1"
+    val dataDir2 = "/path/to/data/dir/2"
+
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
+    props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, s"$dataDir1,$dataDir2")
+    props.setProperty(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+
+    val config = KafkaConfig.fromProps(props)
+    assertEquals(config.logDirs(), config.cordonedLogDirs())
+  }
+
+  @Test
+  def testInvalidCordonedLogDirs(): Unit = {
+    val dataDir1 = "/path/to/data/dir/1"
+    val dataDir2 = "/path/to/data/dir/2"
+
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
+    props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, s"$dataDir1,$dataDir2")
+    props.setProperty(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "/other/path")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+    val e = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
+    assertTrue(e.getMessage.contains("/other/path"))
+
+    props.setProperty(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, 
"/other/path,/another")
+    val e2 = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
+    assertTrue(e2.getMessage.contains("/other/path"))
+    assertTrue(e2.getMessage.contains("/another"))
+
+    props.setProperty(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, 
s"$dataDir1,*")
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
+  }
+
   @Test
   def testNodeIdMustNotBeDifferentThanBrokerId(): Unit = {
     val props = new Properties()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 580af684cd6..515cb65ee02 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -89,7 +89,7 @@ import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
-import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
 import java.io.{ByteArrayInputStream, File}
 import java.net.InetAddress
@@ -4021,6 +4021,30 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testAlterReplicaLogDirsToCordonedDir(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1))
+    val logMgr = 
Mockito.spy(TestUtils.createLogManager(config.logDirs.asScala.map(new File(_))))
+    when(logMgr.cordonedLogDirs()).thenReturn(config.logDirs.asScala.toSet)
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = logMgr,
+      quotaManagers = quotaManager,
+      metadataCache = new KRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val errors = replicaManager.alterReplicaLogDirs(Map(tp -> 
config.logDirs.get(0)))
+      assertEquals(Errors.INVALID_REPLICA_ASSIGNMENT, errors(tp))
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testPartitionMetadataFile(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
@@ -5631,6 +5655,7 @@ class ReplicaManagerTest {
         assertTrue(response.totalBytes > 0)
         assertTrue(response.usableBytes >= 0)
         assertFalse(response.topics().isEmpty)
+        assertFalse(response.isCordoned)
         response.topics().forEach(t => assertFalse(t.partitions().isEmpty))
       }
     } finally {
@@ -5663,6 +5688,7 @@ class ReplicaManagerTest {
         assertTrue(response.totalBytes > 0)
         assertTrue(response.usableBytes >= 0)
         assertTrue(response.topics().isEmpty)
+        assertFalse(response.isCordoned)
       }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index 2762d36f487..58cc5f65528 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -328,21 +328,24 @@ public class BrokerHeartbeatManager {
     }
 
     Iterator<UsableBroker> usableBrokers(
-        Function<Integer, Optional<String>> idToRack
+        Function<Integer, Optional<String>> idToRack,
+        Function<Integer, Boolean> hasUncordonedDirs
     ) {
-        return new UsableBrokerIterator(brokers.values().iterator(),
-            idToRack);
+        return new UsableBrokerIterator(brokers.values().iterator(), idToRack, 
hasUncordonedDirs);
     }
 
     static class UsableBrokerIterator implements Iterator<UsableBroker> {
         private final Iterator<BrokerHeartbeatState> iterator;
         private final Function<Integer, Optional<String>> idToRack;
+        private final Function<Integer, Boolean> hasUncordonedDirs;
         private UsableBroker next;
 
         UsableBrokerIterator(Iterator<BrokerHeartbeatState> iterator,
-                             Function<Integer, Optional<String>> idToRack) {
+                             Function<Integer, Optional<String>> idToRack,
+                             Function<Integer, Boolean> hasUncordonedDirs) {
             this.iterator = iterator;
             this.idToRack = idToRack;
+            this.hasUncordonedDirs = hasUncordonedDirs;
             this.next = null;
         }
 
@@ -357,7 +360,7 @@ public class BrokerHeartbeatManager {
                     return false;
                 }
                 result = iterator.next();
-            } while (result.shuttingDown());
+            } while (result.shuttingDown() || 
!hasUncordonedDirs.apply(result.id()));
             Optional<String> rack = idToRack.apply(result.id());
             next = new UsableBroker(result.id(), rack, result.fenced());
             return true;
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 4a355c206b4..4fc14209e34 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -432,7 +432,9 @@ public class ClusterControlManager {
         if 
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
             record.setLogDirs(request.logDirs());
         }
-
+        if 
(featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported()) {
+            record.setCordonedLogDirs(request.cordonedLogDirs());
+        }
         if (!request.incarnationId().equals(prevIncarnationId)) {
             int prevNumRecords = records.size();
             boolean isCleanShutdown = cleanShutdownDetectionEnabled ?
@@ -553,6 +555,22 @@ public class ClusterControlManager {
         return OptionalLong.empty();
     }
 
+    public void updateCordonedLogDirs(int brokerId, List<Uuid> 
cordonedLogDirs) {
+        brokerRegistrations.compute(brokerId,
+                (k, brokerRegistration) -> new BrokerRegistration.Builder().
+                        setId(brokerId).
+                        setEpoch(brokerRegistration.epoch()).
+                        setIncarnationId(brokerRegistration.incarnationId()).
+                        setListeners(brokerRegistration.listeners()).
+                        
setSupportedFeatures(brokerRegistration.supportedFeatures()).
+                        setRack(brokerRegistration.rack()).
+                        setFenced(brokerRegistration.fenced()).
+                        
setInControlledShutdown(brokerRegistration.inControlledShutdown()).
+                        setDirectories(brokerRegistration.directories()).
+                        setCordonedDirectories(cordonedLogDirs).
+                        build());
+    }
+
     public void replay(RegisterBrokerRecord record, long offset) {
         registerBrokerRecordOffsets.put(record.brokerId(), offset);
         int brokerId = record.brokerId();
@@ -575,6 +593,7 @@ public class ClusterControlManager {
                 setInControlledShutdown(record.inControlledShutdown()).
                 setIsMigratingZkBroker(record.isMigratingZkBroker()).
                 setDirectories(record.logDirs()).
+                setCordonedDirectories(record.cordonedLogDirs()).
                     build());
         updateDirectories(brokerId, prevRegistration == null ? null : 
prevRegistration.directories(), record.logDirs());
         if (heartbeatManager != null) {
@@ -617,6 +636,7 @@ public class ClusterControlManager {
             record.epoch(),
             BrokerRegistrationFencingChange.FENCE.asBoolean(),
             BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
+            Optional.empty(),
             Optional.empty()
         );
     }
@@ -628,6 +648,7 @@ public class ClusterControlManager {
             record.epoch(),
             BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
             BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
+            Optional.empty(),
             Optional.empty()
         );
     }
@@ -642,13 +663,15 @@ public class ClusterControlManager {
                 () -> new IllegalStateException(String.format("Unable to 
replay %s: unknown " +
                     "value for inControlledShutdown field: %x", record, 
record.inControlledShutdown())));
         Optional<List<Uuid>> directoriesChange = 
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
+        Optional<List<Uuid>> cordonedDirectoriesChange = 
Optional.ofNullable(record.cordonedLogDirs()).filter(list -> !list.isEmpty());
         replayRegistrationChange(
             record,
             record.brokerId(),
             record.brokerEpoch(),
             fencingChange.asBoolean(),
             inControlledShutdownChange.asBoolean(),
-            directoriesChange
+            directoriesChange,
+            cordonedDirectoriesChange
         );
     }
 
@@ -658,7 +681,8 @@ public class ClusterControlManager {
         long brokerEpoch,
         Optional<Boolean> fencingChange,
         Optional<Boolean> inControlledShutdownChange,
-        Optional<List<Uuid>> directoriesChange
+        Optional<List<Uuid>> directoriesChange,
+        Optional<List<Uuid>> cordonedDirectoriesChange
     ) {
         BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
         if (curRegistration == null) {
@@ -671,7 +695,8 @@ public class ClusterControlManager {
             BrokerRegistration nextRegistration = curRegistration.cloneWith(
                 fencingChange,
                 inControlledShutdownChange,
-                directoriesChange
+                directoriesChange,
+                cordonedDirectoriesChange
             );
             if (!curRegistration.equals(nextRegistration)) {
                 log.info("Replayed {} modifying the registration for broker 
{}: {}",
@@ -705,7 +730,8 @@ public class ClusterControlManager {
             throw new RuntimeException("ClusterControlManager is not active.");
         }
         return heartbeatManager.usableBrokers(
-            id -> brokerRegistrations.get(id).rack());
+            id -> brokerRegistrations.get(id).rack(),
+            id -> brokerRegistrations.get(id).hasUncordonedDirs());
     }
 
     /**
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 4a0219d1759..31665883e3c 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.mutable.BoundedList;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
@@ -60,6 +61,7 @@ import static 
org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
 import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
 import static 
org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
+import static 
org.apache.kafka.server.config.ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG;
 
 
 public class ConfigurationControlManager {
@@ -336,6 +338,8 @@ public class ConfigurationControlManager {
                 return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
             } else if (isDisallowedClusterMinIsrTransition(configRecord)) {
                 return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
+            } else if (isCordonedLogDirsDisallowed(configRecord)) {
+                return DISALLOWED_CORDONED_LOG_DIRS_ERROR;
             } else if (configRecord.value() == null) {
                 allConfigs.remove(configRecord.name());
             } else if (configRecord.value().length() > Short.MAX_VALUE) {
@@ -353,6 +357,8 @@ public class ConfigurationControlManager {
                 return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
             } else if (isDisallowedClusterMinIsrTransition(configRecord)) {
                 return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
+            } else if (isCordonedLogDirsDisallowed(configRecord)) {
+                return DISALLOWED_CORDONED_LOG_DIRS_ERROR;
             } else {
                 allConfigs.remove(configRecord.name());
             }
@@ -390,6 +396,10 @@ public class ConfigurationControlManager {
         new ApiError(INVALID_CONFIG, "The configuration value cannot be added 
because " +
             "it exceeds the maximum value size of " + Short.MAX_VALUE + " 
bytes.");
 
+    static final ApiError DISALLOWED_CORDONED_LOG_DIRS_ERROR =
+            new ApiError(INVALID_CONFIG, "The " + CORDONED_LOG_DIRS_CONFIG + " 
configuration value cannot be " +
+                    "set because it requires metadata.version >= " + 
MetadataVersion.IBP_4_3_IV0);
+
     boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) {
         if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
                 configRecord.resourceType() == BROKER.id() &&
@@ -401,6 +411,14 @@ public class ConfigurationControlManager {
         return false;
     }
 
+    boolean isCordonedLogDirsDisallowed(ConfigRecord configRecord) {
+        if (configRecord.name().equals(CORDONED_LOG_DIRS_CONFIG) &&
+                configRecord.resourceType() == BROKER.id()) {
+            return 
!featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported();
+        }
+        return false;
+    }
+
     boolean isDisallowedClusterMinIsrTransition(ConfigRecord configRecord) {
         if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
                 configRecord.resourceType() == BROKER.id() &&
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 316213bbc04..7e991169e9a 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1531,6 +1531,35 @@ public class ReplicationControlManager {
         }
     }
 
+    /**
+     * Generates the appropriate record to handle a list of directories that 
are cordoned.
+     *
+     * @param brokerId     The broker id.
+     * @param brokerEpoch  The broker epoch.
+     * @param cordonedDirs The list of directories that are cordoned.
+     * @param records      The record list to append to.
+     */
+    void handleDirectoriesCordoned(
+            int brokerId,
+            long brokerEpoch,
+            List<Uuid> cordonedDirs,
+            List<ApiMessageAndVersion> records
+    ) {
+        BrokerRegistration registration = 
clusterControl.registration(brokerId);
+        List<Uuid> newCordonedDirs = 
registration.directoryIntersection(cordonedDirs);
+        if (!newCordonedDirs.isEmpty()) {
+            records.add(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().
+                    setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
+                    setCordonedLogDirs(newCordonedDirs),
+                    (short) 3));
+            if (log.isDebugEnabled()) {
+                List<Uuid> newUncordonedDirs = 
registration.directoryDifference(newCordonedDirs);
+                log.debug("Directories {} in broker {} marked cordoned, 
uncordoned directories: {}",
+                        newCordonedDirs, brokerId, newUncordonedDirs);
+            }
+        }
+    }
+
     ControllerResult<ElectLeadersResponseData> 
electLeaders(ElectLeadersRequestData request) {
         ElectionType electionType = electionType(request.electionType());
         List<ApiMessageAndVersion> records = 
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
@@ -1667,6 +1696,10 @@ public class ReplicationControlManager {
         if 
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
             handleDirectoriesOffline(brokerId, brokerEpoch, 
request.offlineLogDirs(), records);
         }
+        if 
(featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported()) {
+            clusterControl.updateCordonedLogDirs(brokerId, 
request.cordonedLogDirs());
+            handleDirectoriesCordoned(brokerId, brokerEpoch, 
request.cordonedLogDirs(), records);
+        }
         boolean isCaughtUp = request.currentMetadataOffset() >= 
registerBrokerRecordOffset;
         BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
                 states.next().fenced(),
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index 8f0556959b9..bfdf9810a0b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -115,6 +115,7 @@ public final class ClusterDelta {
         changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
             BrokerRegistrationFencingChange.FENCE.asBoolean(),
             Optional.empty(),
+            Optional.empty(),
             Optional.empty()
         )));
     }
@@ -124,6 +125,7 @@ public final class ClusterDelta {
         changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
             BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
             Optional.empty(),
+            Optional.empty(),
             Optional.empty()
         )));
     }
@@ -140,10 +142,12 @@ public final class ClusterDelta {
                 () -> new IllegalStateException(String.format("Unable to 
replay %s: unknown " +
                     "value for inControlledShutdown field: %d", record, 
record.inControlledShutdown())));
         Optional<List<Uuid>> directoriesChange = 
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
+        Optional<List<Uuid>> cordonedDirectoriesChange = 
Optional.ofNullable(record.cordonedLogDirs()).filter(list -> !list.isEmpty());
         BrokerRegistration nextRegistration = curRegistration.cloneWith(
             fencingChange.asBoolean(),
             inControlledShutdownChange.asBoolean(),
-            directoriesChange
+            directoriesChange,
+            cordonedDirectoriesChange
         );
         if (!curRegistration.equals(nextRegistration)) {
             changedBrokers.put(record.brokerId(), 
Optional.of(nextRegistration));
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 56b2bdf375e..4dcb5ba1b36 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
  * An immutable class which represents broker registrations.
  */
 public class BrokerRegistration {
+
     public static class Builder {
         private int id;
         private long epoch;
@@ -53,6 +54,7 @@ public class BrokerRegistration {
         private boolean inControlledShutdown;
         private boolean isMigratingZkBroker;
         private List<Uuid> directories;
+        private List<Uuid> cordonedDirectories;
 
         public Builder() {
             this.id = 0;
@@ -65,6 +67,7 @@ public class BrokerRegistration {
             this.inControlledShutdown = false;
             this.isMigratingZkBroker = false;
             this.directories = List.of();
+            this.cordonedDirectories = List.of();
         }
 
         public Builder setId(int id) {
@@ -127,6 +130,11 @@ public class BrokerRegistration {
             return this;
         }
 
+        public Builder setCordonedDirectories(List<Uuid> cordonedDirectories) {
+            this.cordonedDirectories = cordonedDirectories;
+            return this;
+        }
+
         public BrokerRegistration build() {
             return new BrokerRegistration(
                 id,
@@ -138,7 +146,8 @@ public class BrokerRegistration {
                 fenced,
                 inControlledShutdown,
                 isMigratingZkBroker,
-                directories);
+                directories,
+                cordonedDirectories);
         }
     }
 
@@ -152,6 +161,7 @@ public class BrokerRegistration {
     private final boolean inControlledShutdown;
     private final boolean isMigratingZkBroker;
     private final List<Uuid> directories;
+    private final List<Uuid> cordonedDirectories;
 
     private BrokerRegistration(
         int id,
@@ -163,7 +173,8 @@ public class BrokerRegistration {
         boolean fenced,
         boolean inControlledShutdown,
         boolean isMigratingZkBroker,
-        List<Uuid> directories
+        List<Uuid> directories,
+        List<Uuid> cordonedDirectories
     ) {
         this.id = id;
         this.epoch = epoch;
@@ -185,6 +196,7 @@ public class BrokerRegistration {
         directories = new ArrayList<>(directories);
         directories.sort(Uuid::compareTo);
         this.directories = Collections.unmodifiableList(directories);
+        this.cordonedDirectories = 
Collections.unmodifiableList(cordonedDirectories);
     }
 
     public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -209,7 +221,8 @@ public class BrokerRegistration {
             record.fenced(),
             record.inControlledShutdown(),
             record.isMigratingZkBroker(),
-            record.logDirs());
+            record.logDirs(),
+            record.cordonedLogDirs());
     }
 
     public int id() {
@@ -260,10 +273,21 @@ public class BrokerRegistration {
         return directories;
     }
 
+    public List<Uuid> cordonedDirectories() {
+        return cordonedDirectories;
+    }
+
     public boolean hasOnlineDir(Uuid dir) {
         return DirectoryId.isOnline(dir, directories);
     }
 
+    public boolean hasUncordonedDirs() {
+        if (directories.isEmpty()) return true;
+        List<Uuid> dirs = new ArrayList<>(directories);
+        dirs.removeAll(cordonedDirectories);
+        return !dirs.isEmpty();
+    }
+
     public List<Uuid> directoryIntersection(List<Uuid> otherDirectories) {
         List<Uuid> results = new ArrayList<>();
         for (Uuid directory : directories) {
@@ -307,6 +331,12 @@ public class BrokerRegistration {
             options.handleLoss("the online log directories of one or more 
brokers");
         }
 
+        if (cordonedDirectories.isEmpty() || 
options.metadataVersion().isCordonedLogDirsSupported()) {
+            registrationRecord.setCordonedLogDirs(cordonedDirectories);
+        } else {
+            options.handleLoss("the cordoned log directories of one or more 
brokers");
+        }
+
         for (Entry<String, Endpoint> entry : listeners.entrySet()) {
             Endpoint endpoint = entry.getValue();
             registrationRecord.endPoints().add(new BrokerEndpoint().
@@ -330,7 +360,7 @@ public class BrokerRegistration {
     @Override
     public int hashCode() {
         return Objects.hash(id, epoch, incarnationId, listeners, 
supportedFeatures,
-            rack, fenced, inControlledShutdown, isMigratingZkBroker, 
directories);
+            rack, fenced, inControlledShutdown, isMigratingZkBroker, 
directories, cordonedDirectories);
     }
 
     @Override
@@ -345,7 +375,8 @@ public class BrokerRegistration {
             other.fenced == fenced &&
             other.inControlledShutdown == inControlledShutdown &&
             other.isMigratingZkBroker == isMigratingZkBroker &&
-            other.directories.equals(directories);
+            other.directories.equals(directories) &&
+            other.cordonedDirectories.equals(cordonedDirectories);
     }
 
     @Override
@@ -367,19 +398,25 @@ public class BrokerRegistration {
                 ", inControlledShutdown=" + inControlledShutdown +
                 ", isMigratingZkBroker=" + isMigratingZkBroker +
                 ", directories=" + directories +
+                ", cordonedDirectories=" + cordonedDirectories +
                 ")";
     }
 
     public BrokerRegistration cloneWith(
         Optional<Boolean> fencingChange,
         Optional<Boolean> inControlledShutdownChange,
-        Optional<List<Uuid>> directoriesChange
+        Optional<List<Uuid>> directoriesChange,
+        Optional<List<Uuid>> cordonedDirectoriesChange
     ) {
         boolean newFenced = fencingChange.orElse(fenced);
         boolean newInControlledShutdownChange = 
inControlledShutdownChange.orElse(inControlledShutdown);
         List<Uuid> newDirectories = directoriesChange.orElse(directories);
+        List<Uuid> newCordonedDirectories = 
cordonedDirectoriesChange.orElse(cordonedDirectories);
 
-        if (newFenced == fenced && newInControlledShutdownChange == 
inControlledShutdown && newDirectories.equals(directories))
+        if (newFenced == fenced
+                && newInControlledShutdownChange == inControlledShutdown
+                && newDirectories.equals(directories)
+                && newCordonedDirectories.equals(cordonedDirectories))
             return this;
 
         return new BrokerRegistration(
@@ -392,7 +429,8 @@ public class BrokerRegistration {
             newFenced,
             newInControlledShutdownChange,
             isMigratingZkBroker,
-            newDirectories
+            newDirectories,
+            newCordonedDirectories
         );
     }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
index 0c597bc7f89..099d4997d9b 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
@@ -404,7 +404,7 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
 
     private static void throwInvalidReplicationFactorIfZero(int numUnfenced) {
         if (numUnfenced == 0) {
-            throw new InvalidReplicationFactorException("All brokers are 
currently fenced.");
+            throw new InvalidReplicationFactorException("All brokers are 
currently fenced, or have all their log directories cordoned.");
         }
     }
 
@@ -412,7 +412,7 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
         if (replicationFactor > numTotalBrokers) {
             throw new InvalidReplicationFactorException("The target 
replication factor " +
                     "of " + replicationFactor + " cannot be reached because 
only " +
-                    numTotalBrokers + " broker(s) are registered.");
+                    numTotalBrokers + " broker(s) are registered or some 
brokers have all their log directories cordoned.");
         }
     }
 
diff --git 
a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
 
b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
index 7a484a6aeb4..5329e53b2c7 100644
--- 
a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
+++ 
b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
@@ -15,11 +15,12 @@
 
 // Version 1 adds InControlledShutdown
 // Version 2 adds LogDirs
+// Version 3 adds CordonedLogDirs
 {
   "apiKey": 17,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
-  "validVersions": "0-2",
+  "validVersions": "0-3",
   "flexibleVersions": "0+",
   "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
@@ -31,6 +32,8 @@
    { "name": "InControlledShutdown", "type": "int8", "versions": "1+", 
"taggedVersions": "1+", "tag": 1,
      "about": "0 if no change, 1 if the broker is in controlled shutdown." },
    { "name": "LogDirs", "type":  "[]uuid", "versions":  "2+", 
"taggedVersions": "2+", "tag": 2,
-     "about": "Log directories configured in this broker which are available." 
}
+     "about": "Log directories configured in this broker which are available." 
},
+   { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "3+", 
"taggedVersions": "3+", "tag": "3",
+     "about": "Log directories that are cordoned." }
   ]
 }
diff --git 
a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json 
b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index 48e5c466c69..b7db680cbd3 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -16,11 +16,12 @@
 // Version 1 adds InControlledShutdown
 // Version 2 adds IsMigratingZkBroker
 // Version 3 adds LogDirs
+// Version 4 adds CordonedLogDirs
 {
   "apiKey": 0,
   "type": "metadata",
   "name": "RegisterBrokerRecord",
-  "validVersions": "0-3",
+  "validVersions": "0-4",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
@@ -58,6 +59,8 @@
     { "name": "InControlledShutdown", "type": "bool", "versions": "1+", 
"default": "false",
       "about": "True if the broker is in controlled shutdown." },
     { "name": "LogDirs", "type":  "[]uuid", "versions":  "3+", 
"taggedVersions": "3+", "tag": 0,
-      "about": "Log directories configured in this broker which are 
available." }
+      "about": "Log directories configured in this broker which are 
available." },
+    { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "4+", 
"taggedVersions": "4+", "tag": "1",
+      "about": "Log directories that are cordoned." }
   ]
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
index 21bca1150fc..3dfeb0ef940 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
@@ -110,7 +110,8 @@ public class BrokerHeartbeatManagerTest {
         Set<UsableBroker> brokers = new HashSet<>();
         for (Iterator<UsableBroker> iterator = new UsableBrokerIterator(
             manager.brokers().iterator(),
-            id -> id % 2 == 0 ? Optional.of("rack1") : Optional.of("rack2"));
+            id -> id % 2 == 0 ? Optional.of("rack1") : Optional.of("rack2"),
+            id -> id % 3 != 0);
              iterator.hasNext(); ) {
             brokers.add(iterator.next());
         }
@@ -131,10 +132,8 @@ public class BrokerHeartbeatManagerTest {
         manager.touch(4, true, 100);
         assertEquals(98L, manager.lowestActiveOffset());
         Set<UsableBroker> expected = new HashSet<>();
-        expected.add(new UsableBroker(0, Optional.of("rack1"), false));
         expected.add(new UsableBroker(1, Optional.of("rack2"), false));
         expected.add(new UsableBroker(2, Optional.of("rack1"), false));
-        expected.add(new UsableBroker(3, Optional.of("rack2"), false));
         expected.add(new UsableBroker(4, Optional.of("rack1"), true));
         assertEquals(expected, usableBrokersToSet(manager));
         manager.maybeUpdateControlledShutdownOffset(2, 0);
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 2c93d1100ec..1a1d5b74c41 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ConfigSynonym;
+import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
 
@@ -61,6 +62,7 @@ import static 
org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
+import static 
org.apache.kafka.controller.ConfigurationControlManager.DISALLOWED_CORDONED_LOG_DIRS_ERROR;
 import static 
org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -554,6 +556,29 @@ public class ConfigurationControlManagerTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCordonedLogDirsFeature(boolean enabled) {
+        FeatureControlManager featureManager = new 
FeatureControlManager.Builder().
+                setQuorumFeatures(new QuorumFeatures(0,
+                        QuorumFeatures.defaultSupportedFeatureMap(true),
+                        List.of())).
+                build();
+        featureManager.replay(new FeatureLevelRecord().
+                setName(MetadataVersion.FEATURE_NAME).
+                setFeatureLevel(enabled ? 
MetadataVersion.LATEST_PRODUCTION.featureLevel() : 
MetadataVersion.IBP_4_2_IV1.featureLevel()));
+        ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+                setFeatureControl(featureManager).
+                setKafkaConfigSchema(SCHEMA).
+                build();
+
+        ControllerResult<ApiError> result = manager.incrementalAlterConfig(new 
ConfigResource(ConfigResource.Type.BROKER, "1"),
+                toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, 
entry(SET, "*"))),
+                true);
+
+        assertEquals(enabled ? ApiError.NONE : 
DISALLOWED_CORDONED_LOG_DIRS_ERROR, result.response());
+    }
+
     private FeatureControlManager createFeatureControlManager() {
         FeatureControlManager featureControlManager = new 
FeatureControlManager.Builder().build();
         featureControlManager.replay(new FeatureLevelRecord().
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 721ef85de93..54e68da302a 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1011,7 +1011,7 @@ public class QuorumControllerTest {
                 createTopicsRequestData, Set.of("foo")).get().
                     topics().find("foo").errorCode());
             assertEquals("Unable to replicate the partition 1 time(s): All 
brokers " +
-                "are currently fenced.", active.createTopics(ANONYMOUS_CONTEXT,
+                "are currently fenced, or have all their log directories 
cordoned.", active.createTopics(ANONYMOUS_CONTEXT,
                     createTopicsRequestData, Set.of("foo")).
                         get().topics().find("foo").errorMessage());
             assertEquals(new BrokerHeartbeatReply(true, false, false, false),
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 7024d0de3a2..9b97d8eeb54 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -636,7 +636,7 @@ public class ReplicationControlManagerTest {
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
             setErrorCode(INVALID_REPLICATION_FACTOR.code()).
                 setErrorMessage("Unable to replicate the partition 3 time(s): 
All " +
-                    "brokers are currently fenced."));
+                    "brokers are currently fenced, or have all their log 
directories cordoned."));
         assertEquals(expectedResponse, result.response());
 
         ctx.registerBrokers(0, 1, 2);
@@ -894,7 +894,7 @@ public class ReplicationControlManagerTest {
             setErrorCode(INVALID_REPLICATION_FACTOR.code()).
             setErrorMessage("Unable to replicate the partition 4 time(s): The 
target " +
                 "replication factor of 4 cannot be reached because only 3 
broker(s) " +
-                "are registered."));
+                "are registered or some brokers have all their log directories 
cordoned."));
         assertEquals(expectedResponse, result.response());
     }
 
@@ -3411,6 +3411,27 @@ public class ReplicationControlManagerTest {
         return records.stream().filter(r -> 
clazz.equals(r.message().getClass())).collect(Collectors.toList());
     }
 
+    @Test
+    void testHandleDirectoriesCordoned() {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
+        int b1 = 101;
+        Uuid dir1b1 = Uuid.fromString("suitdzfTTdqoWcy8VqmkUg");
+        Uuid dir2b1 = Uuid.fromString("yh3acnzGSeurSTj8aIhOjw");
+        ctx.registerBrokersWithDirs(b1, List.of(dir1b1, dir2b1));
+        ctx.unfenceBrokers(b1);
+        assertEquals(List.of(), 
ctx.clusterControl.registration(b1).cordonedDirectories());
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        ctx.replicationControl.handleDirectoriesCordoned(b1, 
defaultBrokerEpoch(b1), List.of(dir1b1), records);
+        assertEquals(
+                List.of(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord()
+                        .setBrokerId(b1).setBrokerEpoch(defaultBrokerEpoch(b1))
+                        .setCordonedLogDirs(List.of(dir1b1)), (short) 3)),
+                filter(records, BrokerRegistrationChangeRecord.class)
+        );
+        ctx.replay(records);
+        assertEquals(List.of(dir1b1), 
ctx.clusterControl.registration(b1).cordonedDirectories());
+    }
+
     @ParameterizedTest
     @CsvSource({"false, false", "false, true", "true, false", "true, true"})
     void testElrsRemovedOnMinIsrUpdate(boolean clusterLevel, boolean 
useLegacyAlterConfigs) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
 
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
index 6d7b8898810..ab342b26087 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
@@ -69,7 +69,8 @@ public class ClusterImageBrokersNodeTest {
             "fenced=false, " +
             "inControlledShutdown=false, " +
             "isMigratingZkBroker=false, " +
-            "directories=[JsnDDNVyTL289kYk6sPzig, anCdBWcFTlu8gE1wP6bh3g])",
+            "directories=[JsnDDNVyTL289kYk6sPzig, anCdBWcFTlu8gE1wP6bh3g], " +
+            "cordonedDirectories=[])",
             child.stringify());
     }
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 49ac33490b0..577338203ec 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -86,6 +86,7 @@ public class BrokerRegistrationTest {
             setInControlledShutdown(true).
             setIsMigratingZkBroker(true).
             setDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
+            
setCordonedDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
             build());
 
     @Test
@@ -117,20 +118,22 @@ public class BrokerRegistrationTest {
             "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
-            "rack=Optional.empty, fenced=true, inControlledShutdown=false, 
isMigratingZkBroker=false, directories=[])",
+            "rack=Optional.empty, fenced=true, inControlledShutdown=false, 
isMigratingZkBroker=false, " +
+            "directories=[], cordonedDirectories=[])",
             REGISTRATIONS.get(1).toString());
         assertEquals("BrokerRegistration(id=2, epoch=0, " +
             "incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo: 
2-3}, " +
-            "rack=Optional[myrack], fenced=false, inControlledShutdown=true, 
isMigratingZkBroker=false, directories=[])",
+            "rack=Optional[myrack], fenced=false, inControlledShutdown=true, 
isMigratingZkBroker=false, " +
+            "directories=[], cordonedDirectories=[])",
             REGISTRATIONS.get(2).toString());
         assertEquals("BrokerRegistration(id=3, epoch=0, " +
             "incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9093)], 
supportedFeatures={metadata.version: 7}, " +
             "rack=Optional.empty, fenced=false, inControlledShutdown=true, 
isMigratingZkBroker=true, " +
-            "directories=[r4HpEsMuST6nQ4rznIEJVA])",
+            "directories=[r4HpEsMuST6nQ4rznIEJVA], 
cordonedDirectories=[r4HpEsMuST6nQ4rznIEJVA])",
             REGISTRATIONS.get(3).toString());
     }
 
@@ -222,4 +225,39 @@ public class BrokerRegistrationTest {
         
assertFalse(registration.hasOnlineDir(Uuid.fromString("sOwN7HH7S1maxpU1WzlzXg")));
         assertFalse(registration.hasOnlineDir(DirectoryId.LOST));
     }
+
+    @Test
+    void testHasUncordonedDirs() {
+        BrokerRegistration registration = new BrokerRegistration.Builder().
+                setId(0).
+                setEpoch(0).
+                setIncarnationId(Uuid.fromString("m6CiJvfITZeKVC6UuhlZew")).
+                setListeners(List.of(new Endpoint("INTERNAL", 
SecurityProtocol.PLAINTEXT, "localhost", 9090))).
+                setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1, 
(short) 2))).
+                setRack(Optional.empty()).
+                setFenced(false).
+                setInControlledShutdown(false).
+                setDirectories(List.of(
+                        Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
+                )).
+                build();
+        assertTrue(registration.hasUncordonedDirs());
+        registration = new BrokerRegistration.Builder().
+                setId(0).
+                setEpoch(0).
+                setIncarnationId(Uuid.fromString("m6CiJvfITZeKVC6UuhlZew")).
+                setListeners(List.of(new Endpoint("INTERNAL", 
SecurityProtocol.PLAINTEXT, "localhost", 9090))).
+                setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1, 
(short) 2))).
+                setRack(Optional.empty()).
+                setFenced(false).
+                setInControlledShutdown(false).
+                setDirectories(List.of(
+                        Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
+                )).
+                setCordonedDirectories(List.of(
+                        Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
+                )).
+                build();
+        assertFalse(registration.hasUncordonedDirs());
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
index 55d992d28b6..0db3a42c3bd 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
@@ -190,7 +190,7 @@ public class StripedReplicaPlacerTest {
     public void testAllBrokersFenced() {
         MockRandom random = new MockRandom();
         StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
-        assertEquals("All brokers are currently fenced.",
+        assertEquals("All brokers are currently fenced, or have all their log 
directories cordoned.",
             assertThrows(InvalidReplicationFactorException.class,
                 () -> place(placer, 0, 1, (short) 1, List.of(
                     new UsableBroker(11, Optional.of("1"), true),
@@ -202,7 +202,7 @@ public class StripedReplicaPlacerTest {
         MockRandom random = new MockRandom();
         StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
         assertEquals("The target replication factor of 3 cannot be reached 
because only " +
-            "2 broker(s) are registered.",
+            "2 broker(s) are registered or some brokers have all their log 
directories cordoned.",
             assertThrows(InvalidReplicationFactorException.class,
                 () -> place(placer, 0, 1, (short) 3, List.of(
                     new UsableBroker(11, Optional.of("1"), false),
@@ -274,7 +274,7 @@ public class StripedReplicaPlacerTest {
         assertEquals(3, rackList.numTotalBrokers());
         assertEquals(0, rackList.numUnfencedBrokers());
         assertEquals(List.of(Optional.empty()), rackList.rackNames());
-        assertEquals("All brokers are currently fenced.",
+        assertEquals("All brokers are currently fenced, or have all their log 
directories cordoned.",
                 assertThrows(InvalidReplicationFactorException.class,
                         () -> rackList.place(3)).getMessage());
     }
@@ -286,7 +286,7 @@ public class StripedReplicaPlacerTest {
                 new UsableBroker(11, Optional.of("1"), false),
                 new UsableBroker(10, Optional.of("1"), false)).iterator());
         assertEquals("The target replication factor of 3 cannot be reached 
because only " +
-                        "2 broker(s) are registered.",
+                        "2 broker(s) are registered or some brokers have all 
their log directories cordoned.",
                 assertThrows(InvalidReplicationFactorException.class,
                         () -> rackList.place(3)).getMessage());
     }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
index b7104986738..987d27bd5d2 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
@@ -19,6 +19,8 @@ package org.apache.kafka.server.common;
 
 import org.apache.kafka.common.Uuid;
 
+import java.util.Set;
+
 public interface DirectoryEventHandler {
 
     /**
@@ -27,6 +29,8 @@ public interface DirectoryEventHandler {
     DirectoryEventHandler NOOP = new DirectoryEventHandler() {
         @Override public void handleAssignment(TopicIdPartition partition, 
Uuid directoryId, String reason, Runnable callback) {}
         @Override public void handleFailure(Uuid directoryId) {}
+        @Override public void handleCordoned(Set<Uuid> directoryIds) {}
+        @Override public void handleUncordoned(Set<Uuid> directoryIds) {}
     };
 
     /**
@@ -43,4 +47,16 @@ public interface DirectoryEventHandler {
      * @param directoryId  The directory ID
      */
     void handleFailure(Uuid directoryId);
+
+    /**
+     * Handle the transition of an online log directory to the cordoned state.
+     * @param directoryIds  The directory IDs to cordon
+     */
+    void handleCordoned(Set<Uuid> directoryIds);
+
+    /**
+     * Handle the transition of a cordoned log directory to the online state.
+     * @param directoryIds  The directory IDs to uncordon
+     */
+    void handleUncordoned(Set<Uuid> directoryIds);
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 8bdce25808f..406ccbeebcb 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -120,15 +120,18 @@ public enum MetadataVersion {
     // Enables "streams" groups by default for new clusters (KIP-1071).
     IBP_4_2_IV1(29, "4.2", "IV1", false),
 
+    // Enables support for cordoned log dirs
+    // BrokerRegistrationChangeRecord and RegisterBrokerRecord are updated
+    IBP_4_3_IV0(30, "4.3", "IV0", true),
+
     //
     // NOTE: MetadataVersions after this point are unstable and may be changed.
     // If users attempt to use an unstable MetadataVersion, they will get an 
error unless
     // they have set the configuration unstable.feature.versions.enable=true.
     // Please move this comment when updating the LATEST_PRODUCTION constant.
     //
+    IBP_4_4_IV0(31, "4.4", "IV0", false);
 
-    // New version for the Kafka 4.3.0 release.
-    IBP_4_3_IV0(30, "4.3", "IV0", false);
 
     // NOTES when adding a new version:
     //   Update the default version in @ClusterTest annotation to point to the 
latest version
@@ -148,7 +151,7 @@ public enum MetadataVersion {
      * <strong>Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
      * IT CANNOT BE CHANGED.</strong>
      */
-    public static final MetadataVersion LATEST_PRODUCTION = IBP_4_2_IV1;
+    public static final MetadataVersion LATEST_PRODUCTION = IBP_4_3_IV0;
     // If you change the value above please also update
     // LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py
 
@@ -212,8 +215,15 @@ public enum MetadataVersion {
         return this.isAtLeast(MetadataVersion.IBP_3_4_IV0);
     }
 
+    public boolean isCordonedLogDirsSupported() {
+        return this.isAtLeast(MetadataVersion.IBP_4_3_IV0);
+    }
+
     public short registerBrokerRecordVersion() {
-        if (isDirectoryAssignmentSupported()) {
+        if (isCordonedLogDirsSupported()) {
+            // new cordonedLogDirs field
+            return (short) 4;
+        } else if (isDirectoryAssignmentSupported()) {
             // new logDirs field
             return (short) 3;
         } else if (isMigrationSupported()) {
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index d0e1c60d6d7..dbfe8b13eed 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.record.internal.Records;
 import org.apache.kafka.server.record.BrokerCompressionType;
 
+import java.util.List;
+
 import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
 
 /**
@@ -40,6 +42,11 @@ public class ServerLogConfigs {
     public static final String LOG_DIR_DOC = "A comma-separated list of the 
directories where the log data is stored. (supplemental to " + LOG_DIRS_CONFIG 
+ " property)";
     public static final String LOG_DIRS_DOC = "A comma-separated list of the 
directories where the log data is stored. If not set, the value in " + 
LOG_DIR_CONFIG + " is used.";
 
+    public static final String CORDONED_LOG_DIRS_CONFIG = "cordoned.log.dirs";
+    public static final List<String> CORDONED_LOG_DIRS_DEFAULT = List.of();
+    public static final String CORDONED_LOG_DIRS_DOC = "A comma-separated list 
of the directories that are cordoned. Entries in this list must be entries in 
log.dirs or log.dir configuration. This can also be set to * to cordon all log 
directories.";
+    public static final String CORDONED_LOG_DIRS_ALL = "*";
+
     public static final String LOG_SEGMENT_BYTES_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG);
     public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a 
single log file";
 
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index e0bcfb7f146..b4381b2a56f 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -98,9 +98,10 @@ class MetadataVersionTest {
         assertEquals(IBP_4_3_IV0, MetadataVersion.fromVersionString("4.3-IV0", 
true));
 
         // Throws exception when unstableFeatureVersionsEnabled is false
-        assertEquals("Unknown metadata.version '4.3-IV0'. Supported 
metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
-            + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3, 
3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0, 
4.1-IV1, 4.2-IV0, 4.2-IV1",
-            assertThrows(IllegalArgumentException.class, () -> 
fromVersionString("4.3-IV0", false)).getMessage());
+        assertEquals("Unknown metadata.version '4.4-IV0'. Supported 
metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
+            + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3, 
3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0, "
+            + "4.1-IV1, 4.2-IV0, 4.2-IV1, 4.3-IV0",
+            assertThrows(IllegalArgumentException.class, () -> 
fromVersionString("4.4-IV0", false)).getMessage());
     }
 
     @Test
@@ -240,7 +241,9 @@ class MetadataVersionTest {
     @EnumSource(value = MetadataVersion.class)
     public void testRegisterBrokerRecordVersion(MetadataVersion 
metadataVersion) {
         final short expectedVersion;
-        if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
+        if (metadataVersion.isAtLeast(IBP_4_3_IV0)) {
+            expectedVersion = 4;
+        } else if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
             expectedVersion = 3;
         } else if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_4_IV0)) {
             expectedVersion = 2;
diff --git 
a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java 
b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
index e148fb1c0b1..0ce78168704 100644
--- a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
+++ b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
@@ -78,6 +78,7 @@ public class BrokerLifecycleManager {
     private final Time time;
     private final Set<Uuid> logDirs;
     private final Runnable shutdownHook;
+    private final Supplier<Boolean> cordonedLogDirsSupported;
 
     /**
      * The broker id.
@@ -144,7 +145,13 @@ public class BrokerLifecycleManager {
      * to the Controller.
      * This variable can only be read or written from the event queue thread.
      */
-    private Map<Uuid, Boolean> offlineDirs = new HashMap<>();
+    private final Map<Uuid, Boolean> offlineDirs = new HashMap<>();
+
+    /**
+     * Map of cordoned log directories. The value is true if the directory is 
cordoned.
+     * This variable can only be read or written from the event queue thread.
+     */
+    private final Map<Uuid, Boolean> cordonedLogDirs = new HashMap<>();
 
     /**
      * True if we sent an event queue to the active controller requesting 
controlled
@@ -211,7 +218,7 @@ public class BrokerLifecycleManager {
             Time time,
             String threadNamePrefix,
             Set<Uuid> logDirs) {
-        this(config, time, threadNamePrefix, logDirs, () -> { });
+        this(config, time, threadNamePrefix, logDirs, () -> { }, () -> false);
     }
 
     public BrokerLifecycleManager(
@@ -219,11 +226,13 @@ public class BrokerLifecycleManager {
             Time time,
             String threadNamePrefix,
             Set<Uuid> logDirs,
-            Runnable shutdownHook) {
+            Runnable shutdownHook,
+            Supplier<Boolean> cordonedLogDirsSupported) {
         this.config = config;
         this.time = time;
         this.logDirs = logDirs;
         this.shutdownHook = shutdownHook;
+        this.cordonedLogDirsSupported = cordonedLogDirsSupported;
         LogContext logContext = new LogContext("[BrokerLifecycleManager id=" + 
this.config.nodeId() + "] ");
         this.logger = logContext.logger(BrokerLifecycleManager.class);
         this.nodeId = config.nodeId();
@@ -251,8 +260,14 @@ public class BrokerLifecycleManager {
                String clusterId,
                ListenerCollection advertisedListeners,
                Map<String, VersionRange> supportedFeatures,
-               OptionalLong previousBrokerEpoch) {
+               OptionalLong previousBrokerEpoch,
+               Set<Uuid> cordonedLogDirs) {
         this.previousBrokerEpoch = previousBrokerEpoch;
+        if (!cordonedLogDirs.isEmpty()) {
+            // At this point we don't have fresh metadata yet so we don't know 
if the cordoned log dirs feature is supported.
+            // Queue an event, it will be ignored by the controller handling 
the broker registration if the feature is disabled.
+            eventQueue.append(new CordonedDirEvent(cordonedLogDirs));
+        }
         eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
                 channelManager, clusterId, advertisedListeners, 
supportedFeatures));
     }
@@ -275,6 +290,26 @@ public class BrokerLifecycleManager {
                 new OfflineDirBrokerFailureEvent(directory));
     }
 
+    /**
+     * Propagate directory cordoned to the controller.
+     * @param directories The IDs for the directories that is cordoned.
+     */
+    public void propagateDirectoryCordoned(Set<Uuid> directories) {
+        if (cordonedLogDirsSupported.get()) {
+            eventQueue.append(new CordonedDirEvent(directories));
+        }
+    }
+
+    /**
+     * Propagate directory uncordoned to the controller.
+     * @param directories The IDs for the directories that is uncordoned.
+     */
+    public void propagateDirectoryUncordoned(Set<Uuid> directories) {
+        if (cordonedLogDirsSupported.get()) {
+            eventQueue.append(new UncordonedDirEvent(directories));
+        }
+    }
+
     public void resendBrokerRegistration() {
         eventQueue.append(new ResendBrokerRegistrationEvent());
     }
@@ -397,6 +432,44 @@ public class BrokerLifecycleManager {
         }
     }
 
+    private class CordonedDirEvent implements EventQueue.Event {
+
+        private final Set<Uuid> dirs;
+
+        CordonedDirEvent(Set<Uuid> dirs) {
+            this.dirs = dirs;
+        }
+
+        @Override
+        public void run() {
+            for (Uuid dir : dirs) {
+                cordonedLogDirs.put(dir, true);
+            }
+            if (registered) {
+                scheduleNextCommunicationImmediately();
+            }
+        }
+    }
+
+    private class UncordonedDirEvent implements EventQueue.Event {
+
+        private final Set<Uuid> dirs;
+
+        UncordonedDirEvent(Set<Uuid> dirs) {
+            this.dirs = dirs;
+        }
+
+        @Override
+        public void run() {
+            for (Uuid dir : dirs) {
+                cordonedLogDirs.put(dir, false);
+            }
+            if (registered) {
+                scheduleNextCommunicationImmediately();
+            }
+        }
+    }
+
     private class StartupEvent implements EventQueue.Event {
 
         private final Supplier<Long> highestMetadataOffsetProvider;
@@ -453,7 +526,8 @@ public class BrokerLifecycleManager {
             .setListeners(advertisedListeners)
             .setRack(rack.orElse(null))
             .setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L))
-            .setLogDirs(sortedLogDirs);
+            .setLogDirs(sortedLogDirs)
+            
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList());
         if (logger.isDebugEnabled()) {
             logger.debug("Sending broker registration {}", data);
         }
@@ -531,7 +605,8 @@ public class BrokerLifecycleManager {
             .setCurrentMetadataOffset(metadataOffset)
             .setWantFence(!readyToUnfence)
             .setWantShutDown(state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
-            .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()));
+            .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()))
+            
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList());
         if (logger.isTraceEnabled()) {
             logger.trace("Sending broker heartbeat {}", data);
         }
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index bfec3337ca3..9baa67c1515 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -92,6 +92,14 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
         return 
Optional.ofNullable(getList(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getList(ServerLogConfigs.LOG_DIR_CONFIG));
     }
 
+    public List<String> cordonedLogDirs() {
+        List<String> configuredCordonedLogDirs = 
getList(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG);
+        if 
(configuredCordonedLogDirs.contains(ServerLogConfigs.CORDONED_LOG_DIRS_ALL)) {
+            return logDirs();
+        }
+        return configuredCordonedLogDirs;
+    }
+
     public int numIoThreads() {
         return getInt(ServerConfigs.NUM_IO_THREADS_CONFIG);
     }
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
index 4ccd3005b7d..f7b557cf503 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
@@ -205,8 +205,11 @@ public class DynamicBrokerConfig {
          * the names you would use when setting a static or dynamic broker 
configuration (not topic
          * configuration).
          */
-        public static final Set<String> RECONFIGURABLE_CONFIGS = Set.copyOf(
-                ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values());
+        public static final Set<String> RECONFIGURABLE_CONFIGS = Stream.of(
+                ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values(),
+                Set.of(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG))
+            .flatMap(Collection::stream)
+            .collect(Collectors.toUnmodifiableSet());
     }
 
     public static class DynamicListenerConfig {
diff --git 
a/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
 
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
new file mode 100644
index 00000000000..ce2eededede
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
+import org.apache.kafka.clients.admin.FeatureUpdate;
+import org.apache.kafka.clients.admin.FinalizedVersionRange;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
+import org.apache.kafka.clients.admin.UpdateFeaturesResult;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.kafka.server.config.ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+        disksPerBroker = 2
+)
+public class CordonedLogDirsIntegrationTest {
+
+    private static final String TOPIC1 = "topic1";
+    private static final String TOPIC2 = "topic2";
+    private static final ConfigResource BROKER_0 = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
+    private final ClusterInstance clusterInstance;
+    private List<String> logDirsBroker0;
+
+    public CordonedLogDirsIntegrationTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() {
+        logDirsBroker0 = clusterInstance.brokers().get(0).config().logDirs();
+    }
+
+    @ClusterTest(metadataVersion = MetadataVersion.IBP_4_2_IV1)
+    public void testFeatureNotEnabled() throws Exception {
+        testFeatureNotEnabled(List.of());
+    }
+
+    @ClusterTest(
+        metadataVersion = MetadataVersion.IBP_4_2_IV1,
+        serverProperties = {
+            @ClusterConfigProperty(key = CORDONED_LOG_DIRS_CONFIG, value = "*")
+        }
+    )
+    public void testFeatureNotEnabledStaticConfig() throws Exception {
+        testFeatureNotEnabled(logDirsBroker0);
+    }
+
+    private void testFeatureNotEnabled(List<String> initialCordonedLogDirs) 
throws Exception {
+        try (Admin admin = clusterInstance.admin()) {
+            // When the metadata version does not support cordoning log dirs:
+            // 1. we can create a topic, even if cordon.log.dirs is statically 
set
+            admin.createTopics(newTopic(TOPIC1)).all().get();
+            // 2. no log dirs are marked as cordoned
+            assertCordonedLogDirs(admin, List.of());
+            // 3. we can't dynamically configure cordoned.log.dirs
+            Throwable ee = assertThrows(ExecutionException.class, () ->
+                
admin.incrementalAlterConfigs(cordonedDirsConfig("")).all().get());
+            assertInstanceOf(InvalidConfigurationException.class, 
ee.getCause());
+
+            // Update the metadata version to support cordoning log dirs
+            short metadataVersion = MetadataVersion.IBP_4_3_IV0.featureLevel();
+            UpdateFeaturesResult updateResult = admin.updateFeatures(
+                    Map.of("metadata.version", new 
FeatureUpdate(metadataVersion, FeatureUpdate.UpgradeType.UPGRADE)),
+                    new UpdateFeaturesOptions());
+            updateResult.all().get();
+            TestUtils.waitForCondition(() -> {
+                FinalizedVersionRange versionRange = 
admin.describeFeatures().featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME);
+                return versionRange.maxVersionLevel() == metadataVersion && 
versionRange.minVersionLevel() == metadataVersion;
+            }, 10_000, "Unable to update the metadata version.");
+            
Thread.sleep(clusterInstance.brokers().get(0).config().brokerHeartbeatIntervalMs());
+
+            if (initialCordonedLogDirs.isEmpty()) {
+                // if no initial cordoned log dirs, this has not changed, and 
we can cordon log dirs
+                assertCordonedLogDirs(admin, List.of());
+                setCordonedLogDirs(admin, logDirsBroker0);
+                initialCordonedLogDirs = logDirsBroker0;
+            }
+            // The statically or dynamically configured log dirs are now 
marked as cordoned
+            assertCordonedLogDirs(admin, initialCordonedLogDirs);
+
+            // As all log dirs are cordoned, we can't create a topic
+            Set<NewTopic> newTopics = newTopic(TOPIC2);
+            ee = assertThrows(ExecutionException.class, () ->
+                admin.createTopics(newTopics).all().get());
+            assertInstanceOf(InvalidReplicationFactorException.class, 
ee.getCause());
+            // We can't create partitions either
+            Map<String, NewPartitions> newPartitions = Map.of(TOPIC1, 
NewPartitions.increaseTo(2));
+            ee = assertThrows(ExecutionException.class, () ->
+                admin.createPartitions(newPartitions).all().get());
+            assertInstanceOf(InvalidReplicationFactorException.class, 
ee.getCause());
+
+            // After uncordoning log dirs, we can create topics and partitions 
again
+            setCordonedLogDirs(admin, List.of());
+            admin.createTopics(newTopics).all().get();
+            admin.createPartitions(newPartitions).all().get();
+        }
+    }
+
+    @ClusterTest()
+    public void testCordonUncordonLogDirs() throws Exception {
+        try (Admin admin = clusterInstance.admin()) {
+            // No initial cordoned log dirs
+            assertCordonedLogDirs(admin, List.of());
+
+            // We can create topics
+            admin.createTopics(newTopic(TOPIC1)).all().get();
+
+            // Cordon all log dirs
+            setCordonedLogDirs(admin, logDirsBroker0);
+            assertCordonedLogDirs(admin, logDirsBroker0);
+
+            // We can't create new topics or partitions
+            Set<NewTopic> newTopics = newTopic(TOPIC2);
+            Throwable ee = assertThrows(ExecutionException.class, () ->
+                admin.createTopics(newTopics).all().get()
+            );
+            assertInstanceOf(InvalidReplicationFactorException.class, 
ee.getCause());
+            Map<String, NewPartitions> newPartitions = Map.of(TOPIC1, 
NewPartitions.increaseTo(2));
+            ee = assertThrows(ExecutionException.class, () ->
+                admin.createPartitions(newPartitions).all().get()
+            );
+            assertInstanceOf(InvalidReplicationFactorException.class, 
ee.getCause());
+
+            // Uncordon all log dirs
+            setCordonedLogDirs(admin, List.of(logDirsBroker0.get(0)));
+            assertCordonedLogDirs(admin, List.of(logDirsBroker0.get(0)));
+
+            // We can create topics and partitions again
+            admin.createTopics(newTopics).all().get();
+            admin.createPartitions(newPartitions).all().get();
+        }
+    }
+
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(key = CORDONED_LOG_DIRS_CONFIG, value = "*")
+        }
+    )
+    public void testStaticCordonUncordonLogDirs() throws Exception {
+        Set<NewTopic> newTopics = newTopic(TOPIC1);
+        try (Admin admin = clusterInstance.admin()) {
+            // All log dirs are statically cordoned, so we can't create topics
+            Throwable ee = assertThrows(ExecutionException.class, () ->
+                admin.createTopics(newTopics).all().get()
+            );
+            assertInstanceOf(InvalidReplicationFactorException.class, 
ee.getCause());
+
+            // Uncordon log dirs
+            setCordonedLogDirs(admin, List.of());
+
+            // We can't create topics again
+            admin.createTopics(newTopics).all().get();
+        }
+    }
+
+    @ClusterTest
+    public void testReassignWithCordonedLogDirs() throws Exception {
+        TopicPartitionReplica replica = new TopicPartitionReplica(TOPIC1, 0, 
0);
+        try (Admin admin = clusterInstance.admin()) {
+            admin.createTopics(newTopic(TOPIC1)).all().get();
+
+            // Find the log dir that does not host the replica and cordon it
+            AtomicReference<String> logDir = new AtomicReference<>();
+            TestUtils.waitForCondition(() -> {
+                DescribeReplicaLogDirsResult.ReplicaLogDirInfo info = 
admin.describeReplicaLogDirs(List.of(replica)).all().get().get(replica);
+                logDir.set(info.getCurrentReplicaLogDir());
+                return info.getCurrentReplicaLogDir() != null;
+            }, 10_000, "Unable to find logdir for topic " + replica.topic());
+            assertNotNull(logDir.get());
+            String otherLogDir = logDirsBroker0.stream().filter(dir -> 
!dir.equals(logDir.get())).findFirst().get();
+            setCordonedLogDirs(admin, List.of(otherLogDir));
+
+            // We can't move the replica to the now cordoned log dir
+            Throwable ee = assertThrows(ExecutionException.class, () ->
+                admin.alterReplicaLogDirs(Map.of(replica, 
otherLogDir)).all().get()
+            );
+            assertInstanceOf(InvalidReplicaAssignmentException.class, 
ee.getCause());
+
+            // After uncordoning the log dir, we can move the replica on it
+            setCordonedLogDirs(admin, List.of());
+            admin.alterReplicaLogDirs(Map.of(replica, 
otherLogDir)).all().get();
+        }
+    }
+
+    private Map<ConfigResource, Collection<AlterConfigOp>> 
cordonedDirsConfig(String value) {
+        return Map.of(
+                BROKER_0,
+                Set.of(new AlterConfigOp(new 
ConfigEntry(CORDONED_LOG_DIRS_CONFIG, value), AlterConfigOp.OpType.SET))
+        );
+    }
+
+    private void setCordonedLogDirs(Admin admin, List<String> logDirs) throws 
ExecutionException, InterruptedException {
+        String logDirsStr = String.join(",", logDirs);
+        
admin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr)).all().get();
+        TestUtils.waitForCondition(() -> {
+            Map<ConfigResource, Config> describeConfigs = 
admin.describeConfigs(Set.of(BROKER_0)).all().get();
+            Config config = describeConfigs.get(BROKER_0);
+            return 
logDirsStr.equals(config.get(CORDONED_LOG_DIRS_CONFIG).value());
+        }, 10_000, "Unable to set the " + CORDONED_LOG_DIRS_CONFIG + " 
configuration.");
+    }
+
+    private Set<NewTopic> newTopic(String name) {
+        return Set.of(new NewTopic(name, 1, (short) 
clusterInstance.brokers().size()));
+    }
+
+    private void assertCordonedLogDirs(Admin admin, List<String> 
expectedCordoned) throws ExecutionException, InterruptedException {
+        Map<Integer, Map<String, LogDirDescription>> logDescriptionsPerBroker 
= admin.describeLogDirs(clusterInstance.brokerIds()).allDescriptions().get();
+        for (Map.Entry<Integer, Map<String, LogDirDescription>> 
logDescriptions : logDescriptionsPerBroker.entrySet()) {
+            for (Map.Entry<String, LogDirDescription> logDescription : 
logDescriptions.getValue().entrySet()) {
+                if (logDescriptions.getKey().equals(0) && 
expectedCordoned.contains(logDescription.getKey())) {
+                    assertTrue(logDescription.getValue().isCordoned());
+                } else {
+                    assertFalse(logDescription.getValue().isCordoned());
+                }
+            }
+        }
+    }
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 41b9c6fbb43..f81d224e7ea 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -146,6 +146,7 @@ public class LogConfig extends AbstractConfig {
             .define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, 
ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, 
ServerLogConfigs.NUM_PARTITIONS_DOC)
             .define(ServerLogConfigs.LOG_DIR_CONFIG, LIST, 
ServerLogConfigs.LOG_DIR_DEFAULT, 
ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, 
ServerLogConfigs.LOG_DIR_DOC)
             .define(ServerLogConfigs.LOG_DIRS_CONFIG, LIST, null, 
ConfigDef.ValidList.anyNonDuplicateValues(false, true), HIGH, 
ServerLogConfigs.LOG_DIRS_DOC)
+            .define(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, LIST, 
ServerLogConfigs.CORDONED_LOG_DIRS_DEFAULT, 
ConfigDef.ValidList.anyNonDuplicateValues(true, true), MEDIUM, 
ServerLogConfigs.CORDONED_LOG_DIRS_DOC)
             .define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, 
DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH, 
ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
 
             .define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null, 
HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC)
diff --git 
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
 
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
index aef53a6ddc9..af11eaaeea2 100644
--- 
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
+++ 
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
@@ -52,7 +52,7 @@ public @interface ClusterTest {
     String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
     SecurityProtocol controllerSecurityProtocol() default 
SecurityProtocol.PLAINTEXT;
     String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
-    MetadataVersion metadataVersion() default MetadataVersion.IBP_4_3_IV0;
+    MetadataVersion metadataVersion() default MetadataVersion.IBP_4_4_IV0;
     ClusterConfigProperty[] serverProperties() default {};
     // users can add tags that they want to display in test
     String[] tags() default {};
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index ff6350693b4..565c78c9eef 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -67,7 +67,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(features.get(2))
         );
         assertFeatureOutput(
-                "metadata.version", "3.3-IV3", "4.3-IV0", "3.3-IV3",
+                "metadata.version", "3.3-IV3", "4.4-IV0", "3.3-IV3",
                 outputWithoutEpoch(features.get(3))
         );
         assertFeatureOutput(
@@ -107,7 +107,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(features.get(2))
         );
         assertFeatureOutput(
-                "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
+                "metadata.version", "3.3-IV3", "4.4-IV0", "3.7-IV0",
                 outputWithoutEpoch(features.get(3))
         );
         assertFeatureOutput(
@@ -164,7 +164,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(featuresWithUnstable.get(2))
         );
         assertFeatureOutput(
-                "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
+                "metadata.version", "3.3-IV3", "4.4-IV0", "3.7-IV0",
                 outputWithoutEpoch(featuresWithUnstable.get(3))
         );
         assertFeatureOutput(
@@ -201,7 +201,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(featuresWithoutUnstable.get(2))
         );
         assertFeatureOutput(
-                "metadata.version", "3.3-IV3", "4.2-IV1", "3.7-IV0",
+                "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
                 outputWithoutEpoch(featuresWithoutUnstable.get(3))
         );
         assertFeatureOutput(
@@ -258,7 +258,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(featuresWithUnstable.get(2))
         );
         assertFeatureOutput(
-                "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
+                "metadata.version", "3.3-IV3", "4.4-IV0", "3.7-IV0",
                 outputWithoutEpoch(featuresWithUnstable.get(3))
         );
         assertFeatureOutput(
@@ -295,7 +295,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(featuresWithoutUnstable.get(2))
         );
         assertFeatureOutput(
-                "metadata.version", "3.3-IV3", "4.2-IV1", "3.7-IV0",
+                "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
                 outputWithoutEpoch(featuresWithoutUnstable.get(3))
         );
         assertFeatureOutput(


Reply via email to