Repository: kafka
Updated Branches:
  refs/heads/trunk c7ab3efcb -> 71fe23b44

MINOR: Fix inconsistency in StopReplica/LeaderAndIsr error counts

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #4147 from hachikuji/fix-error-inconsistencies

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71fe23b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71fe23b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71fe23b4

Branch: refs/heads/trunk
Commit: 71fe23b445e242ca0f03f06aa8dad96e610db00b
Parents: c7ab3ef
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Oct 31 09:39:01 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Oct 31 09:43:18 2017 -0700

----------------------------------------------------------------------
 .../common/requests/LeaderAndIsrRequest.java | 8 ++-
 .../common/requests/LeaderAndIsrResponse.java | 6 +-
 .../common/requests/StopReplicaRequest.java | 8 ++-
 .../common/requests/StopReplicaResponse.java | 6 +-
 .../requests/LeaderAndIsrResponseTest.java | 67 ++++++++++++++++++++
 .../requests/StopReplicaResponseTest.java | 61 ++++++++++++++++++
 .../src/main/scala/kafka/server/KafkaApis.scala | 10 ++-
 .../scala/kafka/server/ReplicaManager.scala | 39 +++++-------
 8 files changed, 168 insertions(+), 37 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 27aaf0a..4fa5337 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -235,17 +235,19 @@ 
