Unfortunately, I don’t know why code that works for you in spark-shell isn’t working in Zeppelin. But if you are looking for a quick fix, perhaps this could help?
I’ve had luck defining my UDAFs in Zeppelin as anonymous instances rather than named classes, i.e.:

val myUDAF = new UserDefinedAggregateFunction { ... }

So, for example, the following code compiles fine for me in Zeppelin:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer

val FractionOfDayCoverage = new UserDefinedAggregateFunction {

  // Input schema: a single epoch-seconds column
  def inputSchema: StructType = StructType(Array(StructField("seconds", LongType)))

  // Intermediate buffer schema: the timestamps collected so far
  def bufferSchema = StructType(Array(StructField("times", ArrayType(LongType))))

  // Returned data type
  def dataType = DoubleType

  // Same input always produces the same output
  def deterministic = true

  // Called once per group to initialize the aggregation buffer
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, new ListBuffer[Long]())
  }

  // Called for each row of a group: append the (non-null) timestamp
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0)) {
      var timeArray = new ListBuffer[Long]()
      // The stored array comes back as a Seq, not a List, so read it as Seq
      timeArray ++= buffer.getAs[Seq[Long]](0)
      timeArray += input.getLong(0)
      buffer.update(0, timeArray)
    }
  }

  // Merge two partial aggregates
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    var timeArray = new ListBuffer[Long]()
    timeArray ++= buffer1.getAs[Seq[Long]](0)
    timeArray ++= buffer2.getAs[Seq[Long]](0)
    buffer1.update(0, timeArray)
  }

  // Called after all entries are exhausted: merge overlapping +/- 30 minute
  // windows around each timestamp and return the fraction of the day covered
  def evaluate(buffer: Row) = {
    var timeArray = new ListBuffer[Long]()
    timeArray ++= buffer.getAs[Seq[Long]](0) // nulls were already skipped in update
    val times = timeArray.toArray
    scala.util.Sorting.quickSort(times)
    var intStart = times(0) - 30*60
    var intEnd = times(0) + 30*60
    var seen = 0L
    for (t <- times) {
      if (t > intEnd + 30*60) {
        // This timestamp's window does not overlap the current one:
        // close out the current window and start a new one
        seen += (intEnd - intStart)
        intStart = t - 30*60
        intEnd = t + 30*60
      } else {
        intEnd = t + 30*60
      }
    }
    seen += intEnd - intStart
    math.min(seen.toDouble / (24*60*60), 1)
  }
}

I’m using Zeppelin 0.7.2 and Spark 2.0.1 (I think), so perhaps there is a version issue somewhere?
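In case it helps, here is roughly how I then use such an anonymous-instance UDAF from a paragraph (a minimal sketch; the `events` DataFrame and its `user`/`seconds` columns are made up for illustration):

import spark.implicits._

// Hypothetical example data: one row per (user, epoch-seconds) observation
val events = Seq(
  ("a", 1500000000L),
  ("a", 1500003600L),
  ("b", 1500007200L)
).toDF("user", "seconds")

// Apply the UDAF instance directly in a DataFrame aggregation...
events.groupBy("user")
  .agg(FractionOfDayCoverage($"seconds").as("coverage"))
  .show()

// ...or register it so it can be called from SQL
spark.udf.register("fractionOfDayCoverage", FractionOfDayCoverage)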
*Paul Brenner*
Data Scientist, PlaceIQ ( http://www.placeiq.com/ )

On Tue, Feb 27, 2018 at 6:19 PM Vannson Raphael <raphael.vann...@thinkbiganalytics.com> wrote:

> Hello,
>
> I am having trouble defining a UDAF; using the same code in spark-shell in
> :paste mode works fine.
>
> Environment:
> - Amazon EMR
> - Apache Zeppelin Version 0.7.3
> - Spark version 2.2.1
> - Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)
>
> 1) Is there a way to configure the Zeppelin %spark interpreter to do the
> equivalent of spark-shell's :paste mode?
> 2) If not, is there a workaround to be able to define UDAFs in Zeppelin's
> %spark interpreter?
>
> Thanks!
> Raphael
>
>
> ***PARAGRAPH INPUT:***
>
> %spark
>
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
> import org.apache.spark.sql.Row
> import scala.collection.mutable.WrappedArray
> import scala.collection.mutable.ListBuffer
>
> class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {
>
>   // Input schema
>   override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
>
>   // Intermediate buffer schema
>   override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
>
>   // Output schema
>   override def dataType: DataType = StringType
>
>   // Deterministic UDAF
>   override def deterministic: Boolean = true
>
>   // How to initialize the intermediate processing buffer for each group:
>   // We simply create a List[Double] which will hold the observations (y)
>   // of each group
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = Array.emptyDoubleArray
>   }
>
>   // What to do with each new row within the group:
>   // Here we append each new observation of the group in a List[Double]
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     // Put the observations collected into a List
>     var values = new ListBuffer[Double]()
>     values.appendAll(buffer.getAs[List[Double]](0))
>
>     // Get the new value for the current row
>     val newValue = input.getDouble(0)
>
>     // Append the new value to the buffer and return it
>     values.append(newValue)
>     buffer.update(0, values)
>   }
>
>   // How to merge 2 buffers located on 2 separate executor hosts or JVMs:
>   // Simply append one List at the end of another
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     var values = new ListBuffer[Double]()
>     values ++= buffer1.getAs[List[Double]](0)
>     values ++= buffer2.getAs[List[Double]](0)
>     buffer1.update(0, values)
>   }
>
>   override def evaluate(buffer: Row): String = {
>     val observations = buffer.getSeq[Double](0)
>     observations.size.toString
>   }
> }
>
>
> ***PARAGRAPH OUTPUT:***
>
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
> import org.apache.spark.sql.Row
> import scala.collection.mutable.WrappedArray
> import scala.collection.mutable.ListBuffer
> :12: error: not found: type UserDefinedAggregateFunction
>        class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {
>                                                    ^
> :14: error: not found: type StructType
>        override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
>                                  ^
> :14: error: not found: value StructType
>        override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
>                                               ^
> :14: error: not found: value StructField
>        override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
>                                                          ^
> :14: error: not found: value DoubleType
>        override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
>                                                                           ^
> :17: error: not found: type StructType
>        override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
>                                   ^
> :17: error: not found: value StructType
>        override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
>                                                ^
> :17: error: not found: value StructField
>        override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
>                                                           ^
> :17: error: not found: value ArrayType
>        override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
>                                                                                       ^
> :17: error: not found: value DoubleType
>        override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
>                                                                                                 ^
> :20: error: not found: type DataType
>        override def dataType: DataType = StringType
>                               ^
> :20: error: not found: value StringType
>        override def dataType: DataType = StringType
>                                          ^
> :30: error: not found: type MutableAggregationBuffer
>        override def initialize(buffer: MutableAggregationBuffer): Unit = {
>                                        ^
> :37: error: not found: type MutableAggregationBuffer
>        override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>                                    ^
> :37: error: not found: type Row
>        override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>                                                                     ^
> :39: error: not found: type ListBuffer
>        var values = new ListBuffer[Double]()
>                         ^
> :53: error: not found: type MutableAggregationBuffer
>        override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>                                    ^
> :53: error: not found: type Row
>        override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>                                                                        ^
> :54: error: not found: type ListBuffer
>        var values = new ListBuffer[Double]()
>                         ^
> :62: error: not found: type Row
>        override def evaluate(buffer: Row): String = {
>                                       ^