This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5fc3fd4 MINOR: Use TopicPartition in ConsumerGroupCommand instead of TopicAndPartition where possible (#4333)
5fc3fd4 is described below
commit 5fc3fd48b0f43056e524b4c0f35aea3e3f9653e8
Author: Vahid Hashemian <[email protected]>
AuthorDate: Fri Dec 22 18:37:36 2017 -0800
MINOR: Use TopicPartition in ConsumerGroupCommand instead of TopicAndPartition where possible (#4333)
Reviewers: Jason Gustafson <[email protected]>
---
.../scala/kafka/admin/ConsumerGroupCommand.scala | 40 +++++++++++-----------
1 file changed, 20 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 918593b..e835510 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -284,8 +284,8 @@ object ConsumerGroupCommand extends Logging {
protected def collectConsumerAssignment(group: String,
coordinator: Option[Node],
- topicPartitions:
Seq[TopicAndPartition],
- getPartitionOffset:
TopicAndPartition => Option[Long],
+ topicPartitions:
Seq[TopicPartition],
+ getPartitionOffset: TopicPartition
=> Option[Long],
consumerIdOpt: Option[String],
hostOpt: Option[String],
clientIdOpt: Option[String]):
Array[PartitionAssignmentState] = {
@@ -328,7 +328,6 @@ object ConsumerGroupCommand extends Logging {
}
}
-
def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new
UnsupportedOperationException
def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition,
OffsetAndMetadata]): String = throw new UnsupportedOperationException
@@ -396,18 +395,18 @@ object ConsumerGroupCommand extends Logging {
topicsByConsumerId(consumerId).flatMap { _ =>
// since consumers with no topic partitions are processed here, we
pass empty for topic partitions and offsets
// since consumer id is repeated in client id, leave host and client
id empty
- collectConsumerAssignment(group, None, Array[TopicAndPartition](),
Map[TopicAndPartition, Option[Long]](), Some(consumerId), None, None)
+ collectConsumerAssignment(group, None, Array[TopicPartition](),
Map[TopicPartition, Option[Long]](), Some(consumerId), None, None)
}
}
(None, Some(assignmentRows))
}
- private def getAllTopicPartitions(topics: Seq[String]):
Seq[TopicAndPartition] = {
+ private def getAllTopicPartitions(topics: Seq[String]):
Seq[TopicPartition] = {
val topicPartitionMap = zkUtils.getPartitionsForTopics(topics)
topics.flatMap { topic =>
val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
- partitions.map(TopicAndPartition(topic, _))
+ partitions.map(new TopicPartition(topic, _))
}
}
@@ -416,7 +415,7 @@ object ConsumerGroupCommand extends Logging {
case Some(-1) => LogOffsetResult.Unknown
case Some(brokerId) =>
getZkConsumer(brokerId).map { consumer =>
- val topicAndPartition = TopicAndPartition(topicPartition.topic,
topicPartition.partition)
+ val topicAndPartition = new TopicAndPartition(topicPartition)
val request = OffsetRequest(Map(topicAndPartition ->
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val logEndOffset =
consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
consumer.close()
@@ -429,12 +428,12 @@ object ConsumerGroupCommand extends Logging {
}
private def getPartitionOffsets(group: String,
- topicPartitions: Seq[TopicAndPartition],
+ topicPartitions: Seq[TopicPartition],
channelSocketTimeoutMs: Int,
- channelRetryBackoffMs: Int):
Map[TopicAndPartition, Long] = {
+ channelRetryBackoffMs: Int):
Map[TopicPartition, Long] = {
val offsetMap = mutable.Map[TopicAndPartition, Long]()
val channel = ClientUtils.channelToOffsetManager(group, zkUtils,
channelSocketTimeoutMs, channelRetryBackoffMs)
- channel.send(OffsetFetchRequest(group, topicPartitions))
+ channel.send(OffsetFetchRequest(group, topicPartitions.map(new
TopicAndPartition(_))))
val offsetFetchResponse =
OffsetFetchResponse.readFrom(channel.receive().payload())
offsetFetchResponse.requestInfo.foreach { case (topicAndPartition,
offsetAndMetadata) =>
@@ -457,7 +456,9 @@ object ConsumerGroupCommand extends Logging {
}
}
channel.disconnect()
- offsetMap.toMap
+ offsetMap.map { case (topicAndPartition, offset) =>
+ (new TopicPartition(topicAndPartition.topic,
topicAndPartition.partition), offset)
+ }.toMap
}
private def deleteForGroup() {
@@ -541,10 +542,10 @@ object ConsumerGroupCommand extends Logging {
List[PartitionAssignmentState]()
else {
consumers.filter(_.assignment.nonEmpty).sortWith(_.assignment.size >
_.assignment.size).flatMap { consumerSummary =>
- val topicPartitions = consumerSummary.assignment.map(tp =>
TopicAndPartition(tp.topic, tp.partition))
+ val topicPartitions = consumerSummary.assignment
assignedTopicPartitions = assignedTopicPartitions ++
consumerSummary.assignment
- val partitionOffsets: Map[TopicAndPartition, Option[Long]] =
consumerSummary.assignment.map { topicPartition =>
- new TopicAndPartition(topicPartition) ->
offsets.get(topicPartition)
+ val partitionOffsets: Map[TopicPartition, Option[Long]] =
consumerSummary.assignment.map { topicPartition =>
+ new TopicPartition(topicPartition.topic,
topicPartition.partition) -> offsets.get(topicPartition)
}.toMap
collectConsumerAssignment(group,
Some(consumerGroupSummary.coordinator), topicPartitions,
partitionOffsets, Some(s"${consumerSummary.consumerId}"),
Some(s"${consumerSummary.host}"),
@@ -554,9 +555,8 @@ object ConsumerGroupCommand extends Logging {
val rowsWithoutConsumer =
offsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
case (topicPartition, offset) =>
- val topicAndPartition = new TopicAndPartition(topicPartition)
- collectConsumerAssignment(group,
Some(consumerGroupSummary.coordinator), Seq(topicAndPartition),
- Map(topicAndPartition -> Some(offset)),
Some(MISSING_COLUMN_VALUE),
+ collectConsumerAssignment(group,
Some(consumerGroupSummary.coordinator), Seq(topicPartition),
+ Map(topicPartition -> Some(offset)),
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
}
@@ -658,9 +658,9 @@ object ConsumerGroupCommand extends Logging {
private def parseTopicPartitionsToReset(topicArgs: Seq[String]):
Seq[TopicPartition] = topicArgs.flatMap {
case topicArg if topicArg.contains(":") =>
- val topicAndPartitions = topicArg.split(":")
- val topic = topicAndPartitions(0)
- topicAndPartitions(1).split(",").map(partition => new
TopicPartition(topic, partition.toInt))
+ val topicPartitions = topicArg.split(":")
+ val topic = topicPartitions(0)
+ topicPartitions(1).split(",").map(partition => new
TopicPartition(topic, partition.toInt))
case topic => getConsumer.partitionsFor(topic).asScala
.map(partitionInfo => new TopicPartition(topic,
partitionInfo.partition))
}
--
To stop receiving notification emails like this one, please contact
[email protected].