Repository: spark
Updated Branches:
  refs/heads/master 6e03de304 -> deefd9d73


SPARK-1830 Deploy failover: make the persistence engine and leader election agent pluggable

Author: Prashant Sharma <prashan...@imaginea.com>

Closes #771 from ScrapCodes/deploy-failover-pluggable and squashes the following commits:

29ba440 [Prashant Sharma] fixed a compilation error
fef35ec [Prashant Sharma] Code review
57ee6f0 [Prashant Sharma] SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
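
With this change the Master's recovery wiring is selected purely through configuration: the existing ZOOKEEPER and FILESYSTEM modes remain, and a new CUSTOM mode loads a user-supplied factory. A minimal configuration sketch, assuming a hypothetical factory class org.example.MyRecoveryModeFactory (the property names are taken from the diff below):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
  // Built-in modes are "ZOOKEEPER" and "FILESYSTEM"; the default "NONE" keeps the
  // no-op BlackHolePersistenceEngine and the single-node MonarchyLeaderAgent.
  conf.set("spark.deploy.recoveryMode", "CUSTOM")
  // Consulted only in CUSTOM mode; must name a StandaloneRecoveryModeFactory subclass
  // with a single SparkConf constructor (this class name is hypothetical).
  conf.set("spark.deploy.recoveryMode.factory", "org.example.MyRecoveryModeFactory")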


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/deefd9d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/deefd9d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/deefd9d7

Branch: refs/heads/master
Commit: deefd9d7377a8091a1d184b99066febd0e9f6afd
Parents: 6e03de3
Author: Prashant Sharma <prashan...@imaginea.com>
Authored: Tue Nov 11 09:29:48 2014 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Tue Nov 11 09:29:48 2014 -0800

----------------------------------------------------------------------
 .../spark/deploy/master/ApplicationInfo.scala   |  1 +
 .../apache/spark/deploy/master/DriverInfo.scala |  1 +
 .../master/FileSystemPersistenceEngine.scala    | 62 +++++------------
 .../deploy/master/LeaderElectionAgent.scala     | 37 +++++------
 .../org/apache/spark/deploy/master/Master.scala | 40 ++++++-----
 .../spark/deploy/master/PersistenceEngine.scala | 70 +++++++++++++++-----
 .../deploy/master/RecoveryModeFactory.scala     | 69 +++++++++++++++++++
 .../apache/spark/deploy/master/WorkerInfo.scala |  1 +
 .../master/ZooKeeperLeaderElectionAgent.scala   | 24 ++-----
 .../master/ZooKeeperPersistenceEngine.scala     | 56 ++++++----------
 10 files changed, 211 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 6ba395b..ad7d817 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorRef
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.ApplicationDescription
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 2ac2118..9d3d793 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.master
 
 import java.util.Date
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.DriverDescription
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 08a99bb..6ff2aa5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.deploy.master
 
 import java.io._
-
-import akka.serialization.Serialization
+import java.nio.ByteBuffer
 
 import org.apache.spark.Logging
+import org.apache.spark.serializer.Serializer
+
+import scala.reflect.ClassTag
 
 /**
  * Stores data in a single on-disk directory with one file per application and worker.
@@ -32,65 +34,39 @@ import org.apache.spark.Logging
  */
 private[spark] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serialization)
+    val serialization: Serializer)
   extends PersistenceEngine with Logging {
 
+  val serializer = serialization.newInstance()
   new File(dir).mkdir()
 
-  override def addApplication(app: ApplicationInfo) {
-    val appFile = new File(dir + File.separator + "app_" + app.id)
-    serializeIntoFile(appFile, app)
-  }
-
-  override def removeApplication(app: ApplicationInfo) {
-    new File(dir + File.separator + "app_" + app.id).delete()
-  }
-
-  override def addDriver(driver: DriverInfo) {
-    val driverFile = new File(dir + File.separator + "driver_" + driver.id)
-    serializeIntoFile(driverFile, driver)
+  override def persist(name: String, obj: Object): Unit = {
+    serializeIntoFile(new File(dir + File.separator + name), obj)
   }
 
-  override def removeDriver(driver: DriverInfo) {
-    new File(dir + File.separator + "driver_" + driver.id).delete()
+  override def unpersist(name: String): Unit = {
+    new File(dir + File.separator + name).delete()
   }
 
-  override def addWorker(worker: WorkerInfo) {
-    val workerFile = new File(dir + File.separator + "worker_" + worker.id)
-    serializeIntoFile(workerFile, worker)
-  }
-
-  override def removeWorker(worker: WorkerInfo) {
-    new File(dir + File.separator + "worker_" + worker.id).delete()
-  }
-
-  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
-    val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
-    val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
-    val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
-    val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
-    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
-    val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
-    val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
-    (apps, drivers, workers)
+  override def read[T: ClassTag](prefix: String) = {
+    val files = new File(dir).listFiles().filter(_.getName.startsWith(prefix))
+    files.map(deserializeFromFile[T])
   }
 
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
 
-    val serializer = serialization.findSerializerFor(value)
-    val serialized = serializer.toBinary(value)
-
-    val out = new FileOutputStream(file)
+    val out = serializer.serializeStream(new FileOutputStream(file))   
     try {
-      out.write(serialized)
+      out.writeObject(value)
     } finally {
       out.close()
     }
+
   }
 
-  def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
+  def deserializeFromFile[T](file: File): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
     try {
@@ -99,8 +75,6 @@ private[spark] class FileSystemPersistenceEngine(
       dis.close()
     }
 
-    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
-    val serializer = serialization.serializerFor(clazz)
-    serializer.fromBinary(fileData).asInstanceOf[T]
+    serializer.deserializeStream(dis).readObject()
   }
 }
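
The file-system engine now round-trips objects through Spark's own Serializer API (serializeStream/deserializeStream) instead of Akka serialization. A small standalone sketch of the same round trip, assuming JavaSerializer and a throwaway temp file (names are illustrative):

  import java.io.{File, FileInputStream, FileOutputStream}

  import org.apache.spark.SparkConf
  import org.apache.spark.serializer.JavaSerializer

  val serializer = new JavaSerializer(new SparkConf()).newInstance()

  // Write one object per file, as FileSystemPersistenceEngine.persist now does.
  val file = File.createTempFile("app_", ".tmp")
  val out = serializer.serializeStream(new FileOutputStream(file))
  try out.writeObject("some serializable master state") finally out.close()

  // Read it back, as deserializeFromFile now does.
  val in = serializer.deserializeStream(new FileInputStream(file))
  val restored = try in.readObject[String]() finally in.close()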

http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index 4433a2e..cf77c86 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -17,30 +17,27 @@
 
 package org.apache.spark.deploy.master
 
-import akka.actor.{Actor, ActorRef}
-
-import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
+import org.apache.spark.annotation.DeveloperApi
 
 /**
- * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it
- * is the only Master serving requests.
- * In addition to the API provided, the LeaderElectionAgent will use of the following messages
- * to inform the Master of leader changes:
- * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
- * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
+ * :: DeveloperApi ::
+ *
+ * A LeaderElectionAgent keeps track of the current master and is the common interface implemented by all election agents.
  */
-private[spark] trait LeaderElectionAgent extends Actor {
-  // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
-  val masterActor: ActorRef
+@DeveloperApi
+trait LeaderElectionAgent {
+  val masterActor: LeaderElectable
+  def stop() {} // to avoid noops in implementations.
 }
 
-/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
-  override def preStart() {
-    masterActor ! ElectedLeader
-  }
+@DeveloperApi
+trait LeaderElectable {
+  def electedLeader()
+  def revokedLeadership()
+}
 
-  override def receive = {
-    case _ =>
-  }
+/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
+private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
+  extends LeaderElectionAgent {
+  masterActor.electedLeader()
 }
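
Since LeaderElectionAgent is now a plain trait and the Master is reached through the LeaderElectable callbacks rather than Akka messages, an election scheme can be supplied from outside Spark. A minimal sketch, assuming a hypothetical agent driven by some external coordination service (all names below are illustrative):

  import org.apache.spark.deploy.master.{LeaderElectable, LeaderElectionAgent}

  // Hypothetical agent: an external coordinator notifies it of leadership changes.
  class ExternalCoordinatorLeaderAgent(val masterActor: LeaderElectable)
    extends LeaderElectionAgent {

    // Called by the (imaginary) coordinator client when this master wins the election.
    def onElected(): Unit = masterActor.electedLeader()

    // Called when leadership is lost; forwards the revocation to the master.
    def onRevoked(): Unit = masterActor.revokedLeadership()

    // stop() has a default no-op body in the trait; override it to release resources.
    override def stop(): Unit = { /* disconnect from the coordinator here */ }
  }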

http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2f81d47..021454e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -50,7 +50,7 @@ private[spark] class Master(
     port: Int,
     webUiPort: Int,
     val securityMgr: SecurityManager)
-  extends Actor with ActorLogReceive with Logging {
+  extends Actor with ActorLogReceive with Logging with LeaderElectable {
 
   import context.dispatcher   // to use Akka's scheduler.schedule()
 
@@ -61,7 +61,6 @@ private[spark] class Master(
   val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
   val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
   val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
-  val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
 
   val workers = new HashSet[WorkerInfo]
@@ -103,7 +102,7 @@ private[spark] class Master(
 
   var persistenceEngine: PersistenceEngine = _
 
-  var leaderElectionAgent: ActorRef = _
+  var leaderElectionAgent: LeaderElectionAgent = _
 
   private var recoveryCompletionTask: Cancellable = _
 
@@ -130,23 +129,24 @@ private[spark] class Master(
     masterMetricsSystem.start()
     applicationMetricsSystem.start()
 
-    persistenceEngine = RECOVERY_MODE match {
+    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
+        val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
-        logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
-        new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+        val fsFactory = new FileSystemRecoveryModeFactory(conf)
+        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+      case "CUSTOM" =>
+        val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
+        val factory = clazz.getConstructor(conf.getClass)
+          .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
       case _ =>
-        new BlackHolePersistenceEngine()
+        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
     }
-
-    leaderElectionAgent = RECOVERY_MODE match {
-        case "ZOOKEEPER" =>
-          context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
-        case _ =>
-          context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
-      }
+    persistenceEngine = persistenceEngine_
+    leaderElectionAgent = leaderElectionAgent_
   }
 
   override def preRestart(reason: Throwable, message: Option[Any]) {
@@ -165,7 +165,15 @@ private[spark] class Master(
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
-    context.stop(leaderElectionAgent)
+    leaderElectionAgent.stop()
+  }
+
+  override def electedLeader() {
+    self ! ElectedLeader
+  }
+
+  override def revokedLeadership() {
+    self ! RevokedLeadership
   }
 
   override def receiveWithLogging = {

http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index e3640ea..2e0e1e7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.deploy.master
 
+import org.apache.spark.annotation.DeveloperApi
+
+import scala.reflect.ClassTag
+
 /**
  * Allows Master to persist any state that is necessary in order to recover from a failure.
  * The following semantics are required:
@@ -25,36 +29,70 @@ package org.apache.spark.deploy.master
  * Given these two requirements, we will have all apps and workers persisted, but
  * we might not have yet deleted apps or workers that finished (so their liveness must be verified
  * during recovery).
+ *
+ * The implementation of this trait defines how name-object pairs are stored or retrieved.
  */
-private[spark] trait PersistenceEngine {
-  def addApplication(app: ApplicationInfo)
+@DeveloperApi
+trait PersistenceEngine {
 
-  def removeApplication(app: ApplicationInfo)
+  /**
+   * Defines how the object is serialized and persisted. Implementation will
+   * depend on the store used.
+   */
+  def persist(name: String, obj: Object)
 
-  def addWorker(worker: WorkerInfo)
+  /**
+   * Defines how the object referred to by its name is removed from the store.
+   */
+  def unpersist(name: String)
 
-  def removeWorker(worker: WorkerInfo)
+  /**
+   * Returns all objects matching the given prefix. This defines how objects are
+   * read/deserialized back from the store.
+   */
+  def read[T: ClassTag](prefix: String): Seq[T]
 
-  def addDriver(driver: DriverInfo)
+  final def addApplication(app: ApplicationInfo): Unit = {
+    persist("app_" + app.id, app)
+  }
 
-  def removeDriver(driver: DriverInfo)
+  final def removeApplication(app: ApplicationInfo): Unit = {
+    unpersist("app_" + app.id)
+  }
+
+  final def addWorker(worker: WorkerInfo): Unit = {
+    persist("worker_" + worker.id, worker)
+  }
+
+  final def removeWorker(worker: WorkerInfo): Unit = {
+    unpersist("worker_" + worker.id)
+  }
+
+  final def addDriver(driver: DriverInfo): Unit = {
+    persist("driver_" + driver.id, driver)
+  }
+
+  final def removeDriver(driver: DriverInfo): Unit = {
+    unpersist("driver_" + driver.id)
+  }
 
   /**
   * Returns the persisted data sorted by their respective ids (which implies that they're
    * sorted by time of creation).
    */
-  def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
+  final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
+    (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
+  }
 
   def close() {}
 }
 
 private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
-  override def addApplication(app: ApplicationInfo) {}
-  override def removeApplication(app: ApplicationInfo) {}
-  override def addWorker(worker: WorkerInfo) {}
-  override def removeWorker(worker: WorkerInfo) {}
-  override def addDriver(driver: DriverInfo) {}
-  override def removeDriver(driver: DriverInfo) {}
-
-  override def readPersistedData() = (Nil, Nil, Nil)
+
+  override def persist(name: String, obj: Object): Unit = {}
+
+  override def unpersist(name: String): Unit = {}
+
+  override def read[T: ClassTag](name: String): Seq[T] = Nil
+
 }
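
Because addApplication/addWorker/addDriver and their remove counterparts are now final and expressed in terms of persist/unpersist/read, a custom store only has to implement those three primitives. A minimal sketch, assuming a hypothetical in-memory engine (illustrative only, e.g. for tests; it clearly does not survive a master restart):

  import scala.collection.mutable
  import scala.reflect.ClassTag

  import org.apache.spark.deploy.master.PersistenceEngine

  // Hypothetical engine: keeps name -> object pairs in a map instead of a real store.
  class InMemoryPersistenceEngine extends PersistenceEngine {

    private val store = new mutable.LinkedHashMap[String, Object]()

    override def persist(name: String, obj: Object): Unit = { store(name) = obj }

    override def unpersist(name: String): Unit = { store -= name }

    override def read[T: ClassTag](prefix: String): Seq[T] =
      store.collect { case (name, obj) if name.startsWith(prefix) => obj.asInstanceOf[T] }.toSeq
  }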

http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
new file mode 100644
index 0000000..d9d36c1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.serializer.JavaSerializer
+
+/**
+ * ::DeveloperApi::
+ *
+ * An implementation of this class can be plugged in as a recovery mode alternative for
+ * Spark's Standalone mode.
+ *
+ */
+@DeveloperApi
+abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
+
+  /**
+   * PersistenceEngine defines how persistent data (information about workers, drivers, etc.)
+   * is handled for recovery.
+   *
+   */
+  def createPersistenceEngine(): PersistenceEngine
+
+  /**
+   * Create an instance of LeaderElectionAgent that decides who gets elected as master.
+   */
+  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent
+}
+
+/**
+ * The LeaderAgent in this case is a no-op: the master stays leader forever, since the actual
+ * recovery is done by restoring state from the filesystem.
+ */
+private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf)
+  extends StandaloneRecoveryModeFactory(conf) with Logging {
+  val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
+
+  def createPersistenceEngine() = {
+    logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
+    new FileSystemPersistenceEngine(RECOVERY_DIR, new JavaSerializer(conf))
+  }
+
+  def createLeaderElectionAgent(master: LeaderElectable) = new MonarchyLeaderAgent(master)
+}
+
+private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf)
+  extends StandaloneRecoveryModeFactory(conf) {
+  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(new JavaSerializer(conf), conf)
+
+  def createLeaderElectionAgent(master: LeaderElectable) =
+    new ZooKeeperLeaderElectionAgent(master, conf)
+}
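
For the new CUSTOM mode, Master.scala (above) instantiates the configured factory reflectively through its single SparkConf constructor. A sketch of such a user-supplied factory, reusing the hypothetical InMemoryPersistenceEngine and ExternalCoordinatorLeaderAgent sketched earlier (every name here is illustrative, not part of this commit):

  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.master.{LeaderElectable, LeaderElectionAgent,
    PersistenceEngine, StandaloneRecoveryModeFactory}

  // Hypothetical factory, selected by setting spark.deploy.recoveryMode=CUSTOM and
  // spark.deploy.recoveryMode.factory to this class's fully-qualified name.
  class MyRecoveryModeFactory(conf: SparkConf) extends StandaloneRecoveryModeFactory(conf) {

    override def createPersistenceEngine(): PersistenceEngine =
      new InMemoryPersistenceEngine  // from the sketch after PersistenceEngine.scala above

    override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent =
      new ExternalCoordinatorLeaderAgent(master)  // from the sketch after LeaderElectionAgent.scala
  }

Note that the built-in ZooKeeperRecoveryModeFactory and FileSystemRecoveryModeFactory added in this commit follow exactly this shape.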

http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index d221b0f..473ddc2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import akka.actor.ActorRef
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
 private[spark] class WorkerInfo(

http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 285f9b0..8eaa0ad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -24,9 +24,8 @@ import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
 
-private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
-    masterUrl: String, conf: SparkConf)
-  extends LeaderElectionAgent with LeaderLatchListener with Logging  {
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
+    conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {
 
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
 
@@ -34,30 +33,21 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
   private var leaderLatch: LeaderLatch = _
   private var status = LeadershipStatus.NOT_LEADER
 
-  override def preStart() {
+  start()
 
+  def start() {
     logInfo("Starting ZooKeeper LeaderElection agent")
     zk = SparkCuratorUtil.newClient(conf)
     leaderLatch = new LeaderLatch(zk, WORKING_DIR)
     leaderLatch.addListener(this)
-
     leaderLatch.start()
   }
 
-  override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) {
-    logError("LeaderElectionAgent failed...", reason)
-    super.preRestart(reason, message)
-  }
-
-  override def postStop() {
+  override def stop() {
     leaderLatch.close()
     zk.close()
   }
 
-  override def receive = {
-    case _ =>
-  }
-
   override def isLeader() {
     synchronized {
       // could have lost leadership by now.
@@ -85,10 +75,10 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
   def updateLeadershipStatus(isLeader: Boolean) {
     if (isLeader && status == LeadershipStatus.NOT_LEADER) {
       status = LeadershipStatus.LEADER
-      masterActor ! ElectedLeader
+      masterActor.electedLeader()
     } else if (!isLeader && status == LeadershipStatus.LEADER) {
       status = LeadershipStatus.NOT_LEADER
-      masterActor ! RevokedLeadership
+      masterActor.revokedLeadership()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 834dfed..96c2139 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -19,72 +19,54 @@ package org.apache.spark.deploy.master
 
 import scala.collection.JavaConversions._
 
-import akka.serialization.Serialization
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.serializer.Serializer
+import java.nio.ByteBuffer
 
-class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
+import scala.reflect.ClassTag
+
+
+private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, conf: SparkConf)
   extends PersistenceEngine
   with Logging
 {
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
   val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
 
-  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
-
-  override def addApplication(app: ApplicationInfo) {
-    serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
-  }
+  val serializer = serialization.newInstance()
 
-  override def removeApplication(app: ApplicationInfo) {
-    zk.delete().forPath(WORKING_DIR + "/app_" + app.id)
-  }
+  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
 
-  override def addDriver(driver: DriverInfo) {
-    serializeIntoFile(WORKING_DIR + "/driver_" + driver.id, driver)
-  }
 
-  override def removeDriver(driver: DriverInfo) {
-    zk.delete().forPath(WORKING_DIR + "/driver_" + driver.id)
+  override def persist(name: String, obj: Object): Unit = {
+    serializeIntoFile(WORKING_DIR + "/" + name, obj)
   }
 
-  override def addWorker(worker: WorkerInfo) {
-    serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
+  override def unpersist(name: String): Unit = {
+    zk.delete().forPath(WORKING_DIR + "/" + name)
   }
 
-  override def removeWorker(worker: WorkerInfo) {
-    zk.delete().forPath(WORKING_DIR + "/worker_" + worker.id)
+  override def read[T: ClassTag](prefix: String) = {
+    val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(prefix))
+    file.map(deserializeFromFile[T]).flatten
   }
 
   override def close() {
     zk.close()
   }
 
-  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
-    val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted
-    val appFiles = sortedFiles.filter(_.startsWith("app_"))
-    val apps = appFiles.map(deserializeFromFile[ApplicationInfo]).flatten
-    val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
-    val drivers = driverFiles.map(deserializeFromFile[DriverInfo]).flatten
-    val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
-    val workers = workerFiles.map(deserializeFromFile[WorkerInfo]).flatten
-    (apps, drivers, workers)
-  }
-
   private def serializeIntoFile(path: String, value: AnyRef) {
-    val serializer = serialization.findSerializerFor(value)
-    val serialized = serializer.toBinary(value)
-    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
+    val serialized = serializer.serialize(value)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized.array())
   }
 
-  def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): Option[T] = {
+  def deserializeFromFile[T](filename: String): Option[T] = {
     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
-    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
-    val serializer = serialization.serializerFor(clazz)
     try {
-      Some(serializer.fromBinary(fileData).asInstanceOf[T])
+      Some(serializer.deserialize(ByteBuffer.wrap(fileData)))
     } catch {
       case e: Exception => {
         logWarning("Exception while reading persisted file, deleting", e)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
