mimaison commented on code in PR #14847:
URL: https://github.com/apache/kafka/pull/14847#discussion_r1579785296


##########
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java:
##########
@@ -47,7 +47,7 @@ public final class MetaProperties {
     /**
      * The property that specifies the node id. Replaces broker.id in V1.
      */
-    static final String NODE_ID_PROP = "node.id";
+    public static final String NODE_ID_PROP = "node.id";

Review Comment:
   It's really not nice that we have to do this. I wonder if we should wait for the `NodeIdProp` from KafkaConfig to move instead of doing that.
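   To illustrate the alternative (hypothetical class and constant names, just a sketch of where the key could live after the migration):
   ```java
   // Hypothetical: a shared server-config constants class that node.id could
   // move to, so the storage tool references it instead of widening the
   // visibility of MetaProperties.NODE_ID_PROP.
   public final class KRaftConfigs {
       public static final String NODE_ID_CONFIG = "node.id";

       private KRaftConfigs() { }
   }
   ```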



##########
tools/src/test/java/org/apache/kafka/tools/StorageToolTest.java:
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.properties.MetaProperties;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.metadata.properties.PropertiesUtils;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+@Timeout(value = 40)
+public class StorageToolTest {
+    private Properties newSelfManagedProperties() {
+        Properties properties = new Properties();
+        properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/foo,/tmp/bar");
+        properties.setProperty(StorageTool.PROCESS_ROLES_CONFIG, "controller");
+        properties.setProperty(MetaProperties.NODE_ID_PROP, "2");
+        properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9092");
+        return properties;
+    }
+
+    @Test
+    public void testConfigToLogDirectories() {
+        LogConfig config = new LogConfig(newSelfManagedProperties());
+        assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/foo")), StorageTool.configToLogDirectories(config));
+    }
+
+    @Test
+    public void testConfigToLogDirectoriesWithMetaLogDir() {
+        Properties properties = newSelfManagedProperties();
+        properties.setProperty(StorageTool.METADATA_LOG_DIR_CONFIG, "/tmp/baz");
+        LogConfig config = new LogConfig(properties);
+        assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/baz", "/tmp/foo")), StorageTool.configToLogDirectories(config));
+    }
+
+    @Test
+    public void testInfoCommandOnEmptyDirectory() throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        File tempDir = tempDir();
+        try {
+            assertEquals(1, StorageTool.infoCommand(new PrintStream(stream), true, new ArrayList<>(Collections.singletonList(tempDir.toString()))));
+            assertEquals("Found log directory:\n  " + tempDir + "\n\nFound problem:\n  " + tempDir + " is not formatted.\n\n", stream.toString());
+        } finally {
+            Utils.delete(tempDir);
+        }
+    }
+
+    public static File tempDir() {
+        return TestUtils.tempDirectory();
+    }
+
+    @Test
+    public void testInfoCommandOnMissingDirectory() throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        File tempDir = tempDir();
+        tempDir.delete();
+        try {
+            assertEquals(1, StorageTool.infoCommand(new PrintStream(stream), true, new ArrayList<>(Collections.singletonList(tempDir.toString()))));
+            assertEquals("Found problem:\n  " + tempDir + " does not exist\n\n", stream.toString());
+        } finally {
+            Utils.delete(tempDir);
+        }
+    }
+
+    @Test
+    public void testInfoCommandOnDirectoryAsFile() throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        File tempFile = tempFile();
+        try {
+            assertEquals(1, StorageTool.infoCommand(new PrintStream(stream), true, new ArrayList<>(Collections.singletonList(tempFile.toString()))));
+            assertEquals("Found problem:\n  " + tempFile + " is not a directory\n\n", stream.toString());
+        } finally {
+            tempFile.delete();

Review Comment:
   Is this needed? I think `tempFile()` automatically deletes the file on exit.
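   For reference, a typical `tempFile()` helper looks roughly like this (a sketch, not necessarily the exact `TestUtils` implementation):
   ```java
   import java.io.File;
   import java.io.IOException;
   import java.nio.file.Files;

   // Sketch: create a temp file and register it for deletion on JVM exit,
   // which would make the explicit delete() in the finally block redundant.
   public static File tempFile() throws IOException {
       File file = Files.createTempFile("kafka", ".tmp").toFile();
       file.deleteOnExit();
       return file;
   }
   ```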



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.properties.MetaProperties;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.ProcessRole;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    // TODO delete these constants when they are migrated from KafkaConfig
+    public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";
+    public static final String UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG = "unstable.metadata.versions.enable";
+    public static final String PROCESS_ROLES_CONFIG = "process.roles";
+    
+    public static void main(String... args) {
+        try {
+            
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> logConfig = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, logConfig);

Review Comment:
    It's not really nice that we cast the configs to `LogConfig` but then have to rely on `originals()` to access non-LogConfig configs.
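    One way to avoid that (a sketch, assuming only the `Utils.loadProps` call already used above; `configPath` is a stand-in for the parsed `--config` value):
    ```java
    // Sketch: load the server config once and keep both views, so non-LogConfig
    // keys are read from the raw Properties instead of logConfig.originals().
    Properties props = Utils.loadProps(configPath);
    LogConfig logConfig = new LogConfig(props);
    String processRoles = props.getProperty("process.roles");
    ```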



##########
tools/src/test/java/org/apache/kafka/tools/StorageToolTest.java:
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.properties.MetaProperties;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.metadata.properties.PropertiesUtils;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+@Timeout(value = 40)
+public class StorageToolTest {
+    private Properties newSelfManagedProperties() {
+        Properties properties = new Properties();
+        properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/foo,/tmp/bar");
+        properties.setProperty(StorageTool.PROCESS_ROLES_CONFIG, "controller");
+        properties.setProperty(MetaProperties.NODE_ID_PROP, "2");
+        properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9092");
+        return properties;
+    }
+
+    @Test
+    public void testConfigToLogDirectories() {
+        LogConfig config = new LogConfig(newSelfManagedProperties());
+        assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/foo")), StorageTool.configToLogDirectories(config));
+    }
+
+    @Test
+    public void testConfigToLogDirectoriesWithMetaLogDir() {
+        Properties properties = newSelfManagedProperties();
+        properties.setProperty(StorageTool.METADATA_LOG_DIR_CONFIG, "/tmp/baz");
+        LogConfig config = new LogConfig(properties);
+        assertEquals(new ArrayList<>(Arrays.asList("/tmp/bar", "/tmp/baz", "/tmp/foo")), StorageTool.configToLogDirectories(config));
+    }
+
+    @Test
+    public void testInfoCommandOnEmptyDirectory() throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        File tempDir = tempDir();
+        try {
+            assertEquals(1, StorageTool.infoCommand(new PrintStream(stream), true, new ArrayList<>(Collections.singletonList(tempDir.toString()))));
+            assertEquals("Found log directory:\n  " + tempDir + "\n\nFound problem:\n  " + tempDir + " is not formatted.\n\n", stream.toString());
+        } finally {
+            Utils.delete(tempDir);
+        }
+    }
+
+    public static File tempDir() {
+        return TestUtils.tempDirectory();
+    }
+
+    @Test
+    public void testInfoCommandOnMissingDirectory() throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        File tempDir = tempDir();
+        tempDir.delete();
+        try {
+            assertEquals(1, StorageTool.infoCommand(new PrintStream(stream), true, new ArrayList<>(Collections.singletonList(tempDir.toString()))));
+            assertEquals("Found problem:\n  " + tempDir + " does not exist\n\n", stream.toString());
+        } finally {
+            Utils.delete(tempDir);
+        }
+    }
+
+    @Test
+    public void testInfoCommandOnDirectoryAsFile() throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        File tempFile = tempFile();
+        try {
+            assertEquals(1, StorageTool.infoCommand(new PrintStream(stream), true, new ArrayList<>(Collections.singletonList(tempFile.toString()))));
+            assertEquals("Found problem:\n  " + tempFile + " is not a directory\n\n", stream.toString());
+        } finally {
+            tempFile.delete();
+        }
+    }
+
+    @Test
+    public void testInfoWithMismatchedLegacyKafkaConfig() throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        File tempDir = tempDir();
+        try {
+            Files.write(tempDir.toPath().resolve("meta.properties"), String.join("\n", 
+                Arrays.asList("version=1", "node.id=1", "cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")).getBytes(StandardCharsets.UTF_8));
+            assertEquals(1, StorageTool.infoCommand(new PrintStream(stream), false, new ArrayList<>(Collections.singletonList(tempDir.toString()))));
+            assertEquals("Found log directory:\n  " + tempDir + "\n\nFound metadata: {cluster.id=XcZZOzUqS4yHOjhMQB6JLQ, node.id=1, version=1}\n\n" +
+                "Found problem:\n  The kafka configuration file appears to be for a legacy cluster, " +
+                "but the directories are formatted for a cluster in KRaft mode.\n\n", stream.toString());
+        } finally {
+            Utils.delete(tempDir);
+        }
+    }
+
+    @Test
+    public void testInfoWithMismatchedSelfManagedKafkaConfig() throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        File tempDir = tempDir();
+        try {
+            Files.write(tempDir.toPath().resolve("meta.properties"), String.join("\n", 
+                Arrays.asList("version=0", "broker.id=1", "cluster.id=26c36907-4158-4a35-919d-6534229f5241")).getBytes(StandardCharsets.UTF_8));
+            assertEquals(1, StorageTool.infoCommand(new PrintStream(stream), true, new ArrayList<>(Collections.singletonList(tempDir.toString()))));
+            assertEquals("Found log directory:\n  " + tempDir + "\n\nFound metadata: {broker.id=1, cluster.id=26c36907-4158-4a35-919d-6534229f5241, version=0}" +
+                "\n\nFound problem:\n  The kafka configuration file appears to be for a cluster in KRaft mode, " +
+                "but the directories are formatted for legacy mode.\n\n", stream.toString());
+        } finally {
+            Utils.delete(tempDir);
+        }
+    }
+
+    @Test
+    public void testFormatEmptyDirectory() throws IOException, TerseException {
+        File tempDir = tempDir();
+        try {
+            MetaProperties metaProperties = new MetaProperties.Builder()
+                .setVersion(MetaPropertiesVersion.V1)
+                .setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
+                .setNodeId(2)
+                .build();
+            ByteArrayOutputStream stream = new ByteArrayOutputStream();
+            BootstrapMetadata bootstrapMetadata = StorageTool.buildBootstrapMetadata(
+                MetadataVersion.latestTesting(), Optional.empty(), "test format command");
+
+            assertEquals(0, StorageTool.formatCommand(new PrintStream(stream),
+                new ArrayList<>(Collections.singletonList(tempDir.toString())),
+                metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), false));
+            assertTrue(stream.toString().startsWith("Formatting " + tempDir));
+
+            try {
+                assertEquals(1, StorageTool.formatCommand(new PrintStream(new ByteArrayOutputStream()),
+                    new ArrayList<>(Collections.singletonList(tempDir.toString())),
+                    metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), false));
+            } catch (Exception e) {
+                assertEquals("Log directory " + tempDir + " is already formatted. Use --ignore-formatted " +
+                    "to ignore this directory and format the others.", e.getMessage());
+            }
+
+            ByteArrayOutputStream stream2 = new ByteArrayOutputStream();
+            assertEquals(0, StorageTool.formatCommand(new PrintStream(stream2),
+                new ArrayList<>(Collections.singletonList(tempDir.toString())),
+                metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), true));
+            assertEquals(String.format("All of the log directories are already formatted.%n"), stream2.toString());
+        } finally {
+            Utils.delete(tempDir);
+        }
+    }
+
+    private int runFormatCommand(ByteArrayOutputStream stream, List<String> directories, boolean ignoreFormatted) throws TerseException, IOException {
+        MetaProperties metaProperties = new MetaProperties.Builder()
+            .setVersion(MetaPropertiesVersion.V1)
+            .setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
+            .setNodeId(2)
+            .build();
+        BootstrapMetadata bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), null, "test format command");
+        return StorageTool.formatCommand(new PrintStream(stream), directories, metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted);
+    }
+
+    @Test
+    public void testFormatSucceedsIfAllDirectoriesAreAvailable() throws TerseException, IOException {
+        String availableDir1 = TestUtils.tempDirectory().getAbsolutePath();
+        String availableDir2 = TestUtils.tempDirectory().getAbsolutePath();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        assertEquals(0, runFormatCommand(stream, Arrays.asList(availableDir1, availableDir2), false));
+        assertTrue(stream.toString().contains(String.format("Formatting %s", availableDir1)));
+        assertTrue(stream.toString().contains(String.format("Formatting %s", availableDir2)));
+    }
+
+    @Test
+    public void testFormatSucceedsIfAtLeastOneDirectoryIsAvailable() throws TerseException, IOException {
+        String availableDir1 = TestUtils.tempDirectory().getAbsolutePath();
+        String unavailableDir1 = TestUtils.tempFile().getAbsolutePath();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        assertEquals(0, runFormatCommand(stream, Arrays.asList(availableDir1, unavailableDir1), false));
+        assertTrue(stream.toString().contains(String.format("I/O error trying to read log directory %s. Ignoring...", unavailableDir1)));
+        assertTrue(stream.toString().contains(String.format("Formatting %s", availableDir1)));
+        assertFalse(stream.toString().contains(String.format("Formatting %s", unavailableDir1)));
+    }
+
+    @Test
+    public void testFormatFailsIfAllDirectoriesAreUnavailable() throws IOException {
+        String unavailableDir1 = TestUtils.tempFile().getAbsolutePath();
+        String unavailableDir2 = TestUtils.tempFile().getAbsolutePath();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        assertEquals("No available log directories to format.", assertThrows(TerseException.class,
+            () -> runFormatCommand(stream, Arrays.asList(unavailableDir1, unavailableDir2), false)).getMessage());
+        assertTrue(stream.toString().contains(String.format("I/O error trying to read log directory %s. Ignoring...", unavailableDir1)));
+        assertTrue(stream.toString().contains(String.format("I/O error trying to read log directory %s. Ignoring...", unavailableDir2)));
+    }
+
+    @Test
+    public void testFormatSucceedsIfAtLeastOneFormattedDirectoryIsAvailable() throws TerseException, IOException {
+        String availableDir1 = TestUtils.tempDirectory().getAbsolutePath();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        assertEquals(0, runFormatCommand(stream, Collections.singletonList(availableDir1), false));
+
+        ByteArrayOutputStream stream2 = new ByteArrayOutputStream();
+        String unavailableDir1 = TestUtils.tempFile().getAbsolutePath();
+        assertEquals(0, runFormatCommand(stream2, Arrays.asList(availableDir1, unavailableDir1), true));
+    }
+
+    @Test
+    public void testDefaultMetadataVersion() {
+        Namespace namespace = StorageTool.parseArguments(new String[]{"format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"});
+        MetadataVersion mv = StorageTool.getMetadataVersion(namespace, Optional.empty());
+        assertEquals(MetadataVersion.LATEST_PRODUCTION.featureLevel(), mv.featureLevel(), 
+            "Expected the default metadata.version to be the latest production version");
+    }
+
+    @Test
+    public void testConfiguredMetadataVersion() {
+        Namespace namespace = StorageTool.parseArguments(new String[]{"format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"});
+        MetadataVersion mv = StorageTool.getMetadataVersion(namespace, Optional.of(MetadataVersion.IBP_3_3_IV2.toString()));
+        assertEquals(MetadataVersion.IBP_3_3_IV2.featureLevel(), mv.featureLevel(), "Expected the default metadata.version to be 3.3-IV2");
+    }
+
+    @Test
+    public void testMetadataVersionFlags() {
+        MetadataVersion mv = parseMetadataVersion("--release-version", "3.0");
+        assertEquals("3.0", mv.shortVersion());
+
+        mv = parseMetadataVersion("--release-version", "3.0-IV1");
+        assertEquals(MetadataVersion.IBP_3_0_IV1, mv);
+
+        assertThrows(IllegalArgumentException.class, () -> parseMetadataVersion("--release-version", "0.0"));
+    }
+
+    private MetadataVersion parseMetadataVersion(String... strings) {
+        List<String> args = new ArrayList<>(Arrays.asList("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"));
+        args.addAll(Arrays.asList(strings));
+        Namespace namespace = StorageTool.parseArguments(args.toArray(new String[0]));
+        return StorageTool.getMetadataVersion(namespace, null);
+    }
+
+    @Test
+    public void testAddScram() {
+        try {
+            // Validate we can add multiple SCRAM creds.
+            Optional<List<UserScramCredentialRecord>> scramRecords = parseAddScram();
+            assertEquals(Optional.empty(), scramRecords);
+
+            scramRecords = parseAddScram(
+                "-S", "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]",
+                "-S", "SCRAM-SHA-256=[name=george,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]"
+            );
+            assertEquals(2, scramRecords.get().size());
+
+            // Require name subfield.
+            try {
+                parseAddScram("-S", "SCRAM-SHA-256=[salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]");
+            } catch (TerseException e) {
+                assertEquals("You must supply 'name' to add-scram", e.getMessage());
+            }
+
+            // Require password xor saltedpassword
+            try {
+                parseAddScram("-S", "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]");
+            } catch (TerseException e) {
+                assertEquals("You must only supply one of 'password' or 'saltedpassword' to add-scram", e.getMessage());
+            }
+
+            try {
+                parseAddScram("-S", "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",iterations=8192]");
+            } catch (TerseException e) {
+                assertEquals("You must supply one of 'password' or 'saltedpassword' to add-scram", e.getMessage());
+            }
+
+            // Validate salt is required with saltedpassword
+            try {
+                parseAddScram("-S", "SCRAM-SHA-256=[name=alice,saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]");
+            } catch (TerseException e) {
+                assertEquals("You must supply 'salt' with 'saltedpassword' to add-scram", e.getMessage());
+            }
+
+            // Validate salt is optional with password
+            assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,password=alice,iterations=4096]").get().size());
+
+            // Require 4096 <= iterations <= 16384
+            try {
+                parseAddScram("-S", "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=16385]");
+            } catch (TerseException e) {
+                assertEquals("The 'iterations' value must be <= 16384 for add-scram", e.getMessage());
+            }
+
+            assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=16384]").get().size());
+
+            try {
+                parseAddScram("-S", "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=4095]");
+            } catch (TerseException e) {
+                assertEquals("The 'iterations' value must be >= 4096 for add-scram", e.getMessage());
+            }
+
+            assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=4096]").get().size());
+
+            // Validate iterations is optional
+            assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,password=alice]").get().size());
+        } catch (StorageToolTestException | TerseException | NoSuchAlgorithmException | InvalidKeyException e) {
+            System.out.println("Test Exception: " + e.getMessage());
+        }
+    }
+
+    static class StorageToolTestException extends KafkaException {
+        public StorageToolTestException(String message) {
+            super(message);
+        }
+    }
+
+    public Optional<List<UserScramCredentialRecord>> parseAddScram(String... strings) 
+            throws TerseException, NoSuchAlgorithmException, InvalidKeyException {
+        List<String> argsList = new ArrayList<>(Arrays.asList("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"));
+        Collections.addAll(argsList, strings);
+
+        String[] args = argsList.toArray(new String[0]);
+        Namespace namespace = StorageTool.parseArguments(args);
+        return StorageTool.getUserScramCredentialRecords(namespace);
+    }
+
+    @Test
+    public void testScramWithBadMetadataVersion() throws IOException {
+        AtomicReference<String> exitString = new AtomicReference<>("");
+        Exit.Procedure exitProcedure = (code, message) -> {
+            if (message == null) {
+                message = "";
+            }
+            exitString.set(message);
+            throw new StorageToolTestException(exitString.get());
+        };
+        Exit.setExitProcedure(exitProcedure);
+
+        Properties properties = newSelfManagedProperties();
+        File propsFile = tempFile();
+        try (FileOutputStream propsStream = new FileOutputStream(propsFile)) {
+            properties.store(propsStream, "config.props");
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        String[] args = {
+            "format", "-c", propsFile.toString(), "-t", "XcZZOzUqS4yHOjhMQB6JLQ", "--release-version", "3.4", "-S",
+            "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=8192]"
+        };
+
+        try {
+            StorageTool.main(args);
+        } catch (StorageToolTestException e) {
+            assertEquals("SCRAM is only supported in metadata.version " + 
MetadataVersion.IBP_3_5_IV2 + " or later.", exitString.get());
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void testNoScramWithMetadataVersion() throws IOException {
+        AtomicReference<String> exitString = new AtomicReference<>("");
+        AtomicInteger exitStatus = new AtomicInteger(1);
+        Exit.Procedure exitProcedure = (code, message) -> {
+            exitStatus.set(code);
+            if (message == null) {
+                message = "";
+            }
+            exitString.set(message);
+            throw new StorageToolTestException(exitString.get());
+        };
+        Exit.setExitProcedure(exitProcedure);
+
+        Properties properties = newSelfManagedProperties();
+        File propsFile = tempFile();
+        try (FileOutputStream propsStream = new FileOutputStream(propsFile)) {
+            // This test does format the directory specified so use a tempdir
+            properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, tempDir().toString());
+            properties.store(propsStream, "config.props");
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        String[] args = {
+            "format", "-c", propsFile.toString(), "-t", 
"XcZZOzUqS4yHOjhMQB6JLQ", "--release-version", "3.4"
+        };
+
+        try {
+            StorageTool.main(args);
+        } catch (StorageToolTestException e) {
+            assertEquals("", exitString.get());
+            assertEquals(1, exitStatus.get());
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void testDirUuidGeneration() throws IOException, TerseException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        File tempDir = tempDir();
+        try {
+            MetaProperties metaProperties = 
+                new MetaProperties.Builder()
+                    .setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
+                    .setNodeId(2)
+                    .build();
+            BootstrapMetadata bootstrapMetadata = StorageTool
+                .buildBootstrapMetadata(MetadataVersion.latestTesting(), Optional.empty(), "test format command");
+            assertEquals(0, StorageTool.formatCommand(
+                new PrintStream(stream),
+                new ArrayList<>(Collections.singletonList(tempDir.toString())), 
+                metaProperties, 
+                bootstrapMetadata, 
+                MetadataVersion.latestTesting(), false));
+            File metaPropertiesFile = Paths.get(tempDir.toURI()).resolve(MetaPropertiesEnsemble.META_PROPERTIES_NAME).toFile();
+            assertTrue(metaPropertiesFile.exists());
+            MetaProperties metaProps = new MetaProperties.Builder(
+                PropertiesUtils.readPropertiesFile(metaPropertiesFile.getAbsolutePath())).build();
+            assertTrue(metaProps.directoryId().isPresent());
+            assertFalse(DirectoryId.reserved(metaProps.directoryId().get()));
+        } finally {
+            Utils.delete(tempDir);
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFormattingUnstableMetadataVersionBlocked(boolean enableUnstable) throws IOException {
+        AtomicReference<String> exitString = new AtomicReference<>("");
+        AtomicInteger exitStatus = new AtomicInteger(1);
+
+        Exit.Procedure exitProcedure = (code, message) -> {
+            exitStatus.set(code);
+            if (message == null) {
+                message = "";
+            }
+            exitString.set(message);
+            throw new StorageToolTestException(exitString.get());
+        };
+        Exit.setExitProcedure(exitProcedure);
+
+        Properties properties = newSelfManagedProperties();
+        File propsFile = tempFile();
+        try (OutputStream propsStream = Files.newOutputStream(propsFile.toPath())) {
+            properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, tempDir().toString());
+            properties.setProperty(StorageTool.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, Boolean.toString(enableUnstable));
+            properties.store(propsStream, "config.props");
+        }
+        
+        String[] args = {
+            "format", "-c", propsFile.toString(), "-t", 
"XcZZOzUqS4yHOjhMQB6JLQ", "--release-version", 
MetadataVersion.latestTesting().toString()
+        };
+        
+        try {
+            StorageTool.main(args);
+        } catch (StorageToolTestException e) {

Review Comment:
   Do we need this empty `catch` block?
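   One way to drop it (a sketch using the `assertThrows` already statically imported in this test):
   ```java
   // Sketch: surface the exception instead of swallowing it, then assert on
   // the captured exit state (expected values depend on enableUnstable).
   assertThrows(StorageToolTestException.class, () -> StorageTool.main(args));
   assertEquals(enableUnstable ? 0 : 1, exitStatus.get());
   ```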



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.properties.MetaProperties;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.ProcessRole;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    // TODO delete these constants when they are migrated from KafkaConfig
+    public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";
+    public static final String UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG = "unstable.metadata.versions.enable";
+    public static final String PROCESS_ROLES_CONFIG = "process.roles";
+    
+    public static void main(String... args) {
+        try {
+            
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> logConfig = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, logConfig);
+            Exit.exit(0);
+            
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1, e.getMessage());
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1, e.getMessage());
+        }
+    }
+    
+    private static void executeCommand(Namespace namespace,
+                                       String command,
+                                       Optional<LogConfig> logConfig) throws Exception {
+        switch (command) {
+            case "info": {
+                List<String> directories = configToLogDirectories(logConfig.get());
+                boolean selfManagedMode = configToSelfManagedMode(logConfig.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, directories));
+                break;
+            }
+
+            case "format": {
+                List<String> directories = configToLogDirectories(logConfig.get());
+                String clusterId = namespace.getString("cluster_id");
+
+                MetadataVersion metadataVersion = getMetadataVersion(namespace, interBrokerProtocolVersion(logConfig.get()));
+                if (!metadataVersion.isKRaftSupported()) {
+                    throw new TerseException("Must specify a valid KRaft metadata.version of at least " + MetadataVersion.IBP_3_0_IV0 + ".");
+                }
+                if (!metadataVersion.isProduction()) {
+                    if (unstableMetadataVersionsEnabled(logConfig.get())) {
+                        System.out.println("WARNING: using pre-production metadata.version " + metadataVersion + ".");
+                    } else {
+                        throw new TerseException("The metadata.version " + metadataVersion + " is not ready for production use yet.");
+                    }
+                }
+                MetaProperties metaProperties = new MetaProperties.Builder()
+                    .setVersion(MetaPropertiesVersion.V1)
+                    .setClusterId(clusterId)
+                    .setNodeId(nodeId(logConfig.get()))
+                    .build();
+
+                List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+                Optional<List<UserScramCredentialRecord>> scramRecordsOptional = getUserScramCredentialRecords(namespace);
+                if (scramRecordsOptional.isPresent()) {
+                    if (!metadataVersion.isScramSupported()) {
+                        throw new TerseException("SCRAM is only supported in metadata.version " + MetadataVersion.IBP_3_5_IV2 + " or later.");
+                    }
+                    for (ApiMessage record : scramRecordsOptional.get()) {
+                        metadataRecords.add(new ApiMessageAndVersion(record, (short) 0));
+                    }
+                }
+
+                BootstrapMetadata bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Optional.of(metadataRecords), "format command");
+                boolean ignoreFormatted = namespace.getBoolean("ignore_formatted");
+                if (!configToSelfManagedMode(logConfig.get())) {
+                    throw new TerseException("The kafka configuration file appears to be for " +
+                        "a legacy cluster. Formatting is only supported for clusters in KRaft mode.");
+                }
+                Exit.exit(formatCommand(System.out, directories, metaProperties, 
+                    bootstrapMetadata, metadataVersion, ignoreFormatted));
+                break;
+            }
+
+            case "random-uuid": {
+                System.out.println(Uuid.randomUuid());
+                Exit.exit(0);
+                break;
+            }
+
+            default: {
+                throw new RuntimeException("Unknown command " + command);
+            }
+        }
+    }
+    
+    public static Namespace parseArguments(String[] args) {
+        ArgumentParser parser = ArgumentParsers.newArgumentParser("kafka-storage", true, "-", "@").description("The Kafka storage tool.");
+        Subparsers subparsers = parser.addSubparsers().dest("command");
+        Subparser infoParser = subparsers.addParser("info").help("Get information about the Kafka log directories on this node.");
+        Subparser formatParser = subparsers.addParser("format").help("Format the Kafka log directories on this node.");
+        subparsers.addParser("random-uuid").help("Print a random UUID.");
+
+        for (Subparser subparser : Arrays.asList(infoParser, formatParser)) {
+            subparser.addArgument("--config", "-c").action(store()).required(true).help("The Kafka configuration file to use.");
+        }
+
+        formatParser.addArgument("--cluster-id", "-t").action(store()).required(true).help("The cluster ID to use.");
+        formatParser.addArgument("--add-scram", "-S").action(append()).help("A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.\n" +
+            "'SCRAM-SHA-256=[name=alice,password=alice-secret]'\n" +
+            "'SCRAM-SHA-512=[name=alice,iterations=8192,salt=\"N3E=\",saltedpassword=\"YCE=\"]'");
+        formatParser.addArgument("--ignore-formatted", "-g").action(storeTrue());
+        formatParser.addArgument("--release-version", "-r").action(store()).help(
+            "A KRaft release version to use for the initial metadata.version. The minimum is " + MetadataVersion.IBP_3_0_IV0 + 
+                "; the default is " + MetadataVersion.LATEST_PRODUCTION.version());
+
+        return parser.parseArgsOrFail(args);
+    }
+    
+    static List<String> configToLogDirectories(LogConfig logConfig) {
+        List<String> logDirs = logDirs(logConfig);
+        SortedSet<String> directories = new TreeSet<>(logDirs);
+        String metadataLogDir = metadataLogDir(logConfig);
+        if (metadataLogDir != null) {
+            directories.add(metadataLogDir);
+        }
+        return new ArrayList<>(directories);
+    }
+
+    private static boolean configToSelfManagedMode(LogConfig logConfig) {
+        String processRolesConfig = (String) logConfig.originals().get(PROCESS_ROLES_CONFIG);
+        List<String> processRoles = processRolesConfig != null ? Arrays.asList(processRolesConfig.split("\\s*,\\s*")) : Collections.emptyList();
+        return !parseProcessRoles(processRoles).isEmpty();
+    }
+
+    static MetadataVersion getMetadataVersion(Namespace namespace, Optional<String> defaultVersionString) {
+        MetadataVersion defaultValue;
+        if (defaultVersionString != null && defaultVersionString.isPresent()) {
+            defaultValue = MetadataVersion.fromVersionString(defaultVersionString.get());
+        } else {
+            defaultValue = MetadataVersion.LATEST_PRODUCTION;
+        }
+        String releaseVersion = namespace.getString("release_version");
+        if (releaseVersion != null) {
+            return MetadataVersion.fromVersionString(releaseVersion);
+        } else {
+            return defaultValue;
+        }
+    }
+
+    private static UserScramCredentialRecord getUserScramCredentialRecord(String mechanism, String config) 
+            throws TerseException, NoSuchAlgorithmException, InvalidKeyException {
+        // Remove '[' and ']'
+        String cleanedConfig = config.substring(1, config.length() - 1);
+
+        // Split K->V pairs on ',' and no K or V should contain ','
+        String[] keyValuePairs = cleanedConfig.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");
+
+        // Create Map of K to V and replace all " in V
+        Map<String, String> argMap = Arrays.stream(keyValuePairs)
+            .map(pair -> pair.split("=", 2))
+            .collect(Collectors.toMap(pair -> pair[0], pair -> pair[1].replaceAll("\"", "")));
+
+        ScramMechanism scramMechanism = ScramMechanism.forMechanismName(mechanism);
+        String name = getName(argMap);
+        byte[] salt = getSalt(argMap, scramMechanism);
+        int iterations = getIterations(argMap, scramMechanism);
+        byte[] saltedPassword = getSaltedPassword(argMap, scramMechanism, salt, iterations);
+
+        try {
+            ScramFormatter formatter = new ScramFormatter(scramMechanism);
+
+            return new UserScramCredentialRecord()
+                .setName(name)
+                .setMechanism(scramMechanism.type())
+                .setSalt(salt)
+                .setStoredKey(formatter.storedKey(formatter.clientKey(saltedPassword)))
+                .setServerKey(formatter.serverKey(saltedPassword))
+                .setIterations(iterations);
+        } catch (Throwable e) {
+            throw new TerseException("Error attempting to create UserScramCredentialRecord: " + e.getMessage());
+        }
+    }
+
+    private static String getName(Map<String, String> argMap) throws TerseException {
+        if (!argMap.containsKey("name")) {
+            throw new TerseException("You must supply 'name' to add-scram");
+        }
+        return argMap.get("name");
+    }
+
+    private static byte[] getSalt(Map<String, String> argMap, ScramMechanism scramMechanism) throws NoSuchAlgorithmException {
+        if (argMap.containsKey("salt")) {
+            return Base64.getDecoder().decode(argMap.get("salt"));
+        } else {
+            return new ScramFormatter(scramMechanism).secureRandomBytes();
+        }
+    }
+
+    private static int getIterations(Map<String, String> argMap, ScramMechanism scramMechanism) throws TerseException {
+        if (argMap.containsKey("iterations")) {
+            int iterations = Integer.parseInt(argMap.get("iterations"));
+            if (iterations < scramMechanism.minIterations()) {
+                throw new TerseException("The 'iterations' value must be >= " + scramMechanism.minIterations() + " for add-scram");
+            }
+            if (iterations > scramMechanism.maxIterations()) {
+                throw new TerseException("The 'iterations' value must be <= " + scramMechanism.maxIterations() + " for add-scram");
+            }
+            return iterations;
+        } else {
+            return 4096;
+        }
+    }
+
+    private static byte[] getSaltedPassword(
+        Map<String, String> argMap,
+        ScramMechanism scramMechanism,
+        byte[] salt,
+        int iterations
+    ) throws TerseException, NoSuchAlgorithmException, InvalidKeyException {
+        if (argMap.containsKey("password")) {
+            if (argMap.containsKey("saltedpassword")) {
+                throw new TerseException("You must only supply one of 
'password' or 'saltedpassword' to add-scram");
+            }
+            return new 
ScramFormatter(scramMechanism).saltedPassword(argMap.get("password"), salt, 
iterations);
+        } else {
+            if (!argMap.containsKey("saltedpassword")) {
+                throw new TerseException("You must supply one of 'password' or 
'saltedpassword' to add-scram");
+            }
+            if (!argMap.containsKey("salt")) {
+                throw new TerseException("You must supply 'salt' with 
'saltedpassword' to add-scram");
+            }
+            return Base64.getDecoder().decode(argMap.get("saltedpassword"));
+        }
+    }
+    
+    public static Optional<List<UserScramCredentialRecord>> getUserScramCredentialRecords(Namespace namespace) 
+            throws TerseException, NoSuchAlgorithmException, InvalidKeyException {
+        if (namespace.getList("add_scram") != null) {
+            List<String> listOfAddConfig = namespace.getList("add_scram");
+            List<UserScramCredentialRecord> userScramCredentialRecords = new ArrayList<>();
+            for (String singleAddConfig : listOfAddConfig) {
+                String[] singleAddConfigList = singleAddConfig.split("\\s+");
+
+                // The first subarg must be of the form key=value
+                String[] nameValueRecord = singleAddConfigList[0].split("=", 2);
+                switch (nameValueRecord[0]) {
+                    case "SCRAM-SHA-256":
+                    case "SCRAM-SHA-512":
+                        userScramCredentialRecords.add(getUserScramCredentialRecord(nameValueRecord[0], nameValueRecord[1]));
+                        break;
+                    default:
+                        throw new TerseException("The add-scram mechanism " + nameValueRecord[0] + " is not supported.");
+                }
+            }
+            return Optional.of(userScramCredentialRecords);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException {
+        List<String> problems = new ArrayList<>();
+        List<String> foundDirectories = new ArrayList<>();
+        Optional<MetaProperties> prevMetadata = Optional.empty();
+        for (String directory : directories) {
+            Path directoryPath = Paths.get(directory);
+            if (!Files.isDirectory(directoryPath)) {
+                if (!Files.exists(directoryPath)) {
+                    problems.add(directoryPath + " does not exist");
+                } else {
+                    problems.add(directoryPath + " is not a directory");
+                }
+            } else {
+                foundDirectories.add(directoryPath.toString());
+                Path metaPath = directoryPath.resolve(MetaPropertiesEnsemble.META_PROPERTIES_NAME);
+                if (!Files.exists(metaPath)) {
+                    problems.add(directoryPath + " is not formatted.");
+                } else {
+                    Properties properties = Utils.loadProps(metaPath.toString());
+                    try {
+                        Optional<MetaProperties> curMetadata = Optional.of(new MetaProperties.Builder(properties).build());
+                        if (!prevMetadata.isPresent()) {
+                            prevMetadata = curMetadata;
+                        } else {
+                            if (!prevMetadata.get().clusterId().equals(curMetadata.get().clusterId())) {
+                                problems.add("Mismatched cluster IDs between storage directories.");
+                            } else if (!prevMetadata.get().nodeId().equals(curMetadata.get().nodeId())) {
+                                problems.add("Mismatched node IDs between storage directories.");
+                            }
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        problems.add("Error loading " + metaPath + ": " + e.getMessage());
+                    }
+                }
+            }
+        }
+
+        if (prevMetadata.isPresent()) {
+            if (selfManagedMode) {
+                if (prevMetadata.get().version().equals(MetaPropertiesVersion.V0)) {
+                    problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, " +
+                        "but the directories are formatted for legacy mode.");
+                }
+            } else if (prevMetadata.get().version().equals(MetaPropertiesVersion.V1)) {
+                problems.add("The kafka configuration file appears to be for a legacy cluster, " +
+                    "but the directories are formatted for a cluster in KRaft mode.");
+            }
+        }
+
+        return validateDirectories(stream, directories, problems, foundDirectories, prevMetadata);
+    }
+
+    private static int validateDirectories(PrintStream stream,
+                                           List<String> directories,
+                                           List<String> problems,
+                                           List<String> foundDirectories,
+                                           Optional<MetaProperties> prevMetadata) {
+        if (directories.isEmpty()) {
+            stream.println("No directories specified.");
+            return 0;
+        } else {
+            if (!foundDirectories.isEmpty()) {
+                if (foundDirectories.size() == 1) {
+                    stream.println("Found log directory:");
+                } else {
+                    stream.println("Found log directories:");
+                }
+                foundDirectories.forEach(d -> stream.println("  " + d));
+                stream.println("");
+            }
+
+            if (prevMetadata.isPresent()) {
+                Map<String, String> sortedOutput = new TreeMap<>();
+                MetaProperties prev = prevMetadata.get();
+                prev.toProperties().entrySet().forEach(e -> sortedOutput.put(e.getKey().toString(), e.getValue().toString()));
+                stream.println("Found metadata: " + sortedOutput);
+                stream.println("");
+            }
+
+            if (!problems.isEmpty()) {
+                if (problems.size() == 1) {
+                    stream.println("Found problem:");
+                } else {
+                    stream.println("Found problems:");
+                }
+                problems.forEach(d -> stream.println("  " + d));
+                stream.println("");
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    }
+
+    public static BootstrapMetadata buildBootstrapMetadata(MetadataVersion metadataVersion,
+                                                           Optional<List<ApiMessageAndVersion>> metadataOptionalArguments,
+                                                           String source) {
+        List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+        metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord()
+            .setName(MetadataVersion.FEATURE_NAME)
+            .setFeatureLevel(metadataVersion.featureLevel()), (short) 0));
+        
+        if (metadataOptionalArguments != null) {
+            metadataOptionalArguments.ifPresent(metadataArguments -> metadataArguments.forEach(metadataRecords::add));
+        }
+        
+        return BootstrapMetadata.fromRecords(metadataRecords, source);
+    }
+
+    public static int formatCommand(PrintStream stream,
+                             List<String> directories,
+                             MetaProperties metaProperties,
+                             BootstrapMetadata bootstrapMetadata,
+                             MetadataVersion metadataVersion,
+                             boolean ignoreFormatted) throws TerseException, IOException {
+        if (directories.isEmpty()) {
+            throw new TerseException("No log directories found in the 
configuration.");
+        }
+
+        MetaPropertiesEnsemble.Loader loader = new MetaPropertiesEnsemble.Loader();
+        directories.forEach(loader::addLogDir);
+        MetaPropertiesEnsemble metaPropertiesEnsemble = loader.load();
+        metaPropertiesEnsemble.verify(metaProperties.clusterId(), metaProperties.nodeId(), EnumSet.noneOf(MetaPropertiesEnsemble.VerificationFlag.class));
+
+        System.out.println("metaPropertiesEnsemble=" + metaPropertiesEnsemble);
+        MetaPropertiesEnsemble.Copier copier = new 
MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble);
+        if (!(ignoreFormatted || copier.logDirProps().isEmpty())) {
+            String firstLogDir = copier.logDirProps().keySet().iterator().next();
+            throw new TerseException("Log directory " + firstLogDir + " is already formatted. " +
+                "Use --ignore-formatted to ignore this directory and format the others.");
+        }
+        if (!copier.errorLogDirs().isEmpty()) {
+            copier.errorLogDirs().forEach(errorLogDir ->
+                stream.printf("I/O error trying to read log directory %s. Ignoring...%n", errorLogDir));
+            if (metaPropertiesEnsemble.emptyLogDirs().isEmpty() && copier.logDirProps().isEmpty()) {
+                throw new TerseException("No available log directories to format.");
+            }
+        }
+        if (metaPropertiesEnsemble.emptyLogDirs().isEmpty()) {
+            stream.println("All of the log directories are already 
formatted.");
+        } else {
+            metaPropertiesEnsemble.emptyLogDirs().forEach(logDir -> {
+                copier.setLogDirProps(logDir, new MetaProperties.Builder(metaProperties).
+                    setDirectoryId(copier.generateValidDirectoryId()).
+                    build());
+                copier.setPreWriteHandler((logDir1, isNew, metaProperties1) -> {
+                    stream.println("Formatting " + logDir1 + " with metadata.version " + metadataVersion + ".");
+                    Files.createDirectories(Paths.get(logDir1));
+                    BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(logDir1, Optional.empty());
+                    bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
+                });
+                copier.setWriteErrorHandler((logDir2, e) -> {
+                    throw new RuntimeException("Error while writing meta.properties file " + logDir2 + ": " + e.getMessage());
+                });
+                try {
+                    copier.writeLogDirChanges();
+                } catch (Throwable e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+        return 0;
+    }
+
+    public static List<String> logDirs(LogConfig logConfig) {
+        String logDirsConfig = (String) logConfig.originals().get(ServerLogConfigs.LOG_DIRS_CONFIG);
+        String logDirConfig = (String) logConfig.originals().get(ServerLogConfigs.LOG_DIR_CONFIG);
+        String csvList = logDirsConfig != null ? logDirsConfig : logDirConfig;
+        if (csvList == null || csvList.isEmpty()) {
+            return Collections.emptyList();
+        } else {
+            return Arrays.stream(csvList.split("\\s*,\\s*"))
+                .filter(v -> !v.equals(""))

Review Comment:
   `!v.equals("")` -> `!v.isEmpty()`



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.properties.MetaProperties;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.ProcessRole;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    // TODO delete these constants when they are migrated from KafkaConfig
+    public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";
+    public static final String UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG = "unstable.metadata.versions.enable";
+    public static final String PROCESS_ROLES_CONFIG = "process.roles";
+    
+    public static void main(String... args) {
+        try {
+            
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> logConfig = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, logConfig);
+            Exit.exit(0);
+            
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1, e.getMessage());
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1, e.getMessage());
+        }
+    }
+    
+    private static void executeCommand(Namespace namespace,
+                                       String command,
+                                       Optional<LogConfig> logConfig) throws Exception {
+        switch (command) {
+            case "info": {
+                List<String> directories = configToLogDirectories(logConfig.get());
+                boolean selfManagedMode = configToSelfManagedMode(logConfig.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, directories));
+                break;
+            }
+
+            case "format": {
+                List<String> directories = configToLogDirectories(logConfig.get());
+                String clusterId = namespace.getString("cluster_id");
+                
+                MetadataVersion metadataVersion = getMetadataVersion(namespace, interBrokerProtocolVersion(logConfig.get()));
+                if (!metadataVersion.isKRaftSupported()) {
+                    throw new TerseException("Must specify a valid KRaft 
metadata.version of at least " + MetadataVersion.IBP_3_0_IV0 + ".");
+                }
+                if (!metadataVersion.isProduction()) {
+                    if (unstableMetadataVersionsEnabled(logConfig.get())) {
+                        System.out.println("WARNING: using pre-production 
metadata.version " + metadataVersion + ".");
+                    } else {
+                        throw new TerseException("The metadata.version " + 
metadataVersion + " is not ready for production use yet.");
+                    }
+                }
+                MetaProperties metaProperties = new MetaProperties.Builder()
+                    .setVersion(MetaPropertiesVersion.V1)
+                    .setClusterId(clusterId)
+                    .setNodeId(nodeId(logConfig.get()))
+                    .build();
+
+                List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+                Optional<List<UserScramCredentialRecord>> scramRecordsOptional = getUserScramCredentialRecords(namespace);
+                if (scramRecordsOptional.isPresent()) {
+                    if (!metadataVersion.isScramSupported()) {
+                        throw new TerseException("SCRAM is only supported in metadata.version " + MetadataVersion.IBP_3_5_IV2 + " or later.");
+                    }
+                    for (ApiMessage record : scramRecordsOptional.get()) {
+                        metadataRecords.add(new ApiMessageAndVersion(record, (short) 0));
+                    }
+                }
+
+                BootstrapMetadata bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Optional.of(metadataRecords), "format command");
+                boolean ignoreFormatted = namespace.getBoolean("ignore_formatted");
+                if (!configToSelfManagedMode(logConfig.get())) {
+                    throw new TerseException("The kafka configuration file appears to be for " +
+                        "a legacy cluster. Formatting is only supported for clusters in KRaft mode.");
+                }
+                Exit.exit(formatCommand(System.out, directories, metaProperties,
+                    bootstrapMetadata, metadataVersion, ignoreFormatted));
+                break;
+            }
+
+            case "random-uuid": {
+                System.out.println(Uuid.randomUuid());
+                Exit.exit(0);
+                break;
+            }
+
+            default: {
+                throw new RuntimeException("Unknown command " + command);
+            }
+        }
+    }
+    
+    public static Namespace parseArguments(String[] args) {
+        ArgumentParser parser = ArgumentParsers.newArgumentParser("kafka-storage", true, "-", "@").description("The Kafka storage tool.");
+        Subparsers subparsers = parser.addSubparsers().dest("command");
+        Subparser infoParser = subparsers.addParser("info").help("Get information about the Kafka log directories on this node.");
+        Subparser formatParser = subparsers.addParser("format").help("Format the Kafka log directories on this node.");
+        subparsers.addParser("random-uuid").help("Print a random UUID.");
+
+        for (Subparser subparser : Arrays.asList(infoParser, formatParser)) {
+            subparser.addArgument("--config", "-c").action(store()).required(true).help("The Kafka configuration file to use.");
+        }
+
+        formatParser.addArgument("--cluster-id", 
"-t").action(store()).required(true).help("The cluster ID to use.");
+        formatParser.addArgument("--add-scram", "-S").action(append()).help("A 
SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.\n" +
+            "'SCRAM-SHA-256=[name=alice,password=alice-secret]'\n" +
+            
"'SCRAM-SHA-512=[name=alice,iterations=8192,salt=\"N3E=\",saltedpassword=\"YCE=\"]'");
+        formatParser.addArgument("--ignore-formatted", 
"-g").action(storeTrue());
+        formatParser.addArgument("--release-version", 
"-r").action(store()).help(
+            "A KRaft release version to use for the initial metadata.version. 
The minimum is " + MetadataVersion.IBP_3_0_IV0 + 
+                " the default is " + 
MetadataVersion.LATEST_PRODUCTION.version());
+
+        return parser.parseArgsOrFail(args);
+    }
+    
+    static List<String> configToLogDirectories(LogConfig logConfig) {
+        List<String> logDirs = logDirs(logConfig);
+        SortedSet<String> directories = new TreeSet<>(logDirs);
+        String metadataLogDir = metadataLogDir(logConfig);
+        if (metadataLogDir != null) {
+            directories.add(metadataLogDir);
+        }
+        return new ArrayList<>(directories);
+    }
+
+    private static boolean configToSelfManagedMode(LogConfig logConfig) {
+        String processRolesConfig = (String) logConfig.originals().get(PROCESS_ROLES_CONFIG);
+        List<String> processRoles = processRolesConfig != null ? Arrays.asList(processRolesConfig.split("\\s*,\\s*")) : Collections.emptyList();
+        return !parseProcessRoles(processRoles).isEmpty();
+    }
+
+    static MetadataVersion getMetadataVersion(Namespace namespace, Optional<String> defaultVersionString) {
+        MetadataVersion defaultValue;
+        if (defaultVersionString != null && defaultVersionString.isPresent()) {
+            defaultValue = MetadataVersion.fromVersionString(defaultVersionString.get());
+        } else {
+            defaultValue = MetadataVersion.LATEST_PRODUCTION;
+        }
+        String releaseVersion = namespace.getString("release_version");
+        if (releaseVersion != null) {
+            return MetadataVersion.fromVersionString(releaseVersion);
+        } else {
+            return defaultValue;
+        }
+    }
+
+    private static UserScramCredentialRecord getUserScramCredentialRecord(String mechanism, String config)
+            throws TerseException, NoSuchAlgorithmException, InvalidKeyException {
+        // Remove '[' and ']'
+        String cleanedConfig = config.substring(1, config.length() - 1);
+
+        // Split K->V pairs on ',' and no K or V should contain ','
+        String[] keyValuePairs = cleanedConfig.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");
+
+        // Create Map of K to V and replace all " in V
+        Map<String, String> argMap = Arrays.stream(keyValuePairs)
+            .map(pair -> pair.split("=", 2))
+            .collect(Collectors.toMap(pair -> pair[0], pair -> pair[1].replaceAll("\"", "")));
+
+        ScramMechanism scramMechanism = ScramMechanism.forMechanismName(mechanism);
+        String name = getName(argMap);
+        byte[] salt = getSalt(argMap, scramMechanism);
+        int iterations = getIterations(argMap, scramMechanism);
+        byte[] saltedPassword = getSaltedPassword(argMap, scramMechanism, salt, iterations);
+
+        try {
+            ScramFormatter formatter = new ScramFormatter(scramMechanism);
+
+            return new UserScramCredentialRecord()
+                .setName(name)
+                .setMechanism(scramMechanism.type())
+                .setSalt(salt)
+                .setStoredKey(formatter.storedKey(formatter.clientKey(saltedPassword)))
+                .setServerKey(formatter.serverKey(saltedPassword))
+                .setIterations(iterations);
+        } catch (Throwable e) {
+            throw new TerseException("Error attempting to create 
UserScramCredentialRecord: " + e.getMessage());
+        }
+    }
+
+    private static String getName(Map<String, String> argMap) throws TerseException {
+        if (!argMap.containsKey("name")) {
+            throw new TerseException("You must supply 'name' to add-scram");
+        }
+        return argMap.get("name");
+    }
+
+    private static byte[] getSalt(Map<String, String> argMap, ScramMechanism scramMechanism) throws NoSuchAlgorithmException {
+        if (argMap.containsKey("salt")) {
+            return Base64.getDecoder().decode(argMap.get("salt"));
+        } else {
+            return new ScramFormatter(scramMechanism).secureRandomBytes();
+        }
+    }
+
+    private static int getIterations(Map<String, String> argMap, ScramMechanism scramMechanism) throws TerseException {
+        if (argMap.containsKey("iterations")) {
+            int iterations = Integer.parseInt(argMap.get("iterations"));
+            if (iterations < scramMechanism.minIterations()) {
+                throw new TerseException("The 'iterations' value must be >= " 
+ scramMechanism.minIterations() + " for add-scram");
+            }
+            if (iterations > scramMechanism.maxIterations()) {
+                throw new TerseException("The 'iterations' value must be <= " 
+ scramMechanism.maxIterations() + " for add-scram");
+            }
+            return iterations;
+        } else {
+            return 4096;
+        }
+    }
+
+    private static byte[] getSaltedPassword(
+        Map<String, String> argMap,
+        ScramMechanism scramMechanism,
+        byte[] salt,
+        int iterations
+    ) throws TerseException, NoSuchAlgorithmException, InvalidKeyException {
+        if (argMap.containsKey("password")) {
+            if (argMap.containsKey("saltedpassword")) {
+                throw new TerseException("You must only supply one of 
'password' or 'saltedpassword' to add-scram");
+            }
+            return new 
ScramFormatter(scramMechanism).saltedPassword(argMap.get("password"), salt, 
iterations);
+        } else {
+            if (!argMap.containsKey("saltedpassword")) {
+                throw new TerseException("You must supply one of 'password' or 
'saltedpassword' to add-scram");
+            }
+            if (!argMap.containsKey("salt")) {
+                throw new TerseException("You must supply 'salt' with 
'saltedpassword' to add-scram");
+            }
+            return Base64.getDecoder().decode(argMap.get("saltedpassword"));
+        }
+    }
+    
+    public static Optional<List<UserScramCredentialRecord>> getUserScramCredentialRecords(Namespace namespace)
+            throws TerseException, NoSuchAlgorithmException, InvalidKeyException {
+        if (namespace.getList("add_scram") != null) {
+            List<String> listOfAddConfig = namespace.getList("add_scram");
+            List<UserScramCredentialRecord> userScramCredentialRecords = new ArrayList<>();
+            for (String singleAddConfig : listOfAddConfig) {
+                String[] singleAddConfigList = singleAddConfig.split("\\s+");
+
+                // The first subarg must be of the form key=value
+                String[] nameValueRecord = singleAddConfigList[0].split("=", 2);
+                switch (nameValueRecord[0]) {
+                    case "SCRAM-SHA-256":
+                    case "SCRAM-SHA-512":
+                        userScramCredentialRecords.add(getUserScramCredentialRecord(nameValueRecord[0], nameValueRecord[1]));
+                        break;
+                    default:
+                        throw new TerseException("The add-scram mechanism " + 
nameValueRecord[0] + " is not supported.");
+                }
+            }
+            return Optional.of(userScramCredentialRecords);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException {
+        List<String> problems = new ArrayList<>();
+        List<String> foundDirectories = new ArrayList<>();
+        Optional<MetaProperties> prevMetadata = Optional.empty();
+        for (String directory : directories) {
+            Path directoryPath = Paths.get(directory);
+            if (!Files.isDirectory(directoryPath)) {
+                if (!Files.exists(directoryPath)) {
+                    problems.add(directoryPath + " does not exist");
+                } else {
+                    problems.add(directoryPath + " is not a directory");
+                }
+            } else {
+                foundDirectories.add(directoryPath.toString());
+                Path metaPath = directoryPath.resolve(MetaPropertiesEnsemble.META_PROPERTIES_NAME);
+                if (!Files.exists(metaPath)) {
+                    problems.add(directoryPath + " is not formatted.");
+                } else {
+                    Properties properties = Utils.loadProps(metaPath.toString());
+                    try {
+                        Optional<MetaProperties> curMetadata = Optional.of(new MetaProperties.Builder(properties).build());
+                        if (!prevMetadata.isPresent()) {
+                            prevMetadata = curMetadata;
+                        } else {
+                            if (!prevMetadata.get().clusterId().equals(curMetadata.get().clusterId())) {
+                                problems.add("Mismatched cluster IDs between storage directories.");
+                            } else if (!prevMetadata.get().nodeId().equals(curMetadata.get().nodeId())) {
+                                problems.add("Mismatched node IDs between storage directories.");
+                            }
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        problems.add("Error loading " + metaPath + ": " + 
e.getMessage());
+                    }
+                }
+            }
+        }
+
+        if (prevMetadata.isPresent()) {
+            if (selfManagedMode) {
+                if (prevMetadata.get().version().equals(MetaPropertiesVersion.V0)) {
+                    problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, " +
+                        "but the directories are formatted for legacy mode.");
+                }
+            } else if (prevMetadata.get().version().equals(MetaPropertiesVersion.V1)) {
+                problems.add("The kafka configuration file appears to be for a legacy cluster, " +
+                    "but the directories are formatted for a cluster in KRaft mode.");
+            }
+        }
+
+        return validateDirectories(stream, directories, problems, foundDirectories, prevMetadata);
+    }
+
+    private static int validateDirectories(PrintStream stream,
+                                           List<String> directories,
+                                           List<String> problems,
+                                           List<String> foundDirectories,
+                                           Optional<MetaProperties> prevMetadata) {
+        if (directories.isEmpty()) {
+            stream.println("No directories specified.");
+            return 0;
+        } else {
+            if (!foundDirectories.isEmpty()) {
+                if (foundDirectories.size() == 1) {
+                    stream.println("Found log directory:");
+                } else {
+                    stream.println("Found log directories:");
+                }
+                foundDirectories.forEach(d -> stream.println("  " + d));
+                stream.println("");

Review Comment:
   To just print a new line you can call `println()` without any arguments.
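   
   A minimal illustration (plain `PrintStream` behavior, not specific to this PR):
   
   ```java
   stream.println("");  // prints an empty String, then the line separator
   stream.println();    // prints just the line separator; same output, clearer intent
   ```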



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
