Repository: kafka Updated Branches: refs/heads/trunk d2f5589af -> 8e7516ea2
KAFKA-4866; Console consumer `print.value` property is ignored This property is mentioned in the quickstart. Author: huxi <h...@zhenrongbao.com> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #2661 from amethystic/kafka4866_consoleconsumer_ignore_printvalue Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e7516ea Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e7516ea Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e7516ea Branch: refs/heads/trunk Commit: 8e7516ea2e6b99a6e9a3fb5a958e62b1fb186cf9 Parents: d2f5589 Author: huxi <h...@zhenrongbao.com> Authored: Tue Apr 11 11:31:11 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Tue Apr 11 12:38:09 2017 +0100 ---------------------------------------------------------------------- .../scala/kafka/tools/ConsoleConsumer.scala | 26 ++++++++++++++++---- .../unit/kafka/tools/ConsoleConsumerTest.scala | 7 +++--- 2 files changed, 25 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8e7516ea/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 56f125a..393fee6 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -414,6 +414,7 @@ object ConsoleConsumer extends Logging { class DefaultMessageFormatter extends MessageFormatter { var printKey = false + var printValue = true var printTimestamp = false var keySeparator = "\t".getBytes(StandardCharsets.UTF_8) var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8) @@ -426,6 +427,8 @@ class DefaultMessageFormatter extends MessageFormatter { printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true") if (props.containsKey("print.key")) printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true") + if (props.containsKey("print.value")) + printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true") if (props.containsKey("key.separator")) keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8) if (props.containsKey("line.separator")) @@ -440,12 +443,18 @@ class DefaultMessageFormatter extends MessageFormatter { def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) { - def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) { + def writeSeparator(columnSeparator: Boolean): Unit = { + if (columnSeparator) + output.write(keySeparator) + else + output.write(lineSeparator) + } + + def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) { val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8)) val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString. getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes) output.write(convertedBytes) - output.write(separator) } import consumerRecord._ @@ -455,11 +464,18 @@ class DefaultMessageFormatter extends MessageFormatter { output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8)) else output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8)) - output.write(keySeparator) + writeSeparator(printKey || printValue) } - if (printKey) write(keyDeserializer, key, keySeparator) - write(valueDeserializer, value, lineSeparator) + if (printKey) { + write(keyDeserializer, key) + writeSeparator(printValue) + } + + if (printValue) { + write(valueDeserializer, value) + output.write(lineSeparator) + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8e7516ea/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 013ed3e..e0917a2 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -25,9 +25,8 @@ import kafka.utils.TestUtils import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test -import org.scalatest.junit.JUnitSuite -class ConsoleConsumerTest extends JUnitSuite { +class ConsoleConsumerTest { @Test def shouldLimitReadsToMaxMessageLimit() { @@ -160,7 +159,8 @@ class ConsoleConsumerTest extends JUnitSuite { "--topic", "test", "--partition", "0", "--offset", "LatEst", - "--new-consumer") //new + "--new-consumer", //new + "--property", "print.value=false") //When val config = new ConsoleConsumer.ConsumerConfig(args) @@ -172,6 +172,7 @@ class ConsoleConsumerTest extends JUnitSuite { assertEquals(0, config.partitionArg.get) assertEquals(-1, config.offsetArg) assertEquals(false, config.fromBeginning) + assertEquals(false, config.formatter.asInstanceOf[DefaultMessageFormatter].printValue) } @Test