Hello Paul,
Many thanks for your quick answer. This did the trick!
Fantastic!
Best,
Raphael
***PARAGRAPH INPUT:***
val AggregatedChangepointAnalyzer = new UserDefinedAggregateFunction {
…
}
***PARAGRAPH OUTPUT:***
AggregatedChangepointAnalyzer: org.apache.spark.sql.expressions.UserDefinedAggregateFunction{def evaluate(buffer: org.apache.spark.sql.Row): String} = $$$$79b2515edf74bd80cfc9d8ac1ba563c6$$$$anon$1@3b65afbc
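For the archives: the trick was simply to bind the anonymous class to a val instead of declaring a named class, leaving the body of my original class (quoted at the bottom of this thread) unchanged. The elided definition above was, give or take whitespace:

%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer

val AggregatedChangepointAnalyzer = new UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
  override def dataType: DataType = StringType
  override def deterministic: Boolean = true

  // Start each group with an empty array of observations
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.emptyDoubleArray
  }

  // Append the current row's observation to the group's buffer
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    var values = new ListBuffer[Double]()
    values.appendAll(buffer.getAs[List[Double]](0))
    values.append(input.getDouble(0))
    buffer.update(0, values)
  }

  // Concatenate two partial aggregates
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var values = new ListBuffer[Double]()
    values ++= buffer1.getAs[List[Double]](0)
    values ++= buffer2.getAs[List[Double]](0)
    buffer1.update(0, values)
  }

  // For now, simply report the number of observations collected
  override def evaluate(buffer: Row): String = {
    buffer.getSeq[Double](0).size.toString
  }
}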
I was then able to use the UDAF easily:
***PARAGRAPH INPUT:***
val cpt_df = df.groupBy("foo", "bar ", "baz",
"bok").agg(AggregatedChangepointAnalyzer(col("y")).as("cpt"))
cpt_df.show
cpt_df: org.apache.spark.sql.DataFrame = [foo: string, bar: string ... 3 more fields]
+----+------+-----+----+---+
| foo|   bar|  baz| bok|cpt|
+----+------+-----+----+---+
|some|secret|thing|here| 40|
+----+------+-----+----+---+
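In case it's useful to anyone searching the archives: since the val is an ordinary UserDefinedAggregateFunction instance, it should also be registrable for use from %sql paragraphs. I have not tried this myself, and the view and function names below are made up:

%spark
// Register the UDAF under a SQL-callable name and expose df as a view
spark.udf.register("cpt_analyze", AggregatedChangepointAnalyzer)
df.createOrReplaceTempView("my_table")

%sql
SELECT foo, bar, baz, bok, cpt_analyze(y) AS cpt
FROM my_table
GROUP BY foo, bar, baz, bok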
From: Paul Brenner <[email protected]>
Date: Tuesday, February 27, 2018 at 3:31 PM
To: Raphael Vannson <[email protected]>,
"[email protected]" <[email protected]>
Subject: Cannot define UDAF in %spark interpreter
Unfortunately, I don’t know why code that is working for you in spark shell
isn’t working in Zeppelin. But if you are looking for a quick fix perhaps this
could help?
I’ve had luck defining my UDAFs in zeppelin like:
val myUDAF = new UserDefinedAggregateFunction { ... }
So for example the following code compiles fine for me in zeppelin:
// Imports this example relies on (run in the same or an earlier paragraph):
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer

val FractionOfDayCoverage = new UserDefinedAggregateFunction {
  // Input Data Type Schema
  def inputSchema: StructType = StructType(Array(StructField("seconds", LongType)))
  // Intermediate Schema
  def bufferSchema = StructType(Array(StructField("times", ArrayType(LongType))))
  // Returned Data Type
  def dataType = DoubleType
  // Self-explaining
  def deterministic = true
  // This function is called whenever key changes
  def initialize(buffer: MutableAggregationBuffer) = {
    val timeArray = new ListBuffer[Long]()
    buffer.update(0, timeArray)
  }
  // Iterate over each entry of a group
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0)) {
      val timeArray = new ListBuffer[Long]()
      // The stored array comes back as a Seq (WrappedArray), not a List
      timeArray ++= buffer.getAs[Seq[Long]](0)
      timeArray += input.getLong(0)
      buffer.update(0, timeArray)
    }
  }
  // Merge two partial aggregates
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    val timeArray = new ListBuffer[Long]()
    timeArray ++= buffer1.getAs[Seq[Long]](0)
    timeArray ++= buffer2.getAs[Seq[Long]](0)
    buffer1.update(0, timeArray)
  }
  // Called after all the entries are exhausted.
  def evaluate(buffer: Row) = {
    // Nulls never reach the buffer: update skips them
    val times = buffer.getAs[Seq[Long]](0).toArray
    scala.util.Sorting.quickSort(times)
    // Each timestamp covers a +/- 30 minute window; merge overlapping windows
    var intStart = times(0) - 30*60
    var intEnd = times(0) + 30*60
    var seen = 0L
    for (t <- times) {
      if (t > intEnd + 30*60) {
        seen += (intEnd - intStart)
        intStart = t - 30*60
        intEnd = t + 30*60
      } else {
        intEnd = t + 30*60
      }
    }
    seen += intEnd - intStart
    // Fraction of a 24-hour day covered, capped at 1
    math.min(seen.toDouble / (24*60*60), 1)
  }
}
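Then I just call it inside an agg, something like this (the column names here are made up for illustration):

import org.apache.spark.sql.functions.col

val coverage = df
  .groupBy("device_id")  // hypothetical grouping column
  .agg(FractionOfDayCoverage(col("event_seconds")).as("day_coverage"))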
I’m using zeppelin 0.7.2 and spark 2.0.1 (I think) so perhaps there is a
version issue somewhere?
Paul Brenner
DATA SCIENTIST
(217) 390-3033
On Tue, Feb 27, 2018 at 6:19 PM Vannson Raphael <[email protected]> wrote:
Hello,
I am having trouble defining a UDAF; the same code works fine in spark-shell's :paste mode.
Environment:
- Amazon EMR
- Apache Zeppelin Version 0.7.3
- Spark version 2.2.1
- Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)
1) Is there a way to configure the zeppelin %spark interpreter to do the
equivalent of spark-shell's :paste mode?
2) If not, is there a workaround to be able to define UDAFs in Zeppelin's
%spark interpreter?
Thanks!
Raphael
***PARAGRAPH INPUT:***
%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer

class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {
  // Input schema
  override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)

  // Intermediate buffer schema
  override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)

  // Output schema
  override def dataType: DataType = StringType

  // Deterministic UDAF
  override def deterministic: Boolean = true

  // How to initialize the intermediate processing buffer for each group:
  // we simply create a List[Double] which will hold the observations (y)
  // of each group
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.emptyDoubleArray
  }

  // What to do with each new row within the group:
  // here we append each new observation of the group to a List[Double]
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Put the observations collected into a List
    var values = new ListBuffer[Double]()
    values.appendAll(buffer.getAs[List[Double]](0))
    // Get the new value for the current row
    val newValue = input.getDouble(0)
    // Append the new value to the buffer and return it
    values.append(newValue)
    buffer.update(0, values)
  }

  // How to merge 2 buffers located on 2 separate executor hosts or JVMs:
  // simply append one List at the end of another
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var values = new ListBuffer[Double]()
    values ++= buffer1.getAs[List[Double]](0)
    values ++= buffer2.getAs[List[Double]](0)
    buffer1.update(0, values)
  }

  // Return the number of observations collected for the group
  override def evaluate(buffer: Row): String = {
    val observations = buffer.getSeq[Double](0)
    observations.size.toString
  }
}
***PARAGRAPH OUTPUT:***
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer
<console>:12: error: not found: type UserDefinedAggregateFunction
       class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {
                                                   ^
<console>:14: error: not found: type StructType
       override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                 ^
<console>:14: error: not found: value StructType
       override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                              ^
<console>:14: error: not found: value StructField
       override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                                         ^
<console>:14: error: not found: value DoubleType
       override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                                                          ^
<console>:17: error: not found: type StructType
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                  ^
<console>:17: error: not found: value StructType
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                               ^
<console>:17: error: not found: value StructField
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                          ^
<console>:17: error: not found: value ArrayType
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                                                      ^
<console>:17: error: not found: value DoubleType
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                                                                ^
<console>:20: error: not found: type DataType
       override def dataType: DataType = StringType
                              ^
<console>:20: error: not found: value StringType
       override def dataType: DataType = StringType
                                         ^
<console>:30: error: not found: type MutableAggregationBuffer
       override def initialize(buffer: MutableAggregationBuffer): Unit = {
                                       ^
<console>:37: error: not found: type MutableAggregationBuffer
       override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
                                   ^
<console>:37: error: not found: type Row
       override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
                                                                    ^
<console>:39: error: not found: type ListBuffer
       var values = new ListBuffer[Double]()
                        ^
<console>:53: error: not found: type MutableAggregationBuffer
       override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
                                   ^
<console>:53: error: not found: type Row
       override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
                                                                      ^
<console>:54: error: not found: type ListBuffer
       var values = new ListBuffer[Double]()
                        ^
<console>:62: error: not found: type Row
       override def evaluate(buffer: Row): String = {
                                     ^