On Sat, Apr 23, 2016 at 8:56 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Have you looked at aggregators?
>
>
> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
>

Thanks for the pointer to aggregators; I wasn't yet aware of them.
However, I still get similar errors when going the Dataset route.
Here is my attempt at an aggregator class for the example above:


import org.apache.spark.sql.expressions.Aggregator
import sqlContext.implicits._  // for .toDS and $"k" (the usual shell sqlContext)

case class KeyVal(k: Int, v: Int)

// 9 rows: every (k, v) pair with k in 1..3 and v in 4..6
val keyValsDs = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i, j)).toDS

// Collects all of a group's KeyVal rows into a List; the intermediate
// buffer and the final result are both List[KeyVal].
class AggKV extends Aggregator[KeyVal, List[KeyVal], List[KeyVal]]
    with Serializable {
  // empty buffer for a new group
  override def zero: List[KeyVal] = List()

  // fold one input row into the buffer
  override def reduce(b: List[KeyVal], a: KeyVal): List[KeyVal] = b :+ a

  // combine two partial buffers
  override def merge(b1: List[KeyVal], b2: List[KeyVal]): List[KeyVal] = b1 ++ b2

  // the buffer is already the final result
  override def finish(reduction: List[KeyVal]): List[KeyVal] = reduction
}

The following produces the correct schema:

keyValsDs.groupBy($"k").agg(new AggKV().toColumn)

org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, List[KeyVal])] =
[_1: struct<k:int>, _2: struct<value:array<struct<k:int,v:int>>>]
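
For reference, the same aggregation can also be phrased through the typed
groupBy, as in the linked aggregator examples; a sketch against the 1.6
GroupedDataset API:

// typed grouping: keys come out as Int rather than Row
keyValsDs.groupBy(_.k).agg(new AggKV().toColumn)

Below I stay with the untyped groupBy($"k") form.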

Actual execution, however, fails with "Task not serializable".  Am I missing
something, or is this just not possible without dropping into RDDs (see the
sketch after the stack trace)?

scala> keyValsDs.groupBy($"k").agg(new AggKV().toColumn).show
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Final,isDistinct=false)], output=[key#739,AggKV(k,v)#738])
+- ConvertToSafe
   +- Sort [k#684 ASC], false, 0
      +- TungstenExchange hashpartitioning(k#684,200), None
         +- ConvertToUnsafe
            +- SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Partial,isDistinct=false)], output=[k#684,value#731])
               +- ConvertToSafe
                  +- Sort [k#684 ASC], false, 0
                     +- Project [k#684,v#685,k#684]
                        +- Scan ExistingRDD[k#684,v#685]

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
        at org.apache.spark.sql.execution.aggregate.SortBasedAggregate.doExecute(SortBasedAggregate.scala:69)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
        at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:228)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:192)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:200)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:71)
        at $iwC$$iwC$$iwC.<init>(<console>:73)
        at $iwC$$iwC.<init>(<console>:75)
        at $iwC.<init>(<console>:77)
        at <init>(<console>:79)
        at .<init>(<console>:83)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at scala_maven_executions.MainHelper.runMain(MainHelper.java:164)
        at scala_maven_executions.JavaMainCallerInProcess.runInternal(JavaMainCallerInProcess.java:94)
        at scala_maven_executions.JavaMainCallerInProcess.run(JavaMainCallerInProcess.java:54)
        at scala_maven_executions.JavaMainCallerSupport.run(JavaMainCallerSupport.java:110)
        at scala_maven.ScalaConsoleMojo.doExecute(ScalaConsoleMojo.java:102)
        at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482)
        at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
        at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
        at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
        at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
        at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
        at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
        at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
        at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
        at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
        at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
        at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
        at org.apache.maven.cli.MavenCli.execute(MavenCli.java:862)
        at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:286)
        at org.apache.maven.cli.MavenCli.main(MavenCli.java:197)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
        at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
        at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
        at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange hashpartitioning(k#684,200), None
+- ConvertToUnsafe
   +- SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Partial,isDistinct=false)], output=[k#684,value#731])
      +- ConvertToSafe
         +- Sort [k#684 ASC], false, 0
            +- Project [k#684,v#685,k#684]
               +- Scan ExistingRDD[k#684,v#685]

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
        at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.Sort.doExecute(Sort.scala:64)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:72)
        at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:69)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
        ... 98 more
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
        at org.apache.spark.sql.execution.ConvertToUnsafe.doExecute(rowFormatConverters.scala:38)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:164)
        at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
        at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
        ... 116 more
Caused by: java.io.NotSerializableException: scala.reflect.internal.Mirrors$Roots$RootClass$
Serialization stack:
        - object not serializable (class: scala.reflect.internal.Mirrors$Roots$RootClass$, value: package <root>)
        - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
        - object (class scala.reflect.internal.Types$UniqueThisType, <root>)
        - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type)
        - object (class scala.reflect.internal.Types$UniqueSingleType, $line142.type)
        - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type)
        - object (class scala.reflect.internal.Types$UniqueSingleType, type)
        - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type)
        - object (class scala.reflect.internal.Types$UniqueSingleType, INSTANCE.type)
        - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type)
        - object (class scala.reflect.internal.Types$UniqueSingleType, $iwC.type)
        - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type)
        - object (class scala.reflect.internal.Types$UniqueSingleType, $iwC.$iwC.type)
        - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
        - object (class scala.reflect.internal.Types$TypeRef$$anon$6, KeyVal)
        - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, name: elementType$2, type: class scala.reflect.api.Types$TypeApi)
        - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(<function1>,input[0, ArrayType(StructType(StructField(k,IntegerType,false), StructField(v,IntegerType,false)),true)],StructField(k,IntegerType,false),StructField(v,IntegerType,false)))
        - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(<function1>,input[0, ArrayType(StructType(StructField(k,IntegerType,false), StructField(v,IntegerType,false)),true)],StructField(k,IntegerType,false),StructField(v,IntegerType,false)),array,ObjectType(class [Ljava.lang.Object;)))
        - writeObject data (class: scala.collection.immutable.$colon$colon)
        - object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(<function1>,input[0, ArrayType(StructType(StructField(k,IntegerType,false), StructField(v,IntegerType,false)),true)],StructField(k,IntegerType,false),StructField(v,IntegerType,false)),array,ObjectType(class [Ljava.lang.Object;))))
        - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
        - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class scala.collection.mutable.WrappedArray$,ObjectType(interface scala.collection.Seq),make,invoke(mapobjects(<function1>,input[0, ArrayType(StructType(StructField(k,IntegerType,false), StructField(v,IntegerType,false)),true)],StructField(k,IntegerType,false),StructField(v,IntegerType,false)),array,ObjectType(class [Ljava.lang.Object;)),true))
        - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[])
        - field (class: org.apache.spark.sql.execution.aggregate.TypedAggregateExpression, name: bEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
        - object (class org.apache.spark.sql.execution.aggregate.TypedAggregateExpression, AggKV(k#684,v#685))
        - field (class: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, name: aggregateFunction, type: class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction)
        - object (class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, (AggKV(k#684,v#685),mode=Partial,isDistinct=false))
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
        - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer((AggKV(k#684,v#685),mode=Partial,isDistinct=false)))
        - field (class: org.apache.spark.sql.execution.aggregate.SortBasedAggregate, name: nonCompleteAggregateExpressions, type: interface scala.collection.Seq)
        - object (class org.apache.spark.sql.execution.aggregate.SortBasedAggregate, SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Partial,isDistinct=false)], output=[k#684,value#731])
+- ConvertToSafe
   +- Sort [k#684 ASC], false, 0
      +- Project [k#684,v#685,k#684]
         +- Scan ExistingRDD[k#684,v#685]
)
        - field (class: org.apache.spark.sql.execution.ConvertToUnsafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
        - object (class org.apache.spark.sql.execution.ConvertToUnsafe, ConvertToUnsafe
+- SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Partial,isDistinct=false)], output=[k#684,value#731])
   +- ConvertToSafe
      +- Sort [k#684 ASC], false, 0
         +- Project [k#684,v#685,k#684]
            +- Scan ExistingRDD[k#684,v#685]
)
        - field (class: org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, name: $outer, type: class org.apache.spark.sql.execution.ConvertToUnsafe)
        - object (class org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 134 more
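
For now I can get the result by dropping into RDDs, where plain Scala
closures sidestep the encoder serialization path; a minimal sketch, using
the keyValsDs defined above:

// Group the typed rows by key with ordinary RDD operations.
val grouped = keyValsDs.rdd
  .groupBy(_.k)          // RDD[(Int, Iterable[KeyVal])]
  .mapValues(_.toList)   // RDD[(Int, List[KeyVal])]

grouped.collect().foreach(println)

That works, but it gives up the Dataset/Catalyst integration the aggregator
was supposed to provide.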
