[ https://issues.apache.org/jira/browse/STORM-588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rick Kellogg updated STORM-588:
-------------------------------
    Component/s: storm-core

Executor-Level Rebalance Mechanism
----------------------------------

                 Key: STORM-588
                 URL: https://issues.apache.org/jira/browse/STORM-588
             Project: Apache Storm
          Issue Type: Improvement
          Components: storm-core
    Affects Versions: 0.10.0, 0.9.3-rc2
            Reporter: troy ding
            Assignee: troy ding

I. The motivation

The current rebalance mechanism works at the worker level. When a rebalance operation is triggered (e.g. by adding or removing a worker), Storm kills every worker whose assignment has changed and relaunches it according to the new assignment. The advantage of this mechanism is its simple implementation, but it can incur huge overhead: the restart latency is usually more than one second, which makes it almost impossible for the system to recover under a high incoming data rate. No system administrator dares to call rebalance, especially when the system is overloaded! To bring back the real benefits of the rebalance operation, we believe it is important to address the following problems:

*1. Resource wastage and additional initialization cost*: In most cases, a change to a worker's assignment affects only a small fraction of the executors running on it. Only some of them need to be migrated or created, while the rest could keep running on the same worker. The current implementation, however, forcibly restarts all the executors and triggers unnecessary initialization (i.e. calls to Bolt.prepare() and Spout.open()) for most of the running tasks. This not only wastes the computation resources of unaffected executors, but also amplifies the initialization cost under certain conditions, e.g. when a bolt loads an index during initialization.

*2. Restarting workers causes avoidable in-memory data loss*: Currently, a supervisor uses the "kill -9" command to kill its corresponding workers, so the tasks on such a worker have no chance to save their state. The running state of the worker, including information that is important for resuming its duty, is simply lost, potentially causing unnecessary recomputation of that state.

*3. JVM restart cost: long startup time and loss of HotSpot optimizations*: Restarting a JVM involves a long initialization procedure and discards all the runtime optimizations accumulated for the application bytecode. The HotSpot JVM detects the performance-critical sections of the code and dynamically compiles the bytecode of these hot spots into native machine code; tasks that are CPU-bound benefit greatly from this feature. If we simply kill the worker, all of these advantages are lost.

II. Proposed solutions

1. At the supervisor side:

The current supervisor implementation periodically calls the "sync-processes" function to check whether a live worker should be killed, which happens when (1) the mapping between the worker and the topology has changed (e.g. the worker is re-assigned to another topology, or the topology it serves is killed), or (2) the worker's assignment has been updated (e.g. the parallelism of some bolts has increased or decreased).

In order to reuse the worker's JVM instance as much as possible, we propose not to kill the workers matching condition (2), but only those that no longer belong to the topology (condition (1)). A rough sketch of this decision logic follows.
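The following is a minimal Java sketch of the proposed decision, only to make the two conditions concrete. The actual supervisor logic is Clojure (supervisor.clj), and the LocalWorker and Assignment types and every method name below are hypothetical stand-ins, not Storm's API:

{code:java}
// Hypothetical sketch of the proposed sync-processes decision.
// All type and method names are illustrative, not Storm's actual API.
import java.util.Map;

final class SyncProcessesSketch {

    enum Action { KILL, KEEP_AND_RESYNC, KEEP }

    /** Decide what to do with one live worker under the new scheduling. */
    static Action decide(LocalWorker worker, Map<String, Assignment> newAssignments) {
        Assignment assignment = newAssignments.get(worker.topologyId());
        // Condition (1): the worker no longer belongs to its topology
        // (topology killed, or the slot re-assigned to another topology).
        if (assignment == null || !assignment.ownsSlot(worker.port())) {
            return Action.KILL;                    // unchanged behavior
        }
        // Condition (2): same topology, but the executor set changed.
        // Old behavior: KILL. Proposed behavior: keep the JVM and let the
        // worker itself add/remove executors (see the worker-side sketch).
        if (!assignment.executorsFor(worker.port()).equals(worker.executors())) {
            return Action.KEEP_AND_RESYNC;
        }
        return Action.KEEP;
    }

    // Minimal stand-ins so the sketch is self-contained.
    interface LocalWorker {
        String topologyId();
        int port();
        java.util.Set<java.util.List<Integer>> executors(); // executor = [start-task, end-task]
    }
    interface Assignment {
        boolean ownsSlot(int port);
        java.util.Set<java.util.List<Integer>> executorsFor(int port);
    }
}
{code}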
2. At the worker side:

Because the JVM instance is reused, a worker needs to periodically synchronize its set of assigned executors. To achieve this, a new thread, similar to the existing "refresh-connections" thread, is launched; it kills the executors that are no longer assigned to the worker and starts the newly assigned ones (the first sketch at the end of this description illustrates the set differences involved). Note that, in practice, the "refresh-connections" thread already retrieves the assignment information from ZooKeeper, and this information can be shared with the new thread, which reduces the load on ZooKeeper.

Because the binding between the running executors and the worker can now change, tuples must also be re-routed. To achieve this, we need to rewrite the two functions "transfer-local-fn" and "transfer-fn"; the rewrite is unavoidable because, in the current implementation, both functions capture their routing information once at worker startup and never see later updates (the second sketch at the end shows one way to make the routing table swappable).

Another function that needs careful modification is "WorkerTopologyContext.getThisWorkerTasks()", because (defn- mk-grouper … :local-or-shuffle) in "executor.clj" depends on it for the required context information. Moreover, if an end user calls "WorkerTopologyContext.getThisWorkerTasks()" in "prepare()" and stores the result, then, because the executor is not restarted, using the stored result may lead to inconsistency (the last sketch below illustrates this hazard).

In summary, we propose this new executor-level rebalance mechanism, which tries to maximize resource usage and minimize the rebalance cost. This is essential for the whole system, and especially important for the ultimate goal of elasticity features in Storm.
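As a rough illustration of the worker-side synchronization described above, the first sketch computes the two set differences the new thread would act on. The real worker code is Clojure (worker.clj); ExecutorLifecycle and all other names here are invented for illustration:

{code:java}
// Hypothetical sketch of the proposed worker-side executor synchronization;
// executor IDs follow Storm's [start-task-id end-task-id] convention, but
// the class and method names are illustrative, not Storm's actual API.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ExecutorSyncSketch {

    /** Called periodically, e.g. from the same timer that runs refresh-connections. */
    static void syncExecutors(Set<List<Integer>> running,
                              Set<List<Integer>> assigned,
                              ExecutorLifecycle lifecycle) {
        // Executors we run but that are no longer assigned to this worker.
        Set<List<Integer>> toShutdown = new HashSet<>(running);
        toShutdown.removeAll(assigned);

        // Executors newly assigned to this worker.
        Set<List<Integer>> toLaunch = new HashSet<>(assigned);
        toLaunch.removeAll(running);

        // Untouched executors keep running: no re-prepare(), no JVM restart.
        for (List<Integer> executor : toShutdown) {
            lifecycle.shutdown(executor);   // let the executor save its state first
        }
        for (List<Integer> executor : toLaunch) {
            lifecycle.launch(executor);     // prepare()/open() runs only for new executors
        }
    }

    interface ExecutorLifecycle {
        void shutdown(List<Integer> executor);
        void launch(List<Integer> executor);
    }
}
{code}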
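For the re-routing problem, one possible shape for the rewritten transfer functions is to read the routing table through an AtomicReference, so that the sync thread can atomically publish a new table without restarting anything. Again a hedged sketch under assumed names, not the actual transfer-fn code:

{code:java}
// Hypothetical sketch of a swappable tuple-routing table. In the current
// worker.clj, transfer-fn and transfer-local-fn close over a fixed
// task->executor mapping; here the mapping sits behind an AtomicReference.
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

final class SwappableRouterSketch {

    // Current task -> local receive queue mapping, replaced wholesale on rebalance.
    private final AtomicReference<Map<Integer, Queue<Object>>> taskToQueue;

    SwappableRouterSketch(Map<Integer, Queue<Object>> initial) {
        this.taskToQueue = new AtomicReference<>(initial);
    }

    /** In the spirit of transfer-local-fn: route one tuple via the current snapshot. */
    void transferLocal(int taskId, Object tuple) {
        Queue<Object> queue = taskToQueue.get().get(taskId); // one volatile read per call
        if (queue != null) {
            queue.offer(tuple);          // executor still lives on this worker
        } else {
            // task has moved to another worker: hand off to the remote transfer path
        }
    }

    /** Called by the worker-side sync thread after the executor set changes. */
    void publish(Map<Integer, Queue<Object>> next) {
        taskToQueue.set(next);           // readers pick up the new table on the next tuple
    }
}
{code}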
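Finally, the user-facing hazard around "getThisWorkerTasks()" can be shown with ordinary Storm 0.9.x user code. BaseRichBolt and TopologyContext are the real API; the bolt itself is a made-up example, and whether the re-read stays consistent depends on the proposed rework of WorkerTopologyContext:

{code:java}
// Caching getThisWorkerTasks() in prepare() was safe when any assignment
// change restarted the worker; under executor-level rebalance the cached
// list can silently go stale because prepare() is not called again.
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class CachingBolt extends BaseRichBolt {
    private TopologyContext context;
    private List<Integer> cachedWorkerTasks;   // the problematic cached copy

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.context = context;
        // Risky under executor-level rebalance: this snapshot is never refreshed.
        this.cachedWorkerTasks = context.getThisWorkerTasks();
    }

    @Override
    public void execute(Tuple input) {
        // Safer: re-read the live context, assuming the rework keeps
        // getThisWorkerTasks() consistent with the current assignment.
        List<Integer> currentWorkerTasks = context.getThisWorkerTasks();
        // ... use currentWorkerTasks, not cachedWorkerTasks ...
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { /* no output */ }
}
{code}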