This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e27ea8d4dbb KAFKA-19702 Move MetadataVersionConfigValidator and related test code to metadata module (#20526)
e27ea8d4dbb is described below

commit e27ea8d4dbb17681a05672dd6fedb8a85c37a28f
Author: Lan Ding <[email protected]>
AuthorDate: Mon Sep 29 01:32:27 2025 +0800

    KAFKA-19702 Move MetadataVersionConfigValidator and related test code to metadata module (#20526)
    
    1. Move `MetadataVersionConfigValidator` to the metadata module.
    2. Move `MetadataVersionConfigValidatorTest` to the metadata module.
    3. Remove `KafkaConfig#validateWithMetadataVersion`; its JBOD (multiple log directories) check now lives in `MetadataVersionConfigValidator`.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   7 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  13 --
 .../server/MetadataVersionConfigValidatorTest.java | 103 ---------------
 .../test/scala/unit/kafka/log/LogConfigTest.scala  |  18 ---
 .../metadata}/MetadataVersionConfigValidator.java  |  37 ++++--
 .../MetadataVersionConfigValidatorTest.java        | 140 +++++++++++++++++++++
 6 files changed, 169 insertions(+), 149 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 689c62b8687..a9217c4d023 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -41,7 +41,7 @@ import 
org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorRecordSerde, ShareCoordinatorService}
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, 
MetadataPublisher}
-import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
+import org.apache.kafka.metadata.{BrokerState, ListenerInfo, 
MetadataVersionConfigValidator}
 import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, ScramPublisher}
 import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
 import org.apache.kafka.server.authorizer.Authorizer
@@ -469,7 +469,10 @@ class BrokerServer(
         socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
         config.numIoThreads, "RequestHandlerAvgIdlePercent")
 
-      metadataPublishers.add(new MetadataVersionConfigValidator(config, 
sharedServer.metadataPublishingFaultHandler))
+      metadataPublishers.add(new 
MetadataVersionConfigValidator(config.brokerId,
+        () => config.processRoles.contains(ProcessRole.BrokerRole) && 
config.logDirs().size() > 1,
+        sharedServer.metadataPublishingFaultHandler
+      ))
       brokerMetadataPublisher = new BrokerMetadataPublisher(config,
         metadataCache,
         logManager,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 671bee32d28..d9b8c5cd91f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
 import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
 import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs, 
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -653,18 +652,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement 
KafkaPrincipalSerde")
   }
 
