Repository: kafka
Updated Branches:
  refs/heads/trunk c7ab3efcb -> 71fe23b44


MINOR: Fix inconsistency in StopReplica/LeaderAndIsr error counts

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #4147 from hachikuji/fix-error-inconsistencies


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71fe23b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71fe23b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71fe23b4

Branch: refs/heads/trunk
Commit: 71fe23b445e242ca0f03f06aa8dad96e610db00b
Parents: c7ab3ef
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Oct 31 09:39:01 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Oct 31 09:43:18 2017 -0700

----------------------------------------------------------------------
 .../common/requests/LeaderAndIsrRequest.java    |  8 ++-
 .../common/requests/LeaderAndIsrResponse.java   |  6 +-
 .../common/requests/StopReplicaRequest.java     |  8 ++-
 .../common/requests/StopReplicaResponse.java    |  6 +-
 .../requests/LeaderAndIsrResponseTest.java      | 67 ++++++++++++++++++++
 .../requests/StopReplicaResponseTest.java       | 61 ++++++++++++++++++
 .../src/main/scala/kafka/server/KafkaApis.scala | 10 ++-
 .../scala/kafka/server/ReplicaManager.scala     | 39 +++++-------
 8 files changed, 168 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 27aaf0a..4fa5337 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -235,17 +235,19 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+    public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable 
