I am trying to implement a sample “sum” functionality over a rolling window.
The code below may not make much sense (and is certainly not efficient), but in
the course of a larger implementation I have stumbled on the error below, which
is blocking me.

Error obtained: “GenericRowWithSchema cannot be cast to java.lang.Double”,
raised during evaluation.
Is this a known problem in Spark? I am using 2.0.0.
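
For reference, this is the rolling sum I am after, expressed with the built-in
sum aggregate (just a sanity-check sketch, assuming a DataFrame df with an "id"
ordering column and a numeric "Count" column):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Frame: 1 row before the current row through 3 rows after it.
val w = Window.orderBy("id").rowsBetween(-1, 3)

// Built-in rolling sum over the same frame the custom UDAF targets.
val dfWithSum = df.withColumn("movingSum", sum(df.col("Count")).over(w))

The custom UDAF below is meant to reproduce this result.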

Code
====

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer

// Despite the name, this UDAF simply sums the values buffered for the frame.
class GeometricMean extends UserDefinedAggregateFunction {

    // Single Double input column.
    def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)

    // Intermediate buffer: the values seen so far, as an array.
    def bufferSchema: StructType = new StructType().add("info", ArrayType(DoubleType), false)


    // Final result type.
    def dataType: DataType = DoubleType

    def deterministic: Boolean = true

    def initialize(buffer: MutableAggregationBuffer): Unit = {
        // Start each aggregation with an empty collection of values.
        buffer(0) = ArrayBuffer.empty[Double]
    }

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val arr1 = buffer.getAs[Seq[Double]](0)
        // Prepend the incoming row to the buffered values.
        val arr = ArrayBuffer(input) ++ arr1
        buffer(0) = arr
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        // Concatenate the two partial buffers.
        val arr1 = buffer1.getAs[Seq[Double]](0)
        val arr = arr1 ++ buffer2.getAs[Seq[Double]](0)
        buffer1.update(0, arr)
    }

    def evaluate(buffer: Row): Any = {
        // Sum all values buffered for the window frame.
        var s: Double = 0
        val arr = buffer.getAs[Seq[Double]](0)
        arr.foreach(s += _)
        s
    }
}
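
While re-reading this, I notice that update buffers the whole input Row
(ArrayBuffer(input)) rather than its Double field, which may be what evaluate
later fails to unbox as a Double. A sketch of update that extracts the field
first (untested against the window case, so I would still like to confirm
whether the original form is supposed to work):

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val arr1 = buffer.getAs[Seq[Double]](0)
        // Extract the Double from the input Row instead of buffering the Row itself.
        val arr = ArrayBuffer(input.getDouble(0)) ++ arr1
        buffer(0) = arr
    }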

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

val GM = new GeometricMean
val r = new scala.util.Random(88)
val schema = new StructType().add("id", IntegerType).add("Count", IntegerType)

// Ten rows of (id, random count in [0, 10]).
val rnNum1 = for (i <- 1 to 10) yield { Row(i, r.nextInt(10 - 0 + 1)) }

// Frame: 1 row before the current row through 3 rows after it.
val wSpec1 = Window.orderBy("id").rowsBetween(-1, 3)
val rdd = sc.parallelize(rnNum1)
val df = sqlContext.createDataFrame(rdd, schema)
val dfWithMovingAvg = df
    .withColumn("movingAvg", avg(df.col("Count")).over(wSpec1))
    .withColumn("customMovingAvg", GM(df.col("Count")).over(wSpec1))
dfWithMovingAvg.take(5).foreach(println)
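
Incidentally, since this is on 2.0.0, the same DataFrame can also be built
through the new SparkSession entry point. A minimal equivalent, assuming the
spark-shell's predefined spark session:

// Same DataFrame via the Spark 2.0 SparkSession entry point.
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(rnNum1), schema)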


Error
=====


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0
failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost):
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Double
    at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)



Regards,
Kiran
