[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-12 Thread GitBox


dajac commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r593503408



##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -40,9 +40,16 @@ object LogDirsCommand {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
 val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+val clusterBrokers: Array[Int] = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
 val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
 case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+case None => clusterBrokers
+}
+
+val nonExistBrokers: Array[Int] = brokerList.filterNot(brokerId => 
clusterBrokers.contains(brokerId))
+if (!nonExistBrokers.isEmpty) {
+  System.err.println(s"The given node(s) does not exist from 
broker-list ${nonExistBrokers.mkString(",")}")
+  sys.exit(1)

Review comment:
   Should we do this only when handle the brokers provided by the user? It 
does not make sense to validate the list of brokers otherwise. What do you 
think?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-15 Thread GitBox


dajac commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594184162



##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,31 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+var nonExistBrokers: Set[Int] = Set.empty
+try {
+val clusterBrokers: Set[Int] = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet

Review comment:
   nit: We can remove specifying `Set[Int]`.

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,31 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+var nonExistBrokers: Set[Int] = Set.empty
+try {
+val clusterBrokers: Set[Int] = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
+case Some(brokerListStr) =>
+val inputBrokers: Set[Int] = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet

Review comment:
   ditto.

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,31 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+var nonExistBrokers: Set[Int] = Set.empty
+try {
+val clusterBrokers: Set[Int] = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
+case Some(brokerListStr) =>
+val inputBrokers: Set[Int] = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+nonExistBrokers = inputBrokers.diff(clusterBrokers)
+inputBrokers
+case None => clusterBrokers
+}

Review comment:
   nit: We usually avoid using mutable variable unless it is really 
necessary. In this case, I would rather return the `nonExistingBrokers` when 
the argument is processed. Something like this:
   
   ```
   val (existingBrokers, nonExistingBrokers) = 
Option(opts.options.valueOf(opts.brokerListOpt)) match {
   case Some(brokerListStr) =>
   val inputBrokers: Set[Int] = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
  (inputBrokers, inputBrokers.diff(clusterBrokers)
   case None => (clusterBrokers, Set.empty)
   }
   ```

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,31 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.descr

[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-15 Thread GitBox


dajac commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594476389



##
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##
@@ -0,0 +1,52 @@
+package unit.kafka.admin

Review comment:
   We must add the licence header here. You can copy it from another file.

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,29 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+try {
+val clusterBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val (existingBrokers, nonExistingBrokers) = 
Option(opts.options.valueOf(opts.brokerListOpt)) match {
+case Some(brokerListStr) =>
+val inputBrokers = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+(inputBrokers, inputBrokers.diff(clusterBrokers))
+case None => (clusterBrokers, Set.empty)
+}
 
-out.println("Querying brokers for log directories information")
-val describeLogDirsResult: DescribeLogDirsResult = 
adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-val logDirInfosByBroker = 
describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> 
v.asScala }
+if (nonExistingBrokers.nonEmpty) {
+out.println(s"ERROR: The given node(s) does not exist from 
broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist 
node(s): ${clusterBrokers.mkString(",")}")

Review comment:
   nit: Should we say `--broker-list` instead of `broker-list`? Also, 
should we say `broker(s)` instead of `node(s)` to be consistent with the 
message below?

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,29 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+try {
+val clusterBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val (existingBrokers, nonExistingBrokers) = 
Option(opts.options.valueOf(opts.brokerListOpt)) match {
+case Some(brokerListStr) =>
+val inputBrokers = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+(inputBrokers, inputBrokers.diff(clusterBrokers))
+case None => (clusterBrokers, Set.empty)
+}
 
-out.println("Querying brokers for log directories information")
-val describeLogDirsResult: DescribeLogDirsResult = 
adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-val logDirInfosByBroker = 
describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> 
v.asScala }
+if (nonExistingBrokers.nonEmpty) {
+out.println(s"ERROR: The given node(s) does not exist from 
broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist 
node(s): ${clusterBrokers.mkString(",")}")
+} else {
+out.println("Querying brokers for log directories information")
+val describeLogDirsResult: DescribeLogDirsResult = 
adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)

Review comment:
   nit: `DescribeLogDirsResult` can be removed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org