Github user aarondav commented on a diff in the pull request:
https://github.com/apache/incubator-spark/pull/611#discussion_r9875482
--- Diff:
core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
---
@@ -18,105 +18,73 @@
package org.apache.spark.deploy.master
import akka.actor.ActorRef
-import org.apache.zookeeper._
-import org.apache.zookeeper.Watcher.Event.EventType
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.master.MasterMessages._
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.leader.{LeaderLatchListener,
LeaderLatch}
private[spark] class ZooKeeperLeaderElectionAgent(val masterActor:
ActorRef,
masterUrl: String, conf: SparkConf)
- extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
+ extends LeaderElectionAgent with LeaderLatchListener with Logging {
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") +
"/leader_election"
- private val watcher = new ZooKeeperWatcher()
- private val zk = new SparkZooKeeperSession(this, conf)
+ private var zk: CuratorFramework = _
+ private var leaderLatch: LeaderLatch = _
private var status = LeadershipStatus.NOT_LEADER
- private var myLeaderFile: String = _
- private var leaderUrl: String = _
override def preStart() {
+
logInfo("Starting ZooKeeper LeaderElection agent")
- zk.connect()
- }
+ zk = SparkCuratorUtil.newClient(conf)
+ leaderLatch = new LeaderLatch(zk, WORKING_DIR)
+ leaderLatch.addListener(this)
- override def zkSessionCreated() {
- synchronized {
- zk.mkdirRecursive(WORKING_DIR)
- myLeaderFile =
- zk.create(WORKING_DIR + "/master_", masterUrl.getBytes,
CreateMode.EPHEMERAL_SEQUENTIAL)
- self ! CheckLeader
- }
+ leaderLatch.start()
}
override def preRestart(reason: scala.Throwable, message:
scala.Option[scala.Any]) {
- logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS
+ "...", reason)
- Thread.sleep(zk.ZK_TIMEOUT_MILLIS)
+ logError("LeaderElectionAgent failed...", reason)
super.preRestart(reason, message)
}
- override def zkDown() {
- logError("ZooKeeper down! LeaderElectionAgent shutting down Master.")
- System.exit(1)
- }
-
override def postStop() {
+ leaderLatch.close()
zk.close()
}
override def receive = {
- case CheckLeader => checkLeader()
+ case _ =>
}
- private class ZooKeeperWatcher extends Watcher {
- def process(event: WatchedEvent) {
- if (event.getType == EventType.NodeDeleted) {
- logInfo("Leader file disappeared, a master is down!")
- self ! CheckLeader
+ override def isLeader() {
+ // In case that leadship gain and lost in a short time.
+ Thread.sleep(1000)
--- End diff --
Ah, sorry if I was unclear -- I was only joking about putting a
sleep(1000) in here. The real solution is to add a synchronized block to
both isLeader and notLeader; my point was simply that we are not concerned
with the overhead of synchronization in this code path. (The synchronized block
is not strictly needed given the current implementation and how we use Curator,
but I think it makes the code clearer without any real downside.)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. To do so, please top-post your response.
If your project does not have this feature enabled and wishes to enable it, or if the
feature is enabled but not working, please contact infrastructure at
infrastructure@apache.org or file a JIRA ticket with INFRA.
---