Hello,
I am having trouble defining a UDAF in Zeppelin; using the same code in
spark-shell in :paste mode works fine.
Environment:
- Amazon EMR
- Apache Zeppelin Version 0.7.3
- Spark version 2.2.1
- Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)
1) Is there a way to configure the Zeppelin %spark interpreter to do the
equivalent of spark-shell's :paste mode?
2) If not, is there a workaround to be able to define UDAFs in Zeppelin's
%spark interpreter?
Thanks!
Raphael
***PARAGRAPH INPUT:***
%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer
/**
 * UDAF that collects every `y` observation of a group into an array-typed
 * aggregation buffer and returns the number of collected observations as a
 * String.
 *
 * NOTE(review): each `update` call copies the whole accumulated array into a
 * fresh ListBuffer before appending one element, so aggregation cost is
 * O(n^2) per group — fine for small groups, worth revisiting for large ones.
 */
class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {
  // Input schema: a single Double column named "y".
  override def inputSchema: StructType =
    StructType(StructField("y", DoubleType) :: Nil)

  // Intermediate buffer schema: one array-of-Double column holding all
  // observations seen so far for the group.
  override def bufferSchema: StructType =
    StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)

  // Output schema: the result of evaluate() is a String.
  override def dataType: DataType = StringType

  // Same input rows always produce the same output.
  override def deterministic: Boolean = true

  // Start each group with an empty array of observations.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.emptyDoubleArray
  }

  // Append the current row's observation to the group's buffer.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val values = new ListBuffer[Double]()
    // BUG FIX: Spark materializes ArrayType columns as a WrappedArray, not a
    // scala.List, so the original getAs[List[Double]](0) throws
    // ClassCastException at runtime. Row.getSeq works for any Seq
    // representation (and matches what evaluate() already uses).
    values.appendAll(buffer.getSeq[Double](0))
    // Get the new value for the current row and append it.
    values.append(input.getDouble(0))
    buffer.update(0, values)
  }

  // Merge two partial buffers (from different executors/JVMs) by
  // concatenating their observation sequences.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val values = new ListBuffer[Double]()
    // Same fix as in update(): use getSeq instead of getAs[List[Double]].
    values ++= buffer1.getSeq[Double](0)
    values ++= buffer2.getSeq[Double](0)
    buffer1.update(0, values)
  }

  // Final result: the number of collected observations, rendered as a String.
  override def evaluate(buffer: Row): String = {
    val observations = buffer.getSeq[Double](0)
    observations.size.toString
  }
}
***PARAGRAPH OUTPUT:***
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer
<console>:12: error: not found: type UserDefinedAggregateFunction
class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction
{
^
<console>:14: error: not found: type StructType
override def inputSchema: StructType = StructType(StructField("y",
DoubleType) :: Nil)
^
<console>:14: error: not found: value StructType
override def inputSchema: StructType = StructType(StructField("y",
DoubleType) :: Nil)
^
<console>:14: error: not found: value StructField
override def inputSchema: StructType = StructType(StructField("y",
DoubleType) :: Nil)
^
<console>:14: error: not found: value DoubleType
override def inputSchema: StructType = StructType(StructField("y",
DoubleType) :: Nil)
^
<console>:17: error: not found: type StructType
override def bufferSchema: StructType =
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
^
<console>:17: error: not found: value StructType
override def bufferSchema: StructType =
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
^
<console>:17: error: not found: value StructField
override def bufferSchema: StructType =
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
<console>:17: error: not found: value ArrayType
override def bufferSchema: StructType =
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
^
<console>:17: error: not found: value DoubleType
override def bufferSchema: StructType =
StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
^
<console>:20: error: not found: type DataType
override def dataType: DataType = StringType
^
<console>:20: error: not found: value StringType
override def dataType: DataType = StringType
^
<console>:30: error: not found: type MutableAggregationBuffer
override def initialize(buffer: MutableAggregationBuffer): Unit = {
^
<console>:37: error: not found: type MutableAggregationBuffer
override def update(buffer: MutableAggregationBuffer, input: Row):
Unit = {
^
<console>:37: error: not found: type Row
override def update(buffer: MutableAggregationBuffer, input: Row):
Unit = {
^
<console>:39: error: not found: type ListBuffer
var values = new ListBuffer[Double]()
^
<console>:53: error: not found: type MutableAggregationBuffer
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row):
Unit = {
^
<console>:53: error: not found: type Row
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row):
Unit = {
^
<console>:54: error: not found: type ListBuffer
var values = new ListBuffer[Double]()
^
<console>:62: error: not found: type Row
override def evaluate(buffer: Row): String = {
^