Hello all,
I am currently looking into a Spark application to squeeze out a little more
performance, and the code in question is attached below.

I looked into the difference, and in
org.apache.spark.sql.catalyst.CatalystTypeConverters.ArrayConverter
the boxing/unboxing path is used even when the element type is primitive,
because org.apache.spark.sql.catalyst.util.ArrayData#toArray does not call
the specialized ArrayData.toDoubleArray the way VectorUDT does.
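
For reference, a minimal sketch of the special-casing I have in mind (the
helper toScalaArray below is hypothetical, not Spark code; the two ArrayData
methods themselves do exist):

  import org.apache.spark.sql.catalyst.util.ArrayData
  import org.apache.spark.sql.types.{DataType, DoubleType}

  // Hypothetical helper: dispatch to the primitive copy for doubles instead
  // of always going through the generic, boxing toArray.
  def toScalaArray(a: ArrayData, elementType: DataType): Any = elementType match {
    case DoubleType => a.toDoubleArray()     // primitive Array[Double], no boxing
    case other      => a.toArray[Any](other) // generic path, boxes every element
  }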

Now the question is: should I provide a patch, or can someone show me how to
get the same performance with an array as with a dense vector? Or should I
create a JIRA ticket?


Thanks
import org.apache.spark.ml.linalg.{DenseVector, Vectors}
import org.apache.spark.sql.functions._
import scala.util.Random
import spark.implicits._
 
// Dot product over DenseVector columns: works directly on the backing
// primitive Array[Double], so no per-element boxing.
val dotVector = udf { (x: DenseVector, y: DenseVector) =>
  val size = x.size
  val v1 = x.values
  val v2 = y.values
  var dotProduct = 0.0
  var i = 0
  while (i < size) {
    dotProduct += v1(i) * v2(i)
    i += 1
  }
  dotProduct
}
// Same dot product over Seq[Double] columns: every element access goes
// through the boxed representation produced by the Catalyst converter.
val dotSeq = udf { (x: Seq[Double], y: Seq[Double]) =>
  val size = x.size
  var dotProduct = 0.0
  var i = 0
  while (i < size) {
    dotProduct += x(i) * y(i)
    i += 1
  }
  dotProduct
}
// Times a call-by-name block and returns the elapsed time in seconds.
def time(name: String, block: => Unit): Float = {
  val t0 = System.nanoTime()
  block
  val t1 = System.nanoTime()
  // println(s"$name: " + (t1 - t0) / 1000000000f + "s")
  (t1 - t0) / 1000000000f
}
// Converts an array column to a DenseVector column. The generated arrays
// hold doubles, so take Seq[Double] directly (the original Seq[Float]
// forced a lossy double -> float -> double round trip).
val densevector = udf { (p: Seq[Double]) => Vectors.dense(p.toArray) }
// Generates a deterministic pseudo-random vector of 300 doubles,
// seeded by the (line, column) pair.
val genVec = udf { (l: Int, c: Int) =>
  val r = new Random(l * c)
  (1 to 300).map(_ => r.nextDouble()).toArray
}

// 200,000 rows, each with two 300-element arrays plus their vector copies.
val dfBig = Seq(1).toDF("s")
  .withColumn("line", explode(lit((1 to 1000).toArray)))
  .withColumn("column", explode(lit((1 to 200).toArray)))
  .withColumn("v1", genVec((col("line") + lit(22)) * lit(-1), col("column")))
  .withColumn("v2", genVec(col("line"), col("column")))
  .withColumn("v1d", densevector(col("v1")))
  .withColumn("v2d", densevector(col("v2")))
  .repartition(1)
  .persist()
dfBig.count // materialize the cache before timing
dfBig.show(10)
    
// Average 20 runs of the same top-10-by-dot-product query for each variant.
val arrayTime = (1 to 20).map { _ =>
  time("array", dfBig.withColumn("dot", dotSeq(col("v1"), col("v2"))).sort(desc("dot")).limit(10).collect())
}.sum / 20
val vectorTime = (1 to 20).map { _ =>
  time("vector", dfBig.withColumn("dot", dotVector(col("v1d"), col("v2d"))).sort(desc("dot")).limit(10).collect())
}.sum / 20
vectorTime / arrayTime * 100 // vector time as a percentage of array time
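
For completeness: on newer Spark versions (3.1+, I believe) there is a
built-in org.apache.spark.ml.functions.array_to_vector that would replace
my hand-rolled densevector UDF; you still pay the boxed conversion once per
row, but downstream vector UDFs then avoid the boxing path. A sketch,
assuming it is available:

  import org.apache.spark.ml.functions.array_to_vector

  // Convert the array columns once, then reuse the vector UDF so per-row
  // access goes through the specialized VectorUDT path.
  val dfVec = dfBig
    .withColumn("v1d2", array_to_vector(col("v1")))
    .withColumn("v2d2", array_to_vector(col("v2")))
    .withColumn("dot", dotVector(col("v1d2"), col("v2d2")))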