Repository: spark Updated Branches: refs/heads/master 1fd2bfd3d -> 46bcb9551
SPARK-1137: Make ZK PersistenceEngine not crash for wrong serialVersionUID Previously, ZooKeeperPersistenceEngine would crash the whole Master process if there was stored data from a prior Spark version. Now, we just delete these files. Author: Aaron Davidson <aa...@databricks.com> Closes #4 from aarondav/zookeeper2 and squashes the following commits: fa8b40f [Aaron Davidson] SPARK-1137: Make ZK PersistenceEngine not crash for wrong serialVersionUID Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46bcb955 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46bcb955 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46bcb955 Branch: refs/heads/master Commit: 46bcb9551eb918ac4a31cd4cca924b432f6dc352 Parents: 1fd2bfd Author: Aaron Davidson <aa...@databricks.com> Authored: Sun Mar 2 01:00:42 2014 -0800 Committer: Reynold Xin <r...@apache.org> Committed: Sun Mar 2 01:00:42 2014 -0800 ---------------------------------------------------------------------- .../master/ZooKeeperPersistenceEngine.scala | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/46bcb955/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index 9390062..5413ff6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -64,11 +64,11 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = { val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted val appFiles = sortedFiles.filter(_.startsWith("app_")) - val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) + val apps = appFiles.map(deserializeFromFile[ApplicationInfo]).flatten val driverFiles = sortedFiles.filter(_.startsWith("driver_")) - val drivers = driverFiles.map(deserializeFromFile[DriverInfo]) + val drivers = driverFiles.map(deserializeFromFile[DriverInfo]).flatten val workerFiles = sortedFiles.filter(_.startsWith("worker_")) - val workers = workerFiles.map(deserializeFromFile[WorkerInfo]) + val workers = workerFiles.map(deserializeFromFile[WorkerInfo]).flatten (apps, drivers, workers) } @@ -78,10 +78,18 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized) } - def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = { + def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): Option[T] = { val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename) val clazz = m.runtimeClass.asInstanceOf[Class[T]] val serializer = serialization.serializerFor(clazz) - serializer.fromBinary(fileData).asInstanceOf[T] + try { + Some(serializer.fromBinary(fileData).asInstanceOf[T]) + } catch { + case e: Exception => { + logWarning("Exception while reading persisted file, deleting", e) + zk.delete().forPath(WORKING_DIR + "/" + filename) + None + } + } } }