[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook
[ https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13674598#comment-13674598 ]

Joel Koshy commented on KAFKA-927:
----------------------------------

+1 - sorry I got to this late. Small nit: the scaladoc for shutdown broker needs an edit, which we will clean up later. We probably don't need the adminTest's testShutdownBroker, given that the rolling bounce test exercises the same logic. Also, I think we can close KAFKA-817 - another approach with similar goals.

Integrate controlled shutdown into kafka shutdown hook
------------------------------------------------------

Key: KAFKA-927
URL: https://issues.apache.org/jira/browse/KAFKA-927
Project: Kafka
Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
Fix For: 0.8
Attachments: KAFKA-927.patch, KAFKA-927-v2.patch, KAFKA-927-v2-revised.patch, KAFKA-927-v3.patch, KAFKA-927-v3-removeimports.patch, KAFKA-927-v4.patch

The controlled shutdown mechanism should be integrated into the software for better operational benefits. Also, a few optimizations can be done to reduce unnecessary RPC and ZK calls. This patch has been tested on a prod-like environment by doing rolling bounces continuously for a day. The average time of a rolling bounce with controlled shutdown for a cluster with 7 nodes is 340 seconds without this patch. With this patch it drops to 220 seconds. The patch also ensures correctness in scenarios where the controller shrinks the ISR and the new leader could place the broker being shut down back into the ISR.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook
[ https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13673182#comment-13673182 ]

Jun Rao commented on KAFKA-927:
-------------------------------

Thanks for patch v2. A few more comments:

20. KafkaController: If the controller is no longer active when shutdownBroker is called, both state machines will throw an exception on state change calls. However, the issue is that we add the shutdown broker to controllerContext.shuttingDownBrokerIds and it's never reset. This may become a problem if this broker becomes a controller again. At a minimum, we need to reset controllerContext.shuttingDownBrokerIds in onControllerFailover(). However, I am a bit confused about why the shutdown logic still works even though we never reset controllerContext.shuttingDownBrokerIds.

21. ControlledShutdownRequest.handleError(): We should probably set partitionsRemaining in ControlledShutdownResponse to empty instead of null, since the serialization of ControlledShutdownResponse doesn't handle partitionsRemaining being null.

22. testRollingBounce:
22.1 The test makes sure that the leader for topic1 is changed after broker 0 is shut down. However, the leader for topic1 could be on broker 1 initially. In this case, the leader won't be changed after broker 0 is shut down.
22.2 The default controlledShutdownRetryBackoffMs is 5 secs, which is probably too long for the unit test.

23. KafkaServer: We need to handle the errorCode in ControlledShutdownResponse, since the controller may have moved after we send the ControlledShutdown request.

From the previous review:

3. I think a simple solution is to (1) not call replicaManager.replicaFetcherManager.closeAllFetchers() in KafkaServer during shutdown; (2) in KafkaController.shutdownBroker(), for each partition on the shutdown broker, first send a stopReplicaRequest to it for that partition before going through the state machine logic. Since the state machine logic involves ZK reads/writes, it's very likely that the stopReplicaRequest will reach the broker before the subsequent LeaderAndIsr requests. So, in most cases, the leader should be able to shrink the ISR quicker than the timeout, without churn in the ISR.
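Comment 20 can be illustrated with a minimal sketch. It assumes a heavily simplified controller: `ControllerContext` and `SketchController` here are hypothetical stand-ins written for this example, and only the names `shuttingDownBrokerIds`, `shutdownBroker`, and `onControllerFailover` come from the review. The point is only that clearing the set on failover keeps entries from an earlier controller term from leaking into a later one.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the controller's shared state.
// Only the field name shuttingDownBrokerIds is taken from the review comment.
class ControllerContext {
  val shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty[Int]
}

// Hypothetical, stripped-down controller; the real KafkaController does far more.
class SketchController(val context: ControllerContext) {
  // Records that a broker has asked for a controlled shutdown.
  def shutdownBroker(brokerId: Int): Unit = {
    context.shuttingDownBrokerIds += brokerId
  }

  // Invoked when this broker is elected controller. Clearing the set here
  // drops any entries left over from a previous term as controller.
  def onControllerFailover(): Unit = {
    context.shuttingDownBrokerIds.clear()
  }
}

object ShuttingDownResetDemo {
  def main(args: Array[String]): Unit = {
    val controller = new SketchController(new ControllerContext)
    controller.shutdownBroker(0)      // recorded during the first term
    controller.onControllerFailover() // elected again later: stale state is dropped
    println(controller.context.shuttingDownBrokerIds.isEmpty)
  }
}
```

Whether the reset belongs in onControllerFailover() alone or also on resignation is a design question the thread leaves open; the sketch shows only the minimum the comment asks for.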
[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook
[ https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13671595#comment-13671595 ]

Neha Narkhede commented on KAFKA-927:
-------------------------------------

Sriram,

This patch doesn't compile with the following errors -

[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaApis.scala:133: not found: type ControlledShutdownRequest
[error] val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
[error] ^
[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaApis.scala:135: not found: type ControlledShutdownResponse
[error] val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
[error] ^
[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:28: ControlledShutdownResponse is not a member of kafka.api
[error] import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
[error] ^
[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:157: not found: type ControlledShutdownRequest
[error] val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
[error] ^
[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:160: not found: value ControlledShutdownResponse
[error] val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
[error] ^
[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook
[ https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13671608#comment-13671608 ]

Jun Rao commented on KAFKA-927:
-------------------------------

Thanks for the patch. Overall, a well thought-out patch. Some comments.

1. KafkaController.shutdownBroker: We should probably only do controlled shutdown if the controller is active.

2. KafkaApis: If the controller is not active, we should send an error code back in the response to the ControlledShutdownRequest.

3. KafkaServer: I am not sure that we should call replicaManager.replicaFetcherManager.closeAllFetchers() at the beginning of the controlled shutdown. Once the fetchers are closed, the affected leaders have to wait for the timeout before committing new messages, since they have to shrink the ISR. Instead, it's better to let the fetchers be closed through leaderAndIsr requests from the controller.

4. KafkaConfig: Could we consolidate controlledShutdownMaxRetries and controlledShutdownEnable into one config, controlledShutDownWaitTime? If that value is <= 0, no controlled shutdown is done. Otherwise, we will try controlled shutdown until that time has passed.

5. With the new logic added in ReplicaManager/Partition, I am not sure if the old controlled shutdown tool still works properly. Should we just remove the tool and the jmx hook?

6. There are new files not included in the patch.
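The single-config idea in comment 4 can be sketched roughly as follows. This is an illustration under stated assumptions, not code from the patch: `tryControlledShutdown`, `backoffMs`, `attempt`, and `nowMs` are hypothetical names introduced here, and `waitTimeMs` plays the role of the proposed controlledShutDownWaitTime. A value of zero or less disables controlled shutdown; a positive value bounds how long the broker keeps retrying.

```scala
object ControlledShutdownWait {
  // Hypothetical helper. `attempt` stands in for one controlled-shutdown
  // round trip to the controller and returns true on success.
  // `nowMs` is injectable so the clock can be replaced in tests.
  def tryControlledShutdown(waitTimeMs: Long,
                            backoffMs: Long,
                            attempt: () => Boolean,
                            nowMs: () => Long = () => System.currentTimeMillis()): Boolean = {
    if (waitTimeMs <= 0) {
      // A non-positive wait time means controlled shutdown is switched off.
      false
    } else {
      val deadline = nowMs() + waitTimeMs
      var succeeded = false
      // Keep trying until one attempt succeeds or the time budget is used up.
      while (!succeeded && nowMs() < deadline) {
        succeeded = attempt()
        if (!succeeded) {
          // Back off, but never sleep past the deadline.
          val remaining = deadline - nowMs()
          Thread.sleep(math.max(0L, math.min(backoffMs, remaining)))
        }
      }
      succeeded
    }
  }
}
```

Compared with a retry count plus an enable flag, a time budget gives operators one number to reason about, at the cost of a less predictable number of attempts.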
[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook
[ https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13671633#comment-13671633 ]

Neha Narkhede commented on KAFKA-927:
-------------------------------------

Thanks for the patch, very well thought out! A few comments -

1. KafkaServer

1.1 doControlledShutdown()

- Is there a reason why we cannot just invoke shutdown() on the ReplicaManager instead of hacking into the replica fetcher manager and shutting down the fetchers?

- "starting controlled shutdown" -> "Starting controlled shutdown". Though it is not introduced in this patch, can we please change the same in the shutdown() API as well?

- Typo - shutdownSuceeded

- This method is pretty big and slightly hard to read for someone who is new to controlled shutdown. Can we move the controller discovery/connection logic to a separate API named connectToController()?

    val controllerId = ZkUtils.getController(kafkaZookeeper.getZookeeperClient)
    ZkUtils.getBrokerInfo(kafkaZookeeper.getZookeeperClient, controllerId) match {
      case Some(broker) =>
        if (channel == null || prevController == null || !prevController.equals(broker)) {
          // if this is the first attempt or if the controller has changed, create a channel to the most recent
          // controller
          if (channel != null) {
            channel.disconnect()
          }
          channel = new BlockingChannel(broker.host, broker.port,
            BlockingChannel.UseDefaultBufferSize,
            BlockingChannel.UseDefaultBufferSize,
            config.controllerSocketTimeoutMs)
          channel.connect()
          prevController = broker
        }
      case None =>
        // ignore and try again
    }

- I also think it will be cleaner for the loop to look like this, but it's up to you :)

    while (!shutdownSucceeded && remainingRetries > 0) {
      val controller = connectToController(zkClient)
      shutdownSucceeded = sendControllerShutdownRequest(controller)
      if (!shutdownSucceeded)
        Thread.sleep(...)
      remainingRetries -= 1
    }

- Can we add either a warn or an info message that the broker will retry controlled shutdown after n ms?

    if (!shutdownSuceeded) {
      Thread.sleep(config.controlledShutdownRetryBackoffMs)
    }

- Can we rename doControlledShutdown() to just controlledShutdown()? This will follow the naming conventions in the rest of the code, since we don't name methods doSomething.

- Let's remove the unused zkClient variable.

2. KafkaApis

- If the controller is not active, we should send the appropriate error code.

3. KafkaController

- getPartitionsAssignedToBroker() does not need to read from zookeeper. The controller should already have the latest data available, as the controllerLock is acquired at this point.

- The following updates zookeeper, which is not required since the leader would've done that long before the controller does it. This is because you shut down the replica fetchers at the beginning of controlled shutdown. It will be much faster to just send a leader and isr request with the shrunk ISR to the existing leader, though I doubt that is required as well.

    else {
      // if the broker is a follower, updates the isr in ZK and notifies the current leader
      replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
        topicAndPartition.partition, id)), OfflineReplica)
    }

4. We want to know from the state change log that the broker is rejecting the become-follower request when the following happens. So it is not enough to just surround the addFetcher call with this condition.

    if (!replicaManager.isShuttingDown.get()) {
      // start fetcher thread to current leader if we are not shutting down
      replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
    }

5. New files are not included in the patch.
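The loop shape suggested in 1.1 can be made concrete with a small runnable sketch. It is an illustration only: the `Controller` case class and the two injected functions are assumptions introduced so the example runs without ZooKeeper or sockets, and the log line is printed rather than sent to a logger. In the real code, connectToController() would wrap the ZkUtils lookup and BlockingChannel setup quoted above.

```scala
object ControlledShutdownLoop {
  // Hypothetical stand-in for the broker that currently acts as controller.
  final case class Controller(host: String, port: Int)

  // Retries controlled shutdown up to maxRetries times.
  // connectToController returns None when no controller is known yet;
  // sendShutdownRequest returns true once the controller reports success.
  def controlledShutdown(maxRetries: Int,
                         retryBackoffMs: Long,
                         connectToController: () => Option[Controller],
                         sendShutdownRequest: Controller => Boolean): Boolean = {
    var shutdownSucceeded = false
    var remainingRetries = maxRetries
    while (!shutdownSucceeded && remainingRetries > 0) {
      remainingRetries -= 1
      shutdownSucceeded = connectToController() match {
        case Some(controller) => sendShutdownRequest(controller)
        case None => false // no controller found; treat as a failed attempt
      }
      if (!shutdownSucceeded && remainingRetries > 0) {
        // The message the review asks for: say that a retry is coming and when.
        println(s"Controlled shutdown failed; retrying in $retryBackoffMs ms ($remainingRetries retries left)")
        Thread.sleep(retryBackoffMs)
      }
    }
    shutdownSucceeded
  }
}
```

Passing the connection and request steps in as functions keeps the loop itself short, which is the readability gain the comment is after, and makes the retry behaviour easy to exercise in a unit test.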