Hello, I am having trouble defining a UDAF in Zeppelin; using the same code in spark-shell in :paste mode works fine.
Environment: - Amazon EMR - Apache Zeppelin Version 0.7.3 - Spark version 2.2.1 - Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161) 1) Is there a way to configure the zeppelin %spark interpreter to do the equivalent of spark-shell's :paste mode? 2) If not, is there a workaround to be able to define UDAFs in Zeppelin's %spark interpreter? Thanks! Raphael ***PARAGRAPH INPUT:*** %spark import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.Row import scala.collection.mutable.WrappedArray import scala.collection.mutable.ListBuffer class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction { // Input schema override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil) // Intermediate buffer schema override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil) //Output schema override def dataType: DataType = StringType // Deterministic UDAF override def deterministic: Boolean = true // How to initialize the intermediate processing buffer for each group: // We simply create a List[Double] which will hold the observations (y) // of each group override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = Array.emptyDoubleArray } // What to do with each new row within the group: // Here we append each new observation of the group // in a List[Double] override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { // Put the observations collected into a List var values = new ListBuffer[Double]() values.appendAll(buffer.getAs[List[Double]](0)) // Get the new value for the current row val newValue = input.getDouble(0) // Append the new value to the buffer and return it values.append(newValue) buffer.update(0, values) } // How to merge 2 buffers located on 2 separate executor hosts or JVMs: // Simply 
append one List at the end of another override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { var values = new ListBuffer[Double]() values ++= buffer1.getAs[List[Double]](0) values ++= buffer2.getAs[List[Double]](0) buffer1.update(0, values) } override def evaluate(buffer: Row): String = { val observations = buffer.getSeq[Double](0) observations.size.toString } } ***PARAGRAPH OUTPUT:*** import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.Row import scala.collection.mutable.WrappedArray import scala.collection.mutable.ListBuffer <console>:12: error: not found: type UserDefinedAggregateFunction class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction { ^ <console>:14: error: not found: type StructType override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil) ^ <console>:14: error: not found: value StructType override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil) ^ <console>:14: error: not found: value StructField override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil) ^ <console>:14: error: not found: value DoubleType override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil) ^ <console>:17: error: not found: type StructType override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil) ^ <console>:17: error: not found: value StructType override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil) ^ <console>:17: error: not found: value StructField override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil) <console>:17: error: not found: value ArrayType override def bufferSchema: StructType = 
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil) ^ <console>:17: error: not found: value DoubleType override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil) ^ <console>:20: error: not found: type DataType override def dataType: DataType = StringType ^ <console>:20: error: not found: value StringType override def dataType: DataType = StringType ^ <console>:30: error: not found: type MutableAggregationBuffer override def initialize(buffer: MutableAggregationBuffer): Unit = { ^ <console>:37: error: not found: type MutableAggregationBuffer override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { ^ <console>:37: error: not found: type Row override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { ^ <console>:39: error: not found: type ListBuffer var values = new ListBuffer[Double]() ^ <console>:53: error: not found: type MutableAggregationBuffer override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { ^ <console>:53: error: not found: type Row override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { ^ <console>:54: error: not found: type ListBuffer var values = new ListBuffer[Double]() ^ <console>:62: error: not found: type Row override def evaluate(buffer: Row): String = { ^