Updated Branches: refs/heads/0.8 a376f9221 -> a737986e5
KAFKA-815 Improve SimpleConsumerShell to take in a max messages config option;reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a737986e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a737986e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a737986e Branch: refs/heads/0.8 Commit: a737986e54ea53a2b93f3d08f5eb7fd155095f3c Parents: a376f92 Author: Neha Narkhede <[email protected]> Authored: Tue Mar 19 17:19:20 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Tue Mar 19 17:19:29 2013 -0700 ---------------------------------------------------------------------- .../scala/kafka/consumer/ConsoleConsumer.scala | 5 ++++ .../scala/kafka/tools/SimpleConsumerShell.scala | 19 +++++++++++--- 2 files changed, 20 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a737986e/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 7e84043..d6c4a51 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -290,6 +290,11 @@ class DefaultMessageFormatter extends MessageFormatter { } } +class NoOpMessageFormatter extends MessageFormatter { + override def init(props: Properties) {} + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {} +} + class ChecksumMessageFormatter extends MessageFormatter { private var topicStr: String = _ http://git-wip-us.apache.org/repos/asf/kafka/blob/a737986e/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 8f274df..3cfa384 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -84,6 +84,11 @@ object SimpleConsumerShell extends Logging { .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) + val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume") + .withRequiredArg + .describedAs("max-messages") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(Integer.MAX_VALUE) val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + "skip it instead of halt.") val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend", @@ -105,6 +110,7 @@ object SimpleConsumerShell extends Logging { val fetchSize = options.valueOf(fetchSizeOpt).intValue val clientId = options.valueOf(clientIdOpt).toString val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() + val maxMessages = options.valueOf(maxMessagesOpt).intValue val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false val printOffsets = if(options.has(printOffsetOpt)) true else false @@ -181,14 +187,16 @@ object SimpleConsumerShell extends Logging { val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) - info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]" - .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) + val replicaString = if(replicaId > 0) "leader" else "replica" + info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]" + .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId) val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() { def run() { var offset = startingOffset + var numMessagesConsumed = 0 try { - while(true) { + while(numMessagesConsumed < maxMessages) { val fetchRequest = fetchRequestBuilder .addFetch(topic, partitionId, offset, fetchSize) .build() @@ -199,7 +207,7 @@ object SimpleConsumerShell extends Logging { return } debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset) - for(messageAndOffset <- messageSet) { + for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) { try { offset = messageAndOffset.nextOffset if(printOffsets) @@ -207,6 +215,7 @@ object SimpleConsumerShell extends Logging { val message = messageAndOffset.message val key = if(message.hasKey) Utils.readBytes(message.key) else null formatter.writeTo(key, Utils.readBytes(message.payload), System.out) + numMessagesConsumed += 1 } catch { case e => if (skipMessageOnError) @@ -226,6 +235,8 @@ object SimpleConsumerShell extends Logging { } catch { case e: Throwable => error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e) + }finally { + info("Consumed " + numMessagesConsumed + " messages") } } }, false)
