Repository: kafka
Updated Branches:
  refs/heads/trunk 271f6b5ae -> a0f533266


KAFKA-3356: Remove ConsumerOffsetChecker

Author: Mickael Maison <mickael.mai...@gmail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3036 from mimaison/KAFKA-3356


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0f53326
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0f53326
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0f53326

Branch: refs/heads/trunk
Commit: a0f533266a8a51a6288eda64f3a80242af13e2f9
Parents: 271f6b5
Author: Mickael Maison <mickael.mai...@gmail.com>
Authored: Sun Sep 24 10:08:52 2017 +0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Sun Sep 24 10:08:52 2017 +0800

----------------------------------------------------------------------
 bin/kafka-consumer-offset-checker.sh            |  17 --
 .../kafka/tools/ConsumerOffsetChecker.scala     | 209 -------------------
 docs/upgrade.html                               |   8 +-
 3 files changed, 4 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a0f53326/bin/kafka-consumer-offset-checker.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-consumer-offset-checker.sh 
b/bin/kafka-consumer-offset-checker.sh
deleted file mode 100755
index 5993345..0000000
--- a/bin/kafka-consumer-offset-checker.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/a0f53326/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
deleted file mode 100644
index 87147dc..0000000
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-
-import joptsimple._
-import kafka.utils._
-import kafka.consumer.SimpleConsumer
-import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest}
-import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
-import org.apache.kafka.common.errors.BrokerNotAvailableException
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.security.JaasUtils
-
-import scala.collection._
-import kafka.client.ClientUtils
-import kafka.network.BlockingChannel
-import kafka.api.PartitionOffsetRequestInfo
-import org.I0Itec.zkclient.exception.ZkNoNodeException
-import org.apache.kafka.common.network.ListenerName
-
-@deprecated("This class has been deprecated and will be removed in a future 
release.", "0.11.0.0")
-object ConsumerOffsetChecker extends Logging {
-
-  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = 
mutable.Map()
-  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
-  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()
-
-  private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] 
= {
-    try {
-      zkUtils.getBrokerInfo(bid)
-        
.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
-        .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 
10000, 100000, "ConsumerOffsetChecker"))
-        .orElse(throw new BrokerNotAvailableException("Broker id %d does not 
exist".format(bid)))
-    } catch {
-      case t: Throwable =>
-        println("Could not parse broker info due to " + t.getCause)
-        None
-    }
-  }
-
-  private def processPartition(zkUtils: ZkUtils,
-                               group: String, topic: String, producerId: Int) {
-    val topicPartition = TopicAndPartition(topic, producerId)
-    val offsetOpt = offsetMap.get(topicPartition)
-    val groupDirs = new ZKGroupTopicDirs(group, topic)
-    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + 
"/%s".format(producerId))._1
-    zkUtils.getLeaderForPartition(topic, producerId) match {
-      case Some(bid) =>
-        val consumerOpt = consumerMap.getOrElseUpdate(bid, 
getConsumer(zkUtils, bid))
-        consumerOpt.foreach { consumer =>
-          val topicAndPartition = TopicAndPartition(topic, producerId)
-          val request =
-            OffsetRequest(immutable.Map(topicAndPartition -> 
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-          val logSize = 
consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-
-          val lagString = offsetOpt.map(o => if (o == -1) "unknown" else 
(logSize - o).toString)
-          println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, 
producerId, offsetOpt.getOrElse("unknown"), logSize, 
lagString.getOrElse("unknown"),
-                                                                 owner match 
{case Some(ownerStr) => ownerStr case None => "none"}))
-        }
-      case None =>
-        println("No broker for partition %s - %s".format(topic, producerId))
-    }
-  }
-
-  private def processTopic(zkUtils: ZkUtils, group: String, topic: String) {
-    topicPidMap.get(topic).foreach { producerIds =>
-      producerIds.sorted.foreach {
-        producerId => processPartition(zkUtils, group, topic, producerId)
-        }
-    }
-  }
-
-  private def printBrokerInfo() {
-    println("BROKER INFO")
-    for ((bid, consumerOpt) <- consumerMap)
-      consumerOpt.foreach { consumer =>
-        println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
-      }
-  }
-
-  def main(args: Array[String]) {
-    warn("WARNING: ConsumerOffsetChecker is deprecated and will be dropped in 
releases following 0.9.0. Use ConsumerGroupCommand instead.")
-
-    val parser = new OptionParser(false)
-
-    val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect 
string.").
-            
withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
-    val topicsOpt = parser.accepts("topic",
-            "Comma-separated list of consumer topics (all topics if absent).").
-            withRequiredArg().ofType(classOf[String])
-    val groupOpt = parser.accepts("group", "Consumer group.").
-            withRequiredArg().ofType(classOf[String])
-    val channelSocketTimeoutMsOpt = parser.accepts("socket.timeout.ms", 
"Socket timeout to use when querying for offsets.").
-            
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(6000)
-    val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry 
back-off to use for failed offset queries.").
-            
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000)
-
-    parser.accepts("broker-info", "Print broker info")
-    parser.accepts("help", "Print this message.")
-
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "Check the offset of your 
consumers.")
-
-    val options = parser.parse(args : _*)
-
-    if (options.has("help")) {
-       parser.printHelpOn(System.out)
-       Exit.exit(0)
-    }
-
-    CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt)
-
-    val zkConnect = options.valueOf(zkConnectOpt)
-
-    val group = options.valueOf(groupOpt)
-    val groupDirs = new ZKGroupDirs(group)
-
-    val channelSocketTimeoutMs = 
options.valueOf(channelSocketTimeoutMsOpt).intValue()
-    val channelRetryBackoffMs = 
options.valueOf(channelRetryBackoffMsOpt).intValue()
-
-    val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt)) 
else None
-
-    var zkUtils: ZkUtils = null
-    var channel: BlockingChannel = null
-    try {
-      zkUtils = ZkUtils(zkConnect,
-                        30000,
-                        30000,
-                        JaasUtils.isZkSecurityEnabled())
-
-      val topicList = topics match {
-        case Some(x) => x.split(",").view.toList
-        case None => zkUtils.getChildren(groupDirs.consumerGroupDir +  
"/owners").toList
-      }
-
-      topicPidMap = 
immutable.Map(zkUtils.getPartitionsForTopics(topicList).toSeq:_*)
-      val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => 
partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
-      channel = ClientUtils.channelToOffsetManager(group, zkUtils, 
channelSocketTimeoutMs, channelRetryBackoffMs)
-
-      debug("Sending offset fetch request to coordinator 
%s:%d.".format(channel.host, channel.port))
-      channel.send(OffsetFetchRequest(group, topicPartitions))
-      val offsetFetchResponse = 
OffsetFetchResponse.readFrom(channel.receive().payload())
-      debug("Received offset fetch response %s.".format(offsetFetchResponse))
-
-      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, 
offsetAndMetadata) =>
-        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
-          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
-          // this group may not have migrated off zookeeper for offsets 
storage (we don't expose the dual-commit option in this tool
-          // (meaning the lag may be off until all the consumers in the group 
have the same setting for offsets storage)
-          try {
-            val offset = zkUtils.readData(topicDirs.consumerOffsetDir + 
"/%d".format(topicAndPartition.partition))._1.toLong
-            offsetMap.put(topicAndPartition, offset)
-          } catch {
-            case z: ZkNoNodeException =>
-              if(zkUtils.pathExists(topicDirs.consumerOffsetDir))
-                offsetMap.put(topicAndPartition,-1)
-              else
-                throw z
-          }
-        }
-        else if (offsetAndMetadata.error == Errors.NONE)
-          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
-        else {
-          println("Could not fetch offset for %s due to 
%s.".format(topicAndPartition, offsetAndMetadata.error.exception))
-        }
-      }
-      channel.disconnect()
-      channel = null
-
-      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", 
"Pid", "Offset", "logSize", "Lag", "Owner"))
-      topicList.sorted.foreach {
-        topic => processTopic(zkUtils, group, topic)
-      }
-
-      if (options.has("broker-info"))
-        printBrokerInfo()
-
-      consumerMap.values.flatten.foreach(_.close())
-    }
-    catch {
-      case t: Throwable =>
-        println("Exiting due to: %s.".format(t.getMessage))
-    }
-    finally {
-      consumerMap.values.flatten.foreach(_.close())
-      if (zkUtils != null)
-        zkUtils.close()
-
-      if (channel != null)
-        channel.disconnect()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a0f53326/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index a98bdea..ce750ea 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -66,6 +66,7 @@
         This was only intended for use on the broker, but it is no longer in 
use and the implementations have not been maintained.
         A stub implementation has been retained for binary compatibility.</li>
     <li>The Java clients and tools now accept any string as a client-id.</li>
+    <li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has 
been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group 
details.</li>
 </ul>
 
 <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New 
Protocol Versions</a></h5>
@@ -163,7 +164,7 @@
         This config specifies the time, in milliseconds, that the 
<code>GroupCoordinator</code> will delay the initial consumer rebalance.
         The rebalance will be further delayed by the value of 
<code>group.initial.rebalance.delay.ms</code> as new members join the group, up 
to a maximum of <code>max.poll.interval.ms</code>.
         The default value for this is 3 seconds.
-        During development and testing it might be desirable to set this to 0 
inorder to not delay test execution time.
+        During development and testing it might be desirable to set this to 0 
in order to not delay test execution time.
     </li>
     <li><code>org.apache.kafka.common.Cluster#partitionsForTopic</code>, 
<code>partitionsForNode</code> and <code>availablePartitionsForTopic</code> 
methods
         will return an empty list instead of <code>null</code> (which is 
considered a bad practice) in case the metadata for the required topic does not 
exist.
@@ -205,7 +206,7 @@
     tool.</li>
   <li>EoS in Kafka introduces new request APIs and modifies several existing 
ones. See
     <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-RPCProtocolSummary";>KIP-98</a>
-    for the full details</code></li>
+    for the full details</li>
 </ol>
 
 <h5><a id="upgrade_11_message_format" href="#upgrade_11_message_format">Notes 
on the new message format in 0.11.0</a></h5>
@@ -236,7 +237,7 @@
   is already not possible in that case. In order to avoid the cost of 
down-conversion, you should ensure that consumer applications
   are upgraded to the latest 0.11.0 client. Significantly, since the old 
consumer has been deprecated in 0.11.0.0, it does not support
   the new message format. You must upgrade to use the new consumer to use the 
new message format without the cost of down-conversion.
-  Note that 0.11.0 consumers support backwards compability with brokers 0.10.0 
brokers and upward, so it is possible to upgrade the
+  Note that 0.11.0 consumers support backwards compatibility with brokers 
0.10.0 brokers and upward, so it is possible to upgrade the
   clients first before the brokers.
 </p>
 
@@ -483,7 +484,6 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients 
should be upgraded to 0.9
         <code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], 
Array[Byte]], output: PrintStream)</code> </li>
     <li> MessageReader interface was changed from <code>def readMessage(): 
KeyedMessage[Array[Byte], Array[Byte]]</code> to
         <code>def readMessage(): ProducerRecord[Array[Byte], 
Array[Byte]]</code> </li>
-    </li>
     <li> MessageFormatter's package was changed from <code>kafka.tools</code> 
to <code>kafka.common</code> </li>
     <li> MessageReader's package was changed from <code>kafka.tools</code> to 
<code>kafka.common</code> </li>
     <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: 
MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never 
called. </li>

Reply via email to