In the "examples" directory of the Spark source tree (I am using Spark 1.2.0), there is an
example called "SparkPageRank.scala":

import org.apache.spark.{SparkConf, SparkContext}

object SparkPageRank {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("PageRank")
    // args(0) is the input edge list; args(1), if given, is the iteration count
    val iters = if (args.length > 1) args(1).toInt else 10
    val ctx = new SparkContext(sparkConf)
    val lines = ctx.textFile(args(0), 1)
    // Build the adjacency lists ("src dst" per line) and cache them,
    // since they are reused in every iteration
    val links = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.0)

    for (i <- 1 to iters) {
      // Each page splits its current rank evenly across its outgoing links
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      // Damped update; note that join and reduceByKey each add a shuffle
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }

    val output = ranks.collect()

    ctx.stop()
  }
}

I realize that in this example the lineage keeps extending with each
iteration, since every pass appends another join and reduceByKey to ranks.
Consistent with that, when I monitored the directory that holds the shuffle
files, the shuffle data on disk kept growing after each iteration.
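
One way to see the growth directly is to print the lineage inside the loop
(toDebugString is a standard RDD method; the rest is the loop from above):

for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    val size = urls.size
    urls.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  // The printed plan grows by a join and a ShuffledRDD every pass,
  // and the shuffle files of all the earlier stages stay referenced
  println(s"after iteration $i:\n" + ranks.toDebugString)
}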

How should I structure the application code so that the ContextCleaner's
doCleanupShuffle is triggered at some interval (say, every few iterations)?
I want to keep the shuffle data on disk from growing without bound in a
computation that runs for many iterations.
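
For concreteness, the kind of restructuring I have in mind is sketched
below: it checkpoints ranks periodically to truncate the lineage, on the
theory that the old ShuffleDependency objects then become unreachable and
eligible for cleanup. The interval (checkpointEvery) and the checkpoint
directory are placeholders I made up, and I am not sure this is the
intended approach:

ctx.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path

val checkpointEvery = 5 // arbitrary interval
for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    val size = urls.size
    urls.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  if (i % checkpointEvery == 0) {
    ranks.cache()       // keep the computed data around
    ranks.checkpoint()  // mark for checkpointing; the lineage is cut...
    ranks.count()       // ...when an action actually materializes the RDD
  }
}

Even with this, my understanding is that doCleanupShuffle only runs after
the driver JVM garbage-collects the dropped references, so the disk usage
might not shrink immediately. Is that the right way to think about it?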

Jun
