Repository: kafka Updated Branches: refs/heads/trunk d0adf6abe -> d02ca36ca
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 039c7eb..756cf77 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -17,33 +17,39 @@
 package kafka.server

+import java.net.{SocketTimeoutException}
 import java.util

 import kafka.admin._
+import kafka.api.{KAFKA_083, ApiVersion}
 import kafka.log.LogConfig
 import kafka.log.CleanerConfig
 import kafka.log.LogManager
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
-import java.io.File
+import java.io.{IOException, File}
 import kafka.utils._
+import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest, NetworkClient}
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.NetworkReceive
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkReceive, Selector}
+import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
+import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledShutdownRequest, RequestSend}
+import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.utils.AppInfoParser
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.{EndPoint, Broker}
-import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
 import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
 import kafka.network.{BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator}
+import kafka.coordinator.{ConsumerCoordinator}

 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -92,7 +98,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   // This exists because the Metrics package from clients has its own Time implementation.
   // SocketServer/Quotas (which uses client libraries) have to use the client Time objects without having to convert all of Kafka to use them
   // Eventually, we want to merge the Time objects in core and clients
-  private val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
+  private implicit val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
   var metrics: Metrics = null

   private val metricConfig: MetricConfig = new MetricConfig()
@@ -175,7 +181,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         replicaManager.startup()

         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkClient, brokerState)
+        kafkaController = new KafkaController(config, zkClient, brokerState, kafkaMetricsTime, metrics)
         kafkaController.startup()

         /* start kafka coordinator */
@@ -262,17 +268,126 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    * Performs controlled shutdown
    */
   private def controlledShutdown() {
-    if (startupComplete.get() && config.controlledShutdownEnable) {
-      // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
-      // of time and try again for a configured number of retries. If all the attempt fails, we simply force
-      // the shutdown.
-      var remainingRetries = config.controlledShutdownMaxRetries
-      info("Starting controlled shutdown")
-      var channel : BlockingChannel = null
-      var prevController : Broker = null
-      var shutdownSucceeded : Boolean = false
+
+    def node(broker: Broker): Node = {
+      val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
+      new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+    }
+
+    val socketTimeoutMs = config.controllerSocketTimeoutMs
+
+    def socketTimeoutException: Throwable =
+      new SocketTimeoutException(s"Did not receive response within $socketTimeoutMs")
+
+    def networkClientControlledShutdown(retries: Int): Boolean = {
+      val metadataUpdater = new ManualMetadataUpdater()
+      val networkClient = {
+        val selector = new Selector(
+          NetworkReceive.UNLIMITED,
+          config.connectionsMaxIdleMs,
+          metrics,
+          kafkaMetricsTime,
+          "kafka-server-controlled-shutdown",
+          Map.empty.asJava,
+          false,
+          ChannelBuilders.create(config.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, config.channelConfigs)
+        )
+        new NetworkClient(
+          selector,
+          metadataUpdater,
+          config.brokerId.toString,
+          1,
+          0,
+          Selectable.USE_DEFAULT_BUFFER_SIZE,
+          Selectable.USE_DEFAULT_BUFFER_SIZE)
+      }
+
+      var shutdownSucceeded: Boolean = false
+
+      try {
+
+        var remainingRetries = retries
+        var prevController: Broker = null
+        var ioException = false
+
+        while (!shutdownSucceeded && remainingRetries > 0) {
+          remainingRetries = remainingRetries - 1
+
+          import NetworkClientBlockingOps._
+
+          // 1. Find the controller and establish a connection to it.
+
+          // Get the current controller info. This is to ensure we use the most recent info to issue the
+          // controlled shutdown request
+          val controllerId = ZkUtils.getController(zkClient)
+          ZkUtils.getBrokerInfo(zkClient, controllerId) match {
+            case Some(broker) =>
+              // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
+              // attempt, connect to the most recent controller
+              if (ioException || broker != prevController) {
+
+                ioException = false
+
+                if (prevController != null)
+                  networkClient.close(node(prevController).idString)
+
+                prevController = broker
+                metadataUpdater.setNodes(Seq(node(prevController)).asJava)
+              }
+            case None => //ignore and try again
+          }
+
+          // 2. issue a controlled shutdown to the controller
+          if (prevController != null) {
+            try {
+
+              if (!networkClient.blockingReady(node(prevController), socketTimeoutMs))
+                throw socketTimeoutException
+
+              // send the controlled shutdown request
+              val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)
+              val send = new RequestSend(node(prevController).idString, requestHeader,
+                new ControlledShutdownRequest(config.brokerId).toStruct)
+              val request = new ClientRequest(kafkaMetricsTime.milliseconds(), true, send, null)
+              val clientResponse = networkClient.blockingSendAndReceive(request, socketTimeoutMs).getOrElse {
+                throw socketTimeoutException
+              }
+
+              val shutdownResponse = new ControlledShutdownResponse(clientResponse.responseBody)
+              if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {
+                shutdownSucceeded = true
+                info("Controlled shutdown succeeded")
+              }
+              else {
+                info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.asScala.mkString(",")))
+                info("Error code from controller: %d".format(shutdownResponse.errorCode))
+              }
+            }
+            catch {
+              case ioe: IOException =>
+                ioException = true
+                warn("Error during controlled shutdown, possibly because leader movement took longer than the configured socket.timeout.ms: %s".format(ioe.getMessage))
+                // ignore and try again
+            }
+          }
+          if (!shutdownSucceeded) {
+            Thread.sleep(config.controlledShutdownRetryBackoffMs)
+            warn("Retrying controlled shutdown after the previous attempt failed...")
+          }
+        }
+      }
+      finally
+        networkClient.close()
+
+      shutdownSucceeded
+    }
+
+    def blockingChannelControlledShutdown(retries: Int): Boolean = {
+      var remainingRetries = retries
+      var channel: BlockingChannel = null
+      var prevController: Broker = null
+      var shutdownSucceeded: Boolean = false
       try {
-        brokerState.newState(PendingControlledShutdown)
         while (!shutdownSucceeded && remainingRetries > 0) {
           remainingRetries = remainingRetries - 1
@@ -286,9 +401,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
             if (channel == null || prevController == null || !prevController.equals(broker)) {
               // if this is the first attempt or if the controller has changed, create a channel to the most recent
               // controller
-              if (channel != null) {
+              if (channel != null)
                 channel.disconnect()
-              }
+
               channel = new BlockingChannel(broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).host,
                 broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).port,
                 BlockingChannel.UseDefaultBufferSize,
@@ -297,8 +412,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
               channel.connect()
               prevController = broker
             }
-          case None=>
-            //ignore and try again
+          case None => //ignore and try again
         }
         // 2. issue a controlled shutdown to the controller
@@ -306,13 +420,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
           var response: NetworkReceive = null
           try {
             // send the controlled shutdown request
-            val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
+            val request = new kafka.api.ControlledShutdownRequest(0, correlationId.getAndIncrement, None, config.brokerId)
             channel.send(request)
             response = channel.receive()
-            val shutdownResponse = ControlledShutdownResponse.readFrom(response.payload())
+            val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
             if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
-              shutdownResponse.partitionsRemaining.size == 0) {
+                shutdownResponse.partitionsRemaining.size == 0) {
               shutdownSucceeded = true
               info ("Controlled shutdown succeeded")
             }
@@ -341,9 +455,27 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
           channel = null
         }
       }
-      if (!shutdownSucceeded) {
+      shutdownSucceeded
+    }
+
+    if (startupComplete.get() && config.controlledShutdownEnable) {
+      // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
+      // of time and try again for a configured number of retries. If all the attempt fails, we simply force
+      // the shutdown.
+      info("Starting controlled shutdown")
+
+      brokerState.newState(PendingControlledShutdown)
+
+      val shutdownSucceeded =
+        // Before 0.8.3, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in
+        // `RequestHeader`, which is used by `NetworkClient`
+        if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_083))
+          networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue)
+        else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue)
+
+      if (!shutdownSucceeded)
         warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
-      }
+
     }
   }
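Stripped of the Kafka-specific types, the new `networkClientControlledShutdown` and the legacy `blockingChannelControlledShutdown` paths above share the same shape: perform one shutdown attempt, treat an IOException as a failed attempt, back off, and repeat until the controller reports no remaining partitions or the retries run out. Below is a minimal, self-contained sketch of that loop; the `attempt` function is a hypothetical stand-in for the controller lookup and request/response exchange in the patch, not the actual broker code.

    object ControlledShutdownRetrySketch {

      // Hypothetical stand-in for one shutdown attempt: in the broker this would locate the
      // controller, ensure the connection is ready and exchange a ControlledShutdownRequest,
      // returning the number of partitions the controller still wants moved.
      type ShutdownAttempt = () => Int

      // Retry `attempt` up to `maxRetries` times, treating IOException as a failed attempt
      // and sleeping between attempts, mirroring the loop structure in controlledShutdown().
      def retryControlledShutdown(attempt: ShutdownAttempt, maxRetries: Int, backoffMs: Long): Boolean = {
        var remainingRetries = maxRetries
        var shutdownSucceeded = false
        while (!shutdownSucceeded && remainingRetries > 0) {
          remainingRetries -= 1
          try {
            if (attempt() == 0)
              shutdownSucceeded = true
          } catch {
            case _: java.io.IOException => // e.g. leader movement took longer than the socket timeout
          }
          if (!shutdownSucceeded)
            Thread.sleep(backoffMs)
        }
        shutdownSucceeded
      }

      def main(args: Array[String]): Unit = {
        // Toy attempt that "moves" one remaining partition per call.
        var partitionsRemaining = 3
        val attempt: ShutdownAttempt = () => { partitionsRemaining -= 1; partitionsRemaining }
        println(retryControlledShutdown(attempt, maxRetries = 5, backoffMs = 10)) // true
      }
    }
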
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
new file mode 100644
index 0000000..ad10721
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.io.IOException
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient}
+import org.apache.kafka.common.Node
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.utils.{Time => JTime}
+
+object NetworkClientBlockingOps {
+  implicit def networkClientBlockingOps(client: NetworkClient): NetworkClientBlockingOps =
+    new NetworkClientBlockingOps(client)
+}
+
+/**
+ * Provides extension methods for `NetworkClient` that are useful for implementing blocking behaviour. Use with care.
+ *
+ * Example usage:
+ *
+ * {{{
+ * val networkClient: NetworkClient = ...
+ * import NetworkClientBlockingOps._
+ * networkClient.blockingReady(...)
+ * }}}
+ */
+class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
+
+  /**
+   * Invokes `client.ready` followed by 0 or more `client.poll` invocations until the connection to `node` is ready,
+   * the timeout expires or the connection fails.
+   *
+   * It returns `true` if the call completes normally or `false` if the timeout expires. If the connection fails,
+   * an `IOException` is thrown instead.
+   *
+   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+   * care.
+   */
+  def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
+    client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
+      if (client.isReady(node, now))
+        true
+      else if (client.connectionFailed(node))
+        throw new IOException(s"Connection to $node failed")
+      else false
+    }
+  }
+
+  /**
+   * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received,
+   * the timeout expires or a disconnection happens.
+   *
+   * It returns `true` if the call completes normally or `false` if the timeout expires. In the case of a disconnection,
+   * an `IOException` is thrown instead.
+   *
+   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+   * care.
+   */
+  def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
+    client.send(request)
+
+    pollUntilFound(timeout) { case (responses, _) =>
+      val response = responses.find { response =>
+        response.request.request.header.correlationId == request.request.header.correlationId
+      }
+      response.foreach { r =>
+        if (r.wasDisconnected) {
+          val destination = request.request.destination
+          throw new IOException(s"Connection to $destination was disconnected before the response was read")
+        }
+      }
+      response
+    }
+
+  }
+
+  /**
+   * Invokes `client.poll` until `predicate` returns `true` or the timeout expires.
+   *
+   * It returns `true` if the call completes normally or `false` if the timeout expires. Exceptions thrown via
+   * `predicate` are not handled and will bubble up.
+   *
+   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+   * care.
+   */
+  private def pollUntil(timeout: Long)(predicate: (Seq[ClientResponse], Long) => Boolean)(implicit time: JTime): Boolean = {
+    pollUntilFound(timeout) { (responses, now) =>
+      if (predicate(responses, now)) Some(true)
+      else None
+    }.fold(false)(_ => true)
+  }
+
+  /**
+   * Invokes `client.poll` until `collect` returns `Some` or the timeout expires.
+   *
+   * It returns the result of `collect` if the call completes normally or `None` if the timeout expires. Exceptions
+   * thrown via `collect` are not handled and will bubble up.
+   *
+   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+   * care.
+   */
+  private def pollUntilFound[T](timeout: Long)(collect: (Seq[ClientResponse], Long) => Option[T])(implicit time: JTime): Option[T] = {
+
+    val methodStartTime = time.milliseconds()
+    val timeoutExpiryTime = methodStartTime + timeout
+
+    @tailrec
+    def recurse(iterationStartTime: Long): Option[T] = {
+      val pollTimeout = if (timeout < 0) timeout else timeoutExpiryTime - iterationStartTime
+      val responses = client.poll(pollTimeout, iterationStartTime).asScala
+      val result = collect(responses, iterationStartTime)
+      if (result.isDefined) result
+      else {
+        val afterPollTime = time.milliseconds()
+        if (timeout < 0 || afterPollTime < timeoutExpiryTime)
+          recurse(afterPollTime)
+        else None
+      }
+    }
+
+    recurse(methodStartTime)
+  }
+
+}
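The new file exposes its blocking helpers through the usual Scala trick of an implicit conversion into an AnyVal wrapper, so the extra methods only become visible after `import NetworkClientBlockingOps._` and the wrapper itself is erased at compile time. Below is a toy version of the same pattern, written against `String` instead of `NetworkClient` purely for illustration; the names are hypothetical and not part of the patch.

    object StringRetryOps {
      // Bringing this implicit conversion into scope is what `import NetworkClientBlockingOps._`
      // does for NetworkClient in KafkaServer.controlledShutdown.
      implicit def stringRetryOps(s: String): StringRetryOps = new StringRetryOps(s)
    }

    // A value class (extends AnyVal): the wrapper is elided at compile time, so the
    // extension method adds no allocation at the call site.
    class StringRetryOps(val underlying: String) extends AnyVal {
      // Repeat the (non-empty) string until it is longer than n characters.
      def repeatUntilLongerThan(n: Int): String = {
        var acc = underlying
        while (acc.length <= n) acc += underlying
        acc
      }
    }

    object ExtensionMethodDemo {
      def main(args: Array[String]): Unit = {
        import StringRetryOps._
        println("ab".repeatUntilLongerThan(5)) // ababab
      }
    }
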
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index d7112d4..985c64f 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -25,6 +25,9 @@ import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils._
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.{AbstractRequestResponse, AbstractRequest}
+import org.apache.kafka.common.utils.SystemTime
 import org.apache.log4j.{Level, Logger}
 import org.junit.{After, Before, Test}
@@ -146,9 +149,9 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
   }
 }

-class MockChannelManager(private val controllerContext: ControllerContext,
-                         config: KafkaConfig)
-  extends ControllerChannelManager(controllerContext, config) {
+class MockChannelManager(private val controllerContext: ControllerContext, config: KafkaConfig)
+  extends ControllerChannelManager(controllerContext, config, new SystemTime, new Metrics) {
+
   def stopSendThread(brokerId: Int) {
     val requestThread = brokerStateInfo(brokerId).requestSendThread
     requestThread.isRunning.set(false)
@@ -157,12 +160,9 @@ class MockChannelManager(private val controllerContext: ControllerContext,
   }

   def shrinkBlockingQueue(brokerId: Int) {
-    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, RequestOrResponse => Unit)](1)
+    val messageQueue = new LinkedBlockingQueue[QueueItem](1)
     val brokerInfo = this.brokerStateInfo(brokerId)
-    this.brokerStateInfo.put(brokerId, new ControllerBrokerStateInfo(brokerInfo.channel,
-                                                                     brokerInfo.broker,
-                                                                     messageQueue,
-                                                                     brokerInfo.requestSendThread))
+    this.brokerStateInfo.put(brokerId, brokerInfo.copy(messageQueue = messageQueue))
   }

   def resumeSendThread (brokerId: Int) {
@@ -176,4 +176,4 @@ class MockChannelManager(private val controllerContext: ControllerContext,
   def queueSize(brokerId: Int): Int = {
     this.brokerStateInfo(brokerId).messageQueue.size
   }
-}
\ No newline at end of file
+}
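The MockChannelManager change above swaps only the message queue by calling `copy` on the broker state (which the new code evidently treats as a case class) instead of re-listing every constructor argument of ControllerBrokerStateInfo. A small self-contained example of that idiom, using a hypothetical state class in place of the real one:

    import java.util.concurrent.LinkedBlockingQueue

    // Hypothetical stand-in for ControllerBrokerStateInfo: a case class bundling per-broker
    // state in which only the message queue needs to be swapped.
    case class BrokerState(brokerId: Int, host: String, messageQueue: LinkedBlockingQueue[String])

    object CopyIdiomDemo {
      def main(args: Array[String]): Unit = {
        val original = BrokerState(1, "localhost", new LinkedBlockingQueue[String]())
        // copy() keeps every other field and replaces only the one named, so the call site
        // does not have to re-list constructor arguments and does not break when fields are added.
        val shrunk = original.copy(messageQueue = new LinkedBlockingQueue[String](1))
        println(shrunk.messageQueue.remainingCapacity()) // 1
        println(shrunk.host)                             // localhost
      }
    }
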
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index bb12a50..ff17830 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -17,15 +17,22 @@
 package kafka.server

+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
+
+import scala.collection.JavaConverters._
+import kafka.api.{PartitionStateInfo, LeaderAndIsr}
+import org.apache.kafka.common.requests.{LeaderAndIsrResponse, LeaderAndIsrRequest, AbstractRequestResponse}
 import org.junit.Assert._
-import kafka.api._
 import kafka.utils.{TestUtils, ZkUtils, CoreUtils}
 import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrAndControllerEpoch}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.utils.SystemTime
 import org.junit.{Test, After, Before}

 class LeaderElectionTest extends ZooKeeperTestHarness {
@@ -124,21 +131,26 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
-    val brokerEndPoints = brokers.map(b => b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+    val brokerEndPoints = brokers.map { b =>
+      val brokerEndPoint = b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
+      new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+    }
     val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
-    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
+    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, new Metrics)
     controllerChannelManager.startup()
     val staleControllerEpoch = 0
-    val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch]
-    leaderAndIsr.put((topic, partitionId),
-      new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
-    val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, Set(0,1))).toMap
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokerEndPoints.toSet, controllerId,
-      staleControllerEpoch, 0, "")
-
-    controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
+    val partitionStates = Map(
+      new TopicPartition(topic, partitionId) -> new PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch,
+        Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion,
+        Set(0, 1).map(Integer.valueOf).asJava)
+    )
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, staleControllerEpoch, partitionStates.asJava,
+      brokerEndPoints.toSet.asJava)
+
+    controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest,
+      staleControllerEpochCallback)

     TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, "Controller epoch should be stale")
     assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
@@ -146,7 +158,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     controllerChannelManager.shutdown()
   }

-  private def staleControllerEpochCallback(response: RequestOrResponse): Unit = {
+  private def staleControllerEpochCallback(response: AbstractRequestResponse): Unit = {
     val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
     staleControllerEpochDetected = leaderAndIsrResponse.errorCode match {
       case ErrorMapping.StaleControllerEpochCode => true
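The rewritten test builds the Java-side LeaderAndIsrRequest directly, which means boxing the Ints and converting Scala collections with JavaConverters before they reach the Java constructors. A minimal sketch of that conversion pattern, independent of the Kafka classes (names here are illustrative only):

    import scala.collection.JavaConverters._

    object JavaConvertersSketch {
      def main(args: Array[String]): Unit = {
        // Scala Ints are boxed to java.lang.Integer before conversion, as the test does
        // for the replica and ISR lists passed to PartitionState.
        val isr: java.util.List[Integer] = Seq(1, 2).map(Integer.valueOf).asJava

        // A Scala Map becomes the java.util.Map expected by Java-side constructors
        // such as the partitionStates argument of LeaderAndIsrRequest.
        val leaders: java.util.Map[String, Integer] = Map("topic-0" -> Integer.valueOf(2)).asJava

        println(isr)     // [1, 2]
        println(leaders) // {topic-0=2}
      }
    }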
