I'm pasting the UDAF example from this page into spark-shell and getting errors (basic EMR setup with Spark 2.0): https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/03%20UDF%20and%20UDAF%20-%20scala.html
The imports appear to work, but then I get errors like "not found: type UserDefinedAggregateFunction". If I enter ":paste" mode and paste the code there, it works, but I'd like to understand why pasting it directly with Ctrl-V doesn't. What is happening under the hood that makes the imports appear to succeed even though they aren't in effect for the class definition? And is there a general way to fix this? >>> scala> import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.MutableAggregationBuffer scala> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.expressions.UserDefinedAggregateFunction scala> import org.apache.spark.sql.Row import org.apache.spark.sql.Row scala> import org.apache.spark.sql.types._ import org.apache.spark.sql.types._ scala> scala> class GeometricMean extends UserDefinedAggregateFunction { | // This is the input fields for your aggregate function. | override def inputSchema: org.apache.spark.sql.types.StructType = | StructType(StructField("value", DoubleType) :: Nil) | | // This is the internal fields you keep for computing your aggregate. | override def bufferSchema: StructType = StructType( | StructField("count", LongType) :: | StructField("product", DoubleType) :: Nil | ) | | // This is the output type of your aggregatation function. | override def dataType: DataType = DoubleType | | override def deterministic: Boolean = true | | // This is the initial value for your buffer schema. | override def initialize(buffer: MutableAggregationBuffer): Unit = { | buffer(0) = 0L | buffer(1) = 1.0 | } | | // This is how to update your buffer schema given an input. | override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { | buffer(0) = buffer.getAs[Long](0) + 1 | buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0) | } | | // This is how to merge two objects with the bufferSchema type. 
| override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { | buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0) | buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1) | } | | // This is where you output the final value, given the final value of your bufferSchema. | override def evaluate(buffer: Row): Any = { | math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0)) | } | } <console>:11: error: not found: type UserDefinedAggregateFunction class GeometricMean extends UserDefinedAggregateFunction { ^ <console>:14: error: not found: value StructType StructType(StructField("value", DoubleType) :: Nil) ^ <console>:14: error: not found: value StructField StructType(StructField("value", DoubleType) :: Nil) ^ <console>:14: error: not found: value DoubleType StructType(StructField("value", DoubleType) :: Nil) ^ <console>:17: error: not found: type StructType override def bufferSchema: StructType = StructType( ^ <console>:17: error: not found: value StructType override def bufferSchema: StructType = StructType( ^ <console>:18: error: not found: value StructField StructField("count", LongType) :: ^ <console>:18: error: not found: value LongType StructField("count", LongType) :: ^ <console>:19: error: not found: value StructField StructField("product", DoubleType) :: Nil ^ <console>:19: error: not found: value DoubleType StructField("product", DoubleType) :: Nil ^ <console>:23: error: not found: type DataType override def dataType: DataType = DoubleType ^ <console>:23: error: not found: value DoubleType override def dataType: DataType = DoubleType ^ <console>:28: error: not found: type MutableAggregationBuffer override def initialize(buffer: MutableAggregationBuffer): Unit = { ^ <console>:34: error: not found: type MutableAggregationBuffer override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { ^ <console>:34: error: not found: type Row override def update(buffer: MutableAggregationBuffer, input: Row): Unit = 
{ ^ <console>:40: error: not found: type MutableAggregationBuffer override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { ^ <console>:40: error: not found: type Row override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { ^ <console>:46: error: not found: type Row override def evaluate(buffer: Row): Any = { ^ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-for-Databricks-example-tp28113.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org