Repository: kafka Updated Branches: refs/heads/trunk 271f6b5ae -> a0f533266
KAFKA-3356: Remove ConsumerOffsetChecker Author: Mickael Maison <mickael.mai...@gmail.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #3036 from mimaison/KAFKA-3356 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0f53326 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0f53326 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0f53326 Branch: refs/heads/trunk Commit: a0f533266a8a51a6288eda64f3a80242af13e2f9 Parents: 271f6b5 Author: Mickael Maison <mickael.mai...@gmail.com> Authored: Sun Sep 24 10:08:52 2017 +0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Sun Sep 24 10:08:52 2017 +0800 ---------------------------------------------------------------------- bin/kafka-consumer-offset-checker.sh | 17 -- .../kafka/tools/ConsumerOffsetChecker.scala | 209 ------------------- docs/upgrade.html | 8 +- 3 files changed, 4 insertions(+), 230 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a0f53326/bin/kafka-consumer-offset-checker.sh ---------------------------------------------------------------------- diff --git a/bin/kafka-consumer-offset-checker.sh b/bin/kafka-consumer-offset-checker.sh deleted file mode 100755 index 5993345..0000000 --- a/bin/kafka-consumer-offset-checker.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker "$@" http://git-wip-us.apache.org/repos/asf/kafka/blob/a0f53326/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala deleted file mode 100644 index 87147dc..0000000 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - - -import joptsimple._ -import kafka.utils._ -import kafka.consumer.SimpleConsumer -import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest} -import kafka.common.{OffsetMetadataAndError, TopicAndPartition} -import org.apache.kafka.common.errors.BrokerNotAvailableException -import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} -import org.apache.kafka.common.security.JaasUtils - -import scala.collection._ -import kafka.client.ClientUtils -import kafka.network.BlockingChannel -import kafka.api.PartitionOffsetRequestInfo -import org.I0Itec.zkclient.exception.ZkNoNodeException -import org.apache.kafka.common.network.ListenerName - -@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0") -object ConsumerOffsetChecker extends Logging { - - private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map() - private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map() - private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map() - - private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = { - try { - zkUtils.getBrokerInfo(bid) - .map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) - .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerOffsetChecker")) - .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))) - } catch { - case t: Throwable => - println("Could not parse broker info due to " + t.getCause) - None - } - } - - private def processPartition(zkUtils: ZkUtils, - group: String, topic: String, producerId: Int) { - val topicPartition = TopicAndPartition(topic, producerId) - val offsetOpt = offsetMap.get(topicPartition) - val groupDirs = new ZKGroupTopicDirs(group, topic) - val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(producerId))._1 - zkUtils.getLeaderForPartition(topic, producerId) match { - case Some(bid) => - val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid)) - consumerOpt.foreach { consumer => - val topicAndPartition = TopicAndPartition(topic, producerId) - val request = - OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) - val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head - - val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), - owner match {case Some(ownerStr) => ownerStr case None => "none"})) - } - case None => - println("No broker for partition %s - %s".format(topic, producerId)) - } - } - - private def processTopic(zkUtils: ZkUtils, group: String, topic: String) { - topicPidMap.get(topic).foreach { producerIds => - producerIds.sorted.foreach { - producerId => processPartition(zkUtils, group, topic, producerId) - } - } - } - - private def printBrokerInfo() { - println("BROKER INFO") - for ((bid, consumerOpt) <- consumerMap) - consumerOpt.foreach { consumer => - println("%s -> %s:%d".format(bid, consumer.host, consumer.port)) - } - } - - def main(args: Array[String]) { - warn("WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead.") - - val parser = new OptionParser(false) - - val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string."). - withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) - val topicsOpt = parser.accepts("topic", - "Comma-separated list of consumer topics (all topics if absent)."). - withRequiredArg().ofType(classOf[String]) - val groupOpt = parser.accepts("group", "Consumer group."). - withRequiredArg().ofType(classOf[String]) - val channelSocketTimeoutMsOpt = parser.accepts("socket.timeout.ms", "Socket timeout to use when querying for offsets."). - withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(6000) - val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry back-off to use for failed offset queries."). - withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000) - - parser.accepts("broker-info", "Print broker info") - parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.") - - val options = parser.parse(args : _*) - - if (options.has("help")) { - parser.printHelpOn(System.out) - Exit.exit(0) - } - - CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt) - - val zkConnect = options.valueOf(zkConnectOpt) - - val group = options.valueOf(groupOpt) - val groupDirs = new ZKGroupDirs(group) - - val channelSocketTimeoutMs = options.valueOf(channelSocketTimeoutMsOpt).intValue() - val channelRetryBackoffMs = options.valueOf(channelRetryBackoffMsOpt).intValue() - - val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt)) else None - - var zkUtils: ZkUtils = null - var channel: BlockingChannel = null - try { - zkUtils = ZkUtils(zkConnect, - 30000, - 30000, - JaasUtils.isZkSecurityEnabled()) - - val topicList = topics match { - case Some(x) => x.split(",").view.toList - case None => zkUtils.getChildren(groupDirs.consumerGroupDir + "/owners").toList - } - - topicPidMap = immutable.Map(zkUtils.getPartitionsForTopics(topicList).toSeq:_*) - val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq - channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs) - - debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) - channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()) - debug("Received offset fetch response %s.".format(offsetFetchResponse)) - - offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => - if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { - val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) - // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool - // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) - try { - val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong - offsetMap.put(topicAndPartition, offset) - } catch { - case z: ZkNoNodeException => - if(zkUtils.pathExists(topicDirs.consumerOffsetDir)) - offsetMap.put(topicAndPartition,-1) - else - throw z - } - } - else if (offsetAndMetadata.error == Errors.NONE) - offsetMap.put(topicAndPartition, offsetAndMetadata.offset) - else { - println("Could not fetch offset for %s due to %s.".format(topicAndPartition, offsetAndMetadata.error.exception)) - } - } - channel.disconnect() - channel = null - - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) - topicList.sorted.foreach { - topic => processTopic(zkUtils, group, topic) - } - - if (options.has("broker-info")) - printBrokerInfo() - - consumerMap.values.flatten.foreach(_.close()) - } - catch { - case t: Throwable => - println("Exiting due to: %s.".format(t.getMessage)) - } - finally { - consumerMap.values.flatten.foreach(_.close()) - if (zkUtils != null) - zkUtils.close() - - if (channel != null) - channel.disconnect() - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/a0f53326/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index a98bdea..ce750ea 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -66,6 +66,7 @@ This was only intended for use on the broker, but it is no longer in use and the implementations have not been maintained. A stub implementation has been retained for binary compatibility.</li> <li>The Java clients and tools now accept any string as a client-id.</li> + <li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group details.</li> </ul> <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5> @@ -163,7 +164,7 @@ This config specifies the time, in milliseconds, that the <code>GroupCoordinator</code> will delay the initial consumer rebalance. The rebalance will be further delayed by the value of <code>group.initial.rebalance.delay.ms</code> as new members join the group, up to a maximum of <code>max.poll.interval.ms</code>. The default value for this is 3 seconds. - During development and testing it might be desirable to set this to 0 inorder to not delay test execution time. + During development and testing it might be desirable to set this to 0 in order to not delay test execution time. </li> <li><code>org.apache.kafka.common.Cluster#partitionsForTopic</code>, <code>partitionsForNode</code> and <code>availablePartitionsForTopic</code> methods will return an empty list instead of <code>null</code> (which is considered a bad practice) in case the metadata for the required topic does not exist. @@ -205,7 +206,7 @@ tool.</li> <li>EoS in Kafka introduces new request APIs and modifies several existing ones. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-RPCProtocolSummary">KIP-98</a> - for the full details</code></li> + for the full details</li> </ol> <h5><a id="upgrade_11_message_format" href="#upgrade_11_message_format">Notes on the new message format in 0.11.0</a></h5> @@ -236,7 +237,7 @@ is already not possible in that case. In order to avoid the cost of down-conversion, you should ensure that consumer applications are upgraded to the latest 0.11.0 client. Significantly, since the old consumer has been deprecated in 0.11.0.0, it does not support the new message format. You must upgrade to use the new consumer to use the new message format without the cost of down-conversion. - Note that 0.11.0 consumers support backwards compability with brokers 0.10.0 brokers and upward, so it is possible to upgrade the + Note that 0.11.0 consumers support backwards compatibility with brokers 0.10.0 brokers and upward, so it is possible to upgrade the clients first before the brokers. </p> @@ -483,7 +484,6 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9 <code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)</code> </li> <li> MessageReader interface was changed from <code>def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]</code> to <code>def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]</code> </li> - </li> <li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li> <li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li> <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>