http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index af64ffe..9e4c149 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -26,7 +26,7 @@ import kafka.cluster.{Partition, Replica} import kafka.common._ import kafka.controller.KafkaController import kafka.log.{LogAppendInfo, LogManager} -import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet} +import kafka.message.InvalidMessageException import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaFactory.UnboundedQuota import kafka.utils._ @@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordEx import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Time @@ -78,9 +79,11 @@ case class LogReadResult(info: FetchDataInfo, } } +case class FetchPartitionData(error: Short = Errors.NONE.code, hw: Long = -1L, records: Records) + object LogReadResult { val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, - MessageSet.Empty), + MemoryRecords.EMPTY), -1L, -1, false) @@ -276,11 +279,7 @@ class ReplicaManager(val config: KafkaConfig, } def getPartition(topic: String, partitionId: Int): Option[Partition] = { - val partition = allPartitions.get((topic, partitionId)) - if (partition == null) - None - else - Some(partition) + Option(allPartitions.get((topic, partitionId))) } def getReplicaOrException(topic: String, partition: Int): Replica = { @@ -318,15 +317,15 @@ class ReplicaManager(val config: KafkaConfig, * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas; * the callback function will be triggered either when timeout or the required acks are satisfied */ - def appendMessages(timeout: Long, - requiredAcks: Short, - internalTopicsAllowed: Boolean, - messagesPerPartition: Map[TopicPartition, MessageSet], - responseCallback: Map[TopicPartition, PartitionResponse] => Unit) { + def appendRecords(timeout: Long, + requiredAcks: Short, + internalTopicsAllowed: Boolean, + entriesPerPartition: Map[TopicPartition, MemoryRecords], + responseCallback: Map[TopicPartition, PartitionResponse] => Unit) { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds - val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) + val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks) debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) val produceStatus = localProduceResults.map { case (topicPartition, result) => @@ -336,13 +335,13 @@ class ReplicaManager(val config: KafkaConfig, new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.logAppendTime)) // response status } - if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) { + if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) { // create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed produce operation - val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq + val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq // try to complete the request immediately, otherwise put it into the purgatory // this is because while the delayed produce operation is being created, new @@ -357,9 +356,9 @@ class ReplicaManager(val config: KafkaConfig, } else { // If required.acks is outside accepted range, something is wrong with the client // Just return an error and don't handle the request at all - val responseStatus = messagesPerPartition.map { case (topicAndPartition, _) => + val responseStatus = entriesPerPartition.map { case (topicAndPartition, _) => topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code, - LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp) + LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP) } responseCallback(responseStatus) } @@ -370,11 +369,12 @@ class ReplicaManager(val config: KafkaConfig, // 1. required acks = -1 // 2. there is data to append // 3. at least one partition append was successful (fewer errors than partitions) - private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicPartition, MessageSet], - localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = { + private def delayedRequestRequired(requiredAcks: Short, + entriesPerPartition: Map[TopicPartition, MemoryRecords], + localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = { requiredAcks == -1 && - messagesPerPartition.nonEmpty && - localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size + entriesPerPartition.nonEmpty && + localProduceResults.values.count(_.error.isDefined) < entriesPerPartition.size } private def isValidRequiredAcks(requiredAcks: Short): Boolean = { @@ -385,10 +385,10 @@ class ReplicaManager(val config: KafkaConfig, * Append the messages to the local replica logs */ private def appendToLocalLog(internalTopicsAllowed: Boolean, - messagesPerPartition: Map[TopicPartition, MessageSet], + entriesPerPartition: Map[TopicPartition, MemoryRecords], requiredAcks: Short): Map[TopicPartition, LogAppendResult] = { - trace("Append [%s] to local log ".format(messagesPerPartition)) - messagesPerPartition.map { case (topicPartition, messages) => + trace("Append [%s] to local log ".format(entriesPerPartition)) + entriesPerPartition.map { case (topicPartition, records) => BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark() @@ -402,7 +402,7 @@ class ReplicaManager(val config: KafkaConfig, val partitionOpt = getPartition(topicPartition.topic, topicPartition.partition) val info = partitionOpt match { case Some(partition) => - partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks) + partition.appendRecordsToLeader(records, requiredAcks) case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" .format(topicPartition, localBrokerId)) } @@ -414,13 +414,13 @@ class ReplicaManager(val config: KafkaConfig, info.lastOffset - info.firstOffset + 1 // update stats for successfully appended bytes and messages as bytesInRate and messageInRate - BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(records.sizeInBytes) BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages) BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" - .format(messages.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset)) + .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset)) (topicPartition, LogAppendResult(info)) } catch { // NOTE: Failed produce requests metric is not incremented for known exceptions @@ -434,6 +434,7 @@ class ReplicaManager(val config: KafkaConfig, _: RecordTooLargeException | _: RecordBatchTooLargeException | _: CorruptRecordException | + _: InvalidRecordException | _: InvalidMessageException | _: InvalidTimestampException) => (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) @@ -458,7 +459,7 @@ class ReplicaManager(val config: KafkaConfig, hardMaxBytesLimit: Boolean, fetchInfos: Seq[(TopicPartition, PartitionData)], quota: ReplicaQuota = UnboundedQuota, - responseCallback: Seq[(TopicAndPartition, FetchResponsePartitionData)] => Unit) { + responseCallback: Seq[(TopicAndPartition, FetchPartitionData)] => Unit) { val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId) @@ -480,7 +481,7 @@ class ReplicaManager(val config: KafkaConfig, // check if this fetch request can be satisfied right away val logReadResultValues = logReadResults.map { case (_, v) => v } - val bytesReadable = logReadResultValues.map(_.info.messageSet.sizeInBytes).sum + val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) => errorIncurred || (readResult.errorCode != Errors.NONE.code)) @@ -490,7 +491,7 @@ class ReplicaManager(val config: KafkaConfig, // 4) some error happens while reading data if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { val fetchPartitionData = logReadResults.map { case (tp, result) => - tp -> FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet) + tp -> FetchPartitionData(result.errorCode, result.hw, result.info.records) } responseCallback(fetchPartitionData) } else { @@ -568,16 +569,16 @@ class ReplicaManager(val config: KafkaConfig, // If the partition is being throttled, simply return an empty set. if (shouldLeaderThrottle(quota, TopicAndPartition(tp.topic, tp.partition), replicaId)) - FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty) + FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY) // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make // progress in such cases and don't need to report a `RecordTooLargeException` - else if (!hardMaxBytesLimit && fetch.firstMessageSetIncomplete) - FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty) + else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete) + FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY) else fetch case None => error(s"Leader for partition $tp does not have a local log") - FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty) + FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY) } val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset <= 0 @@ -590,12 +591,14 @@ class ReplicaManager(val config: KafkaConfig, _: NotLeaderForPartitionException | _: ReplicaNotAvailableException | _: OffsetOutOfRangeException) => - LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, partitionFetchSize, false, Some(e)) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L, + partitionFetchSize, false, Some(e)) case e: Throwable => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark() - error(s"Error processing fetch operation on partition ${tp}, offset $offset", e) - LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, partitionFetchSize, false, Some(e)) + error(s"Error processing fetch operation on partition $tp, offset $offset", e) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L, + partitionFetchSize, false, Some(e)) } } @@ -604,7 +607,7 @@ class ReplicaManager(val config: KafkaConfig, var minOneMessage = !hardMaxBytesLimit readPartitionInfo.foreach { case (tp, fetchInfo) => val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) - val messageSetSize = readResult.info.messageSet.sizeInBytes + val messageSetSize = readResult.info.records.sizeInBytes // Once we read from a non-empty partition, we stop ignoring request and partition level size limits if (messageSetSize > 0) minOneMessage = false @@ -625,9 +628,9 @@ class ReplicaManager(val config: KafkaConfig, quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync } - def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] = + def getMagicAndTimestampType(topicAndPartition: TopicAndPartition): Option[(Byte, TimestampType)] = getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica => - replica.log.map(_.config.messageFormatVersion.messageFormatVersion) + replica.log.map(log => (log.config.messageFormatVersion.messageFormatVersion, log.config.messageTimestampType)) } def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 221ef6c..ceff78c 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -23,15 +23,16 @@ import java.nio.ByteBuffer import joptsimple.OptionParser import kafka.coordinator.{GroupMetadataKey, GroupMetadataManager, OffsetKey} import kafka.log._ -import kafka.message._ import kafka.serializer.Decoder import kafka.utils._ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.record.{CompressionType, FileRecords, LogEntry, Record} import org.apache.kafka.common.utils.Utils import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ object DumpLogSegments { @@ -69,7 +70,7 @@ object DumpLogSegments { CommandLineUtils.checkRequiredArgs(parser, options, filesOpt) - val printDataLog = (options.has(printOpt) || options.has(offsetsOpt) || options.has(valueDecoderOpt) || options.has(keyDecoderOpt)) + val printDataLog = options.has(printOpt) || options.has(offsetsOpt) || options.has(valueDecoderOpt) || options.has(keyDecoderOpt) val verifyOnly = options.has(verifyOpt) val indexSanityOnly = options.has(indexSanityOpt) @@ -132,7 +133,7 @@ object DumpLogSegments { maxMessageSize: Int) { val startOffset = file.getName().split("\\.")(0).toLong val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix) - val messageSet = new FileMessageSet(logFile, false) + val fileRecords = FileRecords.open(logFile, false) val index = new OffsetIndex(file, baseOffset = startOffset) //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not. @@ -144,11 +145,11 @@ object DumpLogSegments { for(i <- 0 until index.entries) { val entry = index.entry(i) - val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, maxMessageSize) - val messageAndOffset = getIterator(partialFileMessageSet.head, isDeepIteration = true).next() - if(messageAndOffset.offset != entry.offset + index.baseOffset) { + val slice = fileRecords.read(entry.position, maxMessageSize) + val logEntry = getIterator(slice.shallowIterator.next, isDeepIteration = true).next() + if (logEntry.offset != entry.offset + index.baseOffset) { var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]()) - misMatchesSeq ::=(entry.offset + index.baseOffset, messageAndOffset.offset) + misMatchesSeq ::=(entry.offset + index.baseOffset, logEntry.offset) misMatchesForIndexFilesMap.put(file.getAbsolutePath, misMatchesSeq) } // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one @@ -164,9 +165,9 @@ object DumpLogSegments { verifyOnly: Boolean, timeIndexDumpErrors: TimeIndexDumpErrors, maxMessageSize: Int) { - val startOffset = file.getName().split("\\.")(0).toLong + val startOffset = file.getName.split("\\.")(0).toLong val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix) - val messageSet = new FileMessageSet(logFile, false) + val fileRecords = FileRecords.open(logFile, false) val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.IndexFileSuffix) val index = new OffsetIndex(indexFile, baseOffset = startOffset) val timeIndex = new TimeIndex(file, baseOffset = startOffset) @@ -178,26 +179,26 @@ object DumpLogSegments { return } - var prevTimestamp = Message.NoTimestamp + var prevTimestamp = Record.NO_TIMESTAMP for(i <- 0 until timeIndex.entries) { val entry = timeIndex.entry(i) val position = index.lookup(entry.offset + timeIndex.baseOffset).position - val partialFileMessageSet: FileMessageSet = messageSet.read(position, Int.MaxValue) - val shallowIter = partialFileMessageSet.iterator - var maxTimestamp = Message.NoTimestamp + val partialFileRecords = fileRecords.read(position, Int.MaxValue) + val shallowEntries = partialFileRecords.shallowIterator.asScala + var maxTimestamp = Record.NO_TIMESTAMP // We first find the message by offset then check if the timestamp is correct. - val wrapperMessageOpt = shallowIter.find(_.offset >= entry.offset + timeIndex.baseOffset) - wrapperMessageOpt match { + val maybeLogEntry = shallowEntries.find(_.offset >= entry.offset + timeIndex.baseOffset) + maybeLogEntry match { case None => timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset, -1.toLong) - case Some(wrapperMessage) if wrapperMessage.offset != entry.offset + timeIndex.baseOffset => + case Some(logEntry) if logEntry.offset != entry.offset + timeIndex.baseOffset => timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset, - wrapperMessage.offset) - case Some(wrapperMessage) => - val deepIter = getIterator(wrapperMessage, isDeepIteration = true) - for (messageAndOffset <- deepIter) - maxTimestamp = math.max(maxTimestamp, messageAndOffset.message.timestamp) + logEntry.offset) + case Some(shallowLogEntry) => + val deepIter = getIterator(shallowLogEntry, isDeepIteration = true) + for (deepLogEntry <- deepIter) + maxTimestamp = math.max(maxTimestamp, deepLogEntry.record.timestamp) if (maxTimestamp != entry.timestamp) timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, maxTimestamp) @@ -216,20 +217,20 @@ object DumpLogSegments { } private trait MessageParser[K, V] { - def parse(message: Message): (Option[K], Option[V]) + def parse(record: Record): (Option[K], Option[V]) } private class DecoderMessageParser[K, V](keyDecoder: Decoder[K], valueDecoder: Decoder[V]) extends MessageParser[K, V] { - override def parse(message: Message): (Option[K], Option[V]) = { - if (message.isNull) { + override def parse(record: Record): (Option[K], Option[V]) = { + if (record.hasNullValue) { (None, None) } else { - val key = if (message.hasKey) - Some(keyDecoder.fromBytes(Utils.readBytes(message.key))) + val key = if (record.hasKey) + Some(keyDecoder.fromBytes(Utils.readBytes(record.key))) else None - val payload = Some(valueDecoder.fromBytes(Utils.readBytes(message.payload))) + val payload = Some(valueDecoder.fromBytes(Utils.readBytes(record.value))) (key, payload) } @@ -249,7 +250,7 @@ object DumpLogSegments { val topicPartition = offsetKey.key.topicPartition val offset = GroupMetadataManager.readOffsetMessageValue(payload) - val keyString = s"offset::${group}:${topicPartition.topic}:${topicPartition.partition}" + val keyString = s"offset::$group:${topicPartition.topic}:${topicPartition.partition}" val valueString = if (offset.metadata.isEmpty) String.valueOf(offset.offset) else @@ -271,27 +272,27 @@ object DumpLogSegments { if (userData.isEmpty) s"${member.memberId}=${partitionAssignment.partitions()}" else - s"${member.memberId}=${partitionAssignment.partitions()}:${userData}" + s"${member.memberId}=${partitionAssignment.partitions()}:$userData" } else { s"${member.memberId}=${hex(member.assignment)}" } }.mkString("{", ",", "}") - val keyString = s"metadata::${groupId}" - val valueString = s"${protocolType}:${group.protocol}:${group.generationId}:${assignment}" + val keyString = s"metadata::$groupId" + val valueString = s"$protocolType:${group.protocol}:${group.generationId}:$assignment" (Some(keyString), Some(valueString)) } - override def parse(message: Message): (Option[String], Option[String]) = { - if (message.isNull) + override def parse(record: Record): (Option[String], Option[String]) = { + if (record.hasNullValue) (None, None) - else if (!message.hasKey) { + else if (!record.hasKey) { throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)") } else { - GroupMetadataManager.readMessageKey(message.key) match { - case offsetKey: OffsetKey => parseOffsets(offsetKey, message.payload) - case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, message.payload) + GroupMetadataManager.readMessageKey(record.key) match { + case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value) + case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value) case _ => throw new KafkaException("Failed to decode message using offset topic decoder (message had an invalid key)") } } @@ -307,70 +308,51 @@ object DumpLogSegments { parser: MessageParser[_, _]) { val startOffset = file.getName().split("\\.")(0).toLong println("Starting offset: " + startOffset) - val messageSet = new FileMessageSet(file, false) + val messageSet = FileRecords.open(file, false) var validBytes = 0L var lastOffset = -1l - val shallowIterator = messageSet.iterator(maxMessageSize) - for(shallowMessageAndOffset <- shallowIterator) { // this only does shallow iteration - val itr = getIterator(shallowMessageAndOffset, isDeepIteration) - for (messageAndOffset <- itr) { - val msg = messageAndOffset.message + val shallowIterator = messageSet.shallowIterator(maxMessageSize).asScala + for (shallowLogEntry <- shallowIterator) { // this only does shallow iteration + val itr = getIterator(shallowLogEntry, isDeepIteration) + for (deepLogEntry <- itr) { + val record = deepLogEntry.record() if(lastOffset == -1) - lastOffset = messageAndOffset.offset + lastOffset = deepLogEntry.offset // If we are iterating uncompressed messages, offsets must be consecutive - else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) { + else if (record.compressionType == CompressionType.NONE && deepLogEntry.offset != lastOffset +1) { var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]()) - nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset) + nonConsecutivePairsSeq ::=(lastOffset, deepLogEntry.offset) nonConsecutivePairsForLogFilesMap.put(file.getAbsolutePath, nonConsecutivePairsSeq) } - lastOffset = messageAndOffset.offset - - print("offset: " + messageAndOffset.offset + " position: " + validBytes + - " " + msg.timestampType + ": " + msg.timestamp + " isvalid: " + msg.isValid + - " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + - " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum) - if(msg.hasKey) - print(" keysize: " + msg.keySize) - if(printContents) { - val (key, payload) = parser.parse(msg) - key.map(key => print(s" key: ${key}")) - payload.map(payload => print(s" payload: ${payload}")) + lastOffset = deepLogEntry.offset + + print("offset: " + deepLogEntry.offset + " position: " + validBytes + + " " + record.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid + + " payloadsize: " + record.valueSize + " magic: " + record.magic + + " compresscodec: " + record.compressionType + " crc: " + record.checksum) + if (record.hasKey) + print(" keysize: " + record.keySize) + if (printContents) { + val (key, payload) = parser.parse(record) + key.foreach(key => print(s" key: $key")) + payload.foreach(payload => print(s" payload: $payload")) } println() } - validBytes += MessageSet.entrySize(shallowMessageAndOffset.message) + + validBytes += shallowLogEntry.sizeInBytes } val trailingBytes = messageSet.sizeInBytes - validBytes if(trailingBytes > 0) println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName)) } - private def getIterator(messageAndOffset: MessageAndOffset, isDeepIteration: Boolean) = { - if (isDeepIteration) { - val message = messageAndOffset.message - message.compressionCodec match { - case NoCompressionCodec => - getSingleMessageIterator(messageAndOffset) - case _ => - ByteBufferMessageSet.deepIterator(messageAndOffset) - } - } else - getSingleMessageIterator(messageAndOffset) - } - - private def getSingleMessageIterator(messageAndOffset: MessageAndOffset) = { - new IteratorTemplate[MessageAndOffset] { - var messageIterated = false - - override def makeNext(): MessageAndOffset = { - if (!messageIterated) { - messageIterated = true - messageAndOffset - } else - allDone() - } - } + private def getIterator(logEntry: LogEntry, isDeepIteration: Boolean): Iterator[LogEntry] = { + if (isDeepIteration) + logEntry.iterator.asScala + else + Iterator(logEntry) } class TimeIndexDumpErrors { http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 479b43c..c483021 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -17,25 +17,26 @@ package kafka.tools -import joptsimple.OptionParser -import kafka.cluster.BrokerEndPoint -import kafka.message.{ByteBufferMessageSet, MessageAndOffset, MessageSet} +import java.text.SimpleDateFormat +import java.util.Date import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference - -import kafka.client.ClientUtils import java.util.regex.{Pattern, PatternSyntaxException} +import joptsimple.OptionParser import kafka.api._ -import java.text.SimpleDateFormat -import java.util.Date - +import kafka.client.ClientUtils +import kafka.cluster.BrokerEndPoint import kafka.common.TopicAndPartition -import kafka.utils._ import kafka.consumer.{ConsumerConfig, SimpleConsumer, Whitelist} +import kafka.message.{ByteBufferMessageSet, MessageSet} +import kafka.utils._ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.utils.Time +import scala.collection.JavaConverters._ + + /** * For verifying the consistency among replicas. * @@ -149,15 +150,15 @@ object ReplicaVerificationTool extends Logging { debug("Selected topic partitions: " + topicPartitionReplicaList) val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) .map { case (brokerId, partitions) => - brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } + brokerId -> partitions.map { partition => TopicAndPartition(partition.topic, partition.partitionId) } } debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = - topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) + topicPartitionReplicaList.groupBy(replica => TopicAndPartition(replica.topic, replica.partitionId)) .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map { partitionMetadata => - (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id) + (TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id) } }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) => topicAndPartition @@ -200,8 +201,6 @@ object ReplicaVerificationTool extends Logging { private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) -private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset]) - private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long) private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int], @@ -276,41 +275,42 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition), "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") - val messageIteratorMap = fetchResponsePerReplica.map { + val logEntryIteratorMap = fetchResponsePerReplica.map { case(replicaId, fetchResponse) => - replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator} + replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowIterator.asScala + } val maxHw = fetchResponsePerReplica.values.map(_.hw).max // Iterate one message at a time from every replica, until high watermark is reached. var isMessageInAllReplicas = true while (isMessageInAllReplicas) { var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None - for ( (replicaId, messageIterator) <- messageIteratorMap) { + for ( (replicaId, logEntries) <- logEntryIteratorMap) { try { - if (messageIterator.hasNext) { - val messageAndOffset = messageIterator.next() + if (logEntries.hasNext) { + val logEntry = logEntries.next() // only verify up to the high watermark - if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw) + if (logEntry.offset >= fetchResponsePerReplica.get(replicaId).hw) isMessageInAllReplicas = false else { messageInfoFromFirstReplicaOpt match { case None => messageInfoFromFirstReplicaOpt = Some( - MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum)) + MessageInfo(replicaId, logEntry.offset,logEntry.nextOffset, logEntry.record.checksum)) case Some(messageInfoFromFirstReplica) => - if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) { + if (messageInfoFromFirstReplica.offset != logEntry.offset) { println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset " + messageInfoFromFirstReplica.offset + " doesn't match replica " - + replicaId + "'s offset " + messageAndOffset.offset) + + replicaId + "'s offset " + logEntry.offset) System.exit(1) } - if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum) + if (messageInfoFromFirstReplica.checksum != logEntry.record.checksum) println(ReplicaVerificationTool.getCurrentTimeString + ": partition " - + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica " + + topicAndPartition + " has unmatched checksum at offset " + logEntry.offset + "; replica " + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum - + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum) + + "; replica " + replicaId + "'s checksum " + logEntry.record.checksum) } } } else http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala index f36e146..51e987a 100644 --- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala @@ -50,11 +50,13 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { def getGroupMetadataLogOpt: Option[Log] = logManager.getLog(TopicAndPartition(Topic.GroupMetadataTopicName, 0)) - TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.nonEmpty)), + TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowIterator.asScala.nonEmpty)), "Commit message not appended in time") val logSegments = getGroupMetadataLogOpt.get.logSegments - val incorrectCompressionCodecs = logSegments.flatMap(_.log.map(_.message.compressionCodec)).filter(_ != offsetsTopicCompressionCodec) + val incorrectCompressionCodecs = logSegments + .flatMap(_.log.shallowIterator.asScala.map(_.record.compressionType.id)) + .filter(_ != offsetsTopicCompressionCodec.codec) assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs) consumer.close() http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/kafka/tools/TestLogCleaning.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala index 51f02d1..ecf7408 100755 --- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala +++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala @@ -21,12 +21,15 @@ import joptsimple.OptionParser import java.util.Properties import java.util.Random import java.io._ + import kafka.consumer._ import kafka.serializer._ import kafka.utils._ -import kafka.log.FileMessageSet import kafka.log.Log -import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.record.FileRecords + +import scala.collection.JavaConverters._ /** * This is a torture test that runs against an existing broker. Here is how it works: @@ -135,15 +138,15 @@ object TestLogCleaning { def dumpLog(dir: File) { require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath) - for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) { - val ms = new FileMessageSet(new File(dir, file)) - for(entry <- ms) { - val key = TestUtils.readString(entry.message.key) + for (file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) { + val fileRecords = FileRecords.open(new File(dir, file)) + for (entry <- fileRecords.shallowIterator.asScala) { + val key = TestUtils.readString(entry.record.key) val content = - if(entry.message.isNull) + if(entry.record.hasNullValue) null else - TestUtils.readString(entry.message.payload) + TestUtils.readString(entry.record.value) println("offset = %s, key = %s, content = %s".format(entry.offset, key, content)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/other/kafka/StressTestLog.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index f5cee0c..3381fb7 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -20,10 +20,10 @@ package kafka import java.util.Properties import java.util.concurrent.atomic._ -import kafka.message._ import kafka.log._ import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException +import org.apache.kafka.common.record.FileRecords import org.apache.kafka.common.utils.Utils /** @@ -36,13 +36,13 @@ object StressTestLog { def main(args: Array[String]) { val dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir()) val time = new MockTime - val logProprties = new Properties() - logProprties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer) - logProprties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer) - logProprties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer) + val logProperties = new Properties() + logProperties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer) + logProperties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer) + logProperties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer) val log = new Log(dir = dir, - config = LogConfig(logProprties), + config = LogConfig(logProperties), recoveryPoint = 0L, scheduler = time.scheduler, time = time) @@ -84,7 +84,7 @@ object StressTestLog { class WriterThread(val log: Log) extends WorkerThread { @volatile var offset = 0 override def work() { - val logAppendInfo = log.append(TestUtils.singleMessageSet(offset.toString.getBytes)) + val logAppendInfo = log.append(TestUtils.singletonRecords(offset.toString.getBytes)) require(logAppendInfo.firstOffset == offset && logAppendInfo.lastOffset == offset) offset += 1 if(offset % 1000 == 0) @@ -96,11 +96,11 @@ object StressTestLog { @volatile var offset = 0 override def work() { try { - log.read(offset, 1024, Some(offset+1)).messageSet match { - case read: FileMessageSet if read.sizeInBytes > 0 => { - val first = read.head + log.read(offset, 1024, Some(offset+1)).records match { + case read: FileRecords if read.sizeInBytes > 0 => { + val first = read.shallowIterator.next() require(first.offset == offset, "We should either read nothing or the message we asked for.") - require(MessageSet.entrySize(first.message) == read.sizeInBytes, "Expected %d but got %d.".format(MessageSet.entrySize(first.message), read.sizeInBytes)) + require(first.sizeInBytes == read.sizeInBytes, "Expected %d but got %d.".format(first.sizeInBytes, read.sizeInBytes)) offset += 1 } case _ => http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 6fef2b3..f0883ad 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -22,13 +22,14 @@ import java.nio._ import java.nio.channels._ import java.util.{Properties, Random} +import joptsimple._ import kafka.log._ -import kafka.utils._ import kafka.message._ +import kafka.utils._ +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record} +import org.apache.kafka.common.utils.{Time, Utils} import scala.math._ -import joptsimple._ -import org.apache.kafka.common.utils.{Time, Utils} /** * This test does linear writes using either a kafka log or a file and measures throughput and latency. @@ -64,7 +65,7 @@ object TestLinearWriteSpeed { .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Long]) - .defaultsTo(1000) + .defaultsTo(1000L) val maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.") .withRequiredArg .describedAs("mb") @@ -81,7 +82,7 @@ object TestLinearWriteSpeed { .ofType(classOf[java.lang.String]) .defaultsTo(NoCompressionCodec.name) val mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.") - val channelOpt = parser.accepts("channel", "Do writes to file channesl.") + val channelOpt = parser.accepts("channel", "Do writes to file channels.") val logOpt = parser.accepts("log", "Do writes to kafka logs.") val options = parser.parse(args : _*) @@ -101,9 +102,9 @@ object TestLinearWriteSpeed { val rand = new Random rand.nextBytes(buffer.array) val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead) - val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, - messages = (0 until numMessages).map(_ => new Message(new Array[Byte](messageSize))): _*) - + val messageSet = MemoryRecords.withRecords(CompressionType.forId(compressionCodec.codec), + (0 until numMessages).map(_ => Record.create(new Array[Byte](messageSize))): _*) + val writables = new Array[Writable](numFiles) val scheduler = new KafkaScheduler(1) scheduler.startup() @@ -199,7 +200,7 @@ object TestLinearWriteSpeed { } } - class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable { + class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable { Utils.delete(dir) val log = new Log(dir, config, 0L, scheduler, Time.SYSTEM) def write(): Int = { http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index d1fcbc0..b98822d 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -301,7 +301,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { var counter = 0 for (_ <- 0 until numDups; key <- 0 until numKeys) yield { val count = counter - log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true) + log.append(TestUtils.singletonRecords(value = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true) counter += 1 (key, count) } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala index a981e68..1c5a526 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala @@ -18,20 +18,20 @@ package kafka.coordinator import kafka.utils.timer.MockTimer -import org.apache.kafka.common.record.Record +import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType} import org.junit.Assert._ import kafka.common.{OffsetAndMetadata, Topic} -import kafka.message.{Message, MessageSet} -import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig} +import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager} import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest} +import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse -import org.easymock.{Capture, IAnswer, EasyMock} +import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.{After, Before, Test} import org.scalatest.junit.JUnitSuite import java.util.concurrent.TimeUnit + import scala.collection._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future, Promise} @@ -305,7 +305,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None) - EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes() + EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) + .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() EasyMock.replay(replicaManager) timer.advanceClock(DefaultSessionTimeout + 100) @@ -988,17 +989,18 @@ class GroupCoordinatorResponseTest extends JUnitSuite { val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() - EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(), + EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), EasyMock.anyBoolean(), - EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]], + EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) -> new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP) ) )}) - EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes() + EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) + .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() EasyMock.replay(replicaManager) groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback) @@ -1069,17 +1071,18 @@ class GroupCoordinatorResponseTest extends JUnitSuite { val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() - EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(), + EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), EasyMock.anyBoolean(), - EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]], + EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) -> new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP) ) )}) - EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes() + EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) + .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() EasyMock.replay(replicaManager) groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback) @@ -1090,7 +1093,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite { val (responseFuture, responseCallback) = setupHeartbeatCallback EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None) - EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes() + EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) + .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() EasyMock.replay(replicaManager) groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala index 6c03476..62b7f42 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala @@ -21,12 +21,11 @@ import kafka.api.ApiVersion import kafka.cluster.Partition import kafka.common.{OffsetAndMetadata, Topic} import kafka.log.LogAppendInfo -import kafka.message.{ByteBufferMessageSet, Message, MessageSet} import kafka.server.{KafkaConfig, ReplicaManager} import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.Record +import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType} import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.easymock.{Capture, EasyMock, IAnswer} @@ -34,6 +33,7 @@ import org.junit.{After, Before, Test} import org.junit.Assert._ import scala.collection._ +import JavaConverters._ class GroupMetadataManagerTest { @@ -50,7 +50,6 @@ class GroupMetadataManagerTest { val rebalanceTimeout = 60000 val sessionTimeout = 10000 - @Before def setUp() { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")) @@ -176,7 +175,7 @@ class GroupMetadataManagerTest { @Test def testStoreNonEmptyGroupWhenCoordinatorHasMoved() { - EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(None) + EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None) val memberId = "memberId" val clientId = "clientId" val clientHost = "localhost" @@ -245,7 +244,7 @@ class GroupMetadataManagerTest { @Test def testCommitOffsetWhenCoordinatorHasMoved() { - EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(None) + EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None) val memberId = "" val generationId = -1 val topicPartition = new TopicPartition("foo", 0) @@ -363,7 +362,7 @@ class GroupMetadataManagerTest { time.sleep(2) EasyMock.reset(partition) - EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]), EasyMock.anyInt())) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(partition) @@ -391,24 +390,74 @@ class GroupMetadataManagerTest { // expect the group metadata tombstone EasyMock.reset(partition) - val messageSetCapture: Capture[ByteBufferMessageSet] = EasyMock.newCapture() + val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture() + + EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) + .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)) + EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition)) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) + .andReturn(LogAppendInfo.UnknownLogAppendInfo) + EasyMock.replay(replicaManager, partition) + + groupMetadataManager.cleanupGroupMetadata() + + assertTrue(recordsCapture.hasCaptured) + + val records = recordsCapture.getValue.records.asScala.toList + assertEquals(1, records.size) + + val metadataTombstone = records.head + assertTrue(metadataTombstone.hasKey) + assertTrue(metadataTombstone.hasNullValue) + assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic) + assertEquals(TimestampType.CREATE_TIME, metadataTombstone.timestampType) + assertTrue(metadataTombstone.timestamp > 0) + + val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey] + assertEquals(groupId, groupKey.key) + + // the full group should be gone since all offsets were removed + assertEquals(None, groupMetadataManager.getGroup(groupId)) + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset)) + } + + @Test + def testGroupMetadataRemovalWithLogAppendTime() { + val topicPartition1 = new TopicPartition("foo", 0) + val topicPartition2 = new TopicPartition("foo", 1) + + groupMetadataManager.addPartitionOwnership(groupPartitionId) + + val group = new GroupMetadata(groupId) + groupMetadataManager.addGroup(group) + group.generationId = 5 + + // expect the group metadata tombstone + EasyMock.reset(partition) + val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture() - EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andStubReturn(Some(Message.MagicValue_V1)) + EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) + .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.LOG_APPEND_TIME)) EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition)) - EasyMock.expect(partition.appendMessagesToLeader(EasyMock.capture(messageSetCapture), EasyMock.anyInt())) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(replicaManager, partition) groupMetadataManager.cleanupGroupMetadata() - assertTrue(messageSetCapture.hasCaptured) + assertTrue(recordsCapture.hasCaptured) - val messageSet = messageSetCapture.getValue - assertEquals(1, messageSet.size) + val records = recordsCapture.getValue.records.asScala.toList + assertEquals(1, records.size) - val metadataTombstone = messageSet.head.message + val metadataTombstone = records.head assertTrue(metadataTombstone.hasKey) - assertTrue(metadataTombstone.isNull) + assertTrue(metadataTombstone.hasNullValue) + assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic) + assertEquals(TimestampType.LOG_APPEND_TIME, metadataTombstone.timestampType) + assertTrue(metadataTombstone.timestamp > 0) val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey] assertEquals(groupId, groupKey.key) @@ -463,22 +512,22 @@ class GroupMetadataManagerTest { // expect the offset tombstone EasyMock.reset(partition) - val messageSetCapture: Capture[ByteBufferMessageSet] = EasyMock.newCapture() + val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture() - EasyMock.expect(partition.appendMessagesToLeader(EasyMock.capture(messageSetCapture), EasyMock.anyInt())) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(partition) groupMetadataManager.cleanupGroupMetadata() - assertTrue(messageSetCapture.hasCaptured) + assertTrue(recordsCapture.hasCaptured) // verify the tombstones are correct and only for the expired offsets - val messageSet = messageSetCapture.getValue - assertEquals(2, messageSet.size) - messageSet.map(_.message).foreach { message => + val records = recordsCapture.getValue.records.asScala.toList + assertEquals(2, records.size) + records.foreach { message => assertTrue(message.hasKey) - assertTrue(message.isNull) + assertTrue(message.hasNullValue) val offsetKey = GroupMetadataManager.readMessageKey(message.key).asInstanceOf[OffsetKey] assertEquals(groupId, offsetKey.key.group) assertEquals("foo", offsetKey.key.topicPartition.topic) @@ -539,7 +588,7 @@ class GroupMetadataManagerTest { // expect the offset tombstone EasyMock.reset(partition) - EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]), EasyMock.anyInt())) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(partition) @@ -557,17 +606,18 @@ class GroupMetadataManagerTest { private def expectAppendMessage(error: Errors) { val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() - EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(), + EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), EasyMock.anyBoolean(), - EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]], + EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) -> new PartitionResponse(error.code, 0L, Record.NO_TIMESTAMP) ) )}) - EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andStubReturn(Some(Message.MagicValue_V1)) + EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) + .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 791bdb0..296dc15 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -25,9 +25,10 @@ import org.junit.Assert._ import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import org.apache.kafka.common.record.CompressionType +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record} import org.apache.kafka.common.utils.Utils import java.util.{Collection, Properties} + import scala.collection.JavaConverters._ @RunWith(value = classOf[Parameterized]) @@ -50,22 +51,22 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin def testBrokerSideCompression() { val messageCompressionCode = CompressionCodec.getCompressionCodec(messageCompression) val logProps = new Properties() - logProps.put(LogConfig.CompressionTypeProp,brokerCompression) + logProps.put(LogConfig.CompressionTypeProp, brokerCompression) /*configure broker-side compression */ val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) /* append two messages */ - log.append(new ByteBufferMessageSet(messageCompressionCode, new Message("hello".getBytes), new Message("there".getBytes))) + log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), + Record.create("hello".getBytes), Record.create("there".getBytes))) - def readMessage(offset: Int) = log.read(offset, 4096).messageSet.head.message + def readMessage(offset: Int) = log.read(offset, 4096).records.shallowIterator.next().record if (!brokerCompression.equals("producer")) { val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression) - assertEquals("Compression at offset 0 should produce " + brokerCompressionCode.name, brokerCompressionCode, readMessage(0).compressionCodec) + assertEquals("Compression at offset 0 should produce " + brokerCompressionCode.name, brokerCompressionCode.codec, readMessage(0).compressionType.id) } else - assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name, messageCompressionCode, readMessage(0).compressionCodec) - + assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name, messageCompressionCode.codec, readMessage(0).compressionType.id) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala deleted file mode 100644 index a7f0446..0000000 --- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ /dev/null @@ -1,354 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.io._ -import java.nio._ -import java.nio.channels._ - -import kafka.common.LongRef -import org.junit.Assert._ -import kafka.utils.TestUtils._ -import kafka.message._ -import kafka.common.KafkaException -import org.easymock.EasyMock -import org.junit.Test - -class FileMessageSetTest extends BaseMessageSetTestCases { - - val messageSet = createMessageSet(messages) - - def createMessageSet(messages: Seq[Message]): FileMessageSet = { - val set = new FileMessageSet(tempFile()) - set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*)) - set.flush() - set - } - - /** - * Test that the cached size variable matches the actual file size as we append messages - */ - @Test - def testFileSize() { - assertEquals(messageSet.channel.size, messageSet.sizeInBytes) - for (_ <- 0 until 20) { - messageSet.append(singleMessageSet("abcd".getBytes)) - assertEquals(messageSet.channel.size, messageSet.sizeInBytes) - } - } - - /** - * Test that adding invalid bytes to the end of the log doesn't break iteration - */ - @Test - def testIterationOverPartialAndTruncation() { - testPartialWrite(0, messageSet) - testPartialWrite(2, messageSet) - testPartialWrite(4, messageSet) - testPartialWrite(5, messageSet) - testPartialWrite(6, messageSet) - } - - def testPartialWrite(size: Int, messageSet: FileMessageSet) { - val buffer = ByteBuffer.allocate(size) - for (_ <- 0 until size) - buffer.put(0: Byte) - buffer.rewind() - messageSet.channel.write(buffer) - // appending those bytes should not change the contents - checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) - } - - /** - * Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel. - */ - @Test - def testIterationDoesntChangePosition() { - val position = messageSet.channel.position - checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) - assertEquals(position, messageSet.channel.position) - } - - /** - * Test a simple append and read. - */ - @Test - def testRead() { - var read = messageSet.read(0, messageSet.sizeInBytes) - checkEquals(messageSet.iterator, read.iterator) - val items = read.iterator.toList - val sec = items.tail.head - read = messageSet.read(position = MessageSet.entrySize(sec.message), size = messageSet.sizeInBytes) - assertEquals("Try a read starting from the second message", items.tail, read.toList) - read = messageSet.read(MessageSet.entrySize(sec.message), MessageSet.entrySize(sec.message)) - assertEquals("Try a read of a single message starting from the second message", List(items.tail.head), read.toList) - } - - /** - * Test the MessageSet.searchFor API. - */ - @Test - def testSearch() { - // append a new message with a high offset - val lastMessage = new Message("test".getBytes) - messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(50), lastMessage)) - val messages = messageSet.toSeq - var position = 0 - val message1Size = MessageSet.entrySize(messages.head.message) - assertEquals("Should be able to find the first message by its offset", - (OffsetPosition(0L, position), message1Size), - messageSet.searchForOffsetWithSize(0, 0)) - position += message1Size - val message2Size = MessageSet.entrySize(messages(1).message) - assertEquals("Should be able to find second message when starting from 0", - (OffsetPosition(1L, position), message2Size), - messageSet.searchForOffsetWithSize(1, 0)) - assertEquals("Should be able to find second message starting from its offset", - (OffsetPosition(1L, position), message2Size), - messageSet.searchForOffsetWithSize(1, position)) - position += message2Size + MessageSet.entrySize(messages(2).message) - val message4Size = MessageSet.entrySize(messages(3).message) - assertEquals("Should be able to find fourth message from a non-existant offset", - (OffsetPosition(50L, position), message4Size), - messageSet.searchForOffsetWithSize(3, position)) - assertEquals("Should be able to find fourth message by correct offset", - (OffsetPosition(50L, position), message4Size), - messageSet.searchForOffsetWithSize(50, position)) - } - - /** - * Test that the message set iterator obeys start and end slicing - */ - @Test - def testIteratorWithLimits() { - val message = messageSet.toList(1) - val start = messageSet.searchForOffsetWithSize(1, 0)._1.position - val size = message.message.size + 12 - val slice = messageSet.read(start, size) - assertEquals(List(message), slice.toList) - val slice2 = messageSet.read(start, size - 1) - assertEquals(List(), slice2.toList) - } - - /** - * Test the truncateTo method lops off messages and appropriately updates the size - */ - @Test - def testTruncate() { - val message = messageSet.toList.head - val end = messageSet.searchForOffsetWithSize(1, 0)._1.position - messageSet.truncateTo(end) - assertEquals(List(message), messageSet.toList) - assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes) - } - - /** - * Test that truncateTo only calls truncate on the FileChannel if the size of the - * FileChannel is bigger than the target size. This is important because some JVMs - * change the mtime of the file, even if truncate should do nothing. - */ - @Test - def testTruncateNotCalledIfSizeIsSameAsTargetSize() { - val channelMock = EasyMock.createMock(classOf[FileChannel]) - - EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce() - EasyMock.expect(channelMock.position(42L)).andReturn(null) - EasyMock.replay(channelMock) - - val msgSet = new FileMessageSet(tempFile(), channelMock) - msgSet.truncateTo(42) - - EasyMock.verify(channelMock) - } - - /** - * Expect a KafkaException if targetSize is bigger than the size of - * the FileMessageSet. - */ - @Test - def testTruncateNotCalledIfSizeIsBiggerThanTargetSize() { - val channelMock = EasyMock.createMock(classOf[FileChannel]) - - EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce() - EasyMock.expect(channelMock.position(42L)).andReturn(null) - EasyMock.replay(channelMock) - - val msgSet = new FileMessageSet(tempFile(), channelMock) - - try { - msgSet.truncateTo(43) - fail("Should throw KafkaException") - } catch { - case _: KafkaException => // expected - } - - EasyMock.verify(channelMock) - } - - /** - * see #testTruncateNotCalledIfSizeIsSameAsTargetSize - */ - @Test - def testTruncateIfSizeIsDifferentToTargetSize() { - val channelMock = EasyMock.createMock(classOf[FileChannel]) - - EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce() - EasyMock.expect(channelMock.position(42L)).andReturn(null).once() - EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once() - EasyMock.expect(channelMock.position(23L)).andReturn(null).once() - EasyMock.replay(channelMock) - - val msgSet = new FileMessageSet(tempFile(), channelMock) - msgSet.truncateTo(23) - - EasyMock.verify(channelMock) - } - - - /** - * Test the new FileMessageSet with pre allocate as true - */ - @Test - def testPreallocateTrue() { - val temp = tempFile() - val set = new FileMessageSet(temp, false, 512 *1024 *1024, true) - val position = set.channel.position - val size = set.sizeInBytes() - assertEquals(0, position) - assertEquals(0, size) - assertEquals(512 *1024 *1024, temp.length) - } - - /** - * Test the new FileMessageSet with pre allocate as false - */ - @Test - def testPreallocateFalse() { - val temp = tempFile() - val set = new FileMessageSet(temp, false, 512 *1024 *1024, false) - val position = set.channel.position - val size = set.sizeInBytes() - assertEquals(0, position) - assertEquals(0, size) - assertEquals(0, temp.length) - } - - /** - * Test the new FileMessageSet with pre allocate as true and file has been clearly shut down, the file will be truncate to end of valid data. - */ - @Test - def testPreallocateClearShutdown() { - val temp = tempFile() - val set = new FileMessageSet(temp, false, 512 *1024 *1024, true) - set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*)) - val oldposition = set.channel.position - val oldsize = set.sizeInBytes() - assertEquals(messageSet.sizeInBytes, oldposition) - assertEquals(messageSet.sizeInBytes, oldsize) - set.close() - - val tempReopen = new File(temp.getAbsolutePath()) - val setReopen = new FileMessageSet(tempReopen, true, 512 *1024 *1024, true) - val position = setReopen.channel.position - val size = setReopen.sizeInBytes() - - assertEquals(oldposition, position) - assertEquals(oldposition, size) - assertEquals(oldposition, tempReopen.length) - } - - @Test - def testFormatConversionWithPartialMessage() { - val message = messageSet.toList(1) - val start = messageSet.searchForOffsetWithSize(1, 0)._1.position - val size = message.message.size + 12 - val slice = messageSet.read(start, size - 1) - val messageV0 = slice.toMessageFormat(Message.MagicValue_V0) - assertEquals("No message should be there", 0, messageV0.size) - assertEquals(s"There should be ${size - 1} bytes", size - 1, messageV0.sizeInBytes) - } - - @Test - def testMessageFormatConversion() { - - // Prepare messages. - val offsets = Seq(0L, 2L) - val messagesV0 = Seq(new Message("hello".getBytes, "k1".getBytes, Message.NoTimestamp, Message.MagicValue_V0), - new Message("goodbye".getBytes, "k2".getBytes, Message.NoTimestamp, Message.MagicValue_V0)) - val messageSetV0 = new ByteBufferMessageSet( - compressionCodec = NoCompressionCodec, - offsetSeq = offsets, - messages = messagesV0:_*) - val compressedMessageSetV0 = new ByteBufferMessageSet( - compressionCodec = DefaultCompressionCodec, - offsetSeq = offsets, - messages = messagesV0:_*) - - val messagesV1 = Seq(new Message("hello".getBytes, "k1".getBytes, 1L, Message.MagicValue_V1), - new Message("goodbye".getBytes, "k2".getBytes, 2L, Message.MagicValue_V1)) - val messageSetV1 = new ByteBufferMessageSet( - compressionCodec = NoCompressionCodec, - offsetSeq = offsets, - messages = messagesV1:_*) - val compressedMessageSetV1 = new ByteBufferMessageSet( - compressionCodec = DefaultCompressionCodec, - offsetSeq = offsets, - messages = messagesV1:_*) - - // Down conversion - // down conversion for non-compressed messages - var fileMessageSet = new FileMessageSet(tempFile()) - fileMessageSet.append(messageSetV1) - fileMessageSet.flush() - var convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V0) - verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V0) - - // down conversion for compressed messages - fileMessageSet = new FileMessageSet(tempFile()) - fileMessageSet.append(compressedMessageSetV1) - fileMessageSet.flush() - convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V0) - verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V0) - - // Up conversion. In reality we only do down conversion, but up conversion should work as well. - // up conversion for non-compressed messages - fileMessageSet = new FileMessageSet(tempFile()) - fileMessageSet.append(messageSetV0) - fileMessageSet.flush() - convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V1) - verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V1) - - // up conversion for compressed messages - fileMessageSet = new FileMessageSet(tempFile()) - fileMessageSet.append(compressedMessageSetV0) - fileMessageSet.flush() - convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V1) - verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V1) - - def verifyConvertedMessageSet(convertedMessageSet: MessageSet, magicByte: Byte) { - var i = 0 - for (messageAndOffset <- convertedMessageSet) { - assertEquals("magic byte should be 1", magicByte, messageAndOffset.message.magic) - assertEquals("offset should not change", offsets(i), messageAndOffset.offset) - assertEquals("key should not change", messagesV0(i).key, messageAndOffset.message.key) - assertEquals("payload should not change", messagesV0(i).payload, messageAndOffset.message.payload) - i += 1 - } - } - } -}