http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index ec8e1eb..1d95fa0 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -19,19 +19,16 @@ package kafka.network import java.net.InetAddress import java.nio.ByteBuffer -import java.util.Collections import java.util.concurrent._ import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup +import kafka.network.RequestChannel.{ShutdownRequest, BaseRequest} import kafka.server.QuotaId import kafka.utils.{Logging, NotNothing} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.InvalidRequestException import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol} -import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time @@ -40,25 +37,20 @@ import org.apache.log4j.Logger import scala.reflect.ClassTag object RequestChannel extends Logging { - val AllDone = new Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), - startTimeNanos = 0, listenerName = new ListenerName(""), securityProtocol = SecurityProtocol.PLAINTEXT, - MemoryPool.NONE, shutdownReceive) private val requestLogger = Logger.getLogger("kafka.request.logger") - private def shutdownReceive: ByteBuffer = { - val emptyProduceRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 0, 0, - Collections.emptyMap[TopicPartition, MemoryRecords]).build() - val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, emptyProduceRequest.version, "", 0) - emptyProduceRequest.serialize(emptyRequestHeader) - } + sealed trait BaseRequest + case object ShutdownRequest extends BaseRequest case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) { val sanitizedUser = QuotaId.sanitize(principal.getName) } - class Request(val processor: Int, val connectionId: String, val session: Session, startTimeNanos: Long, - val listenerName: ListenerName, val securityProtocol: SecurityProtocol, memoryPool: MemoryPool, - @volatile private var buffer: ByteBuffer) { + class Request(val processor: Int, + val context: RequestContext, + val startTimeNanos: Long, + memoryPool: MemoryPool, + @volatile private var buffer: ByteBuffer) extends BaseRequest { // These need to be volatile because the readers are in the network thread and the writers are in the request // handler threads or the purgatory threads @volatile var requestDequeueTimeNanos = -1L @@ -68,31 +60,17 @@ object RequestChannel extends Logging { @volatile var apiRemoteCompleteTimeNanos = -1L @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None - val header: RequestHeader = try { - RequestHeader.parse(buffer) - } catch { - case ex: Throwable => - throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: ${buffer.getShort(0)}", ex) - } + val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) - val bodyAndSize: RequestAndSize = - try { - // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later - if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) { - new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0) - } - else - AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer) - } catch { - case ex: Throwable => - throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex) - } + def header: RequestHeader = context.header + def sizeOfBodyInBytes: Int = bodyAndSize.size //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. - if (!Protocol.requiresDelayedDeallocation(header.apiKey)) { - dispose() + if (!Protocol.requiresDelayedDeallocation(header.apiKey.id)) { + releaseBuffer() } def requestDesc(details: Boolean): String = s"$header -- ${body[AbstractRequest].toString(details)}" @@ -105,7 +83,7 @@ object RequestChannel extends Logging { } } - trace("Processor %d received request : %s".format(processor, requestDesc(true))) + trace(s"Processor $processor received request: ${requestDesc(true)}") def requestThreadTimeNanos = { if (apiLocalCompleteTimeNanos == -1L) apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds @@ -121,7 +99,7 @@ object RequestChannel extends Logging { if (apiLocalCompleteTimeNanos < 0) apiLocalCompleteTimeNanos = responseCompleteTimeNanos // If the apiRemoteCompleteTimeNanos is not set (i.e., for requests that do not go through a purgatory), then it is - // the same as responseCompleteTimeNans. + // the same as responseCompleteTimeNanos. if (apiRemoteCompleteTimeNanos < 0) apiRemoteCompleteTimeNanos = responseCompleteTimeNanos @@ -135,7 +113,7 @@ object RequestChannel extends Logging { val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos) val totalTime = nanosToMs(endTimeNanos - startTimeNanos) val fetchMetricNames = - if (header.apiKey == ApiKeys.FETCH.id) { + if (header.apiKey == ApiKeys.FETCH) { val isFromFollower = body[FetchRequest].isFromFollower Seq( if (isFromFollower) RequestMetrics.followFetchMetricName @@ -143,7 +121,7 @@ object RequestChannel extends Logging { ) } else Seq.empty - val metricNames = fetchMetricNames :+ ApiKeys.forId(header.apiKey).name + val metricNames = fetchMetricNames :+ header.apiKey.name metricNames.foreach { metricName => val m = RequestMetrics.metricsMap(metricName) m.requestRate.mark() @@ -174,12 +152,22 @@ object RequestChannel extends Logging { val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos) val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos) val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) - requestLogger.debug("Completed request:%s from connection %s;totalTime:%f,requestQueueTime:%f,localTime:%f,remoteTime:%f,throttleTime:%f,responseQueueTime:%f,sendTime:%f,securityProtocol:%s,principal:%s,listener:%s" - .format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value)) + + requestLogger.debug(s"Completed request:${requestDesc(detailsEnabled)} from connection ${context.connectionId};" + + s"totalTime:$totalTimeMs," + + s"requestQueueTime:$requestQueueTimeMs," + + s"localTime:$apiLocalTimeMs," + + s"remoteTime:$apiRemoteTimeMs," + + s"throttleTime:$apiThrottleTimeMs," + + s"responseQueueTime:$responseQueueTimeMs," + + s"sendTime:$responseSendTimeMs," + + s"securityProtocol:${context.securityProtocol}," + + s"principal:${context.principal}," + + s"listener:${context.listenerName.value}") } } - def dispose(): Unit = { + def releaseBuffer(): Unit = { if (buffer != null) { memoryPool.release(buffer) buffer = null @@ -187,30 +175,14 @@ object RequestChannel extends Logging { } override def toString = s"Request(processor=$processor, " + - s"connectionId=$connectionId, " + + s"connectionId=${context.connectionId}, " + s"session=$session, " + - s"listenerName=$listenerName, " + - s"securityProtocol=$securityProtocol, " + + s"listenerName=${context.listenerName}, " + + s"securityProtocol=${context.securityProtocol}, " + s"buffer=$buffer)" } - object Response { - - def apply(request: Request, responseSend: Send): Response = { - require(request != null, "request should be non null") - require(responseSend != null, "responseSend should be non null") - new Response(request, Some(responseSend), SendAction) - } - - def apply(request: Request, response: AbstractResponse): Response = { - require(request != null, "request should be non null") - require(response != null, "response should be non null") - apply(request, response.toSend(request.connectionId, request.header)) - } - - } - class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction) { request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds @@ -228,7 +200,7 @@ object RequestChannel extends Logging { class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup { private var responseListeners: List[(Int) => Unit] = Nil - private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) + private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) for(i <- 0 until numProcessors) responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]() @@ -260,17 +232,23 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe /** Send a response back to the socket server to be sent over the network */ def sendResponse(response: RequestChannel.Response) { + if (isTraceEnabled) { + val requestHeader = response.request.header + trace(s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of " + + s"${response.responseSend.size} bytes.") + } + responseQueues(response.processor).put(response) for(onResponse <- responseListeners) onResponse(response.processor) } /** Get the next request or block until specified time has elapsed */ - def receiveRequest(timeout: Long): RequestChannel.Request = + def receiveRequest(timeout: Long): RequestChannel.BaseRequest = requestQueue.poll(timeout, TimeUnit.MILLISECONDS) /** Get the next request or block until there is one */ - def receiveRequest(): RequestChannel.Request = + def receiveRequest(): RequestChannel.BaseRequest = requestQueue.take() /** Get a response for the given processor if there is one */ @@ -288,6 +266,9 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe def shutdown() { requestQueue.clear() } + + def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest) + } object RequestMetrics {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index e6f6662..f1bc926 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -37,8 +37,9 @@ import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.Rate import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector} import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.protocol.types.SchemaException +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} import org.apache.kafka.common.utils.{KafkaThread, Time} import scala.collection._ @@ -485,7 +486,7 @@ private[kafka] class Processor(val id: Int, // that are sitting in the server's socket buffer updateRequestMetrics(curr.request) trace("Socket server received empty response to send, registering for read: " + curr) - val channelId = curr.request.connectionId + val channelId = curr.request.context.connectionId if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null) selector.unmute(channelId) case RequestChannel.SendAction => @@ -495,7 +496,7 @@ private[kafka] class Processor(val id: Int, case RequestChannel.CloseConnectionAction => updateRequestMetrics(curr.request) trace("Closing socket connection actively according to the response code.") - close(selector, curr.request.connectionId) + close(selector, curr.request.context.connectionId) } } finally { curr = requestChannel.receiveResponse(id) @@ -505,7 +506,7 @@ private[kafka] class Processor(val id: Int, /* `protected` for test usage */ protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) { - val connectionId = response.request.connectionId + val connectionId = response.request.context.connectionId trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response") // `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long if (channel(connectionId).isEmpty) { @@ -515,7 +516,7 @@ private[kafka] class Processor(val id: Int, // Invoke send for closingChannel as well so that the send is failed and the channel closed properly and // removed from the Selector after discarding any pending staged receives. // `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long - if (!openOrClosingChannel(connectionId).isEmpty) { + if (openOrClosingChannel(connectionId).isDefined) { selector.send(responseSend) inflightResponses += (connectionId -> response) } @@ -538,11 +539,12 @@ private[kafka] class Processor(val id: Int, val openChannel = selector.channel(receive.source) // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'. val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source) - val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress) - - val req = new RequestChannel.Request(processor = id, connectionId = receive.source, session = session, - startTimeNanos = time.nanoseconds, listenerName = listenerName, securityProtocol = securityProtocol, - memoryPool, receive.payload) + val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName) + val header = RequestHeader.parse(receive.payload) + val context = new RequestContext(header, receive.source, openOrClosingChannel.socketAddress, + principal, listenerName, securityProtocol) + val req = new RequestChannel.Request(processor = id, context = context, + startTimeNanos = time.nanoseconds, memoryPool, receive.payload) requestChannel.sendRequest(req) selector.mute(receive.source) } catch { @@ -565,7 +567,7 @@ private[kafka] class Processor(val id: Int, } private def updateRequestMetrics(request: RequestChannel.Request) { - val networkThreadTimeNanos = openOrClosingChannel(request.connectionId).fold(0L) (_.getAndResetNetworkThreadTimeNanos()) + val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos()) request.updateRequestMetrics(networkThreadTimeNanos) } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala index fbc1881..f454483 100644 --- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala @@ -18,14 +18,15 @@ package kafka.server import java.util.concurrent.TimeUnit +import kafka.network.RequestChannel import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics._ import org.apache.kafka.common.utils.Time class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, - private val metrics: Metrics, - private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) { + private val metrics: Metrics, + private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) { val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds) def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName) @@ -33,25 +34,29 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, exemptSensor.record(value) } - def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, requestThreadTimeNanos: Long, - sendResponseCallback: Int => Unit, recordNetworkThreadTimeCallback: (Long => Unit) => Unit): Unit = { + def maybeRecordAndThrottle(request: RequestChannel.Request, sendResponseCallback: Int => Unit): Unit = { + if (request.apiRemoteCompleteTimeNanos == -1) { + // When this callback is triggered, the remote API call has completed + request.apiRemoteCompleteTimeNanos = time.nanoseconds + } + if (quotasEnabled) { - val quotaSensors = getOrCreateQuotaSensors(sanitizedUser, clientId) - recordNetworkThreadTimeCallback(timeNanos => recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos))) + val quotaSensors = getOrCreateQuotaSensors(request.session.sanitizedUser, request.header.clientId) + request.recordNetworkThreadTimeCallback = Some(timeNanos => recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos))) recordAndThrottleOnQuotaViolation( quotaSensors, - nanosToPercentage(requestThreadTimeNanos), + nanosToPercentage(request.requestThreadTimeNanos), sendResponseCallback) } else { sendResponseCallback(0) } } - def maybeRecordExempt(requestThreadTimeNanos: Long, recordNetworkThreadTimeCallback: (Long => Unit) => Unit): Unit = { + def maybeRecordExempt(request: RequestChannel.Request): Unit = { if (quotasEnabled) { - recordNetworkThreadTimeCallback(timeNanos => recordExempt(nanosToPercentage(timeNanos))) - recordExempt(nanosToPercentage(requestThreadTimeNanos)) + request.recordNetworkThreadTimeCallback = Some(timeNanos => recordExempt(nanosToPercentage(timeNanos))) + recordExempt(nanosToPercentage(request.requestThreadTimeNanos)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4537898..1a85222 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -34,6 +34,7 @@ import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult} import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.log.{Log, LogManager, TimestampOffset} import kafka.network.RequestChannel +import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction} import kafka.security.SecurityUtils import kafka.security.auth._ import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils} @@ -42,7 +43,7 @@ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch} import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse} @@ -91,9 +92,9 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handle(request: RequestChannel.Request) { try { - trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s". - format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal)) - ApiKeys.forId(request.header.apiKey) match { + trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" + + s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}") + request.header.apiKey match { case ApiKeys.PRODUCE => handleProduceRequest(request) case ApiKeys.FETCH => handleFetchRequest(request) case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request) @@ -166,7 +167,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange) val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava) - sendResponseExemptThrottle(RequestChannel.Response(request, leaderAndIsrResponse)) + sendResponseExemptThrottle(request, leaderAndIsrResponse) } else { val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap sendResponseMaybeThrottle(request, _ => @@ -193,8 +194,7 @@ class KafkaApis(val requestChannel: RequestChannel, groupCoordinator.handleGroupEmigration(topicPartition.partition) } } - val response = new StopReplicaResponse(error, result.asJava) - sendResponseExemptThrottle(RequestChannel.Response(request, response)) + sendResponseExemptThrottle(request, new StopReplicaResponse(error, result.asJava)) } else { val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap sendResponseMaybeThrottle(request, _ => @@ -218,7 +218,7 @@ class KafkaApis(val requestChannel: RequestChannel, adminManager.tryCompleteDelayedTopicOperations(topic) } } - sendResponseExemptThrottle(RequestChannel.Response(request, new UpdateMetadataResponse(Errors.NONE))) + sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE)) } else { sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)) } @@ -232,14 +232,14 @@ class KafkaApis(val requestChannel: RequestChannel, authorizeClusterAction(request) def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicAndPartition]]): Unit = { - controlledShutdownResult match { + val response = controlledShutdownResult match { case Success(partitionsRemaining) => - val controlledShutdownResponse = new ControlledShutdownResponse(Errors.NONE, - partitionsRemaining.map(_.asTopicPartition).asJava) - sendResponseExemptThrottle(RequestChannel.Response(request, controlledShutdownResponse)) + new ControlledShutdownResponse(Errors.NONE, partitionsRemaining.map(_.asTopicPartition).asJava) + case Failure(throwable) => - sendResponseExemptThrottle(RequestChannel.Response(request, controlledShutdownRequest.getErrorResponse(throwable))) + controlledShutdownRequest.getErrorResponse(throwable) } + sendResponseExemptThrottle(request, response) } controller.shutdownBroker(controlledShutdownRequest.brokerId, controlledShutdownCallback) } @@ -362,22 +362,17 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleProduceRequest(request: RequestChannel.Request) { val produceRequest = request.body[ProduceRequest] - val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size - - def sendErrorResponse(error: Errors): Unit = { - sendResponseMaybeThrottle(request, requestThrottleMs => - produceRequest.getErrorResponse(requestThrottleMs, error.exception)) - } + val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes if (produceRequest.isTransactional) { if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId))) { - sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) + sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) return } // Note that authorization to a transactionalId implies ProducerId authorization } else if (produceRequest.isIdempotent && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) { - sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED) + sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception) return } @@ -415,7 +410,6 @@ class KafkaApis(val requestChannel: RequestChannel, // no operation needed if producer request.required.acks = 0; however, if there is any error in handling // the request, since no response is expected by the producer, the server will close socket server so that // the producer client will know that some error has happened and will refresh its metadata - val action = if (errorInResponse) { val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) => topicPartition -> status.error.exceptionName @@ -425,9 +419,10 @@ class KafkaApis(val requestChannel: RequestChannel, s"from client id ${request.header.clientId} with ack=0\n" + s"Topic and partition to exceptions: $exceptionsSummary" ) - RequestChannel.CloseConnectionAction - } else RequestChannel.NoOpAction - sendResponseExemptThrottle(new RequestChannel.Response(request, None, action)) + closeConnection(request) + } else { + sendNoOpResponseExemptThrottle(request) + } } else { sendResponseMaybeThrottle(request, requestThrottleMs => new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs)) @@ -459,7 +454,7 @@ class KafkaApis(val requestChannel: RequestChannel, responseCallback = sendResponseCallback) // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; - // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log + // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log produceRequest.clearPartitionRecords() } } @@ -544,28 +539,23 @@ class KafkaApis(val requestChannel: RequestChannel, // fetch response callback invoked after any throttling def fetchResponseCallback(bandwidthThrottleTimeMs: Int) { - def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = { + def createResponse(requestThrottleTimeMs: Int): FetchResponse = { val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData] fetchedPartitionData.asScala.foreach { case (tp, partitionData) => convertedData.put(tp, convertedPartitionData(tp, partitionData)) } - val response = new FetchResponse(convertedData, 0) - val responseSend = response.toSend(bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header) - - trace(s"Sending fetch response to client $clientId of ${responseSend.size} bytes.") + val response = new FetchResponse(convertedData, bandwidthThrottleTimeMs + requestThrottleTimeMs) response.responseData.asScala.foreach { case (topicPartition, data) => // record the bytes out metrics only when the response is being sent brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes) } - - RequestChannel.Response(request, responseSend) + response } if (fetchRequest.isFromFollower) - sendResponseExemptThrottle(createResponse(0)) + sendResponseExemptThrottle(request, createResponse(0)) else - sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs => - requestChannel.sendResponse(createResponse(requestThrottleMs))) + sendResponseMaybeThrottle(request, requestThrottleMs => createResponse(requestThrottleMs)) } // When this callback is triggered, the remote API call has completed. @@ -912,12 +902,12 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.body[MetadataRequest] - val requestVersion = request.header.apiVersion() + val requestVersion = request.header.apiVersion val topics = // Handle old metadata request logic. Version 0 has no way to specify "no topics". if (requestVersion == 0) { - if (metadataRequest.topics() == null || metadataRequest.topics().isEmpty) + if (metadataRequest.topics() == null || metadataRequest.topics.isEmpty) metadataCache.getAllTopics() else metadataRequest.topics.asScala.toSet @@ -963,7 +953,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.isEmpty) Seq.empty[MetadataResponse.TopicMetadata] else - getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.listenerName, + getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName, errorUnavailableEndpoints) val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata @@ -976,7 +966,7 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(request, requestThrottleMs => new MetadataResponse( requestThrottleMs, - brokers.map(_.getNode(request.listenerName)).asJava, + brokers.map(_.getNode(request.context.listenerName)).asJava, clusterId, metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), completeTopicMetadata.asJava @@ -1062,26 +1052,23 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFindCoordinatorRequest(request: RequestChannel.Request) { val findCoordinatorRequest = request.body[FindCoordinatorRequest] - def sendErrorResponse(error: Errors): Unit = - sendResponseMaybeThrottle(request, requestThrottleMs => new FindCoordinatorResponse(error, Node.noNode)) - if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP && !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) - sendErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED) + sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION && !authorize(request.session, Describe, new Resource(TransactionalId, findCoordinatorRequest.coordinatorKey))) - sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) + sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { // get metadata (and create the topic if necessary) val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match { case FindCoordinatorRequest.CoordinatorType.GROUP => val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.listenerName) + val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) (partition, metadata) case FindCoordinatorRequest.CoordinatorType.TRANSACTION => val partition = txnCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.listenerName) + val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) (partition, metadata) case _ => @@ -1201,9 +1188,9 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED) } else { groupCoordinator.handleSyncGroup( - syncGroupRequest.groupId(), - syncGroupRequest.generationId(), - syncGroupRequest.memberId(), + syncGroupRequest.groupId, + syncGroupRequest.generationId, + syncGroupRequest.memberId, syncGroupRequest.groupAssignment().asScala.mapValues(Utils.toArray), sendResponseCallback ) @@ -1230,9 +1217,9 @@ class KafkaApis(val requestChannel: RequestChannel, } else { // let the coordinator to handle heartbeat groupCoordinator.handleHeartbeat( - heartbeatRequest.groupId(), - heartbeatRequest.memberId(), - heartbeatRequest.groupGenerationId(), + heartbeatRequest.groupId, + heartbeatRequest.memberId, + heartbeatRequest.groupGenerationId, sendResponseCallback) } } @@ -1257,8 +1244,8 @@ class KafkaApis(val requestChannel: RequestChannel, } else { // let the coordinator to handle leave-group groupCoordinator.handleLeaveGroup( - leaveGroupRequest.groupId(), - leaveGroupRequest.memberId(), + leaveGroupRequest.groupId, + leaveGroupRequest.memberId, sendResponseCallback) } } @@ -1274,14 +1261,14 @@ class KafkaApis(val requestChannel: RequestChannel, // If this is considered to leak information about the broker version a workaround is to use SSL // with client authentication which is performed at an earlier stage of the connection where the // ApiVersionRequest is not available. - def sendResponseCallback(requestThrottleMs: Int) { - val responseSend = - if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion)) - ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion).toSend(request.connectionId, request.header) - else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header) - requestChannel.sendResponse(RequestChannel.Response(request, responseSend)) - } - sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback) + def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = { + val apiVersionRequest = request.body[ApiVersionsRequest] + if (apiVersionRequest.hasUnsupportedRequestVersion) + apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception) + else + ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion) + } + sendResponseMaybeThrottle(request, createResponseCallback) } def handleCreateTopicsRequest(request: RequestChannel.Request) { @@ -1427,17 +1414,13 @@ class KafkaApis(val requestChannel: RequestChannel, val initProducerIdRequest = request.body[InitProducerIdRequest] val transactionalId = initProducerIdRequest.transactionalId - def sendErrorResponse(error: Errors): Unit = { - sendResponseMaybeThrottle(request, requestThrottleMs => new InitProducerIdResponse(requestThrottleMs, error)) - } - if (transactionalId != null) { if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) { - sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) + sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) return } } else if (!authorize(request.session, IdempotentWrite, Resource.ClusterResource)) { - sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED) + sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception) return } @@ -1486,7 +1469,7 @@ class KafkaApis(val requestChannel: RequestChannel, val numAppends = new AtomicInteger(markers.size) if (numAppends.get == 0) { - sendResponseExemptThrottle(RequestChannel.Response(request, new WriteTxnMarkersResponse(errors))) + sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors)) return } @@ -1525,7 +1508,7 @@ class KafkaApis(val requestChannel: RequestChannel, } if (numAppends.decrementAndGet() == 0) - sendResponseExemptThrottle(RequestChannel.Response(request, new WriteTxnMarkersResponse(errors))) + sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors)) } // TODO: The current append API makes doing separate writes per producerId a little easier, but it would @@ -1579,7 +1562,7 @@ class KafkaApis(val requestChannel: RequestChannel, // No log appends were written as all partitions had incorrect log format // so we need to send the error response if (skippedMarkers == markers.size()) - sendResponseExemptThrottle(request, () => sendResponse(request, new WriteTxnMarkersResponse(errors))) + sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors)) } def ensureInterBrokerVersion(version: ApiVersion): Unit = { @@ -1679,17 +1662,12 @@ class KafkaApis(val requestChannel: RequestChannel, val header = request.header val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest] - def sendErrorResponse(error: Errors): Unit = { - sendResponseMaybeThrottle(request, requestThrottleMs => - txnOffsetCommitRequest.getErrorResponse(requestThrottleMs, error.exception)) - } - // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization // since it is implied by transactionalId authorization if (!authorize(request.session, Write, new Resource(TransactionalId, txnOffsetCommitRequest.transactionalId))) - sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) + sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId))) - sendErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED) + sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) else { val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition { case (topicPartition, _) => @@ -1873,30 +1851,8 @@ class KafkaApis(val requestChannel: RequestChannel, val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition() authorizeClusterAction(request) - val responseBody = new OffsetsForLeaderEpochResponse( - replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava - ) - sendResponseExemptThrottle(RequestChannel.Response(request, responseBody)) - } - - private def handleError(request: RequestChannel.Request, e: Throwable) { - val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.header.apiKey).clusterAction - - def createResponse(requestThrottleMs: Int): RequestChannel.Response = { - val response = request.body[AbstractRequest].getErrorResponse(requestThrottleMs, e) - /* If request doesn't have a default error response, we just close the connection. - For example, when produce request has acks set to 0 */ - if (response == null) - new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction) - else RequestChannel.Response(request, response) - } - error("Error when handling request %s".format(request.body[AbstractRequest]), e) - if (mayThrottle) - sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs => - requestChannel.sendResponse(createResponse(requestThrottleMs)) - }) - else - sendResponseExemptThrottle(createResponse(0)) + val lastOffsetForLeaderEpoch = replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava + sendResponseExemptThrottle(request, new OffsetsForLeaderEpochResponse(lastOffsetForLeaderEpoch)) } def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = { @@ -1969,34 +1925,61 @@ class KafkaApis(val requestChannel: RequestChannel, throw new ClusterAuthorizationException(s"Request $request is not authorized.") } - private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) { - sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs => - sendResponse(request, createResponse(requestThrottleMs)) - }) + private def handleError(request: RequestChannel.Request, e: Throwable) { + val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction + error("Error when handling request %s".format(request.body[AbstractRequest]), e) + if (mayThrottle) + sendErrorResponseMaybeThrottle(request, e) + else + sendErrorResponseExemptThrottle(request, e) } - private def sendResponseMaybeThrottle(request: RequestChannel.Request, clientId: String, sendResponseCallback: Int => Unit) { + private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse): Unit = { + quotas.request.maybeRecordAndThrottle(request, + throttleTimeMs => sendResponse(request, Some(createResponse(throttleTimeMs)))) + } - if (request.apiRemoteCompleteTimeNanos == -1) { - // When this callback is triggered, the remote API call has completed - request.apiRemoteCompleteTimeNanos = time.nanoseconds - } - quotas.request.maybeRecordAndThrottle(request.session.sanitizedUser, clientId, - request.requestThreadTimeNanos, sendResponseCallback, - callback => request.recordNetworkThreadTimeCallback = Some(callback)) + private def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable) { + quotas.request.maybeRecordAndThrottle(request, sendErrorOrCloseConnection(request, error)) + } + + private def sendResponseExemptThrottle(request: RequestChannel.Request, response: AbstractResponse): Unit = { + quotas.request.maybeRecordExempt(request) + sendResponse(request, Some(response)) } - private def sendResponseExemptThrottle(response: RequestChannel.Response) { - sendResponseExemptThrottle(response.request, () => requestChannel.sendResponse(response)) + private def sendErrorResponseExemptThrottle(request: RequestChannel.Request, error: Throwable): Unit = { + quotas.request.maybeRecordExempt(request) + sendErrorOrCloseConnection(request, error)(throttleMs = 0) + } + + private def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable)(throttleMs: Int): Unit = { + val response = request.body[AbstractRequest].getErrorResponse(throttleMs, error) + if (response == null) + closeConnection(request) + else + sendResponse(request, Some(response)) } - private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) { - quotas.request.maybeRecordExempt(request.requestThreadTimeNanos, - callback => request.recordNetworkThreadTimeCallback = Some(callback)) - sendResponseCallback() + private def sendNoOpResponseExemptThrottle(request: RequestChannel.Request): Unit = { + quotas.request.maybeRecordExempt(request) + sendResponse(request, None) } - private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) { - requestChannel.sendResponse(RequestChannel.Response(request, response)) + private def closeConnection(request: RequestChannel.Request): Unit = { + // This case is used when the request handler has encountered an error, but the client + // does not expect a response (e.g. when produce request has acks set to 0) + requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction)) } + + private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = { + responseOpt match { + case Some(response) => + val responseSend = request.context.buildResponse(response) + requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction)) + case None => + requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction)) + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/server/KafkaRequestHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index b3f98d1..356ccae 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -41,42 +41,43 @@ class KafkaRequestHandler(id: Int, def run() { while(true) { - var req: RequestChannel.Request = null - try { - while (req == null) { - // We use a single meter for aggregate idle percentage for the thread pool. - // Since meter is calculated as total_recorded_value / time_window and - // time_window is independent of the number of threads, each recorded idle - // time should be discounted by # threads. - val startSelectTime = time.nanoseconds - req = requestChannel.receiveRequest(300) - val endTime = time.nanoseconds - if (req != null) - req.requestDequeueTimeNanos = endTime - val idleTime = endTime - startSelectTime - aggregateIdleMeter.mark(idleTime / totalHandlerThreads) - } - - if (req eq RequestChannel.AllDone) { + // We use a single meter for aggregate idle percentage for the thread pool. + // Since meter is calculated as total_recorded_value / time_window and + // time_window is independent of the number of threads, each recorded idle + // time should be discounted by # threads. + val startSelectTime = time.nanoseconds + + val req = requestChannel.receiveRequest(300) + val endTime = time.nanoseconds + val idleTime = endTime - startSelectTime + aggregateIdleMeter.mark(idleTime / totalHandlerThreads) + + req match { + case RequestChannel.ShutdownRequest => debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId)) latch.countDown() return - } - trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req)) - apis.handle(req) - } catch { - case e: FatalExitError => - latch.countDown() - Exit.exit(e.statusCode) - case e: Throwable => error("Exception when handling request", e) - } finally { - if (req != null) - req.dispose() + + case request: RequestChannel.Request => + try { + request.requestDequeueTimeNanos = endTime + trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, request)) + apis.handle(request) + } catch { + case e: FatalExitError => + latch.countDown() + Exit.exit(e.statusCode) + case e: Throwable => error("Exception when handling request", e) + } finally { + request.releaseBuffer() + } + + case null => // continue } } } - def initiateShutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone) + def initiateShutdown(): Unit = requestChannel.sendShutdownRequest() def awaitShutdown(): Unit = latch.await() http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/utils/Logging.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala index 2df5878..f2cd4e9 100755 --- a/core/src/main/scala/kafka/utils/Logging.scala +++ b/core/src/main/scala/kafka/utils/Logging.scala @@ -49,6 +49,8 @@ trait Logging { def isDebugEnabled: Boolean = logger.isDebugEnabled + def isTraceEnabled: Boolean = logger.isTraceEnabled + def debug(msg: => String): Unit = { if (logger.isDebugEnabled()) logger.debug(msgWithLogIdent(msg)) http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/other/kafka/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 69531d4..c8f9397 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -263,8 +263,7 @@ object TestOffsetManager { } fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkUtils) - - val statsThread = new StatsThread(reportingIntervalMs, commitThreads, fetchThread) + statsThread = new StatsThread(reportingIntervalMs, commitThreads, fetchThread) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala index 27ff4d4..ef85c6d 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala @@ -16,12 +16,11 @@ */ package kafka.admin -import kafka.utils.{Logging, TestUtils} -import kafka.zk.ZooKeeperTestHarness +import kafka.utils.Logging import org.junit.Assert._ import org.junit.Test -import scala.collection.{Map, Seq} +import scala.collection.Map class AdminRackAwareTest extends RackAwareTest with Logging { http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala index effcd92..6594b6e 100644 --- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -14,17 +14,12 @@ package kafka.admin import java.io.{BufferedWriter, File, FileWriter} import java.text.{ParseException, SimpleDateFormat} -import java.util.concurrent.{ExecutorService, Executors, TimeUnit} -import java.util.{Calendar, Collections, Date, Properties} +import java.util.{Calendar, Date, Properties} import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService} -import kafka.admin.{AdminUtils, ConsumerGroupCommand} import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.errors.WakeupException -import org.apache.kafka.common.serialization.StringDeserializer import org.junit.{After, Before, Test} import scala.collection.mutable.ArrayBuffer @@ -330,7 +325,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { AdminUtils.deleteTopic(zkUtils, topic1) } - private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int = 1, topic: String, totalMessages: Int) { + private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) { TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000) val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic) http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index e7fbb83..0bc1c9f 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -28,7 +28,7 @@ import org.junit.Assert._ import org.junit.Test import com.yammer.metrics.Metrics import kafka.common.RequestAndCompletionHandler -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -296,7 +296,8 @@ class TransactionMarkerChannelManagerTest { val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE)) for (requestAndHandler <- requestAndHandlers) { - requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, false, null, response)) } EasyMock.verify(txnStateManager) @@ -344,7 +345,8 @@ class TransactionMarkerChannelManagerTest { val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE)) for (requestAndHandler <- requestAndHandlers) { - requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, false, null, response)) } EasyMock.verify(txnStateManager) @@ -398,7 +400,8 @@ class TransactionMarkerChannelManagerTest { val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE)) for (requestAndHandler <- requestAndHandlers) { - requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, false, null, response)) } // call this again so that append log will be retried http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala index 6323d15..41ec159 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala @@ -20,7 +20,7 @@ import java.{lang, util} import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse} import org.apache.kafka.common.utils.Utils import org.easymock.{EasyMock, IAnswer} @@ -71,7 +71,8 @@ class TransactionMarkerRequestCompletionHandlerTest { producerId, producerEpoch, txnResult, coordinatorEpoch, Set[TopicPartition](topicPartition))) EasyMock.replay(markerChannelManager) - handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, true, null, null)) + handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, true, null, null)) EasyMock.verify(markerChannelManager) } @@ -84,7 +85,8 @@ class TransactionMarkerRequestCompletionHandlerTest { val response = new WriteTxnMarkersResponse(new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()) try { - handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, false, null, response)) fail("should have thrown illegal argument exception") } catch { case _: IllegalStateException => // ok @@ -200,8 +202,9 @@ class TransactionMarkerRequestCompletionHandlerTest { producerId, producerEpoch, txnResult, coordinatorEpoch, Set[TopicPartition](topicPartition))) EasyMock.replay(markerChannelManager) - val response = new WriteTxnMarkersResponse(createPidErrorMap(error)) - handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error)) + handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, false, null, response)) assertEquals(txnMetadata.topicPartitions, mutable.Set[TopicPartition](topicPartition)) EasyMock.verify(markerChannelManager) @@ -210,9 +213,10 @@ class TransactionMarkerRequestCompletionHandlerTest { private def verifyThrowIllegalStateExceptionOnError(error: Errors) = { mockCache() - val response = new WriteTxnMarkersResponse(createPidErrorMap(error)) + val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error)) try { - handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, false, null, response)) fail("should have thrown illegal state exception") } catch { case _: IllegalStateException => // ok @@ -231,8 +235,9 @@ class TransactionMarkerRequestCompletionHandlerTest { .once() EasyMock.replay(markerChannelManager) - val response = new WriteTxnMarkersResponse(createPidErrorMap(error)) - handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error)) + handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, false, null, response)) assertTrue(txnMetadata.topicPartitions.isEmpty) assertTrue(completed) @@ -250,14 +255,15 @@ class TransactionMarkerRequestCompletionHandlerTest { .once() EasyMock.replay(markerChannelManager) - val response = new WriteTxnMarkersResponse(createPidErrorMap(error)) - handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error)) + handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, false, null, response)) assertTrue(removed) } - private def createPidErrorMap(errors: Errors) = { + private def createProducerIdErrorMap(errors: Errors) = { val pidMap = new java.util.HashMap[lang.Long, util.Map[TopicPartition, Errors]]() val errorsMap = new util.HashMap[TopicPartition, Errors]() errorsMap.put(topicPartition, errors) http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index ceeaffc..c005c72 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -22,7 +22,7 @@ import java.util.Arrays import kafka.common.KafkaException import kafka.server._ -import kafka.utils.{CoreUtils, TestUtils} +import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.auth.KafkaPrincipal http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index bc0b81a..9c4d08f 100755 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -24,12 +24,11 @@ import kafka.api.{FetchRequest, FetchRequestBuilder, PartitionFetchInfo} import kafka.server.{KafkaConfig, KafkaRequestHandler} import kafka.producer.{KeyedMessage, Producer} import org.apache.log4j.{Level, Logger} -import kafka.zk.ZooKeeperTestHarness import org.junit.Test import scala.collection._ import kafka.common.{ErrorMapping, OffsetOutOfRangeException, TopicAndPartition, UnknownTopicOrPartitionException} -import kafka.utils.{CoreUtils, StaticPartitioner, TestUtils} +import kafka.utils.{StaticPartitioner, TestUtils} import kafka.serializer.StringEncoder import java.util.Properties http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 66103cc..6e9e8ff 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -171,7 +171,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { 2000, 0).topicsMetadata assertEquals(Errors.NONE, topicsMetadata.head.error) assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error) - var partitionMetadata = topicsMetadata.head.partitionsMetadata + val partitionMetadata = topicsMetadata.head.partitionsMetadata assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size) assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId) assertEquals(1, partitionMetadata.head.replicas.size) http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index 2a0525b..ede4ff7 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -27,7 +27,6 @@ import kafka.javaapi.producer.Producer import kafka.utils.IntEncoder import kafka.utils.{Logging, TestUtils} import kafka.consumer.{KafkaStream, ConsumerConfig} -import kafka.zk.ZooKeeperTestHarness import kafka.common.MessageStreamsExistException import org.junit.Test http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 9a324aa..bb41380 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -20,7 +20,7 @@ package kafka.log import java.io.File import java.nio.ByteBuffer import java.nio.channels.FileChannel -import java.nio.file.{OpenOption, StandardOpenOption} +import java.nio.file.StandardOpenOption import kafka.server.LogOffsetMetadata import kafka.utils.TestUtils http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index e59ce58..6fbb0c9 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -20,12 +20,12 @@ package kafka.network import java.io._ import java.net._ import java.nio.ByteBuffer -import java.nio.channels.SocketChannel import java.util.{HashMap, Random} import javax.net.ssl._ import com.yammer.metrics.core.Gauge import com.yammer.metrics.{Metrics => YammerMetrics} +import kafka.network.RequestChannel.SendAction import kafka.security.CredentialProvider import kafka.server.KafkaConfig import kafka.utils.TestUtils @@ -37,7 +37,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader} import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.{Time, MockTime} +import org.apache.kafka.common.utils.{MockTime, Time} import org.junit.Assert._ import org.junit._ import org.scalatest.junit.JUnitSuite @@ -89,19 +89,25 @@ class SocketServerTest extends JUnitSuite { response } + private def receiveRequest(channel: RequestChannel, timeout: Long = 2000L): RequestChannel.Request = { + channel.receiveRequest(timeout) match { + case request: RequestChannel.Request => request + case RequestChannel.ShutdownRequest => fail("Unexpected shutdown received") + case null => fail("receiveRequest timed out") + } + } + /* A simple request handler that just echos back the response */ def processRequest(channel: RequestChannel) { - val request = channel.receiveRequest(2000) - assertNotNull("receiveRequest timed out", request) - processRequest(channel, request) + processRequest(channel, receiveRequest(channel)) } def processRequest(channel: RequestChannel, request: RequestChannel.Request) { val byteBuffer = request.body[AbstractRequest].serialize(request.header) byteBuffer.rewind() - val send = new NetworkSend(request.connectionId, byteBuffer) - channel.sendResponse(RequestChannel.Response(request, send)) + val send = new NetworkSend(request.context.connectionId, byteBuffer) + channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction)) } def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { @@ -119,7 +125,6 @@ class SocketServerTest extends JUnitSuite { } private def producerRequestBytes: Array[Byte] = { - val apiKey: Short = 0 val correlationId = -1 val clientId = "" val ackTimeoutMs = 10000 @@ -127,7 +132,7 @@ class SocketServerTest extends JUnitSuite { val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build() - val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId) + val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId) val byteBuffer = emptyRequest.serialize(emptyHeader) byteBuffer.rewind() @@ -180,7 +185,7 @@ class SocketServerTest extends JUnitSuite { sendRequest(plainSocket, serializedBytes) plainSocket.close() for (_ <- 0 until 10) { - val request = server.requestChannel.receiveRequest(2000) + val request = receiveRequest(server.requestChannel) assertNotNull("receiveRequest timed out", request) server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction)) } @@ -193,10 +198,10 @@ class SocketServerTest extends JUnitSuite { val requests = sockets.map{socket => sendRequest(socket, serializedBytes) - server.requestChannel.receiveRequest(2000) + receiveRequest(server.requestChannel) } requests.zipWithIndex.foreach { case (request, i) => - val index = request.connectionId.split("-").last + val index = request.context.connectionId.split("-").last assertEquals(i.toString, index) } @@ -212,9 +217,9 @@ class SocketServerTest extends JUnitSuite { val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) def openChannel(request: RequestChannel.Request): Option[KafkaChannel] = - overrideServer.processor(request.processor).channel(request.connectionId) + overrideServer.processor(request.processor).channel(request.context.connectionId) def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] = - overrideServer.processor(request.processor).openOrClosingChannel(request.connectionId) + overrideServer.processor(request.processor).openOrClosingChannel(request.context.connectionId) try { overrideServer.startup() @@ -223,7 +228,7 @@ class SocketServerTest extends JUnitSuite { // Connection with no staged receives val socket1 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT) sendRequest(socket1, serializedBytes) - val request1 = overrideServer.requestChannel.receiveRequest(2000) + val request1 = receiveRequest(overrideServer.requestChannel) assertTrue("Channel not open", openChannel(request1).nonEmpty) assertEquals(openChannel(request1), openOrClosingChannel(request1)) @@ -317,10 +322,10 @@ class SocketServerTest extends JUnitSuite { def sendTwoRequestsReceiveOne(): RequestChannel.Request = { sendRequest(socket, requestBytes, flush = false) sendRequest(socket, requestBytes, flush = true) - server.requestChannel.receiveRequest(2000) + receiveRequest(server.requestChannel) } val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req => - val connectionId = req.connectionId + val connectionId = req.context.connectionId val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0 if (!hasStagedReceives) { processRequest(server.requestChannel, req) @@ -347,7 +352,7 @@ class SocketServerTest extends JUnitSuite { // the following sleep is necessary to reliably detect the connection close when we send data below Thread.sleep(200L) // make sure the sockets are open - server.acceptors.values.map(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) + server.acceptors.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) // then shutdown the server server.shutdown() @@ -376,7 +381,7 @@ class SocketServerTest extends JUnitSuite { // now try one more (should fail) val conn = connect() conn.setSoTimeout(3000) - assertEquals(-1, conn.getInputStream().read()) + assertEquals(-1, conn.getInputStream.read()) conn.close() // it should succeed after closing one connection @@ -437,14 +442,13 @@ class SocketServerTest extends JUnitSuite { overrideServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))).asInstanceOf[SSLSocket] sslSocket.setNeedClientAuth(false) - val apiKey = ApiKeys.PRODUCE.id val correlationId = -1 val clientId = "" val ackTimeoutMs = 10000 val ack = 0: Short val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build() - val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId) + val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId) val byteBuffer = emptyRequest.serialize(emptyHeader) byteBuffer.rewind() @@ -466,7 +470,7 @@ class SocketServerTest extends JUnitSuite { val socket = connect() val bytes = new Array[Byte](40) sendRequest(socket, bytes, Some(0)) - assertEquals(KafkaPrincipal.ANONYMOUS, server.requestChannel.receiveRequest(2000).session.principal) + assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.requestChannel).session.principal) } /* Test that we update request metrics if the client closes the connection while the broker response is in flight. */ @@ -494,9 +498,9 @@ class SocketServerTest extends JUnitSuite { sendRequest(conn, serializedBytes) val channel = overrideServer.requestChannel - val request = channel.receiveRequest(2000) + val request = receiveRequest(channel) - val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.header.apiKey).name) + val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name) def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count val expectedTotalTimeCount = totalTimeHistCount() + 1 @@ -505,8 +509,8 @@ class SocketServerTest extends JUnitSuite { // write. If the buffer is smaller than this, the write is considered complete and the disconnection is not // detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an // IOException. - val send = new NetworkSend(request.connectionId, ByteBuffer.allocate(550000)) - channel.sendResponse(RequestChannel.Response(request, send)) + val send = new NetworkSend(request.context.connectionId, ByteBuffer.allocate(550000)) + channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction)) TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount, s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}") @@ -533,12 +537,12 @@ class SocketServerTest extends JUnitSuite { val serializedBytes = producerRequestBytes sendRequest(conn, serializedBytes) val channel = overrideServer.requestChannel - val request = channel.receiveRequest(2000) + val request = receiveRequest(channel) - TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.connectionId).isEmpty, - s"Idle connection `${request.connectionId}` was not closed by selector") + TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty, + s"Idle connection `${request.context.connectionId}` was not closed by selector") - val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.header.apiKey).name) + val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name) def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count val expectedTotalTimeCount = totalTimeHistCount() + 1 http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index b212a74..fffe3a8 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -155,7 +155,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { def nextRequestHeader(apiKey: ApiKeys, apiVersion: Short): RequestHeader = { correlationId += 1 - new RequestHeader(apiKey.id, apiVersion, "client-id", correlationId) + new RequestHeader(apiKey, apiVersion, "client-id", correlationId) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index c5d40f6..329772b 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{Record, RecordBatch} -import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel} +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} import org.apache.kafka.common.serialization.StringSerializer import org.junit.Assert._ import org.junit.Test @@ -49,7 +49,7 @@ class FetchRequestTest extends BaseRequestTest { } private def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition], - offsetMap: Map[TopicPartition, Long] = Map.empty): FetchRequest = + offsetMap: Map[TopicPartition, Long]): FetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) .setMaxBytes(maxResponseBytes).build() http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 6cc3ede..deea586 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -28,7 +28,6 @@ import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.{Log, TimestampOffset} import kafka.network.RequestChannel -import kafka.network.RequestChannel.Session import kafka.security.auth.Authorizer import kafka.server.QuotaFactory.QuotaManagers import kafka.server._ @@ -393,10 +392,13 @@ class KafkaApisTest { private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T]): (T, RequestChannel.Request) = { val request = builder.build() - val header = new RequestHeader(builder.apiKey.id, request.version, "", 0) - val buffer = request.serialize(header) - val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost) - (request, new RequestChannel.Request(1, "1", session, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT, + val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0)) + + // read the header from the buffer first so that the body can be read next from the Request constructor + val header = RequestHeader.parse(buffer) + val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, + new ListenerName(""), SecurityProtocol.PLAINTEXT) + (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer)) } @@ -408,16 +410,13 @@ class KafkaApisTest { channel.buffer.getInt() // read the size ResponseHeader.parse(channel.buffer) val struct = api.responseSchema(request.version).read(channel.buffer) - AbstractResponse.getResponse(api, struct) + AbstractResponse.parseResponse(api, struct) } private def expectThrottleCallbackAndInvoke(capturedThrottleCallback: Capture[Int => Unit]): Unit = { EasyMock.expect(clientRequestQuotaManager.maybeRecordAndThrottle( - EasyMock.anyString(), - EasyMock.anyString(), - EasyMock.anyLong(), - EasyMock.capture(capturedThrottleCallback), - EasyMock.anyObject[(Long => Unit) => Unit]())) + EasyMock.anyObject[RequestChannel.Request](), + EasyMock.capture(capturedThrottleCallback))) .andAnswer(new IAnswer[Unit] { override def answer(): Unit = { val callback = capturedThrottleCallback.getValue http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 916129e..8786b19 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -31,7 +31,7 @@ import kafka.utils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.Time import org.easymock.{EasyMock, IAnswer} import org.junit.Assert._ import org.junit.{After, Before, Test} http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index e1d4d75..9c51a10 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -23,7 +23,7 @@ import TestUtils._ import kafka.zk.ZooKeeperTestHarness import java.io.File -import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile} +import kafka.server.checkpoints.OffsetCheckpointFile import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer} @@ -230,7 +230,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L)) } - private def sendMessages(n: Int = 1) { + private def sendMessages(n: Int) { (0 until n).map(_ => producer.send(new ProducerRecord(topic, 0, message))).foreach(_.get) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 244ef78..bb7e9fe 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -24,7 +24,7 @@ import kafka.utils._ import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.Time import org.junit.{After, Before, Test} import org.junit.Assert._ import java.util.Properties http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index dd683e1..b8c45a5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -17,9 +17,6 @@ package kafka.server -import java.io.File - -import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.{After, Before, Test} import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index e7d2a64..f162492 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -30,7 +30,7 @@ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.junit.Assert._ -import org.junit.{After, Before, Test} +import org.junit.{After, Test} import scala.collection.JavaConverters._ /**