[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
dajac commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r593503408 ## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -40,9 +40,16 @@ object LogDirsCommand { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) +val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray +case None => clusterBrokers +} + +val nonExistBrokers: Array[Int] = brokerList.filterNot(brokerId => clusterBrokers.contains(brokerId)) +if (!nonExistBrokers.isEmpty) { + System.err.println(s"The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}") + sys.exit(1) Review comment: Should we do this only when handling the brokers provided by the user? It does not make sense to validate the list of brokers otherwise. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
dajac commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r594184162 ## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,31 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray -} +val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty) +var nonExistBrokers: Set[Int] = Set.empty +try { +val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet Review comment: nit: We can remove specifying `Set[Int]`. 
## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,31 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray -} +val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty) +var nonExistBrokers: Set[Int] = Set.empty +try { +val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet +val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { +case Some(brokerListStr) => +val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet Review comment: ditto. 
## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,31 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray -} +val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty) +var nonExistBrokers: Set[Int] = Set.empty +try { +val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet +val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { +case Some(brokerListStr) => +val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet +nonExistBrokers = inputBrokers.diff(clusterBrokers) +inputBrokers +case None => clusterBrokers +} Review comment: nit: We usually avoid using mutable variables unless it is really necessary. In this case, I would rather return the `nonExistingBrokers` when the argument is processed. 
Something like this: ``` val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match { case Some(brokerListStr) => val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet (inputBrokers, inputBrokers.diff(clusterBrokers)) case None => (clusterBrokers, Set.empty) } ``` ## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,31 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.descr
[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
dajac commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r594476389 ## File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala ## @@ -0,0 +1,52 @@ +package unit.kafka.admin Review comment: We must add the licence header here. You can copy it from another file. ## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,29 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray -} +val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty) +try { +val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet +val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match { +case Some(brokerListStr) => +val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet +(inputBrokers, inputBrokers.diff(clusterBrokers)) +case None => (clusterBrokers, Set.empty) +} -out.println("Querying brokers for log directories information") -val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava) -val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala } +if (nonExistingBrokers.nonEmpty) { +out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. 
Current cluster exist node(s): ${clusterBrokers.mkString(",")}") Review comment: nit: Should we say `--broker-list` instead of `broker-list`? Also, should we say `broker(s)` instead of `node(s)` to be consistent with the message below? ## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,29 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray -} +val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty) +try { +val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet +val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match { +case Some(brokerListStr) => +val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet +(inputBrokers, inputBrokers.diff(clusterBrokers)) +case None => (clusterBrokers, Set.empty) +} -out.println("Querying brokers for log directories information") -val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava) -val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala } +if (nonExistingBrokers.nonEmpty) { +out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. 
Current cluster exist node(s): ${clusterBrokers.mkString(",")}") +} else { +out.println("Querying brokers for log directories information") +val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava) Review comment: nit: `DescribeLogDirsResult` can be removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org