Repository: kafka
Updated Branches:
refs/heads/0.10.0 039e89a6e -> e293ed145
MINOR: Improve PartitionState logging and remove duplication of code
Currently, logs involving PartitionState are not very helpful.
```
Broker 449 cached leader info
org.apache.kafka.common.requests.UpdateMetadataRequest$PartitionState3285d64a
for partition <topic>-<partition> in response to UpdateMetadata request sent by
controller 356 epoch 138 with correlation id 0
TRACE state.change.logger: Broker 449 received LeaderAndIsr request
org.apache.kafka.common.requests.LeaderAndIsrRequest$PartitionState66d6a8eb
correlation id 3 from controller 356 epoch 138 for partition
[<topic>,<partition>]
```
Author: Ashish Singh <[email protected]>
Reviewers: Ismael Juma <[email protected]>
Closes #1609 from SinghAsDev/partitionState
(cherry picked from commit 0e5700fb68671f3fb75bfdeceda40e84330aca69)
Signed-off-by: Ismael Juma <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e293ed14
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e293ed14
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e293ed14
Branch: refs/heads/0.10.0
Commit: e293ed1458a1bc2118fe37ed6a6002ffe05e3ecf
Parents: 039e89a
Author: Ashish Singh <[email protected]>
Authored: Thu Jul 21 01:00:33 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Thu Jul 21 01:00:51 2016 +0100
----------------------------------------------------------------------
.../common/requests/LeaderAndIsrRequest.java | 19 --------
.../kafka/common/requests/PartitionState.java | 46 ++++++++++++++++++++
.../common/requests/UpdateMetadataRequest.java | 18 --------
.../common/requests/RequestResponseTest.java | 16 +++----
.../main/scala/kafka/cluster/Partition.scala | 12 +++--
.../controller/ControllerChannelManager.scala | 5 ++-
.../main/scala/kafka/server/MetadataCache.scala | 3 +-
.../scala/kafka/server/ReplicaManager.scala | 18 ++++----
.../kafka/api/AuthorizerIntegrationTest.scala | 4 +-
.../unit/kafka/server/LeaderElectionTest.scala | 3 +-
.../unit/kafka/server/MetadataCacheTest.scala | 4 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 7 ++-
12 files changed, 81 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index fee3c21..52b9674 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -35,25 +35,6 @@ import java.util.Set;
public class LeaderAndIsrRequest extends AbstractRequest {
- public static class PartitionState {
- public final int controllerEpoch;
- public final int leader;
- public final int leaderEpoch;
- public final List<Integer> isr;
- public final int zkVersion;
- public final Set<Integer> replicas;
-
- public PartitionState(int controllerEpoch, int leader, int
leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
- this.controllerEpoch = controllerEpoch;
- this.leader = leader;
- this.leaderEpoch = leaderEpoch;
- this.isr = isr;
- this.zkVersion = zkVersion;
- this.replicas = replicas;
- }
-
- }
-
private static final Schema CURRENT_SCHEMA =
ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
----------------------------------------------------------------------
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
new file mode 100644
index 0000000..e766632
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class PartitionState {
+ public final int controllerEpoch;
+ public final int leader;
+ public final int leaderEpoch;
+ public final List<Integer> isr;
+ public final int zkVersion;
+ public final Set<Integer> replicas;
+
+ public PartitionState(int controllerEpoch, int leader, int leaderEpoch,
List<Integer> isr, int zkVersion, Set<Integer> replicas) {
+ this.controllerEpoch = controllerEpoch;
+ this.leader = leader;
+ this.leaderEpoch = leaderEpoch;
+ this.isr = isr;
+ this.zkVersion = zkVersion;
+ this.replicas = replicas;
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionState(controllerEpoch=" + controllerEpoch +
+ ", leader=" + leader +
+ ", leaderEpoch=" + leaderEpoch +
+ ", isr=" + Arrays.toString(isr.toArray()) +
+ ", zkVersion=" + zkVersion +
+ ", replicas=" + Arrays.toString(replicas.toArray()) + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 27f89fa..1c21789 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -33,24 +33,6 @@ import java.util.Set;
public class UpdateMetadataRequest extends AbstractRequest {
- public static final class PartitionState {
- public final int controllerEpoch;
- public final int leader;
- public final int leaderEpoch;
- public final List<Integer> isr;
- public final int zkVersion;
- public final Set<Integer> replicas;
-
- public PartitionState(int controllerEpoch, int leader, int
leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
- this.controllerEpoch = controllerEpoch;
- this.leader = leader;
- this.leaderEpoch = leaderEpoch;
- this.isr = isr;
- this.zkVersion = zkVersion;
- this.replicas = replicas;
- }
- }
-
public static final class Broker {
public final int id;
public final Map<SecurityProtocol, EndPoint> endPoints;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 345de3f..ecf9e53 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -372,15 +372,15 @@ public class RequestResponseTest {
}
private AbstractRequest createLeaderAndIsrRequest() {
- Map<TopicPartition, LeaderAndIsrRequest.PartitionState>
partitionStates = new HashMap<>();
+ Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
partitionStates.put(new TopicPartition("topic5", 105),
- new LeaderAndIsrRequest.PartitionState(0, 2, 1, new
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+ new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new
HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic5", 1),
- new LeaderAndIsrRequest.PartitionState(1, 1, 1, new
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+ new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new
HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic20", 1),
- new LeaderAndIsrRequest.PartitionState(1, 0, 1, new
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+ new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new
HashSet<>(replicas)));
Set<Node> leaders = new HashSet<>(Arrays.asList(
new Node(0, "test0", 1223),
@@ -398,15 +398,15 @@ public class RequestResponseTest {
@SuppressWarnings("deprecation")
private AbstractRequest createUpdateMetadataRequest(int version, String
rack) {
- Map<TopicPartition, UpdateMetadataRequest.PartitionState>
partitionStates = new HashMap<>();
+ Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
partitionStates.put(new TopicPartition("topic5", 105),
- new UpdateMetadataRequest.PartitionState(0, 2, 1, new
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+ new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new
HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic5", 1),
- new UpdateMetadataRequest.PartitionState(1, 1, 1, new
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+ new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new
HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic20", 1),
- new UpdateMetadataRequest.PartitionState(1, 0, 1, new
ArrayList<>(isr), 2, new HashSet<>(replicas)));
+ new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new
HashSet<>(replicas)));
if (version == 0) {
Set<Node> liveBrokers = new HashSet<>(Arrays.asList(
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 4e79bdc..edf6619 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
import kafka.common._
import kafka.utils._
-import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.admin.AdminUtils
import kafka.api.LeaderAndIsr
import kafka.log.LogConfig
@@ -26,17 +26,15 @@ import kafka.server._
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet
-
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
+
import org.apache.kafka.common.errors.{NotEnoughReplicasException,
NotLeaderForPartitionException}
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-
import scala.collection.JavaConverters._
-
import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.requests.PartitionState
/**
* Data structure that represents a topic partition. The leader maintains the
AR, ISR, CUR, RAR
@@ -166,7 +164,7 @@ class Partition(val topic: String,
* from the time when this broker was the leader last time) and setting the
new leader and ISR.
* If the leader replica id does not change, return false to indicate the
replica manager.
*/
- def makeLeader(controllerId: Int, partitionStateInfo:
LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+ def makeLeader(controllerId: Int, partitionStateInfo: PartitionState,
correlationId: Int): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
// record the epoch of the controller that made the leadership decision.
This is useful while updating the isr
@@ -207,7 +205,7 @@ class Partition(val topic: String,
* Make the local replica the follower by setting the new leader and ISR to
empty
* If the leader replica id does not change, return false to indicate the
replica manager
*/
- def makeFollower(controllerId: Int, partitionStateInfo:
LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+ def makeFollower(controllerId: Int, partitionStateInfo: PartitionState,
correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
val newLeaderBrokerId: Int = partitionStateInfo.leader
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b4059a4..c19d35a 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.{ClientRequest,
ClientResponse, ManualMetadataUp
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode,
NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests
import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicPartition}
@@ -362,7 +363,7 @@ class ControllerBrokerRequestBatch(controller:
KafkaController) extends Logging
}
val partitionStates = partitionStateInfos.map { case (topicPartition,
partitionStateInfo) =>
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) =
partitionStateInfo.leaderIsrAndControllerEpoch
- val partitionState = new
LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+ val partitionState = new requests.PartitionState(controllerEpoch,
leaderIsr.leader,
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava,
leaderIsr.zkVersion,
partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
)
@@ -379,7 +380,7 @@ class ControllerBrokerRequestBatch(controller:
KafkaController) extends Logging
broker, p._1)))
val partitionStates = partitionStateInfos.map { case (topicPartition,
partitionStateInfo) =>
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) =
partitionStateInfo.leaderIsrAndControllerEpoch
- val partitionState = new
UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+ val partitionState = new requests.PartitionState(controllerEpoch,
leaderIsr.leader,
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava,
leaderIsr.zkVersion,
partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index b387f2e..f493e7d 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -30,8 +30,7 @@ import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.Node
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
-import org.apache.kafka.common.requests.{MetadataResponse,
UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{MetadataResponse, PartitionState,
UpdateMetadataRequest}
/**
* A cache for the state (e.g., current leader) of each partition. This cache
is updated through
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68f2385..8260643 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
import java.io.{File, IOException}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.{Partition, Replica}
@@ -28,16 +29,17 @@ import kafka.log.{LogAppendInfo, LogManager}
import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message,
MessageSet}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
-import org.I0Itec.zkclient.IZkChildListener
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException,
RecordBatchTooLargeException, ReplicaNotAvailableException,
RecordTooLargeException,
-InvalidTopicException, ControllerMovedException,
NotLeaderForPartitionException, CorruptRecordException,
UnknownTopicOrPartitionException,
-InvalidTimestampException}
+import org.apache.kafka.common.errors.{ControllerMovedException,
CorruptRecordException, InvalidTimestampException,
+ InvalidTopicException,
NotLeaderForPartitionException, OffsetOutOfRangeException,
+ RecordBatchTooLargeException,
RecordTooLargeException, ReplicaNotAvailableException,
+ UnknownTopicOrPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest,
StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState,
StopReplicaRequest, UpdateMetadataRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time => JTime}
+
import scala.collection._
import scala.collection.JavaConverters._
@@ -610,7 +612,7 @@ class ReplicaManager(val config: KafkaConfig,
controllerEpoch = leaderAndISRRequest.controllerEpoch
// First check partition's leader epoch
- val partitionState = new mutable.HashMap[Partition,
LeaderAndIsrRequest.PartitionState]()
+ val partitionState = new mutable.HashMap[Partition, PartitionState]()
leaderAndISRRequest.partitionStates.asScala.foreach { case
(topicPartition, stateInfo) =>
val partition = getOrCreatePartition(topicPartition.topic,
topicPartition.partition)
val partitionLeaderEpoch = partition.getLeaderEpoch()
@@ -679,7 +681,7 @@ class ReplicaManager(val config: KafkaConfig,
*/
private def makeLeaders(controllerId: Int,
epoch: Int,
- partitionState: Map[Partition,
LeaderAndIsrRequest.PartitionState],
+ partitionState: Map[Partition, PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Short]):
Set[Partition] = {
partitionState.foreach(state =>
@@ -750,7 +752,7 @@ class ReplicaManager(val config: KafkaConfig,
*/
private def makeFollowers(controllerId: Int,
epoch: Int,
- partitionState: Map[Partition,
LeaderAndIsrRequest.PartitionState],
+ partitionState: Map[Partition, PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Short],
metadataCache: MetadataCache) : Set[Partition] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 2d5900f..60eb74c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -186,7 +186,7 @@ class AuthorizerIntegrationTest extends
KafkaServerTestHarness {
}
private def createUpdateMetadataRequest = {
- val partitionState = Map(tp -> new
requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId,
Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+ val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId,
Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
Map(SecurityProtocol.PLAINTEXT -> new
requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava
new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState,
brokers)
@@ -215,7 +215,7 @@ class AuthorizerIntegrationTest extends
KafkaServerTestHarness {
private def createLeaderAndIsrRequest = {
new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
- Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue,
brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+ Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue,
List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
Set(new Node(brokerId, "localhost", 0)).asJava)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 7258980..343a3e1 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -18,11 +18,10 @@
package kafka.server
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
import scala.collection.JavaConverters._
import kafka.api.LeaderAndIsr
-import org.apache.kafka.common.requests.{AbstractRequestResponse,
LeaderAndIsrRequest, LeaderAndIsrResponse}
+import org.apache.kafka.common.requests.{AbstractRequestResponse,
LeaderAndIsrRequest, LeaderAndIsrResponse, PartitionState}
import org.junit.Assert._
import kafka.utils.{CoreUtils, TestUtils}
import kafka.cluster.Broker
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 770513c..b34c93d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -22,8 +22,8 @@ import util.Arrays.asList
import kafka.common.BrokerEndPointNotAvailableException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.UpdateMetadataRequest
-import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker,
EndPoint, PartitionState}
+import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker,
EndPoint}
import org.junit.Test
import org.junit.Assert._
http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5739856..bfb66b9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,19 +24,18 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
import kafka.cluster.Broker
import kafka.common.TopicAndPartition
-import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
+import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{MockTime => JMockTime}
import org.apache.kafka.common.{Node, TopicPartition}
import org.easymock.EasyMock
import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Test, Before, After}
+import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.Map