Repository: kafka Updated Branches: refs/heads/trunk d9cabfde6 -> 00114dae7
KAFKA-2072; Replace StopReplica Request/Response with their org.apache.kafka.common.requests equivalents Author: David Jacot <[email protected]> Reviewers: Ismael Juma <[email protected]>, Grant Henke <[email protected]>, Jun Rao <[email protected]> Closes #196 from dajac/KAFKA-2072-part-2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/00114dae Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/00114dae Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/00114dae Branch: refs/heads/trunk Commit: 00114dae7a0ecb361aa5b843d5c7a95381294bd2 Parents: d9cabfd Author: David Jacot <[email protected]> Authored: Wed Jan 6 16:08:16 2016 -0800 Committer: Jun Rao <[email protected]> Committed: Wed Jan 6 16:08:16 2016 -0800 ---------------------------------------------------------------------- .../scala/kafka/api/StopReplicaRequest.scala | 126 ------------------- .../scala/kafka/api/StopReplicaResponse.scala | 75 ----------- .../scala/kafka/network/RequestChannel.scala | 1 - .../src/main/scala/kafka/server/KafkaApis.scala | 29 +++-- .../scala/kafka/server/ReplicaManager.scala | 21 ++-- .../api/RequestResponseSerializationTest.scala | 16 +-- 6 files changed, 31 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/api/StopReplicaRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala deleted file mode 100644 index 03c7f3d..0000000 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.api - -import java.nio._ -import kafka.api.ApiUtils._ -import kafka.network.{RequestOrResponseSend, RequestChannel, InvalidRequestException} -import kafka.common.{TopicAndPartition, ErrorMapping} -import kafka.network.RequestChannel.Response -import kafka.utils.Logging -import org.apache.kafka.common.protocol.ApiKeys -import collection.Set - - -object StopReplicaRequest extends Logging { - val CurrentVersion = 0.shortValue - val DefaultClientId = "" - val DefaultAckTimeout = 100 - - def readFrom(buffer: ByteBuffer): StopReplicaRequest = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val controllerId = buffer.getInt - val controllerEpoch = buffer.getInt - val deletePartitions = buffer.get match { - case 1 => true - case 0 => false - case x => - throw new InvalidRequestException("Invalid byte %d in delete partitions field. (Assuming false.)".format(x)) - } - val topicPartitionPairCount = buffer.getInt - val topicPartitionPairSet = new collection.mutable.HashSet[TopicAndPartition]() - (1 to topicPartitionPairCount) foreach { _ => - topicPartitionPairSet.add(TopicAndPartition(readShortString(buffer), buffer.getInt)) - } - StopReplicaRequest(versionId, correlationId, clientId, controllerId, controllerEpoch, - deletePartitions, topicPartitionPairSet.toSet) - } -} - -case class StopReplicaRequest(versionId: Short, - correlationId: Int, - clientId: String, - controllerId: Int, - controllerEpoch: Int, - deletePartitions: Boolean, - partitions: Set[TopicAndPartition]) - extends RequestOrResponse(Some(ApiKeys.STOP_REPLICA.id)) { - - def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = { - this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, - controllerId, controllerEpoch, deletePartitions, partitions) - } - - def writeTo(buffer: ByteBuffer) { - buffer.putShort(versionId) - buffer.putInt(correlationId) - writeShortString(buffer, clientId) - buffer.putInt(controllerId) - buffer.putInt(controllerEpoch) - buffer.put(if (deletePartitions) 1.toByte else 0.toByte) - buffer.putInt(partitions.size) - for (topicAndPartition <- partitions) { - writeShortString(buffer, topicAndPartition.topic) - buffer.putInt(topicAndPartition.partition) - } - } - - def sizeInBytes(): Int = { - var size = - 2 + /* versionId */ - 4 + /* correlation id */ - ApiUtils.shortStringLength(clientId) + - 4 + /* controller id*/ - 4 + /* controller epoch */ - 1 + /* deletePartitions */ - 4 /* partition count */ - for (topicAndPartition <- partitions){ - size += (ApiUtils.shortStringLength(topicAndPartition.topic)) + - 4 /* partition id */ - } - size - } - - override def toString(): String = { - describe(true) - } - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val responseMap = partitions.map { - case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - }.toMap - val errorResponse = StopReplicaResponse(correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) - } - - override def describe(details: Boolean): String = { - val stopReplicaRequest = new StringBuilder - stopReplicaRequest.append("Name: " + this.getClass.getSimpleName) - stopReplicaRequest.append("; Version: " + versionId) - stopReplicaRequest.append("; CorrelationId: " + correlationId) - stopReplicaRequest.append("; ClientId: " + clientId) - stopReplicaRequest.append("; DeletePartitions: " + deletePartitions) - stopReplicaRequest.append("; ControllerId: " + controllerId) - stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch) - if(details) - stopReplicaRequest.append("; Partitions: " + partitions.mkString(",")) - stopReplicaRequest.toString() - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/api/StopReplicaResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala deleted file mode 100644 index 2fc3c95..0000000 --- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import collection.mutable.HashMap -import collection.immutable.Map -import kafka.common.{TopicAndPartition, ErrorMapping} -import kafka.api.ApiUtils._ - - -object StopReplicaResponse { - def readFrom(buffer: ByteBuffer): StopReplicaResponse = { - val correlationId = buffer.getInt - val errorCode = buffer.getShort - val numEntries = buffer.getInt - - val responseMap = new HashMap[TopicAndPartition, Short]() - for (i<- 0 until numEntries){ - val topic = readShortString(buffer) - val partition = buffer.getInt - val partitionErrorCode = buffer.getShort() - responseMap.put(TopicAndPartition(topic, partition), partitionErrorCode) - } - new StopReplicaResponse(correlationId, responseMap.toMap, errorCode) - } -} - - -case class StopReplicaResponse(correlationId: Int, - responseMap: Map[TopicAndPartition, Short], - errorCode: Short = ErrorMapping.NoError) - extends RequestOrResponse() { - def sizeInBytes(): Int ={ - var size = - 4 /* correlation id */ + - 2 /* error code */ + - 4 /* number of responses */ - for ((key, value) <- responseMap) { - size += - 2 + key.topic.length /* topic */ + - 4 /* partition */ + - 2 /* error code for this partition */ - } - size - } - - def writeTo(buffer: ByteBuffer) { - buffer.putInt(correlationId) - buffer.putShort(errorCode) - buffer.putInt(responseMap.size) - for ((topicAndPartition, errorCode) <- responseMap){ - writeShortString(buffer, topicAndPartition.topic) - buffer.putInt(topicAndPartition.partition) - buffer.putShort(errorCode) - } - } - - override def describe(details: Boolean):String = { toString } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 1ab51da..998f51a 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -69,7 +69,6 @@ object RequestChannel extends Logging { Map(ApiKeys.PRODUCE.id -> ProducerRequest.readFrom, ApiKeys.FETCH.id -> FetchRequest.readFrom, ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom, - ApiKeys.STOP_REPLICA.id -> StopReplicaRequest.readFrom, ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom, ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom, ApiKeys.OFFSET_COMMIT.id -> OffsetCommitRequest.readFrom, http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index cbf5031..5fda0eb 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -36,7 +36,8 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, -LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse} +LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, +StopReplicaRequest, StopReplicaResponse} import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{TopicPartition, Node} @@ -90,9 +91,10 @@ class KafkaApis(val requestChannel: RequestChannel, } } catch { case e: Throwable => - if ( request.requestObj != null) + if (request.requestObj != null) { request.requestObj.handleError(e, requestChannel, request) - else { + error("Error when handling request %s".format(request.requestObj), e) + } else { val response = request.body.getErrorResponse(request.header.apiVersion, e) val respHeader = new ResponseHeader(request.header.correlationId) @@ -102,8 +104,10 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.closeConnection(request.processor, request) else requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response))) + + error("Error when handling request %s".format(request.body), e) } - error("error when handling request %s".format(request.requestObj), e) + } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds } @@ -152,13 +156,19 @@ class KafkaApis(val requestChannel: RequestChannel, // ensureTopicExists is only for client facing requests // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they // stop serving data to clients for the topic being deleted - val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] + val stopReplicaRequest = request.body.asInstanceOf[StopReplicaRequest] - authorizeClusterAction(request) + val responseHeader = new ResponseHeader(request.header.correlationId) + val response = + if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { + val (result, error) = replicaManager.stopReplicas(stopReplicaRequest) + new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava) + } else { + val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap + new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava) + } - val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) - val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, stopReplicaResponse))) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, response))) replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() } @@ -926,5 +936,4 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.session, ClusterAction, Resource.ClusterResource)) throw new ClusterAuthorizationException(s"Request $request is not authorized.") } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 3c2fa36..75e6bae 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.LeaderAndIsrRequest +import org.apache.kafka.common.requests.StopReplicaRequest import org.apache.kafka.common.utils.{Time => JTime} import scala.collection._ @@ -241,21 +242,21 @@ class ReplicaManager(val config: KafkaConfig, errorCode } - def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicAndPartition, Short], Short) = { + def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Short], Short) = { replicaStateChangeLock synchronized { - val responseMap = new collection.mutable.HashMap[TopicAndPartition, Short] - if(stopReplicaRequest.controllerEpoch < controllerEpoch) { - stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d." - .format(localBrokerId, stopReplicaRequest.controllerEpoch) + - " Latest known controller epoch is %d " + controllerEpoch) + val responseMap = new collection.mutable.HashMap[TopicPartition, Short] + if(stopReplicaRequest.controllerEpoch() < controllerEpoch) { + stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d. Latest known controller epoch is %d" + .format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch)) (responseMap, ErrorMapping.StaleControllerEpochCode) } else { + val partitions = stopReplicaRequest.partitions.asScala controllerEpoch = stopReplicaRequest.controllerEpoch // First stop fetchers for all partitions, then stop the corresponding replicas - replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition))) - for(topicAndPartition <- stopReplicaRequest.partitions){ - val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions) - responseMap.put(topicAndPartition, errorCode) + replicaFetcherManager.removeFetcherForPartitions(partitions.map(r => TopicAndPartition(r.topic, r.partition))) + for(topicPartition <- partitions){ + val errorCode = stopReplica(topicPartition.topic, topicPartition.partition, stopReplicaRequest.deletePartitions) + responseMap.put(topicPartition, errorCode) } (responseMap, ErrorMapping.NoError) } http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index c645102..51d6c91 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -116,17 +116,6 @@ object SerializationTestUtils { TopicAndPartition(topic1,3) -> partitionStateInfo3 ) - def createTestStopReplicaRequest() : StopReplicaRequest = { - new StopReplicaRequest(controllerId = 0, controllerEpoch = 1, correlationId = 0, deletePartitions = true, - partitions = collection.immutable.Set(TopicAndPartition(topic1, 0),TopicAndPartition(topic2, 0))) - } - - def createTestStopReplicaResponse() : StopReplicaResponse = { - val responseMap = Map((TopicAndPartition(topic1, 0), ErrorMapping.NoError), - (TopicAndPartition(topic2, 0), ErrorMapping.NoError)) - new StopReplicaResponse(0, responseMap.toMap) - } - def createTestProducerRequest: ProducerRequest = { new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest) } @@ -239,8 +228,6 @@ object SerializationTestUtils { } class RequestResponseSerializationTest extends JUnitSuite { - private val stopReplicaRequest = SerializationTestUtils.createTestStopReplicaRequest - private val stopReplicaResponse = SerializationTestUtils.createTestStopReplicaResponse private val producerRequest = SerializationTestUtils.createTestProducerRequest private val producerResponse = SerializationTestUtils.createTestProducerResponse private val fetchRequest = SerializationTestUtils.createTestFetchRequest @@ -266,8 +253,7 @@ class RequestResponseSerializationTest extends JUnitSuite { def testSerializationAndDeserialization() { val requestsAndResponses = - collection.immutable.Seq(stopReplicaRequest, - stopReplicaResponse, producerRequest, producerResponse, + collection.immutable.Seq(producerRequest, producerResponse, fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest, topicMetadataResponse, offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,
