Repository: kafka
Updated Branches:
  refs/heads/trunk 74eff8a83 -> 695fdc69d
KAFKA-3273; MessageFormatter and MessageReader interfaces should be resilient to changes

* Change `MessageFormatter.writeTo` to take a `ConsumerRecord`

* Change `MessageReader.readMessage()` to use `ProducerRecord`

Author: Ismael Juma <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #972 from ijuma/kafka-3273-message-formatter-and-reader-resilient

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/695fdc69
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/695fdc69
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/695fdc69

Branch: refs/heads/trunk
Commit: 695fdc69db6e080419bb05d624e91fa88d5c0a02
Parents: 74eff8a
Author: Ismael Juma <[email protected]>
Authored: Mon Feb 29 18:52:54 2016 -0800
Committer: Jun Rao <[email protected]>
Committed: Mon Feb 29 18:52:54 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/common/MessageFormatter.scala   | 39 +++++++++++++
 .../main/scala/kafka/common/MessageReader.scala | 39 +++++++++++++
 .../coordinator/GroupMetadataManager.scala      | 60 +++++++++++---------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 45 +++++++--------
 .../scala/kafka/tools/ConsoleProducer.scala     | 40 +++++--------
 .../scala/kafka/tools/SimpleConsumerShell.scala | 31 +++++-----
 .../unit/kafka/tools/ConsoleConsumerTest.scala  |  3 +-
 docs/upgrade.html                               | 10 +++-
 8 files changed, 175 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/common/MessageFormatter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/MessageFormatter.scala b/core/src/main/scala/kafka/common/MessageFormatter.scala
new file mode 100644
index 0000000..ef3c723
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageFormatter.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import java.io.PrintStream
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+/**
+ * Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
+ * a `PrintStream`.
+ *
+ * This is used by the `ConsoleConsumer`.
+ */
+trait MessageFormatter {
+
+  def init(props: Properties) {}
+
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
+
+  def close() {}
+
+}
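Since `init` and `close` now have no-op default implementations, a custom formatter only has to provide
`writeTo`. As an illustrative sketch (the class name and output format below are not part of this commit):

    import java.io.PrintStream

    import kafka.common.MessageFormatter
    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Prints each record as "<partition>-<offset>: <value>".
    class PartitionOffsetFormatter extends MessageFormatter {
      def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
        val value = Option(consumerRecord.value).map(new String(_)).getOrElse("null")
        output.println(s"${consumerRecord.partition}-${consumerRecord.offset}: $value")
      }
    }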
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/common/MessageReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/MessageReader.scala b/core/src/main/scala/kafka/common/MessageReader.scala
new file mode 100644
index 0000000..56b55ce
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageReader.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import java.io.InputStream
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.ProducerRecord
+
+/**
+ * Typical implementations of this interface convert data from an `InputStream` received via `init` into a
+ * `ProducerRecord` instance on each invocation of `readMessage`.
+ *
+ * This is used by the `ConsoleProducer`.
+ */
+trait MessageReader {
+
+  def init(inputStream: InputStream, props: Properties) {}
+
+  def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
+
+  def close() {}
+
+}
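Likewise, only `readMessage` is abstract here; returning null signals end of input, which is how the reworked
`ConsoleProducer` send loop below terminates. A sketch of a custom reader (the class name is illustrative):

    import java.io.{BufferedReader, InputStream, InputStreamReader}
    import java.util.Properties

    import kafka.common.MessageReader
    import org.apache.kafka.clients.producer.ProducerRecord

    // Turns each input line into an unkeyed record for the configured topic.
    class PlainLineReader extends MessageReader {
      private var topic: String = null
      private var reader: BufferedReader = null

      override def init(inputStream: InputStream, props: Properties) {
        topic = props.getProperty("topic")
        reader = new BufferedReader(new InputStreamReader(inputStream))
      }

      // A null return tells the caller that the stream is exhausted.
      def readMessage(): ProducerRecord[Array[Byte], Array[Byte]] = {
        val line = reader.readLine()
        if (line == null) null else new ProducerRecord(topic, line.getBytes)
      }
    }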
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index b3e1bc1..2c29172 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -22,23 +22,22 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.utils.CoreUtils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.protocol.types.{ArrayOf, Struct, Schema, Field}
+import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
 import org.apache.kafka.common.protocol.types.Type.STRING
 import org.apache.kafka.common.protocol.types.Type.INT32
 import org.apache.kafka.common.protocol.types.Type.INT64
 import org.apache.kafka.common.protocol.types.Type.BYTES
-import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.utils.Time
-
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import kafka.utils._
 import kafka.common._
 import kafka.message._
 import kafka.log.FileMessageSet
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
-import kafka.tools.MessageFormatter
+import kafka.common.MessageFormatter
 import kafka.server.ReplicaManager
 
 import scala.collection._
@@ -968,37 +967,46 @@ object GroupMetadataManager {
   // Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
   // (specify --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
   class OffsetsMessageFormatter extends MessageFormatter {
-    def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
-      val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
-      // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
-      // only print if the message is an offset record
-      if (formattedKey.isInstanceOf[OffsetKey]) {
-        val groupTopicPartition = formattedKey.asInstanceOf[OffsetKey].toString
-        val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
-        output.write(groupTopicPartition.getBytes)
-        output.write("::".getBytes)
-        output.write(formattedValue.getBytes)
-        output.write("\n".getBytes)
+    def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+      Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+        // Only print if the message is an offset record.
+        // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+        case offsetKey: OffsetKey =>
+          val groupTopicPartition = offsetKey.key
+          val value = consumerRecord.value
+          val formattedValue =
+            if (value == null) "NULL"
+            else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
+          output.write(groupTopicPartition.toString.getBytes)
+          output.write("::".getBytes)
+          output.write(formattedValue.getBytes)
+          output.write("\n".getBytes)
+        case _ => // no-op
       }
     }
   }
 
   // Formatter for use with tools to read group metadata history
   class GroupMetadataMessageFormatter extends MessageFormatter {
-    def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
-      val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
-      // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
-      // only print if the message is a group metadata record
-      if (formattedKey.isInstanceOf[GroupMetadataKey]) {
-        val groupId = formattedKey.asInstanceOf[GroupMetadataKey].key
-        val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
-        output.write(groupId.getBytes)
-        output.write("::".getBytes)
-        output.write(formattedValue.getBytes)
-        output.write("\n".getBytes)
+    def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+      Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+        // Only print if the message is a group metadata record.
+        // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+        case groupMetadataKey: GroupMetadataKey =>
+          val groupId = groupMetadataKey.key
+          val value = consumerRecord.value
+          val formattedValue =
+            if (value == null) "NULL"
+            else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+          output.write(groupId.getBytes)
+          output.write("::".getBytes)
+          output.write(formattedValue.getBytes)
+          output.write("\n".getBytes)
+        case _ => // no-op
      }
    }
  }
+
 }
 
 case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
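Following the comment above, the offsets formatter can be exercised end to end with an invocation along the
lines of `bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic __consumer_offsets --formatter
"kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config consumer.properties`,
assuming a local ZooKeeper at the default port and a consumer.properties file that sets
exclude.internal.topics=false, as the comment requires.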
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7aee7ab..855025e 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -20,20 +20,21 @@ package kafka.tools
 import java.io.PrintStream
 import java.util.concurrent.CountDownLatch
 import java.util.{Properties, Random}
+
 import joptsimple._
-import kafka.common.StreamEndException
+import kafka.common.{MessageFormatter, StreamEndException}
 import kafka.consumer._
 import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
-import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.serialization.{Deserializer, ByteArrayDeserializer}
+import org.apache.kafka.common.serialization.Deserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Consumer that dumps messages to standard out.
@@ -126,7 +127,8 @@ object ConsoleConsumer extends Logging {
       }
       messageCount += 1
       try {
-        formatter.writeTo(msg.key, msg.value, msg.timestamp, msg.timestampType, System.out)
+        formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType,
+          msg.key, msg.value), System.out)
       } catch {
         case e: Throwable =>
           if (skipMessageOnError) {
@@ -285,7 +287,7 @@ object ConsoleConsumer extends Logging {
     val fromBeginning = options.has(resetBeginningOpt)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
     val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
     val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
@@ -310,9 +312,9 @@ object ConsoleConsumer extends Logging {
     }
 
     //Provide the consumer with a randomly assigned group id
-    if(!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"console-consumer-" + new Random().nextInt(100000))
-      groupIdPassed=false
+    if (!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
+      groupIdPassed = false
     }
 
     def tryParse(parser: OptionParser, args: Array[String]) = {
@@ -336,14 +338,6 @@ object ConsoleConsumer extends Logging {
     }
   }
 }
 
-trait MessageFormatter{
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream)
-
-  def init(props: Properties) {}
-
-  def close() {}
-}
-
 class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var printTimestamp = false
@@ -370,7 +364,7 @@ class DefaultMessageFormatter extends MessageFormatter {
       valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
   }
 
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
 
     def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {
       val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes)
@@ -379,6 +373,8 @@ class DefaultMessageFormatter extends MessageFormatter {
       output.write(separator)
     }
 
+    import consumerRecord._
+
     if (printTimestamp) {
       if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
         output.write(s"$timestampType:$timestamp".getBytes)
@@ -386,6 +382,7 @@ class DefaultMessageFormatter extends MessageFormatter {
         output.write(s"NO_TIMESTAMP".getBytes)
       output.write(keySeparator)
     }
+
     if (printKey) write(keyDeserializer, key, keySeparator)
     write(valueDeserializer, value, lineSeparator)
   }
@@ -395,9 +392,10 @@ class LoggingMessageFormatter extends MessageFormatter {
   private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
   val logger = Logger.getLogger(getClass().getName)
 
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream): Unit = {
-    defaultWriter.writeTo(key, value, timestamp, timestampType, output)
-    if(logger.isInfoEnabled)
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
+    import consumerRecord._
+    defaultWriter.writeTo(consumerRecord, output)
+    if (logger.isInfoEnabled)
       logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
                   s"key:${if (key == null) "null" else new String(key)}, " +
                   s"value:${if (value == null) "null" else new String(value)}")
@@ -407,7 +405,7 @@ class LoggingMessageFormatter extends MessageFormatter {
 
 class NoOpMessageFormatter extends MessageFormatter {
   override def init(props: Properties) {}
 
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream){}
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream){}
 }
 
 class ChecksumMessageFormatter extends MessageFormatter {
@@ -421,7 +419,8 @@ class ChecksumMessageFormatter extends MessageFormatter {
       topicStr = ""
   }
 
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+    import consumerRecord._
     val chksum =
       if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
         new Message(value, key, timestamp, timestampType, NoCompressionCodec, 0, -1, Message.MagicValue_V1).checksum
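A side benefit of passing a `ConsumerRecord` is that formatters can now be exercised in isolation, without a
running broker. A sketch (the short `ConsumerRecord` constructor, which fills in "no timestamp" defaults, is
assumed here):

    import java.util.Properties

    import kafka.tools.DefaultMessageFormatter
    import org.apache.kafka.clients.consumer.ConsumerRecord

    object FormatterCheck extends App {
      val formatter = new DefaultMessageFormatter
      formatter.init(new Properties) // empty props: print the value only, with default separators

      val record = new ConsumerRecord[Array[Byte], Array[Byte]]("test", 0, 0L, "k".getBytes, "v".getBytes)
      formatter.writeTo(record, System.out) // prints "v"
      formatter.close()
    }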
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index bce819e..0116a96 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -20,14 +20,13 @@ package kafka.tools
 import kafka.common._
 import kafka.message._
 import kafka.serializer._
-import kafka.utils.{ToolsUtils, CommandLineUtils}
-import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage}
-
+import kafka.utils.{CommandLineUtils, ToolsUtils}
+import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
 
 import joptsimple._
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.utils.Utils
 
 object ConsoleProducer {
@@ -52,12 +51,12 @@ object ConsoleProducer {
       }
     })
 
-    var message: KeyedMessage[Array[Byte], Array[Byte]] = null
+    var message: ProducerRecord[Array[Byte], Array[Byte]] = null
     do {
       message = reader.readMessage()
-      if(message != null)
-        producer.send(message.topic, message.key, message.message)
-    } while(message != null)
+      if (message != null)
+        producer.send(message.topic, message.key, message.value)
+    } while (message != null)
   } catch {
     case e: joptsimple.OptionException =>
       System.err.println(e.getMessage)
@@ -285,12 +284,6 @@ object ConsoleProducer {
     val maxBlockMs = options.valueOf(maxBlockMsOpt)
   }
 
-  trait MessageReader {
-    def init(inputStream: InputStream, props: Properties) {}
-    def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
-    def close() {}
-  }
-
   class LineMessageReader extends MessageReader {
     var topic: String = null
     var reader: BufferedReader = null
@@ -301,11 +294,11 @@ object ConsoleProducer {
 
     override def init(inputStream: InputStream, props: Properties) {
       topic = props.getProperty("topic")
-      if(props.containsKey("parse.key"))
+      if (props.containsKey("parse.key"))
        parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
-      if(props.containsKey("key.separator"))
+      if (props.containsKey("key.separator"))
        keySeparator = props.getProperty("key.separator")
-      if(props.containsKey("ignore.error"))
+      if (props.containsKey("ignore.error"))
        ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
      reader = new BufferedReader(new InputStreamReader(inputStream))
    }
@@ -317,17 +310,14 @@ object ConsoleProducer {
       case (line, true) =>
         line.indexOf(keySeparator) match {
           case -1 =>
-            if(ignoreError)
-              new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
-            else
-              throw new KafkaException("No key found on line " + lineNumber + ": " + line)
+            if (ignoreError) new ProducerRecord(topic, line.getBytes)
+            else throw new KafkaException(s"No key found on line ${lineNumber}: $line")
           case n =>
-            new KeyedMessage[Array[Byte], Array[Byte]](topic,
-              line.substring(0, n).getBytes,
-              (if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
+            val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
+            new ProducerRecord(topic, line.substring(0, n).getBytes, value)
         }
       case (line, false) =>
-        new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
+        new ProducerRecord(topic, line.getBytes)
     }
   }
 }
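The reader side can be checked the same way by driving `LineMessageReader` directly; a sketch with
illustrative values:

    import java.io.ByteArrayInputStream
    import java.util.Properties

    import kafka.tools.ConsoleProducer.LineMessageReader

    object ReaderCheck extends App {
      val props = new Properties
      props.put("topic", "test")
      props.put("parse.key", "true")
      props.put("key.separator", ":")

      val reader = new LineMessageReader
      reader.init(new ByteArrayInputStream("k1:v1\n".getBytes), props)

      val record = reader.readMessage() // key "k1", value "v1"
      println(new String(record.key) + " -> " + new String(record.value))
      assert(reader.readMessage() == null) // null signals end of input
    }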
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index dda9697..b4b68e0 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -21,10 +21,12 @@ import joptsimple._
 import kafka.utils._
 import kafka.consumer._
 import kafka.client.ClientUtils
-import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
+import kafka.api.{FetchRequestBuilder, OffsetRequest, Request}
 import kafka.cluster.BrokerEndPoint
+
 import scala.collection.JavaConversions._
-import kafka.common.TopicAndPartition
+import kafka.common.{MessageFormatter, TopicAndPartition}
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.utils.Utils
 
 /**
@@ -137,7 +139,7 @@ object SimpleConsumerShell extends Logging {
     // validating partition id
     val partitionsMetadata = topicsMetadata(0).partitionsMetadata
     val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
-    if(!partitionMetadataOpt.isDefined) {
+    if (!partitionMetadataOpt.isDefined) {
       System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
       System.exit(1)
     }
@@ -145,9 +147,9 @@ object SimpleConsumerShell extends Logging {
     // validating replica id and initializing target broker
     var fetchTargetBroker: BrokerEndPoint = null
     var replicaOpt: Option[BrokerEndPoint] = null
-    if(replicaId == UseLeaderReplica) {
+    if (replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
-      if(!replicaOpt.isDefined) {
+      if (!replicaOpt.isDefined) {
         System.err.println("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(topic, partitionId))
         System.exit(1)
       }
@@ -186,7 +188,7 @@ object SimpleConsumerShell extends Logging {
     }
 
     // initializing formatter
-    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    val formatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
     val replicaString = if(replicaId > 0) "leader" else "replica"
@@ -202,7 +204,7 @@ object SimpleConsumerShell extends Logging {
         var offset = startingOffset
         var numMessagesConsumed = 0
         try {
-          while(numMessagesConsumed < maxMessages) {
+          while (numMessagesConsumed < maxMessages) {
             val fetchRequest = fetchRequestBuilder
               .addFetch(topic, partitionId, offset, fetchSize)
               .build()
@@ -213,15 +215,16 @@ object SimpleConsumerShell extends Logging {
               return
             }
             debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
-            for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) {
+            for (messageAndOffset <- messageSet if numMessagesConsumed < maxMessages) {
               try {
                 offset = messageAndOffset.nextOffset
-                if(printOffsets)
+                if (printOffsets)
                   System.out.println("next offset = " + offset)
                 val message = messageAndOffset.message
-                val key = if(message.hasKey) Utils.readBytes(message.key) else null
+                val key = if (message.hasKey) Utils.readBytes(message.key) else null
                 val value = if (message.isNull) null else Utils.readBytes(message.payload)
-                formatter.writeTo(key, value, message.timestamp, message.timestampType, System.out)
+                formatter.writeTo(new ConsumerRecord(topic, partitionId, offset, message.timestamp,
+                  message.timestampType, key, value), System.out)
                 numMessagesConsumed += 1
               } catch {
                 case e: Throwable =>
@@ -230,7 +233,7 @@ object SimpleConsumerShell extends Logging {
               else
                 throw e
             }
-            if(System.out.checkError()) {
+            if (System.out.checkError()) {
               // This means no one is listening to our output stream any more, time to shutdown
               System.err.println("Unable to write to standard out, closing consumer.")
               formatter.close()
@@ -242,8 +245,8 @@ object SimpleConsumerShell extends Logging {
         } catch {
           case e: Throwable =>
             error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
-        }finally {
-          info("Consumed " + numMessagesConsumed + " messages")
+        } finally {
+          info(s"Consumed $numMessagesConsumed messages")
         }
       }
     }, false)
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index de92a24..31b3211 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -19,6 +19,7 @@ package kafka.tools
 
 import java.io.FileOutputStream
 
+import kafka.common.MessageFormatter
 import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
 import kafka.utils.TestUtils
 import org.easymock.EasyMock
@@ -39,7 +40,7 @@ class ConsoleConsumerTest extends JUnitSuite {
 
     //Expectations
     val messageLimit: Int = 10
-    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
+    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
     EasyMock.expect(consumer.receive()).andReturn(record).times(messageLimit)
 
     EasyMock.replay(consumer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3370822..863a6fa 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -65,9 +65,13 @@ are introduced, it is important to upgrade your Kafka clusters before upgrading
     <li> Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages. </li>
     <li> ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
     <li> FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
-    <li> MessageFormatter interface was changed from <code>void writeTo(byte[] key, byte[] value, PrintStream output)</code> to
-        <code>void writeTo(byte[] key, byte[] value, long timestamp, TimestampType timestampType, PrintStream output)</code> </li>
-    <li> MirrorMakerMessageHandler no longer exposes <em>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</em> method as it was never called. </li>
+    <li> MessageFormatter interface was changed from <code>def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)</code> to
+        <code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)</code> </li>
+    <li> MessageReader interface was changed from <code>def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]</code> to
+        <code>def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]</code> </li>
+    <li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
+    <li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
+    <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>
 </ul>
 
 <h4><a id="upgrade_9" href="#upgrade_9">Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0</a></h4>
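For `MessageFormatter` implementations maintained outside the Kafka code base, the migration is mechanical:
adjust the import for the new package and pull the key and value out of the record. A sketch of a
hypothetical formatter after migration (only the import and the `writeTo` signature are prescribed by this
commit):

    import java.io.PrintStream

    import kafka.common.MessageFormatter // previously kafka.tools.MessageFormatter
    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Old 0.9.x signature: def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
    class MyFormatter extends MessageFormatter {
      def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
        // Key and value are now read from the record; either may be null, as before.
        val key = Option(consumerRecord.key).map(new String(_)).getOrElse("null")
        val value = Option(consumerRecord.value).map(new String(_)).getOrElse("null")
        output.println(s"$key => $value")
      }
    }

Fields added to `ConsumerRecord` in later releases (as the timestamp was in this one) then become available
to formatters without another signature change, which is the resilience the ticket title refers to.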