STORM-166 Nimbus HA design documentation and sequence diagram.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07b69b7b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07b69b7b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07b69b7b

Branch: refs/heads/0.11.x-branch
Commit: 07b69b7b86828b4213873405323c0e9be03934f1
Parents: dd991e5
Author: Parth Brahmbhatt <[email protected]>
Authored: Fri Dec 19 12:52:30 2014 -0800
Committer: Parth Brahmbhatt <[email protected]>
Committed: Fri Dec 19 12:52:30 2014 -0800

----------------------------------------------------------------------
 .../nimbus_ha_leader_election_and_failover.png | Bin 0 -> 154316 bytes
 .../images/nimbus_ha_topology_submission.png   | Bin 0 -> 134180 bytes
 docs/documentation/nimbus-ha-design.md         | 201 +++++++++++++++++++
 3 files changed, 201 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/07b69b7b/docs/documentation/images/nimbus_ha_leader_election_and_failover.png
----------------------------------------------------------------------
diff --git a/docs/documentation/images/nimbus_ha_leader_election_and_failover.png b/docs/documentation/images/nimbus_ha_leader_election_and_failover.png
new file mode 100644
index 0000000..60cc1b7
Binary files /dev/null and b/docs/documentation/images/nimbus_ha_leader_election_and_failover.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/07b69b7b/docs/documentation/images/nimbus_ha_topology_submission.png
----------------------------------------------------------------------
diff --git a/docs/documentation/images/nimbus_ha_topology_submission.png b/docs/documentation/images/nimbus_ha_topology_submission.png
new file mode 100644
index 0000000..7707e5a
Binary files /dev/null and b/docs/documentation/images/nimbus_ha_topology_submission.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/07b69b7b/docs/documentation/nimbus-ha-design.md
----------------------------------------------------------------------
diff --git a/docs/documentation/nimbus-ha-design.md b/docs/documentation/nimbus-ha-design.md
new file mode 100644
index 0000000..51b15f8
--- /dev/null
+++ b/docs/documentation/nimbus-ha-design.md
@@ -0,0 +1,201 @@

#Highly Available Nimbus design proposal

##Problem Statement:
Currently the storm master, aka nimbus, is a process that runs on a single machine under supervision. In most cases a
nimbus failure is transient and the process is restarted by the supervisor. However, sometimes when disks fail or network
partitions occur, nimbus goes down. Under these circumstances the topologies keep running normally, but no new topologies
can be submitted, no existing topology can be killed/deactivated/activated, and if a supervisor node fails then
reassignments are not performed, resulting in performance degradation or topology failures. With this project we intend
to resolve this problem by running nimbus in a primary/backup mode to guarantee that even if a nimbus server fails, one
of the backups will take over.

##Requirements:
* Increase overall availability of nimbus.
* Allow nimbus hosts to leave and join the cluster at will at any time. A newly joined host should automatically catch up
and join the list of potential leaders.
* No topology resubmission is required in case of a nimbus failover.
* No active topology should ever be lost.

##Components:
The following are the different components needed to achieve the above goals.
###Leader Election:
The nimbus server will use the following interface:

```java
public interface ILeaderElector {
    /**
     * Queue up for the leadership lock. The call returns immediately and the caller
     * must check isLeader() to perform any leadership action.
     */
    void addToLeaderLockQueue();

    /**
     * Removes the caller from the leader lock queue. If the caller is the leader,
     * it also releases the lock.
     */
    void removeFromLeaderLockQueue();

    /**
     * @return true if the caller currently holds the leader lock.
     */
    boolean isLeader();

    /**
     * @return the current leader's address; throws an exception if no one holds the lock.
     */
    InetSocketAddress getLeaderAddress();

    /**
     * @return list of current nimbus addresses, including the leader.
     */
    List<InetSocketAddress> getAllNimbusAddresses();
}
```

On startup, nimbus will check whether it has the code for all active topologies available locally. Once it gets to this
state it will call the addToLeaderLockQueue() function. When a nimbus is notified to become the leader, it will check
that it has all the code locally before assuming the leadership role. If any active topology code is missing, the node
will not accept the leadership role; instead it will release the lock and wait until it has all the code before
re-queueing for the leader lock.

The first implementation will be Zookeeper based. If the zookeeper connection is lost or reset, resulting in loss of the
lock or of the spot in the queue, the implementation will take care of updating the state so that isLeader() reflects the
current status. Leader-only actions must finish in less than minimumOf(connectionTimeout, SessionTimeout) to ensure that
the lock was held by nimbus for the entire duration of the action. (It is not clear whether we want to just state this
expectation and ensure that zookeeper configurations are set high enough, which results in a higher failover time, or
whether we actually want to create some sort of rollback mechanism for all actions; the second option needs a lot of
code.) If a nimbus that is not the leader receives a request that only a leader can perform, it will throw a
RuntimeException.

The following steps describe a nimbus failover scenario:
* Let's say we have 4 topologies running with 3 nimbus nodes and code-replication-factor = 2. We assume that the
invariant "The leader nimbus has code for all topologies locally" holds true at the beginning. nonLeader-1 has code for
the first 2 topologies and nonLeader-2 has code for the other 2 topologies.
* The leader nimbus dies with a hard disk failure, so no recovery is possible.
* nonLeader-1 gets a zookeeper notification indicating that it is now the new leader. Before accepting the leadership it
checks whether it has code available for all 4 topologies (these are the topologies under /storm/storms/). It realizes it
only has code for 2 topologies, so it relinquishes the lock and looks under /storm/code-distributor/topologyId to find out
from where it can download the code/metafile for the missing topologies. It finds entries for the dead leader nimbus and
for nonLeader-2, and will try downloading from both as part of its retry mechanism.
* nonLeader-2's code sync thread also realizes that it is missing code for 2 topologies and follows the same process
described in the previous step to download the code for the missing topologies.
* Eventually at least one of the nimbuses will have all the code locally and will accept leadership.

This sequence diagram describes how leader election and failover would work with multiple components:

![Nimbus HA leader election and failover](images/nimbus_ha_leader_election_and_failover.png)
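To make the flow above concrete, here is a minimal sketch, not part of the proposal, of how a nimbus process might tie
the startup code check, the leadership notification and leader-only request handling to the ILeaderElector interface.
Only ILeaderElector comes from the interface above; the helper methods for the local code check and the code sync are
hypothetical placeholders.

```java
// Minimal sketch (not part of the proposal) of the startup and leadership-check behaviour described above.
// The two private helpers are hypothetical stand-ins for the local code check and the code-sync thread.
public class NimbusLeadershipLoop {
    private final ILeaderElector elector;

    public NimbusLeadershipLoop(ILeaderElector elector) {
        this.elector = elector;
    }

    /** On startup, queue for the leader lock only once all active topology code is present locally. */
    public void start() throws InterruptedException {
        while (!hasCodeForAllActiveTopologies()) {
            downloadMissingTopologyCode();
            Thread.sleep(5_000);
        }
        elector.addToLeaderLockQueue();
    }

    /** Called when zookeeper notifies this host that it may become the leader. */
    public void onLeadershipNotification() {
        if (!hasCodeForAllActiveTopologies()) {
            // Missing code: relinquish the lock; the sync thread will re-queue this host later.
            elector.removeFromLeaderLockQueue();
        }
    }

    /** Leader-only requests are rejected on hosts that do not hold the lock. */
    public void performLeaderAction(Runnable action) {
        if (!elector.isLeader()) {
            throw new RuntimeException("not the leader nimbus; current leader: "
                    + elector.getLeaderAddress());
        }
        action.run();
    }

    // Hypothetical helpers standing in for behaviour described elsewhere in this document.
    private boolean hasCodeForAllActiveTopologies() { return true; }
    private void downloadMissingTopologyCode() { }
}
```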
###Nimbus state store:

Currently nimbus stores 2 kinds of data:
* Meta information like supervisor info and assignment info, which is stored in zookeeper.
* Actual topology configs and jars, which are stored on the nimbus host's local disk.

To achieve failover from primary to backup servers, nimbus state/data needs to be replicated across all nimbus hosts or
needs to be stored in a distributed storage. Replicating the data correctly involves state management and consistency
checks, and it is hard to test for correctness. However, many storm users do not want to take an extra dependency on
another replicated storage system like HDFS and still need high availability. Eventually, we want to move to the
bittorrent protocol for code distribution, given the size of the jars, and to achieve better scaling when the total
number of supervisors is very high. The current file system based model for code distribution works fine with systems
that have a file-system-like structure, but it fails to support a non-file-system based approach like bittorrent. To
support bittorrent we can go with the following interface instead of the storage interface described above. The
interface described below can still be used with HDFS, S3 and the local file system, so it is more extensible.

```java
/**
 * Interface responsible for distributing code in the cluster.
 */
public interface ICodeDistributor {
    /**
     * Prepare this code distributor.
     * @param conf
     */
    void prepare(Map conf);

    /**
     * This API will perform the actual upload of the code to the distributed implementation.
     * The API should return a meta file which has enough information for the downloader
     * to download the code, e.g. for bittorrent it will be a torrent file; in the case of something
     * like HDFS or S3 it might have the actual directory or paths for the files to be downloaded.
     * @param dirPath local directory where all the code to be distributed exists.
     * @param topologyId the topologyId for which the meta file needs to be created.
     * @return metaFile
     */
    File upload(Path dirPath, String topologyId);

    /**
     * Given the topologyId and metafile, download the actual code and return the list of downloaded files.
     * @param topologyId
     * @param metafile
     * @param destDirPath the folder where all the files will be downloaded.
     * @return
     */
    List<File> download(Path destDirPath, String topologyId, File metafile);

    /**
     * Given the topologyId, returns the number of hosts where the code has been replicated.
     */
    int getReplicationCount(String topologyId);

    /**
     * Performs the cleanup.
     * @param topologyId
     */
    void cleanup(String topologyId);

    /**
     * Close this distributor.
     * @param conf
     */
    void close(Map conf);
}
```

To support replication we will allow the user to define a code replication factor, which reflects the number of nimbus
hosts to which the code must be replicated before starting the topology. With replication comes the issue of consistency:
we will treat zookeeper's list of active topologies as our authority for the topologies whose code must exist on a
nimbus host. Any nimbus host that does not have the code for all topologies marked as active in zookeeper will relinquish
its lock so some other nimbus host can become leader. A background thread on every nimbus host will continuously try to
sync code from other hosts where the code was successfully replicated, so eventually at least one nimbus will accept
leadership as long as at least one seed host exists for each active topology.
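As a rough illustration, the sketch below shows how the leader nimbus might drive ICodeDistributor at submit time:
upload the code, then block until getReplicationCount reaches the configured replication factor or a timeout expires.
The class name, the polling interval and the timeout handling are assumptions for illustration only.

```java
// Illustrative sketch of the leader-side replication wait; not part of the proposal.
import java.io.File;
import java.nio.file.Path;

public class TopologyCodeReplicator {
    private final ICodeDistributor distributor;

    public TopologyCodeReplicator(ICodeDistributor distributor) {
        this.distributor = distributor;
    }

    /**
     * Upload the topology code and block until it is replicated to at least
     * replicationFactor nimbus hosts, or the timeout expires.
     */
    public File replicate(Path codeDir, String topologyId, int replicationFactor,
                          long timeoutMillis) throws InterruptedException {
        // Upload produces the meta file that non-leader hosts use to download the code.
        File metaFile = distributor.upload(codeDir, topologyId);

        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (distributor.getReplicationCount(topologyId) < replicationFactor
                && System.currentTimeMillis() < deadline) {
            Thread.sleep(1_000); // poll until enough hosts report a local copy
        }

        if (distributor.getReplicationCount(topologyId) < replicationFactor) {
            // The proposal leaves the failure policy open; here we simply surface the condition.
            throw new RuntimeException("code for " + topologyId
                    + " was not replicated to " + replicationFactor + " hosts in time");
        }
        return metaFile;
    }
}
```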
The following steps describe code replication amongst nimbus hosts for a topology:
* When the client uploads a jar, nothing changes.
* When the client submits a topology, the leader nimbus calls the code distributor's upload function, which creates a
metafile stored locally on the leader nimbus. The leader nimbus then writes new entries under
/storm/code-distributor/topologyId to notify all non-leader nimbuses that they should download this new code.
* We wait on the leader nimbus to ensure at least N non-leader nimbuses have the code replicated, with a user
configurable timeout.
* When a non-leader nimbus receives the notification about new code, it downloads the meta file from the leader nimbus
and then downloads the real code by calling the code distributor's download function with the metafile as input.
* Once a non-leader finishes downloading the code, it writes an entry under /storm/code-distributor/topologyId to
indicate it is one of the possible places to download the code/metafile in case the leader nimbus dies.
* The leader nimbus goes ahead and does all the usual things it does as part of topology submission.

The following sequence diagram describes the communication between the different components involved in code distribution:

![Nimbus HA topology submission](images/nimbus_ha_topology_submission.png)

##Thrift and Rest API

This section only exists to track and document how we can reduce the added load on zookeeper for nimbus discovery if
performance numbers indicate any degradation. The actual implementation will not be part of nimbus HA unless we have
performance tests that indicate degradation.

In order to avoid workers/supervisors/ui talking to zookeeper to get the master nimbus address, we can add the following
new API:

```java
/**
 * Returns a list of all nimbus hosts that are either currently in the queue or hold
 * the leadership lock.
 */
List<NimbusInfo> getNimbusHosts();

/**
 * NimbusInfo
 */
class NimbusInfo {
    String host;
    short port;
    boolean isLeader;
}
```

These APIs will be used by StormSubmitter, nimbus clients, supervisors and the UI to discover the current leader and the
participating nimbus hosts. Any nimbus host will be able to respond to these requests. The nimbus hosts can read this
information once from zookeeper, cache it, and keep updating the cache when watchers fire to indicate changes, which
should be rare in the general case. In addition, we should update all the existing thrift and rest APIs to throw redirect
exceptions when a non-leader receives a request that only a leader should serve.
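If this API were added, clients such as StormSubmitter could discover the leader roughly as sketched below. The
NimbusService interface, the NimbusInfo mirror and the findLeader helper are hypothetical stand-ins for whatever thrift
client would expose getNimbusHosts(); none of them are part of this proposal.

```java
// Illustrative sketch of client-side leader discovery on top of the proposed getNimbusHosts() API.
import java.util.List;

public class LeaderDiscovery {

    /** Hypothetical client-side view of the proposed API. */
    interface NimbusService {
        List<NimbusInfo> getNimbusHosts();
    }

    /** Minimal mirror of the NimbusInfo structure proposed above. */
    static class NimbusInfo {
        String host;
        short port;
        boolean isLeader;
    }

    /** Ask any reachable nimbus host for the full list and pick the current leader, if any. */
    static NimbusInfo findLeader(NimbusService anyNimbus) {
        for (NimbusInfo info : anyNimbus.getNimbusHosts()) {
            if (info.isLeader) {
                return info; // leader found; leader-only requests go here
            }
        }
        // No leader yet (e.g. during failover): callers retry after a short back-off,
        // or rely on the redirect exception described above if they guess wrong.
        return null;
    }
}
```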