-  /**
-   * Validate some configurations for new MetadataVersion. A new 
MetadataVersion can take place when
-   * a FeatureLevelRecord for "metadata.version" is read from the cluster 
metadata.
-   */
-  def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {
-    if (processRoles.contains(ProcessRole.BrokerRole) && logDirs.size > 1) {
-      require(metadataVersion.isDirectoryAssignmentSupported,
-        s"Multiple log directories (aka JBOD) are not supported in the current 
MetadataVersion ${metadataVersion}. " +
-          s"Need ${MetadataVersion.IBP_3_7_IV2} or higher")
-    }
-  }
-
   /**
    * Copy the subset of properties that are relevant to Logs. The individual 
properties
    * are listed here since the names are slightly different in each Config 
class...
diff --git a/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java b/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java
deleted file mode 100644
index daa0aacca7d..00000000000
--- a/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server;
-
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.image.MetadataDelta;
-import org.apache.kafka.image.MetadataImage;
-import org.apache.kafka.image.MetadataProvenance;
-import org.apache.kafka.image.loader.LogDeltaManifest;
-import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.server.fault.FaultHandler;
-
-import org.junit.jupiter.api.Test;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.BDDMockito.willAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-public class MetadataVersionConfigValidatorTest {
-
-    private static final LogDeltaManifest TEST_MANIFEST = 
LogDeltaManifest.newBuilder()
-            .provenance(MetadataProvenance.EMPTY)
-            .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
-            .numBatches(1)
-            .elapsedNs(90)
-            .numBytes(88)
-            .build();
-    public static final MetadataProvenance TEST_PROVENANCE =
-            new MetadataProvenance(50, 3, 8000, true);
-
-    void testWith(MetadataVersion metadataVersion, KafkaConfig config, 
FaultHandler faultHandler) throws Exception {
-        try (MetadataVersionConfigValidator validator = new 
MetadataVersionConfigValidator(config, faultHandler)) {
-            MetadataDelta delta = new MetadataDelta.Builder()
-                    .setImage(MetadataImage.EMPTY)
-                    .build();
-            if (metadataVersion != null) {
-                delta.replay(new FeatureLevelRecord().
-                        setName(MetadataVersion.FEATURE_NAME).
-                        setFeatureLevel(metadataVersion.featureLevel()));
-            }
-            MetadataImage image = delta.apply(TEST_PROVENANCE);
-
-            validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
-        }
-    }
-
-    @Test
-    void testValidatesConfigOnMetadataChange() throws Exception {
-        MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
-        KafkaConfig config = mock(KafkaConfig.class);
-        FaultHandler faultHandler = mock(FaultHandler.class);
-
-        when(config.brokerId()).thenReturn(8);
-
-        testWith(metadataVersion, config, faultHandler);
-
-        verify(config, 
times(1)).validateWithMetadataVersion(eq(metadataVersion));
-        verifyNoMoreInteractions(faultHandler);
-    }
-
-    @SuppressWarnings("ThrowableNotThrown")
-    @Test
-    void testInvokesFaultHandlerOnException() throws Exception {
-        MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
-        Exception exception = new Exception();
-        KafkaConfig config = mock(KafkaConfig.class);
-        FaultHandler faultHandler = mock(FaultHandler.class);
-
-        when(faultHandler.handleFault(any(), any())).thenReturn(new 
RuntimeException("returned exception"));
-        when(config.brokerId()).thenReturn(8);
-        willAnswer(invocation -> {
-            throw exception;
-        }).given(config).validateWithMetadataVersion(eq(metadataVersion));
-
-        testWith(metadataVersion, config, faultHandler);
-
-        verify(config, 
times(1)).validateWithMetadataVersion(eq(metadataVersion));
-        verify(faultHandler, times(1)).handleFault(
-                eq("Broker configuration does not support the cluster 
MetadataVersion"),
-                eq(exception));
-    }
-}
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index e23e16fa40a..e942e7e3380 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -23,7 +23,6 @@ import 
org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
 import org.apache.kafka.common.config.ConfigDef.Type.INT
 import org.apache.kafka.common.config.{ConfigException, SslConfigs, 
TopicConfig}
 import org.apache.kafka.common.errors.InvalidConfigurationException
-import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
@@ -429,21 +428,4 @@ class LogConfigTest {
     logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
deleteOnDisable.toString)
     LogConfig.validate(logProps)
   }
-
-  @Test
-  def testValidateWithMetadataVersionJbodSupport(): Unit = {
-    def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
-      KafkaConfig.fromProps(
-          TestUtils.createBrokerConfig(nodeId = 0, logDirCount = if 
(jbodConfig) 2 else 1)
-        ).validateWithMetadataVersion(metadataVersion)
-
-    validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false)
-    validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false)
-    validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false)
-    assertThrows(classOf[IllegalArgumentException], () =>
-      validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true))
-    assertThrows(classOf[IllegalArgumentException], () =>
-      validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true))
-    validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true)
-  }
 }
diff --git a/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java
similarity index 64%
rename from core/src/main/java/kafka/server/MetadataVersionConfigValidator.java
rename to metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java
index 6f53ec092f6..2df73463c25 100644
--- a/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.server;
+package org.apache.kafka.metadata;
 
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -24,18 +24,20 @@ import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.fault.FaultHandler;
 
+import java.util.function.Supplier;
+
 public class MetadataVersionConfigValidator implements MetadataPublisher {
     private final String name;
-    private final KafkaConfig config;
+    private final Supplier<Boolean> hasMultiLogDirs;
     private final FaultHandler faultHandler;
 
     public MetadataVersionConfigValidator(
-            KafkaConfig config,
-            FaultHandler faultHandler
+        int id,
+        Supplier<Boolean> hasMultiLogDirs,
+        FaultHandler faultHandler
     ) {
-        int id = config.brokerId();
         this.name = "MetadataVersionPublisher(id=" + id + ")";
-        this.config = config;
+        this.hasMultiLogDirs = hasMultiLogDirs;
         this.faultHandler = faultHandler;
     }
 
@@ -46,9 +48,9 @@ public class MetadataVersionConfigValidator implements 
MetadataPublisher {
 
     @Override
     public void onMetadataUpdate(
-            MetadataDelta delta,
-            MetadataImage newImage,
-            LoaderManifest manifest
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LoaderManifest manifest
     ) {
         if (delta.featuresDelta() != null) {
             if (delta.metadataVersionChanged().isPresent()) {
@@ -57,13 +59,22 @@ public class MetadataVersionConfigValidator implements 
MetadataPublisher {
         }
     }
 
+    /**
+     * Validate some configurations for the new MetadataVersion. A new 
MetadataVersion can take place when
+     * a FeatureLevelRecord for "metadata.version" is read from the cluster 
metadata.
+     */
     @SuppressWarnings("ThrowableNotThrown")
     private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
