[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook

2013-06-04 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13674598#comment-13674598
 ] 

Joel Koshy commented on KAFKA-927:
--

+1 - sorry I got to this late.

Small nit: the scaladoc for shutdown broker needs an edit which we will clean 
up later.
We probably don't need the adminTest's testShutdownBroker given that the 
rolling bounce test exercises the same logic.

Also, I think we can close KAFKA-817 - another approach with similar goals.



 Integrate controlled shutdown into kafka shutdown hook
 --

 Key: KAFKA-927
 URL: https://issues.apache.org/jira/browse/KAFKA-927
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Fix For: 0.8

 Attachments: KAFKA-927.patch, KAFKA-927-v2.patch, 
 KAFKA-927-v2-revised.patch, KAFKA-927-v3.patch, 
 KAFKA-927-v3-removeimports.patch, KAFKA-927-v4.patch


 The controlled shutdown mechanism should be integrated into the software for 
 better operational benefits. Also, a few optimizations can be made to reduce 
 unnecessary RPC and ZK calls. This patch has been tested in a production-like 
 environment by running rolling bounces continuously for a day. The average 
 time for a rolling bounce with controlled shutdown on a 7-node cluster is 340 
 seconds without this patch; with this patch it drops to 220 seconds. 
 It also ensures correctness in scenarios where the controller shrinks the ISR 
 and the new leader could place the broker being shut down back into the ISR.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook

2013-06-03 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13673182#comment-13673182
 ] 

Jun Rao commented on KAFKA-927:
---

Thanks for patch v2. A few more comments:

20. KafkaController: If the controller is no longer active when shutdownBroker 
is called, both state machines will throw an exception on state change calls. 
However, the issue is that we add the shutdown broker to 
controllerContext.shuttingDownBrokerIds and it's never reset. This may become a 
problem if this broker becomes the controller again. At a minimum, we need to 
reset controllerContext.shuttingDownBrokerIds in onControllerFailover(). 
However, I am a bit confused about why the shutdown logic still works when we 
never reset controllerContext.shuttingDownBrokerIds.

21. ControlledShutdownRequest.handleError(): We should probably set 
partitionsRemaining in ControlledShutdownResponse to empty instead of null, 
since the serialization of ControlledShutdownResponse doesn't handle 
partitionsRemaining being null.

22. testRollingBounce:
22.1 The test makes sure that the leader for topic1 is changed after broker 0 
is shut down. However, the leader for topic1 could be on broker 1 initially. In 
this case, the leader won't be changed after broker 0 is shut down.
22.2 The default controlledShutdownRetryBackoffMs is 5 seconds, which is 
probably too long for the unit test. 

23. KafkaServer: We need to handle the errorCode in ControlledShutdownResponse 
since the controller may have moved after we send the ControlledShutdown 
request.
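The concern in item 20 can be illustrated with a small model. This is only a sketch under stated assumptions: `ControllerContext` and `Controller` below are simplified, hypothetical stand-ins rather than the classes in the patch, and `onControllerResignation` is an assumed hook name. It shows an id being recorded while the controller is inactive and then cleared when the broker becomes controller again.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the controller's shared state.
final class ControllerContext {
  val shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty[Int]
}

// Hypothetical, simplified controller; only the parts relevant to item 20.
final class Controller(val context: ControllerContext) {
  private var active = false

  def isActive: Boolean = active

  // Called when this broker wins the controller election.
  def onControllerFailover(): Unit = {
    // Drop any ids left over from an earlier term as controller.
    context.shuttingDownBrokerIds.clear()
    active = true
  }

  // Called when this broker loses the controller role (assumed hook name).
  def onControllerResignation(): Unit = {
    active = false
  }

  // Mirrors the behaviour described in item 20: the id is recorded first,
  // and the call then fails if the controller is not active.
  def shutdownBroker(id: Int): Unit = {
    context.shuttingDownBrokerIds += id
    if (!active)
      throw new IllegalStateException(
        s"Controller is not active; cannot shut down broker $id")
  }
}
```

Without the `clear()` in `onControllerFailover()`, the id recorded by the failed call would still be present when this broker is next elected.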

From the previous review:
3. I think a simple solution is to (1) not call 
replicaManager.replicaFetcherManager.closeAllFetchers() in KafkaServer during 
shutdown; (2) in KafkaController.shutdownBroker(), for each partition on the 
shutdown broker, we first send a stopReplicaRequest to it for that partition 
before going through the state machine logic. Since the state machine logic 
involves ZK reads/writes, it's very likely that the stopReplicaRequest will 
reach the broker before the subsequent LeaderAndIsr requests. So, in most 
cases, the leader should be able to shrink ISR quicker than the timeout, 
without churns in ISR.
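
To make item 21 concrete, here is a minimal sketch of why an empty set is safer than null on the error path. The `ShutdownResponse` type and its byte layout are hypothetical and are not the real ControlledShutdownResponse wire format; the only point is that a serializer which calls `size` on the collection fails on null but handles an empty set.

```scala
import java.nio.ByteBuffer

// Hypothetical, simplified types; not the real wire format.
final case class TopicAndPartition(topic: String, partition: Int)

final case class ShutdownResponse(correlationId: Int,
                                  errorCode: Short,
                                  partitionsRemaining: Set[TopicAndPartition]) {
  def writeTo(buffer: ByteBuffer): Unit = {
    buffer.putInt(correlationId)
    buffer.putShort(errorCode)
    // Dereferences partitionsRemaining: a null here throws NullPointerException.
    buffer.putInt(partitionsRemaining.size)
    for (tp <- partitionsRemaining) {
      val bytes = tp.topic.getBytes("UTF-8")
      buffer.putShort(bytes.length.toShort)
      buffer.put(bytes)
      buffer.putInt(tp.partition)
    }
  }
}

object ShutdownResponse {
  // Error path: carry the error code with an empty set rather than null.
  def forError(correlationId: Int, errorCode: Short): ShutdownResponse =
    ShutdownResponse(correlationId, errorCode, Set.empty[TopicAndPartition])
}
```

With `forError`, the serializer writes a count of zero and no entries, so the receiver can still read the error code.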

 Integrate controlled shutdown into kafka shutdown hook
 --

 Key: KAFKA-927
 URL: https://issues.apache.org/jira/browse/KAFKA-927
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: KAFKA-927.patch, KAFKA-927-v2.patch




[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook

2013-05-31 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13671595#comment-13671595
 ] 

Neha Narkhede commented on KAFKA-927:
-

Sriram,

This patch doesn't compile with the following errors - 

[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaApis.scala:133: not found: type ControlledShutdownRequest
[error]     val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
[error]     ^
[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaApis.scala:135: not found: type ControlledShutdownResponse
[error]     val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
[error]     ^
[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:28: ControlledShutdownResponse is not a member of kafka.api
[error] import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
[error]     ^
[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:157: not found: type ControlledShutdownRequest
[error]     val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
[error]     ^
[error] /home/nnarkhed/Projects/apache-kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:160: not found: value ControlledShutdownResponse
[error]     val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
[error]     ^


 Integrate controlled shutdown into kafka shutdown hook
 --

 Key: KAFKA-927
 URL: https://issues.apache.org/jira/browse/KAFKA-927
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: KAFKA-927.patch




[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook

2013-05-31 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13671608#comment-13671608
 ] 

Jun Rao commented on KAFKA-927:
---

Thanks for the patch. Overall, a well thought-out patch. Some comments.

1. KafkaController.shutdownBroker: We should probably only do controlled 
shutdown if the controller is active.

2. KafkaApis: If the controller is not active, we should send an errorcode back 
to the ControlledShutdownRequest.

3. KafkaServer: I am not sure that we should call 
replicaManager.replicaFetcherManager.closeAllFetchers() at the beginning of the 
controlled shutdown. Once the fetchers are closed, the affected leaders have to 
wait for the timeout before committing new messages since they have to shrink 
the ISR. Instead, it's better to let the fetchers be closed through 
leaderAndIsr requests from the controller.

4. KafkaConfig: Could we consolidate controlledShutdownMaxRetries and 
controlledShutdownEnable to one config controlledShutDownWaitTime? If that 
value is =0, no controlled shutdown is done. Otherwise, we will try controlled 
shutdown until that time has passed.

5. With the new logic added in ReplicaManager/Partition, I am not sure if the 
old controlled shutdown tool still works properly. Should we just remove the 
tool and the jmx hook?

6. There are new files not included in the patch.
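
The single-config idea in item 4 could look roughly like the sketch below. It is not the patch's code: `waitTimeMs` stands in for the proposed controlledShutDownWaitTime, `attempt` is a hypothetical callback that returns true once the controller reports the shutdown as complete, and treating any non-positive value as "disabled" is an assumption. The clock and sleep functions are parameters only so the behaviour can be checked without real delays.

```scala
object ControlledShutdownRetry {
  // Returns true if controlled shutdown succeeded within the time budget.
  // waitTimeMs: hypothetical stand-in for the proposed single config.
  // attempt:    hypothetical callback, true once shutdown is complete.
  def tryControlledShutdown(waitTimeMs: Long,
                            backoffMs: Long,
                            attempt: () => Boolean,
                            now: () => Long = () => System.currentTimeMillis(),
                            sleep: Long => Unit = ms => Thread.sleep(ms)): Boolean = {
    if (waitTimeMs <= 0) {
      // Assumption: a non-positive wait time disables controlled shutdown.
      false
    } else {
      val deadline = now() + waitTimeMs
      var succeeded = false
      // Keep trying until an attempt succeeds or the time budget is used up.
      while (!succeeded && now() < deadline) {
        succeeded = attempt()
        if (!succeeded) sleep(backoffMs)
      }
      succeeded
    }
  }
}
```

One consequence of this shape is that the number of attempts is no longer configured directly; it follows from the wait time and the backoff.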

 Integrate controlled shutdown into kafka shutdown hook
 --

 Key: KAFKA-927
 URL: https://issues.apache.org/jira/browse/KAFKA-927
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: KAFKA-927.patch




[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook

2013-05-31 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13671633#comment-13671633
 ] 

Neha Narkhede commented on KAFKA-927:
-

Thanks for the patch, very well thought out! Few comments -
1. KafkaServer
1.1 doControlledShutdown()
- Is there a reason why we cannot just invoke shutdown() on the ReplicaManager 
instead of hacking into the replica fetcher manager and shutting down the 
fetchers ?
- "starting controlled shutdown" -> "Starting controlled shutdown". Though it 
is not introduced in this patch, can we please change the same in the 
shutdown() API as well?
- Typo - shutdownSuceeded
- This method is pretty big and slightly hard to read, for someone who is new 
to controlled shutdown. Can we move controller discovery/connection logic to a 
separate API named connectToController() ? -
    val controllerId = ZkUtils.getController(kafkaZookeeper.getZookeeperClient)
    ZkUtils.getBrokerInfo(kafkaZookeeper.getZookeeperClient, controllerId) match {
      case Some(broker) =>
        if (channel == null || prevController == null || !prevController.equals(broker)) {
          // if this is the first attempt or if the controller has changed, create a channel to the most recent
          // controller
          if (channel != null) {
            channel.disconnect()
          }
          channel = new BlockingChannel(broker.host, broker.port,
            BlockingChannel.UseDefaultBufferSize,
            BlockingChannel.UseDefaultBufferSize,
            config.controllerSocketTimeoutMs)
          channel.connect()
          prevController = broker
        }
      case None =>
        // ignore and try again
    }
- I also think it will be cleaner for the loop to look like the following, but 
it's up to you :) 
  while (!shutdownSucceeded && remainingRetries > 0) {
    val controller = connectToController(zkClient)
    shutdownSucceeded = sendControllerShutdownRequest(controller)
    if (!shutdownSucceeded)
      Thread.sleep(...)
    remainingRetries -= 1
  }
- Can we add either a warn or an info message that the broker will retry 
controlled shutdown after n ms ?
if (!shutdownSuceeded) {
  Thread.sleep(config.controlledShutdownRetryBackoffMs)
}
- Can we rename doControlledShutdown() to just controlledShutdown(). This will 
follow the naming conventions in the rest of the code, since we don't name 
methods doSomething.
- Let's remove the zkClient unused variable

2. KafkaApis
- If the controller is not active, we should send the appropriate error code
  
3. KafkaController
- getPartitionsAssignedToBroker() does not need to read from zookeeper. The 
controller should already have the latest data available as the controllerLock 
is acquired at this point. 
- The following updates zookeeper which is not required since the leader 
would've done that long before the controller does it. This is because you 
shutdown the replica fetchers at the beginning of controlled shutdown. It will 
be much faster to just send a leader and isr request with the shrunk ISR to the 
existing leader, though I doubt that is required as well.
    else {
      // if the broker is a follower, updates the isr in ZK and notifies the current leader
      replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
        topicAndPartition.partition, id)), OfflineReplica)
    }

4. We want to know that the broker is rejecting the become-follower request in 
the state change log when the following happens. So it is not enough to just 
surround the addFetcher call with this condition:
  if (!replicaManager.isShuttingDown.get()) {
    // start fetcher thread to current leader if we are not shutting down
    replicaFetcherManager.addFetcher(topic, partitionId, 
      localReplica.logEndOffset, leaderBroker)
  }

5. New files are not included in the patch
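
The loop shape suggested under 1.1, together with the requested retry log message, might be sketched as follows. Everything here is illustrative: `connectToController` and `sendShutdownRequest` are hypothetical function parameters standing in for the ZooKeeper lookup and the request/response exchange, and the log and sleep hooks exist only so the sketch runs on its own.

```scala
object ControlledShutdownLoop {
  // connectToController: hypothetical; returns a handle to the current
  //                      controller, or None if none could be found.
  // sendShutdownRequest: hypothetical; true when the controller reports
  //                      that controlled shutdown has completed.
  def controlledShutdown[C](maxRetries: Int,
                            retryBackoffMs: Long,
                            connectToController: () => Option[C],
                            sendShutdownRequest: C => Boolean,
                            log: String => Unit = msg => println(msg),
                            sleep: Long => Unit = ms => Thread.sleep(ms)): Boolean = {
    var shutdownSucceeded = false
    var remainingRetries = maxRetries
    while (!shutdownSucceeded && remainingRetries > 0) {
      remainingRetries -= 1
      shutdownSucceeded = connectToController() match {
        case Some(controller) => sendShutdownRequest(controller)
        case None             => false // no controller found; try again
      }
      // Only log and back off when another attempt will actually follow.
      if (!shutdownSucceeded && remainingRetries > 0) {
        log(s"Retrying controlled shutdown after $retryBackoffMs ms")
        sleep(retryBackoffMs)
      }
    }
    shutdownSucceeded
  }
}
```

Keeping the connection step and the request step as separate calls makes the loop body short enough to read at a glance, which is the readability point raised in the review.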


 Integrate controlled shutdown into kafka shutdown hook
 --

 Key: KAFKA-927
 URL: https://issues.apache.org/jira/browse/KAFKA-927
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: KAFKA-927.patch