public class LeaderAndIsrRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) { + Errors error = Errors.forException(e); + Map<TopicPartition, Errors> responses = new HashMap<>(partitionStates.size()); for (TopicPartition partition : partitionStates.keySet()) { - responses.put(partition, Errors.forException(e)); + responses.put(partition, error); } short versionId = version(); switch (versionId) { case 0: case 1: - return new LeaderAndIsrResponse(Errors.NONE, responses); + return new LeaderAndIsrResponse(error, responses); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ApiKeys.LEADER_AND_ISR.latestVersion())); http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java index 921c8ad..c21f9a7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -89,7 +90,10 @@ public class LeaderAndIsrResponse extends AbstractResponse { @Override public Map<Errors, Integer> errorCounts() { - return errorCounts(error); + if (error != Errors.NONE) + // Minor optimization since the top-level error applies to all partitions + return 
Collections.singletonMap(error, responses.size()); + return errorCounts(responses); } public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) { http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java index 722a604..d79c938 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java @@ -122,16 +122,18 @@ public class StopReplicaRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + public StopReplicaResponse getErrorResponse(int throttleTimeMs, Throwable e) { + Errors error = Errors.forException(e); + Map<TopicPartition, Errors> responses = new HashMap<>(partitions.size()); for (TopicPartition partition : partitions) { - responses.put(partition, Errors.forException(e)); + responses.put(partition, error); } short versionId = version(); switch (versionId) { case 0: - return new StopReplicaResponse(Errors.NONE, responses); + return new StopReplicaResponse(error, responses); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ApiKeys.STOP_REPLICA.latestVersion())); http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java index 8ad6222..777416d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -86,7 +87,10 @@ public class StopReplicaResponse extends AbstractResponse { @Override public Map<Errors, Integer> errorCounts() { - return errorCounts(error); + if (error != Errors.NONE) + // Minor optimization since the top-level error applies to all partitions + return Collections.singletonMap(error, responses.size()); + return errorCounts(responses); } public static StopReplicaResponse parse(ByteBuffer buffer, short version) { http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java new file mode 100644 index 0000000..3eda778 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class LeaderAndIsrResponseTest { + + @Test + public void testErrorCountsFromGetErrorResponse() { + HashMap<TopicPartition, LeaderAndIsrRequest.PartitionState> partitionStates = new HashMap<>(); + partitionStates.put(new TopicPartition("foo", 0), new LeaderAndIsrRequest.PartitionState(15, 1, 10, + Collections.singletonList(10), 20, Collections.singletonList(10), false)); + partitionStates.put(new TopicPartition("foo", 1), new LeaderAndIsrRequest.PartitionState(15, 1, 10, + Collections.singletonList(10), 20, Collections.singletonList(10), false)); + LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(), + 15, 20, partitionStates, Collections.<Node>emptySet()).build(); + LeaderAndIsrResponse response = request.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); + 
assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 2), response.errorCounts()); + } + + @Test + public void testErrorCountsWithTopLevelError() { + Map<TopicPartition, Errors> errors = new HashMap<>(); + errors.put(new TopicPartition("foo", 0), Errors.NONE); + errors.put(new TopicPartition("foo", 1), Errors.NOT_LEADER_FOR_PARTITION); + LeaderAndIsrResponse response = new LeaderAndIsrResponse(Errors.UNKNOWN_SERVER_ERROR, errors); + assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 2), response.errorCounts()); + } + + @Test + public void testErrorCountsNoTopLevelError() { + Map<TopicPartition, Errors> errors = new HashMap<>(); + errors.put(new TopicPartition("foo", 0), Errors.NONE); + errors.put(new TopicPartition("foo", 1), Errors.CLUSTER_AUTHORIZATION_FAILED); + LeaderAndIsrResponse response = new LeaderAndIsrResponse(Errors.NONE, errors); + Map<Errors, Integer> errorCounts = response.errorCounts(); + assertEquals(2, errorCounts.size()); + assertEquals(1, errorCounts.get(Errors.NONE).intValue()); + assertEquals(1, errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java new file mode 100644 index 0000000..95cb3aa --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Utils; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class StopReplicaResponseTest { + + @Test + public void testErrorCountsFromGetErrorResponse() { + StopReplicaRequest request = new StopReplicaRequest.Builder(15, 20, false, + Utils.mkSet(new TopicPartition("foo", 0), new TopicPartition("foo", 1))).build(); + StopReplicaResponse response = request.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); + assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 2), response.errorCounts()); + } + + @Test + public void testErrorCountsWithTopLevelError() { + Map<TopicPartition, Errors> errors = new HashMap<>(); + errors.put(new TopicPartition("foo", 0), Errors.NONE); + errors.put(new TopicPartition("foo", 1), Errors.NOT_LEADER_FOR_PARTITION); + StopReplicaResponse response = new StopReplicaResponse(Errors.UNKNOWN_SERVER_ERROR, errors); + assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 2), response.errorCounts()); + } + + @Test + public void testErrorCountsNoTopLevelError() { + Map<TopicPartition, Errors> errors = new HashMap<>(); + 
errors.put(new TopicPartition("foo", 0), Errors.NONE); + errors.put(new TopicPartition("foo", 1), Errors.CLUSTER_AUTHORIZATION_FAILED); + StopReplicaResponse response = new StopReplicaResponse(Errors.NONE, errors); + Map<Errors, Integer> errorCounts = response.errorCounts(); + assertEquals(2, errorCounts.size()); + assertEquals(1, errorCounts.get(Errors.NONE).intValue()); + assertEquals(1, errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 53feeac..4637521 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -172,13 +172,11 @@ class KafkaApis(val requestChannel: RequestChannel, } if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { - val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange) - val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava) - sendResponseExemptThrottle(request, leaderAndIsrResponse) + val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange) + sendResponseExemptThrottle(request, response) } else { - val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap - sendResponseMaybeThrottle(request, _ => - new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)) + sendResponseMaybeThrottle(request, throttleTimeMs => leaderAndIsrRequest.getErrorResponse(throttleTimeMs, + Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/71fe23b4/core/src/main/scala/kafka/server/ReplicaManager.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index eee442a..101eaae 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -122,13 +122,6 @@ object LogReadResult { lastStableOffset = None) } -case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Errors], error: Errors) { - - override def toString = { - "update results: [%s], global error: [%d]".format(responseMap, error.code) - } -} - object ReplicaManager { val HighWatermarkFilename = "replication-offset-checkpoint" val IsrChangePropagationBlackOut = 5000L @@ -1038,29 +1031,29 @@ class ReplicaManager(val config: KafkaConfig, } def becomeLeaderOrFollower(correlationId: Int, - leaderAndISRRequest: LeaderAndIsrRequest, - onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = { - leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => + leaderAndIsrRequest: LeaderAndIsrRequest, + onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = { + leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => stateChangeLogger.trace(s"Received LeaderAndIsr request $stateInfo " + - s"correlation id $correlationId from controller ${leaderAndISRRequest.controllerId} " + - s"epoch ${leaderAndISRRequest.controllerEpoch} for partition $topicPartition") + s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " + + s"epoch ${leaderAndIsrRequest.controllerEpoch} for partition $topicPartition") } replicaStateChangeLock synchronized { - val responseMap = new mutable.HashMap[TopicPartition, Errors] - if (leaderAndISRRequest.controllerEpoch < controllerEpoch) { - stateChangeLogger.warn(s"Ignoring LeaderAndIsr 
request from controller ${leaderAndISRRequest.controllerId} with " + - s"correlation id $correlationId since its controller epoch ${leaderAndISRRequest.controllerEpoch} is old. " + + if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) { + stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " + + s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " + s"Latest known controller epoch is $controllerEpoch") - BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH) + leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception) } else { - val controllerId = leaderAndISRRequest.controllerId - controllerEpoch = leaderAndISRRequest.controllerEpoch + val responseMap = new mutable.HashMap[TopicPartition, Errors] + val controllerId = leaderAndIsrRequest.controllerId + controllerEpoch = leaderAndIsrRequest.controllerEpoch // First check partition's leader epoch val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]() - val newPartitions = leaderAndISRRequest.partitionStates.asScala.keys.filter(topicPartition => getPartition(topicPartition).isEmpty) + val newPartitions = leaderAndIsrRequest.partitionStates.asScala.keys.filter(topicPartition => getPartition(topicPartition).isEmpty) - leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => + leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => val partition = getOrCreatePartition(topicPartition) val partitionLeaderEpoch = partition.getLeaderEpoch if (partition eq ReplicaManager.OfflinePartition) { @@ -1105,7 +1098,7 @@ class ReplicaManager(val config: KafkaConfig, else Set.empty[Partition] - leaderAndISRRequest.partitionStates.asScala.keys.foreach( topicPartition => + leaderAndIsrRequest.partitionStates.asScala.keys.foreach(topicPartition => /* * If there is offline log 
directory, a Partition object may have been created by getOrCreatePartition() * before getOrCreateReplica() failed to create local replica due to KafkaStorageException. @@ -1139,7 +1132,7 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.shutdownIdleFetcherThreads() replicaAlterLogDirsManager.shutdownIdleFetcherThreads() onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower) - BecomeLeaderOrFollowerResult(responseMap, Errors.NONE) + new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava) } } }