cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373822529


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -407,36 +414,41 @@ object StorageTool extends Logging {
     if (directories.isEmpty) {
       throw new TerseFailure("No log directories found in the configuration.")
     }
-
-    val unformattedDirectories = directories.filter(directory => {
-      if (!Files.isDirectory(Paths.get(directory)) || 
!Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) {
-          true
-      } else if (!ignoreFormatted) {
-        throw new TerseFailure(s"Log directory $directory is already 
formatted. " +
-          "Use --ignore-formatted to ignore this directory and format the 
others.")
-      } else {
-        false
-      }
-    })
-    if (unformattedDirectories.isEmpty) {
+    val loader = new MetaPropertiesEnsemble.Loader()
+    directories.foreach(loader.addLogDir(_))
+    val metaPropertiesEnsemble = loader.load()
+    metaPropertiesEnsemble.verify(metaProperties.clusterId(), 
metaProperties.nodeId(),
+      util.EnumSet.noneOf(classOf[VerificationFlag]))
+
+    val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
+    if (!(ignoreFormatted || copier.logDirProps().isEmpty)) {
+      val firstLogDir = copier.logDirProps().keySet().iterator().next()
+      throw new TerseFailure(s"Log directory ${firstLogDir} directory is 
already formatted. " +
+        "Use --ignore-formatted to ignore this directory and format the 
others.")
+    }
+    if (!copier.errorLogDirs().isEmpty) {
+      val firstLogDir = copier.errorLogDirs().iterator().next()
+      throw new TerseFailure(s"I/O error trying to read log directory 
${firstLogDir}.")
+    }
+    if (copier.emptyLogDirs().isEmpty) {
       stream.println("All of the log directories are already formatted.")
+    } else {
+      copier.emptyLogDirs().forEach(logDir => {
+        val newMetaProperties = new MetaProperties.Builder(metaProperties).
+          setDirectoryId(copier.generateValidDirectoryId()).
+          build()
+        copier.logDirProps().put(logDir, newMetaProperties)

Review Comment:
   Fair point. I added a setter function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to