[ 
https://issues.apache.org/jira/browse/STORM-588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Kellogg updated STORM-588:
-------------------------------
    Component/s: storm-core

> Executor-Level Rebalance Mechanism
> ----------------------------------
>
>                 Key: STORM-588
>                 URL: https://issues.apache.org/jira/browse/STORM-588
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>    Affects Versions: 0.10.0, 0.9.3-rc2
>            Reporter: troy ding
>            Assignee: troy ding
>
> I. The motivation
> The current rebalance mechanism is implemented at the worker level. When a 
> rebalance operation is triggered (e.g. by adding/removing a worker), Storm 
> kills every worker whose assignment has changed. In other words, the 
> rebalance operation has to kill certain running workers and relaunch them 
> according to the new assignment. The advantage of this mechanism is the 
> simplicity of the implementation, but it can incur _huge_ overhead. In 
> practice, the restart latency is usually more than one second, making it 
> almost impossible for the system to recover under a high incoming data 
> rate. No system administrator dares to call rebalance, especially when the 
> system is overloaded! To bring back the real benefits of the rebalance 
> operation, we believe it is important to address the following problems:
> *1. Resource wastage and additional initialization cost*: In most cases, the 
> changes to a worker’s assignment (if the worker is not killed) only affect a 
> small fraction of the executors running on it. Only some of them need to be 
> migrated or created, while the rest can keep running on the same worker. The 
> current implementation, however, forcefully restarts all the executors and 
> triggers unnecessary initializations (i.e. Bolt.prepare() and Spout.open()) 
> for most of the running tasks. This not only wastes the computation 
> resources of unaffected executors, but also amplifies the initialization 
> cost under certain conditions, e.g. when a bolt loads an index during 
> initialization.
> *2. Restarting workers causes avoidable in-memory data loss*: Currently, a 
> supervisor uses the “kill -9” command to kill its corresponding workers. 
> Consequently, the tasks on such a worker have no chance to save their task 
> data. The running state of the worker, including information that is 
> important for resuming its duty, is simply lost, potentially causing 
> unnecessary recomputation of that state.
> *3. JVM restart cost, long duration and loss of HotSpot optimizations*: 
> Restarting a JVM involves a long initialization procedure and loses all the 
> runtime optimizations applied to the application byte code. As far as we 
> know, the HotSpot JVM is capable of detecting the performance-critical 
> sections of the code and dynamically translating the Java byte code of 
> these hot spots into native machine code. In particular, tasks that are 
> CPU-bound can greatly benefit from this feature. If we directly kill the 
> worker, all the advantages of these features are lost.
> II. Proposed solutions
> 1. At the supervisor side:
> The current supervisor implementation periodically calls the 
> “sync-processes” function to check whether a live worker should be killed. 
> A worker is killed when: (1) the mapping between the worker and the 
> topology has changed (e.g. the worker is re-assigned to another topology, 
> or the topology it serves is killed); or (2) the worker’s assignment has 
> been updated (e.g. the parallelism of some bolts increases/decreases). 
> In order to reuse the worker’s JVM instance as much as possible, we propose 
> not to kill the workers covered by condition (2), but only those that no 
> longer belong to the topology (condition (1)), as sketched below.
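> A minimal sketch of this check, in Java with illustrative class and field 
> names (the real logic lives in the Clojure “sync-processes” function in 
> supervisor.clj, and the actual assignment records differ):
> {code:java}
> import java.util.Set;
>
> // Hypothetical sketch: decide whether a running worker must be killed
> // (condition 1) or merely needs its executors resynchronized (condition 2).
> public class WorkerSyncSketch {
>
>     static class LocalWorkerState {
>         String topologyId;          // topology the running worker serves now
>         Set<Integer> executorIds;   // executors currently hosted by the worker
>     }
>
>     static class NewAssignment {
>         String topologyId;          // topology assigned to this worker slot
>         Set<Integer> executorIds;   // executors placed on the worker
>     }
>
>     // Condition (1): the worker no longer belongs to its topology
>     // (re-assigned elsewhere or the topology was killed) -> kill the JVM.
>     static boolean shouldKillWorker(LocalWorkerState running, NewAssignment assigned) {
>         return assigned == null || !assigned.topologyId.equals(running.topologyId);
>     }
>
>     // Condition (2): same topology, but the executor set changed
>     // (e.g. rebalanced parallelism) -> keep the JVM, resync executors only.
>     static boolean needsExecutorResync(LocalWorkerState running, NewAssignment assigned) {
>         return assigned != null
>                 && assigned.topologyId.equals(running.topologyId)
>                 && !assigned.executorIds.equals(running.executorIds);
>     }
> }
> {code}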
> 2. At the worker side: 
> Because the JVM instance is reused, each worker needs to periodically 
> synchronize its set of assigned executors. To achieve this, a new thread, 
> similar to the existing “refresh-connections” thread, is launched to shut 
> down executors that are no longer assigned and to start newly assigned 
> ones. Note that, in practice, the “refresh-connections” thread already 
> retrieves the assignment information from ZooKeeper, and this information 
> can be shared with the new thread, which reduces the load on ZooKeeper.
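> A minimal sketch of such a sync loop, in Java with placeholder types (in 
> Storm this logic would live next to “refresh-connections” in worker.clj; 
> readAssignedExecutors() stands in for the assignment data already fetched 
> from ZooKeeper):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Set;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
>
> // Hypothetical sketch of the proposed worker-side executor sync loop.
> public class ExecutorSyncSketch {
>
>     interface RunningExecutor {
>         void shutdown();   // stop the executor's threads, let it flush state
>     }
>
>     private final Map<Integer, RunningExecutor> running = new HashMap<>();
>
>     // Placeholder: reuse the assignment already read from ZooKeeper by the
>     // refresh-connections thread instead of issuing extra ZK reads.
>     private Set<Integer> readAssignedExecutors() { return Set.of(); }
>
>     private RunningExecutor launchExecutor(int executorId) { return () -> { }; }
>
>     synchronized void syncExecutors() {
>         Set<Integer> assigned = readAssignedExecutors();
>
>         // Shut down executors that are no longer assigned to this worker.
>         running.entrySet().removeIf(e -> {
>             if (!assigned.contains(e.getKey())) {
>                 e.getValue().shutdown();
>                 return true;
>             }
>             return false;
>         });
>
>         // Start executors that were newly assigned to this worker.
>         for (int id : assigned) {
>             running.computeIfAbsent(id, this::launchExecutor);
>         }
>     }
>
>     void start() {
>         ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
>         timer.scheduleAtFixedRate(this::syncExecutors, 0, 10, TimeUnit.SECONDS);
>     }
> }
> {code}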
> Because the binding between the running executors and the worker can now 
> change at runtime, tuples also need to be re-routed. To fulfill this 
> purpose, we need to rewrite two functions, “transfer-local-fn” and 
> “transfer-fn” (note that the rewrite is compulsory because, in the current 
> implementation, these two functions are created once and never updated). 
> A sketch of one possible shape for the rewrite follows.
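> The sketch below, in Java with illustrative types (in Storm these functions 
> are Clojure closures in worker.clj), keeps the task-to-executor routing 
> table behind a reference that the executor-sync thread swaps, instead of 
> closing over it once at worker start:
> {code:java}
> import java.util.Map;
> import java.util.concurrent.ConcurrentLinkedQueue;
> import java.util.concurrent.atomic.AtomicReference;
>
> // Hypothetical sketch: a transfer function whose routing table can be
> // replaced at runtime when executors are added to or removed from the worker.
> public class TupleTransferSketch {
>
>     static class Tuple {
>         int targetTaskId;
>         Object payload;
>     }
>
>     // task id -> receive queue of the executor that currently owns the task
>     private final AtomicReference<Map<Integer, ConcurrentLinkedQueue<Tuple>>> routes =
>             new AtomicReference<>(Map.of());
>
>     // Called by the executor-sync thread after executors are added/removed.
>     void updateRoutes(Map<Integer, ConcurrentLinkedQueue<Tuple>> newRoutes) {
>         routes.set(Map.copyOf(newRoutes));
>     }
>
>     // Local transfer: deliver to an executor in this worker, or report that
>     // the task is no longer local so the caller can fall back to remote transfer.
>     boolean transferLocal(Tuple tuple) {
>         ConcurrentLinkedQueue<Tuple> queue = routes.get().get(tuple.targetTaskId);
>         if (queue == null) {
>             return false;   // task now lives on another worker (or is migrating)
>         }
>         queue.offer(tuple);
>         return true;
>     }
> }
> {code}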
> Another function that needs careful modification is 
> “WorkerTopologyContext.getThisWorkerTasks()”, because (defn- mk-grouper … 
> :local-or-shuffle) in “executor.clj” depends on this function to get the 
> required context information. Moreover, if an end user calls 
> “WorkerTopologyContext.getThisWorkerTasks()” in “prepare()” and stores the 
> result, and the executor is not restarted, using the stored result may lead 
> to inconsistency, as illustrated below.
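> To illustrate the user-facing pitfall, here is a sketch of a bolt (0.9.x 
> package names) that caches getThisWorkerTasks() in prepare(); under the 
> proposed mechanism that snapshot can go stale, so re-reading the context, 
> assuming it is kept up to date by the modification above, is the safer 
> pattern:
> {code:java}
> import java.util.List;
> import java.util.Map;
>
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichBolt;
> import backtype.storm.tuple.Tuple;
>
> public class WorkerTasksCachingBolt extends BaseRichBolt {
>
>     private TopologyContext context;
>     private List<Integer> cachedWorkerTasks;   // can go stale after an executor-level rebalance
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>         this.context = context;
>         // Pitfall under the proposed mechanism: this is only a snapshot of the
>         // tasks co-located on this worker at prepare() time.
>         this.cachedWorkerTasks = context.getThisWorkerTasks();
>     }
>
>     @Override
>     public void execute(Tuple input) {
>         // Safer: re-read the worker's task set, assuming the context view is
>         // refreshed by the modified getThisWorkerTasks().
>         List<Integer> currentWorkerTasks = context.getThisWorkerTasks();
>         // ... use currentWorkerTasks rather than cachedWorkerTasks ...
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     }
> }
> {code}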
> In summary, we propose this new executor-level rebalance mechanism, which 
> tries to maximize resource usage and minimize the rebalance cost. This is 
> essential for the whole system, and especially important for the ultimate 
> goal of elasticity features in Storm.



