http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index af64ffe..9e4c149 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -26,7 +26,7 @@ import kafka.cluster.{Partition, Replica}
 import kafka.common._
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogManager}
-import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet}
+import kafka.message.InvalidMessageException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils._
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordEx
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Time
@@ -78,9 +79,11 @@ case class LogReadResult(info: FetchDataInfo,
   }
 }
 
+case class FetchPartitionData(error: Short = Errors.NONE.code, hw: Long = -1L, records: Records)
+
 object LogReadResult {
   val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata,
-                                                         MessageSet.Empty),
+                                                         MemoryRecords.EMPTY),
                                            -1L,
                                            -1,
                                            false)
@@ -276,11 +279,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getPartition(topic: String, partitionId: Int): Option[Partition] = {
-    val partition = allPartitions.get((topic, partitionId))
-    if (partition == null)
-      None
-    else
-      Some(partition)
+    Option(allPartitions.get((topic, partitionId)))
   }
 
   def getReplicaOrException(topic: String, partition: Int): Replica = {
@@ -318,15 +317,15 @@ class ReplicaManager(val config: KafkaConfig,
   * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
   * the callback function will be triggered either when timeout or the required acks are satisfied
    */
-  def appendMessages(timeout: Long,
-                     requiredAcks: Short,
-                     internalTopicsAllowed: Boolean,
-                     messagesPerPartition: Map[TopicPartition, MessageSet],
-                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
+  def appendRecords(timeout: Long,
+                    requiredAcks: Short,
+                    internalTopicsAllowed: Boolean,
+                    entriesPerPartition: Map[TopicPartition, MemoryRecords],
+                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
 
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
-      val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
+      val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks)
       debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
 
       val produceStatus = localProduceResults.map { case (topicPartition, result) =>
@@ -336,13 +335,13 @@ class ReplicaManager(val config: KafkaConfig,
                  new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.logAppendTime)) // response status
       }
 
-      if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
+      if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
         val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
 
         // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
-        val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+        val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
 
         // try to complete the request immediately, otherwise put it into the purgatory
         // this is because while the delayed produce operation is being created, new
@@ -357,9 +356,9 @@ class ReplicaManager(val config: KafkaConfig,
     } else {
       // If required.acks is outside accepted range, something is wrong with the client
       // Just return an error and don't handle the request at all
-      val responseStatus = messagesPerPartition.map { case (topicAndPartition, _) =>
+      val responseStatus = entriesPerPartition.map { case (topicAndPartition, _) =>
         topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
-          LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
+          LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP)
       }
       responseCallback(responseStatus)
     }
@@ -370,11 +369,12 @@ class ReplicaManager(val config: KafkaConfig,
   // 1. required acks = -1
   // 2. there is data to append
   // 3. at least one partition append was successful (fewer errors than partitions)
-  private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicPartition, MessageSet],
-                                       localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
+  private def delayedRequestRequired(requiredAcks: Short,
+                                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
+                                     localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
     requiredAcks == -1 &&
-    messagesPerPartition.nonEmpty &&
-    localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size
+    entriesPerPartition.nonEmpty &&
+    localProduceResults.values.count(_.error.isDefined) < entriesPerPartition.size
   }
 
   private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
@@ -385,10 +385,10 @@ class ReplicaManager(val config: KafkaConfig,
    * Append the messages to the local replica logs
    */
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
-                               messagesPerPartition: Map[TopicPartition, MessageSet],
+                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
-    trace("Append [%s] to local log ".format(messagesPerPartition))
-    messagesPerPartition.map { case (topicPartition, messages) =>
+    trace("Append [%s] to local log ".format(entriesPerPartition))
+    entriesPerPartition.map { case (topicPartition, records) =>
       BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark()
       BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
 
@@ -402,7 +402,7 @@ class ReplicaManager(val config: KafkaConfig,
           val partitionOpt = getPartition(topicPartition.topic, topicPartition.partition)
           val info = partitionOpt match {
             case Some(partition) =>
-              partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks)
+              partition.appendRecordsToLeader(records, requiredAcks)
            case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
               .format(topicPartition, localBrokerId))
           }
@@ -414,13 +414,13 @@ class ReplicaManager(val config: KafkaConfig,
               info.lastOffset - info.firstOffset + 1
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
-          BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(messages.sizeInBytes)
-          BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
+          BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
+          BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(records.sizeInBytes)
           BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
           BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
 
           trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
-            .format(messages.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
+            .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
           (topicPartition, LogAppendResult(info))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
@@ -434,6 +434,7 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordTooLargeException |
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
+                   _: InvalidRecordException |
                    _: InvalidMessageException |
                    _: InvalidTimestampException) =>
             (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
@@ -458,7 +459,7 @@ class ReplicaManager(val config: KafkaConfig,
                     hardMaxBytesLimit: Boolean,
                     fetchInfos: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota = UnboundedQuota,
-                    responseCallback: Seq[(TopicAndPartition, FetchResponsePartitionData)] => Unit) {
+                    responseCallback: Seq[(TopicAndPartition, FetchPartitionData)] => Unit) {
     val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
@@ -480,7 +481,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     // check if this fetch request can be satisfied right away
     val logReadResultValues = logReadResults.map { case (_, v) => v }
-    val bytesReadable = logReadResultValues.map(_.info.messageSet.sizeInBytes).sum
+    val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum
     val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) =>
       errorIncurred || (readResult.errorCode != Errors.NONE.code))
 
@@ -490,7 +491,7 @@ class ReplicaManager(val config: KafkaConfig,
     //                        4) some error happens while reading data
     if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
-        tp -> FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)
+        tp -> FetchPartitionData(result.errorCode, result.hw, result.info.records)
       }
       responseCallback(fetchPartitionData)
     } else {
@@ -568,16 +569,16 @@ class ReplicaManager(val config: KafkaConfig,
 
             // If the partition is being throttled, simply return an empty set.
             if (shouldLeaderThrottle(quota, TopicAndPartition(tp.topic, tp.partition), replicaId))
-              FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
+              FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
             // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
             // progress in such cases and don't need to report a `RecordTooLargeException`
-            else if (!hardMaxBytesLimit && fetch.firstMessageSetIncomplete)
-              FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
+            else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete)
+              FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
             else fetch
 
           case None =>
             error(s"Leader for partition $tp does not have a local log")
-            FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
+            FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
         }
 
         val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset <= 0
@@ -590,12 +591,14 @@ class ReplicaManager(val config: KafkaConfig,
                  _: NotLeaderForPartitionException |
                  _: ReplicaNotAvailableException |
                  _: OffsetOutOfRangeException) =>
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, partitionFetchSize, false, Some(e))
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L,
+            partitionFetchSize, false, Some(e))
         case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
-          error(s"Error processing fetch operation on partition ${tp}, offset $offset", e)
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, partitionFetchSize, false, Some(e))
+          error(s"Error processing fetch operation on partition $tp, offset $offset", e)
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L,
+            partitionFetchSize, false, Some(e))
       }
     }
 
@@ -604,7 +607,7 @@ class ReplicaManager(val config: KafkaConfig,
     var minOneMessage = !hardMaxBytesLimit
     readPartitionInfo.foreach { case (tp, fetchInfo) =>
       val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
-      val messageSetSize = readResult.info.messageSet.sizeInBytes
+      val messageSetSize = readResult.info.records.sizeInBytes
       // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
       if (messageSetSize > 0)
         minOneMessage = false
@@ -625,9 +628,9 @@ class ReplicaManager(val config: KafkaConfig,
     quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
   }
 
-  def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] =
+  def getMagicAndTimestampType(topicAndPartition: TopicAndPartition): Option[(Byte, TimestampType)] =
     getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica =>
-      replica.log.map(_.config.messageFormatVersion.messageFormatVersion)
+      replica.log.map(log => (log.config.messageFormatVersion.messageFormatVersion, log.config.messageTimestampType))
     }
 
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {

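For context (not part of the commit above): a minimal sketch of how a broker-side caller might hand a batch to the renamed appendRecords API shown in this hunk. The replicaManager value, topic name and payload bytes are illustrative assumptions, not code from this patch.

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}

    // Build a MemoryRecords batch for a single (hypothetical) partition.
    val tp = new TopicPartition("test", 0)
    val records = MemoryRecords.withRecords(CompressionType.NONE, Record.create("value".getBytes))

    // Hand the batch to the replica manager; the callback fires once the required
    // acks are satisfied or the timeout expires.
    replicaManager.appendRecords(
      timeout = 3000L,
      requiredAcks = (-1).toShort,
      internalTopicsAllowed = false,
      entriesPerPartition = Map(tp -> records),
      responseCallback = responses => responses.foreach { case (partition, response) =>
        println(s"$partition -> $response")
      })
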
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 221ef6c..ceff78c 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -23,15 +23,16 @@ import java.nio.ByteBuffer
 import joptsimple.OptionParser
 import kafka.coordinator.{GroupMetadataKey, GroupMetadataManager, OffsetKey}
 import kafka.log._
-import kafka.message._
 import kafka.serializer.Decoder
 import kafka.utils._
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.record.{CompressionType, FileRecords, LogEntry, Record}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 
 object DumpLogSegments {
 
@@ -69,7 +70,7 @@ object DumpLogSegments {
 
     CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
 
-    val printDataLog = (options.has(printOpt) || options.has(offsetsOpt) || options.has(valueDecoderOpt) || options.has(keyDecoderOpt))
+    val printDataLog = options.has(printOpt) || options.has(offsetsOpt) || options.has(valueDecoderOpt) || options.has(keyDecoderOpt)
     val verifyOnly = options.has(verifyOpt)
     val indexSanityOnly = options.has(indexSanityOpt)
 
@@ -132,7 +133,7 @@ object DumpLogSegments {
                         maxMessageSize: Int) {
     val startOffset = file.getName().split("\\.")(0).toLong
     val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
-    val messageSet = new FileMessageSet(logFile, false)
+    val fileRecords = FileRecords.open(logFile, false)
     val index = new OffsetIndex(file, baseOffset = startOffset)
 
     //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not.
@@ -144,11 +145,11 @@ object DumpLogSegments {
 
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
-      val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, maxMessageSize)
-      val messageAndOffset = getIterator(partialFileMessageSet.head, isDeepIteration = true).next()
-      if(messageAndOffset.offset != entry.offset + index.baseOffset) {
+      val slice = fileRecords.read(entry.position, maxMessageSize)
+      val logEntry = getIterator(slice.shallowIterator.next, isDeepIteration = true).next()
+      if (logEntry.offset != entry.offset + index.baseOffset) {
         var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
-        misMatchesSeq ::=(entry.offset + index.baseOffset, messageAndOffset.offset)
+        misMatchesSeq ::=(entry.offset + index.baseOffset, logEntry.offset)
         misMatchesForIndexFilesMap.put(file.getAbsolutePath, misMatchesSeq)
       }
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
@@ -164,9 +165,9 @@ object DumpLogSegments {
                             verifyOnly: Boolean,
                             timeIndexDumpErrors: TimeIndexDumpErrors,
                             maxMessageSize: Int) {
-    val startOffset = file.getName().split("\\.")(0).toLong
+    val startOffset = file.getName.split("\\.")(0).toLong
     val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
-    val messageSet = new FileMessageSet(logFile, false)
+    val fileRecords = FileRecords.open(logFile, false)
     val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.IndexFileSuffix)
     val index = new OffsetIndex(indexFile, baseOffset = startOffset)
     val timeIndex = new TimeIndex(file, baseOffset = startOffset)
@@ -178,26 +179,26 @@ object DumpLogSegments {
       return
     }
 
-    var prevTimestamp = Message.NoTimestamp
+    var prevTimestamp = Record.NO_TIMESTAMP
     for(i <- 0 until timeIndex.entries) {
       val entry = timeIndex.entry(i)
       val position = index.lookup(entry.offset + timeIndex.baseOffset).position
-      val partialFileMessageSet: FileMessageSet = messageSet.read(position, Int.MaxValue)
-      val shallowIter = partialFileMessageSet.iterator
-      var maxTimestamp = Message.NoTimestamp
+      val partialFileRecords = fileRecords.read(position, Int.MaxValue)
+      val shallowEntries = partialFileRecords.shallowIterator.asScala
+      var maxTimestamp = Record.NO_TIMESTAMP
       // We first find the message by offset then check if the timestamp is correct.
-      val wrapperMessageOpt = shallowIter.find(_.offset >= entry.offset + timeIndex.baseOffset)
-      wrapperMessageOpt match {
+      val maybeLogEntry = shallowEntries.find(_.offset >= entry.offset + timeIndex.baseOffset)
+      maybeLogEntry match {
         case None =>
           timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset,
             -1.toLong)
-        case Some(wrapperMessage) if wrapperMessage.offset != entry.offset + timeIndex.baseOffset =>
+        case Some(logEntry) if logEntry.offset != entry.offset + timeIndex.baseOffset =>
           timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset,
-            wrapperMessage.offset)
-        case Some(wrapperMessage) =>
-          val deepIter = getIterator(wrapperMessage, isDeepIteration = true)
-          for (messageAndOffset <- deepIter)
-            maxTimestamp = math.max(maxTimestamp, messageAndOffset.message.timestamp)
+            logEntry.offset)
+        case Some(shallowLogEntry) =>
+          val deepIter = getIterator(shallowLogEntry, isDeepIteration = true)
+          for (deepLogEntry <- deepIter)
+            maxTimestamp = math.max(maxTimestamp, deepLogEntry.record.timestamp)
 
           if (maxTimestamp != entry.timestamp)
             timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, maxTimestamp)
@@ -216,20 +217,20 @@ object DumpLogSegments {
   }
 
   private trait MessageParser[K, V] {
-    def parse(message: Message): (Option[K], Option[V])
+    def parse(record: Record): (Option[K], Option[V])
   }
 
   private class DecoderMessageParser[K, V](keyDecoder: Decoder[K], valueDecoder: Decoder[V]) extends MessageParser[K, V] {
-    override def parse(message: Message): (Option[K], Option[V]) = {
-      if (message.isNull) {
+    override def parse(record: Record): (Option[K], Option[V]) = {
+      if (record.hasNullValue) {
         (None, None)
       } else {
-        val key = if (message.hasKey)
-          Some(keyDecoder.fromBytes(Utils.readBytes(message.key)))
+        val key = if (record.hasKey)
+          Some(keyDecoder.fromBytes(Utils.readBytes(record.key)))
         else
           None
 
-        val payload = Some(valueDecoder.fromBytes(Utils.readBytes(message.payload)))
+        val payload = Some(valueDecoder.fromBytes(Utils.readBytes(record.value)))
 
         (key, payload)
       }
@@ -249,7 +250,7 @@ object DumpLogSegments {
       val topicPartition = offsetKey.key.topicPartition
       val offset = GroupMetadataManager.readOffsetMessageValue(payload)
 
-      val keyString = s"offset::${group}:${topicPartition.topic}:${topicPartition.partition}"
+      val keyString = s"offset::$group:${topicPartition.topic}:${topicPartition.partition}"
       val valueString = if (offset.metadata.isEmpty)
         String.valueOf(offset.offset)
       else
@@ -271,27 +272,27 @@ object DumpLogSegments {
           if (userData.isEmpty)
             s"${member.memberId}=${partitionAssignment.partitions()}"
           else
-            s"${member.memberId}=${partitionAssignment.partitions()}:${userData}"
+            s"${member.memberId}=${partitionAssignment.partitions()}:$userData"
         } else {
           s"${member.memberId}=${hex(member.assignment)}"
         }
       }.mkString("{", ",", "}")
 
-      val keyString = s"metadata::${groupId}"
-      val valueString = s"${protocolType}:${group.protocol}:${group.generationId}:${assignment}"
+      val keyString = s"metadata::$groupId"
+      val valueString = s"$protocolType:${group.protocol}:${group.generationId}:$assignment"
 
       (Some(keyString), Some(valueString))
     }
 
-    override def parse(message: Message): (Option[String], Option[String]) = {
-      if (message.isNull)
+    override def parse(record: Record): (Option[String], Option[String]) = {
+      if (record.hasNullValue)
         (None, None)
-      else if (!message.hasKey) {
+      else if (!record.hasKey) {
         throw new KafkaException("Failed to decode message using offset topic 
decoder (message had a missing key)")
       } else {
-        GroupMetadataManager.readMessageKey(message.key) match {
-          case offsetKey: OffsetKey => parseOffsets(offsetKey, message.payload)
-          case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, message.payload)
+        GroupMetadataManager.readMessageKey(record.key) match {
+          case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
+          case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value)
           case _ => throw new KafkaException("Failed to decode message using offset topic decoder (message had an invalid key)")
         }
       }
@@ -307,70 +308,51 @@ object DumpLogSegments {
                       parser: MessageParser[_, _]) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
-    val messageSet = new FileMessageSet(file, false)
+    val messageSet = FileRecords.open(file, false)
     var validBytes = 0L
     var lastOffset = -1l
-    val shallowIterator = messageSet.iterator(maxMessageSize)
-    for(shallowMessageAndOffset <- shallowIterator) { // this only does shallow iteration
-      val itr = getIterator(shallowMessageAndOffset, isDeepIteration)
-      for (messageAndOffset <- itr) {
-        val msg = messageAndOffset.message
+    val shallowIterator = messageSet.shallowIterator(maxMessageSize).asScala
+    for (shallowLogEntry <- shallowIterator) { // this only does shallow iteration
+      val itr = getIterator(shallowLogEntry, isDeepIteration)
+      for (deepLogEntry <- itr) {
+        val record = deepLogEntry.record()
 
         if(lastOffset == -1)
-          lastOffset = messageAndOffset.offset
+          lastOffset = deepLogEntry.offset
         // If we are iterating uncompressed messages, offsets must be consecutive
-        else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
+        else if (record.compressionType == CompressionType.NONE && deepLogEntry.offset != lastOffset +1) {
           var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
-          nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
+          nonConsecutivePairsSeq ::=(lastOffset, deepLogEntry.offset)
           nonConsecutivePairsForLogFilesMap.put(file.getAbsolutePath, nonConsecutivePairsSeq)
         }
-        lastOffset = messageAndOffset.offset
-
-        print("offset: " + messageAndOffset.offset + " position: " + 
validBytes +
-              " " + msg.timestampType + ": " + msg.timestamp + " isvalid: " + 
msg.isValid +
-              " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
-              " compresscodec: " + msg.compressionCodec + " crc: " + 
msg.checksum)
-        if(msg.hasKey)
-          print(" keysize: " + msg.keySize)
-        if(printContents) {
-          val (key, payload) = parser.parse(msg)
-          key.map(key => print(s" key: ${key}"))
-          payload.map(payload => print(s" payload: ${payload}"))
+        lastOffset = deepLogEntry.offset
+
+        print("offset: " + deepLogEntry.offset + " position: " + validBytes +
+              " " + record.timestampType + ": " + record.timestamp + " 
isvalid: " + record.isValid +
+              " payloadsize: " + record.valueSize + " magic: " + record.magic +
+              " compresscodec: " + record.compressionType + " crc: " + 
record.checksum)
+        if (record.hasKey)
+          print(" keysize: " + record.keySize)
+        if (printContents) {
+          val (key, payload) = parser.parse(record)
+          key.foreach(key => print(s" key: $key"))
+          payload.foreach(payload => print(s" payload: $payload"))
         }
         println()
       }
-      validBytes += MessageSet.entrySize(shallowMessageAndOffset.message)
+
+      validBytes += shallowLogEntry.sizeInBytes
     }
     val trailingBytes = messageSet.sizeInBytes - validBytes
     if(trailingBytes > 0)
       println("Found %d invalid bytes at the end of %s".format(trailingBytes, 
file.getName))
   }
 
-  private def getIterator(messageAndOffset: MessageAndOffset, isDeepIteration: Boolean) = {
-    if (isDeepIteration) {
-      val message = messageAndOffset.message
-      message.compressionCodec match {
-        case NoCompressionCodec =>
-          getSingleMessageIterator(messageAndOffset)
-        case _ =>
-          ByteBufferMessageSet.deepIterator(messageAndOffset)
-      }
-    } else
-      getSingleMessageIterator(messageAndOffset)
-  }
-
-  private def getSingleMessageIterator(messageAndOffset: MessageAndOffset) = {
-    new IteratorTemplate[MessageAndOffset] {
-      var messageIterated = false
-
-      override def makeNext(): MessageAndOffset = {
-        if (!messageIterated) {
-          messageIterated = true
-          messageAndOffset
-        } else
-          allDone()
-      }
-    }
+  private def getIterator(logEntry: LogEntry, isDeepIteration: Boolean): Iterator[LogEntry] = {
+    if (isDeepIteration)
+      logEntry.iterator.asScala
+    else
+      Iterator(logEntry)
   }
 
   class TimeIndexDumpErrors {

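For context (not part of the commit above): a stand-alone sketch of the FileRecords / shallow-iterator pattern that DumpLogSegments now uses instead of FileMessageSet. The segment path is a made-up example.

    import java.io.File
    import org.apache.kafka.common.record.FileRecords
    import scala.collection.JavaConverters._

    val segment = FileRecords.open(new File("/tmp/kafka-logs/test-0/00000000000000000000.log"))
    try {
      // Shallow iteration: one entry per (possibly compressed) wrapper message.
      for (entry <- segment.shallowIterator.asScala) {
        val record = entry.record
        println(s"offset=${entry.offset} size=${entry.sizeInBytes} " +
          s"codec=${record.compressionType} timestamp=${record.timestamp}")
      }
    } finally {
      segment.close()
    }
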
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 479b43c..c483021 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -17,25 +17,26 @@
 
 package kafka.tools
 
-import joptsimple.OptionParser
-import kafka.cluster.BrokerEndPoint
-import kafka.message.{ByteBufferMessageSet, MessageAndOffset, MessageSet}
+import java.text.SimpleDateFormat
+import java.util.Date
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicReference
-
-import kafka.client.ClientUtils
 import java.util.regex.{Pattern, PatternSyntaxException}
 
+import joptsimple.OptionParser
 import kafka.api._
-import java.text.SimpleDateFormat
-import java.util.Date
-
+import kafka.client.ClientUtils
+import kafka.cluster.BrokerEndPoint
 import kafka.common.TopicAndPartition
-import kafka.utils._
 import kafka.consumer.{ConsumerConfig, SimpleConsumer, Whitelist}
+import kafka.message.{ByteBufferMessageSet, MessageSet}
+import kafka.utils._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConverters._
+
+
 /**
  *  For verifying the consistency among replicas.
  *
@@ -149,15 +150,15 @@ object ReplicaVerificationTool extends Logging {
     debug("Selected topic partitions: " + topicPartitionReplicaList)
     val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId)
       .map { case (brokerId, partitions) =>
-               brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } }
+               brokerId -> partitions.map { partition => TopicAndPartition(partition.topic, partition.partitionId) } }
     debug("Topic partitions per broker: " + topicAndPartitionsPerBroker)
     val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] =
-          topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId))
+          topicPartitionReplicaList.groupBy(replica => TopicAndPartition(replica.topic, replica.partitionId))
           .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
     debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
     val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse =>
       topicMetadataResponse.partitionsMetadata.map { partitionMetadata =>
-        (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
+        (TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
       }
     }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) =>
        topicAndPartition
@@ -200,8 +201,6 @@ object ReplicaVerificationTool extends Logging {
 
 private case class TopicPartitionReplica(topic: String,  partitionId: Int,  replicaId: Int)
 
-private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset])
-
 private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
 
 private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
@@ -276,41 +275,42 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
             "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
             + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
-      val messageIteratorMap = fetchResponsePerReplica.map {
+      val logEntryIteratorMap = fetchResponsePerReplica.map {
         case(replicaId, fetchResponse) =>
-          replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator}
+          replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowIterator.asScala
+      }
       val maxHw = fetchResponsePerReplica.values.map(_.hw).max
 
       // Iterate one message at a time from every replica, until high watermark is reached.
       var isMessageInAllReplicas = true
       while (isMessageInAllReplicas) {
         var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
-        for ( (replicaId, messageIterator) <- messageIteratorMap) {
+        for ( (replicaId, logEntries) <- logEntryIteratorMap) {
           try {
-            if (messageIterator.hasNext) {
-              val messageAndOffset = messageIterator.next()
+            if (logEntries.hasNext) {
+              val logEntry = logEntries.next()
 
               // only verify up to the high watermark
-              if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
+              if (logEntry.offset >= fetchResponsePerReplica.get(replicaId).hw)
                 isMessageInAllReplicas = false
               else {
                 messageInfoFromFirstReplicaOpt match {
                   case None =>
                     messageInfoFromFirstReplicaOpt = Some(
-                      MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum))
+                      MessageInfo(replicaId, logEntry.offset,logEntry.nextOffset, logEntry.record.checksum))
                   case Some(messageInfoFromFirstReplica) =>
-                    if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) {
+                    if (messageInfoFromFirstReplica.offset != logEntry.offset) {
                       println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
                         + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
                         + messageInfoFromFirstReplica.offset + " doesn't match replica "
-                        + replicaId + "'s offset " + messageAndOffset.offset)
+                        + replicaId + "'s offset " + logEntry.offset)
                       System.exit(1)
                     }
-                    if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum)
+                    if (messageInfoFromFirstReplica.checksum != logEntry.record.checksum)
                       println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
-                        + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica "
+                        + topicAndPartition + " has unmatched checksum at offset " + logEntry.offset + "; replica "
                         + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
-                        + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum)
+                        + "; replica " + replicaId + "'s checksum " + logEntry.record.checksum)
                 }
               }
             } else

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index f36e146..51e987a 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -50,11 +50,13 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
     def getGroupMetadataLogOpt: Option[Log] =
       logManager.getLog(TopicAndPartition(Topic.GroupMetadataTopicName, 0))
 
-    TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.nonEmpty)),
+    TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowIterator.asScala.nonEmpty)),
                             "Commit message not appended in time")
 
     val logSegments = getGroupMetadataLogOpt.get.logSegments
-    val incorrectCompressionCodecs = logSegments.flatMap(_.log.map(_.message.compressionCodec)).filter(_ != offsetsTopicCompressionCodec)
+    val incorrectCompressionCodecs = logSegments
+      .flatMap(_.log.shallowIterator.asScala.map(_.record.compressionType.id))
+      .filter(_ != offsetsTopicCompressionCodec.codec)
     assertEquals("Incorrect compression codecs should be empty", Seq.empty, 
incorrectCompressionCodecs)
 
     consumer.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 51f02d1..ecf7408 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -21,12 +21,15 @@ import joptsimple.OptionParser
 import java.util.Properties
 import java.util.Random
 import java.io._
+
 import kafka.consumer._
 import kafka.serializer._
 import kafka.utils._
-import kafka.log.FileMessageSet
 import kafka.log.Log
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.record.FileRecords
+
+import scala.collection.JavaConverters._
 
 /**
  * This is a torture test that runs against an existing broker. Here is how it works:
@@ -135,15 +138,15 @@ object TestLogCleaning {
   
   def dumpLog(dir: File) {
     require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
-    for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
-      val ms = new FileMessageSet(new File(dir, file))
-      for(entry <- ms) {
-        val key = TestUtils.readString(entry.message.key)
+    for (file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
+      val fileRecords = FileRecords.open(new File(dir, file))
+      for (entry <- fileRecords.shallowIterator.asScala) {
+        val key = TestUtils.readString(entry.record.key)
         val content = 
-          if(entry.message.isNull)
+          if(entry.record.hasNullValue)
             null
           else
-            TestUtils.readString(entry.message.payload)
+            TestUtils.readString(entry.record.value)
         println("offset = %s, key = %s, content = %s".format(entry.offset, 
key, content))
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index f5cee0c..3381fb7 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -20,10 +20,10 @@ package kafka
 import java.util.Properties
 import java.util.concurrent.atomic._
 
-import kafka.message._
 import kafka.log._
 import kafka.utils._
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
+import org.apache.kafka.common.record.FileRecords
 import org.apache.kafka.common.utils.Utils
 
 /**
@@ -36,13 +36,13 @@ object StressTestLog {
   def main(args: Array[String]) {
     val dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir())
     val time = new MockTime
-    val logProprties = new Properties()
-    logProprties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer)
-    logProprties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer)
-    logProprties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer)
+    val logProperties = new Properties()
+    logProperties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer)
+    logProperties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer)
+    logProperties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer)
 
     val log = new Log(dir = dir,
-                      config = LogConfig(logProprties),
+                      config = LogConfig(logProperties),
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
                       time = time)
@@ -84,7 +84,7 @@ object StressTestLog {
   class WriterThread(val log: Log) extends WorkerThread {
     @volatile var offset = 0
     override def work() {
-      val logAppendInfo = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
+      val logAppendInfo = log.append(TestUtils.singletonRecords(offset.toString.getBytes))
       require(logAppendInfo.firstOffset == offset && logAppendInfo.lastOffset == offset)
       offset += 1
       if(offset % 1000 == 0)
@@ -96,11 +96,11 @@ object StressTestLog {
     @volatile var offset = 0
     override def work() {
       try {
-        log.read(offset, 1024, Some(offset+1)).messageSet match {
-          case read: FileMessageSet if read.sizeInBytes > 0 => {
-            val first = read.head
+        log.read(offset, 1024, Some(offset+1)).records match {
+          case read: FileRecords if read.sizeInBytes > 0 => {
+            val first = read.shallowIterator.next()
             require(first.offset == offset, "We should either read nothing or 
the message we asked for.")
-            require(MessageSet.entrySize(first.message) == read.sizeInBytes, 
"Expected %d but got %d.".format(MessageSet.entrySize(first.message), 
read.sizeInBytes))
+            require(first.sizeInBytes == read.sizeInBytes, "Expected %d but 
got %d.".format(first.sizeInBytes, read.sizeInBytes))
             offset += 1
           }
           case _ =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 6fef2b3..f0883ad 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -22,13 +22,14 @@ import java.nio._
 import java.nio.channels._
 import java.util.{Properties, Random}
 
+import joptsimple._
 import kafka.log._
-import kafka.utils._
 import kafka.message._
+import kafka.utils._
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
+import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.math._
-import joptsimple._
-import org.apache.kafka.common.utils.{Time, Utils}
 
 /**
  * This test does linear writes using either a kafka log or a file and measures throughput and latency.
@@ -64,7 +65,7 @@ object TestLinearWriteSpeed {
                            .withRequiredArg
                            .describedAs("ms")
                            .ofType(classOf[java.lang.Long])
-                           .defaultsTo(1000)
+                           .defaultsTo(1000L)
    val maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum 
throughput.")
                            .withRequiredArg
                            .describedAs("mb")
@@ -81,7 +82,7 @@ object TestLinearWriteSpeed {
                             .ofType(classOf[java.lang.String])
                             .defaultsTo(NoCompressionCodec.name)
    val mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.")
-   val channelOpt = parser.accepts("channel", "Do writes to file channesl.")
+   val channelOpt = parser.accepts("channel", "Do writes to file channels.")
    val logOpt = parser.accepts("log", "Do writes to kafka logs.")
                           
     val options = parser.parse(args : _*)
@@ -101,9 +102,9 @@ object TestLinearWriteSpeed {
     val rand = new Random
     rand.nextBytes(buffer.array)
     val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
-    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec,
-      messages = (0 until numMessages).map(_ => new Message(new Array[Byte](messageSize))): _*)
-    
+    val messageSet = MemoryRecords.withRecords(CompressionType.forId(compressionCodec.codec),
+      (0 until numMessages).map(_ => Record.create(new Array[Byte](messageSize))): _*)
+
     val writables = new Array[Writable](numFiles)
     val scheduler = new KafkaScheduler(1)
     scheduler.startup()
@@ -199,7 +200,7 @@ object TestLinearWriteSpeed {
     }
   }
   
-  class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable {
+  class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
     Utils.delete(dir)
     val log = new Log(dir, config, 0L, scheduler, Time.SYSTEM)
     def write(): Int = {

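For context (not part of the commit above): building a MemoryRecords batch directly, which is what the rewritten tests now do instead of constructing a ByteBufferMessageSet. The payload values are arbitrary placeholders.

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}

    // Two uncompressed records wrapped into a single in-memory batch.
    val records = MemoryRecords.withRecords(CompressionType.NONE,
      Record.create("value-1".getBytes),
      Record.create("value-2".getBytes))

    println(s"batch size in bytes: ${records.sizeInBytes}")
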
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index d1fcbc0..b98822d 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -301,7 +301,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     var counter = 0
     for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
-      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+      log.append(TestUtils.singletonRecords(value = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
       counter += 1
       (key, count)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index a981e68..1c5a526 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -18,20 +18,20 @@
 package kafka.coordinator
 
 import kafka.utils.timer.MockTimer
-import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}
 import org.junit.Assert._
 import kafka.common.{OffsetAndMetadata, Topic}
-import kafka.message.{Message, MessageSet}
-import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig}
+import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.easymock.{Capture, IAnswer, EasyMock}
+import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
 import java.util.concurrent.TimeUnit
+
 import scala.collection._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise}
@@ -305,7 +305,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
-    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
+      .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
     EasyMock.replay(replicaManager)
 
     timer.advanceClock(DefaultSessionTimeout + 100)
@@ -988,17 +989,18 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
-    EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.anyBoolean(),
-      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
           new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
-    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
+      .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleSyncGroup(groupId, generation, leaderId, 
assignment, responseCallback)
@@ -1069,17 +1071,18 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
-    EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.anyBoolean(),
-      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
           new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
-    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
+      .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
@@ -1090,7 +1093,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
     EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
-    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
+      .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 6c03476..62b7f42 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -21,12 +21,11 @@ import kafka.api.ApiVersion
 import kafka.cluster.Partition
 import kafka.common.{OffsetAndMetadata, Topic}
 import kafka.log.LogAppendInfo
-import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.server.{KafkaConfig, ReplicaManager}
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, EasyMock, IAnswer}
@@ -34,6 +33,7 @@ import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
 import scala.collection._
+import JavaConverters._
 
 class GroupMetadataManagerTest {
 
@@ -50,7 +50,6 @@ class GroupMetadataManagerTest {
   val rebalanceTimeout = 60000
   val sessionTimeout = 10000
 
-
   @Before
   def setUp() {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
@@ -176,7 +175,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testStoreNonEmptyGroupWhenCoordinatorHasMoved() {
-    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(None)
+    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None)
     val memberId = "memberId"
     val clientId = "clientId"
     val clientHost = "localhost"
@@ -245,7 +244,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testCommitOffsetWhenCoordinatorHasMoved() {
-    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(None)
+    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None)
     val memberId = ""
     val generationId = -1
     val topicPartition = new TopicPartition("foo", 0)
@@ -363,7 +362,7 @@ class GroupMetadataManagerTest {
     time.sleep(2)
 
     EasyMock.reset(partition)
-    EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]), EasyMock.anyInt()))
+    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -391,24 +390,74 @@ class GroupMetadataManagerTest {
 
     // expect the group metadata tombstone
     EasyMock.reset(partition)
-    val messageSetCapture: Capture[ByteBufferMessageSet] = 
EasyMock.newCapture()
+    val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
+
+    
EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
+      .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME))
+    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, 
groupPartitionId)).andStubReturn(Some(partition))
+    
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
 EasyMock.anyInt()))
+      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+    EasyMock.replay(replicaManager, partition)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    assertTrue(recordsCapture.hasCaptured)
+
+    val records = recordsCapture.getValue.records.asScala.toList
+    assertEquals(1, records.size)
+
+    val metadataTombstone = records.head
+    assertTrue(metadataTombstone.hasKey)
+    assertTrue(metadataTombstone.hasNullValue)
+    assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic)
+    assertEquals(TimestampType.CREATE_TIME, metadataTombstone.timestampType)
+    assertTrue(metadataTombstone.timestamp > 0)
+
+    val groupKey = 
GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
+    assertEquals(groupId, groupKey.key)
+
+    // the full group should be gone since all offsets were removed
+    assertEquals(None, groupMetadataManager.getGroup(groupId))
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
Seq(topicPartition1, topicPartition2))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition1).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition2).map(_.offset))
+  }
+
+  @Test
+  def testGroupMetadataRemovalWithLogAppendTime() {
+    val topicPartition1 = new TopicPartition("foo", 0)
+    val topicPartition2 = new TopicPartition("foo", 1)
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+    group.generationId = 5
+
+    // expect the group metadata tombstone
+    EasyMock.reset(partition)
+    val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
-    
EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andStubReturn(Some(Message.MagicValue_V1))
+    
EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
+      .andStubReturn(Some(Record.MAGIC_VALUE_V1, 
TimestampType.LOG_APPEND_TIME))
     EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, 
groupPartitionId)).andStubReturn(Some(partition))
-    
EasyMock.expect(partition.appendMessagesToLeader(EasyMock.capture(messageSetCapture),
 EasyMock.anyInt()))
+    
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
 EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
 
     groupMetadataManager.cleanupGroupMetadata()
 
-    assertTrue(messageSetCapture.hasCaptured)
+    assertTrue(recordsCapture.hasCaptured)
 
-    val messageSet = messageSetCapture.getValue
-    assertEquals(1, messageSet.size)
+    val records = recordsCapture.getValue.records.asScala.toList
+    assertEquals(1, records.size)
 
-    val metadataTombstone = messageSet.head.message
+    val metadataTombstone = records.head
     assertTrue(metadataTombstone.hasKey)
-    assertTrue(metadataTombstone.isNull)
+    assertTrue(metadataTombstone.hasNullValue)
+    assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic)
+    assertEquals(TimestampType.LOG_APPEND_TIME, 
metadataTombstone.timestampType)
+    assertTrue(metadataTombstone.timestamp > 0)
 
     val groupKey = 
GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
     assertEquals(groupId, groupKey.key)
@@ -463,22 +512,22 @@ class GroupMetadataManagerTest {
 
     // expect the offset tombstone
     EasyMock.reset(partition)
-    val messageSetCapture: Capture[ByteBufferMessageSet] = 
EasyMock.newCapture()
+    val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
-    
EasyMock.expect(partition.appendMessagesToLeader(EasyMock.capture(messageSetCapture),
 EasyMock.anyInt()))
+    
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
 EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
     groupMetadataManager.cleanupGroupMetadata()
 
-    assertTrue(messageSetCapture.hasCaptured)
+    assertTrue(recordsCapture.hasCaptured)
 
     // verify the tombstones are correct and only for the expired offsets
-    val messageSet = messageSetCapture.getValue
-    assertEquals(2, messageSet.size)
-    messageSet.map(_.message).foreach { message =>
+    val records = recordsCapture.getValue.records.asScala.toList
+    assertEquals(2, records.size)
+    records.foreach { message =>
       assertTrue(message.hasKey)
-      assertTrue(message.isNull)
+      assertTrue(message.hasNullValue)
       val offsetKey = 
GroupMetadataManager.readMessageKey(message.key).asInstanceOf[OffsetKey]
       assertEquals(groupId, offsetKey.key.group)
       assertEquals("foo", offsetKey.key.topicPartition.topic)
@@ -539,7 +588,7 @@ class GroupMetadataManagerTest {
 
     // expect the offset tombstone
     EasyMock.reset(partition)
-    
EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]),
 EasyMock.anyInt()))
+    
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
 EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -557,17 +606,18 @@ class GroupMetadataManagerTest {
 
   private def expectAppendMessage(error: Errors) {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => 
Unit] = EasyMock.newCapture()
-    EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.anyBoolean(),
-      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) 
->
           new PartitionResponse(error.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
-    
EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andStubReturn(Some(Message.MagicValue_V1))
+    
EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
+      .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME))
   }
 
 }

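As context for the GroupMetadataManagerTest changes above, this is a small illustrative sketch (not from the patch) of the iteration style the new assertions rely on: MemoryRecords.withRecords builds a batch, records() exposes a Java Iterable of Record, and hasNullValue replaces the old Message.isNull check. Only calls that already appear in the diff are used.

object MemoryRecordsIterationSketch {
  import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
  import scala.collection.JavaConverters._

  def main(args: Array[String]): Unit = {
    // Build a small uncompressed batch the same way the new tests do.
    val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE,
      Record.create("hello".getBytes), Record.create("there".getBytes))

    // Old style: messageSet.map(_.message); new style: the Java Iterable from
    // records(), converted through JavaConverters.
    val records = memoryRecords.records.asScala.toList
    assert(records.size == 2)
    records.foreach { record =>
      assert(!record.hasKey)       // value-only records; a tombstone has a key
      assert(!record.hasNullValue) // and a null value (hasNullValue replaces Message.isNull)
    }
  }
}
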
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala 
b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 791bdb0..296dc15 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -25,9 +25,10 @@ import org.junit.Assert._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
 import org.apache.kafka.common.utils.Utils
 import java.util.{Collection, Properties}
+
 import scala.collection.JavaConverters._
 
 @RunWith(value = classOf[Parameterized])
@@ -50,22 +51,22 @@ class BrokerCompressionTest(messageCompression: String, 
brokerCompression: Strin
   def testBrokerSideCompression() {
     val messageCompressionCode = 
CompressionCodec.getCompressionCodec(messageCompression)
     val logProps = new Properties()
-    logProps.put(LogConfig.CompressionTypeProp,brokerCompression)
+    logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
     /*configure broker-side compression  */
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, 
time.scheduler, time = time)
 
     /* append two messages */
-    log.append(new ByteBufferMessageSet(messageCompressionCode, new 
Message("hello".getBytes), new Message("there".getBytes)))
+    
log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),
+      Record.create("hello".getBytes), Record.create("there".getBytes)))
 
-    def readMessage(offset: Int) = log.read(offset, 
4096).messageSet.head.message
+    def readMessage(offset: Int) = log.read(offset, 
4096).records.shallowIterator.next().record
 
     if (!brokerCompression.equals("producer")) {
       val brokerCompressionCode = 
BrokerCompressionCodec.getCompressionCodec(brokerCompression)
-      assertEquals("Compression at offset 0 should produce " + 
brokerCompressionCode.name, brokerCompressionCode, 
readMessage(0).compressionCodec)
+      assertEquals("Compression at offset 0 should produce " + 
brokerCompressionCode.name, brokerCompressionCode.codec, 
readMessage(0).compressionType.id)
     }
     else
-      assertEquals("Compression at offset 0 should produce " + 
messageCompressionCode.name, messageCompressionCode, 
readMessage(0).compressionCodec)
-
+      assertEquals("Compression at offset 0 should produce " + 
messageCompressionCode.name, messageCompressionCode.codec, 
readMessage(0).compressionType.id)
   }
 
 }

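The BrokerCompressionTest change above swaps Message.compressionCodec for the compression type carried on the shallow (wrapper) record. The sketch below (not part of the patch) shows that lookup in isolation, using only the calls visible in the diff: withRecords, shallowIterator, record and compressionType.

object WrapperCompressionTypeSketch {
  import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}

  def main(args: Array[String]): Unit = {
    // A gzip batch: the two records end up inside one shallow (wrapper) entry,
    // and the wrapper record carries the compression type that the old code
    // exposed as Message.compressionCodec.
    val compressed = MemoryRecords.withRecords(CompressionType.GZIP,
      Record.create("hello".getBytes), Record.create("there".getBytes))

    val wrapper = compressed.shallowIterator.next().record
    assert(wrapper.compressionType == CompressionType.GZIP)
  }
}
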
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala 
b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
deleted file mode 100644
index a7f0446..0000000
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ /dev/null
@@ -1,354 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io._
-import java.nio._
-import java.nio.channels._
-
-import kafka.common.LongRef
-import org.junit.Assert._
-import kafka.utils.TestUtils._
-import kafka.message._
-import kafka.common.KafkaException
-import org.easymock.EasyMock
-import org.junit.Test
-
-class FileMessageSetTest extends BaseMessageSetTestCases {
-  
-  val messageSet = createMessageSet(messages)
-  
-  def createMessageSet(messages: Seq[Message]): FileMessageSet = {
-    val set = new FileMessageSet(tempFile())
-    set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
-    set.flush()
-    set
-  }
-
-  /**
-   * Test that the cached size variable matches the actual file size as we 
append messages
-   */
-  @Test
-  def testFileSize() {
-    assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
-    for (_ <- 0 until 20) {
-      messageSet.append(singleMessageSet("abcd".getBytes))
-      assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
-    } 
-  }
-  
-  /**
-   * Test that adding invalid bytes to the end of the log doesn't break 
iteration
-   */
-  @Test
-  def testIterationOverPartialAndTruncation() {
-    testPartialWrite(0, messageSet)
-    testPartialWrite(2, messageSet)
-    testPartialWrite(4, messageSet)
-    testPartialWrite(5, messageSet)
-    testPartialWrite(6, messageSet)
-  }
-  
-  def testPartialWrite(size: Int, messageSet: FileMessageSet) {
-    val buffer = ByteBuffer.allocate(size)
-    for (_ <- 0 until size)
-      buffer.put(0: Byte)
-    buffer.rewind()
-    messageSet.channel.write(buffer)
-    // appending those bytes should not change the contents
-    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
-  }
-  
-  /**
-   * Iterating over the file does file reads but shouldn't change the position 
of the underlying FileChannel.
-   */
-  @Test
-  def testIterationDoesntChangePosition() {
-    val position = messageSet.channel.position
-    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
-    assertEquals(position, messageSet.channel.position)
-  }
-  
-  /**
-   * Test a simple append and read.
-   */
-  @Test
-  def testRead() {
-    var read = messageSet.read(0, messageSet.sizeInBytes)
-    checkEquals(messageSet.iterator, read.iterator)
-    val items = read.iterator.toList
-    val sec = items.tail.head
-    read = messageSet.read(position = MessageSet.entrySize(sec.message), size 
= messageSet.sizeInBytes)
-    assertEquals("Try a read starting from the second message", items.tail, 
read.toList)
-    read = messageSet.read(MessageSet.entrySize(sec.message), 
MessageSet.entrySize(sec.message))
-    assertEquals("Try a read of a single message starting from the second 
message", List(items.tail.head), read.toList)
-  }
-  
-  /**
-   * Test the MessageSet.searchFor API.
-   */
-  @Test
-  def testSearch() {
-    // append a new message with a high offset
-    val lastMessage = new Message("test".getBytes)
-    messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new 
LongRef(50), lastMessage))
-    val messages = messageSet.toSeq
-    var position = 0
-    val message1Size = MessageSet.entrySize(messages.head.message)
-    assertEquals("Should be able to find the first message by its offset",
-                 (OffsetPosition(0L, position), message1Size),
-                 messageSet.searchForOffsetWithSize(0, 0))
-    position += message1Size
-    val message2Size = MessageSet.entrySize(messages(1).message)
-    assertEquals("Should be able to find second message when starting from 0", 
-                 (OffsetPosition(1L, position), message2Size),
-                 messageSet.searchForOffsetWithSize(1, 0))
-    assertEquals("Should be able to find second message starting from its 
offset", 
-                 (OffsetPosition(1L, position), message2Size),
-                 messageSet.searchForOffsetWithSize(1, position))
-    position += message2Size + MessageSet.entrySize(messages(2).message)
-    val message4Size = MessageSet.entrySize(messages(3).message)
-    assertEquals("Should be able to find fourth message from a non-existant 
offset", 
-                 (OffsetPosition(50L, position), message4Size),
-                 messageSet.searchForOffsetWithSize(3, position))
-    assertEquals("Should be able to find fourth message by correct offset",
-                 (OffsetPosition(50L, position), message4Size),
-                 messageSet.searchForOffsetWithSize(50,  position))
-  }
-  
-  /**
-   * Test that the message set iterator obeys start and end slicing
-   */
-  @Test
-  def testIteratorWithLimits() {
-    val message = messageSet.toList(1)
-    val start = messageSet.searchForOffsetWithSize(1, 0)._1.position
-    val size = message.message.size + 12
-    val slice = messageSet.read(start, size)
-    assertEquals(List(message), slice.toList)
-    val slice2 = messageSet.read(start, size - 1)
-    assertEquals(List(), slice2.toList)
-  }
-
-  /**
-   * Test the truncateTo method lops off messages and appropriately updates 
the size
-   */
-  @Test
-  def testTruncate() {
-    val message = messageSet.toList.head
-    val end = messageSet.searchForOffsetWithSize(1, 0)._1.position
-    messageSet.truncateTo(end)
-    assertEquals(List(message), messageSet.toList)
-    assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
-  }
-
-  /**
-    * Test that truncateTo only calls truncate on the FileChannel if the size 
of the
-    * FileChannel is bigger than the target size. This is important because 
some JVMs
-    * change the mtime of the file, even if truncate should do nothing.
-    */
-  @Test
-  def testTruncateNotCalledIfSizeIsSameAsTargetSize() {
-    val channelMock = EasyMock.createMock(classOf[FileChannel])
-
-    EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce()
-    EasyMock.expect(channelMock.position(42L)).andReturn(null)
-    EasyMock.replay(channelMock)
-
-    val msgSet = new FileMessageSet(tempFile(), channelMock)
-    msgSet.truncateTo(42)
-
-    EasyMock.verify(channelMock)
-  }
-
-  /**
-    * Expect a KafkaException if targetSize is bigger than the size of
-    * the FileMessageSet.
-    */
-  @Test
-  def testTruncateNotCalledIfSizeIsBiggerThanTargetSize() {
-    val channelMock = EasyMock.createMock(classOf[FileChannel])
-
-    EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce()
-    EasyMock.expect(channelMock.position(42L)).andReturn(null)
-    EasyMock.replay(channelMock)
-
-    val msgSet = new FileMessageSet(tempFile(), channelMock)
-
-    try {
-      msgSet.truncateTo(43)
-      fail("Should throw KafkaException")
-    } catch {
-      case _: KafkaException => // expected
-    }
-
-    EasyMock.verify(channelMock)
-  }
-
-  /**
-    * see #testTruncateNotCalledIfSizeIsSameAsTargetSize
-    */
-  @Test
-  def testTruncateIfSizeIsDifferentToTargetSize() {
-    val channelMock = EasyMock.createMock(classOf[FileChannel])
-
-    EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce()
-    EasyMock.expect(channelMock.position(42L)).andReturn(null).once()
-    EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once()
-    EasyMock.expect(channelMock.position(23L)).andReturn(null).once()
-    EasyMock.replay(channelMock)
-
-    val msgSet = new FileMessageSet(tempFile(), channelMock)
-    msgSet.truncateTo(23)
-
-    EasyMock.verify(channelMock)
-  }
-
-
-  /**
-   * Test the new FileMessageSet with pre allocate as true
-   */
-  @Test
-  def testPreallocateTrue() {
-    val temp = tempFile()
-    val set = new FileMessageSet(temp, false, 512 *1024 *1024, true)
-    val position = set.channel.position
-    val size = set.sizeInBytes()
-    assertEquals(0, position)
-    assertEquals(0, size)
-    assertEquals(512 *1024 *1024, temp.length)
-  }
-
-  /**
-   * Test the new FileMessageSet with pre allocate as false
-   */
-  @Test
-  def testPreallocateFalse() {
-    val temp = tempFile()
-    val set = new FileMessageSet(temp, false, 512 *1024 *1024, false)
-    val position = set.channel.position
-    val size = set.sizeInBytes()
-    assertEquals(0, position)
-    assertEquals(0, size)
-    assertEquals(0, temp.length)
-  }
-
-  /**
-   * Test the new FileMessageSet with pre allocate as true and file has been 
clearly shut down, the file will be truncate to end of valid data.
-   */
-  @Test
-  def testPreallocateClearShutdown() {
-    val temp = tempFile()
-    val set = new FileMessageSet(temp, false, 512 *1024 *1024, true)
-    set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
-    val oldposition = set.channel.position
-    val oldsize = set.sizeInBytes()
-    assertEquals(messageSet.sizeInBytes, oldposition)
-    assertEquals(messageSet.sizeInBytes, oldsize)
-    set.close()
-
-    val tempReopen = new File(temp.getAbsolutePath())
-    val setReopen = new FileMessageSet(tempReopen, true, 512 *1024 *1024, true)
-    val position = setReopen.channel.position
-    val size = setReopen.sizeInBytes()
-
-    assertEquals(oldposition, position)
-    assertEquals(oldposition, size)
-    assertEquals(oldposition, tempReopen.length)
-  }
-
-  @Test
-  def testFormatConversionWithPartialMessage() {
-    val message = messageSet.toList(1)
-    val start = messageSet.searchForOffsetWithSize(1, 0)._1.position
-    val size = message.message.size + 12
-    val slice = messageSet.read(start, size - 1)
-    val messageV0 = slice.toMessageFormat(Message.MagicValue_V0)
-    assertEquals("No message should be there", 0, messageV0.size)
-    assertEquals(s"There should be ${size - 1} bytes", size - 1, 
messageV0.sizeInBytes)
-  }
-
-  @Test
-  def testMessageFormatConversion() {
-
-    // Prepare messages.
-    val offsets = Seq(0L, 2L)
-    val messagesV0 = Seq(new Message("hello".getBytes, "k1".getBytes, 
Message.NoTimestamp, Message.MagicValue_V0),
-      new Message("goodbye".getBytes, "k2".getBytes, Message.NoTimestamp, 
Message.MagicValue_V0))
-    val messageSetV0 = new ByteBufferMessageSet(
-      compressionCodec = NoCompressionCodec,
-      offsetSeq = offsets,
-      messages = messagesV0:_*)
-    val compressedMessageSetV0 = new ByteBufferMessageSet(
-      compressionCodec = DefaultCompressionCodec,
-      offsetSeq = offsets,
-      messages = messagesV0:_*)
-
-    val messagesV1 = Seq(new Message("hello".getBytes, "k1".getBytes, 1L, 
Message.MagicValue_V1),
-                         new Message("goodbye".getBytes, "k2".getBytes, 2L, 
Message.MagicValue_V1))
-    val messageSetV1 = new ByteBufferMessageSet(
-      compressionCodec = NoCompressionCodec,
-      offsetSeq = offsets,
-      messages = messagesV1:_*)
-    val compressedMessageSetV1 = new ByteBufferMessageSet(
-      compressionCodec = DefaultCompressionCodec,
-      offsetSeq = offsets,
-      messages = messagesV1:_*)
-
-    // Down conversion
-    // down conversion for non-compressed messages
-    var fileMessageSet = new FileMessageSet(tempFile())
-    fileMessageSet.append(messageSetV1)
-    fileMessageSet.flush()
-    var convertedMessageSet = 
fileMessageSet.toMessageFormat(Message.MagicValue_V0)
-    verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V0)
-
-    // down conversion for compressed messages
-    fileMessageSet = new FileMessageSet(tempFile())
-    fileMessageSet.append(compressedMessageSetV1)
-    fileMessageSet.flush()
-    convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V0)
-    verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V0)
-
-    // Up conversion. In reality we only do down conversion, but up conversion 
should work as well.
-    // up conversion for non-compressed messages
-    fileMessageSet = new FileMessageSet(tempFile())
-    fileMessageSet.append(messageSetV0)
-    fileMessageSet.flush()
-    convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V1)
-    verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V1)
-
-    // up conversion for compressed messages
-    fileMessageSet = new FileMessageSet(tempFile())
-    fileMessageSet.append(compressedMessageSetV0)
-    fileMessageSet.flush()
-    convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V1)
-    verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V1)
-
-    def verifyConvertedMessageSet(convertedMessageSet: MessageSet, magicByte: 
Byte) {
-      var i = 0
-      for (messageAndOffset <- convertedMessageSet) {
-        assertEquals("magic byte should be 1", magicByte, 
messageAndOffset.message.magic)
-        assertEquals("offset should not change", offsets(i), 
messageAndOffset.offset)
-        assertEquals("key should not change", messagesV0(i).key, 
messageAndOffset.message.key)
-        assertEquals("payload should not change", messagesV0(i).payload, 
messageAndOffset.message.payload)
-        i += 1
-      }
-    }
-  }
-}

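The deleted FileMessageSetTest above is the file-backed half of this refactor; its behaviour moves to the Java FileRecords class (presumably covered by a corresponding FileRecordsTest in the clients module). The fragment below is only an illustrative sketch of the size-tracking invariant from testFileSize against that API: FileRecords.open, append(MemoryRecords), sizeInBytes, channel, flush and close are assumed replacements for the FileMessageSet equivalents and do not appear in this diff, so adjust to the actual signatures in the commit.

object FileRecordsSizeSketch {
  import java.io.File
  import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, Record}

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("kafka-filerecords-sketch", ".log")
    file.deleteOnExit()

    // Assumed API: FileRecords.open(file) standing in for `new FileMessageSet(file)`,
    // append(MemoryRecords) standing in for append(ByteBufferMessageSet).
    val fileRecords = FileRecords.open(file)
    for (_ <- 0 until 20) {
      fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE,
        Record.create("abcd".getBytes)))
      // Same invariant as the deleted testFileSize: the cached size stays in
      // sync with the size of the underlying channel after every append.
      assert(fileRecords.sizeInBytes == fileRecords.channel.size)
    }
    fileRecords.flush()
    fileRecords.close()
  }
}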