Hi, I'm running a benchmark that compares Mahout and Spark MLlib. So far I have the following results for k-means (times in seconds):

Iterations | Elements      | Mahout time | Spark time
        10 |    10,000,000 |         602 |        138
        40 |    10,000,000 |        1917 |        330
        70 |    10,000,000 |        3203 |        388
        10 |   100,000,000 |        1235 |       2226
        40 |   100,000,000 |        2755 |       6388
        70 |   100,000,000 |        4107 |      10967
        10 | 1,000,000,000 |        7070 |      25268
The benchmark runs on a YARN cluster of about 40 machines, and the points to cluster are generated randomly. When I changed the persistence level from MEMORY_ONLY to MEMORY_AND_DISK, Spark started to run faster on the larger inputs. What am I missing? My benchmarking code is attached.

--
Sincerely yours,
Egor Pakhomov
Scala Developer, Yandex
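The persistence-level change mentioned above amounts to roughly the following (a minimal sketch only, not the attached code, which currently persists with DISK_ONLY; sc and path are placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Sketch of the storage-level change; `sc` and `path` are illustrative placeholders.
def loadPoints(sc: SparkContext, path: String): RDD[Array[Double]] = {
  val points = sc.textFile(path).map(_.split(" ").map(_.toDouble))
  // MEMORY_ONLY: partitions that don't fit in memory are dropped and recomputed each iteration.
  // MEMORY_AND_DISK: partitions that don't fit in memory spill to local disk instead.
  points.persist(StorageLevel.MEMORY_AND_DISK)
}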
package ru.yandex.spark.examples

import scala.util.Random
import scala.collection.mutable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import ru.yandex.spark.benchmark.Job
import org.apache.mahout.common.distance.EuclideanDistanceMeasure
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{LoggerFactory, Logger}
import org.apache.spark.storage.StorageLevel

object KMeansBenchMark {

  private final val log: Logger = LoggerFactory.getLogger(this.getClass)

  val benchPath: Path = new Path("/tmp/benchmark")
  val inputDataPath: Path = new Path("/tmp/benchmark/testdata")
  val outputDataPath: Path = new Path("/tmp/benchmark/output")
  val configuration = new Configuration()
  val fs = FileSystem.get(FileSystem.getDefaultUri(configuration), configuration)

  def main(args: Array[String]) {
    type MahoutTime = Long
    type SparkTime = Long
    type NumberOfIterations = Int
    type NumberOfElements = Long

    val result = new mutable.MutableList[(NumberOfIterations, NumberOfElements, MahoutTime, SparkTime)]

    System.setProperty("SPARK_YARN_APP_JAR", SparkContext.jarOfClass(this.getClass).head)
    System.setProperty("SPARK_JAR", SparkContext.jarOfClass(SparkContext.getClass).head)
    System.setProperty("spark.driver.port", "49014")

    val conf = new SparkConf()
    conf.setAppName("serp-api")
    conf.setMaster("yarn-client")
    conf.set("spark.httpBroadcast.port", "35660")
    conf.set("spark.fileserver.port", "35661")
    conf.setJars(SparkContext.jarOfClass(this.getClass))

    val numbers = List(10000000L, 100000000L, 1000000000L, 1000000000L)
    for (numberOfElements: NumberOfElements <- numbers) {
      for (numberOfIterations: NumberOfIterations <- 10 until 80 by 30) {
        println(s"------------------------------------- ${numberOfElements} ${numberOfIterations}")
        prepareData(numberOfElements)

        // Time Spark MLlib k-means; the measurement includes SparkContext startup and input parsing.
        val sparkStart = System.currentTimeMillis()
        val spark = new SparkContext(conf)
        val input = spark.textFile(inputDataPath.toString)
          .map(s => s.split(" ").map(number => number.toDouble))
          .persist(StorageLevel.DISK_ONLY)
        KMeans.train(input, 10, numberOfIterations, 1, KMeans.RANDOM).clusterCenters
        spark.stop()
        val sparkEnd = System.currentTimeMillis()

        // Time Mahout k-means on the same input with the same k and iteration count.
        val mahoutStart = System.currentTimeMillis()
        Job.run(configuration, inputDataPath, outputDataPath, new EuclideanDistanceMeasure, 10, 0.5, numberOfIterations)
        val mahoutEnd = System.currentTimeMillis()

        val mahoutTime: MahoutTime = (mahoutEnd - mahoutStart) / 1000
        val sparkTime: SparkTime = (sparkEnd - sparkStart) / 1000
        result += ((numberOfIterations, numberOfElements, mahoutTime, sparkTime))

        // Log and print all results gathered so far.
        for (i <- result) {
          log.info(s"Number of iterations= ${i._1}, number of elements = ${i._2}, mahouttime= ${i._3}, spark time = ${i._4}")
        }
        for (i <- result) {
          println(s"Number of iterations= ${i._1}, number of elements = ${i._2}, mahouttime= ${i._3}, spark time = ${i._4}")
        }
      }
    }
  }

  // Recreates the benchmark directory and writes numberOfElements random
  // 13-dimensional points to HDFS, one space-separated point per line.
  def prepareData(numberOfElements: Long) = {
    fs.delete(benchPath, true)
    fs.mkdirs(benchPath)
    val output = fs.create(inputDataPath)
    for (i <- 0L until numberOfElements) {
      output.writeBytes(
        nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " +
        nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " +
        nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " +
        nextRandom + "\n")
    }
    output.close()
  }

  def nextRandom = {
    Random.nextGaussian() * 10e5 - Random.nextInt(10) * 10e4
  }
}