This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e27ea8d4dbb KAFKA-19702 Move MetadataVersionConfigValidator and
related test code to metadata module (#20526)
e27ea8d4dbb is described below
commit e27ea8d4dbb17681a05672dd6fedb8a85c37a28f
Author: Lan Ding <[email protected]>
AuthorDate: Mon Sep 29 01:32:27 2025 +0800
KAFKA-19702 Move MetadataVersionConfigValidator and related test code to
metadata module (#20526)
1. Move `MetadataVersionConfigValidator` to metadata module.
2. Move `MetadataVersionConfigValidatorTest` to metadata module.
3. Remove `KafkaConfig#validateWithMetadataVersion`.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 7 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 13 --
.../server/MetadataVersionConfigValidatorTest.java | 103 ---------------
.../test/scala/unit/kafka/log/LogConfigTest.scala | 18 ---
.../metadata}/MetadataVersionConfigValidator.java | 37 ++++--
.../MetadataVersionConfigValidatorTest.java | 140 +++++++++++++++++++++
6 files changed, 169 insertions(+), 149 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 689c62b8687..a9217c4d023 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -41,7 +41,7 @@ import
org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker,
MetadataPublisher}
-import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
+import org.apache.kafka.metadata.{BrokerState, ListenerInfo,
MetadataVersionConfigValidator}
import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
@@ -469,7 +469,10 @@ class BrokerServer(
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, "RequestHandlerAvgIdlePercent")
- metadataPublishers.add(new MetadataVersionConfigValidator(config,
sharedServer.metadataPublishingFaultHandler))
+ metadataPublishers.add(new
MetadataVersionConfigValidator(config.brokerId,
+ () => config.processRoles.contains(ProcessRole.BrokerRole) &&
config.logDirs().size() > 1,
+ sharedServer.metadataPublishingFaultHandler
+ ))
brokerMetadataPublisher = new BrokerMetadataPublisher(config,
metadataCache,
logManager,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 671bee32d28..d9b8c5cd91f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs,
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -653,18 +652,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement
KafkaPrincipalSerde")
}
- /**
- * Validate some configurations for new MetadataVersion. A new
MetadataVersion can take place when
- * a FeatureLevelRecord for "metadata.version" is read from the cluster
metadata.
- */
- def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {
- if (processRoles.contains(ProcessRole.BrokerRole) && logDirs.size > 1) {
- require(metadataVersion.isDirectoryAssignmentSupported,
- s"Multiple log directories (aka JBOD) are not supported in the current
MetadataVersion ${metadataVersion}. " +
- s"Need ${MetadataVersion.IBP_3_7_IV2} or higher")
- }
- }
-
/**
* Copy the subset of properties that are relevant to Logs. The individual
properties
* are listed here since the names are slightly different in each Config
class...
diff --git
a/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java
b/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java
deleted file mode 100644
index daa0aacca7d..00000000000
--- a/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server;
-
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.image.MetadataDelta;
-import org.apache.kafka.image.MetadataImage;
-import org.apache.kafka.image.MetadataProvenance;
-import org.apache.kafka.image.loader.LogDeltaManifest;
-import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.server.fault.FaultHandler;
-
-import org.junit.jupiter.api.Test;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.BDDMockito.willAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-public class MetadataVersionConfigValidatorTest {
-
- private static final LogDeltaManifest TEST_MANIFEST =
LogDeltaManifest.newBuilder()
- .provenance(MetadataProvenance.EMPTY)
- .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
- .numBatches(1)
- .elapsedNs(90)
- .numBytes(88)
- .build();
- public static final MetadataProvenance TEST_PROVENANCE =
- new MetadataProvenance(50, 3, 8000, true);
-
- void testWith(MetadataVersion metadataVersion, KafkaConfig config,
FaultHandler faultHandler) throws Exception {
- try (MetadataVersionConfigValidator validator = new
MetadataVersionConfigValidator(config, faultHandler)) {
- MetadataDelta delta = new MetadataDelta.Builder()
- .setImage(MetadataImage.EMPTY)
- .build();
- if (metadataVersion != null) {
- delta.replay(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(metadataVersion.featureLevel()));
- }
- MetadataImage image = delta.apply(TEST_PROVENANCE);
-
- validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
- }
- }
-
- @Test
- void testValidatesConfigOnMetadataChange() throws Exception {
- MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
- KafkaConfig config = mock(KafkaConfig.class);
- FaultHandler faultHandler = mock(FaultHandler.class);
-
- when(config.brokerId()).thenReturn(8);
-
- testWith(metadataVersion, config, faultHandler);
-
- verify(config,
times(1)).validateWithMetadataVersion(eq(metadataVersion));
- verifyNoMoreInteractions(faultHandler);
- }
-
- @SuppressWarnings("ThrowableNotThrown")
- @Test
- void testInvokesFaultHandlerOnException() throws Exception {
- MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
- Exception exception = new Exception();
- KafkaConfig config = mock(KafkaConfig.class);
- FaultHandler faultHandler = mock(FaultHandler.class);
-
- when(faultHandler.handleFault(any(), any())).thenReturn(new
RuntimeException("returned exception"));
- when(config.brokerId()).thenReturn(8);
- willAnswer(invocation -> {
- throw exception;
- }).given(config).validateWithMetadataVersion(eq(metadataVersion));
-
- testWith(metadataVersion, config, faultHandler);
-
- verify(config,
times(1)).validateWithMetadataVersion(eq(metadataVersion));
- verify(faultHandler, times(1)).handleFault(
- eq("Broker configuration does not support the cluster
MetadataVersion"),
- eq(exception));
- }
-}
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index e23e16fa40a..e942e7e3380 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -23,7 +23,6 @@ import
org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
import org.apache.kafka.common.config.ConfigDef.Type.INT
import org.apache.kafka.common.config.{ConfigException, SslConfigs,
TopicConfig}
import org.apache.kafka.common.errors.InvalidConfigurationException
-import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -429,21 +428,4 @@ class LogConfigTest {
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
deleteOnDisable.toString)
LogConfig.validate(logProps)
}
-
- @Test
- def testValidateWithMetadataVersionJbodSupport(): Unit = {
- def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
- KafkaConfig.fromProps(
- TestUtils.createBrokerConfig(nodeId = 0, logDirCount = if
(jbodConfig) 2 else 1)
- ).validateWithMetadataVersion(metadataVersion)
-
- validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false)
- validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false)
- validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false)
- assertThrows(classOf[IllegalArgumentException], () =>
- validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true))
- assertThrows(classOf[IllegalArgumentException], () =>
- validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true))
- validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true)
- }
}
diff --git
a/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java
b/metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java
similarity index 64%
rename from core/src/main/java/kafka/server/MetadataVersionConfigValidator.java
rename to
metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java
index 6f53ec092f6..2df73463c25 100644
--- a/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.server;
+package org.apache.kafka.metadata;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -24,18 +24,20 @@ import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
+import java.util.function.Supplier;
+
public class MetadataVersionConfigValidator implements MetadataPublisher {
private final String name;
- private final KafkaConfig config;
+ private final Supplier<Boolean> hasMultiLogDirs;
private final FaultHandler faultHandler;
public MetadataVersionConfigValidator(
- KafkaConfig config,
- FaultHandler faultHandler
+ int id,
+ Supplier<Boolean> hasMultiLogDirs,
+ FaultHandler faultHandler
) {
- int id = config.brokerId();
this.name = "MetadataVersionPublisher(id=" + id + ")";
- this.config = config;
+ this.hasMultiLogDirs = hasMultiLogDirs;
this.faultHandler = faultHandler;
}
@@ -46,9 +48,9 @@ public class MetadataVersionConfigValidator implements
MetadataPublisher {
@Override
public void onMetadataUpdate(
- MetadataDelta delta,
- MetadataImage newImage,
- LoaderManifest manifest
+ MetadataDelta delta,
+ MetadataImage newImage,
+ LoaderManifest manifest
) {
if (delta.featuresDelta() != null) {
if (delta.metadataVersionChanged().isPresent()) {
@@ -57,13 +59,22 @@ public class MetadataVersionConfigValidator implements
MetadataPublisher {
}
}
+ /**
+ * Validate some configurations for the new MetadataVersion. A new
MetadataVersion can take place when
+ * a FeatureLevelRecord for "metadata.version" is read from the cluster
metadata.
+ */
@SuppressWarnings("ThrowableNotThrown")
private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
- try {
- this.config.validateWithMetadataVersion(metadataVersion);
- } catch (Throwable t) {
+ if (this.hasMultiLogDirs.get() &&
!metadataVersion.isDirectoryAssignmentSupported()) {
+ String errorMsg = String.format(
+ "Multiple log directories (aka JBOD) are not supported in the
current MetadataVersion %s. Need %s or higher",
+ metadataVersion, MetadataVersion.IBP_3_7_IV2
+ );
+
this.faultHandler.handleFault(
- "Broker configuration does not support the cluster
MetadataVersion", t);
+ "Broker configuration does not support the cluster
MetadataVersion",
+ new IllegalArgumentException(errorMsg)
+ );
}
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java
new file mode 100644
index 00000000000..50ad1b07ccd
--- /dev/null
+++
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Supplier;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
+public class MetadataVersionConfigValidatorTest {
+
+ private static final LogDeltaManifest TEST_MANIFEST =
LogDeltaManifest.newBuilder()
+ .provenance(MetadataProvenance.EMPTY)
+ .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+ .numBatches(1)
+ .elapsedNs(90)
+ .numBytes(88)
+ .build();
+ public static final MetadataProvenance TEST_PROVENANCE =
+ new MetadataProvenance(50, 3, 8000, true);
+
+ void executeMetadataUpdate(
+ MetadataVersion metadataVersion,
+ Supplier<Boolean> multiLogDirSupplier,
+ FaultHandler faultHandler
+ ) throws Exception {
+ try (MetadataVersionConfigValidator validator = new
MetadataVersionConfigValidator(0, multiLogDirSupplier, faultHandler)) {
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
+ if (metadataVersion != null) {
+ delta.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(metadataVersion.featureLevel()));
+ }
+ MetadataImage image = delta.apply(TEST_PROVENANCE);
+
+ validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
+ }
+ }
+
+ @Test
+ void testValidatesConfigOnMetadataChange() throws Exception {
+ MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
+ FaultHandler faultHandler = mock(FaultHandler.class);
+ Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
+ when(multiLogDirSupplier.get()).thenReturn(false);
+
+ executeMetadataUpdate(metadataVersion, multiLogDirSupplier,
faultHandler);
+
+ verify(multiLogDirSupplier, times(1)).get();
+ verifyNoMoreInteractions(faultHandler);
+ }
+
+ @Test
+ void testInvokesFaultHandlerOnException() throws Exception {
+ MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV1;
+ Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
+ FaultHandler faultHandler = mock(FaultHandler.class);
+
+ when(multiLogDirSupplier.get()).thenReturn(true);
+
+ executeMetadataUpdate(metadataVersion, multiLogDirSupplier,
faultHandler);
+
+ verify(multiLogDirSupplier, times(1)).get();
+ verify(faultHandler, times(1)).handleFault(
+ eq("Broker configuration does not support the cluster
MetadataVersion"),
+ any(IllegalArgumentException.class));
+ }
+
+ @Test
+ void testValidateWithMetadataVersionJbodSupport() throws Exception {
+ FaultHandler faultHandler = mock(FaultHandler.class);
+ validate(MetadataVersion.IBP_3_6_IV2, false, faultHandler);
+ verifyNoMoreInteractions(faultHandler);
+
+ faultHandler = mock(FaultHandler.class);
+ validate(MetadataVersion.IBP_3_7_IV0, false, faultHandler);
+ verifyNoMoreInteractions(faultHandler);
+
+ faultHandler = mock(FaultHandler.class);
+ validate(MetadataVersion.IBP_3_7_IV2, false, faultHandler);
+ verifyNoMoreInteractions(faultHandler);
+
+ faultHandler = mock(FaultHandler.class);
+ validate(MetadataVersion.IBP_3_6_IV2, true, faultHandler);
+ verify(faultHandler, times(1)).handleFault(
+ eq("Broker configuration does not support the cluster
MetadataVersion"),
+ any(IllegalArgumentException.class));
+
+ faultHandler = mock(FaultHandler.class);
+ validate(MetadataVersion.IBP_3_7_IV0, true, faultHandler);
+ verify(faultHandler, times(1)).handleFault(
+ eq("Broker configuration does not support the cluster
MetadataVersion"),
+ any(IllegalArgumentException.class));
+
+ faultHandler = mock(FaultHandler.class);
+ validate(MetadataVersion.IBP_3_7_IV2, true, faultHandler);
+ verifyNoMoreInteractions(faultHandler);
+ }
+
+ private void validate(MetadataVersion metadataVersion, boolean jbodConfig,
FaultHandler faultHandler)
+ throws Exception {
+ Supplier<Boolean> multiLogDirSupplier = () -> jbodConfig;
+
+ executeMetadataUpdate(metadataVersion, multiLogDirSupplier,
faultHandler);
+ }
+}