[ https://issues.apache.org/jira/browse/SPARK-25279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599048#comment-16599048 ]
Dilip Biswal commented on SPARK-25279:
--------------------------------------

Hello. Tried this against the latest trunk; it seems to work fine:

{code:java}
scala> import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.expressions.Aggregator

scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> case class Employee(name: String, salary: Long)
defined class Employee

scala> case class Average(var sum: Long, var count: Long)
defined class Average

scala> object MyAverage extends Aggregator[Employee, Average, Double] {
     |   // A zero value for this aggregation. Should satisfy the property that any b + zero = b
     |   def zero: Average = Average(0L, 0L)
     |   // Combine two values to produce a new value. For performance, the function may modify `buffer`
     |   // and return it instead of constructing a new object
     |   def reduce(buffer: Average, employee: Employee): Average = {
     |     buffer.sum += employee.salary
     |     buffer.count += 1
     |     buffer
     |   }
     |   // Merge two intermediate values
     |   def merge(b1: Average, b2: Average): Average = {
     |     b1.sum += b2.sum
     |     b1.count += b2.count
     |     b1
     |   }
     |   // Transform the output of the reduction
     |   def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
     |   // Specifies the Encoder for the intermediate value type
     |   def bufferEncoder: Encoder[Average] = Encoders.product
     |   // Specifies the Encoder for the final output value type
     |   def outputEncoder: Encoder[Double] = Encoders.scalaDouble
     | }
defined object MyAverage

scala> val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds: org.apache.spark.sql.Dataset[Employee] = [name: string, salary: bigint]

scala> ds.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

scala> // Convert the function to a `TypedColumn` and give it a name

scala> val averageSalary = MyAverage.toColumn.name("average_salary")
averageSalary: org.apache.spark.sql.TypedColumn[Employee,Double] = myaverage() AS `average_salary`

scala> val result = ds.select(averageSalary)
result: org.apache.spark.sql.Dataset[Double] = [average_salary: double]

scala> result.show()
+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------+
{code}

> Throw exception: zzcclp java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25279
>                 URL: https://issues.apache.org/jira/browse/SPARK-25279
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, SQL
>    Affects Versions: 2.2.1
>            Reporter: Zhichao Zhang
>            Priority: Minor
>
> Hi dev:
> I am using Spark-Shell to run the example from the section 'Type-Safe User-Defined Aggregate Functions' of the SQL programming guide
> (http://spark.apache.org/docs/2.2.2/sql-programming-guide.html#type-safe-user-defined-aggregate-functions),
> and it fails with the following error:
> {code:java}
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.TypedColumn
> Serialization stack:
>  - object not serializable (class: org.apache.spark.sql.TypedColumn, value: myaverage() AS `average_salary`)
>  - field (class: $iw, name: averageSalary, type: class org.apache.spark.sql.TypedColumn)
>  - object (class $iw, $iw@4b2f8ae9)
>  - field (class: MyAverage$, name: $outer, type: class $iw)
>  - object (class MyAverage$, MyAverage$@2be41d90)
>  - field (class: org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression, name: aggregator, type: class org.apache.spark.sql.expressions.Aggregator)
>  - object (class org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression, MyAverage(Employee))
>  - field (class: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, name: aggregateFunction, type: class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction)
>  - object (class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class Employee)), Some(class Employee), Some(StructType(StructField(name,StringType,true), StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0, Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0, Average, true])).count AS count#26L, newInstance(class Average), input[0, double, false] AS value#24, DoubleType, false, 0, 0))
>  - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
>  - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@5e92c46f)
>  - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
>  - object (class scala.collection.immutable.$colon$colon, List(partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class Employee)), Some(class Employee), Some(StructType(StructField(name,StringType,true), StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0, Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0, Average, true])).count AS count#26L, newInstance(class Average), input[0, double, false] AS value#24, DoubleType, false, 0, 0)))
>  - field (class: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, name: aggregateExpressions, type: interface scala.collection.Seq)
>  - object (class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, ObjectHashAggregate(keys=[], functions=[partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class Employee)), Some(class Employee), Some(StructType(StructField(name,StringType,true), StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0, Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0, Average, true])).count AS count#26L, newInstance(class Average), input[0, double, false] AS value#24, DoubleType, false, 0, 0)], output=[buf#37])
> +- *FileScan json [name#8,salary#9L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/opt/spark2/examples/src/main/resources/employees.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,salary:bigint>
> )
>  - field (class: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1, name: $outer, type: class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec)
>  - object (class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1, <function0>)
>  - field (class: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1)
>  - object (class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2, <function1>)
>  - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$23, type: interface scala.Function1)
>  - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
>  - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
>  - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25, <function3>)
>  - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
>  - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[9] at show at <console>:62)
>  - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
>  - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@5bb7895)
>  - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
>  - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6e81dca3)
>  - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
>  - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@5bb7895))
>  - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
>  - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[10] at show at <console>:62)
>  - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
>  - object (class scala.Tuple2, (MapPartitionsRDD[10] at show at <console>:62,org.apache.spark.ShuffleDependency@421cd28))
>  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> {code}
>
> But if I run the example directly from IDEA, it works. What is the difference between the two, and how can I run the example successfully in Spark-Shell? Thanks.
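A note on the difference the reporter is asking about: the serialization stack shows the shell-defined MyAverage$ carrying a $outer reference to the REPL wrapper class $iw, and that wrapper in turn holds the non-serializable averageSalary TypedColumn, which is why the failure appears only in Spark-Shell. Below is a minimal sketch of the same aggregation packaged as a standalone application, i.e. the "run it directly" path that works; the object name TypedAverageApp, the local master setting, and the input path are illustrative assumptions, not taken from the report.
{code:java}
// Minimal standalone sketch, assuming Spark 2.2+ on the classpath and the stock
// examples/src/main/resources/employees.json input file.
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

// Defined at the top level of a compiled application, MyAverage has no $outer
// reference to a REPL wrapper, so the aggregate expression shipped to executors
// does not drag a TypedColumn along with it.
object MyAverage extends Aggregator[Employee, Average, Double] {
  def zero: Average = Average(0L, 0L)
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object TypedAverageApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TypedAverageApp")
      .master("local[*]")  // for a local run; drop this when submitting to a cluster
      .getOrCreate()
    import spark.implicits._

    val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
    val averageSalary = MyAverage.toColumn.name("average_salary")
    ds.select(averageSalary).show()  // expect a single row: 3750.0

    spark.stop()
  }
}
{code}
Run it with spark-submit (or from an IDE). Per the comment above, recent builds also run the original shell session without the exception.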