Repository: kafka Updated Branches: refs/heads/trunk 7a67a7226 -> 0dc243b92
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/LogOffsetMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala new file mode 100644 index 0000000..a868334 --- /dev/null +++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.apache.kafka.common.KafkaException + +object LogOffsetMetadata { + val UnknownOffsetMetadata = new LogOffsetMetadata(-1, 0, 0) + val UnknownSegBaseOffset = -1L + val UnknownFilePosition = -1 + + class OffsetOrdering extends Ordering[LogOffsetMetadata] { + override def compare(x: LogOffsetMetadata , y: LogOffsetMetadata ): Int = { + return x.offsetDiff(y).toInt + } + } + +} + +/* + * A log offset structure, including: + * 1. the message offset + * 2. the base message offset of the located segment + * 3. 
the physical position on the located segment + */ +case class LogOffsetMetadata(messageOffset: Long, + segmentBaseOffset: Long = LogOffsetMetadata.UnknownSegBaseOffset, + relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) { + + // check if this offset is already on an older segment compared with the given offset + def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = { + if (messageOffsetOnly()) + throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that)) + + this.segmentBaseOffset < that.segmentBaseOffset + } + + // check if this offset is on the same segment with the given offset + def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = { + if (messageOffsetOnly()) + throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that)) + + this.segmentBaseOffset == that.segmentBaseOffset + } + + // check if this offset is before the given offset + def precedes(that: LogOffsetMetadata): Boolean = this.messageOffset < that.messageOffset + + // compute the number of messages between this offset to the given offset + def offsetDiff(that: LogOffsetMetadata): Long = { + this.messageOffset - that.messageOffset + } + + // compute the number of bytes between this offset to the given offset + // if they are on the same segment and this offset precedes the given offset + def positionDiff(that: LogOffsetMetadata): Int = { + if(!offsetOnSameSegment(that)) + throw new KafkaException("%s cannot compare its segment position with %s since they are not on the same segment".format(this, that)) + if(messageOffsetOnly()) + throw new KafkaException("%s cannot compare its segment position with %s since it only has message offset info".format(this, that)) + + this.relativePositionInSegment - that.relativePositionInSegment + } + + // decide if the offset metadata only contains message offset info + def messageOffsetOnly(): Boolean = { + segmentBaseOffset == LogOffsetMetadata.UnknownSegBaseOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition + } + + override def toString = messageOffset.toString + " [" + segmentBaseOffset + " : " + relativePositionInSegment + "]" + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 0e22897..43eb2a3 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -17,26 +17,29 @@ package kafka.server +import org.apache.kafka.common.protocol.types.{Struct, Schema, Field} +import org.apache.kafka.common.protocol.types.Type.STRING +import org.apache.kafka.common.protocol.types.Type.INT32 +import org.apache.kafka.common.protocol.types.Type.INT64 + import kafka.utils._ import kafka.common._ -import java.nio.ByteBuffer -import java.util.Properties import kafka.log.{FileMessageSet, LogConfig} -import org.I0Itec.zkclient.ZkClient -import scala.collection._ import kafka.message._ -import java.util.concurrent.TimeUnit import kafka.metrics.KafkaMetricsGroup -import com.yammer.metrics.core.Gauge -import scala.Some import kafka.common.TopicAndPartition import kafka.tools.MessageFormatter + +import scala.Some +import scala.collection._ import java.io.PrintStream -import 
org.apache.kafka.common.protocol.types.{Struct, Schema, Field} -import org.apache.kafka.common.protocol.types.Type.STRING -import org.apache.kafka.common.protocol.types.Type.INT32 -import org.apache.kafka.common.protocol.types.Type.INT64 import java.util.concurrent.atomic.AtomicBoolean +import java.nio.ByteBuffer +import java.util.Properties +import java.util.concurrent.TimeUnit + +import com.yammer.metrics.core.Gauge +import org.I0Itec.zkclient.ZkClient /** @@ -271,7 +274,7 @@ class OffsetManager(val config: OffsetManagerConfig, // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) { buffer.clear() - val messages = log.read(currOffset, config.loadBufferSize).asInstanceOf[FileMessageSet] + val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet] messages.readInto(buffer, 0) val messageSet = new ByteBufferMessageSet(buffer) messageSet.foreach { msgAndOffset => @@ -312,7 +315,7 @@ class OffsetManager(val config: OffsetManagerConfig, val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId) val hw = partitionOpt.map { partition => - partition.leaderReplicaIfLocal().map(_.highWatermark).getOrElse(-1L) + partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L) }.getOrElse(-1L) hw http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala new file mode 100644 index 0000000..d4a7d4a --- /dev/null +++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.Pool +import kafka.network.{BoundedByteBufferSend, RequestChannel} + +import java.util.concurrent.TimeUnit + +/** + * The purgatory holding delayed producer requests + */ +class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel) + extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) { + this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId) + + private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup { + val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) + } + + private val producerRequestMetricsForKey = { + val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-") + new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory)) + } + + private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics + + private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) { + val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) + List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) + } + + /** + * Check if a specified delayed fetch request is satisfied + */ + def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager) + + /** + * When a delayed produce request expires answer it with possible time out error codes + */ + def expire(delayedProduce: DelayedProduce) { + debug("Expiring produce request %s.".format(delayedProduce.produce)) + for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending) + recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition)) + respond(delayedProduce) + } + + // TODO: purgatory should not be responsible for sending back the responses + def respond(delayedProduce: DelayedProduce) { + val response = delayedProduce.respond(offsetManager) + requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response))) + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 75ae1e1..6879e73 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -47,16 +47,19 @@ class ReplicaFetcherThread(name:String, val replica = replicaMgr.getReplica(topic, partitionId).get val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet] - if (fetchOffset != replica.logEndOffset) - throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset)) + if (fetchOffset != replica.logEndOffset.messageOffset) + throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset)) trace("Follower %d has replica log end offset %d for partition %s. 
Received %d messages and leader hw %d" - .format(replica.brokerId, replica.logEndOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw)) + .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw)) replica.log.get.append(messageSet, assignOffsets = false) trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s" - .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, topicAndPartition)) - val followerHighWatermark = replica.logEndOffset.min(partitionData.hw) - replica.highWatermark = followerHighWatermark - trace("Follower %d set replica highwatermark for partition [%s,%d] to %d" + .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition)) + val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw) + // for the follower replica, we do not need to keep + // its segment base offset the physical position, + // these values will be computed upon making the leader + replica.highWatermark = new LogOffsetMetadata(followerHighWatermark) + trace("Follower %d set replica high watermark for partition [%s,%d] to %s" .format(replica.brokerId, topic, partitionId, followerHighWatermark)) } catch { case e: KafkaStorageException => @@ -82,7 +85,7 @@ class ReplicaFetcherThread(name:String, * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now. */ val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId) - if (leaderEndOffset < replica.logEndOffset) { + if (leaderEndOffset < replica.logEndOffset.messageOffset) { // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. @@ -91,13 +94,13 @@ class ReplicaFetcherThread(name:String, // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur. 
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) + " Current leader %d's latest offset %d is less than replica %d's latest offset %d" - .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset)) + .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset)) Runtime.getRuntime.halt(1) } replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset)) warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d" - .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset)) + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset)) leaderEndOffset } else { /** @@ -109,7 +112,7 @@ class ReplicaFetcherThread(name:String, val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" - .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset)) + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset)) leaderStartOffset } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 897783c..68758e3 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -16,29 +16,39 @@ */ package kafka.server -import collection._ -import mutable.HashMap -import kafka.cluster.{Broker, Partition, Replica} +import kafka.api._ +import kafka.common._ import kafka.utils._ +import kafka.cluster.{Broker, Partition, Replica} import kafka.log.LogManager import kafka.metrics.KafkaMetricsGroup -import kafka.common._ -import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} import kafka.controller.KafkaController -import org.I0Itec.zkclient.ZkClient -import com.yammer.metrics.core.Gauge +import kafka.common.TopicAndPartition +import kafka.message.MessageSet + import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} import java.util.concurrent.TimeUnit +import scala.Predef._ +import scala.collection._ +import scala.collection.mutable.HashMap +import scala.collection.Map +import scala.collection.Set +import scala.Some + +import org.I0Itec.zkclient.ZkClient +import com.yammer.metrics.core.Gauge object ReplicaManager { - val UnknownLogEndOffset = -1L val HighWatermarkFilename = "replication-offset-checkpoint" } -class ReplicaManager(val config: KafkaConfig, - time: Time, - val zkClient: ZkClient, +case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogOffsetMetadata) + + +class ReplicaManager(val config: KafkaConfig, + time: Time, + val zkClient: ZkClient, scheduler: Scheduler, val logManager: LogManager, val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup { @@ -54,6 +64,9 @@ class ReplicaManager(val config: KafkaConfig, this.logIdent = 
"[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger + var producerRequestPurgatory: ProducerRequestPurgatory = null + var fetchRequestPurgatory: FetchRequestPurgatory = null + newGauge( "LeaderCount", new Gauge[Int] { @@ -87,17 +100,37 @@ class ReplicaManager(val config: KafkaConfig, } /** - * This function is only used in two places: in Partition.updateISR() and KafkaApis.handleProducerRequest(). - * In the former case, the partition should have been created, in the latter case, return -1 will put the request into purgatory + * Initialize the replica manager with the request purgatory + * + * TODO: will be removed in 0.9 where we refactor server structure */ - def getReplicationFactorForPartition(topic: String, partitionId: Int) = { - val partitionOpt = getPartition(topic, partitionId) - partitionOpt match { - case Some(partition) => - partition.replicationFactor - case None => - -1 - } + + def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) { + this.producerRequestPurgatory = producerRequestPurgatory + this.fetchRequestPurgatory = fetchRequestPurgatory + } + + /** + * Unblock some delayed produce requests with the request key + */ + def unblockDelayedProduceRequests(key: DelayedRequestKey) { + val satisfied = producerRequestPurgatory.update(key) + debug("Request key %s unblocked %d producer requests." + .format(key.keyLabel, satisfied.size)) + + // send any newly unblocked responses + satisfied.foreach(producerRequestPurgatory.respond(_)) + } + + /** + * Unblock some delayed fetch requests with the request key + */ + def unblockDelayedFetchRequests(key: DelayedRequestKey) { + val satisfied = fetchRequestPurgatory.update(key) + debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size)) + + // send any newly unblocked responses + satisfied.foreach(fetchRequestPurgatory.respond(_)) } def startup() { @@ -155,10 +188,10 @@ class ReplicaManager(val config: KafkaConfig, } } - def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = { + def getOrCreatePartition(topic: String, partitionId: Int): Partition = { var partition = allPartitions.get((topic, partitionId)) if (partition == null) { - allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this)) + allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this)) partition = allPartitions.get((topic, partitionId)) } partition @@ -203,6 +236,77 @@ class ReplicaManager(val config: KafkaConfig, } } + /** + * Read from all the offset details given and return a map of + * (topic, partition) -> PartitionData + */ + def readMessageSets(fetchRequest: FetchRequest) = { + val isFetchFromFollower = fetchRequest.isFromFollower + fetchRequest.requestInfo.map + { + case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => + val partitionDataAndOffsetInfo = + try { + val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) + BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes) + if (isFetchFromFollower) { + debug("Partition [%s,%d] received fetch request from follower %d" + .format(topic, partition, fetchRequest.replicaId)) + } + new 
PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset) + } catch { + // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException + // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request + // for a partition it is the leader for + case utpe: UnknownTopicOrPartitionException => + warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( + fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage)) + new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) + case nle: NotLeaderForPartitionException => + warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( + fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage)) + new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) + case t: Throwable => + BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() + error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s" + .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage)) + new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) + } + (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo) + } + } + + /** + * Read from a single topic/partition at the given offset upto maxSize bytes + */ + private def readMessageSet(topic: String, + partition: Int, + offset: Long, + maxSize: Int, + fromReplicaId: Int): (FetchDataInfo, Long) = { + // check if the current broker is the leader for the partitions + val localReplica = if(fromReplicaId == Request.DebuggingConsumerId) + getReplicaOrException(topic, partition) + else + getLeaderReplicaIfLocal(topic, partition) + trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) + val maxOffsetOpt = + if (Request.isValidBrokerId(fromReplicaId)) + None + else + Some(localReplica.highWatermark.messageOffset) + val fetchInfo = localReplica.log match { + case Some(log) => + log.read(offset, maxSize, maxOffsetOpt) + case None => + error("Leader for partition [%s,%d] does not have a local log".format(topic, partition)) + FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty) + } + (fetchInfo, localReplica.highWatermark.messageOffset) + } + def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) { replicaStateChangeLock synchronized { if(updateMetadataRequest.controllerEpoch < controllerEpoch) { @@ -243,7 +347,7 @@ class ReplicaManager(val config: KafkaConfig, // First check partition's leader epoch val partitionState = new HashMap[Partition, PartitionStateInfo]() leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) => - val partition = 
getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) + val partition = getOrCreatePartition(topic, partitionId) val partitionLeaderEpoch = partition.getLeaderEpoch() // If the leader epoch is valid record the epoch of the controller that made the leadership decision. // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path @@ -403,7 +507,7 @@ class ReplicaManager(val config: KafkaConfig, .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId))) } - logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap) + logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap) partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " + @@ -421,7 +525,9 @@ class ReplicaManager(val config: KafkaConfig, else { // we do not need to check if the leader exists again since this has been done at the beginning of this process val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => - new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap + new TopicAndPartition(partition) -> BrokerAndInitialOffset( + leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, + partition.getReplica().get.logEndOffset.messageOffset)).toMap replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) partitionsToMakeFollower.foreach { partition => @@ -451,12 +557,23 @@ class ReplicaManager(val config: KafkaConfig, allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages)) } - def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = { - val partitionOpt = getPartition(topic, partitionId) - if(partitionOpt.isDefined) { - partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset) - } else { - warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId)) + def updateReplicaLEOAndPartitionHW(topic: String, partitionId: Int, replicaId: Int, offset: LogOffsetMetadata) = { + getPartition(topic, partitionId) match { + case Some(partition) => + partition.getReplica(replicaId) match { + case Some(replica) => + replica.logEndOffset = offset + // check if we need to update HW and expand Isr + partition.updateLeaderHWAndMaybeExpandIsr(replicaId) + debug("Recorded follower %d position %d for partition [%s,%d].".format(replicaId, offset.messageOffset, topic, partitionId)) + case None => + throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" + + " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, + offset.messageOffset, partition.assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) + + } + case None => + warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId)) } } @@ -470,7 +587,7 @@ class ReplicaManager(val config: 
KafkaConfig, val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica} val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) for((dir, reps) <- replicasByDir) { - val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap + val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap try { highWatermarkCheckpoints(dir).write(hwms) } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/RequestPurgatory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 3d0ff1e..ce06d2c 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -17,13 +17,15 @@ package kafka.server -import scala.collection._ -import java.util.concurrent._ -import java.util.concurrent.atomic._ import kafka.network._ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup + import java.util +import java.util.concurrent._ +import java.util.concurrent.atomic._ +import scala.collection._ + import com.yammer.metrics.core.Gauge @@ -45,8 +47,10 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de * * For us the key is generally a (topic, partition) pair. * By calling - * watch(delayedRequest) - * we will add triggers for each of the given keys. It is up to the user to then call + * val isSatisfiedByMe = checkAndMaybeWatch(delayedRequest) + * we will check if a request is satisfied already, and if not add the request for watch on all its keys. + * + * It is up to the user to then call * val satisfied = update(key, request) * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this * new request. @@ -61,18 +65,23 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de * this function handles delayed requests that have hit their time limit without being satisfied. 
* */ -abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000) +abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000) extends Logging with KafkaMetricsGroup { /* a list of requests watching each key */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) - private val requestCounter = new AtomicInteger(0) + /* the number of requests being watched, duplicates added on different watchers are also counted */ + private val watched = new AtomicInteger(0) + + /* background thread expiring requests that have been waiting too long */ + private val expiredRequestReaper = new ExpiredRequestReaper + private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false) newGauge( "PurgatorySize", new Gauge[Int] { - def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests + def value = watched.get() + expiredRequestReaper.numRequests } ) @@ -83,41 +92,50 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } ) - /* background thread expiring requests that have been waiting too long */ - private val expiredRequestReaper = new ExpiredRequestReaper - private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false) expirationThread.start() /** - * Add a new delayed request watching the contained keys + * Try to add the request for watch on all keys. Return true iff the request is + * satisfied and the satisfaction is done by the caller. + * + * Requests can be watched on only a few of the keys if it is found satisfied when + * trying to add it to each one of the keys. In this case the request is still treated as satisfied + * and hence no longer watched. Those already added elements will be later purged by the expire reaper. */ - def watch(delayedRequest: T) { - requestCounter.getAndIncrement() - + def checkAndMaybeWatch(delayedRequest: T): Boolean = { for(key <- delayedRequest.keys) { - var lst = watchersFor(key) - lst.add(delayedRequest) + val lst = watchersFor(key) + if(!lst.checkAndMaybeAdd(delayedRequest)) { + if(delayedRequest.satisfied.compareAndSet(false, true)) + return true + else + return false + } } + + // if it is indeed watched, add to the expire queue also expiredRequestReaper.enqueue(delayedRequest) + + false } /** * Update any watchers and return a list of newly satisfied requests. */ - def update(key: Any, request: R): Seq[T] = { + def update(key: Any): Seq[T] = { val w = watchersForKey.get(key) if(w == null) Seq.empty else - w.collectSatisfiedRequests(request) + w.collectSatisfiedRequests() } private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) /** - * Check if this request satisfied this delayed request + * Check if this delayed request is already satisfied */ - protected def checkSatisfied(request: R, delayed: T): Boolean + protected def checkSatisfied(request: T): Boolean /** * Handle an expired delayed request @@ -125,7 +143,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge protected def expire(delayed: T) /** - * Shutdown the expirey thread + * Shutdown the expire reaper thread */ def shutdown() { expiredRequestReaper.shutdown() @@ -136,17 +154,26 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge * bookkeeping logic. 
*/ private class Watchers { + private val requests = new util.ArrayList[T] - private val requests = new util.LinkedList[T] - - def numRequests = requests.size - - def add(t: T) { + // potentially add the element to watch if it is not satisfied yet + def checkAndMaybeAdd(t: T): Boolean = { synchronized { + // if it is already satisfied, do not add to the watch list + if (t.satisfied.get) + return false + // synchronize on the delayed request to avoid any race condition + // with expire and update threads on client-side. + if(t synchronized checkSatisfied(t)) { + return false + } requests.add(t) + watched.getAndIncrement() + return true } } + // traverse the list and purge satisfied elements def purgeSatisfied(): Int = { synchronized { val iter = requests.iterator() @@ -155,6 +182,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge val curr = iter.next if(curr.satisfied.get()) { iter.remove() + watched.getAndDecrement() purged += 1 } } @@ -162,7 +190,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } } - def collectSatisfiedRequests(request: R): Seq[T] = { + // traverse the list and try to satisfy watched elements + def collectSatisfiedRequests(): Seq[T] = { val response = new mutable.ArrayBuffer[T] synchronized { val iter = requests.iterator() @@ -174,9 +203,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } else { // synchronize on curr to avoid any race condition with expire // on client-side. - val satisfied = curr synchronized checkSatisfied(request, curr) + val satisfied = curr synchronized checkSatisfied(curr) if(satisfied) { iter.remove() + watched.getAndDecrement() val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { response += curr @@ -215,13 +245,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge expire(curr) } } - if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge + if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge debug("Beginning purgatory purge") - requestCounter.set(0) val purged = purgeSatisfied() debug("Purged %d requests from delay queue.".format(purged)) val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum - debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers)) + debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers)) } } catch { case e: Exception => @@ -266,10 +295,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } /** - * Delete all expired events from the delay queue + * Delete all satisfied events from the delay queue and the watcher lists */ private def purgeSatisfied(): Int = { var purged = 0 + + // purge the delayed queue val iter = delayed.iterator() while(iter.hasNext) { val curr = iter.next() @@ -278,6 +309,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge purged += 1 } } + purged } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala index 5f8f6bc..67196f3 100644 --- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala +++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala @@ -17,15 +17,17 @@ package kafka.tools 
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer} + +import kafka.consumer._ + import java.util.Properties import java.util.Arrays -import kafka.consumer._ -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer} object TestEndToEndLatency { def main(args: Array[String]) { - if (args.length != 4) { - System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages") + if (args.length != 6) { + System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks") System.exit(1) } @@ -33,31 +35,38 @@ object TestEndToEndLatency { val zkConnect = args(1) val topic = args(2) val numMessages = args(3).toInt + val consumerFetchMaxWait = args(4).toInt + val producerAcks = args(5).toInt val consumerProps = new Properties() consumerProps.put("group.id", topic) consumerProps.put("auto.commit.enable", "false") consumerProps.put("auto.offset.reset", "largest") consumerProps.put("zookeeper.connect", zkConnect) - consumerProps.put("fetch.wait.max.ms", "1") + consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString) consumerProps.put("socket.timeout.ms", 1201000.toString) val config = new ConsumerConfig(consumerProps) val connector = Consumer.create(config) - var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head + val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head val iter = stream.iterator val producerProps = new Properties() producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") + producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString) val producer = new KafkaProducer(producerProps) + // make sure the consumer fetcher has started before sending data since otherwise + // the consumption from the tail will skip the first message and hence be blocked + Thread.sleep(5000) + val message = "hello there beautiful".getBytes var totalTime = 0.0 val latencies = new Array[Long](numMessages) for (i <- 0 until numMessages) { - var begin = System.nanoTime + val begin = System.nanoTime producer.send(new ProducerRecord(topic, message)) val received = iter.next val elapsed = System.nanoTime - begin http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/other/kafka/StressTestLog.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 8fcd068..e19b8b2 100644 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -91,7 +91,7 @@ object StressTestLog { @volatile var offset = 0 override def work() { try { - log.read(offset, 1024, Some(offset+1)) match { + log.read(offset, 1024, Some(offset+1)).messageSet match { case read: FileMessageSet if read.sizeInBytes > 0 => { val first = read.head require(first.offset == offset, "We should either read nothing or the message we asked for.") http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 9f04bd3..a5386a0 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -74,7 +74,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val replica = servers.head.replicaManager.getReplica(topic, 0).get assertTrue("HighWatermark should equal logEndOffset with just 1 replica", - replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark) + replica.logEndOffset.messageOffset > 0 && replica.logEndOffset.equals(replica.highWatermark)) val request = new FetchRequestBuilder() .clientId("test-client") @@ -248,13 +248,13 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with "Published messages should be in the log") val replicaId = servers.head.config.brokerId - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") // test if the consumer received the messages in the correct order when producer has enabled request pipelining http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 7d4c70c..59bd8a9 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -94,7 +94,7 @@ class LogManagerTest extends JUnit3Suite { assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length) - assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes) try { log.read(0, 1024) @@ -137,7 +137,7 @@ class LogManagerTest extends JUnit3Suite { assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length) - assertEquals("Should get empty 
fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes) try { log.read(0, 1024) fail("Should get exception from fetching earlier.") http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 6b76037..7b97e6a 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -78,7 +78,7 @@ class LogSegmentTest extends JUnit3Suite { val seg = createSegment(40) val ms = messages(50, "hello", "there", "little", "bee") seg.append(50, ms) - val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None) + val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet assertEquals(ms.toList, read.toList) } @@ -94,7 +94,7 @@ class LogSegmentTest extends JUnit3Suite { seg.append(baseOffset, ms) def validate(offset: Long) = assertEquals(ms.filter(_.offset == offset).toList, - seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).toList) + seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList) validate(50) validate(51) validate(52) @@ -109,7 +109,7 @@ class LogSegmentTest extends JUnit3Suite { val ms = messages(50, "hello", "there") seg.append(50, ms) val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None) - assertNull("Read beyond the last offset in the segment should give null", null) + assertNull("Read beyond the last offset in the segment should give null", read) } /** @@ -124,7 +124,7 @@ class LogSegmentTest extends JUnit3Suite { val ms2 = messages(60, "alpha", "beta") seg.append(60, ms2) val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) - assertEquals(ms2.toList, read.toList) + assertEquals(ms2.toList, read.messageSet.toList) } /** @@ -142,12 +142,12 @@ class LogSegmentTest extends JUnit3Suite { seg.append(offset+1, ms2) // check that we can read back both messages val read = seg.read(offset, None, 10000) - assertEquals(List(ms1.head, ms2.head), read.toList) + assertEquals(List(ms1.head, ms2.head), read.messageSet.toList) // now truncate off the last message seg.truncateTo(offset + 1) val read2 = seg.read(offset, None, 10000) - assertEquals(1, read2.size) - assertEquals(ms1.head, read2.head) + assertEquals(1, read2.messageSet.size) + assertEquals(ms1.head, read2.messageSet.head) offset += 1 } } @@ -204,7 +204,7 @@ class LogSegmentTest extends JUnit3Suite { TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt) seg.recover(64*1024) for(i <- 0 until 100) - assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset) + assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1da1393..577d102 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -131,11 +131,11 @@ class LogTest extends JUnitSuite { for(i <- 0 until messages.length) log.append(new 
ByteBufferMessageSet(NoCompressionCodec, messages = messages(i))) for(i <- 0 until messages.length) { - val read = log.read(i, 100, Some(i+1)).head + val read = log.read(i, 100, Some(i+1)).messageSet.head assertEquals("Offset read should match order appended.", i, read.offset) assertEquals("Message should match appended.", messages(i), read.message) } - assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size) + assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size) } /** @@ -153,7 +153,7 @@ class LogTest extends JUnitSuite { log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false) for(i <- 50 until messageIds.max) { val idx = messageIds.indexWhere(_ >= i) - val read = log.read(i, 100, None).head + val read = log.read(i, 100, None).messageSet.head assertEquals("Offset read should match message id.", messageIds(idx), read.offset) assertEquals("Message should match appended.", messages(idx), read.message) } @@ -176,7 +176,7 @@ class LogTest extends JUnitSuite { // now manually truncate off all but one message from the first segment to create a gap in the messages log.logSegments.head.truncateTo(1) - assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset) + assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset) } /** @@ -188,7 +188,7 @@ class LogTest extends JUnitSuite { def testReadOutOfRange() { createEmptyLogs(logDir, 1024) val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time) - assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes) + assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes) try { log.read(0, 1024) fail("Expected exception on invalid read.") @@ -219,12 +219,12 @@ class LogTest extends JUnitSuite { /* do successive reads to ensure all our messages are there */ var offset = 0L for(i <- 0 until numMessages) { - val messages = log.read(offset, 1024*1024) + val messages = log.read(offset, 1024*1024).messageSet assertEquals("Offsets not equal", offset, messages.head.offset) assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message, messages.head.message) offset = messages.head.offset + 1 } - val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)) + val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet assertEquals("Should be no more messages", 0, lastRead.size) // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure @@ -245,7 +245,7 @@ class LogTest extends JUnitSuite { log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes))) - def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message) + def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message) /* we should always get the first message in the compressed set 
when reading any offset in the set */ assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset) @@ -363,7 +363,7 @@ class LogTest extends JUnitSuite { log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) - assertEquals(i, log.read(i, 100, None).head.offset) + assertEquals(i, log.read(i, 100, None).messageSet.head.offset) log.close() } @@ -575,15 +575,15 @@ class LogTest extends JUnitSuite { @Test def testAppendMessageWithNullPayload() { - var log = new Log(logDir, + val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, time.scheduler, time) log.append(new ByteBufferMessageSet(new Message(bytes = null))) - val ms = log.read(0, 4096, None) - assertEquals(0, ms.head.offset) - assertTrue("Message payload should be null.", ms.head.message.isNull) + val messageSet = log.read(0, 4096, None).messageSet + assertEquals(0, messageSet.head.offset) + assertTrue("Message payload should be null.", messageSet.head.message.isNull) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala index 6db245c..dd8847f 100644 --- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala @@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { def createMessageSet(messages: Seq[Message]): MessageSet @Test - def testWrittenEqualsRead { + def testWrittenEqualsRead() { val messageSet = createMessageSet(messages) checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) } http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index e532c28..03a424d 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -58,7 +58,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.checkpointHighWatermarks() var fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(0L, fooPartition0Hw) - val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1) + val partition0 = replicaManager.getOrCreatePartition(topic, 0) // create leader and follower replicas val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig()) val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0)) @@ -67,18 +67,12 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { partition0.addReplicaIfNotExists(followerReplicaPartition0) replicaManager.checkpointHighWatermarks() fooPartition0Hw = hwmFor(replicaManager, topic, 0) - assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) - try { - followerReplicaPartition0.highWatermark - fail("Should fail with KafkaException") - }catch { - case e: KafkaException => // this is ok - } - // set the highwatermark for 
local replica - partition0.getReplica().get.highWatermark = 5L + assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw) + // set the high watermark for local replica + partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L) replicaManager.checkpointHighWatermarks() fooPartition0Hw = hwmFor(replicaManager, topic, 0) - assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) + assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw) EasyMock.verify(zkClient) } @@ -97,7 +91,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.checkpointHighWatermarks() var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(0L, topic1Partition0Hw) - val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1) + val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0) // create leader log val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig()) // create a local replica for topic1 @@ -105,15 +99,15 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0) replicaManager.checkpointHighWatermarks() topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) - assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw) - // set the highwatermark for local replica - topic1Partition0.getReplica().get.highWatermark = 5L + assertEquals(leaderReplicaTopic1Partition0.highWatermark.messageOffset, topic1Partition0Hw) + // set the high watermark for local replica + topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L) replicaManager.checkpointHighWatermarks() topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) - assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark) + assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark.messageOffset) assertEquals(5L, topic1Partition0Hw) // add another partition and set highwatermark - val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1) + val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0) // create leader log val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig()) // create a local replica for topic2 @@ -121,13 +115,13 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0) replicaManager.checkpointHighWatermarks() var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) - assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw) + assertEquals(leaderReplicaTopic2Partition0.highWatermark.messageOffset, topic2Partition0Hw) // set the highwatermark for local replica - topic2Partition0.getReplica().get.highWatermark = 15L - assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark) + topic2Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(15L) + assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark.messageOffset) // change the highwatermark for topic1 - topic1Partition0.getReplica().get.highWatermark = 10L - assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark) + topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(10L) + assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark.messageOffset) replicaManager.checkpointHighWatermarks() // verify checkpointed hw for topic 2 topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) 
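The test changes above (and in the other test files in this commit) switch from comparing raw long offsets to reading highWatermark.messageOffset, and from assigning plain longs to assigning new LogOffsetMetadata(...) values, because Replica.highWatermark and Replica.logEndOffset now carry the LogOffsetMetadata case class added at the top of this patch. A minimal sketch of how that class behaves, using only the operations defined in LogOffsetMetadata.scala above; the concrete offsets, positions and value names are illustrative, not taken from the patch:

  import kafka.server.LogOffsetMetadata

  // two offsets on the same segment (base offset 0), at different physical positions
  val hw  = new LogOffsetMetadata(messageOffset = 5L, segmentBaseOffset = 0L, relativePositionInSegment = 512)
  val leo = new LogOffsetMetadata(messageOffset = 8L, segmentBaseOffset = 0L, relativePositionInSegment = 1024)

  hw.precedes(leo)             // true: message offset 5 is before 8
  leo.offsetDiff(hw)           // 3L: number of messages between the two offsets
  leo.offsetOnSameSegment(hw)  // true: both carry segment base offset 0
  leo.positionDiff(hw)         // 512: bytes between the two positions on that segment

  // a high watermark built from a message offset only, as ReplicaFetcherThread now does for followers
  val followerHw = new LogOffsetMetadata(5L)
  followerHw.messageOffsetOnly()   // true: segment base offset and file position stay at the Unknown* sentinels
  // followerHw.offsetOnSameSegment(hw) would throw a KafkaException, since its segment info is missing

Note that checkpointHighWatermarks() in ReplicaManager above still writes only highWatermark.messageOffset to the checkpoint file; the segment base offset and physical position are not persisted.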
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 2cd3a3f..cd302aa 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -46,7 +46,7 @@ class IsrExpirationTest extends JUnit3Suite { val leaderReplica = partition0.getReplica(configs.head.brokerId).get // let the follower catch up to 10 - (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10) + (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(10L)) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -69,7 +69,7 @@ class IsrExpirationTest extends JUnit3Suite { assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get // set remote replicas leo to something low, like 4 - (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 4L) + (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(4L)) // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap it larger than // replicaMaxLagBytes, the follower is out of sync. @@ -83,7 +83,7 @@ class IsrExpirationTest extends JUnit3Suite { localLog: Log): Partition = { val leaderId=config.brokerId val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false)) - val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1) + val partition = replicaManager.getOrCreatePartition(topic, partitionId) val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica @@ -97,7 +97,7 @@ class IsrExpirationTest extends JUnit3Suite { private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = { val log1 = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log1.logEndOffset).andReturn(logEndOffset).times(expectedCalls) + EasyMock.expect(log1.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(logEndOffset)).times(expectedCalls) EasyMock.replay(log1) log1 http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 0ec120a..d5d351c 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -85,7 +85,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // give some time for the follower 1 to record leader HW TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == numMessages, "Failed to 
update high watermark for follower after timeout") servers.foreach(server => server.replicaManager.checkpointHighWatermarks()) @@ -134,7 +134,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // give some time for follower 1 to record leader HW of 60 TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) @@ -147,7 +147,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val hw = 20L // give some time for follower 1 to record leader HW of 600 TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) @@ -165,7 +165,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // allow some time for the follower to get the leader HW TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // kill the server hosting the preferred replica server1.shutdown() @@ -191,7 +191,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // allow some time for the follower to get the leader HW TestUtils.waitUntilTrue(() => - server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server1.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9abf219..a9c4ddc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -39,7 +39,7 @@ class ReplicaManagerTest extends JUnit3Suite { val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) - val partition = rm.getOrCreatePartition(topic, 1, 1) + val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() } @@ -53,7 +53,7 @@ class ReplicaManagerTest extends JUnit3Suite { val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) - val partition = rm.getOrCreatePartition(topic, 1, 1) + val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() } 
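
The IsrExpirationTest, LogRecoveryTest and ReplicaManagerTest hunks follow the same theme on the follower side: a replica's log end offset is now carried as a LogOffsetMetadata, lag and high-watermark checks compare the embedded messageOffset, and getOrCreatePartition takes only the topic and partition id. A small sketch of the LEO bookkeeping under those assumptions (FollowerProgress is hypothetical; the offsets are simply the values used in the ISR test above):

    import kafka.server.LogOffsetMetadata

    object FollowerLagSketch extends App {
      // Hypothetical stand-in for a follower replica's progress bookkeeping.
      class FollowerProgress {
        @volatile var logEndOffset: LogOffsetMetadata = new LogOffsetMetadata(0L)
      }

      val follower = new FollowerProgress
      follower.logEndOffset = new LogOffsetMetadata(4L)   // was: follower.logEndOffset = 4L

      // An ISR-style lag check compares plain message offsets extracted from the metadata.
      val leaderLogEndOffset = 15L
      val lagInMessages = leaderLogEndOffset - follower.logEndOffset.messageOffset
      println(s"follower lags the leader by $lagInMessages messages")   // 11 with these values
    }
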
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 4f61f84..168712d 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -46,17 +46,17 @@ class RequestPurgatoryTest extends JUnit3Suite { def testRequestSatisfaction() { val r1 = new DelayedRequest(Array("test1"), null, 100000L) val r2 = new DelayedRequest(Array("test2"), null, 100000L) - assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1", producerRequest1).size) - purgatory.watch(r1) - assertEquals("Still nothing satisfied", 0, purgatory.update("test1", producerRequest1).size) - purgatory.watch(r2) - assertEquals("Still nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) + assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size) + assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size) + assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size) purgatory.satisfied += r1 - assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1", producerRequest1)) - assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size) + assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1")) + assertEquals("Nothing satisfied", 0, purgatory.update("test1").size) purgatory.satisfied += r2 - assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2)) - assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) + assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2")) + assertEquals("Nothing satisfied", 0, purgatory.update("test2").size) } @Test @@ -65,8 +65,8 @@ class RequestPurgatoryTest extends JUnit3Suite { val r1 = new DelayedRequest(Array("test1"), null, expiration) val r2 = new DelayedRequest(Array("test1"), null, 200000L) val start = System.currentTimeMillis - purgatory.watch(r1) - purgatory.watch(r2) + assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) + assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) purgatory.awaitExpiration(r1) val elapsed = System.currentTimeMillis - start assertTrue("r1 expired", purgatory.expired.contains(r1)) @@ -74,7 +74,7 @@ class RequestPurgatoryTest extends JUnit3Suite { assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) } - class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] { + class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] { val satisfied = mutable.Set[DelayedRequest]() val expired = mutable.Set[DelayedRequest]() def awaitExpiration(delayed: DelayedRequest) = { @@ -82,7 +82,7 @@ class RequestPurgatoryTest extends JUnit3Suite { delayed.wait() } } - def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = satisfied.contains(delayed) + def checkSatisfied(delayed: 
DelayedRequest): Boolean = satisfied.contains(delayed) def expire(delayed: DelayedRequest) { expired += delayed delayed synchronized { http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index b1c4ce9..09ed8f5 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -16,17 +16,19 @@ */ package kafka.server +import kafka.api._ import kafka.cluster.{Partition, Replica} +import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.log.Log import kafka.message.{ByteBufferMessageSet, Message} import kafka.network.RequestChannel import kafka.utils.{ZkUtils, Time, TestUtils, MockTime} + +import scala.Some + import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite -import kafka.api._ -import scala.Some -import kafka.common.TopicAndPartition class SimpleFetchTest extends JUnit3Suite { @@ -45,13 +47,13 @@ class SimpleFetchTest extends JUnit3Suite { * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync * but is still in ISR (hasn't yet expired from ISR). * - * When a normal consumer fetches data, it only should only see data upto the HW of the leader, + * When a normal consumer fetches data, it should only see data up to the HW of the leader, * in this case up an offset of "5". */ def testNonReplicaSeesHwWhenFetching() { /* setup */ val time = new MockTime - val leo = 20 + val leo = 20L val hw = 5 val fetchSize = 100 val messages = new Message("test-message".getBytes()) @@ -64,7 +66,11 @@ class SimpleFetchTest extends JUnit3Suite { val log = EasyMock.createMock(classOf[kafka.log.Log]) EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() EasyMock.expect(log) - EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn( + new FetchDataInfo( + new LogOffsetMetadata(0L, 0L, leo.toInt), + new ByteBufferMessageSet(messages) + )).anyTimes() EasyMock.replay(log) val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) @@ -76,14 +82,26 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.logManager).andReturn(logManager) EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) + EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ + val fetchInfo = log.read(0, fetchSize, Some(hw)) + val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) + Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) + }).anyTimes() EasyMock.replay(replicaManager) val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager) - partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L + partition.getReplica(configs(1).brokerId).get.logEndOffset = new LogOffsetMetadata(leo - 5L, 0L, leo.toInt - 5) EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() 
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() + EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ + val fetchInfo = log.read(0, fetchSize, Some(hw)) + val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) + Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) + }).anyTimes() + EasyMock.replay(replicaManager) val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) @@ -138,7 +156,11 @@ class SimpleFetchTest extends JUnit3Suite { val log = EasyMock.createMock(classOf[kafka.log.Log]) EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() - EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn( + new FetchDataInfo( + new LogOffsetMetadata(followerLEO, 0L, followerLEO), + new ByteBufferMessageSet(messages) + )).anyTimes() EasyMock.replay(log) val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) @@ -150,16 +172,28 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.logManager).andReturn(logManager) EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) + EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ + val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None) + val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) + Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) + }).anyTimes() EasyMock.replay(replicaManager) val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager) - partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long] + partition.getReplica(followerReplicaId).get.logEndOffset = new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO) EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() - EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO)) + EasyMock.expect(replicaManager.updateReplicaLEOAndPartitionHW(topic, partitionId, followerReplicaId, new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO))) EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId)) EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() + EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ + val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None) + val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) + Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) + }).anyTimes() + 
EasyMock.expect(replicaManager.unblockDelayedProduceRequests(EasyMock.anyObject())).anyTimes() EasyMock.replay(replicaManager) val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) @@ -195,7 +229,7 @@ class SimpleFetchTest extends JUnit3Suite { private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int, localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = { - val partition = new Partition(topic, partitionId, 2, time, replicaManager) + val partition = new Partition(topic, partitionId, time, replicaManager) val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica @@ -204,7 +238,7 @@ class SimpleFetchTest extends JUnit3Suite { partition.inSyncReplicas = allReplicas.toSet // set the leader and its hw and the hw update time partition.leaderReplicaIdOpt = Some(leaderId) - leaderReplica.highWatermark = leaderHW + leaderReplica.highWatermark = new LogOffsetMetadata(leaderHW) partition }
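
Two further API shifts are visible in the last two files: RequestPurgatoryTest now exercises a single-type-parameter RequestPurgatory[DelayedRequest], where watch(r) has become checkAndMaybeWatch(r) (returning true if the request was already satisfied, false if it is now being watched) and checkSatisfied is decided from the delayed request alone; SimpleFetchTest mocks Log.read to return a FetchDataInfo whose fetchOffset is a LogOffsetMetadata and whose messageSet carries the data. A toy purgatory subclass under those assumptions (the flag-based satisfaction and the final shutdown() call are illustrative guesses, not broker logic):

    import kafka.server.{DelayedRequest, RequestPurgatory}

    object PurgatorySketch extends App {
      // Toy purgatory: a delayed request counts as satisfied only once the flag is set.
      class FlagPurgatory extends RequestPurgatory[DelayedRequest] {
        @volatile var flag = false
        def checkSatisfied(delayed: DelayedRequest): Boolean = flag
        def expire(delayed: DelayedRequest): Unit = ()
      }

      val purgatory = new FlagPurgatory
      val delayed = new DelayedRequest(Array("test1"), null, 100000L)

      // Not yet satisfied, so checkAndMaybeWatch returns false and the request is watched.
      assert(!purgatory.checkAndMaybeWatch(delayed))

      // Once the condition holds, update("test1") re-checks the watchers for that key
      // and hands back the now-satisfied requests.
      purgatory.flag = true
      val satisfied = purgatory.update("test1")
      println(s"${satisfied.size} request(s) satisfied")

      purgatory.shutdown()   // assumed to stop the background expiration reaper
    }
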
