Repository: kafka
Updated Branches:
  refs/heads/trunk e5a0d1398 -> 0e5700fb6


MINOR: Improve PartitionState logging and remove duplication of code

Currently, logs involving PartitionState are not very helpful: the class does not override toString(), so only the class name and an identity hash are printed.

```
        Broker 449 cached leader info 
org.apache.kafka.common.requests.UpdateMetadataRequest$PartitionState@3285d64a 
for partition <topic>-<partition> in response to UpdateMetadata request sent by 
controller 356 epoch 138 with correlation id 0

        TRACE state.change.logger: Broker 449 received LeaderAndIsr request 
org.apache.kafka.common.requests.LeaderAndIsrRequest$PartitionState@66d6a8eb 
correlation id 3 from controller 356 epoch 138 for partition 
[<topic>,<partition>]
```

Author: Ashish Singh <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #1609 from SinghAsDev/partitionState


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e5700fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e5700fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e5700fb

Branch: refs/heads/trunk
Commit: 0e5700fb68671f3fb75bfdeceda40e84330aca69
Parents: e5a0d13
Author: Ashish Singh <[email protected]>
Authored: Thu Jul 21 01:00:33 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Thu Jul 21 01:00:33 2016 +0100

----------------------------------------------------------------------
 .../common/requests/LeaderAndIsrRequest.java    | 19 --------
 .../kafka/common/requests/PartitionState.java   | 46 ++++++++++++++++++++
 .../common/requests/UpdateMetadataRequest.java  | 18 --------
 .../common/requests/RequestResponseTest.java    | 16 +++----
 .../main/scala/kafka/cluster/Partition.scala    | 12 +++--
 .../controller/ControllerChannelManager.scala   |  5 ++-
 .../main/scala/kafka/server/MetadataCache.scala |  3 +-
 .../scala/kafka/server/ReplicaManager.scala     | 18 ++++----
 .../kafka/api/AuthorizerIntegrationTest.scala   |  4 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  3 +-
 .../unit/kafka/server/MetadataCacheTest.scala   |  4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  7 ++-
 12 files changed, 81 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index fee3c21..52b9674 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -35,25 +35,6 @@ import java.util.Set;
 
 public class LeaderAndIsrRequest extends AbstractRequest {
 
-    public static class PartitionState {
-        public final int controllerEpoch;
-        public final int leader;
-        public final int leaderEpoch;
-        public final List<Integer> isr;
-        public final int zkVersion;
-        public final Set<Integer> replicas;
-
-        public PartitionState(int controllerEpoch, int leader, int 
leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
-            this.controllerEpoch = controllerEpoch;
-            this.leader = leader;
-            this.leaderEpoch = leaderEpoch;
-            this.isr = isr;
-            this.zkVersion = zkVersion;
-            this.replicas = replicas;
-        }
-
-    }
-
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
 
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java 
b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
new file mode 100644
index 0000000..e766632
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class PartitionState {
+    public final int controllerEpoch;
+    public final int leader;
+    public final int leaderEpoch;
+    public final List<Integer> isr;
+    public final int zkVersion;
+    public final Set<Integer> replicas;
+
+    public PartitionState(int controllerEpoch, int leader, int leaderEpoch, 
List<Integer> isr, int zkVersion, Set<Integer> replicas) {
+        this.controllerEpoch = controllerEpoch;
+        this.leader = leader;
+        this.leaderEpoch = leaderEpoch;
+        this.isr = isr;
+        this.zkVersion = zkVersion;
+        this.replicas = replicas;
+    }
+
+    @Override
+    public String toString() {
+        return "PartitionState(controllerEpoch=" + controllerEpoch +
+                ", leader=" + leader +
+                ", leaderEpoch=" + leaderEpoch +
+                ", isr=" + Arrays.toString(isr.toArray()) +
+                ", zkVersion=" + zkVersion +
+                ", replicas=" + Arrays.toString(replicas.toArray()) + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 27f89fa..1c21789 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -33,24 +33,6 @@ import java.util.Set;
 
 public class UpdateMetadataRequest extends AbstractRequest {
 
-    public static final class PartitionState {
-        public final int controllerEpoch;
-        public final int leader;
-        public final int leaderEpoch;
-        public final List<Integer> isr;
-        public final int zkVersion;
-        public final Set<Integer> replicas;
-
-        public PartitionState(int controllerEpoch, int leader, int 
leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
-            this.controllerEpoch = controllerEpoch;
-            this.leader = leader;
-            this.leaderEpoch = leaderEpoch;
-            this.isr = isr;
-            this.zkVersion = zkVersion;
-            this.replicas = replicas;
-        }
-    }
-
     public static final class Broker {
         public final int id;
         public final Map<SecurityProtocol, EndPoint> endPoints;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2f53a3c..afeece7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -376,15 +376,15 @@ public class RequestResponseTest {
     }
 
     private AbstractRequest createLeaderAndIsrRequest() {
-        Map<TopicPartition, LeaderAndIsrRequest.PartitionState> 
partitionStates = new HashMap<>();
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
         List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new LeaderAndIsrRequest.PartitionState(0, 2, 1, new 
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new LeaderAndIsrRequest.PartitionState(1, 1, 1, new 
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new LeaderAndIsrRequest.PartitionState(1, 0, 1, new 
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
 
         Set<Node> leaders = new HashSet<>(Arrays.asList(
                 new Node(0, "test0", 1223),
@@ -402,15 +402,15 @@ public class RequestResponseTest {
 
     @SuppressWarnings("deprecation")
     private AbstractRequest createUpdateMetadataRequest(int version, String 
rack) {
-        Map<TopicPartition, UpdateMetadataRequest.PartitionState> 
partitionStates = new HashMap<>();
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
         List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new UpdateMetadataRequest.PartitionState(0, 2, 1, new 
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new UpdateMetadataRequest.PartitionState(1, 1, 1, new 
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new UpdateMetadataRequest.PartitionState(1, 0, 1, new 
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
 
         if (version == 0) {
             Set<Node> liveBrokers = new HashSet<>(Arrays.asList(

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index a561a97..28d3c8d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.utils._
-import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
 import kafka.log.LogConfig
@@ -26,17 +26,15 @@ import kafka.server._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
-
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
+
 import org.apache.kafka.common.errors.{NotEnoughReplicasException, 
NotLeaderForPartitionException}
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-
 
 import scala.collection.JavaConverters._
-
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.requests.PartitionState
 
 /**
  * Data structure that represents a topic partition. The leader maintains the 
AR, ISR, CUR, RAR
@@ -166,7 +164,7 @@ class Partition(val topic: String,
    * from the time when this broker was the leader last time) and setting the 
new leader and ISR.
    * If the leader replica id does not change, return false to indicate the 
replica manager.
    */
-  def makeLeader(controllerId: Int, partitionStateInfo: 
LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+  def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, 
correlationId: Int): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
       // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
@@ -207,7 +205,7 @@ class Partition(val topic: String,
    *  Make the local replica the follower by setting the new leader and ISR to 
empty
    *  If the leader replica id does not change, return false to indicate the 
replica manager
    */
-  def makeFollower(controllerId: Int, partitionStateInfo: 
LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, 
correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
       val newLeaderBrokerId: Int = partitionStateInfo.leader

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 32478ca..3007004 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.{ClientRequest, 
ClientResponse, ManualMetadataUp
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, 
NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicPartition}
@@ -362,7 +363,7 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
         }
         val partitionStates = partitionStateInfos.map { case (topicPartition, 
partitionStateInfo) =>
           val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = 
partitionStateInfo.leaderIsrAndControllerEpoch
-          val partitionState = new 
LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+          val partitionState = new requests.PartitionState(controllerEpoch, 
leaderIsr.leader,
             leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, 
leaderIsr.zkVersion,
             partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
           )
@@ -379,7 +380,7 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
           broker, p._1)))
         val partitionStates = partitionStateInfos.map { case (topicPartition, 
partitionStateInfo) =>
           val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = 
partitionStateInfo.leaderIsrAndControllerEpoch
-          val partitionState = new 
UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+          val partitionState = new requests.PartitionState(controllerEpoch, 
leaderIsr.leader,
             leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, 
leaderIsr.zkVersion,
             partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
           )

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index b387f2e..f493e7d 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -30,8 +30,7 @@ import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
-import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, 
UpdateMetadataRequest}
 
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b1cf68b..77df029 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
@@ -28,17 +29,18 @@ import kafka.log.{LogAppendInfo, LogManager}
 import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, 
MessageSet}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import org.I0Itec.zkclient.IZkChildListener
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException, 
RecordBatchTooLargeException, ReplicaNotAvailableException, 
RecordTooLargeException,
-InvalidTopicException, ControllerMovedException, 
NotLeaderForPartitionException, CorruptRecordException, 
UnknownTopicOrPartitionException,
-InvalidTimestampException}
+import org.apache.kafka.common.errors.{ControllerMovedException, 
CorruptRecordException, InvalidTimestampException,
+                                        InvalidTopicException, 
NotLeaderForPartitionException, OffsetOutOfRangeException,
+                                        RecordBatchTooLargeException, 
RecordTooLargeException, ReplicaNotAvailableException,
+                                        UnknownTopicOrPartitionException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.TopicConstants
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, 
StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, 
StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time => JTime}
+
 import scala.collection._
 import scala.collection.JavaConverters._
 
@@ -611,7 +613,7 @@ class ReplicaManager(val config: KafkaConfig,
         controllerEpoch = leaderAndISRRequest.controllerEpoch
 
         // First check partition's leader epoch
-        val partitionState = new mutable.HashMap[Partition, 
LeaderAndIsrRequest.PartitionState]()
+        val partitionState = new mutable.HashMap[Partition, PartitionState]()
         leaderAndISRRequest.partitionStates.asScala.foreach { case 
(topicPartition, stateInfo) =>
           val partition = getOrCreatePartition(topicPartition.topic, 
topicPartition.partition)
           val partitionLeaderEpoch = partition.getLeaderEpoch()
@@ -680,7 +682,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeLeaders(controllerId: Int,
                           epoch: Int,
-                          partitionState: Map[Partition, 
LeaderAndIsrRequest.PartitionState],
+                          partitionState: Map[Partition, PartitionState],
                           correlationId: Int,
                           responseMap: mutable.Map[TopicPartition, Short]): 
Set[Partition] = {
     partitionState.foreach(state =>
@@ -751,7 +753,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeFollowers(controllerId: Int,
                             epoch: Int,
-                            partitionState: Map[Partition, 
LeaderAndIsrRequest.PartitionState],
+                            partitionState: Map[Partition, PartitionState],
                             correlationId: Int,
                             responseMap: mutable.Map[TopicPartition, Short],
                             metadataCache: MetadataCache) : Set[Partition] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b1f0283..8f23c49 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -191,7 +191,7 @@ class AuthorizerIntegrationTest extends 
KafkaServerTestHarness {
   }
 
   private def createUpdateMetadataRequest = {
-    val partitionState = Map(tp -> new 
requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, 
Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+    val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, 
Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
     val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
       Map(SecurityProtocol.PLAINTEXT -> new 
requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava
     new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, 
brokers)
@@ -220,7 +220,7 @@ class AuthorizerIntegrationTest extends 
KafkaServerTestHarness {
 
   private def createLeaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
-      Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, 
brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+      Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, 
List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
       Set(new Node(brokerId, "localhost", 0)).asJava)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 3c30b6b..2e24aae 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -18,11 +18,10 @@
 package kafka.server
 
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
 
 import scala.collection.JavaConverters._
 import kafka.api.LeaderAndIsr
-import org.apache.kafka.common.requests.{AbstractRequestResponse, 
LeaderAndIsrRequest, LeaderAndIsrResponse}
+import org.apache.kafka.common.requests.{AbstractRequestResponse, 
LeaderAndIsrRequest, LeaderAndIsrResponse, PartitionState}
 import org.junit.Assert._
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.cluster.Broker

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 770513c..b34c93d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -22,8 +22,8 @@ import util.Arrays.asList
 import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.UpdateMetadataRequest
-import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, 
EndPoint, PartitionState}
+import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, 
EndPoint}
 import org.junit.Test
 import org.junit.Assert._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e5700fb/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5739856..bfb66b9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,19 +24,18 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
-import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
+import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Test, Before, After}
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map

Reply via email to