Hi, I'm running a benchmark that compares Mahout and Spark MLlib. So far I
have the following results for k-means:
Iterations   Elements       Mahout time   Spark time
10           10000000       602           138
40           10000000       1917          330
70           10000000       3203          388
10           100000000      1235          2226
40           100000000      2755          6388
70           100000000      4107          10967
10           1000000000     7070          25268

Times are in seconds. The benchmark runs on a YARN cluster of about 40
machines, and the elements to cluster are generated randomly. When I changed
the persistence level from MEMORY_ONLY to MEMORY_AND_DISK, Spark started to
run faster on the larger datasets.
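
For reference, the persistence change boils down to something like this (a
minimal standalone sketch; the local master and the input path are
placeholders here, the real run uses yarn-client as in the attached code):

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

object PersistenceLevelSketch {
  def main(args: Array[String]) {
    // Local, placeholder setup; the attached benchmark runs in yarn-client mode on the cluster.
    val sc = new SparkContext("local[2]", "persistence-level-sketch")

    // Same parsing as the attached code: rows of space-separated doubles.
    val points = sc.textFile("/tmp/benchmark/testdata")
      .map(line => line.split(" ").map(_.toDouble))

    // MEMORY_ONLY recomputes partitions that do not fit in memory;
    // MEMORY_AND_DISK spills them to local disk and reads them back instead.
    points.persist(StorageLevel.MEMORY_AND_DISK)

    println(points.count())
    sc.stop()
  }
}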

What am I missing?

See my benchmarking code in the attachment below.


-- 
Sincerely yours,
Egor Pakhomov
Scala Developer, Yandex
package ru.yandex.spark.examples

import scala.util.Random
import scala.collection.mutable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import ru.yandex.spark.benchmark.Job
import org.apache.mahout.common.distance.EuclideanDistanceMeasure
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{LoggerFactory, Logger}
import org.apache.spark.storage.StorageLevel

object KMeansBenchMark {

  private final val log: Logger = LoggerFactory.getLogger(this.getClass)

  val benchPath: Path = new Path("/tmp/benchmark")
  val inputDataPath: Path = new Path("/tmp/benchmark/testdata")
  val outputDataPath: Path = new Path("/tmp/benchmark/output")

  val configuration = new Configuration()
  val fs = FileSystem.get(FileSystem.getDefaultUri(configuration), configuration)

  def main(args: Array[String]) {

    type MahoutTime = Long
    type SparkTime = Long
    type NumberOfIterations = Int
    type NumberOfElements = Long

    val result = new mutable.MutableList[(NumberOfIterations, NumberOfElements, MahoutTime, SparkTime)]


    System.setProperty("SPARK_YARN_APP_JAR", SparkContext.jarOfClass(this.getClass).head)
    System.setProperty("SPARK_JAR", SparkContext.jarOfClass(SparkContext.getClass).head)
    System.setProperty("spark.driver.port", "49014")

    val conf = new SparkConf()
    conf.setAppName("serp-api")
    conf.setMaster("yarn-client")
    conf.set("spark.httpBroadcast.port", "35660")
    conf.set("spark.fileserver.port", "35661")
    conf.setJars(SparkContext.jarOfClass(this.getClass))


    val numbers = List(10000000L, 100000000L, 1000000000L, 1000000000L)

    for (numberOfElements: NumberOfElements <- numbers) {
      for (numberOfIterations: NumberOfIterations <- 10 until 80 by 30) {
        println(s"------------------------------------- ${numberOfElements} ${numberOfIterations}")
        prepareData(numberOfElements)

        // Spark k-means timing: includes SparkContext startup and shutdown.
        val sparkStart = System.currentTimeMillis()
        val spark = new SparkContext(conf)
        val input = spark.textFile(inputDataPath.toString).map(s => s.split(" ").map(number => number.toDouble)).persist(StorageLevel.DISK_ONLY)
        KMeans.train(input, 10, numberOfIterations, 1, KMeans.RANDOM).clusterCenters
        spark.stop()
        val sparkEnd = System.currentTimeMillis()

        // Mahout k-means timing, via the benchmark's Job wrapper on the same HDFS input.
        val mahoutStart = System.currentTimeMillis()
        Job.run(configuration, inputDataPath, outputDataPath, new EuclideanDistanceMeasure, 10, 0.5, numberOfIterations)
        val mahoutEnd = System.currentTimeMillis()

        val mahoutTime: MahoutTime = (mahoutEnd - mahoutStart) / 1000
        val sparkTime: SparkTime = (sparkEnd - sparkStart) / 1000
        result += ((numberOfIterations, numberOfElements, mahoutTime, sparkTime))
        for ((iters, elems, mTime, sTime) <- result) {
          val line = s"Number of iterations= $iters, number of elements = $elems, mahouttime= $mTime, spark time = $sTime"
          log.info(line)
          println(line)
        }
      }
    }

  }

  /** Regenerates the HDFS input: numberOfElements rows of 13 space-separated random doubles. */
  def prepareData(numberOfElements: Long) = {
    fs.delete(benchPath, true)
    fs.mkdirs(benchPath)
    val output = fs.create(inputDataPath)
    for (i <- 0L until numberOfElements) {
      output.writeBytes(Seq.fill(13)(nextRandom).mkString(" ") + "\n")
    }
    output.close()
  }

  // Random coordinate: Gaussian noise scaled by 10e5 (1e6), shifted by a random multiple of 10e4 (1e5).
  def nextRandom = {
    Random.nextGaussian() * 10e5 - Random.nextInt(10) * 10e4
  }

}
