In the Spark core "examples" directory (I am using Spark 1.2.0), there is an example called "SparkPageRank.scala":

    val sparkConf = new SparkConf().setAppName("PageRank")
    val iters = if (args.length > 0) args(1).toInt else 10
    val ctx = new SparkContext(sparkConf)
    val lines = ctx.textFile(args(0), 1)
    val links = lines.map{ s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.0)

    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }

    val output = ranks.collect()
    ctx.stop()

I realize that in this example the lineage keeps growing with each iteration. As a result, when I monitored the directory that holds the shuffle data, the amount of shuffle data stored kept increasing after each iteration.

How should I structure the application code so that the ContextCleaner's doCleanupShuffle is triggered at a certain interval (say, every several iterations), and the shuffle data storage does not grow without bound for computations that take many iterations?

Jun