Repository: kafka Updated Branches: refs/heads/trunk 5da935ef7 -> 74eff8a83
MINOR: DefaultMessageFormatter custom deserializer fixes The ability to specify a deserializer for keys and values was added in a recent commit (845c6eae1f6c6bcf11), but it contained a few issues. Author: Ismael Juma <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #987 from ijuma/console-consumer-cleanups Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/74eff8a8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/74eff8a8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/74eff8a8 Branch: refs/heads/trunk Commit: 74eff8a830ff6508ab98761a9e77d19d4e49a73e Parents: 5da935e Author: Ismael Juma <[email protected]> Authored: Mon Feb 29 16:10:25 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Mon Feb 29 16:10:25 2016 -0800 ---------------------------------------------------------------------- .../scala/kafka/tools/ConsoleConsumer.scala | 39 +++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/74eff8a8/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 0d85aca..7aee7ab 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -350,12 +350,10 @@ class DefaultMessageFormatter extends MessageFormatter { var keySeparator = "\t".getBytes var lineSeparator = "\n".getBytes - var keyDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer() - var valDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer() + var keyDeserializer: Option[Deserializer[_]] = None + var valueDeserializer: Option[Deserializer[_]] = None override def init(props: Properties) { - System.out.println(props) - if (props.containsKey("print.timestamp")) printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true") if (props.containsKey("print.key")) @@ -364,22 +362,23 @@ class DefaultMessageFormatter extends MessageFormatter { keySeparator = props.getProperty("key.separator").getBytes if (props.containsKey("line.separator")) lineSeparator = props.getProperty("line.separator").getBytes + // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` + if (props.containsKey("key.deserializer")) + keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]]) + // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` + if (props.containsKey("value.deserializer")) + valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]]) + } - if (props.containsKey("key.decoder")) { - keyDecoder = Class.forName(props.getProperty("key.decoder")).newInstance().asInstanceOf[Deserializer[_ <: Object]] + def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) { - System.out.println("update key decoder") + def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) { + val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes) + val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.getBytes).getOrElse(nonNullBytes) + output.write(convertedBytes) + output.write(separator) } - if (props.containsKey("value.decoder")) { - valDecoder = Class.forName(props.getProperty("value.decoder")).newInstance().asInstanceOf[Deserializer[_ <: Object]] - System.out.println("update value decoder") - } - System.out.println(keyDecoder) - System.out.println(valDecoder) - } - - def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) { if (printTimestamp) { if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) output.write(s"$timestampType:$timestamp".getBytes) @@ -387,12 +386,8 @@ class DefaultMessageFormatter extends MessageFormatter { output.write(s"NO_TIMESTAMP".getBytes) output.write(keySeparator) } - if (printKey) { - output.write(if (key == null) "null".getBytes else keyDecoder.deserialize(null, key).toString.getBytes) - output.write(keySeparator) - } - output.write(if (value == null) "null".getBytes else valDecoder.deserialize(null, value).toString.getBytes) - output.write(lineSeparator) + if (printKey) write(keyDeserializer, key, keySeparator) + write(valueDeserializer, value, lineSeparator) } }
