Repository: kafka Updated Branches: refs/heads/trunk 49ddc897b -> ff300c9d4
KAFKA-3645; Fix ConsumerGroupCommand and ConsumerOffsetChecker to correctly read endpoint info from ZK The host and port entries under /brokers/ids/<bid> get filled only for PLAINTEXT security protocol. For other protocols the host is null and the actual endpoint is under "endpoints". This causes an NPE when running the consumer group and offset checker scripts in a kerberized env. By always reading the host and port values from the "endpoint", a more meaningful exception would be thrown rather than an NPE. Author: Arun Mahadevan <[email protected]> Reviewers: Sriharsha Chintalapani <[email protected]>, Ismael Juma <[email protected]> Closes #1301 from arunmahadevan/cg_kerb_fix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff300c9d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff300c9d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff300c9d Branch: refs/heads/trunk Commit: ff300c9d4f45e4a355db11258965c3a3a6f6bbf7 Parents: 49ddc89 Author: Arun Mahadevan <[email protected]> Authored: Sat Jun 4 09:24:45 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat Jun 4 09:24:45 2016 +0100 ---------------------------------------------------------------------- .../kafka/admin/ConsumerGroupCommand.scala | 20 +++++------------ .../kafka/tools/ConsumerOffsetChecker.scala | 23 ++++++-------------- 2 files changed, 12 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ff300c9d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index b086d8f..f0c817f 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -30,7 +30,7 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.BrokerNotAvailableException -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.Utils @@ -277,20 +277,10 @@ object ConsumerGroupCommand { private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = { try { - zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { - case Some(brokerInfoString) => - Json.parseFull(brokerInfoString) match { - case Some(m) => - val brokerInfo = m.asInstanceOf[Map[String, Any]] - val host = brokerInfo.get("host").get.asInstanceOf[String] - val port = brokerInfo.get("port").get.asInstanceOf[Int] - Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand")) - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) - } - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) - } + zkUtils.getBrokerInfo(brokerId) + .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) + .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerGroupCommand")) + .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))) } catch { case t: Throwable => println("Could not parse broker info due to " + t.getMessage) http://git-wip-us.apache.org/repos/asf/kafka/blob/ff300c9d/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 5c01f34..8f86f66 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -21,11 +21,12 @@ package kafka.tools import joptsimple._ import kafka.utils._ import kafka.consumer.SimpleConsumer -import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest} +import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest} import kafka.common.{OffsetMetadataAndError, TopicAndPartition} import org.apache.kafka.common.errors.BrokerNotAvailableException -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.security.JaasUtils + import scala.collection._ import kafka.client.ClientUtils import kafka.network.BlockingChannel @@ -40,20 +41,10 @@ object ConsumerOffsetChecker extends Logging { private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = { try { - zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + bid)._1 match { - case Some(brokerInfoString) => - Json.parseFull(brokerInfoString) match { - case Some(m) => - val brokerInfo = m.asInstanceOf[Map[String, Any]] - val host = brokerInfo.get("host").get.asInstanceOf[String] - val port = brokerInfo.get("port").get.asInstanceOf[Int] - Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker")) - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) - } - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) - } + zkUtils.getBrokerInfo(bid) + .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) + .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerOffsetChecker")) + .orElse(throw new BrokerNotAvailableException("Broker id %d does not 
exist".format(bid))) } catch { case t: Throwable => println("Could not parse broker info due to " + t.getCause)
