http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 250c8b8..65c2d05 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -22,10 +22,9 @@ import java.util.Properties import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0} import kafka.common.TopicAndPartition -import kafka.message._ import kafka.server.OffsetCheckpoint import kafka.utils._ -import org.apache.kafka.common.record.CompressionType +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record} import org.apache.kafka.common.utils.Utils import org.junit.Assert._ import org.junit._ @@ -43,7 +42,7 @@ import scala.util.Random @RunWith(value = classOf[Parameterized]) class LogCleanerIntegrationTest(compressionCodec: String) { - val codec = CompressionCodec.getCompressionCodec(compressionCodec) + val codec = CompressionType.forName(compressionCodec) val time = new MockTime() val segmentSize = 256 val deleteDelay = 1000 @@ -56,7 +55,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) { @Test def cleanerTest() { val largeMessageKey = 20 - val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V1) + val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V1) val maxMessageSize = largeMessageSet.sizeInBytes cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize) @@ -133,13 +132,13 @@ class LogCleanerIntegrationTest(compressionCodec: String) { } // returns (value, ByteBufferMessageSet) - private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, ByteBufferMessageSet) = { + private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = { def messageValue(length: Int): String = { val random = new Random(0) new String(random.alphanumeric.take(length).toArray) } val value = messageValue(128) - val messageSet = TestUtils.singleMessageSet(payload = value.getBytes, codec = codec, key = key.toString.getBytes, + val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes, magicValue = messageFormatVersion) (value, messageSet) } @@ -147,9 +146,9 @@ class LogCleanerIntegrationTest(compressionCodec: String) { @Test def testCleanerWithMessageFormatV0(): Unit = { val largeMessageKey = 20 - val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V0) + val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V0) val maxMessageSize = codec match { - case NoCompressionCodec => largeMessageSet.sizeInBytes + case CompressionType.NONE => largeMessageSet.sizeInBytes case _ => // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to // increase because the broker offsets are larger than the ones assigned by the client @@ -165,7 +164,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) { props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version) log.config = new LogConfig(props) - val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0) + val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0) val startSize = log.size cleaner.startup() @@ -177,14 +176,14 @@ class LogCleanerIntegrationTest(compressionCodec: String) { checkLogAfterAppendingDups(log, startSize, appends) val appends2: Seq[(Int, String, Long)] = { - val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0) + val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0) val appendInfo = log.append(largeMessageSet, assignOffsets = true) val largeMessageOffset = appendInfo.firstOffset // also add some messages with version 1 to check that we handle mixed format versions correctly props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version) log.config = new LogConfig(props) - val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V1) + val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1) appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 } val firstDirty2 = log.activeSegment.baseOffset @@ -205,15 +204,15 @@ class LogCleanerIntegrationTest(compressionCodec: String) { // with compression enabled, these messages will be written as a single message containing // all of the individual messages - var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0) - appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V0) + var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0) + appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0) props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version) log.config = new LogConfig(props) - var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1) - appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1) - appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1) + var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1) + appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1) + appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1) val appends = appendsV0 ++ appendsV1 @@ -250,32 +249,27 @@ class LogCleanerIntegrationTest(compressionCodec: String) { } private def readFromLog(log: Log): Iterable[(Int, String, Long)] = { - - def messageIterator(entry: MessageAndOffset): Iterator[MessageAndOffset] = - // create single message iterator or deep iterator depending on compression codec - if (entry.message.compressionCodec == NoCompressionCodec) Iterator(entry) - else ByteBufferMessageSet.deepIterator(entry) - - for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- messageIterator(entry)) yield { - val key = TestUtils.readString(messageAndOffset.message.key).toInt - val value = TestUtils.readString(messageAndOffset.message.payload) - (key, value, messageAndOffset.offset) + import JavaConverters._ + for (segment <- log.logSegments; deepLogEntry <- segment.log.deepIterator.asScala) yield { + val key = TestUtils.readString(deepLogEntry.record.key).toInt + val value = TestUtils.readString(deepLogEntry.record.value) + (key, value, deepLogEntry.offset) } } - private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, - startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = { + private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, + startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = { for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { - val payload = counter.toString - val appendInfo = log.append(TestUtils.singleMessageSet(payload = payload.toString.getBytes, codec = codec, + val value = counter.toString + val appendInfo = log.append(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec, key = key.toString.getBytes, magicValue = magicValue), assignOffsets = true) counter += 1 - (key, payload, appendInfo.firstOffset) + (key, value, appendInfo.firstOffset) } } - private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, - startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = { + private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, + startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = { val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { val payload = counter.toString counter += 1 @@ -283,11 +277,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) { } val messages = kvs.map { case (key, payload) => - new Message(payload.toString.getBytes, key.toString.getBytes, Message.NoTimestamp, magicValue) + Record.create(magicValue, key.toString.getBytes, payload.toString.getBytes) } - val messageSet = new ByteBufferMessageSet(compressionCodec = codec, messages: _*) - val appendInfo = log.append(messageSet, assignOffsets = true) + val records = MemoryRecords.withRecords(codec, messages: _*) + val appendInfo = log.append(records, assignOffsets = true) val offsets = appendInfo.firstOffset to appendInfo.lastOffset kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala index 5e029fc..abab3bf 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala @@ -21,7 +21,6 @@ import java.io.File import java.util.Properties import kafka.common.TopicAndPartition -import kafka.message._ import kafka.utils._ import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.utils.Utils @@ -33,7 +32,6 @@ import org.junit.runners.Parameterized.Parameters import scala.collection._ - /** * This is an integration test that tests the fully integrated log cleaner */ @@ -52,7 +50,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging val logDir = TestUtils.tempDir() var counter = 0 val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2)) - val compressionCodec = CompressionCodec.getCompressionCodec(compressionCodecName) + val compressionCodec = CompressionType.forName(compressionCodecName) @Test def cleanerTest(): Unit = { @@ -96,7 +94,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging val compactedSize = log.logSegments(0L, activeSegAtT0.baseOffset).map(_.size).sum debug(s"after cleaning the compacted size up to active segment at T0: $compactedSize") - val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get + val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(TopicAndPartition("log", 0)) assertTrue(s"log cleaner should have processed up to offset $firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned", lastCleaned >= firstBlock1SegmentBaseOffset) assertTrue(s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize", sizeUpToActiveSegmentAtT0 > compactedSize) @@ -106,23 +104,19 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging } private def readFromLog(log: Log): Iterable[(Int, Int)] = { - for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- { - // create single message iterator or deep iterator depending on compression codec - if (entry.message.compressionCodec == NoCompressionCodec) - Stream.cons(entry, Stream.empty).iterator - else - ByteBufferMessageSet.deepIterator(entry) - }) yield { - val key = TestUtils.readString(messageAndOffset.message.key).toInt - val value = TestUtils.readString(messageAndOffset.message.payload).toInt + import JavaConverters._ + + for (segment <- log.logSegments; logEntry <- segment.log.deepIterator.asScala) yield { + val key = TestUtils.readString(logEntry.record.key).toInt + val value = TestUtils.readString(logEntry.record.value).toInt key -> value } } - private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = { + private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long): Seq[(Int, Int)] = { for (_ <- 0 until numDups; key <- 0 until numKeys) yield { val count = counter - log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true) + log.append(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true) counter += 1 (key, count) } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 0cd52d6..5dfa268 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -21,8 +21,8 @@ import java.io.File import java.util.Properties import kafka.common._ -import kafka.message._ import kafka.utils._ +import org.apache.kafka.common.record.{MemoryRecords, Record} import org.apache.kafka.common.utils.Utils import org.junit.Assert._ import org.junit.{After, Test} @@ -54,8 +54,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { */ @Test def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs(): Unit = { - val messageSet = TestUtils.singleMessageSet("test".getBytes) - val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Delete) + val records = TestUtils.singletonRecords("test".getBytes) + val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete) val cleanerManager: LogCleanerManager = createCleanerManager(log) val readyToDelete = cleanerManager.deletableLogs().size @@ -67,8 +67,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { */ @Test def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs(): Unit = { - val messageSet = TestUtils.singleMessageSet("test".getBytes, key="test".getBytes) - val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete) + val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) + val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete) val cleanerManager: LogCleanerManager = createCleanerManager(log) val readyToDelete = cleanerManager.deletableLogs().size @@ -81,8 +81,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { */ @Test def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyCompactLogs(): Unit = { - val messageSet = TestUtils.singleMessageSet("test".getBytes, key="test".getBytes) - val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Compact) + val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) + val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact) val cleanerManager: LogCleanerManager = createCleanerManager(log) val readyToDelete = cleanerManager.deletableLogs().size @@ -100,7 +100,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) while(log.numberOfSegments < 8) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds)) + log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds)) val topicAndPartition = TopicAndPartition("log", 0) val lastClean = Map(topicAndPartition-> 0L) @@ -123,7 +123,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { val t0 = time.milliseconds while(log.numberOfSegments < 4) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0)) + log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0)) val activeSegAtT0 = log.activeSegment @@ -131,7 +131,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { val t1 = time.milliseconds while (log.numberOfSegments < 8) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1)) + log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1)) val topicAndPartition = TopicAndPartition("log", 0) val lastClean = Map(topicAndPartition-> 0L) @@ -155,7 +155,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { val t0 = time.milliseconds while (log.numberOfSegments < 8) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0)) + log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0)) time.sleep(compactionLag + 1) @@ -192,10 +192,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { private def makeLog(dir: File = logDir, config: LogConfig = logConfig) = new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) - private def message(key: Int, value: Int, timestamp: Long) = - new ByteBufferMessageSet(new Message(key = key.toString.getBytes, - bytes = value.toString.getBytes, - timestamp = timestamp, - magicValue = Message.MagicValue_V1)) + private def logEntries(key: Int, value: Int, timestamp: Long) = + MemoryRecords.withRecords(Record.create(timestamp, key.toString.getBytes, value.toString.getBytes)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index d80fba1..a99d4b9 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -17,21 +17,21 @@ package kafka.log -import java.io.{DataOutputStream, File} +import java.io.File import java.nio._ import java.nio.file.Paths import java.util.Properties import kafka.common._ -import kafka.message._ import kafka.utils._ -import org.apache.kafka.common.record.{MemoryRecords, TimestampType} +import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils import org.junit.Assert._ import org.junit.{After, Test} import org.scalatest.junit.JUnitSuite import scala.collection._ +import JavaConverters._ /** * Unit tests for the log cleaning logic @@ -66,7 +66,7 @@ class LogCleanerTest extends JUnitSuite { // append messages to the log until we have four segments while(log.numberOfSegments < 4) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) val keysFound = keysInLog(log) assertEquals(0L until log.logEndOffset, keysFound) @@ -100,7 +100,7 @@ class LogCleanerTest extends JUnitSuite { val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) while(log.numberOfSegments < 2) - log.append(message(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte))) + log.append(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte))) val keysFound = keysInLog(log) assertEquals(0L until log.logEndOffset, keysFound) @@ -123,23 +123,23 @@ class LogCleanerTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - + // append messages with the keys 0 through N while(log.numberOfSegments < 2) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - + log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) + // delete all even keys between 0 and N val leo = log.logEndOffset for(key <- 0 until leo.toInt by 2) - log.append(deleteMessage(key)) - + log.append(tombstoneRecord(key)) + // append some new unique keys to pad out to a new active segment while(log.numberOfSegments < 4) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - + log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) + cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) val keys = keysInLog(log).toSet - assertTrue("None of the keys we deleted should still exist.", + assertTrue("None of the keys we deleted should still exist.", (0 until leo.toInt by 2).forall(!keys.contains(_))) } @@ -151,11 +151,11 @@ class LogCleanerTest extends JUnitSuite { val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - log.append(message(0,0)) // offset 0 - log.append(message(1,1)) // offset 1 - log.append(message(0,0)) // offset 2 - log.append(message(1,1)) // offset 3 - log.append(message(0,0)) // offset 4 + log.append(record(0,0)) // offset 0 + log.append(record(1,1)) // offset 1 + log.append(record(0,0)) // offset 2 + log.append(record(1,1)) // offset 3 + log.append(record(0,0)) // offset 4 // roll the segment, so we can clean the messages already appended log.roll() @@ -180,11 +180,11 @@ class LogCleanerTest extends JUnitSuite { val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - log.append(message(0,0)) // offset 0 - log.append(message(1,1)) // offset 1 - log.append(message(0,0)) // offset 2 - log.append(message(1,1)) // offset 3 - log.append(message(0,0)) // offset 4 + log.append(record(0,0)) // offset 0 + log.append(record(1,1)) // offset 1 + log.append(record(0,0)) // offset 2 + log.append(record(1,1)) // offset 3 + log.append(record(0,0)) // offset 4 // roll the segment, so we can clean the messages already appended log.roll() @@ -218,18 +218,18 @@ class LogCleanerTest extends JUnitSuite { // append messages with the keys 0 through N-1, values equal offset while(log.numberOfSegments <= numCleanableSegments) - log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt)) + log.append(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt)) // at this point one message past the cleanable segments has been added // the entire segment containing the first uncleanable offset should not be cleaned. val firstUncleanableOffset = log.logEndOffset + 1 // +1 so it is past the baseOffset while(log.numberOfSegments < numTotalSegments - 1) - log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt)) + log.append(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt)) // the last (active) segment has just one message - def distinctValuesBySegment = log.logSegments.map(s => s.log.map(m => TestUtils.readString(m.message.payload)).toSet.size).toSeq + def distinctValuesBySegment = log.logSegments.map(s => s.log.shallowIterator.asScala.map(m => TestUtils.readString(m.record.value)).toSet.size).toSeq val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.", @@ -253,7 +253,7 @@ class LogCleanerTest extends JUnitSuite { val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) // create 6 segments with only one message in each segment - val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes) + val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes) for (_ <- 0 until 6) log.append(messageSet, assignOffsets = true) @@ -271,7 +271,7 @@ class LogCleanerTest extends JUnitSuite { val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) // create 6 segments with only one message in each segment - val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes) + val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes) for (_ <- 0 until 6) log.append(messageSet, assignOffsets = true) @@ -305,14 +305,14 @@ class LogCleanerTest extends JUnitSuite { // append unkeyed messages while(log.numberOfSegments < 2) - log.append(unkeyedMessage(log.logEndOffset.toInt)) + log.append(unkeyedRecord(log.logEndOffset.toInt)) val numInvalidMessages = unkeyedMessageCountInLog(log) val sizeWithUnkeyedMessages = log.size // append keyed messages while(log.numberOfSegments < 3) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages val (_, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) @@ -321,17 +321,17 @@ class LogCleanerTest extends JUnitSuite { assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size) assertEquals("Cleaner should have seen %d invalid messages.", numInvalidMessages, stats.invalidMessagesRead) } - + /* extract all the keys from a log */ def keysInLog(log: Log): Iterable[Int] = - log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt)) + log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => TestUtils.readString(m.record.key).toInt)) /* extract all the offsets from a log */ def offsetsInLog(log: Log): Iterable[Long] = - log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => m.offset)) + log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => m.offset)) def unkeyedMessageCountInLog(log: Log) = - log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum + log.logSegments.map(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = { throw new LogCleaningAbortedException() @@ -350,7 +350,7 @@ class LogCleanerTest extends JUnitSuite { // append messages to the log until we have four segments while(log.numberOfSegments < 4) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) val keys = keysInLog(log) val map = new FakeOffsetMap(Int.MaxValue) @@ -371,20 +371,20 @@ class LogCleanerTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - + // append some messages to the log var i = 0 while(log.numberOfSegments < 10) { - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) + log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes)) i += 1 } - + // grouping by very large values should result in a single group with all the segments in it var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) assertEquals(1, groups.size) assertEquals(log.numberOfSegments, groups.head.size) checkSegmentOrder(groups) - + // grouping by very small values should result in all groups having one entry groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue) assertEquals(log.numberOfSegments, groups.size) @@ -396,20 +396,20 @@ class LogCleanerTest extends JUnitSuite { checkSegmentOrder(groups) val groupSize = 3 - + // check grouping by log size val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1 groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue) checkSegmentOrder(groups) assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize)) - + // check grouping by index size val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1 groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize) checkSegmentOrder(groups) assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize)) } - + /** * Validate the logic for grouping log segments together for cleaning when only a small number of * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not @@ -425,47 +425,45 @@ class LogCleanerTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - + // fill up first segment while (log.numberOfSegments == 1) - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) - + log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes)) + // forward offset and append message to next segment at offset Int.MaxValue - val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(Int.MaxValue - 1), - new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1)) - log.append(messageSet, assignOffsets = false) - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) + val records = MemoryRecords.withLogEntries(LogEntry.create(Int.MaxValue - 1, Record.create("hello".getBytes, "hello".getBytes))) + log.append(records, assignOffsets = false) + log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes)) assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset) - + // grouping should result in a single group with maximum relative offset of Int.MaxValue var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) assertEquals(1, groups.size) - + // append another message, making last offset of second segment > Int.MaxValue - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) - + log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes)) + // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) assertEquals(2, groups.size) checkSegmentOrder(groups) - + // append more messages, creating new segments, further grouping should still occur while (log.numberOfSegments < 4) - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) + log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes)) groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) assertEquals(log.numberOfSegments - 1, groups.size) for (group <- groups) assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue) checkSegmentOrder(groups) - } - + private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = { val offsets = groups.flatMap(_.map(_.baseOffset)) assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets) } - + /** * Test building an offset map off the log */ @@ -496,8 +494,7 @@ class LogCleanerTest extends JUnitSuite { checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt) checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt) } - - + /** * Tests recovery if broker crashes at the following stages during the cleaning sequence * <ol> @@ -516,8 +513,8 @@ class LogCleanerTest extends JUnitSuite { logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer) val config = LogConfig.fromProps(logConfig.originals, logProps) - - def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = { + + def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = { // Recover log file and check that after recovery, keys are as expected // and all temporary files have been deleted val recoveredLog = makeLog(config = config) @@ -530,25 +527,25 @@ class LogCleanerTest extends JUnitSuite { assertEquals(expectedKeys, keysInLog(recoveredLog)) recoveredLog } - + // create a log and append some messages var log = makeLog(config = config) var messageCount = 0 while(log.numberOfSegments < 10) { - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) messageCount += 1 } val allKeys = keysInLog(log) - + // pretend we have odd-numbered keys val offsetMap = new FakeOffsetMap(Int.MaxValue) for (k <- 1 until messageCount by 2) offsetMap.put(key(k), Long.MaxValue) - + // clean the log cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) var cleanedKeys = keysInLog(log) - + // 1) Simulate recovery just after .cleaned file is created, before rename to .swap // On recovery, clean operation is aborted. All messages should be present in the log log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix) @@ -556,44 +553,44 @@ class LogCleanerTest extends JUnitSuite { Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) } log = recoverAndCheck(config, allKeys) - + // clean again cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) cleanedKeys = keysInLog(log) - + // 2) Simulate recovery just after swap file is created, before old segment files are - // renamed to .deleted. Clean operation is resumed during recovery. + // renamed to .deleted. Clean operation is resumed during recovery. log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) - } + } log = recoverAndCheck(config, cleanedKeys) // add some more messages and clean the log again while(log.numberOfSegments < 10) { - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) messageCount += 1 } for (k <- 1 until messageCount by 2) - offsetMap.put(key(k), Long.MaxValue) + offsetMap.put(key(k), Long.MaxValue) cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) cleanedKeys = keysInLog(log) - + // 3) Simulate recovery after swap file is created and old segments files are renamed // to .deleted. Clean operation is resumed during recovery. log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) log = recoverAndCheck(config, cleanedKeys) - + // add some more messages and clean the log again while(log.numberOfSegments < 10) { - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) messageCount += 1 } for (k <- 1 until messageCount by 2) - offsetMap.put(key(k), Long.MaxValue) + offsetMap.put(key(k), Long.MaxValue) cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) cleanedKeys = keysInLog(log) - + // 4) Simulate recovery after swap is complete, but async deletion // is not yet complete. Clean operation is resumed during recovery. recoverAndCheck(config, cleanedKeys) @@ -631,11 +628,11 @@ class LogCleanerTest extends JUnitSuite { val log = makeLog() val cleaner = makeCleaner(2) - log.append(message(0,0)) - log.append(message(1,1)) - log.append(message(2,2)) - log.append(message(3,3)) - log.append(message(4,4)) + log.append(record(0,0)) + log.append(record(1,1)) + log.append(record(2,2)) + log.append(record(3,3)) + log.append(record(4,4)) log.roll() val stats = new CleanerStats() @@ -653,7 +650,7 @@ class LogCleanerTest extends JUnitSuite { */ @Test def testCleanCorruptMessageSet() { - val codec = SnappyCompressionCodec + val codec = CompressionType.GZIP val logProps = new Properties() logProps.put(LogConfig.CompressionTypeProp, codec.name) @@ -682,10 +679,10 @@ class LogCleanerTest extends JUnitSuite { cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) - for (segment <- log.logSegments; shallowMessage <- segment.log.iterator; deepMessage <- ByteBufferMessageSet.deepIterator(shallowMessage)) { - assertEquals(shallowMessage.message.magic, deepMessage.message.magic) - val value = TestUtils.readString(deepMessage.message.payload).toLong - assertEquals(deepMessage.offset, value) + for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowIterator.asScala; deepLogEntry <- shallowLogEntry.asScala) { + assertEquals(shallowLogEntry.record.magic, deepLogEntry.record.magic) + val value = TestUtils.readString(deepLogEntry.record.value).toLong + assertEquals(deepLogEntry.offset, value) } } @@ -704,7 +701,7 @@ class LogCleanerTest extends JUnitSuite { val corruptedMessage = invalidCleanedMessage(offset, set) val records = MemoryRecords.readableRecords(corruptedMessage.buffer) - for (logEntry <- records.iterator.asScala) { + for (logEntry <- records.deepIterator.asScala) { val offset = logEntry.offset val value = TestUtils.readString(logEntry.record.value).toLong assertEquals(offset, value) @@ -718,94 +715,64 @@ class LogCleanerTest extends JUnitSuite { private def invalidCleanedMessage(initialOffset: Long, keysAndValues: Iterable[(Int, Int)], - codec: CompressionCodec = SnappyCompressionCodec): ByteBufferMessageSet = { + codec: CompressionType = CompressionType.GZIP): MemoryRecords = { // this function replicates the old versions of the cleaner which under some circumstances // would write invalid compressed message sets with the outer magic set to 1 and the inner // magic set to 0 - - val messages = keysAndValues.map(kv => - new Message(key = kv._1.toString.getBytes, - bytes = kv._2.toString.getBytes, - timestamp = Message.NoTimestamp, - magicValue = Message.MagicValue_V0)) - - val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) - var lastOffset = initialOffset - - messageWriter.write( - codec = codec, - timestamp = Message.NoTimestamp, - timestampType = TimestampType.CREATE_TIME, - magicValue = Message.MagicValue_V1) { outputStream => - - val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream)) - try { - for (message <- messages) { - val innerOffset = lastOffset - initialOffset - output.writeLong(innerOffset) - output.writeInt(message.size) - output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) - lastOffset += 1 - } - } finally { - output.close() - } + val records = keysAndValues.map(kv => + Record.create(Record.MAGIC_VALUE_V0, + Record.NO_TIMESTAMP, + kv._1.toString.getBytes, + kv._2.toString.getBytes)) + + val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16)) + val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, codec, TimestampType.CREATE_TIME) + + var offset = initialOffset + records.foreach { record => + builder.appendUnchecked(offset, record) + offset += 1 } - val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead) - ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1) - buffer.rewind() - new ByteBufferMessageSet(buffer) + builder.build() } private def messageWithOffset(key: Int, value: Int, offset: Long) = - new ByteBufferMessageSet(NoCompressionCodec, Seq(offset), - new Message(key = key.toString.getBytes, - bytes = value.toString.getBytes, - timestamp = Message.NoTimestamp, - magicValue = Message.MagicValue_V1)) - - + MemoryRecords.withLogEntries(LogEntry.create(offset, Record.create(key.toString.getBytes, value.toString.getBytes))) + def makeLog(dir: File = dir, config: LogConfig = logConfig) = new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */ } def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) = - new Cleaner(id = 0, - offsetMap = new FakeOffsetMap(capacity), + new Cleaner(id = 0, + offsetMap = new FakeOffsetMap(capacity), ioBufferSize = maxMessageSize, maxIoBufferSize = maxMessageSize, dupBufferLoadFactor = 0.75, - throttler = throttler, + throttler = throttler, time = time, checkDone = checkDone ) - + def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = { for((key, value) <- seq) - yield log.append(message(key, value)).firstOffset + yield log.append(record(key, value)).firstOffset } - + def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes) - - def message(key: Int, value: Int): ByteBufferMessageSet = - message(key, value.toString.getBytes) - - def message(key: Int, value: Array[Byte]) = - new ByteBufferMessageSet(new Message(key = key.toString.getBytes, - bytes = value, - timestamp = Message.NoTimestamp, - magicValue = Message.MagicValue_V1)) - - def unkeyedMessage(value: Int) = - new ByteBufferMessageSet(new Message(bytes = value.toString.getBytes)) - - def deleteMessage(key: Int) = - new ByteBufferMessageSet(new Message(key = key.toString.getBytes, - bytes = null, - timestamp = Message.NoTimestamp, - magicValue = Message.MagicValue_V1)) - + + def record(key: Int, value: Int): MemoryRecords = + record(key, value.toString.getBytes) + + def record(key: Int, value: Array[Byte]) = + MemoryRecords.withRecords(Record.create(key.toString.getBytes, value)) + + def unkeyedRecord(value: Int) = + MemoryRecords.withRecords(Record.create(value.toString.getBytes)) + + def tombstoneRecord(key: Int) = record(key, null) + } class FakeOffsetMap(val slots: Int) extends OffsetMap { http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 5421da9..40e6228 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -67,7 +67,7 @@ class LogManagerTest { val log = logManager.createLog(TopicAndPartition(name, 0), logConfig) val logFile = new File(logDir, name + "-0") assertTrue(logFile.exists) - log.append(TestUtils.singleMessageSet("test".getBytes())) + log.append(TestUtils.singletonRecords("test".getBytes())) } /** @@ -89,7 +89,7 @@ class LogManagerTest { val log = logManager.createLog(TopicAndPartition(name, 0), logConfig) var offset = 0L for(_ <- 0 until 200) { - val set = TestUtils.singleMessageSet("test".getBytes()) + val set = TestUtils.singletonRecords("test".getBytes()) val info = log.append(set) offset = info.lastOffset } @@ -101,7 +101,7 @@ class LogManagerTest { assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length) - assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes) try { log.read(0, 1024) @@ -110,7 +110,7 @@ class LogManagerTest { case _: OffsetOutOfRangeException => // This is good. } // log should still be appendable - log.append(TestUtils.singleMessageSet("test".getBytes())) + log.append(TestUtils.singletonRecords("test".getBytes())) } /** @@ -118,7 +118,7 @@ class LogManagerTest { */ @Test def testCleanupSegmentsToMaintainSize() { - val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes + val setSize = TestUtils.singletonRecords("test".getBytes()).sizeInBytes logManager.shutdown() val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 10 * setSize: java.lang.Integer) @@ -135,7 +135,7 @@ class LogManagerTest { // add a bunch of messages that should be larger than the retentionSize val numMessages = 200 for (_ <- 0 until numMessages) { - val set = TestUtils.singleMessageSet("test".getBytes()) + val set = TestUtils.singletonRecords("test".getBytes()) val info = log.append(set) offset = info.firstOffset } @@ -147,7 +147,7 @@ class LogManagerTest { assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length) - assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes) try { log.read(0, 1024) fail("Should get exception from fetching earlier.") @@ -155,7 +155,7 @@ class LogManagerTest { case _: OffsetOutOfRangeException => // This is good. } // log should still be appendable - log.append(TestUtils.singleMessageSet("test".getBytes())) + log.append(TestUtils.singletonRecords("test".getBytes())) } /** @@ -169,7 +169,7 @@ class LogManagerTest { val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps)) var offset = 0L for (_ <- 0 until 200) { - val set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes()) + val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes()) val info = log.append(set) offset = info.lastOffset } @@ -198,7 +198,7 @@ class LogManagerTest { val log = logManager.createLog(TopicAndPartition(name, 0), config) val lastFlush = log.lastFlushTime for (_ <- 0 until 200) { - val set = TestUtils.singleMessageSet("test".getBytes()) + val set = TestUtils.singletonRecords("test".getBytes()) log.append(set) } time.sleep(logManager.InitialTaskDelayMs) @@ -280,7 +280,7 @@ class LogManagerTest { val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig)) logs.foreach(log => { for (_ <- 0 until 50) - log.append(TestUtils.singleMessageSet("test".getBytes())) + log.append(TestUtils.singletonRecords("test".getBytes())) log.flush() }) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index f02c5cb..d99981a 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -16,15 +16,13 @@ */ package kafka.log -import org.junit.Assert._ -import java.util.concurrent.atomic._ - -import kafka.common.LongRef -import org.junit.{After, Test} import kafka.utils.TestUtils -import kafka.message._ +import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record} import org.apache.kafka.common.utils.Time +import org.junit.Assert._ +import org.junit.{After, Test} +import scala.collection.JavaConverters._ import scala.collection._ class LogSegmentTest { @@ -34,7 +32,7 @@ class LogSegmentTest { /* create a segment with the given base offset */ def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = { val msFile = TestUtils.tempFile() - val ms = new FileMessageSet(msFile) + val ms = FileRecords.open(msFile) val idxFile = TestUtils.tempFile() val timeIdxFile = TestUtils.tempFile() idxFile.delete() @@ -47,12 +45,10 @@ class LogSegmentTest { } /* create a ByteBufferMessageSet for the given messages starting from the given offset */ - def messages(offset: Long, messages: String*): ByteBufferMessageSet = { - new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - offsetCounter = new LongRef(offset), - messages = messages.map(s => new Message(s.getBytes, offset * 10, Message.MagicValue_V1)):_*) + def records(offset: Long, records: String*): MemoryRecords = { + MemoryRecords.withRecords(offset, records.map(s => Record.create(Record.MAGIC_VALUE_V1, offset * 10, s.getBytes)):_*) } - + @After def teardown() { for(seg <- segments) { @@ -60,7 +56,7 @@ class LogSegmentTest { seg.log.delete() } } - + /** * A read on an empty log segment should return null */ @@ -70,7 +66,7 @@ class LogSegmentTest { val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None) assertNull("Read beyond the last offset in the segment should be null", read) } - + /** * Reading from before the first offset in the segment should return messages * beginning with the first message in the segment @@ -78,12 +74,12 @@ class LogSegmentTest { @Test def testReadBeforeFirstOffset() { val seg = createSegment(40) - val ms = messages(50, "hello", "there", "little", "bee") - seg.append(50, Message.NoTimestamp, -1L, ms) - val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet - assertEquals(ms.toList, read.toList) + val ms = records(50, "hello", "there", "little", "bee") + seg.append(50, Record.NO_TIMESTAMP, -1L, ms) + val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records + assertEquals(ms.deepIterator.asScala.toList, read.deepIterator.asScala.toList) } - + /** * If we set the startOffset and maxOffset for the read to be the same value * we should get only the first message in the log @@ -92,28 +88,28 @@ class LogSegmentTest { def testMaxOffset() { val baseOffset = 50 val seg = createSegment(baseOffset) - val ms = messages(baseOffset, "hello", "there", "beautiful") - seg.append(baseOffset, Message.NoTimestamp, -1L, ms) - def validate(offset: Long) = - assertEquals(ms.filter(_.offset == offset).toList, - seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList) + val ms = records(baseOffset, "hello", "there", "beautiful") + seg.append(baseOffset, Record.NO_TIMESTAMP, -1L, ms) + def validate(offset: Long) = + assertEquals(ms.deepIterator.asScala.filter(_.offset == offset).toList, + seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepIterator.asScala.toList) validate(50) validate(51) validate(52) } - + /** * If we read from an offset beyond the last offset in the segment we should get null */ @Test def testReadAfterLast() { val seg = createSegment(40) - val ms = messages(50, "hello", "there") - seg.append(50, Message.NoTimestamp, -1L, ms) + val ms = records(50, "hello", "there") + seg.append(50, Record.NO_TIMESTAMP, -1L, ms) val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None) assertNull("Read beyond the last offset in the segment should give null", read) } - + /** * If we read from an offset which doesn't exist we should get a message set beginning * with the least offset greater than the given startOffset. @@ -121,14 +117,14 @@ class LogSegmentTest { @Test def testReadFromGap() { val seg = createSegment(40) - val ms = messages(50, "hello", "there") - seg.append(50, Message.NoTimestamp, -1L, ms) - val ms2 = messages(60, "alpha", "beta") - seg.append(60, Message.NoTimestamp, -1L, ms2) + val ms = records(50, "hello", "there") + seg.append(50, Record.NO_TIMESTAMP, -1L, ms) + val ms2 = records(60, "alpha", "beta") + seg.append(60, Record.NO_TIMESTAMP, -1L, ms2) val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) - assertEquals(ms2.toList, read.messageSet.toList) + assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList) } - + /** * In a loop append two messages then truncate off the second of those messages and check that we can read * the first but not the second message. @@ -138,18 +134,18 @@ class LogSegmentTest { val seg = createSegment(40) var offset = 40 for (_ <- 0 until 30) { - val ms1 = messages(offset, "hello") - seg.append(offset, Message.NoTimestamp, -1L, ms1) - val ms2 = messages(offset + 1, "hello") - seg.append(offset + 1, Message.NoTimestamp, -1L, ms2) + val ms1 = records(offset, "hello") + seg.append(offset, Record.NO_TIMESTAMP, -1L, ms1) + val ms2 = records(offset + 1, "hello") + seg.append(offset + 1, Record.NO_TIMESTAMP, -1L, ms2) // check that we can read back both messages val read = seg.read(offset, None, 10000) - assertEquals(List(ms1.head, ms2.head), read.messageSet.toList) + assertEquals(List(ms1.deepIterator.next(), ms2.deepIterator.next()), read.records.deepIterator.asScala.toList) // now truncate off the last message seg.truncateTo(offset + 1) val read2 = seg.read(offset, None, 10000) - assertEquals(1, read2.messageSet.size) - assertEquals(ms1.head, read2.messageSet.head) + assertEquals(1, read2.records.deepIterator.asScala.size) + assertEquals(ms1.deepIterator.next(), read2.records.deepIterator.next()) offset += 1 } } @@ -157,10 +153,10 @@ class LogSegmentTest { @Test def testReloadLargestTimestampAfterTruncation() { val numMessages = 30 - val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1) + val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1) var offset = 40 for (_ <- 0 until numMessages) { - seg.append(offset, offset, offset, messages(offset, "hello")) + seg.append(offset, offset, offset, records(offset, "hello")) offset += 1 } val expectedNumEntries = numMessages / 2 - 1 @@ -179,10 +175,10 @@ class LogSegmentTest { def testTruncateFull() { // test the case where we fully truncate the log val seg = createSegment(40) - seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there")) + seg.append(40, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there")) seg.truncateTo(0) assertNull("Segment should be empty.", seg.read(0, None, 1024)) - seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there")) + seg.append(40, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there")) } /** @@ -190,11 +186,11 @@ class LogSegmentTest { */ @Test def testFindOffsetByTimestamp() { - val messageSize = messages(0, s"msg00").sizeInBytes + val messageSize = records(0, s"msg00").sizeInBytes val seg = createSegment(40, messageSize * 2 - 1) // Produce some messages for (i <- 40 until 50) - seg.append(i, i * 10, i, messages(i, s"msg$i")) + seg.append(i, i * 10, i, records(i, s"msg$i")) assertEquals(490, seg.largestTimestamp) // Search for an indexed timestamp @@ -218,10 +214,10 @@ class LogSegmentTest { def testNextOffsetCalculation() { val seg = createSegment(40) assertEquals(40, seg.nextOffset) - seg.append(50, Message.NoTimestamp, -1L, messages(50, "hello", "there", "you")) + seg.append(50, Record.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you")) assertEquals(53, seg.nextOffset()) } - + /** * Test that we can change the file suffixes for the log and index files */ @@ -236,7 +232,7 @@ class LogSegmentTest { assertTrue(seg.log.file.exists) assertTrue(seg.index.file.exists) } - + /** * Create a segment with some data and an index. Then corrupt the index, * and recover the segment, the entries should all be readable. @@ -245,12 +241,12 @@ class LogSegmentTest { def testRecoveryFixesCorruptIndex() { val seg = createSegment(0) for(i <- 0 until 100) - seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString)) + seg.append(i, Record.NO_TIMESTAMP, -1L, records(i, i.toString)) val indexFile = seg.index.file TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt) seg.recover(64*1024) for(i <- 0 until 100) - assertEquals(i, seg.read(i, Some(i + 1), 1024).messageSet.head.offset) + assertEquals(i, seg.read(i, Some(i + 1), 1024).records.deepIterator.next().offset) } /** @@ -261,7 +257,7 @@ class LogSegmentTest { def testRecoveryFixesCorruptTimeIndex() { val seg = createSegment(0) for(i <- 0 until 100) - seg.append(i, i * 10, i, messages(i, i.toString)) + seg.append(i, i * 10, i, records(i, i.toString)) val timeIndexFile = seg.timeIndex.file TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt) seg.recover(64*1024) @@ -271,7 +267,7 @@ class LogSegmentTest { assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get.offset) } } - + /** * Randomly corrupt a log a number of times and attempt recovery. */ @@ -281,13 +277,15 @@ class LogSegmentTest { for (_ <- 0 until 10) { val seg = createSegment(0) for(i <- 0 until messagesAppended) - seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString)) + seg.append(i, Record.NO_TIMESTAMP, -1L, records(i, i.toString)) val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended) // start corrupting somewhere in the middle of the chosen record all the way to the end - val position = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)._1.position + TestUtils.random.nextInt(15) - TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position) + + val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0) + val position = recordPosition.position + TestUtils.random.nextInt(15) + TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt) seg.recover(64*1024) - assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList) + assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.shallowIterator.asScala.map(_.offset).toList) seg.delete() } } @@ -304,12 +302,12 @@ class LogSegmentTest { @Test def testCreateWithInitFileSizeAppendMessage() { val seg = createSegment(40, false, 512*1024*1024, true) - val ms = messages(50, "hello", "there") - seg.append(50, Message.NoTimestamp, -1L, ms) - val ms2 = messages(60, "alpha", "beta") - seg.append(60, Message.NoTimestamp, -1L, ms2) + val ms = records(50, "hello", "there") + seg.append(50, Record.NO_TIMESTAMP, -1L, ms) + val ms2 = records(60, "alpha", "beta") + seg.append(60, Record.NO_TIMESTAMP, -1L, ms2) val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) - assertEquals(ms2.toList, read.messageSet.toList) + assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList) } /* create a segment with pre allocate and clearly shut down*/ @@ -318,12 +316,12 @@ class LogSegmentTest { val tempDir = TestUtils.tempDir() val seg = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, false, 512*1024*1024, true) - val ms = messages(50, "hello", "there") - seg.append(50, Message.NoTimestamp, -1L, ms) - val ms2 = messages(60, "alpha", "beta") - seg.append(60, Message.NoTimestamp, -1L, ms2) + val ms = records(50, "hello", "there") + seg.append(50, Record.NO_TIMESTAMP, -1L, ms) + val ms2 = records(60, "alpha", "beta") + seg.append(60, Record.NO_TIMESTAMP, -1L, ms2) val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) - assertEquals(ms2.toList, read.messageSet.toList) + assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList) val oldSize = seg.log.sizeInBytes() val oldPosition = seg.log.channel.position val oldFileSize = seg.log.file.length @@ -336,7 +334,7 @@ class LogSegmentTest { segments += segReopen val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None) - assertEquals(ms2.toList, readAgain.messageSet.toList) + assertEquals(ms2.deepIterator.asScala.toList, readAgain.records.deepIterator.asScala.toList) val size = segReopen.log.sizeInBytes() val position = segReopen.log.channel.position val fileSize = segReopen.log.file.length