This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 150fd5b0b1 KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469) 150fd5b0b1 is described below commit 150fd5b0b18c4761d8f7d7ba9a480aa9f622024f Author: dengziming <dengziming1...@gmail.com> AuthorDate: Sat Aug 20 23:37:26 2022 +0800 KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469) Add `MetadataQuorumCommand` to describe quorum status, I'm trying to use arg4j style command format, currently, we only support one sub-command which is "describe" and we can specify 2 arguments which are --status and --replication. ``` # describe quorum status kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication ReplicaId LogEndOffset Lag LastFetchTimeMs LastCaughtUpTimeMs Status 0 10 0 -1 -1 Leader 1 10 0 -1 -1 Follower 2 10 0 -1 -1 Follower kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status ClusterId: fMCL8kv1SWm87L_Md-I2hg LeaderId: 3002 LeaderEpoch: 2 HighWatermark: 10 MaxFollowerLag: 0 MaxFollowerLagTimeMs: -1 CurrentVoters: [3000,3001,3002] CurrentObservers: [0,1,2] # specify AdminClient properties kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config config.properties describe --status ``` Reviewers: Jason Gustafson <ja...@confluent.io> --- bin/kafka-metadata-quorum.sh | 17 ++ bin/windows/kafka-metatada-quorum.bat | 17 ++ build.gradle | 1 + checkstyle/import-control.xml | 1 + .../kafka/clients/admin/KafkaAdminClient.java | 2 + .../org/apache/kafka/clients/admin/QuorumInfo.java | 14 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 2 +- .../scala/kafka/admin/MetadataQuorumCommand.scala | 172 ++++++++++++++++++ .../test/junit/RaftClusterInvocationContext.java | 4 + .../kafka/admin/MetadataQuorumCommandTest.scala | 192 +++++++++++++++++++++ .../kafka/server/DescribeQuorumRequestTest.scala | 2 + .../org/apache/kafka/server/util}/ToolsUtils.java | 51 +++++- .../apache/kafka/tools/ProducerPerformance.java | 1 + .../apache/kafka/tools/TransactionsCommand.java | 46 +---- 14 files changed, 474 insertions(+), 48 deletions(-) diff --git a/bin/kafka-metadata-quorum.sh b/bin/kafka-metadata-quorum.sh new file mode 100755 index 0000000000..24bedbded1 --- /dev/null +++ b/bin/kafka-metadata-quorum.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.MetadataQuorumCommand "$@" diff --git a/bin/windows/kafka-metatada-quorum.bat b/bin/windows/kafka-metatada-quorum.bat new file mode 100644 index 0000000000..4ea8e3109f --- /dev/null +++ b/bin/windows/kafka-metatada-quorum.bat @@ -0,0 +1,17 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +"%~dp0kafka-run-class.bat" kafka.admin.MetadataQuorumCommand %* diff --git a/build.gradle b/build.gradle index f17011ca4d..7c38d899a6 100644 --- a/build.gradle +++ b/build.gradle @@ -1705,6 +1705,7 @@ project(':tools') { dependencies { implementation project(':clients') + implementation project(':server-common') implementation project(':log4j-appender') implementation libs.argparse4j implementation libs.jacksonDatabind diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 4b07a26cba..d24d1e7e5e 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -366,6 +366,7 @@ <subpackage name="tools"> <allow pkg="org.apache.kafka.common"/> + <allow pkg="org.apache.kafka.server.util" /> <allow pkg="org.apache.kafka.clients.admin" /> <allow pkg="org.apache.kafka.clients.producer" /> <allow pkg="org.apache.kafka.clients.consumer" /> diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 41eb27a1dd..e5df779b61 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4359,6 +4359,8 @@ public class KafkaAdminClient extends AdminClient { private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { return new QuorumInfo( partition.leaderId(), + partition.leaderEpoch(), + partition.highWatermark(), partition.currentVoters().stream().map(v -> translateReplicaState(v)).collect(Collectors.toList()), partition.observers().stream().map(o -> translateReplicaState(o)).collect(Collectors.toList())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java index 75476d77dc..3a0b6cf6f7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java @@ -25,11 +25,15 @@ import java.util.OptionalLong; */ public class QuorumInfo { private final Integer leaderId; + private final Integer leaderEpoch; + private final Long highWatermark; private final List<ReplicaState> voters; private final List<ReplicaState> observers; - QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) { + QuorumInfo(Integer leaderId, Integer leaderEpoch, Long highWatermark, List<ReplicaState> voters, List<ReplicaState> observers) { this.leaderId = leaderId; + this.leaderEpoch = leaderEpoch; + this.highWatermark = highWatermark; this.voters = voters; this.observers = observers; } @@ -38,6 +42,14 @@ public class QuorumInfo { return leaderId; } + public Integer leaderEpoch() { + return leaderEpoch; + } + + public Long highWatermark() { + return highWatermark; + } + public List<ReplicaState> voters() { return voters; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 5faf53f075..193457655a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -608,7 +608,7 @@ public class KafkaAdminClientTest { } private static QuorumInfo defaultQuorumInfo(Boolean emptyOptionals) { - return new QuorumInfo(1, + return new QuorumInfo(1, 1, 1L, singletonList(new QuorumInfo.ReplicaState(1, 100, emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000), emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))), diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala new file mode 100644 index 0000000000..b6e4e1597b --- /dev/null +++ b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.tools.TerseFailure +import kafka.utils.Exit +import net.sourceforge.argparse4j.ArgumentParsers +import net.sourceforge.argparse4j.impl.Arguments.{fileType, storeTrue} +import net.sourceforge.argparse4j.inf.Subparsers +import org.apache.kafka.clients._ +import org.apache.kafka.clients.admin.{Admin, QuorumInfo} +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.util.ToolsUtils.prettyPrintTable + +import java.io.File +import java.util.Properties +import scala.jdk.CollectionConverters._ + +/** + * A tool for describing quorum status + */ +object MetadataQuorumCommand { + + def main(args: Array[String]): Unit = { + val res = mainNoExit(args) + Exit.exit(res) + } + + def mainNoExit(args: Array[String]): Int = { + val parser = ArgumentParsers + .newArgumentParser("kafka-metadata-quorum") + .defaultHelp(true) + .description("This tool describes kraft metadata quorum status.") + parser + .addArgument("--bootstrap-server") + .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.") + .required(true) + + parser + .addArgument("--command-config") + .`type`(fileType()) + .help("Property file containing configs to be passed to Admin Client.") + val subparsers = parser.addSubparsers().dest("command") + addDescribeParser(subparsers) + + var admin: Admin = null + try { + val namespace = parser.parseArgsOrFail(args) + val command = namespace.getString("command") + + val commandConfig = namespace.get[File]("command_config") + val props = if (commandConfig != null) { + if (!commandConfig.exists()) { + throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!") + } + Utils.loadProps(commandConfig.getPath) + } else { + new Properties() + } + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server")) + admin = Admin.create(props) + + if (command == "describe") { + if (namespace.getBoolean("status") && namespace.getBoolean("replication")) { + throw new TerseFailure(s"Only one of --status or --replication should be specified with describe sub-command") + } else if (namespace.getBoolean("replication")) { + handleDescribeReplication(admin) + } else if (namespace.getBoolean("status")) { + handleDescribeStatus(admin) + } else { + throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command") + } + } else { + throw new IllegalStateException(s"Unknown command: $command, only 'describe' is supported") + } + 0 + } catch { + case e: TerseFailure => + Console.err.println(e.getMessage) + 1 + } finally { + if (admin != null) { + admin.close() + } + } + } + + def addDescribeParser(subparsers: Subparsers): Unit = { + val describeParser = subparsers + .addParser("describe") + .help("Describe the metadata quorum info") + + val statusArgs = describeParser.addArgumentGroup("Status") + statusArgs + .addArgument("--status") + .help( + "A short summary of the quorum status and the other provides detailed information about the status of replication.") + .action(storeTrue()) + val replicationArgs = describeParser.addArgumentGroup("Replication") + replicationArgs + .addArgument("--replication") + .help("Detailed information about the status of replication") + .action(storeTrue()) + } + + private def handleDescribeReplication(admin: Admin): Unit = { + val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get + val leaderId = quorumInfo.leaderId + val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head + + def convertQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Seq[Array[String]] = + infos.map { info => + Array(info.replicaId, + info.logEndOffset, + leader.logEndOffset - info.logEndOffset, + info.lastFetchTimeMs.orElse(-1), + info.lastCaughtUpTimeMs.orElse(-1), + status + ).map(_.toString) + } + prettyPrintTable( + Array("NodeId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status"), + (convertQuorumInfo(Seq(leader), "Leader") + ++ convertQuorumInfo(quorumInfo.voters.asScala.filter(_.replicaId != leaderId).toSeq, "Follower") + ++ convertQuorumInfo(quorumInfo.observers.asScala.toSeq, "Observer")).asJava, + scala.Console.out + ) + } + + private def handleDescribeStatus(admin: Admin): Unit = { + val clusterId = admin.describeCluster.clusterId.get + val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get + val leaderId = quorumInfo.leaderId + val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head + val maxLagFollower = quorumInfo.voters.asScala + .minBy(_.logEndOffset) + val maxFollowerLag = leader.logEndOffset - maxLagFollower.logEndOffset + val maxFollowerLagTimeMs = + if (leader == maxLagFollower) { + 0 + } else if (leader.lastCaughtUpTimeMs.isPresent && maxLagFollower.lastCaughtUpTimeMs.isPresent) { + leader.lastCaughtUpTimeMs.getAsLong - maxLagFollower.lastCaughtUpTimeMs.getAsLong + } else { + -1 + } + println( + s"""|ClusterId: $clusterId + |LeaderId: ${quorumInfo.leaderId} + |LeaderEpoch: ${quorumInfo.leaderEpoch} + |HighWatermark: ${quorumInfo.highWatermark} + |MaxFollowerLag: $maxFollowerLag + |MaxFollowerLagTimeMs: $maxFollowerLagTimeMs + |CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")} + |CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")} + |""".stripMargin + ) + } +} diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 40669f3068..f5c281ff24 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -183,6 +183,10 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte )); } + public Collection<ControllerServer> controllerServers() { + return controllers().collect(Collectors.toList()); + } + @Override public ClusterType clusterType() { return ClusterType.RAFT; diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala new file mode 100644 index 0000000000..24b6616cb1 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, ClusterTests, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.utils.TestUtils +import org.apache.kafka.common.errors.UnsupportedVersionException +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} +import org.junit.jupiter.api.{Tag, Test} +import org.junit.jupiter.api.extension.ExtendWith + +import java.util.concurrent.ExecutionException + +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT) +@Tag("integration") +class MetadataQuorumCommandTest(cluster: ClusterInstance) { + + /** + * 1. The same number of broker controllers + * 2. More brokers than controllers + * 3. Fewer brokers than controllers + */ + @ClusterTests( + Array( + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) + )) + def testDescribeQuorumReplicationSuccessful(): Unit = { + cluster.waitForReadyBrokers() + val describeOutput = TestUtils.grabConsoleOutput( + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) + ) + + val leaderPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Leader\s+""".r + val followerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Follower\s+""".r + val observerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Observer\s+""".r + val outputs = describeOutput.split("\n").tail + if (cluster.config().clusterType() == Type.CO_KRAFT) { + assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.length) + } else { + assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.length) + } + // `matches` is not supported in scala 2.12, use `findFirstIn` instead. + assertTrue(leaderPattern.findFirstIn(outputs.head).nonEmpty) + assertEquals(1, outputs.count(leaderPattern.findFirstIn(_).nonEmpty)) + assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.findFirstIn(_).nonEmpty)) + + if (cluster.config().clusterType() == Type.CO_KRAFT) { + assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.count(observerPattern.findFirstIn(_).nonEmpty)) + } else { + assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.findFirstIn(_).nonEmpty)) + } + } + + /** + * 1. The same number of broker controllers + * 2. More brokers than controllers + * 3. Fewer brokers than controllers + */ + @ClusterTests( + Array( + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) + )) + def testDescribeQuorumStatusSuccessful(): Unit = { + cluster.waitForReadyBrokers() + val describeOutput = TestUtils.grabConsoleOutput( + MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) + ) + val outputs = describeOutput.split("\n") + + assertTrue("""ClusterId:\s+\S{22}""".r.findFirstIn(outputs(0)).nonEmpty) + assertTrue("""LeaderId:\s+\d+""".r.findFirstIn(outputs(1)).nonEmpty) + assertTrue("""LeaderEpoch:\s+\d+""".r.findFirstIn(outputs(2)).nonEmpty) + // HighWatermark may be -1 + assertTrue("""HighWatermark:\s+[-]?\d+""".r.findFirstIn(outputs(3)).nonEmpty) + assertTrue("""MaxFollowerLag:\s+\d+""".r.findFirstIn(outputs(4)).nonEmpty) + assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.findFirstIn(outputs(5)).nonEmpty) + assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(6)).nonEmpty) + + // There are no observers if we have fewer brokers than controllers + if (cluster.config().clusterType() == Type.CO_KRAFT + && cluster.config().numBrokers() <= cluster.config().numControllers()) { + assertTrue("""CurrentObservers:\s+\[\]""".r.findFirstIn(outputs(7)).nonEmpty) + } else { + assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(7)).nonEmpty) + } + } + + @ClusterTests( + Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1), + new ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1))) + def testOnlyOneBrokerAndOneController(): Unit = { + val statusOutput = TestUtils.grabConsoleOutput( + MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) + ) + assertEquals("MaxFollowerLag: 0", statusOutput.split("\n")(4)) + assertEquals("MaxFollowerLagTimeMs: 0", statusOutput.split("\n")(5)) + + val replicationOutput = TestUtils.grabConsoleOutput( + MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) + ) + assertEquals("0", replicationOutput.split("\n")(1).split("\\s+")(2)) + } + + @ClusterTest(clusterType = Type.ZK, brokers = 3) + def testDescribeQuorumInZkMode(): Unit = { + assertTrue( + assertThrows( + classOf[ExecutionException], + () => + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) + ).getCause.isInstanceOf[UnsupportedVersionException] + ) + assertTrue( + assertThrows( + classOf[ExecutionException], + () => + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) + ).getCause.isInstanceOf[UnsupportedVersionException] + ) + } +} + +class MetadataQuorumCommandErrorTest { + + @Test + def testPropertiesFileDoesNotExists(): Unit = { + assertEquals(1, + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"))) + assertEquals( + "Properties file admin.properties does not exists!", + TestUtils + .grabConsoleError( + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"))) + .trim + ) + } + + @Test + def testDescribeOptions(): Unit = { + assertEquals(1, MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe"))) + assertEquals( + "One of --status or --replication must be specified with describe sub-command", + TestUtils + .grabConsoleError(MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe"))) + .trim + ) + + assertEquals(1, + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication"))) + assertEquals( + "Only one of --status or --replication should be specified with describe sub-command", + TestUtils + .grabConsoleError( + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication"))) + .trim + ) + } +} diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala index eed58961e4..28a6f80123 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala @@ -74,6 +74,8 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { val leaderId = partitionData.leaderId assertTrue(leaderId > 0) + assertTrue(partitionData.leaderEpoch() > 0) + assertTrue(partitionData.highWatermark() > 0) val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java similarity index 61% rename from tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java rename to server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java index 3a80b5811f..0c923cd66c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java @@ -14,13 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.tools; +package org.apache.kafka.server.util; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.stream.Collectors; public class ToolsUtils { @@ -52,4 +56,49 @@ public class ToolsUtils { } } } + + private static void appendColumnValue( + StringBuilder rowBuilder, + String value, + int length + ) { + int padLength = length - value.length(); + rowBuilder.append(value); + for (int i = 0; i < padLength; i++) + rowBuilder.append(' '); + } + + private static void printRow( + List<Integer> columnLengths, + String[] row, + PrintStream out + ) { + StringBuilder rowBuilder = new StringBuilder(); + for (int i = 0; i < row.length; i++) { + Integer columnLength = columnLengths.get(i); + String columnValue = row[i]; + appendColumnValue(rowBuilder, columnValue, columnLength); + rowBuilder.append('\t'); + } + out.println(rowBuilder); + } + + public static void prettyPrintTable( + String[] headers, + List<String[]> rows, + PrintStream out + ) { + List<Integer> columnLengths = Arrays.stream(headers) + .map(String::length) + .collect(Collectors.toList()); + + for (String[] row : rows) { + for (int i = 0; i < headers.length; i++) { + columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); + } + } + + printRow(columnLengths, headers, out); + rows.forEach(row -> printRow(columnLengths, row, out)); + } } diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 6967a16fa6..f2ee53cb3f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -43,6 +43,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.ToolsUtils; public class ProducerPerformance { diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java index 92e713ac3b..194524d265 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java @@ -66,6 +66,7 @@ import java.util.stream.Collectors; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static net.sourceforge.argparse4j.impl.Arguments.store; +import static org.apache.kafka.server.util.ToolsUtils.prettyPrintTable; public abstract class TransactionsCommand { private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class); @@ -903,51 +904,6 @@ public abstract class TransactionsCommand { } } - private static void appendColumnValue( - StringBuilder rowBuilder, - String value, - int length - ) { - int padLength = length - value.length(); - rowBuilder.append(value); - for (int i = 0; i < padLength; i++) - rowBuilder.append(' '); - } - - private static void printRow( - List<Integer> columnLengths, - String[] row, - PrintStream out - ) { - StringBuilder rowBuilder = new StringBuilder(); - for (int i = 0; i < row.length; i++) { - Integer columnLength = columnLengths.get(i); - String columnValue = row[i]; - appendColumnValue(rowBuilder, columnValue, columnLength); - rowBuilder.append('\t'); - } - out.println(rowBuilder); - } - - private static void prettyPrintTable( - String[] headers, - List<String[]> rows, - PrintStream out - ) { - List<Integer> columnLengths = Arrays.stream(headers) - .map(String::length) - .collect(Collectors.toList()); - - for (String[] row : rows) { - for (int i = 0; i < headers.length; i++) { - columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); - } - } - - printRow(columnLengths, headers, out); - rows.forEach(row -> printRow(columnLengths, row, out)); - } - private static void printErrorAndExit(String message, Throwable t) { log.debug(message, t);