Repository: kafka Updated Branches: refs/heads/trunk 6626b058c -> 67f1e5b91
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 3d1b485..4467394 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -24,12 +24,12 @@ import kafka.utils.TestUtils import kafka.utils.TestUtils._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils} -import org.apache.kafka.common.record.{LogEntry, MemoryRecords} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.record.LogEntry import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} import org.apache.kafka.common.serialization.StringSerializer -import org.junit.Test import org.junit.Assert._ +import org.junit.Test import scala.collection.JavaConverters._ import scala.util.Random @@ -120,13 +120,13 @@ class FetchRequestTest extends BaseRequestTest { val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3) assertEquals(shuffledTopicPartitions3, fetchResponse3.responseData.keySet.asScala.toSeq) val responseSize3 = fetchResponse3.responseData.asScala.values.map { partitionData => - logEntries(partitionData).map(_.size).sum + logEntries(partitionData).map(_.sizeInBytes).sum }.sum assertTrue(responseSize3 <= maxResponseBytes) val partitionData3 = fetchResponse3.responseData.get(partitionWithLargeMessage1) assertEquals(Errors.NONE.code, partitionData3.errorCode) assertTrue(partitionData3.highWatermark > 0) - val size3 = logEntries(partitionData3).map(_.size).sum + val size3 = logEntries(partitionData3).map(_.sizeInBytes).sum assertTrue(s"Expected $size3 to be smaller than $maxResponseBytes", size3 <= maxResponseBytes) assertTrue(s"Expected $size3 to be larger than $maxPartitionBytes", size3 > maxPartitionBytes) assertTrue(maxPartitionBytes < partitionData3.records.sizeInBytes) @@ -138,13 +138,13 @@ class FetchRequestTest extends BaseRequestTest { val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4) assertEquals(shuffledTopicPartitions4, fetchResponse4.responseData.keySet.asScala.toSeq) val nonEmptyPartitions4 = fetchResponse4.responseData.asScala.toSeq.collect { - case (tp, partitionData) if logEntries(partitionData).map(_.size).sum > 0 => tp + case (tp, partitionData) if logEntries(partitionData).map(_.sizeInBytes).sum > 0 => tp } assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4) val partitionData4 = fetchResponse4.responseData.get(partitionWithLargeMessage2) assertEquals(Errors.NONE.code, partitionData4.errorCode) assertTrue(partitionData4.highWatermark > 0) - val size4 = logEntries(partitionData4).map(_.size).sum + val size4 = logEntries(partitionData4).map(_.sizeInBytes).sum assertTrue(s"Expected $size4 to be larger than $maxResponseBytes", size4 > maxResponseBytes) assertTrue(maxResponseBytes < partitionData4.records.sizeInBytes) } @@ -161,12 +161,11 @@ class FetchRequestTest extends BaseRequestTest { assertEquals(Errors.NONE.code, partitionData.errorCode) assertTrue(partitionData.highWatermark > 0) assertEquals(maxPartitionBytes, partitionData.records.sizeInBytes) - assertEquals(0, logEntries(partitionData).map(_.size).sum) + assertEquals(0, logEntries(partitionData).map(_.sizeInBytes).sum) } private def logEntries(partitionData: FetchResponse.PartitionData): Seq[LogEntry] = { - val memoryRecords = partitionData.records - memoryRecords.iterator.asScala.toIndexedSeq + partitionData.records.deepIterator.asScala.toIndexedSeq } private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse, @@ -181,25 +180,25 @@ class FetchRequestTest extends BaseRequestTest { assertEquals(Errors.NONE.code, partitionData.errorCode) assertTrue(partitionData.highWatermark > 0) - val memoryRecords = partitionData.records - responseBufferSize += memoryRecords.sizeInBytes + val records = partitionData.records + responseBufferSize += records.sizeInBytes - val messages = memoryRecords.iterator.asScala.toIndexedSeq - assertTrue(messages.size < numMessagesPerPartition) - val messagesSize = messages.map(_.size).sum - responseSize += messagesSize - if (messagesSize == 0 && !emptyResponseSeen) { - assertEquals(0, memoryRecords.sizeInBytes) + val entries = records.shallowIterator.asScala.toIndexedSeq + assertTrue(entries.size < numMessagesPerPartition) + val entriesSize = entries.map(_.sizeInBytes).sum + responseSize += entriesSize + if (entriesSize == 0 && !emptyResponseSeen) { + assertEquals(0, records.sizeInBytes) emptyResponseSeen = true } - else if (messagesSize != 0 && !emptyResponseSeen) { - assertTrue(messagesSize <= maxPartitionBytes) - assertEquals(maxPartitionBytes, memoryRecords.sizeInBytes) + else if (entriesSize != 0 && !emptyResponseSeen) { + assertTrue(entriesSize <= maxPartitionBytes) + assertEquals(maxPartitionBytes, records.sizeInBytes) } - else if (messagesSize != 0 && emptyResponseSeen) - fail(s"Expected partition with size 0, but found $tp with size $messagesSize") - else if (memoryRecords.sizeInBytes != 0 && emptyResponseSeen) - fail(s"Expected partition buffer with size 0, but found $tp with size ${memoryRecords.sizeInBytes}") + else if (entriesSize != 0 && emptyResponseSeen) + fail(s"Expected partition with size 0, but found $tp with size $entriesSize") + else if (records.sizeInBytes != 0 && emptyResponseSeen) + fail(s"Expected partition buffer with size 0, but found $tp with size ${records.sizeInBytes}") } @@ -208,7 +207,7 @@ class FetchRequestTest extends BaseRequestTest { } private def createTopics(numTopics: Int, numPartitions: Int): Map[TopicPartition, Int] = { - val topics = (0 until numPartitions).map(t => s"topic${t}") + val topics = (0 until numPartitions).map(t => s"topic$t") val topicConfig = new Properties topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString) topics.flatMap { topic => @@ -223,7 +222,7 @@ class FetchRequestTest extends BaseRequestTest { tp <- topicPartitions.toSeq messageIndex <- 0 until numMessagesPerPartition } yield { - val suffix = s"${tp}-${messageIndex}" + val suffix = s"$tp-$messageIndex" new ProducerRecord(tp.topic, tp.partition, s"key $suffix", s"value $suffix") } records.map(producer.send).foreach(_.get) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 2d51be9..aad37d1 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -17,21 +17,20 @@ package kafka.server import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean -import org.apache.kafka.common.metrics.Metrics -import org.junit.{After, Before, Test} - -import collection.mutable.HashMap -import collection.mutable.Map import kafka.cluster.{Partition, Replica} -import org.easymock.EasyMock import kafka.log.Log -import org.junit.Assert._ import kafka.utils._ -import java.util.concurrent.atomic.AtomicBoolean - -import kafka.message.MessageSet +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.utils.Time +import org.easymock.EasyMock +import org.junit.Assert._ +import org.junit.{After, Before, Test} + +import scala.collection.mutable.{HashMap, Map} + class IsrExpirationTest { @@ -76,7 +75,7 @@ class IsrExpirationTest { // let the follower catch up to the Leader logEndOffset (15) (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty), + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), -1L, -1, true))) @@ -127,7 +126,7 @@ class IsrExpirationTest { // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms for(replica <- partition0.assignedReplicas() - leaderReplica) - replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MessageSet.Empty), -1L, -1, false)) + replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), -1L, -1, false)) // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log. // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck @@ -137,7 +136,7 @@ class IsrExpirationTest { time.sleep(75) (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MessageSet.Empty), -1L, -1, false))) + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), -1L, -1, false))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -149,7 +148,7 @@ class IsrExpirationTest { // Now actually make a fetch to the end of the log. The replicas should be back in ISR (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty), -1L, -1, true))) + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), -1L, -1, true))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 70445d7..b577e7d 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -26,12 +26,12 @@ import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo import kafka.common.TopicAndPartition import kafka.consumer.SimpleConsumer import kafka.log.{Log, LogSegment} -import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec} import kafka.utils.TestUtils._ import kafka.utils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{MemoryRecords, Record} import org.apache.kafka.common.utils.{Time, Utils} import org.easymock.{EasyMock, IAnswer} import org.junit.Assert._ @@ -89,9 +89,9 @@ class LogOffsetTest extends ZooKeeperTestHarness { "Log for partition [topic,0] should be created") val log = logManager.getLog(TopicAndPartition(topic, part)).get - val message = new Message(Integer.toString(42).getBytes()) + val record = Record.create(Integer.toString(42).getBytes()) for (_ <- 0 until 20) - log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) + log.append(MemoryRecords.withRecords(record)) log.flush() val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15) @@ -150,9 +150,9 @@ class LogOffsetTest extends ZooKeeperTestHarness { val logManager = server.getLogManager val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) - val message = new Message(Integer.toString(42).getBytes()) + val record = Record.create(Integer.toString(42).getBytes()) for (_ <- 0 until 20) - log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) + log.append(MemoryRecords.withRecords(record)) log.flush() val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs @@ -179,9 +179,9 @@ class LogOffsetTest extends ZooKeeperTestHarness { val logManager = server.getLogManager val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) - val message = new Message(Integer.toString(42).getBytes()) + val record = Record.create(Integer.toString(42).getBytes()) for (_ <- 0 until 20) - log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) + log.append(MemoryRecords.withRecords(record)) log.flush() val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime, 10) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index bd74dee..51be54c 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -54,11 +54,11 @@ class ProduceRequestTest extends BaseRequestTest { } sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE, - new Record(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0) + Record.create(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0) sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.GZIP, - new Record(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes), - new Record(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1) + Record.create(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes), + Record.create(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1) } /* returns a pair of partition id and leader id */ @@ -74,7 +74,7 @@ class ProduceRequestTest extends BaseRequestTest { val (partition, leader) = createTopicAndFindPartitionWithLeader("topic") val timestamp = 1000000 val recordBuffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4, - new Record(timestamp, "key".getBytes, "value".getBytes)) + Record.create(timestamp, "key".getBytes, "value".getBytes)) // Change the lz4 checksum value so that it doesn't match the contents recordBuffer.array.update(40, 0) val topicPartition = new TopicPartition("topic", partition) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 378d382..a643f63 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -16,28 +16,29 @@ */ package kafka.server - import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean import kafka.cluster.Replica import kafka.common.TopicAndPartition import kafka.log.Log -import kafka.message.{ByteBufferMessageSet, Message} import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.record.{MemoryRecords, Record} import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.easymock.EasyMock import EasyMock._ import org.junit.Assert._ import org.junit.{After, Test} +import scala.collection.JavaConverters._ + class ReplicaManagerQuotasTest { val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, new Properties())) val time = new MockTime val metrics = new Metrics - val message = new Message("some-data-in-a-message".getBytes()) + val record = Record.create("some-data-in-a-message".getBytes()) val topicAndPartition1 = TopicAndPartition("test-topic", 1) val topicAndPartition2 = TopicAndPartition("test-topic", 2) val fetchInfo = Seq(new TopicPartition(topicAndPartition1.topic, topicAndPartition1.partition) -> new PartitionData(0, 100), @@ -63,10 +64,10 @@ class ReplicaManagerQuotasTest { readPartitionInfo = fetchInfo, quota = quota) assertEquals("Given two partitions, with only one throttled, we should get the first", 1, - fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size) + fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size) assertEquals("But we shouldn't get the second", 0, - fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size) + fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size) } @Test @@ -88,9 +89,9 @@ class ReplicaManagerQuotasTest { readPartitionInfo = fetchInfo, quota = quota) assertEquals("Given two partitions, with both throttled, we should get no messages", 0, - fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size) + fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size) assertEquals("Given two partitions, with both throttled, we should get no messages", 0, - fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size) + fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size) } @Test @@ -112,9 +113,9 @@ class ReplicaManagerQuotasTest { readPartitionInfo = fetchInfo, quota = quota) assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, - fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size) + fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size) assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, - fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size) + fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size) } @Test @@ -136,13 +137,13 @@ class ReplicaManagerQuotasTest { readPartitionInfo = fetchInfo, quota = quota) assertEquals("Given two partitions, with only one throttled, we should get the first", 1, - fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size) + fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size) assertEquals("But we should get the second too since it's throttled but in sync", 1, - fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size) + fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size) } - def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], message: Message = this.message, bothReplicasInSync: Boolean = false) { + def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: Record = this.record, bothReplicasInSync: Boolean = false) { val zkUtils = createNiceMock(classOf[ZkUtils]) val scheduler = createNiceMock(classOf[KafkaScheduler]) @@ -153,16 +154,16 @@ class ReplicaManagerQuotasTest { //if we ask for len 1 return a message expect(log.read(anyObject(), geq(1), anyObject(), anyObject())).andReturn( - new FetchDataInfo( + FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), - new ByteBufferMessageSet(message) + MemoryRecords.withRecords(record) )).anyTimes() //if we ask for len = 0, return 0 messages expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject())).andReturn( - new FetchDataInfo( + FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), - new ByteBufferMessageSet() + MemoryRecords.EMPTY )).anyTimes() replay(log) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index c6d66ba..421de32 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -17,18 +17,16 @@ package kafka.server - import java.io.File import java.util.concurrent.atomic.AtomicBoolean -import kafka.api.FetchResponsePartitionData import kafka.cluster.Broker import kafka.common.TopicAndPartition -import kafka.message.{ByteBufferMessageSet, Message, MessageSet} import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{MemoryRecords, Record, Records} import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.FetchRequest.PartitionData @@ -105,11 +103,11 @@ class ReplicaManagerTest { def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = { assert(responseStatus.values.head.errorCode == Errors.INVALID_REQUIRED_ACKS.code) } - rm.appendMessages( + rm.appendRecords( timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, - messagesPerPartition = Map(new TopicPartition("test1", 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))), + entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(Record.create("first message".getBytes()))), responseCallback = callback) } finally { rm.shutdown(checkpointHW = false) @@ -135,7 +133,7 @@ class ReplicaManagerTest { } var fetchCallbackFired = false - def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchResponsePartitionData)]) = { + def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchPartitionData)]) = { assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.map(_._2).head.error) fetchCallbackFired = true } @@ -158,11 +156,11 @@ class ReplicaManagerTest { rm.getLeaderReplicaIfLocal(topic, 0) // Append a message. - rm.appendMessages( + rm.appendRecords( timeout = 1000, requiredAcks = -1, internalTopicsAllowed = false, - messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))), + entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(Record.create("first message".getBytes()))), responseCallback = produceCallback) // Fetch some messages @@ -220,19 +218,19 @@ class ReplicaManagerTest { // Append a couple of messages. for(i <- 1 to 2) - rm.appendMessages( + rm.appendRecords( timeout = 1000, requiredAcks = -1, internalTopicsAllowed = false, - messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("message %d".format(i).getBytes))), + entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(Record.create("message %d".format(i).getBytes))), responseCallback = produceCallback) var fetchCallbackFired = false var fetchError = 0 - var fetchedMessages: MessageSet = null - def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchResponsePartitionData)]) = { + var fetchedRecords: Records = null + def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchPartitionData)]) = { fetchError = responseStatus.map(_._2).head.error - fetchedMessages = responseStatus.map(_._2).head.messages + fetchedRecords = responseStatus.map(_._2).head.records fetchCallbackFired = true } @@ -249,7 +247,7 @@ class ReplicaManagerTest { assertTrue(fetchCallbackFired) assertEquals("Should not give an exception", Errors.NONE.code, fetchError) - assertTrue("Should return some data", fetchedMessages.iterator.hasNext) + assertTrue("Should return some data", fetchedRecords.shallowIterator.hasNext) fetchCallbackFired = false // Fetch a message above the high watermark as a consumer @@ -264,7 +262,7 @@ class ReplicaManagerTest { assertTrue(fetchCallbackFired) assertEquals("Should not give an exception", Errors.NONE.code, fetchError) - assertEquals("Should return empty response", MessageSet.Empty, fetchedMessages) + assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords) } finally { rm.shutdown(checkpointHW = false) } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index b1ebeee..2f73a94 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -21,7 +21,6 @@ import kafka.utils._ import kafka.cluster.Replica import kafka.common.TopicAndPartition import kafka.log.Log -import kafka.message.{ByteBufferMessageSet, Message, MessageSet} import kafka.server.QuotaFactory.UnboundedQuota import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.requests.FetchRequest.PartitionData @@ -30,8 +29,10 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.record.{MemoryRecords, Record} import org.easymock.EasyMock import org.junit.Assert._ +import scala.collection.JavaConverters._ class SimpleFetchTest { @@ -53,8 +54,8 @@ class SimpleFetchTest { val partitionHW = 5 val fetchSize = 100 - val messagesToHW = new Message("messageToHW".getBytes()) - val messagesToLEO = new Message("messageToLEO".getBytes()) + val messagesToHW = Record.create("messageToHW".getBytes()) + val messagesToLEO = Record.create("messageToLEO".getBytes()) val topic = "test-topic" val partitionId = 0 @@ -79,14 +80,14 @@ class SimpleFetchTest { EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes() EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes() EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn( - new FetchDataInfo( + FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), - new ByteBufferMessageSet(messagesToHW) + MemoryRecords.withRecords(messagesToHW) )).anyTimes() EasyMock.expect(log.read(0, fetchSize, None, true)).andReturn( - new FetchDataInfo( + FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), - new ByteBufferMessageSet(messagesToLEO) + MemoryRecords.withRecords(messagesToLEO) )).anyTimes() EasyMock.replay(log) @@ -110,7 +111,7 @@ class SimpleFetchTest { // create the follower replica with defined log end offset val followerReplica= new Replica(configs(1).brokerId, partition, time) val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) - followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MessageSet.Empty), -1L, -1, true)) + followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY), -1L, -1, true)) // add both of them to ISR val allReplicas = List(leaderReplica, followerReplica) @@ -153,7 +154,7 @@ class SimpleFetchTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message) + quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowIterator.next().record) assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO, replicaManager.readFromLocalLog( replicaId = Request.OrdinaryConsumerId, @@ -162,7 +163,7 @@ class SimpleFetchTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message) + quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowIterator().next().record) assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()) assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 33ab58c..ede145a 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -21,7 +21,7 @@ import java.io._ import java.nio._ import java.nio.channels._ import java.util.concurrent.{Callable, Executors, TimeUnit} -import java.util.{Properties, Random} +import java.util.Properties import java.security.cert.X509Certificate import javax.net.ssl.X509TrustManager import charset.Charset @@ -48,6 +48,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.network.Mode +import org.apache.kafka.common.record._ import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer} import org.apache.kafka.common.utils.Time import org.apache.kafka.test.{TestUtils => JTestUtils} @@ -269,16 +270,16 @@ object TestUtils extends Logging { } /** - * Wrap the message in a message set - * - * @param payload The bytes of the message + * Wrap a single record log buffer. */ - def singleMessageSet(payload: Array[Byte], - codec: CompressionCodec = NoCompressionCodec, + def singletonRecords(value: Array[Byte], key: Array[Byte] = null, - timestamp: Long = Message.NoTimestamp, - magicValue: Byte = Message.CurrentMagicValue) = - new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key, timestamp, magicValue)) + codec: CompressionType = CompressionType.NONE, + timestamp: Long = Record.NO_TIMESTAMP, + magicValue: Byte = Record.CURRENT_MAGIC_VALUE) = { + val record = Record.create(magicValue, timestamp, key, value) + MemoryRecords.withRecords(codec, record) + } /** * Generate an array of random bytes