I am trying to implement a sample “sum” functionality over rolling window. Below code may not make sense (may not be efficient) but during the course of other major implementation, have stumbled on below error which is blocking.
Error Obtained - “GenericRowWithSchema cannot be cast to java.lang.Double” during evaluation. Is this a known problem in Spark? I am using 2.0.0 Code ==== class GeometricMean extends UserDefinedAggregateFunction { def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil) def bufferSchema: StructType = new StructType().add("info", ArrayType(DoubleType),false) def dataType : DataType = DoubleType def deterministic: Boolean = true def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = ArrayBuffer.empty[Double] } def update(buffer: MutableAggregationBuffer,input: Row): Unit = { val arr1 = buffer.getAs[Seq[Double]](0) val arr = ArrayBuffer(input) ++ arr1 buffer(0) = arr } def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { val arr1 = buffer1.getAs[Seq[Double]](0) val arr = arr1 ++ buffer2.getAs[Seq[Double]](0) buffer1.update(0,arr) } def evaluate(buffer: Row): Any = { var s : Double = 0 val arr = buffer.getAs[Seq[Double]](0) val arrd = arr.toArray arrd.foreach(s += _) s } } val GM = new GeometricMean val r = new scala.util.Random(88) val schema = new StructType().add("id",IntegerType).add("Count",IntegerType) val rnNum1 = for( i <- 1 to 10) yield { Row(i,r.nextInt(10-0+1)) } val wSpec1 = Window.orderBy("id").rowsBetween(-1, +3) val rdd = sc.parallelize(rnNum) val df = sqlContext.createDataFrame(rdd,schema) val dfWithMovingAvg = df.withColumn( "movingAvg",avg(df.col("Count")).over(wSpec1)).withColumn("customMovingAvg",GM(df.col("Count")).over(wSpec1)) dfWithMovingAvg.take(5).foreach(println) Error === org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Double at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114) Regards, Kiran