Repository: kafka
Updated Branches:
  refs/heads/trunk ff7b0f5b4 -> d142f8294
KAFKA-3256; Add print.timestamp option to console consumer.

Author: Jiangjie Qin <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #949 from becketqin/KAFKA-3256

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d142f829
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d142f829
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d142f829

Branch: refs/heads/trunk
Commit: d142f8294af67fea20d77dcc5272770af153c0d9
Parents: ff7b0f5
Author: Jiangjie Qin <[email protected]>
Authored: Mon Feb 22 15:57:55 2016 -0800
Committer: Jun Rao <[email protected]>
Committed: Mon Feb 22 15:57:55 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/d142f829/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index fe2ce9f..0ae057f 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -214,7 +214,7 @@ object ConsoleConsumer extends Logging {
       .describedAs("class")
       .ofType(classOf[String])
       .defaultsTo(classOf[DefaultMessageFormatter].getName)
-    val messageFormatterArgOpt = parser.accepts("property")
+    val messageFormatterArgOpt = parser.accepts("property", "The properties to initialize the message formatter.")
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
@@ -345,10 +345,13 @@ trait MessageFormatter{
 
 class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
+  var printTimestamp = false
   var keySeparator = "\t".getBytes
   var lineSeparator = "\n".getBytes
 
   override def init(props: Properties) {
+    if (props.containsKey("print.timestamp"))
+      printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true")
     if (props.containsKey("print.key"))
       printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
     if (props.containsKey("key.separator"))
@@ -358,8 +361,11 @@ class DefaultMessageFormatter extends MessageFormatter {
   }
 
   def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
-    if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
-      output.write(s"$timestampType:$timestamp".getBytes)
+    if (printTimestamp) {
+      if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
+        output.write(s"$timestampType:$timestamp".getBytes)
+      else
+        output.write(s"NO_TIMESTAMP".getBytes)
       output.write(keySeparator)
     }
     if (printKey) {
