[ https://issues.apache.org/jira/browse/SPARK-30661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479793#comment-17479793 ]
zhengruifeng commented on SPARK-30661:
--------------------------------------

According to https://issues.apache.org/jira/browse/SPARK-31454, dense KMeans with MKL can be 3.5x faster than the existing implementation.

I re-tested my blocked implementation on a relatively low-dimensional dataset (HIGGS, 28 features), with native BLAS (OpenBLAS) linked.

Total training duration:
!image-2022-01-21-10-42-41-771.png!

On the dense dataset, the blocked implementation is 30% to 3x faster than the existing one.
On the sparse datasets (the HIGGS features scattered into 280 or 2,800 dimensions), it is usually slower than the existing implementation. In my experience, it can be roughly 10x slower on some high-dimensional (dim > 100,000) datasets.

Test code:
{code:java}
import scala.util.Random
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.clustering._
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

// Run in spark-shell, where `spark` is the predefined SparkSession.
// val df = spark.read.option("numFeatures", "28").format("libsvm").load("/d1/Datasets/higgs/HIGGS")
val df = spark.read.parquet("/d1/Datasets/higgs/HIGGS.parquet").repartition(24)
df.persist(StorageLevel.MEMORY_AND_DISK)
// Materialize the cache before timing anything.
df.count
df.count

// Embed the 28 dense HIGGS features into a sparse vector of size `dim`,
// at 28 randomly chosen positions that are fixed for all rows.
def getSparseUDF(dim: Int) = {
  val rng = new Random(123)
  val newIndices = rng.shuffle(Seq.range(0, dim)).take(28).toArray.sorted
  udf { vec: Vector => Vectors.sparse(dim, newIndices, vec.toArray).compressed }
}

// sc.setLogLevel("INFO")

// blocked impl
val km = new KMeans().setInitMode("random").setMaxIter(5).setTol(0)
for (dim <- Seq(28, 280, 2800); k <- Seq(2, 8, 32, 128); size <- Seq(1, 4, 16)) {
  Thread.sleep(1000)
  // dim == 28: original dense features; larger dims: sparse embedding
  val ds = if (dim == 28) {
    df
  } else {
    val sparseUDF = getSparseUDF(dim)
    df.withColumn("features", sparseUDF(col("features")))
  }
  val start = System.currentTimeMillis
  val model = km.setK(k).setMaxBlockSizeInMB(size).fit(ds)
  val end = System.currentTimeMillis
  println((model.uid, dim, k, size, end - start, model.summary.trainingCost,
    model.summary.numIter, model.centerMatrix.toString.take(20)))
}

// existing impl
val km = new KMeans().setInitMode("random").setMaxIter(5).setTol(0)
for (dim <- Seq(28, 280, 2800); k <- Seq(2, 8, 32, 128)) {
  Thread.sleep(1000)
  val ds = if (dim == 28) {
    df
  } else {
    val sparseUDF = getSparseUDF(dim)
    df.withColumn("features", sparseUDF(col("features")))
  }
  val start = System.currentTimeMillis
  val model = km.setK(k).fit(ds)
  val end = System.currentTimeMillis
  println((model.uid, dim, k, end - start, model.summary.trainingCost,
    model.summary.numIter, model.clusterCenters.head.toString.take(20)))
}
{code}

> KMeans blockify input vectors
> -----------------------------
>
>                 Key: SPARK-30661
>                 URL: https://issues.apache.org/jira/browse/SPARK-30661
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML, PySpark
>    Affects Versions: 3.0.0
>            Reporter: zhengruifeng
>            Assignee: zhengruifeng
>            Priority: Minor
>
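For context on why the dense and sparse results diverge: assuming the blocked implementation follows the common approach of stacking a block of points into a matrix, the KMeans assignment step reduces to one matrix product per block, via ||x - c||^2 = ||x||^2 - 2 x·c + ||c||^2. The sketch below is a generic, hypothetical illustration in plain Scala, not the code of the SPARK-30661 patch: the object `BlockedDistances` and its methods are made-up names, and the naive nested loops in `gramXCt` stand in for a BLAS GEMM call.

```scala
// Hypothetical sketch of blocked KMeans assignment (not the SPARK-30661 patch).
// A block of n points is an n x d row-major matrix X; the k centers form C (k x d).
// The dominant cost is the n x k product X * C^T, which a real implementation
// would delegate to a native BLAS GEMM (MKL, OpenBLAS, ...).
object BlockedDistances {

  // Naive stand-in for GEMM: G(i)(j) = dot(X(i), C(j)). Assumes equal dimensions.
  def gramXCt(x: Array[Array[Double]], c: Array[Array[Double]]): Array[Array[Double]] =
    x.map { xi =>
      c.map { cj =>
        var s = 0.0
        var t = 0
        while (t < xi.length) { s += xi(t) * cj(t); t += 1 }
        s
      }
    }

  private def sqNorm(v: Array[Double]): Double = v.map(a => a * a).sum

  // n x k matrix of squared Euclidean distances from block rows to centers.
  def sqDistances(x: Array[Array[Double]], c: Array[Array[Double]]): Array[Array[Double]] = {
    val xNorms = x.map(sqNorm)
    val cNorms = c.map(sqNorm)
    val g = gramXCt(x, c)
    Array.tabulate(x.length, c.length) { (i, j) =>
      // Clamp at 0: the expansion can go slightly negative from rounding.
      math.max(0.0, xNorms(i) - 2.0 * g(i)(j) + cNorms(j))
    }
  }

  // Index of the closest center for every point in the block.
  def assign(x: Array[Array[Double]], c: Array[Array[Double]]): Array[Int] =
    sqDistances(x, c).map(row => row.indices.minBy(j => row(j)))
}
```

With native BLAS linked, the product in `gramXCt` is where MKL or OpenBLAS would deliver the speedup on dense data. For sparse inputs, the per-point work in a direct distance computation is already proportional to the number of nonzeros (28 in the synthetic sparse datasets above), so this formulation has less to gain, which is one plausible reason for the slowdowns reported.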