http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9e4c149..6dfe97f 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -114,9 +114,8 @@ class ReplicaManager(val config: KafkaConfig,
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
-  private val allPartitions = new Pool[(String, Int), Partition](valueFactory = Some { case (t, p) =>
-    new Partition(t, p, time, this)
-  })
+  private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
+    new Partition(tp.topic, tp.partition, time, this)))
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, 
time, threadNamePrefix, quotaManager)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
@@ -124,14 +123,14 @@ class ReplicaManager(val config: KafkaConfig,
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
-  private val isrChangeSet: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
+  private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
   private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
   private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
 
   val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
-    purgatoryName = "Produce", config.brokerId, 
config.producerPurgatoryPurgeIntervalRequests)
+    purgatoryName = "Produce", localBrokerId, 
config.producerPurgatoryPurgeIntervalRequests)
   val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
-    purgatoryName = "Fetch", config.brokerId, 
config.fetchPurgatoryPurgeIntervalRequests)
+    purgatoryName = "Fetch", localBrokerId, 
config.fetchPurgatoryPurgeIntervalRequests)
 
   val leaderCount = newGauge(
     "LeaderCount",
@@ -165,9 +164,9 @@ class ReplicaManager(val config: KafkaConfig,
       scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, 
period = config.replicaHighWatermarkCheckpointIntervalMs, unit = 
TimeUnit.MILLISECONDS)
   }
 
-  def recordIsrChange(topicAndPartition: TopicAndPartition) {
+  def recordIsrChange(topicPartition: TopicPartition) {
     isrChangeSet synchronized {
-      isrChangeSet += topicAndPartition
+      isrChangeSet += topicPartition
       lastIsrChangeMs.set(System.currentTimeMillis())
     }
   }
@@ -224,29 +223,22 @@ class ReplicaManager(val config: KafkaConfig,
   def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): 
Short  = {
     stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica 
(delete=$deletePartition) for partition $topicPartition")
     val errorCode = Errors.NONE.code
-    val topic = topicPartition.topic
-    val partitionId = topicPartition.partition
-    getPartition(topic, partitionId) match {
+    getPartition(topicPartition) match {
       case Some(_) =>
         if (deletePartition) {
-          val removedPartition = allPartitions.remove((topic, partitionId))
+          val removedPartition = allPartitions.remove(topicPartition)
           if (removedPartition != null) {
             removedPartition.delete() // this will delete the local log
-            val topicHasPartitions = allPartitions.keys.exists { case (t, _) => topic == t }
+            val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic)
             if (!topicHasPartitions)
-              BrokerTopicStats.removeMetrics(topic)
+              BrokerTopicStats.removeMetrics(topicPartition.topic)
           }
         }
       case None =>
         // Delete log and corresponding folders in case replica manager 
doesn't hold them anymore.
         // This could happen when topic is being deleted while broker is down 
and recovers.
-        if (deletePartition) {
-          val topicAndPartition = TopicAndPartition(topic, partitionId)
-
-          if(logManager.getLog(topicAndPartition).isDefined) {
-              logManager.asyncDelete(topicAndPartition)
-          }
-        }
+        if (deletePartition && logManager.getLog(topicPartition).isDefined)
+          logManager.asyncDelete(topicPartition)
         stateChangeLogger.trace(s"Broker $localBrokerId ignoring stop replica 
(delete=$deletePartition) for partition $topicPartition as replica doesn't 
exist on broker")
     }
     stateChangeLogger.trace(s"Broker $localBrokerId finished handling stop 
replica (delete=$deletePartition) for partition $topicPartition")
@@ -274,44 +266,34 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
-    allPartitions.getAndMaybePut((topic, partitionId))
-  }
+  def getOrCreatePartition(topicPartition: TopicPartition): Partition =
+    allPartitions.getAndMaybePut(topicPartition)
 
-  def getPartition(topic: String, partitionId: Int): Option[Partition] = {
-    Option(allPartitions.get((topic, partitionId)))
-  }
+  def getPartition(topicPartition: TopicPartition): Option[Partition] =
+    Option(allPartitions.get(topicPartition))
 
-  def getReplicaOrException(topic: String, partition: Int): Replica = {
-    val replicaOpt = getReplica(topic, partition)
-    if(replicaOpt.isDefined)
-      replicaOpt.get
-    else
-      throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition))
+  def getReplicaOrException(topicPartition: TopicPartition): Replica = {
+    getReplica(topicPartition).getOrElse {
+      throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition")
+    }
   }
 
-  def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica =  {
-    val partitionOpt = getPartition(topic, partitionId)
+  def getLeaderReplicaIfLocal(topicPartition: TopicPartition): Replica =  {
+    val partitionOpt = getPartition(topicPartition)
     partitionOpt match {
       case None =>
-        throw new UnknownTopicOrPartitionException("Partition [%s,%d] doesn't exist on %d".format(topic, partitionId, config.brokerId))
+        throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist on $localBrokerId")
       case Some(partition) =>
         partition.leaderReplicaIfLocal match {
           case Some(leaderReplica) => leaderReplica
           case None =>
-            throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
-                                                     .format(topic, partitionId, config.brokerId))
+            throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
         }
     }
   }
 
-  def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] =  {
-    val partitionOpt = getPartition(topic, partitionId)
-    partitionOpt match {
-      case None => None
-      case Some(partition) => partition.getReplica(replicaId)
-    }
-  }
+  def getReplica(topicPartition: TopicPartition, replicaId: Int = localBrokerId): Option[Replica] =
+    getPartition(topicPartition).flatMap(_.getReplica(replicaId))
 
   /**
    * Append messages to leader replicas of the partition, and wait for them to 
be replicated to other replicas;
@@ -356,8 +338,8 @@ class ReplicaManager(val config: KafkaConfig,
     } else {
       // If required.acks is outside accepted range, something is wrong with the client
       // Just return an error and don't handle the request at all
-      val responseStatus = entriesPerPartition.map { case (topicAndPartition, _) =>
-        topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
+      val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
+        topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
           LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP)
       }
       responseCallback(responseStatus)
@@ -396,10 +378,10 @@ class ReplicaManager(val config: KafkaConfig,
       if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
         (topicPartition, LogAppendResult(
           LogAppendInfo.UnknownLogAppendInfo,
-          Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic)))))
+          Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
       } else {
         try {
-          val partitionOpt = getPartition(topicPartition.topic, topicPartition.partition)
+          val partitionOpt = getPartition(topicPartition)
           val info = partitionOpt match {
             case Some(partition) =>
               partition.appendRecordsToLeader(records, requiredAcks)
@@ -459,7 +441,7 @@ class ReplicaManager(val config: KafkaConfig,
                     hardMaxBytesLimit: Boolean,
                     fetchInfos: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota = UnboundedQuota,
-                    responseCallback: Seq[(TopicAndPartition, FetchPartitionData)] => Unit) {
+                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
     val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
@@ -496,11 +478,11 @@ class ReplicaManager(val config: KafkaConfig,
       responseCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
-      val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
+      val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
         val fetchInfo = fetchInfos.collectFirst {
-          case (tp, v) if TopicAndPartition(tp.topic, tp.partition) == topicAndPartition => v
-        }.getOrElse(sys.error(s"Partition $topicAndPartition not found in fetchInfos"))
-        (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
+          case (tp, v) if tp == topicPartition => v
+        }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
+        (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
       }
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, 
hardMaxBytesLimit, fetchOnlyFromLeader,
         fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
@@ -525,15 +507,13 @@ class ReplicaManager(val config: KafkaConfig,
                        fetchMaxBytes: Int,
                        hardMaxBytesLimit: Boolean,
                        readPartitionInfo: Seq[(TopicPartition, PartitionData)],
-                       quota: ReplicaQuota): Seq[(TopicAndPartition, LogReadResult)] = {
+                       quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
 
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, 
minOneMessage: Boolean): LogReadResult = {
-      val topic = tp.topic
-      val partition = tp.partition
       val offset = fetchInfo.offset
       val partitionFetchSize = fetchInfo.maxBytes
 
-      BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark()
+      BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
       BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
 
       try {
@@ -543,9 +523,9 @@ class ReplicaManager(val config: KafkaConfig,
 
         // decide whether to only fetch from leader
         val localReplica = if (fetchOnlyFromLeader)
-          getLeaderReplicaIfLocal(topic, partition)
+          getLeaderReplicaIfLocal(tp)
         else
-          getReplicaOrException(topic, partition)
+          getReplicaOrException(tp)
 
         // decide whether to only fetch committed data (i.e. messages below 
high watermark)
         val maxOffsetOpt = if (readOnlyCommitted)
@@ -568,7 +548,7 @@ class ReplicaManager(val config: KafkaConfig,
             val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, 
minOneMessage)
 
             // If the partition is being throttled, simply return an empty set.
-            if (shouldLeaderThrottle(quota, TopicAndPartition(tp.topic, tp.partition), replicaId))
+            if (shouldLeaderThrottle(quota, tp, replicaId))
               FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
             // For FetchRequest version 3, we replace incomplete message sets 
with an empty one as consumers can make
             // progress in such cases and don't need to report a 
`RecordTooLargeException`
@@ -594,7 +574,7 @@ class ReplicaManager(val config: KafkaConfig,
           LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY), -1L,
             partitionFetchSize, false, Some(e))
         case e: Throwable =>
-          BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+          BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
           error(s"Error processing fetch operation on partition $tp, offset 
$offset", e)
           LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY), -1L,
@@ -603,7 +583,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     var limitBytes = fetchMaxBytes
-    val result = new mutable.ArrayBuffer[(TopicAndPartition, LogReadResult)]
+    val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
     var minOneMessage = !hardMaxBytesLimit
     readPartitionInfo.foreach { case (tp, fetchInfo) =>
       val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
@@ -612,7 +592,7 @@ class ReplicaManager(val config: KafkaConfig,
       if (messageSetSize > 0)
         minOneMessage = false
       limitBytes = math.max(0, limitBytes - messageSetSize)
-      result += (TopicAndPartition(tp.topic, tp.partition) -> readResult)
+      result += (tp -> readResult)
     }
     result
   }
@@ -621,15 +601,15 @@ class ReplicaManager(val config: KafkaConfig,
    *  To avoid ISR thrashing, we only throttle a replica on the leader if it's 
in the throttled replica list,
    *  the quota is exceeded and the replica is not in sync.
    */
-  def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition, replicaId: Int): Boolean = {
-    val isReplicaInSync = getPartition(topicPartition.topic, topicPartition.partition).flatMap { partition =>
+  def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = {
+    val isReplicaInSync = getPartition(topicPartition).flatMap { partition =>
       partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
     }.getOrElse(false)
     quota.isThrottled(topicPartition) && quota.isQuotaExceeded && 
!isReplicaInSync
   }
 
-  def getMagicAndTimestampType(topicAndPartition: TopicAndPartition): Option[(Byte, TimestampType)] =
-    getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica =>
+  def getMagicAndTimestampType(topicPartition: TopicPartition): Option[(Byte, TimestampType)] =
+    getReplica(topicPartition).flatMap { replica =>
       replica.log.map(log => 
(log.config.messageFormatVersion.messageFormatVersion, 
log.config.messageTimestampType))
     }
 
@@ -671,12 +651,12 @@ class ReplicaManager(val config: KafkaConfig,
         // First check partition's leader epoch
         val partitionState = new mutable.HashMap[Partition, PartitionState]()
         leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
-          val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
+          val partition = getOrCreatePartition(topicPartition)
           val partitionLeaderEpoch = partition.getLeaderEpoch()
           // If the leader epoch is valid record the epoch of the controller 
that made the leadership decision.
           // This is useful while updating the isr to maintain the decision 
maker controller's epoch in the zookeeper path
           if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
-            if(stateInfo.replicas.contains(config.brokerId))
+            if(stateInfo.replicas.contains(localBrokerId))
               partitionState.put(partition, stateInfo)
             else {
               stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request 
from controller %d with correlation id %d " +
@@ -696,7 +676,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
         val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) 
=>
-          stateInfo.leader == config.brokerId
+          stateInfo.leader == localBrokerId
         }
         val partitionsToBeFollower = partitionState -- 
partitionsTobeLeader.keys
 
@@ -741,19 +721,20 @@ class ReplicaManager(val config: KafkaConfig,
                           partitionState: Map[Partition, PartitionState],
                           correlationId: Int,
                           responseMap: mutable.Map[TopicPartition, Short]): 
Set[Partition] = {
-    partitionState.foreach(state =>
+    partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request 
correlationId %d from controller %d epoch %d " +
         "starting the become-leader transition for partition %s")
-        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
+        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+    }
 
     for (partition <- partitionState.keys)
-      responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code)
+      responseMap.put(partition.topicPartition, Errors.NONE.code)
 
     val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
 
     try {
       // First stop fetchers for all the partitions
-      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(p => new TopicPartition(p.topic, p.partitionId)))
+      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
       // Update the partition information to be the leader
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         if (partition.makeLeader(controllerId, partitionStateInfo, 
correlationId))
@@ -761,29 +742,28 @@ class ReplicaManager(val config: KafkaConfig,
         else
           stateChangeLogger.info(("Broker %d skipped the become-leader state 
change after marking its partition as leader with correlation id %d from " +
             "controller %d epoch %d for partition %s since it is already the 
leader for the partition.")
-            .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(partition.topic, partition.partitionId)));
+            .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
       }
       partitionsToMakeLeaders.foreach { partition =>
         stateChangeLogger.trace(("Broker %d stopped fetchers as part of 
become-leader request from controller " +
           "%d epoch %d with correlation id %d for partition %s")
-          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
+          .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
       }
     } catch {
       case e: Throwable =>
-        partitionState.foreach { state =>
+        partitionState.keys.foreach { partition =>
           val errorMsg = ("Error on broker %d while processing LeaderAndIsr 
request correlationId %d received from controller %d" +
-            " epoch %d for partition %s").format(localBrokerId, correlationId, 
controllerId, epoch,
-                                                
TopicAndPartition(state._1.topic, state._1.partitionId))
+            " epoch %d for partition %s").format(localBrokerId, correlationId, 
controllerId, epoch, partition.topicPartition)
           stateChangeLogger.error(errorMsg, e)
         }
         // Re-throw the exception for it to be caught in KafkaApis
         throw e
     }
 
-    partitionState.foreach { state =>
+    partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request 
correlationId %d from controller %d epoch %d " +
         "for the become-leader transition for partition %s")
-        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
     }
 
     partitionsToMakeLeaders
@@ -813,14 +793,14 @@ class ReplicaManager(val config: KafkaConfig,
                             correlationId: Int,
                             responseMap: mutable.Map[TopicPartition, Short],
                             metadataCache: MetadataCache) : Set[Partition] = {
-    partitionState.foreach { state =>
+    partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request 
correlationId %d from controller %d epoch %d " +
         "starting the become-follower transition for partition %s")
-        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
     }
 
     for (partition <- partitionState.keys)
-      responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code)
+      responseMap.put(partition.topicPartition, Errors.NONE.code)
 
     val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
 
@@ -852,14 +832,16 @@ class ReplicaManager(val config: KafkaConfig,
         }
       }
 
-      replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(p => new TopicPartition(p.topic, p.partitionId)))
+      replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(("Broker %d stopped fetchers as part of 
become-follower request from controller " +
           "%d epoch %d with correlation id %d for partition %s")
-          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
+          .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
       }
 
-      logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
+      logManager.truncateTo(partitionsToMakeFollower.map { partition =>
+        (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
+      }.toMap)
       partitionsToMakeFollower.foreach { partition =>
         val topicPartitionOperationKey = new 
TopicPartitionOperationKey(partition.topic, partition.partitionId)
         tryCompleteDelayedProduce(topicPartitionOperationKey)
@@ -882,7 +864,7 @@ class ReplicaManager(val config: KafkaConfig,
       else {
         // we do not need to check if the leader exists again since this has 
been done at the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = 
partitionsToMakeFollower.map(partition =>
-          new TopicPartition(partition.topic, partition.partitionId) -> BrokerAndInitialOffset(
+          partition.topicPartition -> BrokerAndInitialOffset(
             metadataCache.getAliveBrokers.find(_.id == 
partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
             partition.getReplica().get.logEndOffset.messageOffset)).toMap
         
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
@@ -902,10 +884,10 @@ class ReplicaManager(val config: KafkaConfig,
         throw e
     }
 
-    partitionState.foreach { state =>
+    partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request 
correlationId %d from controller %d epoch %d " +
         "for the become-follower transition for partition %s")
-        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
     }
 
     partitionsToMakeFollower
@@ -916,18 +898,18 @@ class ReplicaManager(val config: KafkaConfig,
     allPartitions.values.foreach(partition => 
partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
   }
 
-  private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicAndPartition, LogReadResult)]) {
+  private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) {
     debug("Recording follower broker %d log read results: %s 
".format(replicaId, readResults))
-    readResults.foreach { case (topicAndPartition, readResult) =>
-      getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
+    readResults.foreach { case (topicPartition, readResult) =>
+      getPartition(topicPartition) match {
         case Some(partition) =>
           partition.updateReplicaLogReadResult(replicaId, readResult)
 
           // for producer requests with ack > 1, we need to check
           // if they can be unblocked after some follower's log end offsets 
have moved
-          tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicAndPartition))
+          tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicPartition))
         case None =>
-          warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition))
+          warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicPartition))
       }
     }
   }
@@ -938,10 +920,10 @@ class ReplicaManager(val config: KafkaConfig,
 
   // Flushes the highwatermark value for all partitions to the highwatermark 
file
   def checkpointHighWatermarks() {
-    val replicas = allPartitions.values.flatMap(_.getReplica(config.brokerId))
+    val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId))
     val replicasByDir = 
replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for ((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => new TopicAndPartition(r) -> r.highWatermark.messageOffset).toMap
+      val hwms = reps.map(r => r.partition.topicPartition -> r.highWatermark.messageOffset).toMap
       try {
         highWatermarkCheckpoints(dir).write(hwms)
       } catch {
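
For reference, a minimal usage sketch (illustrative, not part of this patch) of how call sites look once ReplicaManager is keyed by TopicPartition; it assumes the kafka core classes on the classpath and a live ReplicaManager instance, and the helper name describePartition is ours:

  import kafka.server.ReplicaManager
  import org.apache.kafka.common.TopicPartition

  // Lookups now take a single TopicPartition key instead of a (topic, partitionId) pair.
  def describePartition(replicaManager: ReplicaManager, topic: String, partitionId: Int): String = {
    val tp = new TopicPartition(topic, partitionId)
    replicaManager.getPartition(tp) match {            // was getPartition(topic, partitionId)
      case Some(_) =>
        val hw = replicaManager.getReplica(tp)         // was getReplica(topic, partitionId)
          .map(_.highWatermark.messageOffset)
        s"$tp is hosted locally, high watermark: $hw"
      case None =>
        s"$tp is not hosted on this broker"
    }
  }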

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index d4c82d8..4a87dfb 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -17,13 +17,15 @@
 package kafka.server
 
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import kafka.common.TopicAndPartition
+
 import kafka.server.Constants._
 import kafka.server.ReplicationQuotaManagerConfig._
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.metrics._
 import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.stats.SimpleRate
 import org.apache.kafka.common.utils.Time
 
@@ -49,7 +51,7 @@ object ReplicationQuotaManagerConfig {
 }
 
 trait ReplicaQuota {
-  def isThrottled(topicAndPartition: TopicAndPartition): Boolean
+  def isThrottled(topicPartition: TopicPartition): Boolean
   def isQuotaExceeded(): Boolean
 }
 
@@ -113,7 +115,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
     * @param topicPartition the partition to check
     * @return
     */
-  override def isThrottled(topicPartition: TopicAndPartition): Boolean = {
+  override def isThrottled(topicPartition: TopicPartition): Boolean = {
     val partitions = throttledPartitions.get(topicPartition.topic)
     if (partitions != null)
       (partitions eq AllReplicas) || 
partitions.contains(topicPartition.partition)
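
As an aside, a minimal sketch (ours, not from the patch) of implementing the updated ReplicaQuota trait against the new key type; NeverThrottled is a hypothetical stand-in analogous to the UnboundedQuota default used by ReplicaManager.readFromLocalLog above:

  import kafka.server.ReplicaQuota
  import org.apache.kafka.common.TopicPartition

  // A trivial quota that never throttles; isThrottled now keys on TopicPartition rather than TopicAndPartition.
  object NeverThrottled extends ReplicaQuota {
    override def isThrottled(topicPartition: TopicPartition): Boolean = false
    override def isQuotaExceeded(): Boolean = false
  }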

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index f6d5153..75d6e8a 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -295,7 +295,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
                 messageInfoFromFirstReplicaOpt match {
                   case None =>
                     messageInfoFromFirstReplicaOpt = Some(
-                      MessageInfo(replicaId, logEntry.offset,logEntry.nextOffset, logEntry.record.checksum))
+                      MessageInfo(replicaId, logEntry.offset, logEntry.nextOffset, logEntry.record.checksum))
                   case Some(messageInfoFromFirstReplica) =>
                     if (messageInfoFromFirstReplica.offset != logEntry.offset) 
{
                       println(ReplicaVerificationTool.getCurrentTimeString + 
": partition " + topicAndPartition

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 1d96238..0cf6474 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -23,7 +23,7 @@ import collection.mutable
 import collection.JavaConverters._
 import kafka.common.KafkaException
 
-class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] {
+class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
 
   private val pool: ConcurrentMap[K, V] = new ConcurrentHashMap[K, V]
   private val createLock = new Object
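
A small sketch (illustrative, assuming kafka.utils.Pool and the Kafka clients jar on the classpath) of the relaxed valueFactory parameter in use; the type is unchanged, only the redundant parentheses around the type parameter are dropped, and values are created lazily via getAndMaybePut, as ReplicaManager now does for allPartitions:

  import kafka.utils.Pool
  import org.apache.kafka.common.TopicPartition

  object PoolExample extends App {
    // The Option[K => V] factory builds a value from the key on first access.
    val pool = new Pool[TopicPartition, String](valueFactory = Some(tp => s"state for ${tp.topic}-${tp.partition}"))

    val tp = new TopicPartition("test", 0)
    println(pool.getAndMaybePut(tp)) // factory runs on the first lookup: "state for test-0"
    println(pool.getAndMaybePut(tp)) // subsequent lookups return the cached value
  }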

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 369bb23..29e5d10 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -21,6 +21,7 @@ import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
 import kafka.controller.{IsrChangeNotificationListener, 
LeaderIsrAndControllerEpoch}
 import kafka.utils.ZkUtils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
 
 import scala.collection._
@@ -39,7 +40,7 @@ object ReplicationUtils extends Logging {
     updatePersistentPath
   }
 
-  def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicAndPartition]): Unit = {
+  def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicPartition]): Unit = {
     val isrChangeNotificationPath: String = 
zkUtils.createSequentialPersistentPath(
       ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
       generateIsrChangeJson(isrChangeSet))
@@ -89,7 +90,7 @@ object ReplicationUtils extends Logging {
       Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, 
zkPathVersion), controllerEpoch))}
   }
 
-  private def generateIsrChangeJson(isrChanges: Set[TopicAndPartition]): String = {
+  private def generateIsrChangeJson(isrChanges: Set[TopicPartition]): String = {
     val partitions = isrChanges.map(tp => Map("topic" -> tp.topic, "partition" 
-> tp.partition)).toArray
     Json.encode(Map("version" -> IsrChangeNotificationListener.version, 
"partitions" -> partitions))
   }
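
For illustration, a hedged sketch of the JSON shape generateIsrChangeJson produces for the new key type; the standalone helper below mirrors the private method and assumes the notification version is 1:

  import kafka.utils.Json
  import org.apache.kafka.common.TopicPartition

  // Mirrors generateIsrChangeJson above: one map per partition plus a version field (assumed to be 1 here).
  def isrChangeJson(isrChanges: Set[TopicPartition], version: Int = 1): String = {
    val partitions = isrChanges.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)).toArray
    Json.encode(Map("version" -> version, "partitions" -> partitions))
  }

  // e.g. isrChangeJson(Set(new TopicPartition("test", 0)))
  // yields something like {"version":1,"partitions":[{"topic":"test","partition":0}]}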

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 3b81e25..10d49f5 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -122,7 +122,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
 
     // wait until all the followers have synced the last HW with leader
     TestUtils.waitUntilTrue(() => servers.forall(server =>
-      server.replicaManager.getReplica(tp.topic(), tp.partition()).get.highWatermark.messageOffset == numRecords
+      server.replicaManager.getReplica(tp).get.highWatermark.messageOffset == numRecords
     ), "Failed to update high watermark for followers after timeout")
 
     val scheduler = new BounceBrokerScheduler(numIters)

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index ac310a9..bb93cb4 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -12,7 +12,7 @@
  */
 package integration.kafka.api
 
-import kafka.common.{Topic, TopicAndPartition}
+import kafka.common.Topic
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.Log
 import kafka.message.GZIPCompressionCodec
@@ -48,7 +48,7 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
     val logManager = servers.head.getLogManager
 
     def getGroupMetadataLogOpt: Option[Log] =
-      logManager.getLog(TopicAndPartition(Topic.GroupMetadataTopicName, 0))
+      logManager.getLog(new TopicPartition(Topic.GroupMetadataTopicName, 0))
 
     TestUtils.waitUntilTrue(() => 
getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowEntries.asScala.nonEmpty)),
                             "Commit message not appended in time")

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 693c758..f9eb61c 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -32,10 +32,14 @@ import kafka.common.TopicAndPartition
 import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
 import java.io.File
 import java.util
+
 import kafka.utils.TestUtils._
 import kafka.admin.AdminUtils._
+
 import scala.collection.{Map, immutable}
 import kafka.utils.CoreUtils._
+import org.apache.kafka.common.TopicPartition
+
 import scala.collection.JavaConverters._
 
 class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@@ -406,14 +410,15 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
           assertEquals(expected.split(",").toSeq, actual.asScala)
       }
       TestUtils.retry(10000) {
-        for(part <- 0 until partitions) {
-          val log = server.logManager.getLog(TopicAndPartition(topic, part))
+        for (part <- 0 until partitions) {
+          val tp = new TopicPartition(topic, part)
+          val log = server.logManager.getLog(tp)
           assertTrue(log.isDefined)
           assertEquals(retentionMs, log.get.config.retentionMs)
           assertEquals(messageSize, log.get.config.maxMessageSize)
           checkList(log.get.config.LeaderReplicationThrottledReplicas, 
throttledLeaders)
           checkList(log.get.config.FollowerReplicationThrottledReplicas, 
throttledFollowers)
-          assertEquals(quotaManagerIsThrottled, server.quotaManagers.leader.isThrottled(TopicAndPartition(topic, part)))
+          assertEquals(quotaManagerIsThrottled, server.quotaManagers.leader.isThrottled(tp))
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index b98822d..ba270ad 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -26,14 +26,15 @@ import org.junit.Test
 import java.util.Properties
 
 import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 
 class DeleteTopicTest extends ZooKeeperTestHarness {
 
   @Test
   def testDeleteTopicWithAllAliveReplicas() {
-    val topicAndPartition = TopicAndPartition("test", 0)
-    val topic = topicAndPartition.topic
+    val topicPartition = new TopicPartition("test", 0)
+    val topic = topicPartition.topic
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
@@ -43,8 +44,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   @Test
   def testResumeDeleteTopicWithRecoveredFollower() {
-    val topicAndPartition = TopicAndPartition("test", 0)
-    val topic = topicAndPartition.topic
+    val topicPartition = new TopicPartition("test", 0)
+    val topic = topicPartition.topic
     val servers = createTestTopicAndCluster(topic)
     // shut down one follower replica
     val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
@@ -56,7 +57,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // check if all replicas but the one that is shut down has deleted the log
     TestUtils.waitUntilTrue(() =>
       servers.filter(s => s.config.brokerId != follower.config.brokerId)
-        .forall(_.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
+        .forall(_.getLogManager().getLog(topicPartition).isEmpty), "Replicas 0,1 have not deleted log.")
     // ensure topic deletion is halted
     TestUtils.waitUntilTrue(() => 
zkUtils.pathExists(getDeleteTopicPath(topic)),
       "Admin path /admin/delete_topic/test path deleted even when a follower 
replica is down")
@@ -68,8 +69,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   @Test
   def testResumeDeleteTopicOnControllerFailover() {
-    val topicAndPartition = TopicAndPartition("test", 0)
-    val topic = topicAndPartition.topic
+    val topicPartition = new TopicPartition("test", 0)
+    val topic = topicPartition.topic
     val servers = createTestTopicAndCluster(topic)
     val controllerId = zkUtils.getController()
     val controller = servers.filter(s => s.config.brokerId == 
controllerId).head
@@ -97,7 +98,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   def testPartitionReassignmentDuringDeleteTopic() {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
-    val topicAndPartition = TopicAndPartition(topic, 0)
+    val topicPartition = new TopicPartition(topic, 0)
     val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
@@ -106,7 +107,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, 
expectedReplicaAssignment)
     // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created.")
     val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader should exist for partition [test,0]", 
leaderIdOpt.isDefined)
@@ -119,18 +120,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // reassign partition 0
     val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
     val newReplicas = Seq(1, 2, 3)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(new TopicAndPartition(topicPartition) -> newReplicas))
     assertTrue("Partition reassignment should fail for [test,0]", 
reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
       val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
-        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed
+      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, new TopicAndPartition(topicPartition),
+        Map(new TopicAndPartition(topicPartition) -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed
     }, "Partition reassignment shouldn't complete.")
     val controllerId = zkUtils.getController()
     val controller = servers.filter(s => s.config.brokerId == 
controllerId).head
     assertFalse("Partition reassignment should fail",
-      controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition))
+      controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(new TopicAndPartition(topicPartition)))
     val assignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
     assertEquals("Partition should not be reassigned to 0, 1, 2", 
oldAssignedReplicas, assignedReplicas)
     follower.startup()
@@ -145,7 +146,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader should exist for partition [test,0]", 
leaderIdOpt.isDefined)
     val follower = servers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
-    val newPartition = TopicAndPartition(topic, 1)
+    val newPartition = new TopicPartition(topic, 1)
     follower.shutdown()
     // add partitions to topic
     AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2", false)
@@ -168,7 +169,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
     // add partitions to topic
-    val newPartition = TopicAndPartition(topic, 1)
+    val newPartition = new TopicPartition(topic, 1)
     AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2")
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
@@ -181,7 +182,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   def testRecreateTopicAfterDeletion() {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
-    val topicAndPartition = TopicAndPartition(topic, 0)
+    val topicPartition = new TopicPartition(topic, 0)
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
@@ -192,15 +193,15 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, 
topic, 0, 1000)
     assertTrue("New leader should be elected after re-creating topic test", 
leaderIdOpt.isDefined)
     // check if all replica logs are created
-    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created.")
     servers.foreach(_.shutdown())
   }
 
   @Test
   def testDeleteNonExistingTopic() {
-    val topicAndPartition = TopicAndPartition("test", 0)
-    val topic = topicAndPartition.topic
+    val topicPartition = new TopicPartition("test", 0)
+    val topic = topicPartition.topic
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
     try {
@@ -212,7 +213,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // verify delete topic path for test2 is removed from zookeeper
     TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
     // verify that topic test is untouched
-    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created")
     // test the topic path exists
     assertTrue("Topic test mistakenly deleted", 
zkUtils.pathExists(getTopicPath(topic)))
@@ -225,8 +226,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   @Test
   def testDeleteTopicWithCleaner() {
     val topicName = "test"
-    val topicAndPartition = TopicAndPartition(topicName, 0)
-    val topic = topicAndPartition.topic
+    val topicPartition = new TopicPartition(topicName, 0)
+    val topic = topicPartition.topic
 
     val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
     brokerConfigs.head.setProperty("delete.topic.enable", "true")
@@ -240,13 +241,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
     // for simplicity, we are validating cleaner offsets on a single broker
     val server = servers.head
-    val log = server.logManager.getLog(topicAndPartition).get
+    val log = server.logManager.getLog(topicPartition).get
 
     // write to the topic to activate cleaner
     writeDups(numKeys = 100, numDups = 3,log)
 
     // wait for cleaner to clean
-   server.logManager.cleaner.awaitCleaned(topicName, 0, 0)
+   server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
 
     // delete topic
     AdminUtils.deleteTopic(zkUtils, "test")
@@ -257,8 +258,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   @Test
   def testDeleteTopicAlreadyMarkedAsDeleted() {
-    val topicAndPartition = TopicAndPartition("test", 0)
-    val topic = topicAndPartition.topic
+    val topicPartition = new TopicPartition("test", 0)
+    val topic = topicPartition.topic
     val servers = createTestTopicAndCluster(topic)
 
     try {
@@ -286,13 +287,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   private def createTestTopicAndCluster(topic: String, brokerConfigs: 
Seq[Properties]): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
-    val topicAndPartition = TopicAndPartition(topic, 0)
+    val topicPartition = new TopicPartition(topic, 0)
     // create brokers
     val servers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, 
expectedReplicaAssignment)
     // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created")
     servers
   }
@@ -309,15 +310,15 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   @Test
   def testDisableDeleteTopic() {
-    val topicAndPartition = TopicAndPartition("test", 0)
-    val topic = topicAndPartition.topic
+    val topicPartition = new TopicPartition("test", 0)
+    val topic = topicPartition.topic
     val servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
     // mark the topic for deletion
     AdminUtils.deleteTopic(zkUtils, "test")
     TestUtils.waitUntilTrue(() => 
!zkUtils.pathExists(getDeleteTopicPath(topic)),
       "Admin path /admin/delete_topic/%s path not deleted even if deleteTopic 
is disabled".format(topic))
     // verify that topic test is untouched
-    assertTrue(servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined))
+    assertTrue(servers.forall(_.getLogManager().getLog(topicPartition).isDefined))
     // test the topic path exists
     assertTrue("Topic path disappeared", 
zkUtils.pathExists(getTopicPath(topic)))
     // topic test should have a leader
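
Where older APIs in this test still expect the Scala-side TopicAndPartition (the reassignment checks above), the code bridges between the two identifiers; a minimal sketch of the two conversions, with the TopicAndPartition(TopicPartition) constructor taken from its use in this diff:

  import kafka.common.TopicAndPartition
  import org.apache.kafka.common.TopicPartition

  // Convert to the legacy identifier for APIs that have not been migrated yet.
  def toLegacy(tp: TopicPartition): TopicAndPartition = new TopicAndPartition(tp)

  // Convert back to the Java client identifier used by the migrated APIs.
  def toTopicPartition(tap: TopicAndPartition): TopicPartition = new TopicPartition(tap.topic, tap.partition)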

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 1c5a526..20e512f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -304,7 +304,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE.code, syncGroupErrorCode)
 
     EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
     
EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
       .andReturn(Some(Record.MAGIC_VALUE_V1, 
TimestampType.CREATE_TIME)).anyTimes()
     EasyMock.replay(replicaManager)
@@ -1092,7 +1092,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   private def leaveGroup(groupId: String, consumerId: String): 
LeaveGroupCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
-    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
     
EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
       .andReturn(Some(Record.MAGIC_VALUE_V1, 
TimestampType.CREATE_TIME)).anyTimes()
     EasyMock.replay(replicaManager)

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 62b7f42..629020e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -342,7 +342,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -394,7 +394,7 @@ class GroupMetadataManagerTest {
 
     
EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
       .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME))
-    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
     
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
 EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
@@ -440,7 +440,7 @@ class GroupMetadataManagerTest {
 
     
EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
       .andStubReturn(Some(Record.MAGIC_VALUE_V1, 
TimestampType.LOG_APPEND_TIME))
-    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
     
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
 EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
@@ -491,7 +491,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -567,7 +567,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 201fa87..761cac8 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -18,19 +18,23 @@
 package kafka.integration
 
 import java.nio.ByteBuffer
+
 import org.junit.Assert._
-import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
+import kafka.api.{FetchRequest, FetchRequestBuilder, PartitionFetchInfo}
+import kafka.server.{KafkaConfig, KafkaRequestHandler}
 import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.Test
+
 import scala.collection._
-import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
-import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}
+import kafka.common.{ErrorMapping, OffsetOutOfRangeException, TopicAndPartition, UnknownTopicOrPartitionException}
+import kafka.utils.{CoreUtils, StaticPartitioner, TestUtils}
 import kafka.serializer.StringEncoder
 import java.util.Properties
 
+import org.apache.kafka.common.TopicPartition
+
 /**
  * End to end tests of the primitive apis against a local server
  */
@@ -71,7 +75,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
 
     producer.send(new KeyedMessage[String, String](topic, "test-message"))
 
-    val replica = servers.head.replicaManager.getReplica(topic, 0).get
+    val replica = servers.head.replicaManager.getReplica(new TopicPartition(topic, 0)).get
     assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
                replica.logEndOffset.messageOffset > 0 && replica.logEndOffset.equals(replica.highWatermark))
 
@@ -243,23 +247,23 @@ class PrimitiveApiTest extends 
ProducerConsumerTestHarness {
     }
 
     // wait until the messages are published
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test1", 0)).get.logEndOffset == 2 },
                             "Published messages should be in the log")
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test2", 0)).get.logEndOffset == 2 },
                             "Published messages should be in the log")
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test3", 0)).get.logEndOffset == 2 },
                             "Published messages should be in the log")
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test4", 0)).get.logEndOffset == 2 },
                             "Published messages should be in the log")
 
     val replicaId = servers.head.config.brokerId
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark.messageOffset == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test1", 0), replicaId).get.highWatermark.messageOffset == 2 },
                             "High watermark should equal to log end offset")
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark.messageOffset == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test2", 0), replicaId).get.highWatermark.messageOffset == 2 },
                             "High watermark should equal to log end offset")
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark.messageOffset == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test3", 0), replicaId).get.highWatermark.messageOffset == 2 },
                             "High watermark should equal to log end offset")
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark.messageOffset == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test4", 0), replicaId).get.highWatermark.messageOffset == 2 },
                             "High watermark should equal to log end offset")
 
     // test if the consumer received the messages in the correct order when 
producer has enabled request pipelining
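
Note: the polling pattern above can be condensed into a small helper. The sketch below is illustrative only; it assumes the TopicPartition-based lookups introduced by this patch (logManager.getLog(tp) and replicaManager.getReplica(tp, replicaId), both returning Options), and the awaitFullyReplicated name is made up for the example:

    import kafka.server.KafkaServer
    import kafka.utils.TestUtils
    import org.apache.kafka.common.TopicPartition

    // Hypothetical helper: wait until the log end offset and the high watermark of
    // partition 0 of `topic` reach `expected` on the given broker.
    def awaitFullyReplicated(server: KafkaServer, topic: String, expected: Long): Unit = {
      val tp = new TopicPartition(topic, 0)
      val replicaId = server.config.brokerId
      TestUtils.waitUntilTrue(() => server.logManager.getLog(tp).exists(_.logEndOffset == expected),
        s"Published messages should be in the log of $tp")
      TestUtils.waitUntilTrue(() => server.replicaManager.getReplica(tp, replicaId)
        .exists(_.highWatermark.messageOffset == expected),
        s"High watermark of $tp should reach $expected")
    }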

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 43c41f3..74a1828 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -21,9 +21,9 @@ import java.io.File
 import java.util.Properties
 
 import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
-import kafka.common.TopicAndPartition
 import kafka.server.OffsetCheckpoint
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
@@ -50,7 +50,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   val logDir = TestUtils.tempDir()
   var counter = 0
   var cleaner: LogCleaner = _
-  val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
+  val topics = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
 
   @Test
   def cleanerTest() {
@@ -232,8 +232,9 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) {
     // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
     // LogConfig.MinCleanableDirtyRatioProp
-    cleaner.awaitCleaned(topic, partitionId, firstDirty)
-    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(TopicAndPartition(topic, partitionId))
+    val topicPartition = new TopicPartition(topic, partitionId)
+    cleaner.awaitCleaned(topicPartition, firstDirty)
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(topicPartition)
     assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned",
       lastCleaned >= firstDirty)
   }
@@ -315,7 +316,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
                           propertyOverrides: Properties = new Properties()): 
LogCleaner = {
     
     // create partitions and add them to the pool
-    val logs = new Pool[TopicAndPartition, Log]()
+    val logs = new Pool[TopicPartition, Log]()
     for(i <- 0 until parts) {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
@@ -325,7 +326,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)
-      logs.put(TopicAndPartition("log", i), log)      
+      logs.put(new TopicPartition("log", i), log)
     }
   
     new LogCleaner(CleanerConfig(numThreads = numThreads, ioBufferSize = 
maxMessageSize / 2, maxMessageSize = maxMessageSize, backOffMs = 
logCleanerBackOffMillis),
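
Note: the fixture above now keys its pool of logs by TopicPartition. A minimal sketch of that keying, using a plain mutable.Map as a stand-in for kafka.utils.Pool and placeholder string values instead of kafka.log.Log instances (both assumptions made only to keep the example self-contained):

    import scala.collection.mutable
    import org.apache.kafka.common.TopicPartition

    val logs = mutable.Map.empty[TopicPartition, String]
    for (i <- 0 until 3)
      logs.put(new TopicPartition("log", i), s"log-$i")

    // A freshly built key finds the entry because TopicPartition has value equality.
    assert(logs(new TopicPartition("log", 1)) == "log-1")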

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 1231e98..c24cb68 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -20,8 +20,8 @@ package kafka.log
 import java.io.File
 import java.util.Properties
 
-import kafka.common.TopicAndPartition
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
@@ -49,7 +49,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: 
String) extends Logging
   val logName = "log"
   val logDir = TestUtils.tempDir()
   var counter = 0
-  val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
+  val topics = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
   val compressionCodec = CompressionType.forName(compressionCodecName)
 
   @Test
@@ -86,7 +86,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: 
String) extends Logging
     val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset
 
     // the first block should get cleaned
-    cleaner.awaitCleaned("log", 0, activeSegAtT0.baseOffset)
+    cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT0.baseOffset)
 
     // check the data is the same
     val read1 = readFromLog(log)
@@ -94,7 +94,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: 
String) extends Logging
 
     val compactedSize = log.logSegments(0L, 
activeSegAtT0.baseOffset).map(_.size).sum
     debug(s"after cleaning the compacted size up to active segment at T0: 
$compactedSize")
-    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(TopicAndPartition("log", 0))
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
     assertTrue(s"log cleaner should have processed up to offset 
$firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned", lastCleaned >= 
firstBlock1SegmentBaseOffset)
     assertTrue(s"log should have been compacted: size up to offset of active 
segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize",
       sizeUpToActiveSegmentAtT0 > compactedSize)
@@ -137,7 +137,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: 
String) extends Logging
                   policyOverrides: Map[String, String] = Map()): LogCleaner = {
 
     // create partitions and add them to the pool
-    val logs = new Pool[TopicAndPartition, Log]()
+    val logs = new Pool[TopicPartition, Log]()
     for(i <- 0 until parts) {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
@@ -154,7 +154,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: 
String) extends Logging
         recoveryPoint = 0L,
         scheduler = time.scheduler,
         time = time)
-      logs.put(TopicAndPartition("log", i), log)
+      logs.put(new TopicPartition("log", i), log)
     }
 
     new LogCleaner(CleanerConfig(numThreads = numThreads, backOffMs = 
backOffMs),

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 5dfa268..b1d2b33 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -20,8 +20,8 @@ package kafka.log
 import java.io.File
 import java.util.Properties
 
-import kafka.common._
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
@@ -102,9 +102,9 @@ class LogCleanerManagerTest extends JUnitSuite with Logging 
{
     while(log.numberOfSegments < 8)
       log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, 
timestamp = time.milliseconds))
 
-    val topicAndPartition = TopicAndPartition("log", 0)
-    val lastClean = Map(topicAndPartition-> 0L)
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds)
+    val topicPartition = new TopicPartition("log", 0)
+    val lastClean = Map(topicPartition -> 0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the 
log.", 0L, cleanableOffsets._1)
     assertEquals("The first uncleanable offset begins with the active 
segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
   }
@@ -133,9 +133,9 @@ class LogCleanerManagerTest extends JUnitSuite with Logging 
{
     while (log.numberOfSegments < 8)
       log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, 
timestamp = t1))
 
-    val topicAndPartition = TopicAndPartition("log", 0)
-    val lastClean = Map(topicAndPartition-> 0L)
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds)
+    val topicPartition = new TopicPartition("log", 0)
+    val lastClean = Map(topicPartition -> 0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the 
log.", 0L, cleanableOffsets._1)
     assertEquals("The first uncleanable offset begins with the second block of 
log entries.", activeSegAtT0.baseOffset, cleanableOffsets._2)
   }
@@ -159,16 +159,16 @@ class LogCleanerManagerTest extends JUnitSuite with 
Logging {
 
     time.sleep(compactionLag + 1)
 
-    val topicAndPartition = TopicAndPartition("log", 0)
-    val lastClean = Map(topicAndPartition-> 0L)
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds)
+    val topicPartition = new TopicPartition("log", 0)
+    val lastClean = Map(topicPartition -> 0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the 
log.", 0L, cleanableOffsets._1)
     assertEquals("The first uncleanable offset begins with active segment.", 
log.activeSegment.baseOffset, cleanableOffsets._2)
   }
 
   private def createCleanerManager(log: Log): LogCleanerManager = {
-    val logs = new Pool[TopicAndPartition, Log]()
-    logs.put(TopicAndPartition("log", 0), log)
+    val logs = new Pool[TopicPartition, Log]()
+    logs.put(new TopicPartition("log", 0), log)
     val cleanerManager = new LogCleanerManager(Array(logDir), logs)
     cleanerManager
   }
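
Note: the assertions above read the two halves of the cleanableOffsets result through ._1 and ._2. A slightly more readable way to consume the same pair, sketched against the log and time fixtures created in this test (illustrative only, not code from the patch):

    import org.apache.kafka.common.TopicPartition

    // Destructure the (first cleanable offset, first uncleanable offset) pair
    // returned for a given TopicPartition.
    val topicPartition = new TopicPartition("log", 0)
    val (firstCleanableOffset, firstUncleanableOffset) =
      LogCleanerManager.cleanableOffsets(log, topicPartition, Map(topicPartition -> 0L), time.milliseconds)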

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index f43e92b..40691b9 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -24,6 +24,7 @@ import java.util.Properties
 
 import kafka.common._
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
@@ -137,7 +138,7 @@ class LogCleanerTest extends JUnitSuite {
     while(log.numberOfSegments < 4)
       log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
 
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
     val keys = keysInLog(log).toSet
     assertTrue("None of the keys we deleted should still exist.",
                (0 until leo.toInt by 2).forall(!keys.contains(_)))
@@ -161,7 +162,7 @@ class LogCleanerTest extends JUnitSuite {
 
     val initialLogSize = log.size
 
-    val (endOffset, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    val (endOffset, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
     assertEquals(5, endOffset)
     assertEquals(5, stats.messagesRead)
     assertEquals(initialLogSize, stats.bytesRead)
@@ -189,16 +190,16 @@ class LogCleanerTest extends JUnitSuite {
     log.roll()
 
     // clean the log with only one message removed
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
     assertEquals(immutable.List(1,0,1,0), keysInLog(log))
     assertEquals(immutable.List(1,2,3,4), offsetsInLog(log))
 
     // continue to make progress, even though we can only clean one message at a time
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3, log.activeSegment.baseOffset))
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 3, log.activeSegment.baseOffset))
     assertEquals(immutable.List(0,1,0), keysInLog(log))
     assertEquals(immutable.List(2,3,4), offsetsInLog(log))
 
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4, log.activeSegment.baseOffset))
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 4, log.activeSegment.baseOffset))
     assertEquals(immutable.List(1,0), keysInLog(log))
     assertEquals(immutable.List(3,4), offsetsInLog(log))
   }
@@ -235,7 +236,7 @@ class LogCleanerTest extends JUnitSuite {
     assertTrue("Test is not effective unless each segment contains duplicates. 
Increase segment size or decrease number of keys.",
       distinctValuesBySegment.reverse.tail.forall(_ > N))
 
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, firstUncleanableOffset))
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset))
 
     val distinctValuesBySegmentAfterClean = distinctValuesBySegment
 
@@ -257,7 +258,7 @@ class LogCleanerTest extends JUnitSuite {
     for (_ <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
-    val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
+    val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
 
     assertEquals("Total bytes of LogToClean should equal size of all segments 
excluding the active segment",
       logToClean.totalBytes, log.size - log.activeSegment.size)
@@ -277,7 +278,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] 
are uncleanable
     val segs = log.logSegments.toSeq
-    val logToClean = LogToClean(TopicAndPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset)
+    val logToClean = LogToClean(new TopicPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset)
 
     val expectedCleanSize = segs.take(2).map(_.size).sum
     val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
@@ -315,7 +316,7 @@ class LogCleanerTest extends JUnitSuite {
       log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
 
     val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
-    val (_, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+    val (_, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
 
     assertEquals("Log should only contain keyed messages after cleaning.", 0, 
unkeyedMessageCountInLog(log))
     assertEquals("Log should only contain keyed messages after cleaning.", 
expectedSizeAfterCleaning, log.size)
@@ -333,7 +334,7 @@ class LogCleanerTest extends JUnitSuite {
   def unkeyedMessageCountInLog(log: Log) =
     log.logSegments.map(s => 
s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).count(m => 
!m.record.hasKey)).sum
 
-  def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
+  def abortCheckDone(topicPartition: TopicPartition): Unit = {
     throw new LogCleaningAbortedException()
   }
 
@@ -677,7 +678,7 @@ class LogCleanerTest extends JUnitSuite {
 
     log.roll()
 
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
 
     for (segment <- log.logSegments; shallowLogEntry <- 
segment.log.shallowEntries.asScala; deepLogEntry <- shallowLogEntry.asScala) {
       assertEquals(shallowLogEntry.record.magic, deepLogEntry.record.magic)
@@ -720,20 +721,20 @@ class LogCleanerTest extends JUnitSuite {
                                           key = "0".getBytes,
                                           timestamp = time.milliseconds() + 
logConfig.deleteRetentionMs + 10000))
     log.roll()
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
     // Append a tombstone with a small timestamp and roll out a new log 
segment.
     log.append(TestUtils.singletonRecords(value = null,
                                           key = "0".getBytes,
                                           timestamp = time.milliseconds() - 
logConfig.deleteRetentionMs - 10000))
     log.roll()
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 1, log.activeSegment.baseOffset))
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset))
     assertEquals("The tombstone should be retained.", 1, 
log.logSegments.head.log.shallowEntries.iterator().next().offset())
     // Append a message and roll out another log segment.
     log.append(TestUtils.singletonRecords(value = "1".getBytes,
                                           key = "1".getBytes,
                                           timestamp = time.milliseconds()))
     log.roll()
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
     assertEquals("The tombstone should be retained.", 1, 
log.logSegments.head.log.shallowEntries.iterator().next().offset())
   }
 
@@ -772,9 +773,9 @@ class LogCleanerTest extends JUnitSuite {
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = 
time.scheduler, time = time)
 
-  def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */  }
+  def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */  }
 
-  def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
+  def makeCleaner(capacity: Int, checkDone: TopicPartition => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
     new Cleaner(id = 0,
                 offsetMap = new FakeOffsetMap(capacity),
                 ioBufferSize = maxMessageSize,
@@ -782,7 +783,7 @@ class LogCleanerTest extends JUnitSuite {
                 dupBufferLoadFactor = 0.75,
                 throttler = throttler,
                 time = time,
-                checkDone = checkDone )
+                checkDone = checkDone)
 
   def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
     for((key, value) <- seq)
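
Note: makeCleaner above now takes its checkDone callback as a plain TopicPartition => Unit. A small sketch of passing such callbacks (not from the patch; IllegalStateException stands in for the cleaner's internal LogCleaningAbortedException):

    import org.apache.kafka.common.TopicPartition

    // No-op check, equivalent in spirit to noOpCheckDone above.
    val noOp: TopicPartition => Unit = _ => ()

    // A check that aborts cleaning as soon as it is consulted.
    val abortEarly: TopicPartition => Unit =
      tp => throw new IllegalStateException(s"aborting clean of $tp")

    noOp(new TopicPartition("test", 0))   // does nothing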

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 40e6228..ab577ce 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -23,6 +23,7 @@ import java.util.Properties
 import kafka.common._
 import kafka.server.OffsetCheckpoint
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
@@ -64,7 +65,7 @@ class LogManagerTest {
    */
   @Test
   def testCreateLog() {
-    val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
+    val log = logManager.createLog(new TopicPartition(name, 0), logConfig)
     val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
     log.append(TestUtils.singletonRecords("test".getBytes()))
@@ -75,7 +76,7 @@ class LogManagerTest {
    */
   @Test
   def testGetNonExistentLog() {
-    val log = logManager.getLog(TopicAndPartition(name, 0))
+    val log = logManager.getLog(new TopicPartition(name, 0))
     assertEquals("No log should be found.", None, log)
     val logFile = new File(logDir, name + "-0")
     assertTrue(!logFile.exists)
@@ -86,7 +87,7 @@ class LogManagerTest {
    */
   @Test
   def testCleanupExpiredSegments() {
-    val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
+    val log = logManager.createLog(new TopicPartition(name, 0), logConfig)
     var offset = 0L
     for(_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes())
@@ -129,7 +130,7 @@ class LogManagerTest {
     logManager.startup
 
     // create a log
-    val log = logManager.createLog(TopicAndPartition(name, 0), config)
+    val log = logManager.createLog(new TopicPartition(name, 0), config)
     var offset = 0L
 
     // add a bunch of messages that should be larger than the retentionSize
@@ -166,7 +167,7 @@ class LogManagerTest {
   def testDoesntCleanLogsWithCompactDeletePolicy() {
     val logProps = new Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + 
LogConfig.Delete)
-    val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
+    val log = logManager.createLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
     var offset = 0L
     for (_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes(), 
key="test".getBytes())
@@ -195,7 +196,7 @@ class LogManagerTest {
 
     logManager = createLogManager()
     logManager.startup
-    val log = logManager.createLog(TopicAndPartition(name, 0), config)
+    val log = logManager.createLog(new TopicPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
     for (_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes())
@@ -219,7 +220,7 @@ class LogManagerTest {
 
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
-      logManager.createLog(TopicAndPartition("test", partition), logConfig)
+      logManager.createLog(new TopicPartition("test", partition), logConfig)
       assertEquals("We should have created the right number of logs", 
partition + 1, logManager.allLogs.size)
       val counts = 
logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size)
       assertTrue("Load should balance evenly", counts.max <= counts.min + 1)
@@ -244,7 +245,7 @@ class LogManagerTest {
    */
   @Test
   def testCheckpointRecoveryPoints() {
-    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1), TopicAndPartition("test-b", 1)), logManager)
+    verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1), new TopicPartition("test-b", 1)), logManager)
   }
 
   /**
@@ -257,7 +258,7 @@ class LogManagerTest {
     logManager = TestUtils.createLogManager(
       logDirs = Array(new File(logDir.getAbsolutePath + File.separator)))
     logManager.startup
-    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+    verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager)
   }
 
   /**
@@ -271,13 +272,13 @@ class LogManagerTest {
     logDir.deleteOnExit()
     logManager = createLogManager()
     logManager.startup
-    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+    verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager)
   }
 
 
-  private def verifyCheckpointRecovery(topicAndPartitions: Seq[TopicAndPartition],
+  private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition],
                                        logManager: LogManager) {
-    val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
+    val logs = topicPartitions.map(this.logManager.createLog(_, logConfig))
     logs.foreach(log => {
       for (_ <- 0 until 50)
         log.append(TestUtils.singletonRecords("test".getBytes()))
@@ -288,7 +289,7 @@ class LogManagerTest {
     logManager.checkpointRecoveryPointOffsets()
     val checkpoints = new OffsetCheckpoint(new File(logDir, 
logManager.RecoveryPointCheckpointFile)).read()
 
-    topicAndPartitions.zip(logs).foreach {
+    topicPartitions.zip(logs).foreach {
       case(tp, log) => {
         assertEquals("Recovery point should equal checkpoint", 
checkpoints(tp), log.recoveryPoint)
       }
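
Note: the LogManager entry points exercised above are now addressed by TopicPartition throughout. A condensed usage sketch against the logManager and logConfig fixtures of this test, assuming the createLog/getLog signatures shown in these hunks (not code from the patch):

    import org.apache.kafka.common.TopicPartition

    val tp = new TopicPartition("test-a", 1)
    val log = logManager.createLog(tp, logConfig)   // backed by a "test-a-1" directory
    assert(logManager.getLog(tp).isDefined)         // looked up by the value-equal key
    logManager.checkpointRecoveryPointOffsets()     // recovery points are checkpointed per TopicPartition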

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ff596bd..08cdac5 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1093,9 +1093,9 @@ class LogTest extends JUnitSuite {
     val topic = "test_topic"
     val partition = "143"
     val dir = new File(logDir + topicPartitionName(topic, partition))
-    val topicAndPartition = Log.parseTopicPartitionName(dir)
-    assertEquals(topic, topicAndPartition.asTuple._1)
-    assertEquals(partition.toInt, topicAndPartition.asTuple._2)
+    val topicPartition = Log.parseTopicPartitionName(dir)
+    assertEquals(topic, topicPartition.topic)
+    assertEquals(partition.toInt, topicPartition.partition)
   }
 
   @Test
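
Note: parseTopicPartitionName now yields a TopicPartition read via .topic and .partition rather than .asTuple. Purely as an illustration of what the parse has to handle for a directory such as "test_topic-143" (topic names may contain dashes, so the split is on the last one), here is a simplified sketch; it is not the real implementation, which also validates the directory name:

    import org.apache.kafka.common.TopicPartition

    // Simplified, illustrative parser for "<topic>-<partition>" directory names.
    def parseDirName(name: String): TopicPartition = {
      val idx = name.lastIndexOf('-')
      new TopicPartition(name.substring(0, idx), name.substring(idx + 1).toInt)
    }

    assert(parseDirName("test_topic-143") == new TopicPartition("test_topic", 143))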

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index e0dfe16..625ff6c 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -88,7 +88,7 @@ class AbstractFetcherThreadTest {
   class DummyFetchRequest(val offsets: collection.Map[TopicPartition, Long]) 
extends FetchRequest {
     override def isEmpty: Boolean = offsets.isEmpty
 
-    override def offset(topicAndPartition: TopicPartition): Long = offsets(topicAndPartition)
+    override def offset(topicPartition: TopicPartition): Long = offsets(topicPartition)
   }
 
   class TestPartitionData(records: MemoryRecords = MemoryRecords.EMPTY) 
extends PartitionData {
@@ -110,11 +110,11 @@ class AbstractFetcherThreadTest {
     type REQ = DummyFetchRequest
     type PD = PartitionData
 
-    override def processPartitionData(topicAndPartition: TopicPartition,
+    override def processPartitionData(topicPartition: TopicPartition,
                                       fetchOffset: Long,
                                       partitionData: PartitionData): Unit = {}
 
-    override def handleOffsetOutOfRange(topicAndPartition: TopicPartition): Long = 0L
+    override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = 0L
 
     override def handlePartitionsWithErrors(partitions: 
Iterable[TopicPartition]): Unit = {}
 
@@ -160,14 +160,14 @@ class AbstractFetcherThreadTest {
       new TestPartitionData(MemoryRecords.withRecords(1L, 
Record.create("hello".getBytes())))
     )
 
-    override def processPartitionData(topicAndPartition: TopicPartition,
+    override def processPartitionData(topicPartition: TopicPartition,
                                       fetchOffset: Long,
                                       partitionData: PartitionData): Unit = {
       // Throw exception if the fetchOffset does not match the fetcherThread 
partition state
       if (fetchOffset != logEndOffset)
         throw new RuntimeException(
           "Offset mismatch for partition %s: fetched offset = %d, log end 
offset = %d."
-            .format(topicAndPartition, fetchOffset, logEndOffset))
+            .format(topicPartition, fetchOffset, logEndOffset))
 
       // Now check message's crc
       val records = partitionData.toRecords
