Hello,

I am having trouble defining a UDAF in Zeppelin's %spark interpreter; the 
same code works fine in spark-shell in :paste mode.

Environment:
 - Amazon EMR
 - Apache Zeppelin Version 0.7.3
 - Spark version 2.2.1
 - Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)

1) Is there a way to configure the Zeppelin %spark interpreter to do the 
equivalent of spark-shell's :paste mode?
2) If not, is there a workaround to be able to define UDAFs in Zeppelin's 
%spark interpreter?

Thanks!
Raphael




***PARAGRAPH INPUT:***
%spark

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer

/**
 * Spark SQL user-defined aggregate function that collects every `y`
 * observation of a group into an array buffer and returns the number of
 * collected observations, rendered as a String.
 *
 * The buffer column is declared as ArrayType(DoubleType). Row exposes array
 * columns as a Seq, so the buffer is always read back with `getSeq[Double]`
 * (never as a List) in update, merge and evaluate alike.
 */
class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {
    // Input schema: a single Double column named "y"
    override def inputSchema: StructType =
        StructType(StructField("y", DoubleType) :: Nil)

    // Intermediate buffer schema: one array-of-Double column holding the
    // observations seen so far
    override def bufferSchema: StructType =
        StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)

    // Output schema
    override def dataType: DataType = StringType

    // Deterministic UDAF
    override def deterministic: Boolean = true

    /**
     * Initializes the intermediate buffer of a group with an empty array of
     * observations.
     *
     * @param buffer aggregation buffer to initialize
     */
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = Array.emptyDoubleArray
    }

    /**
     * Appends the observation of the current row to the group's buffer.
     *
     * @param buffer aggregation buffer of the group, updated in place
     * @param input  current row; column 0 holds the observation `y`
     */
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        // getDouble throws on a null cell, so rows without an observation
        // are skipped instead of failing the whole aggregation.
        if (!input.isNullAt(0)) {
            buffer(0) = buffer.getSeq[Double](0) :+ input.getDouble(0)
        }
    }

    /**
     * Merges two partial buffers (e.g. computed on separate executors) by
     * appending the observations of `buffer2` after those of `buffer1`.
     *
     * @param buffer1 buffer that receives the merged observations
     * @param buffer2 buffer whose observations are appended; left untouched
     */
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)
    }

    /**
     * Produces the final result of the group.
     *
     * @param buffer fully merged aggregation buffer
     * @return the number of collected observations, as a String
     */
    override def evaluate(buffer: Row): String = {
        val observations = buffer.getSeq[Double](0)
        observations.size.toString
    }
}



***PARAGRAPH OUTPUT:***
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer
<console>:12: error: not found: type UserDefinedAggregateFunction
       class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction 
{
                                                   ^
<console>:14: error: not found: type StructType
           override def inputSchema: StructType = StructType(StructField("y", 
DoubleType) :: Nil)
                                     ^
<console>:14: error: not found: value StructType
           override def inputSchema: StructType = StructType(StructField("y", 
DoubleType) :: Nil)
                                                  ^
<console>:14: error: not found: value StructField
           override def inputSchema: StructType = StructType(StructField("y", 
DoubleType) :: Nil)
                                                             ^
<console>:14: error: not found: value DoubleType
           override def inputSchema: StructType = StructType(StructField("y", 
DoubleType) :: Nil)
                                                                              ^
<console>:17: error: not found: type StructType
           override def bufferSchema: StructType = 
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                      ^
<console>:17: error: not found: value StructType
           override def bufferSchema: StructType = 
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                   ^
<console>:17: error: not found: value StructField
           override def bufferSchema: StructType = 
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
<console>:17: error: not found: value ArrayType
           override def bufferSchema: StructType = 
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                                                
          ^
<console>:17: error: not found: value DoubleType
           override def bufferSchema: StructType = 
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                                                
                    ^
<console>:20: error: not found: type DataType
           override def dataType: DataType = StringType
                                  ^
<console>:20: error: not found: value StringType
           override def dataType: DataType = StringType
                                             ^
<console>:30: error: not found: type MutableAggregationBuffer
           override def initialize(buffer: MutableAggregationBuffer): Unit = {
                                           ^
<console>:37: error: not found: type MutableAggregationBuffer
           override def update(buffer: MutableAggregationBuffer, input: Row): 
Unit = {
                                       ^
<console>:37: error: not found: type Row
           override def update(buffer: MutableAggregationBuffer, input: Row): 
Unit = {
                                                                        ^
<console>:39: error: not found: type ListBuffer
               var values = new ListBuffer[Double]()
                                ^
<console>:53: error: not found: type MutableAggregationBuffer
           override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): 
Unit = {
                                       ^
<console>:53: error: not found: type Row
           override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): 
Unit = {
                                                                          ^
<console>:54: error: not found: type ListBuffer
               var values = new ListBuffer[Double]()
                                ^
<console>:62: error: not found: type Row
         override def evaluate(buffer: Row): String = {
                                       ^

 

Reply via email to