Repository: spark
Updated Branches:
  refs/heads/master d8f528ceb -> f10ae4b1e


[SPARK-6717][ML] Clear shuffle files after checkpointing in ALS

## What changes were proposed in this pull request?

When ALS is run with a checkpoint interval, materialize the current state at 
each checkpoint and clean up the previous shuffles (non-blocking).
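
For reference, a minimal sketch of the checkpoint-time cleanup this change 
introduces (it mirrors the updated loop in `ALS.train` shown in the diff below; 
`itemFactors`, `sc`, and `deps` are the names used there, so this is a fragment, 
not standalone code):

```scala
// Capture the shuffle dependencies before checkpointing, materialize the
// factors RDD, then ask the ContextCleaner to drop the shuffle files that
// are no longer needed (non-blocking by default).
val deps = itemFactors.dependencies
itemFactors.checkpoint()
itemFactors.count()                     // checkpoint item factors and cut lineage
ALS.cleanShuffleDependencies(sc, deps)  // clean shuffles from earlier iterations
```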

## How was this patch tested?

Existing ALS unit tests, new ALS checkpoint cleanup unit tests, and a check of 
the shuffle files left on disk after an ALS run with checkpointing.
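
The shuffle-file check in the new `ALSCleanerSuite` is roughly the following 
sketch (assuming a dedicated temporary directory has been set as 
`spark.local.dir`; the helper name is illustrative):

```scala
import java.io.File
import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.TrueFileFilter

// Recursively list every file under the configured spark.local.dir and keep
// only the shuffle data files that survived cleanup.
def remainingShuffleFiles(localDir: File): Set[File] =
  FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE)
    .asScala.toSet
    .filter(_.getName.matches("shuffle_\\d+_.+\\.data"))
```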

Author: Holden Karau <hol...@us.ibm.com>
Author: Holden Karau <hol...@pigscanfly.ca>

Closes #11919 from holdenk/SPARK-6717-clear-shuffle-files-after-checkpointing-in-ALS.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f10ae4b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f10ae4b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f10ae4b1

Branch: refs/heads/master
Commit: f10ae4b1e169495af11b8e8123c60dd96174477e
Parents: d8f528c
Author: Holden Karau <hol...@us.ibm.com>
Authored: Tue May 3 00:18:10 2016 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Tue May 3 00:18:10 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala |   2 +-
 .../apache/spark/ml/recommendation/ALS.scala    |  35 +++-
 .../spark/ml/recommendation/ALSSuite.scala      | 203 +++++++++++++++----
 3 files changed, 195 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f10ae4b1/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index a51338c..c895fb3 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -212,7 +212,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  /** Perform shuffle cleanup, asynchronously. */
+  /** Perform shuffle cleanup. */
   def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
     try {
       logDebug("Cleaning shuffle " + shuffleId)

http://git-wip-us.apache.org/repos/asf/spark/blob/f10ae4b1/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 5419230..509c944 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 
-import org.apache.spark.Partitioner
+import org.apache.spark.{Dependency, Partitioner, ShuffleDependency, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{Estimator, Model}
@@ -706,13 +706,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
         previousItemFactors.unpersist()
         itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
         // TODO: Generalize PeriodicGraphCheckpointer and use it here.
+        val deps = itemFactors.dependencies
         if (shouldCheckpoint(iter)) {
-          itemFactors.checkpoint() // itemFactors gets materialized in computeFactors.
+          itemFactors.checkpoint() // itemFactors gets materialized in computeFactors
         }
         val previousUserFactors = userFactors
         userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
           itemLocalIndexEncoder, implicitPrefs, alpha, solver)
         if (shouldCheckpoint(iter)) {
+          ALS.cleanShuffleDependencies(sc, deps)
           deletePreviousCheckpointFile()
           previousCheckpointFile = itemFactors.getCheckpointFile
         }
@@ -723,8 +725,10 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
         itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
           userLocalIndexEncoder, solver = solver)
         if (shouldCheckpoint(iter)) {
+          val deps = itemFactors.dependencies
           itemFactors.checkpoint()
           itemFactors.count() // checkpoint item factors and cut lineage
+          ALS.cleanShuffleDependencies(sc, deps)
           deletePreviousCheckpointFile()
           previousCheckpointFile = itemFactors.getCheckpointFile
         }
@@ -1355,4 +1359,31 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
    * satisfies this requirement, we simply use a type alias here.
    */
   private[recommendation] type ALSPartitioner = org.apache.spark.HashPartitioner
+
+  /**
+   * Private function to clean up all of the shuffle files from the dependencies and their parents.
+   */
+  private[spark] def cleanShuffleDependencies[T](
+      sc: SparkContext,
+      deps: Seq[Dependency[_]],
+      blocking: Boolean = false): Unit = {
+    // If there is no reference tracking, we skip cleanup.
+    sc.cleaner.foreach { cleaner =>
+      /**
+       * Clean the shuffle and all of its parents.
+       */
+      def cleanEagerly(dep: Dependency[_]): Unit = {
+        if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) {
+          val shuffleId = dep.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
+          cleaner.doCleanupShuffle(shuffleId, blocking)
+        }
+        val rdd = dep.rdd
+        val rddDeps = rdd.dependencies
+        if (rdd.getStorageLevel == StorageLevel.NONE && rddDeps != null) {
+          rddDeps.foreach(cleanEagerly)
+        }
+      }
+      deps.foreach(cleanEagerly)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f10ae4b1/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 4c4eb72..1704037 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -17,15 +17,19 @@
 
 package org.apache.spark.ml.recommendation
 
+import java.io.File
 import java.util.Random
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 import scala.language.existentials
 
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.TrueFileFilter
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.recommendation.ALS._
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
@@ -34,8 +38,9 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class ALSSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest with Logging {
@@ -255,37 +260,7 @@ class ALSSuite
       rank: Int,
       noiseStd: Double = 0.0,
       seed: Long = 11L): (RDD[Rating[Int]], RDD[Rating[Int]]) = {
-    // The assumption of the implicit feedback model is that unobserved ratings are more likely to
-    // be negatives.
-    val positiveFraction = 0.8
-    val negativeFraction = 1.0 - positiveFraction
-    val trainingFraction = 0.6
-    val testFraction = 0.3
-    val totalFraction = trainingFraction + testFraction
-    val random = new Random(seed)
-    val userFactors = genFactors(numUsers, rank, random)
-    val itemFactors = genFactors(numItems, rank, random)
-    val training = ArrayBuffer.empty[Rating[Int]]
-    val test = ArrayBuffer.empty[Rating[Int]]
-    for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) {
-      val rating = blas.sdot(rank, userFactor, 1, itemFactor, 1)
-      val threshold = if (rating > 0) positiveFraction else negativeFraction
-      val observed = random.nextDouble() < threshold
-      if (observed) {
-        val x = random.nextDouble()
-        if (x < totalFraction) {
-          if (x < trainingFraction) {
-            val noise = noiseStd * random.nextGaussian()
-            training += Rating(userId, itemId, rating + noise.toFloat)
-          } else {
-            test += Rating(userId, itemId, rating)
-          }
-        }
-      }
-    }
-    logInfo(s"Generated an implicit feedback dataset with ${training.size} ratings for training " +
-      s"and ${test.size} for test.")
-    (sc.parallelize(training, 2), sc.parallelize(test, 2))
+    ALSSuite.genImplicitTestData(sc, numUsers, numItems, rank, noiseStd, seed)
   }
 
   /**
@@ -304,14 +279,7 @@ class ALSSuite
       random: Random,
       a: Float = -1.0f,
       b: Float = 1.0f): Seq[(Int, Array[Float])] = {
-    require(size > 0 && size < Int.MaxValue / 3)
-    require(b > a)
-    val ids = mutable.Set.empty[Int]
-    while (ids.size < size) {
-      ids += random.nextInt()
-    }
-    val width = b - a
-    ids.toSeq.sorted.map(id => (id, Array.fill(rank)(a + random.nextFloat() * width)))
+    ALSSuite.genFactors(size, rank, random, a, b)
   }
 
   /**
@@ -520,6 +488,79 @@ class ALSSuite
   }
 }
 
+class ALSCleanerSuite extends SparkFunSuite {
+  test("ALS shuffle cleanup standalone") {
+    val conf = new SparkConf()
+    val localDir = Utils.createTempDir()
+    val checkpointDir = Utils.createTempDir()
+    def getAllFiles: Set[File] =
+      FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
+    try {
+      conf.set("spark.local.dir", localDir.getAbsolutePath)
+      val sc = new SparkContext("local[2]", "test", conf)
+      try {
+        sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+        // Test checkpoint and clean parents
+        val input = sc.parallelize(1 to 1000)
+        val keyed = input.map(x => (x % 20, 1))
+        val shuffled = keyed.reduceByKey(_ + _)
+        val keysOnly = shuffled.keys
+        val deps = keysOnly.dependencies
+        keysOnly.count()
+        ALS.cleanShuffleDependencies(sc, deps, true)
+        val resultingFiles = getAllFiles
+        assert(resultingFiles === Set())
+        // Ensure running count again works fine even if we kill the shuffle files.
+        keysOnly.count()
+      } finally {
+        sc.stop()
+      }
+    } finally {
+      Utils.deleteRecursively(localDir)
+      Utils.deleteRecursively(checkpointDir)
+    }
+  }
+
+  test("ALS shuffle cleanup in algorithm") {
+    val conf = new SparkConf()
+    val localDir = Utils.createTempDir()
+    val checkpointDir = Utils.createTempDir()
+    def getAllFiles: Set[File] =
+      FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
+    try {
+      conf.set("spark.local.dir", localDir.getAbsolutePath)
+      val sc = new SparkContext("local[2]", "test", conf)
+      try {
+        sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+        // Generate test data
+        val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0)
+        // Implicitly test the cleaning of parents during ALS training
+        val sqlContext = new SQLContext(sc)
+        import sqlContext.implicits._
+        val als = new ALS()
+          .setRank(1)
+          .setRegParam(1e-5)
+          .setSeed(0)
+          .setCheckpointInterval(1)
+          .setMaxIter(7)
+        val model = als.fit(training.toDF())
+        val resultingFiles = getAllFiles
+        // We expect the last shuffle files, block ratings, user factors, and item factors to be
+        // around but no more.
+        val pattern = "shuffle_(\\d+)_.+\\.data".r
+        val rddIds = resultingFiles.flatMap { f =>
+          pattern.findAllIn(f.getName()).matchData.map { _.group(1) } }
+        assert(rddIds.toSet.size === 4)
+      } finally {
+        sc.stop()
+      }
+    } finally {
+      Utils.deleteRecursively(localDir)
+      Utils.deleteRecursively(checkpointDir)
+    }
+  }
+}
+
 class ALSStorageSuite
  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest with Logging {
 
@@ -591,7 +632,7 @@ private class IntermediateRDDStorageListener extends SparkListener {
 
 }
 
-object ALSSuite {
+object ALSSuite extends Logging {
 
   /**
    * Mapping from all Params to valid settings which differ from the defaults.
@@ -620,4 +661,82 @@ object ALSSuite {
     "intermediateStorageLevel" -> "MEMORY_ONLY",
     "finalStorageLevel" -> "MEMORY_AND_DISK_SER"
   )
+
+  // Helper functions to generate test data we share between ALS test suites
+
+  /**
+   * Generates random user/item factors, with i.i.d. values drawn from U(a, b).
+   * @param size number of users/items
+   * @param rank number of features
+   * @param random random number generator
+   * @param a min value of the support (default: -1)
+   * @param b max value of the support (default: 1)
+   * @return a sequence of (ID, factors) pairs
+   */
+  private def genFactors(
+      size: Int,
+      rank: Int,
+      random: Random,
+      a: Float = -1.0f,
+      b: Float = 1.0f): Seq[(Int, Array[Float])] = {
+    require(size > 0 && size < Int.MaxValue / 3)
+    require(b > a)
+    val ids = mutable.Set.empty[Int]
+    while (ids.size < size) {
+      ids += random.nextInt()
+    }
+    val width = b - a
+    ids.toSeq.sorted.map(id => (id, Array.fill(rank)(a + random.nextFloat() * width)))
+  }
+
+  /**
+   * Generates an implicit feedback dataset for testing ALS.
+   *
+   * @param sc SparkContext
+   * @param numUsers number of users
+   * @param numItems number of items
+   * @param rank rank
+   * @param noiseStd the standard deviation of additive Gaussian noise on training data
+   * @param seed random seed
+   * @return (training, test)
+   */
+  def genImplicitTestData(
+      sc: SparkContext,
+      numUsers: Int,
+      numItems: Int,
+      rank: Int,
+      noiseStd: Double = 0.0,
+      seed: Long = 11L): (RDD[Rating[Int]], RDD[Rating[Int]]) = {
+    // The assumption of the implicit feedback model is that unobserved ratings are more likely to
+    // be negatives.
+    val positiveFraction = 0.8
+    val negativeFraction = 1.0 - positiveFraction
+    val trainingFraction = 0.6
+    val testFraction = 0.3
+    val totalFraction = trainingFraction + testFraction
+    val random = new Random(seed)
+    val userFactors = genFactors(numUsers, rank, random)
+    val itemFactors = genFactors(numItems, rank, random)
+    val training = ArrayBuffer.empty[Rating[Int]]
+    val test = ArrayBuffer.empty[Rating[Int]]
+    for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) {
+      val rating = blas.sdot(rank, userFactor, 1, itemFactor, 1)
+      val threshold = if (rating > 0) positiveFraction else negativeFraction
+      val observed = random.nextDouble() < threshold
+      if (observed) {
+        val x = random.nextDouble()
+        if (x < totalFraction) {
+          if (x < trainingFraction) {
+            val noise = noiseStd * random.nextGaussian()
+            training += Rating(userId, itemId, rating + noise.toFloat)
+          } else {
+            test += Rating(userId, itemId, rating)
+          }
+        }
+      }
+    }
+    logInfo(s"Generated an implicit feedback dataset with ${training.size} ratings for training " +
+      s"and ${test.size} for test.")
+    (sc.parallelize(training, 2), sc.parallelize(test, 2))
+  }
 }

