[ https://issues.apache.org/jira/browse/SPARK-30661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479793#comment-17479793 ]

zhengruifeng commented on SPARK-30661:
--------------------------------------

According to https://issues.apache.org/jira/browse/SPARK-31454, dense KMeans with MKL can be 3.5x faster than the existing implementation.

I re-tested my blocked implementation on a relatively low-dimensional dataset (HIGGS, 28 features), with native BLAS (OpenBLAS) linked.
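Some background on what "blocked" buys. A common way to let KMeans use BLAS is to stack a block of input rows into a matrix X and get every point-to-center distance from the expansion ||x - c||^2 = ||x||^2 - 2 x.c + ||c||^2. All the x.c terms then form a single matrix product X * C^T, which is one gemm call. The sketch below is a hypothetical, Spark-free illustration of that identity. Plain loops stand in for BLAS, the names `BlockedDistanceSketch`, `xTimesCt`, `squaredDistances` and `assign` are made up here, and it is not the code of the patch being tested.

```scala
// Hypothetical sketch (not the patch's code): squared distances for a whole block of
// points at once via ||x - c||^2 = ||x||^2 - 2 x.c + ||c||^2.
object BlockedDistanceSketch {
  private def sqNorm(v: Array[Double]): Double = v.map(x => x * x).sum

  // Stand-in for a BLAS gemm call: the n x k matrix X * C^T of dot products.
  private def xTimesCt(x: Array[Array[Double]], c: Array[Array[Double]]): Array[Array[Double]] =
    Array.tabulate(x.length, c.length) { (i, j) =>
      var dot = 0.0
      var t = 0
      while (t < x(i).length) {
        dot += x(i)(t) * c(j)(t)
        t += 1
      }
      dot
    }

  /** points: n rows of dimension d; centers: k rows of dimension d; returns n x k squared distances. */
  def squaredDistances(points: Array[Array[Double]], centers: Array[Array[Double]]): Array[Array[Double]] = {
    val pointNorms = points.map(sqNorm)
    val centerNorms = centers.map(sqNorm)
    val dots = xTimesCt(points, centers)
    Array.tabulate(points.length, centers.length) { (i, j) =>
      // clamp at 0 to absorb floating-point cancellation when x is very close to c
      math.max(pointNorms(i) - 2.0 * dots(i)(j) + centerNorms(j), 0.0)
    }
  }

  /** Index of the closest center for every point in the block. */
  def assign(points: Array[Array[Double]], centers: Array[Array[Double]]): Array[Int] =
    squaredDistances(points, centers).map(row => row.indices.minBy(j => row(j)))
}
```

With a real gemm in place of `xTimesCt`, the per-block work becomes one level-3 BLAS call instead of n*k separate dot products. That is the part an optimized BLAS such as OpenBLAS or MKL can speed up.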

Total training duration:

!image-2022-01-21-10-42-41-771.png!

On the dense dataset, it can be 30% to 3x faster than the existing implementation.

On the sparse datasets, it is usually slower than the existing implementation. (In my experience, it may be 10x slower than the existing implementation on some high-dimensional (dim > 100,000) datasets.)
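For scale: the sparse test sets in the code below keep the same 28 HIGGS values per row, so density falls to 28/dim (10% at dim=280, 1% at dim=2800). The back-of-envelope sketch compares bytes per row under two assumed layouts. A dense row takes 8 bytes per dimension. A sparse row takes a 4-byte index plus an 8-byte value per non-zero. It only shows how fast the gap widens with dim. It says nothing about how the blocked implementation actually stores sparse blocks, and `DensitySketch` is a hypothetical name.

```scala
// Back-of-envelope sketch with ASSUMED layouts (not the patch's storage format):
// dense row = 8 bytes per dimension; sparse row = 4-byte Int index + 8-byte Double per non-zero.
object DensitySketch {
  val nnzPerRow = 28 // the sparse test sets keep all 28 HIGGS values in every row

  def density(dim: Int): Double = nnzPerRow.toDouble / dim
  def denseBytesPerRow(dim: Int): Long = 8L * dim
  val sparseBytesPerRow: Long = 12L * nnzPerRow

  def main(args: Array[String]): Unit =
    for (dim <- Seq(28, 280, 2800, 100000))
      println(f"dim=$dim%6d density=${density(dim)}%.5f dense=${denseBytesPerRow(dim)}%7d B sparse=$sparseBytesPerRow%d B")
}
```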

 

Test code:
{code:scala}
import scala.util.Random
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.clustering._
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

// val df = spark.read.option("numFeatures", "28").format("libsvm").load("/d1/Datasets/higgs/HIGGS")
val df = spark.read.parquet("/d1/Datasets/higgs/HIGGS.parquet").repartition(24)
df.persist(StorageLevel.MEMORY_AND_DISK)

// materialize the cached data before any timing
df.count
df.count

// Scatter each 28-dim vector into a `dim`-dim sparse vector; the 28 target indices
// are drawn once from a fixed seed, so every row is placed at the same positions.
def getSparseUDF(dim: Int) = {
    val rng = new Random(123)
    val newIndices = rng.shuffle(Seq.range(0, dim)).take(28).toArray.sorted
    udf { vec: Vector =>
        Vectors.sparse(dim, newIndices, vec.toArray).compressed
    }
}

// sc.setLogLevel("INFO")

// blocked impl
val km = new KMeans().setInitMode("random").setMaxIter(5).setTol(0)

for (dim <- Seq(28, 280, 2800); k <- Seq(2, 8, 32, 128); size <- Seq(1, 4, 16)) {
    Thread.sleep(1000)
    // dim = 28: original dataset; larger dims: sparse embedding via getSparseUDF
    val ds = if (dim == 28) {
        df
    } else {
        val sparseUDF = getSparseUDF(dim)
        df.withColumn("features", sparseUDF(col("features")))
    }
    val start = System.currentTimeMillis
    val model = km.setK(k).setMaxBlockSizeInMB(size).fit(ds)
    val end = System.currentTimeMillis
    // (uid, dim, k, max block size in MB, training time in ms, cost, iterations, centers prefix)
    println((model.uid, dim, k, size, end - start, model.summary.trainingCost,
        model.summary.numIter, model.centerMatrix.toString.take(20)))
}


// existing impl
val km = new KMeans().setInitMode("random").setMaxIter(5).setTol(0)

for (dim <- Seq(28, 280, 2800); k <- Seq(2, 8, 32, 128)) {
    Thread.sleep(1000)
    val ds = if (dim == 28) {
        df
    } else {
        val sparseUDF = getSparseUDF(dim)
        df.withColumn("features", sparseUDF(col("features")))
    }
    val start = System.currentTimeMillis
    val model = km.setK(k).fit(ds)
    val end = System.currentTimeMillis
    // (uid, dim, k, training time in ms, cost, iterations, first center prefix)
    println((model.uid, dim, k, end - start, model.summary.trainingCost,
        model.summary.numIter, model.clusterCenters.head.toString.take(20)))
}{code}
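`getSparseUDF` draws its index set once per `dim` on the driver from a fixed seed, so every row is scattered to the same 28 sorted positions. Scattering all rows the same way is just a relabeling of coordinates plus zero padding. Euclidean distances between rows therefore stay the same at dim=28, 280 and 2800: the three settings pose the same clustering problem in different representations. The Spark-free sketch below mirrors that step with plain arrays. `SparseEmbeddingSketch`, `indicesFor`, `embed`, `toDense` and `squaredDistance` are hypothetical names. The 28-value default and seed 123 are copied from the test code.

```scala
import scala.util.Random

// Hypothetical, Spark-free mirror of getSparseUDF: plain arrays instead of ml.linalg vectors.
object SparseEmbeddingSketch {
  // Same recipe as the test code: seeded shuffle of 0 until dim, keep nnz indices, sort them.
  def indicesFor(dim: Int, nnz: Int = 28, seed: Long = 123L): Array[Int] = {
    require(nnz <= dim, s"cannot place $nnz values in $dim dimensions")
    val rng = new Random(seed)
    rng.shuffle(Seq.range(0, dim)).take(nnz).toArray.sorted
  }

  // (indices, values) pair playing the role of Vectors.sparse(dim, indices, values).
  def embed(dim: Int, values: Array[Double]): (Array[Int], Array[Double]) =
    (indicesFor(dim, values.length), values)

  // Expands an (indices, values) pair into a length-dim dense array, zeros elsewhere.
  def toDense(dim: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
    val out = new Array[Double](dim)
    indices.zip(values).foreach { case (i, v) => out(i) = v }
    out
  }

  def squaredDistance(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum
}
```

Embedding two 28-value rows with `embed(280, ...)` and expanding them with `toDense` gives the same `squaredDistance` as the original rows, because the padded coordinates are zero in both.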

> KMeans blockify input vectors
> -----------------------------
>
>                 Key: SPARK-30661
>                 URL: https://issues.apache.org/jira/browse/SPARK-30661
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML, PySpark
>    Affects Versions: 3.0.0
>            Reporter: zhengruifeng
>            Assignee: zhengruifeng
>            Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)
