[jira] [Assigned] (SPARK-11594) Cannot create UDAF in REPL

2015-11-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-11594:


Assignee: (was: Apache Spark)

> Cannot create UDAF in REPL
> --
>
> Key: SPARK-11594
> URL: https://issues.apache.org/jira/browse/SPARK-11594
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.1, 1.6.0
> Environment: Latest Spark Master
> JVM 1.8.0_66-b17
>Reporter: Herman van Hovell
>Priority: Minor
>
> If you try to define a UDAF in the REPL, an internal error is thrown by 
> Java. The following code for example:
> {noformat}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{DataType, LongType, StructType}
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
> UserDefinedAggregateFunction}
> class LongProductSum extends UserDefinedAggregateFunction {
>   def inputSchema: StructType = new StructType()
> .add("a", LongType)
> .add("b", LongType)
>   def bufferSchema: StructType = new StructType()
> .add("product", LongType)
>   def dataType: DataType = LongType
>   def deterministic: Boolean = true
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer(0) = 0L
>   }
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> if (!(input.isNullAt(0) || input.isNullAt(1))) {
>   buffer(0) = buffer.getLong(0) + input.getLong(0) * input.getLong(1)
> }
>   }
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
>   }
>   def evaluate(buffer: Row): Any =
> buffer.getLong(0)
> }
> sqlContext.udf.register("longProductSum", new LongProductSum)
> val data2 = Seq[(Integer, Integer, Integer)](
>   (1, 10, -10),
>   (null, -60, 60),
>   (1, 30, -30),
>   (1, 30, 30),
>   (2, 1, 1),
>   (3, null, null)).toDF("key", "value1", "value2")
> data2.registerTempTable("agg2")
> val q = sqlContext.sql("""
> |SELECT
> |  key,
> |  count(distinct value1, value2),
> |  longProductSum(distinct value1, value2)
> |FROM agg2
> |GROUP BY key
> """.stripMargin)
> q.show
> {noformat}
> Will throw the following error:
> {noformat}
> java.lang.InternalError: Malformed class name
>   at java.lang.Class.getSimpleName(Class.java:1330)
>   at 
> org.apache.spark.sql.execution.aggregate.ScalaUDAF.toString(udaf.scala:455)
>   at 
> org.apache.spark.sql.execution.SparkStrategies$Aggregation$$anonfun$9.apply(SparkStrategies.scala:211)
>   at 
> org.apache.spark.sql.execution.SparkStrategies$Aggregation$$anonfun$9.apply(SparkStrategies.scala:209)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:209)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>   at 
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:445)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:51)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:49)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:56)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:56)
>   at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2092)
>   at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1419)
>   at org.apache.spark

[jira] [Assigned] (SPARK-11594) Cannot create UDAF in REPL

2015-11-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-11594:


Assignee: Apache Spark

> Cannot create UDAF in REPL
> --
>
> Key: SPARK-11594
> URL: https://issues.apache.org/jira/browse/SPARK-11594
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.1, 1.6.0
> Environment: Latest Spark Master
> JVM 1.8.0_66-b17
>Reporter: Herman van Hovell
>Assignee: Apache Spark
>Priority: Minor
>
> If you try to define a UDAF in the REPL, an internal error is thrown by 
> Java. The following code for example:
> {noformat}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{DataType, LongType, StructType}
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
> UserDefinedAggregateFunction}
> class LongProductSum extends UserDefinedAggregateFunction {
>   def inputSchema: StructType = new StructType()
> .add("a", LongType)
> .add("b", LongType)
>   def bufferSchema: StructType = new StructType()
> .add("product", LongType)
>   def dataType: DataType = LongType
>   def deterministic: Boolean = true
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer(0) = 0L
>   }
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> if (!(input.isNullAt(0) || input.isNullAt(1))) {
>   buffer(0) = buffer.getLong(0) + input.getLong(0) * input.getLong(1)
> }
>   }
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
>   }
>   def evaluate(buffer: Row): Any =
> buffer.getLong(0)
> }
> sqlContext.udf.register("longProductSum", new LongProductSum)
> val data2 = Seq[(Integer, Integer, Integer)](
>   (1, 10, -10),
>   (null, -60, 60),
>   (1, 30, -30),
>   (1, 30, 30),
>   (2, 1, 1),
>   (3, null, null)).toDF("key", "value1", "value2")
> data2.registerTempTable("agg2")
> val q = sqlContext.sql("""
> |SELECT
> |  key,
> |  count(distinct value1, value2),
> |  longProductSum(distinct value1, value2)
> |FROM agg2
> |GROUP BY key
> """.stripMargin)
> q.show
> {noformat}
> Will throw the following error:
> {noformat}
> java.lang.InternalError: Malformed class name
>   at java.lang.Class.getSimpleName(Class.java:1330)
>   at 
> org.apache.spark.sql.execution.aggregate.ScalaUDAF.toString(udaf.scala:455)
>   at 
> org.apache.spark.sql.execution.SparkStrategies$Aggregation$$anonfun$9.apply(SparkStrategies.scala:211)
>   at 
> org.apache.spark.sql.execution.SparkStrategies$Aggregation$$anonfun$9.apply(SparkStrategies.scala:209)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:209)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>   at 
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:445)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:51)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:49)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:56)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:56)
>   at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2092)
>   at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1419)
>