-        try {
-            this.config.validateWithMetadataVersion(metadataVersion);
-        } catch (Throwable t) {
+        if (this.hasMultiLogDirs.get() && 
!metadataVersion.isDirectoryAssignmentSupported()) {
+            String errorMsg = String.format(
+                "Multiple log directories (aka JBOD) are not supported in the 
current MetadataVersion %s. Need %s or higher",
+                metadataVersion, MetadataVersion.IBP_3_7_IV2
+            );
+
             this.faultHandler.handleFault(
-                    "Broker configuration does not support the cluster 
MetadataVersion", t);
+                "Broker configuration does not support the cluster 
MetadataVersion",
+                new IllegalArgumentException(errorMsg)
+            );
         }
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java
new file mode 100644
index 00000000000..50ad1b07ccd
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataVersionConfigValidatorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Supplier;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
+public class MetadataVersionConfigValidatorTest {
+
+    private static final LogDeltaManifest TEST_MANIFEST = 
LogDeltaManifest.newBuilder()
+        .provenance(MetadataProvenance.EMPTY)
+        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+        .numBatches(1)
+        .elapsedNs(90)
+        .numBytes(88)
+        .build();
+    public static final MetadataProvenance TEST_PROVENANCE =
+        new MetadataProvenance(50, 3, 8000, true);
+
+    void executeMetadataUpdate(
+        MetadataVersion metadataVersion,
+        Supplier<Boolean> multiLogDirSupplier,
+        FaultHandler faultHandler
+    ) throws Exception {
+        try (MetadataVersionConfigValidator validator = new 
MetadataVersionConfigValidator(0, multiLogDirSupplier, faultHandler)) {
+            MetadataDelta delta = new MetadataDelta.Builder()
+                .setImage(MetadataImage.EMPTY)
+                .build();
+            if (metadataVersion != null) {
+                delta.replay(new FeatureLevelRecord().
+                    setName(MetadataVersion.FEATURE_NAME).
+                    setFeatureLevel(metadataVersion.featureLevel()));
+            }
+            MetadataImage image = delta.apply(TEST_PROVENANCE);
+
+            validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
+        }
+    }
+
+    @Test
+    void testValidatesConfigOnMetadataChange() throws Exception {
+        MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
+        FaultHandler faultHandler = mock(FaultHandler.class);
+        Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
+        when(multiLogDirSupplier.get()).thenReturn(false);
+
+        executeMetadataUpdate(metadataVersion, multiLogDirSupplier, 
faultHandler);
+
+        verify(multiLogDirSupplier, times(1)).get();
+        verifyNoMoreInteractions(faultHandler);
+    }
+
+    @Test
+    void testInvokesFaultHandlerOnException() throws Exception {
+        MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV1;
+        Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
+        FaultHandler faultHandler = mock(FaultHandler.class);
+
+        when(multiLogDirSupplier.get()).thenReturn(true);
+
+        executeMetadataUpdate(metadataVersion, multiLogDirSupplier, 
faultHandler);
+
+        verify(multiLogDirSupplier, times(1)).get();
+        verify(faultHandler, times(1)).handleFault(
+            eq("Broker configuration does not support the cluster 
MetadataVersion"),
+            any(IllegalArgumentException.class));
+    }
+
+    @Test
+    void testValidateWithMetadataVersionJbodSupport() throws Exception {
+        FaultHandler faultHandler = mock(FaultHandler.class);
+        validate(MetadataVersion.IBP_3_6_IV2, false, faultHandler);
+        verifyNoMoreInteractions(faultHandler);
+
+        faultHandler = mock(FaultHandler.class);
+        validate(MetadataVersion.IBP_3_7_IV0, false, faultHandler);
+        verifyNoMoreInteractions(faultHandler);
+
+        faultHandler = mock(FaultHandler.class);
+        validate(MetadataVersion.IBP_3_7_IV2, false, faultHandler);
+        verifyNoMoreInteractions(faultHandler);
+
+        faultHandler = mock(FaultHandler.class);
+        validate(MetadataVersion.IBP_3_6_IV2, true, faultHandler);
+        verify(faultHandler, times(1)).handleFault(
+            eq("Broker configuration does not support the cluster 
MetadataVersion"),
+            any(IllegalArgumentException.class));
+
+        faultHandler = mock(FaultHandler.class);
+        validate(MetadataVersion.IBP_3_7_IV0, true, faultHandler);
+        verify(faultHandler, times(1)).handleFault(
+            eq("Broker configuration does not support the cluster 
MetadataVersion"),
+            any(IllegalArgumentException.class));
+
+        faultHandler = mock(FaultHandler.class);
+        validate(MetadataVersion.IBP_3_7_IV2, true, faultHandler);
+        verifyNoMoreInteractions(faultHandler);
+    }
+
+    private void validate(MetadataVersion metadataVersion, boolean jbodConfig, 
FaultHandler faultHandler)
+        throws Exception {
+        Supplier<Boolean> multiLogDirSupplier = () -> jbodConfig;
+
+        executeMetadataUpdate(metadataVersion, multiLogDirSupplier, 
faultHandler);
+    }
+}

Reply via email to