http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9e4c149..6dfe97f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -114,9 +114,8 @@ class ReplicaManager(val config: KafkaConfig, /* epoch of the controller that last changed the leader */ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 private val localBrokerId = config.brokerId - private val allPartitions = new Pool[(String, Int), Partition](valueFactory = Some { case (t, p) => - new Partition(t, p, time, this) - }) + private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp => + new Partition(tp.topic, tp.partition, time, this))) private val replicaStateChangeLock = new Object val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) @@ -124,14 +123,14 @@ class ReplicaManager(val config: KafkaConfig, private var hwThreadInitialized = false this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger - private val isrChangeSet: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]() + private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]() private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis()) private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis()) val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce]( - purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests) + purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests) val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch]( - purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) + purgatoryName = "Fetch", localBrokerId, config.fetchPurgatoryPurgeIntervalRequests) val leaderCount = newGauge( "LeaderCount", @@ -165,9 +164,9 @@ class ReplicaManager(val config: KafkaConfig, scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS) } - def recordIsrChange(topicAndPartition: TopicAndPartition) { + def recordIsrChange(topicPartition: TopicPartition) { isrChangeSet synchronized { - isrChangeSet += topicAndPartition + isrChangeSet += topicPartition lastIsrChangeMs.set(System.currentTimeMillis()) } } @@ -224,29 +223,22 @@ class ReplicaManager(val config: KafkaConfig, def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Short = { stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition") val errorCode = Errors.NONE.code - val topic = topicPartition.topic - val partitionId = topicPartition.partition - getPartition(topic, partitionId) match { + getPartition(topicPartition) match { case Some(_) => if (deletePartition) { - val removedPartition = allPartitions.remove((topic, partitionId)) + val removedPartition = allPartitions.remove(topicPartition) if (removedPartition != null) { removedPartition.delete() // this will delete the local log - val topicHasPartitions = allPartitions.keys.exists { case (t, _) => topic == t } + val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic) if (!topicHasPartitions) - BrokerTopicStats.removeMetrics(topic) + BrokerTopicStats.removeMetrics(topicPartition.topic) } } case None => // Delete log and corresponding folders in case replica manager doesn't hold them anymore. // This could happen when topic is being deleted while broker is down and recovers. - if (deletePartition) { - val topicAndPartition = TopicAndPartition(topic, partitionId) - - if(logManager.getLog(topicAndPartition).isDefined) { - logManager.asyncDelete(topicAndPartition) - } - } + if (deletePartition && logManager.getLog(topicPartition).isDefined) + logManager.asyncDelete(topicPartition) stateChangeLogger.trace(s"Broker $localBrokerId ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker") } stateChangeLogger.trace(s"Broker $localBrokerId finished handling stop replica (delete=$deletePartition) for partition $topicPartition") @@ -274,44 +266,34 @@ class ReplicaManager(val config: KafkaConfig, } } - def getOrCreatePartition(topic: String, partitionId: Int): Partition = { - allPartitions.getAndMaybePut((topic, partitionId)) - } + def getOrCreatePartition(topicPartition: TopicPartition): Partition = + allPartitions.getAndMaybePut(topicPartition) - def getPartition(topic: String, partitionId: Int): Option[Partition] = { - Option(allPartitions.get((topic, partitionId))) - } + def getPartition(topicPartition: TopicPartition): Option[Partition] = + Option(allPartitions.get(topicPartition)) - def getReplicaOrException(topic: String, partition: Int): Replica = { - val replicaOpt = getReplica(topic, partition) - if(replicaOpt.isDefined) - replicaOpt.get - else - throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition)) + def getReplicaOrException(topicPartition: TopicPartition): Replica = { + getReplica(topicPartition).getOrElse { + throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition") + } } - def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica = { - val partitionOpt = getPartition(topic, partitionId) + def getLeaderReplicaIfLocal(topicPartition: TopicPartition): Replica = { + val partitionOpt = getPartition(topicPartition) partitionOpt match { case None => - throw new UnknownTopicOrPartitionException("Partition [%s,%d] doesn't exist on %d".format(topic, partitionId, config.brokerId)) + throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist on $localBrokerId") case Some(partition) => partition.leaderReplicaIfLocal match { case Some(leaderReplica) => leaderReplica case None => - throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d" - .format(topic, partitionId, config.brokerId)) + throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId") } } } - def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = { - val partitionOpt = getPartition(topic, partitionId) - partitionOpt match { - case None => None - case Some(partition) => partition.getReplica(replicaId) - } - } + def getReplica(topicPartition: TopicPartition, replicaId: Int = localBrokerId): Option[Replica] = + getPartition(topicPartition).flatMap(_.getReplica(replicaId)) /** * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas; @@ -356,8 +338,8 @@ class ReplicaManager(val config: KafkaConfig, } else { // If required.acks is outside accepted range, something is wrong with the client // Just return an error and don't handle the request at all - val responseStatus = entriesPerPartition.map { case (topicAndPartition, _) => - topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code, + val responseStatus = entriesPerPartition.map { case (topicPartition, _) => + topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code, LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP) } responseCallback(responseStatus) @@ -396,10 +378,10 @@ class ReplicaManager(val config: KafkaConfig, if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) { (topicPartition, LogAppendResult( LogAppendInfo.UnknownLogAppendInfo, - Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic))))) + Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")))) } else { try { - val partitionOpt = getPartition(topicPartition.topic, topicPartition.partition) + val partitionOpt = getPartition(topicPartition) val info = partitionOpt match { case Some(partition) => partition.appendRecordsToLeader(records, requiredAcks) @@ -459,7 +441,7 @@ class ReplicaManager(val config: KafkaConfig, hardMaxBytesLimit: Boolean, fetchInfos: Seq[(TopicPartition, PartitionData)], quota: ReplicaQuota = UnboundedQuota, - responseCallback: Seq[(TopicAndPartition, FetchPartitionData)] => Unit) { + responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) { val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId) @@ -496,11 +478,11 @@ class ReplicaManager(val config: KafkaConfig, responseCallback(fetchPartitionData) } else { // construct the fetch results from the read results - val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) => + val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) => val fetchInfo = fetchInfos.collectFirst { - case (tp, v) if TopicAndPartition(tp.topic, tp.partition) == topicAndPartition => v - }.getOrElse(sys.error(s"Partition $topicAndPartition not found in fetchInfos")) - (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo)) + case (tp, v) if tp == topicPartition => v + }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos")) + (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo)) } val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus) @@ -525,15 +507,13 @@ class ReplicaManager(val config: KafkaConfig, fetchMaxBytes: Int, hardMaxBytesLimit: Boolean, readPartitionInfo: Seq[(TopicPartition, PartitionData)], - quota: ReplicaQuota): Seq[(TopicAndPartition, LogReadResult)] = { + quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = { def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = { - val topic = tp.topic - val partition = tp.partition val offset = fetchInfo.offset val partitionFetchSize = fetchInfo.maxBytes - BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark() + BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark() try { @@ -543,9 +523,9 @@ class ReplicaManager(val config: KafkaConfig, // decide whether to only fetch from leader val localReplica = if (fetchOnlyFromLeader) - getLeaderReplicaIfLocal(topic, partition) + getLeaderReplicaIfLocal(tp) else - getReplicaOrException(topic, partition) + getReplicaOrException(tp) // decide whether to only fetch committed data (i.e. messages below high watermark) val maxOffsetOpt = if (readOnlyCommitted) @@ -568,7 +548,7 @@ class ReplicaManager(val config: KafkaConfig, val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage) // If the partition is being throttled, simply return an empty set. - if (shouldLeaderThrottle(quota, TopicAndPartition(tp.topic, tp.partition), replicaId)) + if (shouldLeaderThrottle(quota, tp, replicaId)) FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY) // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make // progress in such cases and don't need to report a `RecordTooLargeException` @@ -594,7 +574,7 @@ class ReplicaManager(val config: KafkaConfig, LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L, partitionFetchSize, false, Some(e)) case e: Throwable => - BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() + BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark() error(s"Error processing fetch operation on partition $tp, offset $offset", e) LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L, @@ -603,7 +583,7 @@ class ReplicaManager(val config: KafkaConfig, } var limitBytes = fetchMaxBytes - val result = new mutable.ArrayBuffer[(TopicAndPartition, LogReadResult)] + val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)] var minOneMessage = !hardMaxBytesLimit readPartitionInfo.foreach { case (tp, fetchInfo) => val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) @@ -612,7 +592,7 @@ class ReplicaManager(val config: KafkaConfig, if (messageSetSize > 0) minOneMessage = false limitBytes = math.max(0, limitBytes - messageSetSize) - result += (TopicAndPartition(tp.topic, tp.partition) -> readResult) + result += (tp -> readResult) } result } @@ -621,15 +601,15 @@ class ReplicaManager(val config: KafkaConfig, * To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list, * the quota is exceeded and the replica is not in sync. */ - def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition, replicaId: Int): Boolean = { - val isReplicaInSync = getPartition(topicPartition.topic, topicPartition.partition).flatMap { partition => + def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = { + val isReplicaInSync = getPartition(topicPartition).flatMap { partition => partition.getReplica(replicaId).map(partition.inSyncReplicas.contains) }.getOrElse(false) quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync } - def getMagicAndTimestampType(topicAndPartition: TopicAndPartition): Option[(Byte, TimestampType)] = - getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica => + def getMagicAndTimestampType(topicPartition: TopicPartition): Option[(Byte, TimestampType)] = + getReplica(topicPartition).flatMap { replica => replica.log.map(log => (log.config.messageFormatVersion.messageFormatVersion, log.config.messageTimestampType)) } @@ -671,12 +651,12 @@ class ReplicaManager(val config: KafkaConfig, // First check partition's leader epoch val partitionState = new mutable.HashMap[Partition, PartitionState]() leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => - val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition) + val partition = getOrCreatePartition(topicPartition) val partitionLeaderEpoch = partition.getLeaderEpoch() // If the leader epoch is valid record the epoch of the controller that made the leadership decision. // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path if (partitionLeaderEpoch < stateInfo.leaderEpoch) { - if(stateInfo.replicas.contains(config.brokerId)) + if(stateInfo.replicas.contains(localBrokerId)) partitionState.put(partition, stateInfo) else { stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + @@ -696,7 +676,7 @@ class ReplicaManager(val config: KafkaConfig, } val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) => - stateInfo.leader == config.brokerId + stateInfo.leader == localBrokerId } val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys @@ -741,19 +721,20 @@ class ReplicaManager(val config: KafkaConfig, partitionState: Map[Partition, PartitionState], correlationId: Int, responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = { - partitionState.foreach(state => + partitionState.keys.foreach { partition => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-leader transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))) + .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) + } for (partition <- partitionState.keys) - responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code) + responseMap.put(partition.topicPartition, Errors.NONE.code) val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set() try { // First stop fetchers for all the partitions - replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(p => new TopicPartition(p.topic, p.partitionId))) + replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition)) // Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) @@ -761,29 +742,28 @@ class ReplicaManager(val config: KafkaConfig, else stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " + "controller %d epoch %d for partition %s since it is already the leader for the partition.") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(partition.topic, partition.partitionId))); + .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) } partitionsToMakeLeaders.foreach { partition => stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " + "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId))) + .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition)) } } catch { case e: Throwable => - partitionState.foreach { state => + partitionState.keys.foreach { partition => val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" + - " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, - TopicAndPartition(state._1.topic, state._1.partitionId)) + " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition) stateChangeLogger.error(errorMsg, e) } // Re-throw the exception for it to be caught in KafkaApis throw e } - partitionState.foreach { state => + partitionState.keys.foreach { partition => stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + "for the become-leader transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) + .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) } partitionsToMakeLeaders @@ -813,14 +793,14 @@ class ReplicaManager(val config: KafkaConfig, correlationId: Int, responseMap: mutable.Map[TopicPartition, Short], metadataCache: MetadataCache) : Set[Partition] = { - partitionState.foreach { state => + partitionState.keys.foreach { partition => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) + .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) } for (partition <- partitionState.keys) - responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code) + responseMap.put(partition.topicPartition, Errors.NONE.code) val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set() @@ -852,14 +832,16 @@ class ReplicaManager(val config: KafkaConfig, } } - replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(p => new TopicPartition(p.topic, p.partitionId))) + replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition)) partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId))) + .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition)) } - logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap) + logManager.truncateTo(partitionsToMakeFollower.map { partition => + (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset) + }.toMap) partitionsToMakeFollower.foreach { partition => val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId) tryCompleteDelayedProduce(topicPartitionOperationKey) @@ -882,7 +864,7 @@ class ReplicaManager(val config: KafkaConfig, else { // we do not need to check if the leader exists again since this has been done at the beginning of this process val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => - new TopicPartition(partition.topic, partition.partitionId) -> BrokerAndInitialOffset( + partition.topicPartition -> BrokerAndInitialOffset( metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol), partition.getReplica().get.logEndOffset.messageOffset)).toMap replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) @@ -902,10 +884,10 @@ class ReplicaManager(val config: KafkaConfig, throw e } - partitionState.foreach { state => + partitionState.keys.foreach { partition => stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + "for the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) + .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) } partitionsToMakeFollower @@ -916,18 +898,18 @@ class ReplicaManager(val config: KafkaConfig, allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs)) } - private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicAndPartition, LogReadResult)]) { + private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) { debug("Recording follower broker %d log read results: %s ".format(replicaId, readResults)) - readResults.foreach { case (topicAndPartition, readResult) => - getPartition(topicAndPartition.topic, topicAndPartition.partition) match { + readResults.foreach { case (topicPartition, readResult) => + getPartition(topicPartition) match { case Some(partition) => partition.updateReplicaLogReadResult(replicaId, readResult) // for producer requests with ack > 1, we need to check // if they can be unblocked after some follower's log end offsets have moved - tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicAndPartition)) + tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicPartition)) case None => - warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition)) + warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicPartition)) } } } @@ -938,10 +920,10 @@ class ReplicaManager(val config: KafkaConfig, // Flushes the highwatermark value for all partitions to the highwatermark file def checkpointHighWatermarks() { - val replicas = allPartitions.values.flatMap(_.getReplica(config.brokerId)) + val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId)) val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) for ((dir, reps) <- replicasByDir) { - val hwms = reps.map(r => new TopicAndPartition(r) -> r.highWatermark.messageOffset).toMap + val hwms = reps.map(r => r.partition.topicPartition -> r.highWatermark.messageOffset).toMap try { highWatermarkCheckpoints(dir).write(hwms) } catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala index d4c82d8..4a87dfb 100644 --- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala @@ -17,13 +17,15 @@ package kafka.server import java.util.concurrent.{ConcurrentHashMap, TimeUnit} -import kafka.common.TopicAndPartition + import kafka.server.Constants._ import kafka.server.ReplicationQuotaManagerConfig._ import kafka.utils.CoreUtils._ import kafka.utils.Logging import org.apache.kafka.common.metrics._ import java.util.concurrent.locks.ReentrantReadWriteLock + +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.stats.SimpleRate import org.apache.kafka.common.utils.Time @@ -49,7 +51,7 @@ object ReplicationQuotaManagerConfig { } trait ReplicaQuota { - def isThrottled(topicAndPartition: TopicAndPartition): Boolean + def isThrottled(topicPartition: TopicPartition): Boolean def isQuotaExceeded(): Boolean } @@ -113,7 +115,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, * @param topicPartition the partition to check * @return */ - override def isThrottled(topicPartition: TopicAndPartition): Boolean = { + override def isThrottled(topicPartition: TopicPartition): Boolean = { val partitions = throttledPartitions.get(topicPartition.topic) if (partitions != null) (partitions eq AllReplicas) || partitions.contains(topicPartition.partition) http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index f6d5153..75d6e8a 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -295,7 +295,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa messageInfoFromFirstReplicaOpt match { case None => messageInfoFromFirstReplicaOpt = Some( - MessageInfo(replicaId, logEntry.offset,logEntry.nextOffset, logEntry.record.checksum)) + MessageInfo(replicaId, logEntry.offset, logEntry.nextOffset, logEntry.record.checksum)) case Some(messageInfoFromFirstReplica) => if (messageInfoFromFirstReplica.offset != logEntry.offset) { println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/utils/Pool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 1d96238..0cf6474 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -23,7 +23,7 @@ import collection.mutable import collection.JavaConverters._ import kafka.common.KafkaException -class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] { +class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { private val pool: ConcurrentMap[K, V] = new ConcurrentHashMap[K, V] private val createLock = new Object http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/utils/ReplicationUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index 369bb23..29e5d10 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -21,6 +21,7 @@ import kafka.api.LeaderAndIsr import kafka.common.TopicAndPartition import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch} import kafka.utils.ZkUtils._ +import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.data.Stat import scala.collection._ @@ -39,7 +40,7 @@ object ReplicationUtils extends Logging { updatePersistentPath } - def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicAndPartition]): Unit = { + def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicPartition]): Unit = { val isrChangeNotificationPath: String = zkUtils.createSequentialPersistentPath( ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix, generateIsrChangeJson(isrChangeSet)) @@ -89,7 +90,7 @@ object ReplicationUtils extends Logging { Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))} } - private def generateIsrChangeJson(isrChanges: Set[TopicAndPartition]): String = { + private def generateIsrChangeJson(isrChanges: Set[TopicPartition]): String = { val partitions = isrChanges.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)).toArray Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitions)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 3b81e25..10d49f5 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -122,7 +122,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { // wait until all the followers have synced the last HW with leader TestUtils.waitUntilTrue(() => servers.forall(server => - server.replicaManager.getReplica(tp.topic(), tp.partition()).get.highWatermark.messageOffset == numRecords + server.replicaManager.getReplica(tp).get.highWatermark.messageOffset == numRecords ), "Failed to update high watermark for followers after timeout") val scheduler = new BounceBrokerScheduler(numIters) http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala index ac310a9..bb93cb4 100644 --- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala @@ -12,7 +12,7 @@ */ package integration.kafka.api -import kafka.common.{Topic, TopicAndPartition} +import kafka.common.Topic import kafka.integration.KafkaServerTestHarness import kafka.log.Log import kafka.message.GZIPCompressionCodec @@ -48,7 +48,7 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { val logManager = servers.head.getLogManager def getGroupMetadataLogOpt: Option[Log] = - logManager.getLog(TopicAndPartition(Topic.GroupMetadataTopicName, 0)) + logManager.getLog(new TopicPartition(Topic.GroupMetadataTopicName, 0)) TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowEntries.asScala.nonEmpty)), "Commit message not appended in time") http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 693c758..f9eb61c 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -32,10 +32,14 @@ import kafka.common.TopicAndPartition import kafka.server.{ConfigType, KafkaConfig, KafkaServer} import java.io.File import java.util + import kafka.utils.TestUtils._ import kafka.admin.AdminUtils._ + import scala.collection.{Map, immutable} import kafka.utils.CoreUtils._ +import org.apache.kafka.common.TopicPartition + import scala.collection.JavaConverters._ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { @@ -406,14 +410,15 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { assertEquals(expected.split(",").toSeq, actual.asScala) } TestUtils.retry(10000) { - for(part <- 0 until partitions) { - val log = server.logManager.getLog(TopicAndPartition(topic, part)) + for (part <- 0 until partitions) { + val tp = new TopicPartition(topic, part) + val log = server.logManager.getLog(tp) assertTrue(log.isDefined) assertEquals(retentionMs, log.get.config.retentionMs) assertEquals(messageSize, log.get.config.maxMessageSize) checkList(log.get.config.LeaderReplicationThrottledReplicas, throttledLeaders) checkList(log.get.config.FollowerReplicationThrottledReplicas, throttledFollowers) - assertEquals(quotaManagerIsThrottled, server.quotaManagers.leader.isThrottled(TopicAndPartition(topic, part))) + assertEquals(quotaManagerIsThrottled, server.quotaManagers.leader.isThrottled(tp)) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index b98822d..ba270ad 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -26,14 +26,15 @@ import org.junit.Test import java.util.Properties import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition} +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.UnknownTopicOrPartitionException class DeleteTopicTest extends ZooKeeperTestHarness { @Test def testDeleteTopicWithAllAliveReplicas() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic + val topicPartition = new TopicPartition("test", 0) + val topic = topicPartition.topic val servers = createTestTopicAndCluster(topic) // start topic deletion AdminUtils.deleteTopic(zkUtils, topic) @@ -43,8 +44,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness { @Test def testResumeDeleteTopicWithRecoveredFollower() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic + val topicPartition = new TopicPartition("test", 0) + val topic = topicPartition.topic val servers = createTestTopicAndCluster(topic) // shut down one follower replica val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) @@ -56,7 +57,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // check if all replicas but the one that is shut down has deleted the log TestUtils.waitUntilTrue(() => servers.filter(s => s.config.brokerId != follower.config.brokerId) - .forall(_.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.") + .forall(_.getLogManager().getLog(topicPartition).isEmpty), "Replicas 0,1 have not deleted log.") // ensure topic deletion is halted TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)), "Admin path /admin/delete_topic/test path deleted even when a follower replica is down") @@ -68,8 +69,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness { @Test def testResumeDeleteTopicOnControllerFailover() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic + val topicPartition = new TopicPartition("test", 0) + val topic = topicPartition.topic val servers = createTestTopicAndCluster(topic) val controllerId = zkUtils.getController() val controller = servers.filter(s => s.config.brokerId == controllerId).head @@ -97,7 +98,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { def testPartitionReassignmentDuringDeleteTopic() { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) + val topicPartition = new TopicPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers @@ -106,7 +107,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // wait until replica log is created on every broker - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), "Replicas for topic test not created.") val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) @@ -119,18 +120,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // reassign partition 0 val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0) val newReplicas = Seq(1, 2, 3) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(new TopicAndPartition(topicPartition) -> newReplicas)) assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas) - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, new TopicAndPartition(topicPartition), + Map(new TopicAndPartition(topicPartition) -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed }, "Partition reassignment shouldn't complete.") val controllerId = zkUtils.getController() val controller = servers.filter(s => s.config.brokerId == controllerId).head assertFalse("Partition reassignment should fail", - controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) + controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(new TopicAndPartition(topicPartition))) val assignedReplicas = zkUtils.getReplicasForPartition(topic, 0) assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) follower.startup() @@ -145,7 +146,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last - val newPartition = TopicAndPartition(topic, 1) + val newPartition = new TopicPartition(topic, 1) follower.shutdown() // add partitions to topic AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2", false) @@ -168,7 +169,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // start topic deletion AdminUtils.deleteTopic(zkUtils, topic) // add partitions to topic - val newPartition = TopicAndPartition(topic, 1) + val newPartition = new TopicPartition(topic, 1) AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2") TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) // verify that new partition doesn't exist on any broker either @@ -181,7 +182,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { def testRecreateTopicAfterDeletion() { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) + val topicPartition = new TopicPartition(topic, 0) val servers = createTestTopicAndCluster(topic) // start topic deletion AdminUtils.deleteTopic(zkUtils, topic) @@ -192,15 +193,15 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) // check if all replica logs are created - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), "Replicas for topic test not created.") servers.foreach(_.shutdown()) } @Test def testDeleteNonExistingTopic() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic + val topicPartition = new TopicPartition("test", 0) + val topic = topicPartition.topic val servers = createTestTopicAndCluster(topic) // start topic deletion try { @@ -212,7 +213,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // verify delete topic path for test2 is removed from zookeeper TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers) // verify that topic test is untouched - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), "Replicas for topic test not created") // test the topic path exists assertTrue("Topic test mistakenly deleted", zkUtils.pathExists(getTopicPath(topic))) @@ -225,8 +226,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness { @Test def testDeleteTopicWithCleaner() { val topicName = "test" - val topicAndPartition = TopicAndPartition(topicName, 0) - val topic = topicAndPartition.topic + val topicPartition = new TopicPartition(topicName, 0) + val topic = topicPartition.topic val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false) brokerConfigs.head.setProperty("delete.topic.enable", "true") @@ -240,13 +241,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // for simplicity, we are validating cleaner offsets on a single broker val server = servers.head - val log = server.logManager.getLog(topicAndPartition).get + val log = server.logManager.getLog(topicPartition).get // write to the topic to activate cleaner writeDups(numKeys = 100, numDups = 3,log) // wait for cleaner to clean - server.logManager.cleaner.awaitCleaned(topicName, 0, 0) + server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0) // delete topic AdminUtils.deleteTopic(zkUtils, "test") @@ -257,8 +258,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness { @Test def testDeleteTopicAlreadyMarkedAsDeleted() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic + val topicPartition = new TopicPartition("test", 0) + val topic = topicPartition.topic val servers = createTestTopicAndCluster(topic) try { @@ -286,13 +287,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness { private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) - val topicAndPartition = TopicAndPartition(topic, 0) + val topicPartition = new TopicPartition(topic, 0) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // wait until replica log is created on every broker - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), "Replicas for topic test not created") servers } @@ -309,15 +310,15 @@ class DeleteTopicTest extends ZooKeeperTestHarness { @Test def testDisableDeleteTopic() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic + val topicPartition = new TopicPartition("test", 0) + val topic = topicPartition.topic val servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false) // mark the topic for deletion AdminUtils.deleteTopic(zkUtils, "test") TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)), "Admin path /admin/delete_topic/%s path not deleted even if deleteTopic is disabled".format(topic)) // verify that topic test is untouched - assertTrue(servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined)) + assertTrue(servers.forall(_.getLogManager().getLog(topicPartition).isDefined)) // test the topic path exists assertTrue("Topic path disappeared", zkUtils.pathExists(getTopicPath(topic))) // topic test should have a leader http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala index 1c5a526..20e512f 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala @@ -304,7 +304,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { assertEquals(Errors.NONE.code, syncGroupErrorCode) EasyMock.reset(replicaManager) - EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None) EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() EasyMock.replay(replicaManager) @@ -1092,7 +1092,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = { val (responseFuture, responseCallback) = setupHeartbeatCallback - EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None) EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() EasyMock.replay(replicaManager) http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala index 62b7f42..629020e 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala @@ -342,7 +342,7 @@ class GroupMetadataManagerTest { topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) - EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) expectAppendMessage(Errors.NONE) EasyMock.replay(replicaManager) @@ -394,7 +394,7 @@ class GroupMetadataManagerTest { EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)) - EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(replicaManager, partition) @@ -440,7 +440,7 @@ class GroupMetadataManagerTest { EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.LOG_APPEND_TIME)) - EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(replicaManager, partition) @@ -491,7 +491,7 @@ class GroupMetadataManagerTest { topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) - EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) expectAppendMessage(Errors.NONE) EasyMock.replay(replicaManager) @@ -567,7 +567,7 @@ class GroupMetadataManagerTest { topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) - EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) expectAppendMessage(Errors.NONE) EasyMock.replay(replicaManager) http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 201fa87..761cac8 100755 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -18,19 +18,23 @@ package kafka.integration import java.nio.ByteBuffer + import org.junit.Assert._ -import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder} -import kafka.server.{KafkaRequestHandler, KafkaConfig} +import kafka.api.{FetchRequest, FetchRequestBuilder, PartitionFetchInfo} +import kafka.server.{KafkaConfig, KafkaRequestHandler} import kafka.producer.{KeyedMessage, Producer} import org.apache.log4j.{Level, Logger} import kafka.zk.ZooKeeperTestHarness import org.junit.Test + import scala.collection._ -import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} -import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils} +import kafka.common.{ErrorMapping, OffsetOutOfRangeException, TopicAndPartition, UnknownTopicOrPartitionException} +import kafka.utils.{CoreUtils, StaticPartitioner, TestUtils} import kafka.serializer.StringEncoder import java.util.Properties +import org.apache.kafka.common.TopicPartition + /** * End to end tests of the primitive apis against a local server */ @@ -71,7 +75,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness { producer.send(new KeyedMessage[String, String](topic, "test-message")) - val replica = servers.head.replicaManager.getReplica(topic, 0).get + val replica = servers.head.replicaManager.getReplica(new TopicPartition(topic, 0)).get assertTrue("HighWatermark should equal logEndOffset with just 1 replica", replica.logEndOffset.messageOffset > 0 && replica.logEndOffset.equals(replica.highWatermark)) @@ -243,23 +247,23 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness { } // wait until the messages are published - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test1", 0)).get.logEndOffset == 2 }, "Published messages should be in the log") - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test2", 0)).get.logEndOffset == 2 }, "Published messages should be in the log") - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test3", 0)).get.logEndOffset == 2 }, "Published messages should be in the log") - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test4", 0)).get.logEndOffset == 2 }, "Published messages should be in the log") val replicaId = servers.head.config.brokerId - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark.messageOffset == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test1", 0), replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark.messageOffset == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test2", 0), replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark.messageOffset == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test3", 0), replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark.messageOffset == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test4", 0), replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") // test if the consumer received the messages in the correct order when producer has enabled request pipelining http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 43c41f3..74a1828 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -21,9 +21,9 @@ import java.io.File import java.util.Properties import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0} -import kafka.common.TopicAndPartition import kafka.server.OffsetCheckpoint import kafka.utils._ +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record} import org.apache.kafka.common.utils.Utils import org.junit.Assert._ @@ -50,7 +50,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) { val logDir = TestUtils.tempDir() var counter = 0 var cleaner: LogCleaner = _ - val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2)) + val topics = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2)) @Test def cleanerTest() { @@ -232,8 +232,9 @@ class LogCleanerIntegrationTest(compressionCodec: String) { private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) { // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than // LogConfig.MinCleanableDirtyRatioProp - cleaner.awaitCleaned(topic, partitionId, firstDirty) - val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(TopicAndPartition(topic, partitionId)) + val topicPartition = new TopicPartition(topic, partitionId) + cleaner.awaitCleaned(topicPartition, firstDirty) + val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(topicPartition) assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned", lastCleaned >= firstDirty) } @@ -315,7 +316,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) { propertyOverrides: Properties = new Properties()): LogCleaner = { // create partitions and add them to the pool - val logs = new Pool[TopicAndPartition, Log]() + val logs = new Pool[TopicPartition, Log]() for(i <- 0 until parts) { val dir = new File(logDir, "log-" + i) dir.mkdirs() @@ -325,7 +326,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) { recoveryPoint = 0L, scheduler = time.scheduler, time = time) - logs.put(TopicAndPartition("log", i), log) + logs.put(new TopicPartition("log", i), log) } new LogCleaner(CleanerConfig(numThreads = numThreads, ioBufferSize = maxMessageSize / 2, maxMessageSize = maxMessageSize, backOffMs = logCleanerBackOffMillis), http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala index 1231e98..c24cb68 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala @@ -20,8 +20,8 @@ package kafka.log import java.io.File import java.util.Properties -import kafka.common.TopicAndPartition import kafka.utils._ +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.utils.Utils import org.junit.Assert._ @@ -49,7 +49,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging val logName = "log" val logDir = TestUtils.tempDir() var counter = 0 - val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2)) + val topics = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2)) val compressionCodec = CompressionType.forName(compressionCodecName) @Test @@ -86,7 +86,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset // the first block should get cleaned - cleaner.awaitCleaned("log", 0, activeSegAtT0.baseOffset) + cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT0.baseOffset) // check the data is the same val read1 = readFromLog(log) @@ -94,7 +94,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging val compactedSize = log.logSegments(0L, activeSegAtT0.baseOffset).map(_.size).sum debug(s"after cleaning the compacted size up to active segment at T0: $compactedSize") - val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(TopicAndPartition("log", 0)) + val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0)) assertTrue(s"log cleaner should have processed up to offset $firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned", lastCleaned >= firstBlock1SegmentBaseOffset) assertTrue(s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize", sizeUpToActiveSegmentAtT0 > compactedSize) @@ -137,7 +137,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging policyOverrides: Map[String, String] = Map()): LogCleaner = { // create partitions and add them to the pool - val logs = new Pool[TopicAndPartition, Log]() + val logs = new Pool[TopicPartition, Log]() for(i <- 0 until parts) { val dir = new File(logDir, "log-" + i) dir.mkdirs() @@ -154,7 +154,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging recoveryPoint = 0L, scheduler = time.scheduler, time = time) - logs.put(TopicAndPartition("log", i), log) + logs.put(new TopicPartition("log", i), log) } new LogCleaner(CleanerConfig(numThreads = numThreads, backOffMs = backOffMs), http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 5dfa268..b1d2b33 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -20,8 +20,8 @@ package kafka.log import java.io.File import java.util.Properties -import kafka.common._ import kafka.utils._ +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.{MemoryRecords, Record} import org.apache.kafka.common.utils.Utils import org.junit.Assert._ @@ -102,9 +102,9 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { while(log.numberOfSegments < 8) log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds)) - val topicAndPartition = TopicAndPartition("log", 0) - val lastClean = Map(topicAndPartition-> 0L) - val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds) + val topicPartition = new TopicPartition("log", 0) + val lastClean = Map(topicPartition -> 0L) + val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds) assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1) assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset, cleanableOffsets._2) } @@ -133,9 +133,9 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { while (log.numberOfSegments < 8) log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1)) - val topicAndPartition = TopicAndPartition("log", 0) - val lastClean = Map(topicAndPartition-> 0L) - val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds) + val topicPartition = new TopicPartition("log", 0) + val lastClean = Map(topicPartition -> 0L) + val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds) assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1) assertEquals("The first uncleanable offset begins with the second block of log entries.", activeSegAtT0.baseOffset, cleanableOffsets._2) } @@ -159,16 +159,16 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { time.sleep(compactionLag + 1) - val topicAndPartition = TopicAndPartition("log", 0) - val lastClean = Map(topicAndPartition-> 0L) - val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds) + val topicPartition = new TopicPartition("log", 0) + val lastClean = Map(topicPartition -> 0L) + val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds) assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1) assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset, cleanableOffsets._2) } private def createCleanerManager(log: Log): LogCleanerManager = { - val logs = new Pool[TopicAndPartition, Log]() - logs.put(TopicAndPartition("log", 0), log) + val logs = new Pool[TopicPartition, Log]() + logs.put(new TopicPartition("log", 0), log) val cleanerManager = new LogCleanerManager(Array(logDir), logs) cleanerManager } http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index f43e92b..40691b9 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -24,6 +24,7 @@ import java.util.Properties import kafka.common._ import kafka.utils._ +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils import org.junit.Assert._ @@ -137,7 +138,7 @@ class LogCleanerTest extends JUnitSuite { while(log.numberOfSegments < 4) log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset)) val keys = keysInLog(log).toSet assertTrue("None of the keys we deleted should still exist.", (0 until leo.toInt by 2).forall(!keys.contains(_))) @@ -161,7 +162,7 @@ class LogCleanerTest extends JUnitSuite { val initialLogSize = log.size - val (endOffset, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset)) + val (endOffset, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset)) assertEquals(5, endOffset) assertEquals(5, stats.messagesRead) assertEquals(initialLogSize, stats.bytesRead) @@ -189,16 +190,16 @@ class LogCleanerTest extends JUnitSuite { log.roll() // clean the log with only one message removed - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset)) + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset)) assertEquals(immutable.List(1,0,1,0), keysInLog(log)) assertEquals(immutable.List(1,2,3,4), offsetsInLog(log)) // continue to make progress, even though we can only clean one message at a time - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3, log.activeSegment.baseOffset)) + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 3, log.activeSegment.baseOffset)) assertEquals(immutable.List(0,1,0), keysInLog(log)) assertEquals(immutable.List(2,3,4), offsetsInLog(log)) - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4, log.activeSegment.baseOffset)) + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 4, log.activeSegment.baseOffset)) assertEquals(immutable.List(1,0), keysInLog(log)) assertEquals(immutable.List(3,4), offsetsInLog(log)) } @@ -235,7 +236,7 @@ class LogCleanerTest extends JUnitSuite { assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.", distinctValuesBySegment.reverse.tail.forall(_ > N)) - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, firstUncleanableOffset)) + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset)) val distinctValuesBySegmentAfterClean = distinctValuesBySegment @@ -257,7 +258,7 @@ class LogCleanerTest extends JUnitSuite { for (_ <- 0 until 6) log.append(messageSet, assignOffsets = true) - val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset) + val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset) assertEquals("Total bytes of LogToClean should equal size of all segments excluding the active segment", logToClean.totalBytes, log.size - log.activeSegment.size) @@ -277,7 +278,7 @@ class LogCleanerTest extends JUnitSuite { // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable val segs = log.logSegments.toSeq - val logToClean = LogToClean(TopicAndPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset) + val logToClean = LogToClean(new TopicPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset) val expectedCleanSize = segs.take(2).map(_.size).sum val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum @@ -315,7 +316,7 @@ class LogCleanerTest extends JUnitSuite { log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt)) val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages - val (_, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) + val (_, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset)) assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log)) assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size) @@ -333,7 +334,7 @@ class LogCleanerTest extends JUnitSuite { def unkeyedMessageCountInLog(log: Log) = log.logSegments.map(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum - def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = { + def abortCheckDone(topicPartition: TopicPartition): Unit = { throw new LogCleaningAbortedException() } @@ -677,7 +678,7 @@ class LogCleanerTest extends JUnitSuite { log.roll() - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset)) for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowEntries.asScala; deepLogEntry <- shallowLogEntry.asScala) { assertEquals(shallowLogEntry.record.magic, deepLogEntry.record.magic) @@ -720,20 +721,20 @@ class LogCleanerTest extends JUnitSuite { key = "0".getBytes, timestamp = time.milliseconds() + logConfig.deleteRetentionMs + 10000)) log.roll() - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset)) // Append a tombstone with a small timestamp and roll out a new log segment. log.append(TestUtils.singletonRecords(value = null, key = "0".getBytes, timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000)) log.roll() - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 1, log.activeSegment.baseOffset)) + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset)) assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset()) // Append a message and roll out another log segment. log.append(TestUtils.singletonRecords(value = "1".getBytes, key = "1".getBytes, timestamp = time.milliseconds())) log.roll() - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset)) + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset)) assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset()) } @@ -772,9 +773,9 @@ class LogCleanerTest extends JUnitSuite { def makeLog(dir: File = dir, config: LogConfig = logConfig) = new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) - def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */ } + def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ } - def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) = + def makeCleaner(capacity: Int, checkDone: TopicPartition => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) = new Cleaner(id = 0, offsetMap = new FakeOffsetMap(capacity), ioBufferSize = maxMessageSize, @@ -782,7 +783,7 @@ class LogCleanerTest extends JUnitSuite { dupBufferLoadFactor = 0.75, throttler = throttler, time = time, - checkDone = checkDone ) + checkDone = checkDone) def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = { for((key, value) <- seq) http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 40e6228..ab577ce 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -23,6 +23,7 @@ import java.util.Properties import kafka.common._ import kafka.server.OffsetCheckpoint import kafka.utils._ +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.OffsetOutOfRangeException import org.apache.kafka.common.utils.Utils import org.junit.Assert._ @@ -64,7 +65,7 @@ class LogManagerTest { */ @Test def testCreateLog() { - val log = logManager.createLog(TopicAndPartition(name, 0), logConfig) + val log = logManager.createLog(new TopicPartition(name, 0), logConfig) val logFile = new File(logDir, name + "-0") assertTrue(logFile.exists) log.append(TestUtils.singletonRecords("test".getBytes())) @@ -75,7 +76,7 @@ class LogManagerTest { */ @Test def testGetNonExistentLog() { - val log = logManager.getLog(TopicAndPartition(name, 0)) + val log = logManager.getLog(new TopicPartition(name, 0)) assertEquals("No log should be found.", None, log) val logFile = new File(logDir, name + "-0") assertTrue(!logFile.exists) @@ -86,7 +87,7 @@ class LogManagerTest { */ @Test def testCleanupExpiredSegments() { - val log = logManager.createLog(TopicAndPartition(name, 0), logConfig) + val log = logManager.createLog(new TopicPartition(name, 0), logConfig) var offset = 0L for(_ <- 0 until 200) { val set = TestUtils.singletonRecords("test".getBytes()) @@ -129,7 +130,7 @@ class LogManagerTest { logManager.startup // create a log - val log = logManager.createLog(TopicAndPartition(name, 0), config) + val log = logManager.createLog(new TopicPartition(name, 0), config) var offset = 0L // add a bunch of messages that should be larger than the retentionSize @@ -166,7 +167,7 @@ class LogManagerTest { def testDoesntCleanLogsWithCompactDeletePolicy() { val logProps = new Properties() logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete) - val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps)) + val log = logManager.createLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps)) var offset = 0L for (_ <- 0 until 200) { val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes()) @@ -195,7 +196,7 @@ class LogManagerTest { logManager = createLogManager() logManager.startup - val log = logManager.createLog(TopicAndPartition(name, 0), config) + val log = logManager.createLog(new TopicPartition(name, 0), config) val lastFlush = log.lastFlushTime for (_ <- 0 until 200) { val set = TestUtils.singletonRecords("test".getBytes()) @@ -219,7 +220,7 @@ class LogManagerTest { // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { - logManager.createLog(TopicAndPartition("test", partition), logConfig) + logManager.createLog(new TopicPartition("test", partition), logConfig) assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size) val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size) assertTrue("Load should balance evenly", counts.max <= counts.min + 1) @@ -244,7 +245,7 @@ class LogManagerTest { */ @Test def testCheckpointRecoveryPoints() { - verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1), TopicAndPartition("test-b", 1)), logManager) + verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1), new TopicPartition("test-b", 1)), logManager) } /** @@ -257,7 +258,7 @@ class LogManagerTest { logManager = TestUtils.createLogManager( logDirs = Array(new File(logDir.getAbsolutePath + File.separator))) logManager.startup - verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) + verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager) } /** @@ -271,13 +272,13 @@ class LogManagerTest { logDir.deleteOnExit() logManager = createLogManager() logManager.startup - verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) + verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager) } - private def verifyCheckpointRecovery(topicAndPartitions: Seq[TopicAndPartition], + private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition], logManager: LogManager) { - val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig)) + val logs = topicPartitions.map(this.logManager.createLog(_, logConfig)) logs.foreach(log => { for (_ <- 0 until 50) log.append(TestUtils.singletonRecords("test".getBytes())) @@ -288,7 +289,7 @@ class LogManagerTest { logManager.checkpointRecoveryPointOffsets() val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read() - topicAndPartitions.zip(logs).foreach { + topicPartitions.zip(logs).foreach { case(tp, log) => { assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint) } http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index ff596bd..08cdac5 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -1093,9 +1093,9 @@ class LogTest extends JUnitSuite { val topic = "test_topic" val partition = "143" val dir = new File(logDir + topicPartitionName(topic, partition)) - val topicAndPartition = Log.parseTopicPartitionName(dir) - assertEquals(topic, topicAndPartition.asTuple._1) - assertEquals(partition.toInt, topicAndPartition.asTuple._2) + val topicPartition = Log.parseTopicPartitionName(dir) + assertEquals(topic, topicPartition.topic) + assertEquals(partition.toInt, topicPartition.partition) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index e0dfe16..625ff6c 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -88,7 +88,7 @@ class AbstractFetcherThreadTest { class DummyFetchRequest(val offsets: collection.Map[TopicPartition, Long]) extends FetchRequest { override def isEmpty: Boolean = offsets.isEmpty - override def offset(topicAndPartition: TopicPartition): Long = offsets(topicAndPartition) + override def offset(topicPartition: TopicPartition): Long = offsets(topicPartition) } class TestPartitionData(records: MemoryRecords = MemoryRecords.EMPTY) extends PartitionData { @@ -110,11 +110,11 @@ class AbstractFetcherThreadTest { type REQ = DummyFetchRequest type PD = PartitionData - override def processPartitionData(topicAndPartition: TopicPartition, + override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData): Unit = {} - override def handleOffsetOutOfRange(topicAndPartition: TopicPartition): Long = 0L + override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = 0L override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit = {} @@ -160,14 +160,14 @@ class AbstractFetcherThreadTest { new TestPartitionData(MemoryRecords.withRecords(1L, Record.create("hello".getBytes()))) ) - override def processPartitionData(topicAndPartition: TopicPartition, + override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData): Unit = { // Throw exception if the fetchOffset does not match the fetcherThread partition state if (fetchOffset != logEndOffset) throw new RuntimeException( "Offset mismatch for partition %s: fetched offset = %d, log end offset = %d." - .format(topicAndPartition, fetchOffset, logEndOffset)) + .format(topicPartition, fetchOffset, logEndOffset)) // Now check message's crc val records = partitionData.toRecords