e) {
+        Errors error = Errors.forException(e);
+
         Map<TopicPartition, Errors> responses = new 
HashMap<>(partitionStates.size());
         for (TopicPartition partition : partitionStates.keySet()) {
-            responses.put(partition, Errors.forException(e));
+            responses.put(partition, error);
         }
 
         short versionId = version();
         switch (versionId) {
             case 0:
             case 1:
-                return new LeaderAndIsrResponse(Errors.NONE, responses);
+                return new LeaderAndIsrResponse(error, responses);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ApiKeys.LEADER_AND_ISR.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 921c8ad..c21f9a7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -89,7 +90,10 @@ public class LeaderAndIsrResponse extends AbstractResponse {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
+        if (error != Errors.NONE)
+            // Minor optimization since the top-level error applies to all 
partitions
+            return Collections.singletonMap(error, responses.size());
+        return errorCounts(responses);
     }
 
     public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) 
{

http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 722a604..d79c938 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -122,16 +122,18 @@ public class StopReplicaRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+    public StopReplicaResponse getErrorResponse(int throttleTimeMs, Throwable 
e) {
+        Errors error = Errors.forException(e);
+
         Map<TopicPartition, Errors> responses = new 
HashMap<>(partitions.size());
         for (TopicPartition partition : partitions) {
-            responses.put(partition, Errors.forException(e));
+            responses.put(partition, error);
         }
 
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new StopReplicaResponse(Errors.NONE, responses);
+                return new StopReplicaResponse(error, responses);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ApiKeys.STOP_REPLICA.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 8ad6222..777416d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,7 +87,10 @@ public class StopReplicaResponse extends AbstractResponse {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
+        if (error != Errors.NONE)
+            // Minor optimization since the top-level error applies to all 
partitions
+            return Collections.singletonMap(error, responses.size());
+        return errorCounts(responses);
     }
 
     public static StopReplicaResponse parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
new file mode 100644
index 0000000..3eda778
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class LeaderAndIsrResponseTest {
+
+    @Test
+    public void testErrorCountsFromGetErrorResponse() {
+        HashMap<TopicPartition, LeaderAndIsrRequest.PartitionState> 
partitionStates = new HashMap<>();
+        partitionStates.put(new TopicPartition("foo", 0), new 
LeaderAndIsrRequest.PartitionState(15, 1, 10,
+                Collections.singletonList(10), 20, 
Collections.singletonList(10), false));
+        partitionStates.put(new TopicPartition("foo", 1), new 
LeaderAndIsrRequest.PartitionState(15, 1, 10,
+                Collections.singletonList(10), 20, 
Collections.singletonList(10), false));
+        LeaderAndIsrRequest request = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(),
+                15, 20, partitionStates, Collections.<Node>emptySet()).build();
+        LeaderAndIsrResponse response = request.getErrorResponse(0, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
+        
assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 2), 
response.errorCounts());
+    }
+
+    @Test
+    public void testErrorCountsWithTopLevelError() {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        errors.put(new TopicPartition("foo", 0), Errors.NONE);
+        errors.put(new TopicPartition("foo", 1), 
Errors.NOT_LEADER_FOR_PARTITION);
+        LeaderAndIsrResponse response = new 
LeaderAndIsrResponse(Errors.UNKNOWN_SERVER_ERROR, errors);
+        assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 2), 
response.errorCounts());
+    }
+
+    @Test
+    public void testErrorCountsNoTopLevelError() {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        errors.put(new TopicPartition("foo", 0), Errors.NONE);
+        errors.put(new TopicPartition("foo", 1), 
Errors.CLUSTER_AUTHORIZATION_FAILED);
+        LeaderAndIsrResponse response = new LeaderAndIsrResponse(Errors.NONE, 
errors);
+        Map<Errors, Integer> errorCounts = response.errorCounts();
+        assertEquals(2, errorCounts.size());
+        assertEquals(1, errorCounts.get(Errors.NONE).intValue());
+        assertEquals(1, 
errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
new file mode 100644
index 0000000..95cb3aa
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class StopReplicaResponseTest {
+
+    @Test
+    public void testErrorCountsFromGetErrorResponse() {
+        StopReplicaRequest request = new StopReplicaRequest.Builder(15, 20, 
false,
+                Utils.mkSet(new TopicPartition("foo", 0), new 
TopicPartition("foo", 1))).build();
+        StopReplicaResponse response = request.getErrorResponse(0, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
+        
assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 2), 
response.errorCounts());
+    }
+
+    @Test
+    public void testErrorCountsWithTopLevelError() {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        errors.put(new TopicPartition("foo", 0), Errors.NONE);
+        errors.put(new TopicPartition("foo", 1), 
Errors.NOT_LEADER_FOR_PARTITION);
+        StopReplicaResponse response = new 
StopReplicaResponse(Errors.UNKNOWN_SERVER_ERROR, errors);
+        assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 2), 
response.errorCounts());
+    }
+
+    @Test
+    public void testErrorCountsNoTopLevelError() {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        errors.put(new TopicPartition("foo", 0), Errors.NONE);
+        errors.put(new TopicPartition("foo", 1), 
Errors.CLUSTER_AUTHORIZATION_FAILED);
+        StopReplicaResponse response = new StopReplicaResponse(Errors.NONE, 
errors);
+        Map<Errors, Integer> errorCounts = response.errorCounts();
+        assertEquals(2, errorCounts.size());
+        assertEquals(1, errorCounts.get(Errors.NONE).intValue());
+        assertEquals(1, 
errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 53feeac..4637521 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -172,13 +172,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-      val result = replicaManager.becomeLeaderOrFollower(correlationId, 
leaderAndIsrRequest, onLeadershipChange)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, 
result.responseMap.asJava)
-      sendResponseExemptThrottle(request, leaderAndIsrResponse)
+      val response = replicaManager.becomeLeaderOrFollower(correlationId, 
leaderAndIsrRequest, onLeadershipChange)
+      sendResponseExemptThrottle(request, response)
     } else {
-      val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, 
Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-      sendResponseMaybeThrottle(request, _ =>
-        new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, 
result.asJava))
+      sendResponseMaybeThrottle(request, throttleTimeMs => 
leaderAndIsrRequest.getErrorResponse(throttleTimeMs,
+        Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index eee442a..101eaae 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -122,13 +122,6 @@ object LogReadResult {
                                            lastStableOffset = None)
 }
 
-case class BecomeLeaderOrFollowerResult(responseMap: 
collection.Map[TopicPartition, Errors], error: Errors) {
-
-  override def toString = {
-    "update results: [%s], global error: [%d]".format(responseMap, error.code)
-  }
-}
-
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
   val IsrChangePropagationBlackOut = 5000L
@@ -1038,29 +1031,29 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def becomeLeaderOrFollower(correlationId: Int,
-                             leaderAndISRRequest: LeaderAndIsrRequest,
-                             onLeadershipChange: (Iterable[Partition], 
Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
-    leaderAndISRRequest.partitionStates.asScala.foreach { case 
(topicPartition, stateInfo) =>
+                             leaderAndIsrRequest: LeaderAndIsrRequest,
+                             onLeadershipChange: (Iterable[Partition], 
Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
+    leaderAndIsrRequest.partitionStates.asScala.foreach { case 
(topicPartition, stateInfo) =>
       stateChangeLogger.trace(s"Received LeaderAndIsr request $stateInfo " +
-        s"correlation id $correlationId from controller 
${leaderAndISRRequest.controllerId} " +
-        s"epoch ${leaderAndISRRequest.controllerEpoch} for partition 
$topicPartition")
+        s"correlation id $correlationId from controller 
${leaderAndIsrRequest.controllerId} " +
+        s"epoch ${leaderAndIsrRequest.controllerEpoch} for partition 
$topicPartition")
     }
     replicaStateChangeLock synchronized {
-      val responseMap = new mutable.HashMap[TopicPartition, Errors]
-      if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-        stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller 
${leaderAndISRRequest.controllerId} with " +
-          s"correlation id $correlationId since its controller epoch 
${leaderAndISRRequest.controllerEpoch} is old. " +
+      if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
+        stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller 
${leaderAndIsrRequest.controllerId} with " +
+          s"correlation id $correlationId since its controller epoch 
${leaderAndIsrRequest.controllerEpoch} is old. " +
           s"Latest known controller epoch is $controllerEpoch")
-        BecomeLeaderOrFollowerResult(responseMap, 
Errors.STALE_CONTROLLER_EPOCH)
+        leaderAndIsrRequest.getErrorResponse(0, 
Errors.STALE_CONTROLLER_EPOCH.exception)
       } else {
-        val controllerId = leaderAndISRRequest.controllerId
-        controllerEpoch = leaderAndISRRequest.controllerEpoch
+        val responseMap = new mutable.HashMap[TopicPartition, Errors]
+        val controllerId = leaderAndIsrRequest.controllerId
+        controllerEpoch = leaderAndIsrRequest.controllerEpoch
 
         // First check partition's leader epoch
         val partitionState = new mutable.HashMap[Partition, 
LeaderAndIsrRequest.PartitionState]()
-        val newPartitions = 
leaderAndISRRequest.partitionStates.asScala.keys.filter(topicPartition => 
getPartition(topicPartition).isEmpty)
+        val newPartitions = 
leaderAndIsrRequest.partitionStates.asScala.keys.filter(topicPartition => 
getPartition(topicPartition).isEmpty)
 
-        leaderAndISRRequest.partitionStates.asScala.foreach { case 
(topicPartition, stateInfo) =>
+        leaderAndIsrRequest.partitionStates.asScala.foreach { case 
(topicPartition, stateInfo) =>
           val partition = getOrCreatePartition(topicPartition)
           val partitionLeaderEpoch = partition.getLeaderEpoch
           if (partition eq ReplicaManager.OfflinePartition) {
@@ -1105,7 +1098,7 @@ class ReplicaManager(val config: KafkaConfig,
         else
           Set.empty[Partition]
 
-        leaderAndISRRequest.partitionStates.asScala.keys.foreach( 
topicPartition =>
+        
leaderAndIsrRequest.partitionStates.asScala.keys.foreach(topicPartition =>
           /*
            * If there is offline log directory, a Partition object may have 
been created by getOrCreatePartition()
            * before getOrCreateReplica() failed to create local replica due to 
KafkaStorageException.
@@ -1139,7 +1132,7 @@ class ReplicaManager(val config: KafkaConfig,
         replicaFetcherManager.shutdownIdleFetcherThreads()
         replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
         onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-        BecomeLeaderOrFollowerResult(responseMap, Errors.NONE)
+        new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)
       }
     }
   }

Reply via email to