This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a45d36ca5d5 KAFKA-19774: Mechanism to cordon log dirs (KIP-1066)
(#21273)
a45d36ca5d5 is described below
commit a45d36ca5d5b6f48d4313d50d1f7b6c5e6c905a4
Author: Mickael Maison <[email protected]>
AuthorDate: Thu Feb 19 15:18:28 2026 +0100
KAFKA-19774: Mechanism to cordon log dirs (KIP-1066) (#21273)
Implements [KIP-1066](https://cwiki.apache.org/confluence/x/Lg_TEg)
Reviewers: Christo Lolov <[email protected]>, PoAn Yang
<[email protected]>
---
.../clients/CreateTopicsRequestWithPolicyTest.java | 3 +-
.../kafka/clients/admin/KafkaAdminClient.java | 3 +-
.../kafka/clients/admin/LogDirDescription.java | 14 +-
.../common/message/BrokerHeartbeatRequest.json | 8 +-
.../common/message/BrokerHeartbeatResponse.json | 4 +-
.../common/message/BrokerRegistrationRequest.json | 11 +-
.../common/message/BrokerRegistrationResponse.json | 6 +-
.../common/message/DescribeLogDirsRequest.json | 3 +-
.../common/message/DescribeLogDirsResponse.json | 6 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 44 +++-
.../kafka/clients/admin/MockAdminClient.java | 3 +-
core/src/main/scala/kafka/log/LogManager.scala | 33 ++-
.../src/main/scala/kafka/server/BrokerServer.scala | 21 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 28 ++-
core/src/main/scala/kafka/server/KafkaBroker.scala | 2 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 11 +
.../main/scala/kafka/server/ReplicaManager.scala | 10 +-
.../test/scala/unit/kafka/log/LogManagerTest.scala | 84 +++++++
.../kafka/server/BrokerLifecycleManagerTest.scala | 73 +++++-
.../kafka/server/DynamicBrokerConfigTest.scala | 72 +++++-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 43 +++-
.../unit/kafka/server/ReplicaManagerTest.scala | 28 ++-
.../kafka/controller/BrokerHeartbeatManager.java | 13 +-
.../kafka/controller/ClusterControlManager.java | 36 ++-
.../controller/ConfigurationControlManager.java | 18 ++
.../controller/ReplicationControlManager.java | 33 +++
.../java/org/apache/kafka/image/ClusterDelta.java | 6 +-
.../apache/kafka/metadata/BrokerRegistration.java | 54 ++++-
.../metadata/placement/StripedReplicaPlacer.java | 4 +-
.../metadata/BrokerRegistrationChangeRecord.json | 7 +-
.../common/metadata/RegisterBrokerRecord.json | 7 +-
.../controller/BrokerHeartbeatManagerTest.java | 5 +-
.../ConfigurationControlManagerTest.java | 25 ++
.../kafka/controller/QuorumControllerTest.java | 2 +-
.../controller/ReplicationControlManagerTest.java | 25 +-
.../image/node/ClusterImageBrokersNodeTest.java | 3 +-
.../kafka/metadata/BrokerRegistrationTest.java | 44 +++-
.../placement/StripedReplicaPlacerTest.java | 8 +-
.../kafka/server/common/DirectoryEventHandler.java | 16 ++
.../kafka/server/common/MetadataVersion.java | 18 +-
.../kafka/server/config/ServerLogConfigs.java | 7 +
.../kafka/server/common/MetadataVersionTest.java | 11 +-
.../kafka/server/BrokerLifecycleManager.java | 87 ++++++-
.../kafka/server/config/AbstractKafkaConfig.java | 8 +
.../kafka/server/config/DynamicBrokerConfig.java | 7 +-
.../server/CordonedLogDirsIntegrationTest.java | 264 +++++++++++++++++++++
.../kafka/storage/internals/log/LogConfig.java | 1 +
.../apache/kafka/common/test/api/ClusterTest.java | 2 +-
.../org/apache/kafka/tools/FeatureCommandTest.java | 12 +-
49 files changed, 1120 insertions(+), 113 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestWithPolicyTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestWithPolicyTest.java
index bfbc4794da5..da1d7f60535 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestWithPolicyTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestWithPolicyTest.java
@@ -216,7 +216,8 @@ public class CreateTopicsRequestWithPolicyTest {
admin,
true,
InvalidReplicationFactorException.class,
- "Unable to replicate the partition 4 time(s): The target
replication factor of 4 cannot be reached because only 3 broker(s) are
registered."
+ "Unable to replicate the partition 4 time(s): The target
replication factor of 4 cannot be "
+ + "reached because only 3 broker(s) are registered or some
brokers have all their log directories cordoned."
);
validateErrorCreateTopicsRequests(
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index a80c860de5e..85f1e87459f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3054,7 +3054,8 @@ public class KafkaAdminClient extends AdminClient {
Errors.forCode(logDirResult.errorCode()).exception(),
replicaInfoMap,
logDirResult.totalBytes(),
- logDirResult.usableBytes()));
+ logDirResult.usableBytes(),
+ logDirResult.isCordoned()));
}
return result;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
index 340e88db160..be1bb6e8f5b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
@@ -33,16 +33,18 @@ public class LogDirDescription {
private final ApiException error;
private final OptionalLong totalBytes;
private final OptionalLong usableBytes;
+ private final boolean isCordoned;
public LogDirDescription(ApiException error, Map<TopicPartition,
ReplicaInfo> replicaInfos) {
- this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES);
+ this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES,
false);
}
- public LogDirDescription(ApiException error, Map<TopicPartition,
ReplicaInfo> replicaInfos, long totalBytes, long usableBytes) {
+ public LogDirDescription(ApiException error, Map<TopicPartition,
ReplicaInfo> replicaInfos, long totalBytes, long usableBytes, boolean
isCordoned) {
this.error = error;
this.replicaInfos = replicaInfos;
this.totalBytes = (totalBytes == UNKNOWN_VOLUME_BYTES) ?
OptionalLong.empty() : OptionalLong.of(totalBytes);
this.usableBytes = (usableBytes == UNKNOWN_VOLUME_BYTES) ?
OptionalLong.empty() : OptionalLong.of(usableBytes);
+ this.isCordoned = isCordoned;
}
/**
@@ -82,6 +84,13 @@ public class LogDirDescription {
return usableBytes;
}
+ /**
+ * Whether this log directory is cordoned or not.
+ */
+ public boolean isCordoned() {
+ return isCordoned;
+ }
+
@Override
public String toString() {
return "LogDirDescription(" +
@@ -89,6 +98,7 @@ public class LogDirDescription {
", error=" + error +
", totalBytes=" + totalBytes +
", usableBytes=" + usableBytes +
+ ", isCordoned=" + isCordoned +
')';
}
}
diff --git
a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
index 8f574b41fc4..41d5fa0a150 100644
--- a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
@@ -13,12 +13,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Version 1 adds the OfflineLogDirs flexible field
+// Version 2 adds the CordonedLogDirs flexible field
{
"apiKey": 63,
"type": "request",
"listeners": ["controller"],
"name": "BrokerHeartbeatRequest",
- "validVersions": "0-1",
+ "validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
@@ -32,6 +34,8 @@
{ "name": "WantShutDown", "type": "bool", "versions": "0+",
"about": "True if the broker wants to be shut down, false otherwise." },
{ "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+",
"taggedVersions": "1+", "tag": 0,
- "about": "Log directories that failed and went offline." }
+ "about": "Log directories that failed and went offline." },
+ { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "2+",
"taggedVersions": "2+",
+ "tag": "1", "about": "Log directories that are cordoned." }
]
}
diff --git
a/clients/src/main/resources/common/message/BrokerHeartbeatResponse.json
b/clients/src/main/resources/common/message/BrokerHeartbeatResponse.json
index 9956daf1ff0..157fbfac68c 100644
--- a/clients/src/main/resources/common/message/BrokerHeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/BrokerHeartbeatResponse.json
@@ -13,11 +13,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Version 1 is the same as version 0 (new field in request).
+// Version 2 is the same as version 0 (new field in request).
{
"apiKey": 63,
"type": "response",
"name": "BrokerHeartbeatResponse",
- "validVersions": "0-1",
+ "validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git
a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 8a9348596ed..53e37f21d5a 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -14,19 +14,16 @@
// limitations under the License.
// Version 1 adds Zk broker epoch to the request if the broker is migrating
from Zk mode to KRaft mode.
-
// Version 2 adds LogDirs for KIP-858
-
// Version 3 adds the PreviousBrokerEpoch for the KIP-966
-
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in
the response from being 0.
-
+// Version 5 adds the CordonedLogDirs flexible field
{
"apiKey":62,
"type": "request",
"listeners": ["controller"],
"name": "BrokerRegistrationRequest",
- "validVersions": "0-4",
+ "validVersions": "0-5",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
@@ -64,6 +61,8 @@
{ "name": "LogDirs", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are
available.", "ignorable": true },
{ "name": "PreviousBrokerEpoch", "type": "int64", "versions": "3+",
"default": "-1", "ignorable": true,
- "about": "The epoch before a clean shutdown." }
+ "about": "The epoch before a clean shutdown." },
+ { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "5+",
"taggedVersions": "5+",
+ "tag": "0", "about": "Log directories that are cordoned." }
]
}
diff --git
a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
index dbc15cc4003..956e945df40 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
@@ -14,13 +14,15 @@
// limitations under the License.
// Version 1 adds Zk broker epoch to the request if the broker is migrating
from Zk mode to KRaft mode.
-//
// Version 2 adds the PreviousBrokerEpoch to the request for the KIP-966
+// Version 3 is the same as version 2 (new field in request).
+// Version 4 is the same as version 2 (new field in request).
+// Version 5 is the same as version 2 (new field in request).
{
"apiKey": 62,
"type": "response",
"name": "BrokerRegistrationResponse",
- "validVersions": "0-4",
+ "validVersions": "0-5",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git
a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
index 4f3bfa2c58c..c21a9cf906e 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
@@ -23,7 +23,8 @@
// Version 2 is the first flexible version.
// Version 3 is the same as version 2 (new field in response).
// Version 4 is the same as version 2 (new fields in response).
- "validVersions": "1-4",
  // Version 5 is the same as version 2 (new field in response).
+ "validVersions": "1-5",
"flexibleVersions": "2+",
"fields": [
{ "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+",
"nullableVersions": "0+",
diff --git
a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
index 725d1ad337b..71d7fbde9ae 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
@@ -22,7 +22,8 @@
// Version 2 is the first flexible version.
// Version 3 adds the top-level ErrorCode field
// Version 4 adds the TotalBytes and UsableBytes fields
- "validVersions": "1-4",
+ // Version 5 adds IsCordoned field
+ "validVersions": "1-5",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -55,6 +56,9 @@
},
{ "name": "UsableBytes", "type": "int64", "versions": "4+", "ignorable":
true, "default": "-1",
"about": "The usable size in bytes of the volume the log directory is
in. This value does not include the size of data stored in remote storage."
+ },
+ { "name": "IsCordoned", "type": "bool", "versions": "5+", "ignorable":
true, "default": false,
+ "about": "True if this log directory is cordoned."
}
]}
]
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 1b274c95add..89d1890ab68 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2252,7 +2252,12 @@ public class KafkaAdminClientTest {
private static DescribeLogDirsResponse
prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp,
long partitionSize, long offsetLag, long totalBytes, long usableBytes) {
return prepareDescribeLogDirsResponse(error, logDir,
- prepareDescribeLogDirsTopics(partitionSize, offsetLag,
tp.topic(), tp.partition(), false), totalBytes, usableBytes);
+ prepareDescribeLogDirsTopics(partitionSize, offsetLag,
tp.topic(), tp.partition(), false), totalBytes, usableBytes, false);
+ }
+
+ private static DescribeLogDirsResponse
prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp,
long partitionSize, long offsetLag, long totalBytes, long usableBytes, boolean
isCordoned) {
+ return prepareDescribeLogDirsResponse(error, logDir,
+ prepareDescribeLogDirsTopics(partitionSize, offsetLag,
tp.topic(), tp.partition(), false), totalBytes, usableBytes, isCordoned);
}
private static List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
@@ -2278,7 +2283,8 @@ public class KafkaAdminClientTest {
private static DescribeLogDirsResponse
prepareDescribeLogDirsResponse(Errors error, String logDir,
List<DescribeLogDirsTopic> topics,
- long
totalBytes, long usableBytes) {
+ long
totalBytes, long usableBytes,
+
boolean isCordoned) {
return new DescribeLogDirsResponse(
new DescribeLogDirsResponseData().setResults(singletonList(new
DescribeLogDirsResponseData.DescribeLogDirsResult()
.setErrorCode(error.code())
@@ -2286,6 +2292,7 @@ public class KafkaAdminClientTest {
.setTopics(topics)
.setTotalBytes(totalBytes)
.setUsableBytes(usableBytes)
+ .setIsCordoned(isCordoned)
)));
}
@@ -2355,6 +2362,7 @@ public class KafkaAdminClientTest {
assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
assertEquals(totalBytes, descriptionsMap.get(logDir).totalBytes());
assertEquals(usableBytes, descriptionsMap.get(logDir).usableBytes());
+ assertFalse(descriptionsMap.get(logDir).isCordoned());
}
@Test
@@ -2434,6 +2442,38 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testDescribeLogDirsWithCordonedDir() throws
ExecutionException, InterruptedException {
+ Set<Integer> brokers = singleton(0);
+ String logDir = "/var/data/kafka";
+ TopicPartition tp = new TopicPartition("topic", 12);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareResponseFrom(
+ prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp,
123, -1, -1, -1, true),
+ env.cluster().nodeById(0));
+
+ DescribeLogDirsResult result =
env.adminClient().describeLogDirs(brokers);
+
+ Map<Integer, KafkaFuture<Map<String, LogDirDescription>>>
descriptions = result.descriptions();
+ assertEquals(brokers, descriptions.keySet());
+ assertNotNull(descriptions.get(0));
+ Map<String, LogDirDescription> descriptionsMap =
descriptions.get(0).get();
+ assertEquals(singleton(logDir), descriptionsMap.keySet());
+ assertTrue(descriptionsMap.get(logDir).isCordoned());
+ assertEquals(Set.of(tp),
descriptionsMap.get(logDir).replicaInfos().keySet());
+
+ Map<Integer, Map<String, LogDirDescription>> allDescriptions =
result.allDescriptions().get();
+ assertEquals(brokers, allDescriptions.keySet());
+ Map<String, LogDirDescription> allMap = allDescriptions.get(0);
+ assertNotNull(allMap);
+ assertEquals(singleton(logDir), allMap.keySet());
+ assertTrue(allMap.get(logDir).isCordoned());
+ assertEquals(Set.of(tp),
allMap.get(logDir).replicaInfos().keySet());
+ }
+ }
+
@Test
public void testDescribeReplicaLogDirs() throws ExecutionException,
InterruptedException {
TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 12, 1);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 48874f1a1b2..bb460855df0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1087,7 +1087,8 @@ public class MockAdminClient extends AdminClient {
logDirDescription.error(),
topicPartitionReplicaInfoMap,
logDirDescription.totalBytes().orElse(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES),
-
logDirDescription.usableBytes().orElse(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES)));
+
logDirDescription.usableBytes().orElse(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES),
+ logDirDescription.isCordoned()));
}
}
}
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index efdab2a0f31..85c56672aaa 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -100,6 +100,7 @@ class LogManager(logDirs: Seq[File],
private val strayLogs = new ConcurrentHashMap[TopicPartition, UnifiedLog]()
private val _liveLogDirs: ConcurrentLinkedQueue[File] =
createAndValidateLogDirs(logDirs, initialOfflineDirs)
+ @volatile private var _cordonedLogDirs: Set[String] = Set()
@volatile private var _currentDefaultConfig = initialDefaultConfig
@volatile private var numRecoveryThreadsPerDataDir =
recoveryThreadsPerDataDir
@@ -128,6 +129,14 @@ class LogManager(logDirs: Seq[File],
private val directoryIds: mutable.Map[String, Uuid] =
loadDirectoryIds(liveLogDirs)
def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
+ def updateCordonedLogDirs(newCordonedLogDirs: Set[String]): Unit = {
+ _cordonedLogDirs = newCordonedLogDirs
+ }
+
+ def cordonedLogDirs(): Set[String] = {
+ _cordonedLogDirs
+ }
+
@volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
(dir, new OffsetCheckpointFile(new File(dir,
JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), logDirFailureChannel))).toMap
@volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
@@ -154,11 +163,15 @@ class LogManager(logDirs: Seq[File],
private[kafka] def cleaner: LogCleaner = _cleaner
metricsGroup.newGauge("OfflineLogDirectoryCount", () => offlineLogDirs.size)
+ metricsGroup.newGauge("CordonedLogDirectoryCount", () =>
cordonedLogDirs().size)
for (dir <- logDirs) {
metricsGroup.newGauge("LogDirectoryOffline",
() => if (_liveLogDirs.contains(dir)) 0 else 1,
Map("logDirectory" -> dir.getAbsolutePath).asJava)
+ metricsGroup.newGauge("LogDirectoryCordoned",
+ () => if (cordonedLogDirs().contains(dir.getAbsolutePath)) 1 else 0,
+ Map("logDirectory" -> dir.getAbsolutePath).asJava)
}
/**
@@ -1365,20 +1378,28 @@ class LogManager(logDirs: Seq[File],
* Provides the full ordered list of suggested directories for the next
partition.
* Currently this is done by calculating the number of partitions in each
directory and then sorting the
* data directories by fewest partitions.
+ *
+ * It's possible replicas are assigned to this broker right before all its
log directories are cordoned.
+ * In that case, pick the first available directory
*/
- private def nextLogDirs(): List[File] = {
+ def nextLogDirs(): List[File] = {
if (_liveLogDirs.size == 1) {
List(_liveLogDirs.peek())
} else {
// count the number of logs in each parent directory (including 0 for
empty directories
val logCounts = allLogs.groupBy(_.parentDir).map { case (parent, logs)
=> parent -> logs.size }
val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
- val dirCounts = (zeros ++ logCounts).toBuffer
+ val dirCounts = (zeros ++ logCounts).filter(d =>
!cordonedLogDirs().contains(d._1)).toBuffer
- // choose the directory with the least logs in it
- dirCounts.sortBy(_._2).map {
- case (path: String, _: Int) => new File(path)
- }.toList
+ if (dirCounts.isEmpty) {
+ // all log directories are cordoned, choose the first live directory
+ List(_liveLogDirs.peek())
+ } else {
+ // choose the directory with the least logs in it
+ dirCounts.sortBy(_._2).map {
+ case (path: String, _: Int) => new File(path)
+ }.toList
+ }
}
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 977eca37a80..698f8857881 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -65,6 +65,7 @@ import java.util
import java.util.Optional
import java.util.concurrent.locks.{Condition, ReentrantLock}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit,
TimeoutException}
+import java.util.stream.Collectors
import scala.collection.Map
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
@@ -218,11 +219,13 @@ class BrokerServer(
brokerTopicStats,
logDirFailureChannel)
- lifecycleManager = new BrokerLifecycleManager(config,
+ lifecycleManager = new BrokerLifecycleManager(
+ config,
time,
s"broker-${config.nodeId}-",
logManager.directoryIdsSet.asJava,
- () => new Thread(() => shutdown(), "kafka-shutdown-thread").start())
+ () => new Thread(() => shutdown(), "kafka-shutdown-thread").start(),
+ () => metadataCache.metadataVersion().isCordonedLogDirsSupported)
// Enable delegation token cache for all SCRAM mechanisms to simplify
dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled
dynamically.
@@ -332,7 +335,13 @@ class BrokerServer(
override def handleFailure(directoryId: Uuid): Unit =
lifecycleManager.propagateDirectoryFailure(directoryId,
config.logDirFailureTimeoutMs)
- }
+
+ override def handleCordoned(directoryIds: util.Set[Uuid]): Unit =
+ lifecycleManager.propagateDirectoryCordoned(directoryIds)
+
+ override def handleUncordoned(directoryIds: util.Set[Uuid]): Unit =
+ lifecycleManager.propagateDirectoryUncordoned(directoryIds)
+}
/**
* TODO: move this action queue to handle thread so we can simplify
concurrency handling
@@ -411,13 +420,17 @@ class BrokerServer(
s"broker-${config.nodeId}-",
config.brokerHeartbeatIntervalMs
)
+ val initialCordonedLogDirs: util.Set[Uuid] =
config.cordonedLogDirs().stream()
+ .map(dir => logManager.directoryId(dir).get)
+ .collect(Collectors.toSet())
lifecycleManager.start(
() => sharedServer.loader.lastAppliedOffset(),
brokerLifecycleChannelManager,
clusterId,
listenerInfo.toBrokerRegistrationRequest,
featuresRemapped,
- logManager.readBrokerEpochFromCleanShutdownFiles()
+ logManager.readBrokerEpochFromCleanShutdownFiles(),
+ initialCordonedLogDirs
)
// The FetchSessionCache is divided into config.numIoThreads shards,
each responsible
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index a467f18ace7..80c5a1235f0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -39,7 +39,7 @@ import org.apache.kafka.config
import org.apache.kafka.network.SocketServer
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler}
import org.apache.kafka.server.config.{DynamicConfig,
DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs,
DynamicBrokerConfig => JDynamicBrokerConfig}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin,
MetricConfigs}
@@ -195,7 +195,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
addReconfigurable(new
DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config))
addBrokerReconfigurable(new BrokerDynamicThreadPool(kafkaServer))
- addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
+ addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager,
kafkaServer.replicaManager.directoryEventHandler))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
addBrokerReconfigurable(new
DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
@@ -536,7 +536,7 @@ trait BrokerReconfigurable {
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}
-class DynamicLogConfig(logManager: LogManager) extends BrokerReconfigurable
with Logging {
+class DynamicLogConfig(logManager: LogManager, directoryEventHandler:
DirectoryEventHandler) extends BrokerReconfigurable with Logging {
override def reconfigurableConfigs: util.Set[String] = {
JDynamicBrokerConfig.DynamicLogConfig.RECONFIGURABLE_CONFIGS
@@ -577,8 +577,20 @@ class DynamicLogConfig(logManager: LogManager) extends
BrokerReconfigurable with
}
}
+ def validateCordonedLogDirs(): Unit = {
+ val logDirs = newConfig.logDirs()
+ val cordonedLogDirs = newConfig.cordonedLogDirs()
+ cordonedLogDirs.asScala.foreach(dir =>
+ if (!logDirs.contains(dir)) {
+ throw new ConfigException(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG,
cordonedLogDirs, s"Invalid entry in
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG}: $dir. " +
+ s"All cordoned log dirs must be entries of
${ServerLogConfigs.LOG_DIRS_CONFIG} or ${ServerLogConfigs.LOG_DIR_CONFIG}.")
+ }
+ )
+ }
+
validateLogLocalRetentionMs()
validateLogLocalRetentionBytes()
+ validateCordonedLogDirs()
}
private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit =
{
@@ -598,6 +610,16 @@ class DynamicLogConfig(logManager: LogManager) extends
BrokerReconfigurable with
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
val newBrokerDefaults = new util.HashMap[String,
Object](newConfig.extractLogConfigMap)
+ logManager.updateCordonedLogDirs(newConfig.cordonedLogDirs.asScala.toSet)
+ val newCordoned: Set[String] = newConfig.cordonedLogDirs.asScala.toSet --
oldConfig.cordonedLogDirs.asScala.toSet
+ val newUncordoned: Set[String] = oldConfig.cordonedLogDirs.asScala.toSet
-- newConfig.cordonedLogDirs.asScala.toSet
+ if (newCordoned.nonEmpty) {
+ directoryEventHandler.handleCordoned(newCordoned.map(dir =>
logManager.directoryId(dir).get).toSet.asJava)
+ }
+ if (newUncordoned.nonEmpty) {
+ directoryEventHandler.handleUncordoned(newUncordoned.map(dir =>
logManager.directoryId(dir).get).toSet.asJava)
+ }
+
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
updateLogsConfig(newBrokerDefaults.asScala)
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 46576d97d33..4ac473ad4bc 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.{BrokerState, MetadataCache}
import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.BrokerLifecycleManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.NodeToControllerChannelManager
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
@@ -105,6 +106,7 @@ trait KafkaBroker extends Logging {
def credentialProvider: CredentialProvider
def clientToControllerChannelManager: NodeToControllerChannelManager
def tokenCache: DelegationTokenCache
+ def lifecycleManager: BrokerLifecycleManager
// For backwards compatibility, we need to keep older metrics tied
// to their original name when this class was named `KafkaServer`
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d05ef90e6fb..c7beff0504b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -484,6 +484,16 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
advertisedListeners.filterNot(l =>
controllerListenerNames.contains(l.listener))
}
+ def validateCordonedLogDirs(): Unit = {
+ val cordonedLogDirs = getList(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG)
+ if (cordonedLogDirs.contains(ServerLogConfigs.CORDONED_LOG_DIRS_ALL)) {
+ require(cordonedLogDirs.size == 1, s"When
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} is set to
${ServerLogConfigs.CORDONED_LOG_DIRS_ALL}, it must not contain other values")
+ } else {
+ val unknownLogDirs =
cordonedLogDirs.asScala.filter(!logDirs().contains(_))
require(unknownLogDirs.isEmpty, s"All entries in
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} must be present in
${ServerLogConfigs.LOG_DIRS_CONFIG} or
${ServerLogConfigs.LOG_DIR_CONFIG}. Missing entries:
${unknownLogDirs.mkString(", ")}")
+ }
+ }
+
validateValues()
private def validateValues(): Unit = {
@@ -611,6 +621,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
s"Found ${advertisedBrokerListenerNames.map(_.value).mkString(",")}.
The valid options based on the current configuration " +
s"are ${listenerNames.map(_.value).mkString(",")}"
)
+ validateCordonedLogDirs()
}
require(!effectiveAdvertisedBrokerListeners.exists(endpoint =>
endpoint.host=="0.0.0.0"),
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d2471b2ecd2..d6ed8f66efe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1117,6 +1117,8 @@ class ReplicaManager(val config: KafkaConfig,
throw new InvalidTopicException("The topic name is too long.")
if (!logManager.isLogDirOnline(destinationDir))
throw new KafkaStorageException(s"Log directory $destinationDir is
offline")
+ if (logManager.cordonedLogDirs().contains(destinationDir))
+ throw new InvalidReplicaAssignmentException(s"Log directory
$destinationDir is cordoned")
getPartition(topicPartition) match {
case online: HostedPartition.Online[Partition] =>
@@ -1171,7 +1173,8 @@ class ReplicaManager(val config: KafkaConfig,
case e@(_: InvalidTopicException |
_: LogDirNotFoundException |
_: ReplicaNotAvailableException |
- _: KafkaStorageException) =>
+ _: KafkaStorageException |
+ _: InvalidReplicaAssignmentException) =>
warn(s"Unable to alter log dirs for $topicPartition", e)
(topicPartition, Errors.forException(e))
case e: NotLeaderOrFollowerException =>
@@ -1225,12 +1228,17 @@ class ReplicaManager(val config: KafkaConfig,
Collections.emptyList[DescribeLogDirsTopic]()
}
+ val isCordoned = if
(metadataCache.metadataVersion().isCordonedLogDirsSupported)
+ logManager.cordonedLogDirs().contains(absolutePath)
+ else
+ false
val describeLogDirsResult = new
DescribeLogDirsResponseData.DescribeLogDirsResult()
.setLogDir(absolutePath)
.setTopics(topicInfos)
.setErrorCode(Errors.NONE.code)
.setTotalBytes(totalBytes)
.setUsableBytes(usableBytes)
+ .setIsCordoned(isCordoned)
describeLogDirsResult
} catch {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 7736f6736e9..e4b26070728 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -65,6 +65,7 @@ class LogManagerTest {
logProps.put(TopicConfig.RETENTION_MS_CONFIG, maxLogAgeMs: java.lang.Integer)
val logConfig = new LogConfig(logProps)
var logDir: File = _
+ var logDir2: File = _
var logManager: LogManager = _
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
@@ -73,6 +74,7 @@ class LogManagerTest {
@BeforeEach
def setUp(): Unit = {
logDir = TestUtils.tempDir()
+ logDir2 = TestUtils.tempDir()
logManager = createLogManager()
logManager.startup(Set.empty)
assertEquals(initialTaskDelayMs, logManager.initialTaskDelayMs)
@@ -83,6 +85,7 @@ class LogManagerTest {
if (logManager != null)
logManager.shutdown()
Utils.delete(logDir)
+ Utils.delete(logDir2)
// Some tests assign a new LogManager
if (logManager != null)
logManager.liveLogDirs.foreach(Utils.delete)
@@ -1053,6 +1056,58 @@ class LogManagerTest {
verifyMetrics()
}
+ @Test
+ def testLogManagerMetrics(): Unit = {
+ KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.forEach((metricName:
MetricName) =>
+ KafkaYammerMetrics.defaultRegistry.removeMetric(metricName))
+ logManager.shutdown()
+ logManager = createLogManager(Seq(logDir, logDir2))
+ def allMetrics(): Set[MetricName] =
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
+ .filter(metric => metric.getType.contains("LogManager"))
+ .toSet
+ def metric(filter: String): Gauge[Int] =
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+ .filter(entry => entry._1.getName.contains(filter))
+ .values.head.asInstanceOf[Gauge[Int]]
+ def logDirMetric(filter: String, logDir: File): Gauge[Int] =
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+ .filter(entry => entry._1.getName.contains(filter))
+ .filter(entry => entry._1.getScope.contains(logDir.getAbsolutePath))
+ .values.head.asInstanceOf[Gauge[Int]]
+ // expecting 6 metrics:
+ // - OfflineLogDirectoryCount
+ // - CordonedLogDirectoryCount
+ // - LogDirectoryOffline per log dir
+ // - LogDirectoryCordoned per log dir
+ assertEquals(6, allMetrics().size)
+ assertEquals(0, metric("OfflineLogDirectoryCount").value)
+ assertEquals(0, metric("CordonedLogDirectoryCount").value)
+ assertEquals(0, logDirMetric("LogDirectoryOffline", logDir).value)
+ assertEquals(0, logDirMetric("LogDirectoryOffline", logDir2).value)
+ assertEquals(0, logDirMetric("LogDirectoryCordoned", logDir).value)
+ assertEquals(0, logDirMetric("LogDirectoryCordoned", logDir2).value)
+
+ logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath))
+ assertEquals(1, metric("CordonedLogDirectoryCount").value)
+ assertEquals(1, logDirMetric("LogDirectoryCordoned", logDir).value)
+ assertEquals(0, logDirMetric("LogDirectoryCordoned", logDir2).value)
+ logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath,
logDir2.getAbsolutePath))
+ assertEquals(2, metric("CordonedLogDirectoryCount").value)
+ assertEquals(1, logDirMetric("LogDirectoryCordoned", logDir).value)
+ assertEquals(1, logDirMetric("LogDirectoryCordoned", logDir2).value)
+
+ logManager.handleLogDirFailure(logDir.getAbsolutePath)
+ assertEquals(1, metric("OfflineLogDirectoryCount").value)
+ assertEquals(1, logDirMetric("LogDirectoryOffline", logDir).value)
+ assertEquals(0, logDirMetric("LogDirectoryOffline", logDir2).value)
+
+ try {
+ logManager.shutdown()
+ } catch {
+ case _: Throwable => // ignore
+ } finally {
+ logManager = null
+ }
+ }
+
@Test
def testMetricsAreRemovedWhenMovingCurrentToFutureLog(): Unit = {
val dir1 = TestUtils.tempDir()
@@ -1325,4 +1380,33 @@ class LogManagerTest {
Files.setPosixFilePermissions(logDir.toPath, permissions)
}
}
+
+ @Test
+ def testUpdateCordonedLogDirs(): Unit = {
+ logManager.shutdown()
+ logManager = createLogManager(Seq(this.logDir, this.logDir2))
+ assertTrue(logManager.cordonedLogDirs().isEmpty)
+
+ val cordonedDirs = Set(logDir.getAbsolutePath)
+ logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath))
+ assertEquals(cordonedDirs, logManager.cordonedLogDirs())
+ }
+
+ @Test
+ def testNextLogDirs(): Unit = {
+ logManager.shutdown()
+ logManager = createLogManager(Seq(this.logDir, this.logDir2))
+ val nextLogDirs = logManager.nextLogDirs()
+ assertTrue(nextLogDirs.contains(logDir))
+ assertTrue(nextLogDirs.contains(logDir2))
+
+ logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath))
+ val nextLogDirs2 = logManager.nextLogDirs()
+ assertFalse(nextLogDirs2.contains(logDir))
+ assertTrue(nextLogDirs2.contains(logDir2))
+
+ logManager.updateCordonedLogDirs(Set(logDir.getAbsolutePath,
logDir2.getAbsolutePath))
+ val nextLogDirs3 = logManager.nextLogDirs()
+ assertFalse(nextLogDirs3.isEmpty)
+ }
}
diff --git
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 86d587eae0f..d64d9f00b9c 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -72,7 +72,7 @@ class BrokerLifecycleManagerTest {
assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty())
+ Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
TestUtils.retry(60000) {
assertEquals(BrokerState.STARTING, manager.state)
}
@@ -88,7 +88,7 @@ class BrokerLifecycleManagerTest {
context.controllerNodeProvider.node.set(controllerNode)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
- Collections.emptyMap(), OptionalLong.of(10L))
+ Collections.emptyMap(), OptionalLong.of(10L), util.Set.of())
TestUtils.retry(60000) {
assertEquals(1, context.mockChannelManager.unsentQueue.size)
assertEquals(10L,
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
@@ -117,7 +117,7 @@ class BrokerLifecycleManagerTest {
assertEquals(1, context.mockClient.futureResponses().size)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty())
+ Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
// We should send the first registration request and get a failure
immediately
TestUtils.retry(60000) {
context.poll()
@@ -154,7 +154,7 @@ class BrokerLifecycleManagerTest {
new BrokerHeartbeatResponseData().setIsCaughtUp(true)), controllerNode)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty())
+ Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
TestUtils.retry(10000) {
context.poll()
manager.eventQueue.wakeup()
@@ -233,7 +233,7 @@ class BrokerLifecycleManagerTest {
val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty())
+ Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
poll(ctx, manager, registration)
def nextHeartbeatDirs(): Set[String] =
@@ -260,7 +260,7 @@ class BrokerLifecycleManagerTest {
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty())
+ Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
val request = poll(ctx, manager,
registration).asInstanceOf[BrokerRegistrationRequest]
assertEquals(logDirs, new util.HashSet(request.data.logDirs()))
@@ -276,7 +276,7 @@ class BrokerLifecycleManagerTest {
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
- Collections.emptyMap(), OptionalLong.of(10L))
+ Collections.emptyMap(), OptionalLong.of(10L), util.Set.of())
def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx,
manager, prepareResponse[T](ctx, response))
def nextHeartbeatRequest() = doPoll[AbstractRequest](new
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
@@ -298,4 +298,63 @@ class BrokerLifecycleManagerTest {
nextHeartbeatRequest()
assertEquals(1200L, manager.brokerEpoch)
}
+
+ @Test
+ def testRegistrationIncludesCordonedDirs(): Unit = {
+ val logDirs = util.Set.of(Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"),
Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
+ val ctx = new RegistrationTestContext(configProperties)
+ manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"registration-includes-cordoned-dirs-", logDirs)
+ val controllerNode = new Node(3000, "localhost", 8021)
+ ctx.controllerNodeProvider.node.set(controllerNode)
+
+ val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
+
+ manager.start(() => ctx.highestMetadataOffset.get(),
+ ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
+ Collections.emptyMap(), OptionalLong.empty(),
+ logDirs
+ )
+ val request = poll(ctx, manager,
registration).asInstanceOf[BrokerRegistrationRequest]
+
+ assertEquals(logDirs, new util.HashSet(request.data.cordonedLogDirs()))
+ }
+
+ @Test
+ def testAlwaysSendsAccumulatedCordonedDirs(): Unit = {
+ val ctx = new RegistrationTestContext(configProperties)
+ var enabled = false
+ def cordonedLogDirsEnabled(): Boolean = {
+ enabled
+ }
+ manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"cordoned-dirs-sent-in-heartbeat-",
util.Set.of(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")),
+ () => {}, () => cordonedLogDirsEnabled())
+ val controllerNode = new Node(3000, "localhost", 8021)
+ ctx.controllerNodeProvider.node.set(controllerNode)
+
+ val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
+ manager.start(() => ctx.highestMetadataOffset.get(),
+ ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
+ Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+ poll(ctx, manager, registration)
+
+ def nextHeartbeatDirs(): Set[Uuid] =
+ poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
+ .data().cordonedLogDirs().asScala.toSet
+ assertEquals(Set(), nextHeartbeatDirs())
+
+ val dir1 = Uuid.randomUuid()
+ val dir2 = Uuid.randomUuid()
+ manager.propagateDirectoryCordoned(util.Set.of(dir1))
+ assertEquals(Set(), nextHeartbeatDirs())
+
+ enabled = true
+ manager.propagateDirectoryCordoned(util.Set.of(dir1))
+ assertEquals(Set(dir1), nextHeartbeatDirs())
+ manager.propagateDirectoryCordoned(util.Set.of(dir2))
+ assertEquals(Set(dir1, dir2), nextHeartbeatDirs())
+ manager.propagateDirectoryUncordoned(util.Set.of(dir1))
+ assertEquals(Set(dir2), nextHeartbeatDirs())
+ manager.propagateDirectoryUncordoned(util.Set.of(dir2))
+ assertEquals(Set(), nextHeartbeatDirs())
+ }
}
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index ea69a599a15..15b9d7cb479 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicReference
import kafka.log.LogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.utils.TestUtils
-import org.apache.kafka.common.{Endpoint, Reconfigurable}
+import org.apache.kafka.common.{Endpoint, Reconfigurable, Uuid}
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.apache.kafka.common.internals.Plugin
@@ -37,6 +37,7 @@ import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
import org.apache.kafka.network.{SocketServer => JSocketServer,
SocketServerConfigs}
import org.apache.kafka.server.DynamicThreadPool
import org.apache.kafka.server.authorizer._
+import org.apache.kafka.server.common.DirectoryEventHandler
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs,
ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager,
RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin,
KafkaYammerMetrics, MetricConfigs}
@@ -46,9 +47,9 @@ import org.apache.kafka.storage.internals.log.{CleanerConfig,
LogConfig, Produce
import org.apache.kafka.test.MockMetricsReporter
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers.anyString
+import org.mockito.ArgumentMatchers.{anyString, anySet}
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
-import org.mockito.Mockito.{mock, verify, verifyNoMoreInteractions, when}
+import org.mockito.Mockito.{mock, never, times, verify,
verifyNoMoreInteractions, when}
import scala.jdk.CollectionConverters._
import scala.collection.Set
@@ -516,6 +517,8 @@ class DynamicBrokerConfigTest {
val producerStateManagerConfig: ProducerStateManagerConfig =
mock(classOf[ProducerStateManagerConfig])
when(logManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
when(kafkaServer.logManager).thenReturn(logManager)
+ val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ when(kafkaServer.replicaManager).thenReturn(replicaManager)
val authorizer = new TestAuthorizer
val authorizerPlugin: Plugin[Authorizer] = Plugin.wrapInstance(authorizer,
null, "authorizer.class.name")
@@ -701,7 +704,7 @@ class DynamicBrokerConfigTest {
val props = TestUtils.createBrokerConfig(0, port = 8181)
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000")
val config = KafkaConfig(props)
- val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]))
+ val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]),
mock(classOf[DirectoryEventHandler]))
config.dynamicConfig.initialize(None)
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
@@ -724,7 +727,7 @@ class DynamicBrokerConfigTest {
val props = TestUtils.createBrokerConfig(0, port = 8181)
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296")
val config = KafkaConfig(props)
- val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]))
+ val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]),
mock(classOf[DirectoryEventHandler]))
config.dynamicConfig.initialize(None)
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
@@ -1009,7 +1012,7 @@ class DynamicBrokerConfigTest {
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG,
retentionMs.toString)
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG,
retentionBytes.toString)
val config = KafkaConfig(props)
- val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]))
+ val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]),
mock(classOf[DirectoryEventHandler]))
config.dynamicConfig.initialize(None)
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
@@ -1026,10 +1029,12 @@ class DynamicBrokerConfigTest {
val config = KafkaConfig(origProps)
val serverMock = Mockito.mock(classOf[BrokerServer])
val logManagerMock = Mockito.mock(classOf[LogManager])
+ val directoryEventHandler = Mockito.mock(classOf[DirectoryEventHandler])
Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
+
Mockito.when(logManagerMock.directoryId(ArgumentMatchers.anyString())).thenAnswer(_
=> Some(Uuid.randomUuid()))
val currentDefaultLogConfig = new AtomicReference(new LogConfig(new
Properties))
Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ =>
currentDefaultLogConfig.get())
@@ -1037,7 +1042,7 @@ class DynamicBrokerConfigTest {
.thenAnswer(invocation =>
currentDefaultLogConfig.set(invocation.getArgument(0)))
config.dynamicConfig.initialize(None)
- config.dynamicConfig.addBrokerReconfigurable(new
DynamicLogConfig(logManagerMock))
+ config.dynamicConfig.addBrokerReconfigurable(new
DynamicLogConfig(logManagerMock, directoryEventHandler))
}
@Test
@@ -1053,6 +1058,59 @@ class DynamicBrokerConfigTest {
assertEquals(TimeUnit.MINUTES.toMillis(1),
ctx.currentDefaultLogConfig.get().retentionMs)
}
+ @Test
+ def testDynamicLogConfigCordonedLogDirs(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, logDirCount = 2)
+ val ctx = new DynamicLogConfigContext(origProps)
+ assertTrue(ctx.config.cordonedLogDirs.isEmpty)
+ val logDirs = ctx.config.logDirs()
+ verify(ctx.directoryEventHandler, never()).handleCordoned(anySet)
+ verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
+
+ // Cordoning 1 new log dir, so 1 new handleCordoned invocation
+ val props = new Properties()
+ props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, logDirs.get(0))
+ ctx.config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
+ verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
+ verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
+
+ // When using *, no other entries must be specified, so no new invocations
+ props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*,/invalid/log/dir")
+ ctx.config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
+ verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
+ verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
+
+ // Invalid log dir, so no new invocations
+ props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "/invalid/log/dir")
+ ctx.config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
+ verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
+ verify(ctx.directoryEventHandler, times(0)).handleUncordoned(anySet)
+
+ // * cordons the 2nd log dir, so 1 new handleCordoned invocation
+ props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
+ ctx.config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(logDirs, ctx.config.cordonedLogDirs)
+ verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
+ verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
+
+ // clearing all cordoned log dirs, so 1 new handleUncordoned invocation
+ props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "")
+ ctx.config.dynamicConfig.updateDefaultConfig(props)
+ assertTrue(ctx.config.cordonedLogDirs.isEmpty)
+ verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
+ verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
+
+    // Explicitly listing every log dir by name cordons them all, so 1 new handleCordoned invocation
+ props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, String.join(",",
logDirs))
+ ctx.config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(logDirs, ctx.config.cordonedLogDirs)
+ verify(ctx.directoryEventHandler, times(3)).handleCordoned(anySet)
+ verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
+ }
+
@Test
def testLogRetentionTimeMinutesIsNotDynamicallyReconfigurable(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, port = 8181)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index c44a4934b35..cf27943ca03 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1549,7 +1549,6 @@ class KafkaConfigTest {
props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, dataDir)
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
- KafkaConfig.fromProps(props)
val config = KafkaConfig.fromProps(props)
assertEquals(metadataDir, config.metadataLogDir)
@@ -1567,13 +1566,53 @@ class KafkaConfigTest {
props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, s"$dataDir1,$dataDir2")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
- KafkaConfig.fromProps(props)
val config = KafkaConfig.fromProps(props)
assertEquals(dataDir1, config.metadataLogDir)
assertEquals(util.List.of(dataDir1, dataDir2), config.logDirs)
}
+ @Test
+ def testCordonAllLogDirs(): Unit = {
+ val dataDir1 = "/path/to/data/dir/1"
+ val dataDir2 = "/path/to/data/dir/2"
+
+ val props = new Properties()
+ props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+ props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
+ props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, s"$dataDir1,$dataDir2")
+ props.setProperty(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
+ props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
+ props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+
+ val config = KafkaConfig.fromProps(props)
+ assertEquals(config.logDirs(), config.cordonedLogDirs())
+ }
+
+ @Test
+ def testInvalidCordonedLogDirs(): Unit = {
+ val dataDir1 = "/path/to/data/dir/1"
+ val dataDir2 = "/path/to/data/dir/2"
+
+ val props = new Properties()
+ props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+ props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
+ props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, s"$dataDir1,$dataDir2")
+ props.setProperty(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "/other/path")
+ props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
+ props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+ val e = assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(props))
+ assertTrue(e.getMessage.contains("/other/path"))
+
+ props.setProperty(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG,
"/other/path,/another")
+ val e2 = assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(props))
+ assertTrue(e2.getMessage.contains("/other/path"))
+ assertTrue(e2.getMessage.contains("/another"))
+
+ props.setProperty(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG,
s"$dataDir1,*")
+ assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(props))
+ }
+
@Test
def testNodeIdMustNotBeDifferentThanBrokerId(): Unit = {
val props = new Properties()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 580af684cd6..515cb65ee02 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -89,7 +89,7 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
-import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import java.io.{ByteArrayInputStream, File}
import java.net.InetAddress
@@ -4021,6 +4021,30 @@ class ReplicaManagerTest {
}
}
+ @Test
+ def testAlterReplicaLogDirsToCordonedDir(): Unit = {
+ val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1))
+ val logMgr =
Mockito.spy(TestUtils.createLogManager(config.logDirs.asScala.map(new File(_))))
+ when(logMgr.cordonedLogDirs()).thenReturn(config.logDirs.asScala.toSet)
+ val replicaManager = new ReplicaManager(
+ metrics = metrics,
+ config = config,
+ time = time,
+ scheduler = new MockScheduler(time),
+ logManager = logMgr,
+ quotaManagers = quotaManager,
+ metadataCache = new KRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
+ logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+ alterPartitionManager = alterPartitionManager)
+ try {
+ val tp = new TopicPartition(topic, 0)
+ val errors = replicaManager.alterReplicaLogDirs(Map(tp ->
config.logDirs.get(0)))
+ assertEquals(Errors.INVALID_REPLICA_ASSIGNMENT, errors(tp))
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
@Test
def testPartitionMetadataFile(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time))
@@ -5631,6 +5655,7 @@ class ReplicaManagerTest {
assertTrue(response.totalBytes > 0)
assertTrue(response.usableBytes >= 0)
assertFalse(response.topics().isEmpty)
+ assertFalse(response.isCordoned)
response.topics().forEach(t => assertFalse(t.partitions().isEmpty))
}
} finally {
@@ -5663,6 +5688,7 @@ class ReplicaManagerTest {
assertTrue(response.totalBytes > 0)
assertTrue(response.usableBytes >= 0)
assertTrue(response.topics().isEmpty)
+ assertFalse(response.isCordoned)
}
} finally {
replicaManager.shutdown(checkpointHW = false)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index 2762d36f487..58cc5f65528 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -328,21 +328,24 @@ public class BrokerHeartbeatManager {
}
Iterator<UsableBroker> usableBrokers(
- Function<Integer, Optional<String>> idToRack
+ Function<Integer, Optional<String>> idToRack,
+ Function<Integer, Boolean> hasUncordonedDirs
) {
- return new UsableBrokerIterator(brokers.values().iterator(),
- idToRack);
+ return new UsableBrokerIterator(brokers.values().iterator(), idToRack,
hasUncordonedDirs);
}
static class UsableBrokerIterator implements Iterator<UsableBroker> {
private final Iterator<BrokerHeartbeatState> iterator;
private final Function<Integer, Optional<String>> idToRack;
+ private final Function<Integer, Boolean> hasUncordonedDirs;
private UsableBroker next;
UsableBrokerIterator(Iterator<BrokerHeartbeatState> iterator,
- Function<Integer, Optional<String>> idToRack) {
+ Function<Integer, Optional<String>> idToRack,
+ Function<Integer, Boolean> hasUncordonedDirs) {
this.iterator = iterator;
this.idToRack = idToRack;
+ this.hasUncordonedDirs = hasUncordonedDirs;
this.next = null;
}
@@ -357,7 +360,7 @@ public class BrokerHeartbeatManager {
return false;
}
result = iterator.next();
- } while (result.shuttingDown());
+ } while (result.shuttingDown() ||
!hasUncordonedDirs.apply(result.id()));
Optional<String> rack = idToRack.apply(result.id());
next = new UsableBroker(result.id(), rack, result.fenced());
return true;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 4a355c206b4..4fc14209e34 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -432,7 +432,9 @@ public class ClusterControlManager {
if
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
record.setLogDirs(request.logDirs());
}
-
+ if
(featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported()) {
+ record.setCordonedLogDirs(request.cordonedLogDirs());
+ }
if (!request.incarnationId().equals(prevIncarnationId)) {
int prevNumRecords = records.size();
boolean isCleanShutdown = cleanShutdownDetectionEnabled ?
@@ -553,6 +555,22 @@ public class ClusterControlManager {
return OptionalLong.empty();
}
+ public void updateCordonedLogDirs(int brokerId, List<Uuid>
cordonedLogDirs) {
+ brokerRegistrations.compute(brokerId,
+ (k, brokerRegistration) -> new BrokerRegistration.Builder().
+ setId(brokerId).
+ setEpoch(brokerRegistration.epoch()).
+ setIncarnationId(brokerRegistration.incarnationId()).
+ setListeners(brokerRegistration.listeners()).
+
setSupportedFeatures(brokerRegistration.supportedFeatures()).
+ setRack(brokerRegistration.rack()).
+ setFenced(brokerRegistration.fenced()).
+
setInControlledShutdown(brokerRegistration.inControlledShutdown()).
+ setDirectories(brokerRegistration.directories()).
+ setCordonedDirectories(cordonedLogDirs).
+ build());
+ }
+
public void replay(RegisterBrokerRecord record, long offset) {
registerBrokerRecordOffsets.put(record.brokerId(), offset);
int brokerId = record.brokerId();
@@ -575,6 +593,7 @@ public class ClusterControlManager {
setInControlledShutdown(record.inControlledShutdown()).
setIsMigratingZkBroker(record.isMigratingZkBroker()).
setDirectories(record.logDirs()).
+ setCordonedDirectories(record.cordonedLogDirs()).
build());
updateDirectories(brokerId, prevRegistration == null ? null :
prevRegistration.directories(), record.logDirs());
if (heartbeatManager != null) {
@@ -617,6 +636,7 @@ public class ClusterControlManager {
record.epoch(),
BrokerRegistrationFencingChange.FENCE.asBoolean(),
BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
+ Optional.empty(),
Optional.empty()
);
}
@@ -628,6 +648,7 @@ public class ClusterControlManager {
record.epoch(),
BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
+ Optional.empty(),
Optional.empty()
);
}
@@ -642,13 +663,15 @@ public class ClusterControlManager {
() -> new IllegalStateException(String.format("Unable to
replay %s: unknown " +
"value for inControlledShutdown field: %x", record,
record.inControlledShutdown())));
Optional<List<Uuid>> directoriesChange =
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
+ Optional<List<Uuid>> cordonedDirectoriesChange =
Optional.ofNullable(record.cordonedLogDirs()).filter(list -> !list.isEmpty());
replayRegistrationChange(
record,
record.brokerId(),
record.brokerEpoch(),
fencingChange.asBoolean(),
inControlledShutdownChange.asBoolean(),
- directoriesChange
+ directoriesChange,
+ cordonedDirectoriesChange
);
}
@@ -658,7 +681,8 @@ public class ClusterControlManager {
long brokerEpoch,
Optional<Boolean> fencingChange,
Optional<Boolean> inControlledShutdownChange,
- Optional<List<Uuid>> directoriesChange
+ Optional<List<Uuid>> directoriesChange,
+ Optional<List<Uuid>> cordonedDirectoriesChange
) {
BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
if (curRegistration == null) {
@@ -671,7 +695,8 @@ public class ClusterControlManager {
BrokerRegistration nextRegistration = curRegistration.cloneWith(
fencingChange,
inControlledShutdownChange,
- directoriesChange
+ directoriesChange,
+ cordonedDirectoriesChange
);
if (!curRegistration.equals(nextRegistration)) {
log.info("Replayed {} modifying the registration for broker
{}: {}",
@@ -705,7 +730,8 @@ public class ClusterControlManager {
throw new RuntimeException("ClusterControlManager is not active.");
}
return heartbeatManager.usableBrokers(
- id -> brokerRegistrations.get(id).rack());
+ id -> brokerRegistrations.get(id).rack(),
+ id -> brokerRegistrations.get(id).hasUncordonedDirs());
}
/**
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 4a0219d1759..31665883e3c 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
@@ -60,6 +61,7 @@ import static
org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION
import static
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
import static
org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
+import static
org.apache.kafka.server.config.ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG;
public class ConfigurationControlManager {
@@ -336,6 +338,8 @@ public class ConfigurationControlManager {
return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
} else if (isDisallowedClusterMinIsrTransition(configRecord)) {
return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
+ } else if (isCordonedLogDirsDisallowed(configRecord)) {
+ return DISALLOWED_CORDONED_LOG_DIRS_ERROR;
} else if (configRecord.value() == null) {
allConfigs.remove(configRecord.name());
} else if (configRecord.value().length() > Short.MAX_VALUE) {
@@ -353,6 +357,8 @@ public class ConfigurationControlManager {
return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
} else if (isDisallowedClusterMinIsrTransition(configRecord)) {
return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
+ } else if (isCordonedLogDirsDisallowed(configRecord)) {
+ return DISALLOWED_CORDONED_LOG_DIRS_ERROR;
} else {
allConfigs.remove(configRecord.name());
}
@@ -390,6 +396,10 @@ public class ConfigurationControlManager {
new ApiError(INVALID_CONFIG, "The configuration value cannot be added
because " +
"it exceeds the maximum value size of " + Short.MAX_VALUE + "
bytes.");
+ static final ApiError DISALLOWED_CORDONED_LOG_DIRS_ERROR =
+ new ApiError(INVALID_CONFIG, "The " + CORDONED_LOG_DIRS_CONFIG + "
configuration value cannot be " +
+ "set because it requires metadata.version >= " +
MetadataVersion.IBP_4_3_IV0);
+
boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) {
if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
configRecord.resourceType() == BROKER.id() &&
@@ -401,6 +411,14 @@ public class ConfigurationControlManager {
return false;
}
+ boolean isCordonedLogDirsDisallowed(ConfigRecord configRecord) {
+ if (configRecord.name().equals(CORDONED_LOG_DIRS_CONFIG) &&
+ configRecord.resourceType() == BROKER.id()) {
+ return
!featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported();
+ }
+ return false;
+ }
+
boolean isDisallowedClusterMinIsrTransition(ConfigRecord configRecord) {
if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
configRecord.resourceType() == BROKER.id() &&
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 316213bbc04..7e991169e9a 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1531,6 +1531,35 @@ public class ReplicationControlManager {
}
}
+ /**
+ * Generates the appropriate record to handle a list of directories that
are cordoned.
+ *
+ * @param brokerId The broker id.
+ * @param brokerEpoch The broker epoch.
+ * @param cordonedDirs The list of directories that are cordoned.
+ * @param records The record list to append to.
+ */
+ void handleDirectoriesCordoned(
+ int brokerId,
+ long brokerEpoch,
+ List<Uuid> cordonedDirs,
+ List<ApiMessageAndVersion> records
+ ) {
+ BrokerRegistration registration =
clusterControl.registration(brokerId);
+ List<Uuid> newCordonedDirs =
registration.directoryIntersection(cordonedDirs);
+ if (!newCordonedDirs.isEmpty()) {
+ records.add(new ApiMessageAndVersion(new
BrokerRegistrationChangeRecord().
+ setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
+ setCordonedLogDirs(newCordonedDirs),
+ (short) 3));
+ if (log.isDebugEnabled()) {
+ List<Uuid> newUncordonedDirs =
registration.directoryDifference(newCordonedDirs);
+ log.debug("Directories {} in broker {} marked cordoned,
uncordoned directories: {}",
+ newCordonedDirs, brokerId, newUncordonedDirs);
+ }
+ }
+ }
+
ControllerResult<ElectLeadersResponseData>
electLeaders(ElectLeadersRequestData request) {
ElectionType electionType = electionType(request.electionType());
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
@@ -1667,6 +1696,10 @@ public class ReplicationControlManager {
if
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
handleDirectoriesOffline(brokerId, brokerEpoch,
request.offlineLogDirs(), records);
}
+ if
(featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported()) {
+ clusterControl.updateCordonedLogDirs(brokerId,
request.cordonedLogDirs());
+ handleDirectoriesCordoned(brokerId, brokerEpoch,
request.cordonedLogDirs(), records);
+ }
boolean isCaughtUp = request.currentMetadataOffset() >=
registerBrokerRecordOffset;
BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
states.next().fenced(),
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index 8f0556959b9..bfdf9810a0b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -115,6 +115,7 @@ public final class ClusterDelta {
changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
BrokerRegistrationFencingChange.FENCE.asBoolean(),
Optional.empty(),
+ Optional.empty(),
Optional.empty()
)));
}
@@ -124,6 +125,7 @@ public final class ClusterDelta {
changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
Optional.empty(),
+ Optional.empty(),
Optional.empty()
)));
}
@@ -140,10 +142,12 @@ public final class ClusterDelta {
() -> new IllegalStateException(String.format("Unable to
replay %s: unknown " +
"value for inControlledShutdown field: %d", record,
record.inControlledShutdown())));
Optional<List<Uuid>> directoriesChange =
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
+ Optional<List<Uuid>> cordonedDirectoriesChange =
Optional.ofNullable(record.cordonedLogDirs()).filter(list -> !list.isEmpty());
BrokerRegistration nextRegistration = curRegistration.cloneWith(
fencingChange.asBoolean(),
inControlledShutdownChange.asBoolean(),
- directoriesChange
+ directoriesChange,
+ cordonedDirectoriesChange
);
if (!curRegistration.equals(nextRegistration)) {
changedBrokers.put(record.brokerId(),
Optional.of(nextRegistration));
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 56b2bdf375e..4dcb5ba1b36 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
* An immutable class which represents broker registrations.
*/
public class BrokerRegistration {
+
public static class Builder {
private int id;
private long epoch;
@@ -53,6 +54,7 @@ public class BrokerRegistration {
private boolean inControlledShutdown;
private boolean isMigratingZkBroker;
private List<Uuid> directories;
+ private List<Uuid> cordonedDirectories;
public Builder() {
this.id = 0;
@@ -65,6 +67,7 @@ public class BrokerRegistration {
this.inControlledShutdown = false;
this.isMigratingZkBroker = false;
this.directories = List.of();
+ this.cordonedDirectories = List.of();
}
public Builder setId(int id) {
@@ -127,6 +130,11 @@ public class BrokerRegistration {
return this;
}
+ public Builder setCordonedDirectories(List<Uuid> cordonedDirectories) {
+ this.cordonedDirectories = cordonedDirectories;
+ return this;
+ }
+
public BrokerRegistration build() {
return new BrokerRegistration(
id,
@@ -138,7 +146,8 @@ public class BrokerRegistration {
fenced,
inControlledShutdown,
isMigratingZkBroker,
- directories);
+ directories,
+ cordonedDirectories);
}
}
@@ -152,6 +161,7 @@ public class BrokerRegistration {
private final boolean inControlledShutdown;
private final boolean isMigratingZkBroker;
private final List<Uuid> directories;
+ private final List<Uuid> cordonedDirectories;
private BrokerRegistration(
int id,
@@ -163,7 +173,8 @@ public class BrokerRegistration {
boolean fenced,
boolean inControlledShutdown,
boolean isMigratingZkBroker,
- List<Uuid> directories
+ List<Uuid> directories,
+ List<Uuid> cordonedDirectories
) {
this.id = id;
this.epoch = epoch;
@@ -185,6 +196,7 @@ public class BrokerRegistration {
directories = new ArrayList<>(directories);
directories.sort(Uuid::compareTo);
this.directories = Collections.unmodifiableList(directories);
+ this.cordonedDirectories =
Collections.unmodifiableList(cordonedDirectories);
}
public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -209,7 +221,8 @@ public class BrokerRegistration {
record.fenced(),
record.inControlledShutdown(),
record.isMigratingZkBroker(),
- record.logDirs());
+ record.logDirs(),
+ record.cordonedLogDirs());
}
public int id() {
@@ -260,10 +273,21 @@ public class BrokerRegistration {
return directories;
}
+ public List<Uuid> cordonedDirectories() {
+ return cordonedDirectories;
+ }
+
public boolean hasOnlineDir(Uuid dir) {
return DirectoryId.isOnline(dir, directories);
}
+ public boolean hasUncordonedDirs() {
+ if (directories.isEmpty()) return true;
+ List<Uuid> dirs = new ArrayList<>(directories);
+ dirs.removeAll(cordonedDirectories);
+ return !dirs.isEmpty();
+ }
+
public List<Uuid> directoryIntersection(List<Uuid> otherDirectories) {
List<Uuid> results = new ArrayList<>();
for (Uuid directory : directories) {
@@ -307,6 +331,12 @@ public class BrokerRegistration {
options.handleLoss("the online log directories of one or more
brokers");
}
+ if (cordonedDirectories.isEmpty() ||
options.metadataVersion().isCordonedLogDirsSupported()) {
+ registrationRecord.setCordonedLogDirs(cordonedDirectories);
+ } else {
+ options.handleLoss("the cordoned log directories of one or more
brokers");
+ }
+
for (Entry<String, Endpoint> entry : listeners.entrySet()) {
Endpoint endpoint = entry.getValue();
registrationRecord.endPoints().add(new BrokerEndpoint().
@@ -330,7 +360,7 @@ public class BrokerRegistration {
@Override
public int hashCode() {
return Objects.hash(id, epoch, incarnationId, listeners,
supportedFeatures,
- rack, fenced, inControlledShutdown, isMigratingZkBroker,
directories);
+ rack, fenced, inControlledShutdown, isMigratingZkBroker,
directories, cordonedDirectories);
}
@Override
@@ -345,7 +375,8 @@ public class BrokerRegistration {
other.fenced == fenced &&
other.inControlledShutdown == inControlledShutdown &&
other.isMigratingZkBroker == isMigratingZkBroker &&
- other.directories.equals(directories);
+ other.directories.equals(directories) &&
+ other.cordonedDirectories.equals(cordonedDirectories);
}
@Override
@@ -367,19 +398,25 @@ public class BrokerRegistration {
", inControlledShutdown=" + inControlledShutdown +
", isMigratingZkBroker=" + isMigratingZkBroker +
", directories=" + directories +
+ ", cordonedDirectories=" + cordonedDirectories +
")";
}
public BrokerRegistration cloneWith(
Optional<Boolean> fencingChange,
Optional<Boolean> inControlledShutdownChange,
- Optional<List<Uuid>> directoriesChange
+ Optional<List<Uuid>> directoriesChange,
+ Optional<List<Uuid>> cordonedDirectoriesChange
) {
boolean newFenced = fencingChange.orElse(fenced);
boolean newInControlledShutdownChange =
inControlledShutdownChange.orElse(inControlledShutdown);
List<Uuid> newDirectories = directoriesChange.orElse(directories);
+ List<Uuid> newCordonedDirectories =
cordonedDirectoriesChange.orElse(cordonedDirectories);
- if (newFenced == fenced && newInControlledShutdownChange ==
inControlledShutdown && newDirectories.equals(directories))
+ if (newFenced == fenced
+ && newInControlledShutdownChange == inControlledShutdown
+ && newDirectories.equals(directories)
+ && newCordonedDirectories.equals(cordonedDirectories))
return this;
return new BrokerRegistration(
@@ -392,7 +429,8 @@ public class BrokerRegistration {
newFenced,
newInControlledShutdownChange,
isMigratingZkBroker,
- newDirectories
+ newDirectories,
+ newCordonedDirectories
);
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
index 0c597bc7f89..099d4997d9b 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
@@ -404,7 +404,7 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
private static void throwInvalidReplicationFactorIfZero(int numUnfenced) {
if (numUnfenced == 0) {
- throw new InvalidReplicationFactorException("All brokers are
currently fenced.");
+ throw new InvalidReplicationFactorException("All brokers are
currently fenced, or have all their log directories cordoned.");
}
}
@@ -412,7 +412,7 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
if (replicationFactor > numTotalBrokers) {
throw new InvalidReplicationFactorException("The target
replication factor " +
"of " + replicationFactor + " cannot be reached because
only " +
- numTotalBrokers + " broker(s) are registered.");
+ numTotalBrokers + " broker(s) are registered or some
brokers have all their log directories cordoned.");
}
}
diff --git
a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
index 7a484a6aeb4..5329e53b2c7 100644
---
a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
+++
b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
@@ -15,11 +15,12 @@
// Version 1 adds InControlledShutdown
// Version 2 adds LogDirs
+// Version 3 adds CordonedLogDirs
{
"apiKey": 17,
"type": "metadata",
"name": "BrokerRegistrationChangeRecord",
- "validVersions": "0-2",
+ "validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
@@ -31,6 +32,8 @@
{ "name": "InControlledShutdown", "type": "int8", "versions": "1+",
"taggedVersions": "1+", "tag": 1,
"about": "0 if no change, 1 if the broker is in controlled shutdown." },
{ "name": "LogDirs", "type": "[]uuid", "versions": "2+",
"taggedVersions": "2+", "tag": 2,
- "about": "Log directories configured in this broker which are available."
}
+ "about": "Log directories configured in this broker which are available."
},
+ { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "3+",
"taggedVersions": "3+", "tag": 3,
+ "about": "Log directories that are cordoned." }
]
}
diff --git
a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index 48e5c466c69..b7db680cbd3 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -16,11 +16,12 @@
// Version 1 adds InControlledShutdown
// Version 2 adds IsMigratingZkBroker
// Version 3 adds LogDirs
+// Version 4 adds CordonedLogDirs
{
"apiKey": 0,
"type": "metadata",
"name": "RegisterBrokerRecord",
- "validVersions": "0-3",
+ "validVersions": "0-4",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
@@ -58,6 +59,8 @@
{ "name": "InControlledShutdown", "type": "bool", "versions": "1+",
"default": "false",
"about": "True if the broker is in controlled shutdown." },
{ "name": "LogDirs", "type": "[]uuid", "versions": "3+",
"taggedVersions": "3+", "tag": 0,
- "about": "Log directories configured in this broker which are
available." }
+ "about": "Log directories configured in this broker which are
available." },
+ { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "4+",
"taggedVersions": "4+", "tag": 1,
+ "about": "Log directories that are cordoned." }
]
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
index 21bca1150fc..3dfeb0ef940 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
@@ -110,7 +110,8 @@ public class BrokerHeartbeatManagerTest {
Set<UsableBroker> brokers = new HashSet<>();
for (Iterator<UsableBroker> iterator = new UsableBrokerIterator(
manager.brokers().iterator(),
- id -> id % 2 == 0 ? Optional.of("rack1") : Optional.of("rack2"));
+ id -> id % 2 == 0 ? Optional.of("rack1") : Optional.of("rack2"),
+ id -> id % 3 != 0);
iterator.hasNext(); ) {
brokers.add(iterator.next());
}
@@ -131,10 +132,8 @@ public class BrokerHeartbeatManagerTest {
manager.touch(4, true, 100);
assertEquals(98L, manager.lowestActiveOffset());
Set<UsableBroker> expected = new HashSet<>();
- expected.add(new UsableBroker(0, Optional.of("rack1"), false));
expected.add(new UsableBroker(1, Optional.of("rack2"), false));
expected.add(new UsableBroker(2, Optional.of("rack1"), false));
- expected.add(new UsableBroker(3, Optional.of("rack2"), false));
expected.add(new UsableBroker(4, Optional.of("rack1"), true));
assertEquals(expected, usableBrokersToSet(manager));
manager.maybeUpdateControlledShutdownOffset(2, 0);
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 2c93d1100ec..1a1d5b74c41 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ConfigSynonym;
+import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
@@ -61,6 +62,7 @@ import static
org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
+import static
org.apache.kafka.controller.ConfigurationControlManager.DISALLOWED_CORDONED_LOG_DIRS_ERROR;
import static
org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -554,6 +556,29 @@ public class ConfigurationControlManagerTest {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testCordonedLogDirsFeature(boolean enabled) {
+ FeatureControlManager featureManager = new
FeatureControlManager.Builder().
+ setQuorumFeatures(new QuorumFeatures(0,
+ QuorumFeatures.defaultSupportedFeatureMap(true),
+ List.of())).
+ build();
+ featureManager.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(enabled ?
MetadataVersion.LATEST_PRODUCTION.featureLevel() :
MetadataVersion.IBP_4_2_IV1.featureLevel()));
+ ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setFeatureControl(featureManager).
+ setKafkaConfigSchema(SCHEMA).
+ build();
+
+ ControllerResult<ApiError> result = manager.incrementalAlterConfig(new
ConfigResource(ConfigResource.Type.BROKER, "1"),
+ toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG,
entry(SET, "*"))),
+ true);
+
+ assertEquals(enabled ? ApiError.NONE :
DISALLOWED_CORDONED_LOG_DIRS_ERROR, result.response());
+ }
+
private FeatureControlManager createFeatureControlManager() {
FeatureControlManager featureControlManager = new
FeatureControlManager.Builder().build();
featureControlManager.replay(new FeatureLevelRecord().
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 721ef85de93..54e68da302a 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1011,7 +1011,7 @@ public class QuorumControllerTest {
createTopicsRequestData, Set.of("foo")).get().
topics().find("foo").errorCode());
assertEquals("Unable to replicate the partition 1 time(s): All
brokers " +
- "are currently fenced.", active.createTopics(ANONYMOUS_CONTEXT,
+ "are currently fenced, or have all their log directories
cordoned.", active.createTopics(ANONYMOUS_CONTEXT,
createTopicsRequestData, Set.of("foo")).
get().topics().find("foo").errorMessage());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 7024d0de3a2..9b97d8eeb54 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -636,7 +636,7 @@ public class ReplicationControlManagerTest {
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
setErrorMessage("Unable to replicate the partition 3 time(s):
All " +
- "brokers are currently fenced."));
+ "brokers are currently fenced, or have all their log
directories cordoned."));
assertEquals(expectedResponse, result.response());
ctx.registerBrokers(0, 1, 2);
@@ -894,7 +894,7 @@ public class ReplicationControlManagerTest {
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
setErrorMessage("Unable to replicate the partition 4 time(s): The
target " +
"replication factor of 4 cannot be reached because only 3
broker(s) " +
- "are registered."));
+ "are registered or some brokers have all their log directories
cordoned."));
assertEquals(expectedResponse, result.response());
}
@@ -3411,6 +3411,27 @@ public class ReplicationControlManagerTest {
return records.stream().filter(r ->
clazz.equals(r.message().getClass())).collect(Collectors.toList());
}
+ @Test
+ void testHandleDirectoriesCordoned() {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder().build();
+ int b1 = 101;
+ Uuid dir1b1 = Uuid.fromString("suitdzfTTdqoWcy8VqmkUg");
+ Uuid dir2b1 = Uuid.fromString("yh3acnzGSeurSTj8aIhOjw");
+ ctx.registerBrokersWithDirs(b1, List.of(dir1b1, dir2b1));
+ ctx.unfenceBrokers(b1);
+ assertEquals(List.of(),
ctx.clusterControl.registration(b1).cordonedDirectories());
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ ctx.replicationControl.handleDirectoriesCordoned(b1,
defaultBrokerEpoch(b1), List.of(dir1b1), records);
+ assertEquals(
+ List.of(new ApiMessageAndVersion(new
BrokerRegistrationChangeRecord()
+ .setBrokerId(b1).setBrokerEpoch(defaultBrokerEpoch(b1))
+ .setCordonedLogDirs(List.of(dir1b1)), (short) 3)),
+ filter(records, BrokerRegistrationChangeRecord.class)
+ );
+ ctx.replay(records);
+ assertEquals(List.of(dir1b1),
ctx.clusterControl.registration(b1).cordonedDirectories());
+ }
+
@ParameterizedTest
@CsvSource({"false, false", "false, true", "true, false", "true, true"})
void testElrsRemovedOnMinIsrUpdate(boolean clusterLevel, boolean
useLegacyAlterConfigs) {
diff --git
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
index 6d7b8898810..ab342b26087 100644
---
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
@@ -69,7 +69,8 @@ public class ClusterImageBrokersNodeTest {
"fenced=false, " +
"inControlledShutdown=false, " +
"isMigratingZkBroker=false, " +
- "directories=[JsnDDNVyTL289kYk6sPzig, anCdBWcFTlu8gE1wP6bh3g])",
+ "directories=[JsnDDNVyTL289kYk6sPzig, anCdBWcFTlu8gE1wP6bh3g], " +
+ "cordonedDirectories=[])",
child.stringify());
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 49ac33490b0..577338203ec 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -86,6 +86,7 @@ public class BrokerRegistrationTest {
setInControlledShutdown(true).
setIsMigratingZkBroker(true).
setDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
+
setCordonedDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
build());
@Test
@@ -117,20 +118,22 @@ public class BrokerRegistrationTest {
"incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
- "rack=Optional.empty, fenced=true, inControlledShutdown=false,
isMigratingZkBroker=false, directories=[])",
+ "rack=Optional.empty, fenced=true, inControlledShutdown=false,
isMigratingZkBroker=false, " +
+ "directories=[], cordonedDirectories=[])",
REGISTRATIONS.get(1).toString());
assertEquals("BrokerRegistration(id=2, epoch=0, " +
"incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo:
2-3}, " +
- "rack=Optional[myrack], fenced=false, inControlledShutdown=true,
isMigratingZkBroker=false, directories=[])",
+ "rack=Optional[myrack], fenced=false, inControlledShutdown=true,
isMigratingZkBroker=false, " +
+ "directories=[], cordonedDirectories=[])",
REGISTRATIONS.get(2).toString());
assertEquals("BrokerRegistration(id=3, epoch=0, " +
"incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9093)],
supportedFeatures={metadata.version: 7}, " +
"rack=Optional.empty, fenced=false, inControlledShutdown=true,
isMigratingZkBroker=true, " +
- "directories=[r4HpEsMuST6nQ4rznIEJVA])",
+ "directories=[r4HpEsMuST6nQ4rznIEJVA],
cordonedDirectories=[r4HpEsMuST6nQ4rznIEJVA])",
REGISTRATIONS.get(3).toString());
}
@@ -222,4 +225,39 @@ public class BrokerRegistrationTest {
assertFalse(registration.hasOnlineDir(Uuid.fromString("sOwN7HH7S1maxpU1WzlzXg")));
assertFalse(registration.hasOnlineDir(DirectoryId.LOST));
}
+
+ @Test
+ void testHasUncordonedDirs() {
+ BrokerRegistration registration = new BrokerRegistration.Builder().
+ setId(0).
+ setEpoch(0).
+ setIncarnationId(Uuid.fromString("m6CiJvfITZeKVC6UuhlZew")).
+ setListeners(List.of(new Endpoint("INTERNAL",
SecurityProtocol.PLAINTEXT, "localhost", 9090))).
+ setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1,
(short) 2))).
+ setRack(Optional.empty()).
+ setFenced(false).
+ setInControlledShutdown(false).
+ setDirectories(List.of(
+ Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
+ )).
+ build();
+ assertTrue(registration.hasUncordonedDirs());
+ registration = new BrokerRegistration.Builder().
+ setId(0).
+ setEpoch(0).
+ setIncarnationId(Uuid.fromString("m6CiJvfITZeKVC6UuhlZew")).
+ setListeners(List.of(new Endpoint("INTERNAL",
SecurityProtocol.PLAINTEXT, "localhost", 9090))).
+ setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1,
(short) 2))).
+ setRack(Optional.empty()).
+ setFenced(false).
+ setInControlledShutdown(false).
+ setDirectories(List.of(
+ Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
+ )).
+ setCordonedDirectories(List.of(
+ Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
+ )).
+ build();
+ assertFalse(registration.hasUncordonedDirs());
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
index 55d992d28b6..0db3a42c3bd 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
@@ -190,7 +190,7 @@ public class StripedReplicaPlacerTest {
public void testAllBrokersFenced() {
MockRandom random = new MockRandom();
StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
- assertEquals("All brokers are currently fenced.",
+ assertEquals("All brokers are currently fenced, or have all their log
directories cordoned.",
assertThrows(InvalidReplicationFactorException.class,
() -> place(placer, 0, 1, (short) 1, List.of(
new UsableBroker(11, Optional.of("1"), true),
@@ -202,7 +202,7 @@ public class StripedReplicaPlacerTest {
MockRandom random = new MockRandom();
StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
assertEquals("The target replication factor of 3 cannot be reached
because only " +
- "2 broker(s) are registered.",
+ "2 broker(s) are registered or some brokers have all their log
directories cordoned.",
assertThrows(InvalidReplicationFactorException.class,
() -> place(placer, 0, 1, (short) 3, List.of(
new UsableBroker(11, Optional.of("1"), false),
@@ -274,7 +274,7 @@ public class StripedReplicaPlacerTest {
assertEquals(3, rackList.numTotalBrokers());
assertEquals(0, rackList.numUnfencedBrokers());
assertEquals(List.of(Optional.empty()), rackList.rackNames());
- assertEquals("All brokers are currently fenced.",
+ assertEquals("All brokers are currently fenced, or have all their log
directories cordoned.",
assertThrows(InvalidReplicationFactorException.class,
() -> rackList.place(3)).getMessage());
}
@@ -286,7 +286,7 @@ public class StripedReplicaPlacerTest {
new UsableBroker(11, Optional.of("1"), false),
new UsableBroker(10, Optional.of("1"), false)).iterator());
assertEquals("The target replication factor of 3 cannot be reached
because only " +
- "2 broker(s) are registered.",
+ "2 broker(s) are registered or some brokers have all
their log directories cordoned.",
assertThrows(InvalidReplicationFactorException.class,
() -> rackList.place(3)).getMessage());
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
index b7104986738..987d27bd5d2 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
@@ -19,6 +19,8 @@ package org.apache.kafka.server.common;
import org.apache.kafka.common.Uuid;
+import java.util.Set;
+
public interface DirectoryEventHandler {
/**
@@ -27,6 +29,8 @@ public interface DirectoryEventHandler {
DirectoryEventHandler NOOP = new DirectoryEventHandler() {
@Override public void handleAssignment(TopicIdPartition partition,
Uuid directoryId, String reason, Runnable callback) {}
@Override public void handleFailure(Uuid directoryId) {}
+ @Override public void handleCordoned(Set<Uuid> directoryIds) {}
+ @Override public void handleUncordoned(Set<Uuid> directoryIds) {}
};
/**
@@ -43,4 +47,16 @@ public interface DirectoryEventHandler {
* @param directoryId The directory ID
*/
void handleFailure(Uuid directoryId);
+
+ /**
+ * Handle the transition of online log directories to the cordoned state.
+ * @param directoryIds The directory IDs to cordon
+ */
+ void handleCordoned(Set<Uuid> directoryIds);
+
+ /**
+ * Handle the transition of cordoned log directories to the online state.
+ * @param directoryIds The directory IDs to uncordon
+ */
+ void handleUncordoned(Set<Uuid> directoryIds);
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 8bdce25808f..406ccbeebcb 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -120,15 +120,18 @@ public enum MetadataVersion {
// Enables "streams" groups by default for new clusters (KIP-1071).
IBP_4_2_IV1(29, "4.2", "IV1", false),
+ // Enables support for cordoned log dirs
+ // BrokerRegistrationChangeRecord and RegisterBrokerRecord are updated
+ IBP_4_3_IV0(30, "4.3", "IV0", true),
+
//
// NOTE: MetadataVersions after this point are unstable and may be changed.
// If users attempt to use an unstable MetadataVersion, they will get an
error unless
// they have set the configuration unstable.feature.versions.enable=true.
// Please move this comment when updating the LATEST_PRODUCTION constant.
//
+ IBP_4_4_IV0(31, "4.4", "IV0", false);
- // New version for the Kafka 4.3.0 release.
- IBP_4_3_IV0(30, "4.3", "IV0", false);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the
latest version
@@ -148,7 +151,7 @@ public enum MetadataVersion {
* <strong>Think carefully before you update this value. ONCE A METADATA
VERSION IS PRODUCTION,
* IT CANNOT BE CHANGED.</strong>
*/
- public static final MetadataVersion LATEST_PRODUCTION = IBP_4_2_IV1;
+ public static final MetadataVersion LATEST_PRODUCTION = IBP_4_3_IV0;
// If you change the value above please also update
// LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py
@@ -212,8 +215,15 @@ public enum MetadataVersion {
return this.isAtLeast(MetadataVersion.IBP_3_4_IV0);
}
+ public boolean isCordonedLogDirsSupported() {
+ return this.isAtLeast(MetadataVersion.IBP_4_3_IV0);
+ }
+
public short registerBrokerRecordVersion() {
- if (isDirectoryAssignmentSupported()) {
+ if (isCordonedLogDirsSupported()) {
+ // new cordonedLogDirs field
+ return (short) 4;
+ } else if (isDirectoryAssignmentSupported()) {
// new logDirs field
return (short) 3;
} else if (isMigrationSupported()) {
diff --git
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index d0e1c60d6d7..dbfe8b13eed 100644
---
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.server.record.BrokerCompressionType;
+import java.util.List;
+
import static
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
/**
@@ -40,6 +42,11 @@ public class ServerLogConfigs {
public static final String LOG_DIR_DOC = "A comma-separated list of the
directories where the log data is stored. (supplemental to " + LOG_DIRS_CONFIG
+ " property)";
public static final String LOG_DIRS_DOC = "A comma-separated list of the
directories where the log data is stored. If not set, the value in " +
LOG_DIR_CONFIG + " is used.";
+ public static final String CORDONED_LOG_DIRS_CONFIG = "cordoned.log.dirs";
+ public static final List<String> CORDONED_LOG_DIRS_DEFAULT = List.of();
+ public static final String CORDONED_LOG_DIRS_DOC = "A comma-separated list
of the directories that are cordoned. Entries in this list must be entries in
log.dirs or log.dir configuration. This can also be set to * to cordon all log
directories.";
+ public static final String CORDONED_LOG_DIRS_ALL = "*";
+
public static final String LOG_SEGMENT_BYTES_CONFIG =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG);
public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a
single log file";
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index e0bcfb7f146..b4381b2a56f 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -98,9 +98,10 @@ class MetadataVersionTest {
assertEquals(IBP_4_3_IV0, MetadataVersion.fromVersionString("4.3-IV0",
true));
// Throws exception when unstableFeatureVersionsEnabled is false
- assertEquals("Unknown metadata.version '4.3-IV0'. Supported
metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
- + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3,
3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0,
4.1-IV1, 4.2-IV0, 4.2-IV1",
- assertThrows(IllegalArgumentException.class, () ->
fromVersionString("4.3-IV0", false)).getMessage());
+ assertEquals("Unknown metadata.version '4.4-IV0'. Supported
metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
+ + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3,
3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0, "
+ + "4.1-IV1, 4.2-IV0, 4.2-IV1, 4.3-IV0",
+ assertThrows(IllegalArgumentException.class, () ->
fromVersionString("4.4-IV0", false)).getMessage());
}
@Test
@@ -240,7 +241,9 @@ class MetadataVersionTest {
@EnumSource(value = MetadataVersion.class)
public void testRegisterBrokerRecordVersion(MetadataVersion
metadataVersion) {
final short expectedVersion;
- if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
+ if (metadataVersion.isAtLeast(IBP_4_3_IV0)) {
+ expectedVersion = 4;
+ } else if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
expectedVersion = 3;
} else if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_4_IV0)) {
expectedVersion = 2;
diff --git
a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
index e148fb1c0b1..0ce78168704 100644
--- a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
+++ b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
@@ -78,6 +78,7 @@ public class BrokerLifecycleManager {
private final Time time;
private final Set<Uuid> logDirs;
private final Runnable shutdownHook;
+ private final Supplier<Boolean> cordonedLogDirsSupported;
/**
* The broker id.
@@ -144,7 +145,13 @@ public class BrokerLifecycleManager {
* to the Controller.
* This variable can only be read or written from the event queue thread.
*/
- private Map<Uuid, Boolean> offlineDirs = new HashMap<>();
+ private final Map<Uuid, Boolean> offlineDirs = new HashMap<>();
+
+ /**
+ * Map of cordoned log directories. The value is true if the directory is
cordoned.
+ * This variable can only be read or written from the event queue thread.
+ */
+ private final Map<Uuid, Boolean> cordonedLogDirs = new HashMap<>();
/**
* True if we sent an event queue to the active controller requesting
controlled
@@ -211,7 +218,7 @@ public class BrokerLifecycleManager {
Time time,
String threadNamePrefix,
Set<Uuid> logDirs) {
- this(config, time, threadNamePrefix, logDirs, () -> { });
+ this(config, time, threadNamePrefix, logDirs, () -> { }, () -> false);
}
public BrokerLifecycleManager(
@@ -219,11 +226,13 @@ public class BrokerLifecycleManager {
Time time,
String threadNamePrefix,
Set<Uuid> logDirs,
- Runnable shutdownHook) {
+ Runnable shutdownHook,
+ Supplier<Boolean> cordonedLogDirsSupported) {
this.config = config;
this.time = time;
this.logDirs = logDirs;
this.shutdownHook = shutdownHook;
+ this.cordonedLogDirsSupported = cordonedLogDirsSupported;
LogContext logContext = new LogContext("[BrokerLifecycleManager id=" +
this.config.nodeId() + "] ");
this.logger = logContext.logger(BrokerLifecycleManager.class);
this.nodeId = config.nodeId();
@@ -251,8 +260,14 @@ public class BrokerLifecycleManager {
String clusterId,
ListenerCollection advertisedListeners,
Map<String, VersionRange> supportedFeatures,
- OptionalLong previousBrokerEpoch) {
+ OptionalLong previousBrokerEpoch,
+ Set<Uuid> cordonedLogDirs) {
this.previousBrokerEpoch = previousBrokerEpoch;
+ if (!cordonedLogDirs.isEmpty()) {
+ // At this point we don't have fresh metadata yet so we don't know
if the cordoned log dirs feature is supported.
+ // Queue an event, it will be ignored by the controller handling
the broker registration if the feature is disabled.
+ eventQueue.append(new CordonedDirEvent(cordonedLogDirs));
+ }
eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
channelManager, clusterId, advertisedListeners,
supportedFeatures));
}
@@ -275,6 +290,26 @@ public class BrokerLifecycleManager {
new OfflineDirBrokerFailureEvent(directory));
}
+ /**
+ * Propagate cordoned directories to the controller.
+ * @param directories The IDs of the directories that are cordoned.
+ */
+ public void propagateDirectoryCordoned(Set<Uuid> directories) {
+ if (cordonedLogDirsSupported.get()) {
+ eventQueue.append(new CordonedDirEvent(directories));
+ }
+ }
+
+ /**
+ * Propagate uncordoned directories to the controller.
+ * @param directories The IDs of the directories that are uncordoned.
+ */
+ public void propagateDirectoryUncordoned(Set<Uuid> directories) {
+ if (cordonedLogDirsSupported.get()) {
+ eventQueue.append(new UncordonedDirEvent(directories));
+ }
+ }
+
public void resendBrokerRegistration() {
eventQueue.append(new ResendBrokerRegistrationEvent());
}
@@ -397,6 +432,44 @@ public class BrokerLifecycleManager {
}
}
+ private class CordonedDirEvent implements EventQueue.Event {
+
+ private final Set<Uuid> dirs;
+
+ CordonedDirEvent(Set<Uuid> dirs) {
+ this.dirs = dirs;
+ }
+
+ @Override
+ public void run() {
+ for (Uuid dir : dirs) {
+ cordonedLogDirs.put(dir, true);
+ }
+ if (registered) {
+ scheduleNextCommunicationImmediately();
+ }
+ }
+ }
+
+ private class UncordonedDirEvent implements EventQueue.Event {
+
+ private final Set<Uuid> dirs;
+
+ UncordonedDirEvent(Set<Uuid> dirs) {
+ this.dirs = dirs;
+ }
+
+ @Override
+ public void run() {
+ for (Uuid dir : dirs) {
+ cordonedLogDirs.put(dir, false);
+ }
+ if (registered) {
+ scheduleNextCommunicationImmediately();
+ }
+ }
+ }
+
private class StartupEvent implements EventQueue.Event {
private final Supplier<Long> highestMetadataOffsetProvider;
@@ -453,7 +526,8 @@ public class BrokerLifecycleManager {
.setListeners(advertisedListeners)
.setRack(rack.orElse(null))
.setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L))
- .setLogDirs(sortedLogDirs);
+ .setLogDirs(sortedLogDirs)
+
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList());
if (logger.isDebugEnabled()) {
logger.debug("Sending broker registration {}", data);
}
@@ -531,7 +605,8 @@ public class BrokerLifecycleManager {
.setCurrentMetadataOffset(metadataOffset)
.setWantFence(!readyToUnfence)
.setWantShutDown(state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
- .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()));
+ .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()))
+
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList());
if (logger.isTraceEnabled()) {
logger.trace("Sending broker heartbeat {}", data);
}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index bfec3337ca3..9baa67c1515 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -92,6 +92,14 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
return
Optional.ofNullable(getList(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getList(ServerLogConfigs.LOG_DIR_CONFIG));
}
+ public List<String> cordonedLogDirs() {
+ List<String> configuredCordonedLogDirs =
getList(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG);
+ if
(configuredCordonedLogDirs.contains(ServerLogConfigs.CORDONED_LOG_DIRS_ALL)) {
+ return logDirs();
+ }
+ return configuredCordonedLogDirs;
+ }
+
public int numIoThreads() {
return getInt(ServerConfigs.NUM_IO_THREADS_CONFIG);
}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
index 4ccd3005b7d..f7b557cf503 100644
---
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
@@ -205,8 +205,11 @@ public class DynamicBrokerConfig {
* the names you would use when setting a static or dynamic broker
configuration (not topic
* configuration).
*/
- public static final Set<String> RECONFIGURABLE_CONFIGS = Set.copyOf(
- ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values());
+ public static final Set<String> RECONFIGURABLE_CONFIGS = Stream.of(
+ ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values(),
+ Set.of(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toUnmodifiableSet());
}
public static class DynamicListenerConfig {
diff --git
a/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
new file mode 100644
index 00000000000..ce2eededede
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
+import org.apache.kafka.clients.admin.FeatureUpdate;
+import org.apache.kafka.clients.admin.FinalizedVersionRange;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
+import org.apache.kafka.clients.admin.UpdateFeaturesResult;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.kafka.server.config.ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+ disksPerBroker = 2
+)
+public class CordonedLogDirsIntegrationTest {
+
+ private static final String TOPIC1 = "topic1";
+ private static final String TOPIC2 = "topic2";
+ private static final ConfigResource BROKER_0 = new
ConfigResource(ConfigResource.Type.BROKER, "0");
+ private final ClusterInstance clusterInstance;
+ private List<String> logDirsBroker0;
+
+ public CordonedLogDirsIntegrationTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ @BeforeEach
+ public void setup() {
+ logDirsBroker0 = clusterInstance.brokers().get(0).config().logDirs();
+ }
+
+ @ClusterTest(metadataVersion = MetadataVersion.IBP_4_2_IV1)
+ public void testFeatureNotEnabled() throws Exception {
+ testFeatureNotEnabled(List.of());
+ }
+
+ @ClusterTest(
+ metadataVersion = MetadataVersion.IBP_4_2_IV1,
+ serverProperties = {
+ @ClusterConfigProperty(key = CORDONED_LOG_DIRS_CONFIG, value = "*")
+ }
+ )
+ public void testFeatureNotEnabledStaticConfig() throws Exception {
+ testFeatureNotEnabled(logDirsBroker0);
+ }
+
+ private void testFeatureNotEnabled(List<String> initialCordonedLogDirs)
throws Exception {
+ try (Admin admin = clusterInstance.admin()) {
+ // When the metadata version does not support cordoning log dirs:
+ // 1. we can create a topic, even if cordoned.log.dirs is statically
set
+ admin.createTopics(newTopic(TOPIC1)).all().get();
+ // 2. no log dirs are marked as cordoned
+ assertCordonedLogDirs(admin, List.of());
+ // 3. we can't dynamically configure cordoned.log.dirs
+ Throwable ee = assertThrows(ExecutionException.class, () ->
+
admin.incrementalAlterConfigs(cordonedDirsConfig("")).all().get());
+ assertInstanceOf(InvalidConfigurationException.class,
ee.getCause());
+
+ // Update the metadata version to support cordoning log dirs
+ short metadataVersion = MetadataVersion.IBP_4_3_IV0.featureLevel();
+ UpdateFeaturesResult updateResult = admin.updateFeatures(
+ Map.of("metadata.version", new
FeatureUpdate(metadataVersion, FeatureUpdate.UpgradeType.UPGRADE)),
+ new UpdateFeaturesOptions());
+ updateResult.all().get();
+ TestUtils.waitForCondition(() -> {
+ FinalizedVersionRange versionRange =
admin.describeFeatures().featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME);
+ return versionRange.maxVersionLevel() == metadataVersion &&
versionRange.minVersionLevel() == metadataVersion;
+ }, 10_000, "Unable to update the metadata version.");
+
Thread.sleep(clusterInstance.brokers().get(0).config().brokerHeartbeatIntervalMs());
+
+ if (initialCordonedLogDirs.isEmpty()) {
+ // if no initial cordoned log dirs, this has not changed, and
we can cordon log dirs
+ assertCordonedLogDirs(admin, List.of());
+ setCordonedLogDirs(admin, logDirsBroker0);
+ initialCordonedLogDirs = logDirsBroker0;
+ }
+ // The statically or dynamically configured log dirs are now
marked as cordoned
+ assertCordonedLogDirs(admin, initialCordonedLogDirs);
+
+ // As all log dirs are cordoned, we can't create a topic
+ Set<NewTopic> newTopics = newTopic(TOPIC2);
+ ee = assertThrows(ExecutionException.class, () ->
+ admin.createTopics(newTopics).all().get());
+ assertInstanceOf(InvalidReplicationFactorException.class,
ee.getCause());
+ // We can't create partitions either
+ Map<String, NewPartitions> newPartitions = Map.of(TOPIC1,
NewPartitions.increaseTo(2));
+ ee = assertThrows(ExecutionException.class, () ->
+ admin.createPartitions(newPartitions).all().get());
+ assertInstanceOf(InvalidReplicationFactorException.class,
ee.getCause());
+
+ // After uncordoning log dirs, we can create topics and partitions
again
+ setCordonedLogDirs(admin, List.of());
+ admin.createTopics(newTopics).all().get();
+ admin.createPartitions(newPartitions).all().get();
+ }
+ }
+
+ @ClusterTest()
+ public void testCordonUncordonLogDirs() throws Exception {
+ try (Admin admin = clusterInstance.admin()) {
+ // No initial cordoned log dirs
+ assertCordonedLogDirs(admin, List.of());
+
+ // We can create topics
+ admin.createTopics(newTopic(TOPIC1)).all().get();
+
+ // Cordon all log dirs
+ setCordonedLogDirs(admin, logDirsBroker0);
+ assertCordonedLogDirs(admin, logDirsBroker0);
+
+ // We can't create new topics or partitions
+ Set<NewTopic> newTopics = newTopic(TOPIC2);
+ Throwable ee = assertThrows(ExecutionException.class, () ->
+ admin.createTopics(newTopics).all().get()
+ );
+ assertInstanceOf(InvalidReplicationFactorException.class,
ee.getCause());
+ Map<String, NewPartitions> newPartitions = Map.of(TOPIC1,
NewPartitions.increaseTo(2));
+ ee = assertThrows(ExecutionException.class, () ->
+ admin.createPartitions(newPartitions).all().get()
+ );
+ assertInstanceOf(InvalidReplicationFactorException.class,
ee.getCause());
+
+ // Cordon only the first log dir, leaving the second uncordoned
+ setCordonedLogDirs(admin, List.of(logDirsBroker0.get(0)));
+ assertCordonedLogDirs(admin, List.of(logDirsBroker0.get(0)));
+
+ // We can create topics and partitions again
+ admin.createTopics(newTopics).all().get();
+ admin.createPartitions(newPartitions).all().get();
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key = CORDONED_LOG_DIRS_CONFIG, value = "*")
+ }
+ )
+ public void testStaticCordonUncordonLogDirs() throws Exception {
+ Set<NewTopic> newTopics = newTopic(TOPIC1);
+ try (Admin admin = clusterInstance.admin()) {
+ // All log dirs are statically cordoned, so we can't create topics
+ Throwable ee = assertThrows(ExecutionException.class, () ->
+ admin.createTopics(newTopics).all().get()
+ );
+ assertInstanceOf(InvalidReplicationFactorException.class,
ee.getCause());
+
+ // Uncordon log dirs
+ setCordonedLogDirs(admin, List.of());
+
+ // We can create topics again
+ admin.createTopics(newTopics).all().get();
+ }
+ }
+
+ @ClusterTest
+ public void testReassignWithCordonedLogDirs() throws Exception {
+ TopicPartitionReplica replica = new TopicPartitionReplica(TOPIC1, 0,
0);
+ try (Admin admin = clusterInstance.admin()) {
+ admin.createTopics(newTopic(TOPIC1)).all().get();
+
+ // Find the log dir that does not host the replica and cordon it
+ AtomicReference<String> logDir = new AtomicReference<>();
+ TestUtils.waitForCondition(() -> {
+ DescribeReplicaLogDirsResult.ReplicaLogDirInfo info =
admin.describeReplicaLogDirs(List.of(replica)).all().get().get(replica);
+ logDir.set(info.getCurrentReplicaLogDir());
+ return info.getCurrentReplicaLogDir() != null;
+ }, 10_000, "Unable to find logdir for topic " + replica.topic());
+ assertNotNull(logDir.get());
+ String otherLogDir = logDirsBroker0.stream().filter(dir ->
!dir.equals(logDir.get())).findFirst().get();
+ setCordonedLogDirs(admin, List.of(otherLogDir));
+
+ // We can't move the replica to the now cordoned log dir
+ Throwable ee = assertThrows(ExecutionException.class, () ->
+ admin.alterReplicaLogDirs(Map.of(replica,
otherLogDir)).all().get()
+ );
+ assertInstanceOf(InvalidReplicaAssignmentException.class,
ee.getCause());
+
+ // After uncordoning the log dir, we can move the replica onto it
+ setCordonedLogDirs(admin, List.of());
+ admin.alterReplicaLogDirs(Map.of(replica,
otherLogDir)).all().get();
+ }
+ }
+
+ private Map<ConfigResource, Collection<AlterConfigOp>>
cordonedDirsConfig(String value) {
+ return Map.of(
+ BROKER_0,
+ Set.of(new AlterConfigOp(new
ConfigEntry(CORDONED_LOG_DIRS_CONFIG, value), AlterConfigOp.OpType.SET))
+ );
+ }
+
+ private void setCordonedLogDirs(Admin admin, List<String> logDirs) throws
ExecutionException, InterruptedException {
+ String logDirsStr = String.join(",", logDirs);
+
admin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr)).all().get();
+ TestUtils.waitForCondition(() -> {
+ Map<ConfigResource, Config> describeConfigs =
admin.describeConfigs(Set.of(BROKER_0)).all().get();
+ Config config = describeConfigs.get(BROKER_0);
+ return
logDirsStr.equals(config.get(CORDONED_LOG_DIRS_CONFIG).value());
+ }, 10_000, "Unable to set the " + CORDONED_LOG_DIRS_CONFIG + "
configuration.");
+ }
+
+ private Set<NewTopic> newTopic(String name) {
+ return Set.of(new NewTopic(name, 1, (short)
clusterInstance.brokers().size()));
+ }
+
+ private void assertCordonedLogDirs(Admin admin, List<String>
expectedCordoned) throws ExecutionException, InterruptedException {
+ Map<Integer, Map<String, LogDirDescription>> logDescriptionsPerBroker
= admin.describeLogDirs(clusterInstance.brokerIds()).allDescriptions().get();
+ for (Map.Entry<Integer, Map<String, LogDirDescription>>
logDescriptions : logDescriptionsPerBroker.entrySet()) {
+ for (Map.Entry<String, LogDirDescription> logDescription :
logDescriptions.getValue().entrySet()) {
+ if (logDescriptions.getKey().equals(0) &&
expectedCordoned.contains(logDescription.getKey())) {
+ assertTrue(logDescription.getValue().isCordoned());
+ } else {
+ assertFalse(logDescription.getValue().isCordoned());
+ }
+ }
+ }
+ }
+}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 41b9c6fbb43..f81d224e7ea 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -146,6 +146,7 @@ public class LogConfig extends AbstractConfig {
.define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT,
ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM,
ServerLogConfigs.NUM_PARTITIONS_DOC)
.define(ServerLogConfigs.LOG_DIR_CONFIG, LIST,
ServerLogConfigs.LOG_DIR_DEFAULT,
ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH,
ServerLogConfigs.LOG_DIR_DOC)
.define(ServerLogConfigs.LOG_DIRS_CONFIG, LIST, null,
ConfigDef.ValidList.anyNonDuplicateValues(false, true), HIGH,
ServerLogConfigs.LOG_DIRS_DOC)
+ .define(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, LIST,
ServerLogConfigs.CORDONED_LOG_DIRS_DEFAULT,
ConfigDef.ValidList.anyNonDuplicateValues(true, true), MEDIUM,
ServerLogConfigs.CORDONED_LOG_DIRS_DOC)
.define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT,
DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH,
ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null,
HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC)
diff --git
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
index aef53a6ddc9..af11eaaeea2 100644
---
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
+++
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
@@ -52,7 +52,7 @@ public @interface ClusterTest {
String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
SecurityProtocol controllerSecurityProtocol() default
SecurityProtocol.PLAINTEXT;
String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
- MetadataVersion metadataVersion() default MetadataVersion.IBP_4_3_IV0;
+ MetadataVersion metadataVersion() default MetadataVersion.IBP_4_4_IV0;
ClusterConfigProperty[] serverProperties() default {};
// users can add tags that they want to display in test
String[] tags() default {};
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index ff6350693b4..565c78c9eef 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -67,7 +67,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(features.get(2))
);
assertFeatureOutput(
- "metadata.version", "3.3-IV3", "4.3-IV0", "3.3-IV3",
+ "metadata.version", "3.3-IV3", "4.4-IV0", "3.3-IV3",
outputWithoutEpoch(features.get(3))
);
assertFeatureOutput(
@@ -107,7 +107,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(features.get(2))
);
assertFeatureOutput(
- "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
+ "metadata.version", "3.3-IV3", "4.4-IV0", "3.7-IV0",
outputWithoutEpoch(features.get(3))
);
assertFeatureOutput(
@@ -164,7 +164,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(featuresWithUnstable.get(2))
);
assertFeatureOutput(
- "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
+ "metadata.version", "3.3-IV3", "4.4-IV0", "3.7-IV0",
outputWithoutEpoch(featuresWithUnstable.get(3))
);
assertFeatureOutput(
@@ -201,7 +201,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(featuresWithoutUnstable.get(2))
);
assertFeatureOutput(
- "metadata.version", "3.3-IV3", "4.2-IV1", "3.7-IV0",
+ "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
outputWithoutEpoch(featuresWithoutUnstable.get(3))
);
assertFeatureOutput(
@@ -258,7 +258,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(featuresWithUnstable.get(2))
);
assertFeatureOutput(
- "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
+ "metadata.version", "3.3-IV3", "4.4-IV0", "3.7-IV0",
outputWithoutEpoch(featuresWithUnstable.get(3))
);
assertFeatureOutput(
@@ -295,7 +295,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(featuresWithoutUnstable.get(2))
);
assertFeatureOutput(
- "metadata.version", "3.3-IV3", "4.2-IV1", "3.7-IV0",
+ "metadata.version", "3.3-IV3", "4.3-IV0", "3.7-IV0",
outputWithoutEpoch(featuresWithoutUnstable.get(3))
);
assertFeatureOutput(