On Sat, Apr 23, 2016 at 8:56 AM, Michael Armbrust <mich...@databricks.com> wrote:
> Have you looked at aggregators? > > > https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html > Thanks for the pointer to aggregators. I wasn't yet aware of them. However, I still get similar errors when attempting to go the Dataset route. Here is my attempt at an aggregator class for the example above. import org.apache.spark.sql.expressions.Aggregator case class KeyVal(k: Int, v: Int) val keyValsDs = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i,j)).toDS class AggKV extends Aggregator[KeyVal, List[KeyVal], List[KeyVal]] with Serializable { override def zero: List[KeyVal] = List() override def reduce(b: List[KeyVal], a: KeyVal): List[KeyVal] = b :+ a override def finish(reduction: List[KeyVal]): List[KeyVal] = reduction override def merge(b1: List[KeyVal], b2: List[KeyVal]): List[KeyVal] = b1 ++ b2 } The following shows that the correct schema is produced: keyValsDs.groupBy($"k").agg(new AggKV().toColumn) org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, List[KeyVal])] = [_1: struct<k:int>, _2: struct<value:array<struct<k:int,v:int>>>] Actual execution, however, fails with a "Task not serializable" exception. Am I missing something or is this just not possible without dropping into RDDs? 
scala> keyValsDs.groupBy($"k").agg(new AggKV().toColumn).show org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Final,isDistinct=false)], output=[key#739,AggKV(k,v)#738]) +- ConvertToSafe +- Sort [k#684 ASC], false, 0 +- TungstenExchange hashpartitioning(k#684,200), None +- ConvertToUnsafe +- SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Partial,isDistinct=false)], output=[k#684,value#731]) +- ConvertToSafe +- Sort [k#684 ASC], false, 0 +- Project [k#684,v#685,k#684] +- Scan ExistingRDD[k#684,v#685] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49) at org.apache.spark.sql.execution.aggregate.SortBasedAggregate.doExecute(SortBasedAggregate.scala:69) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) at org.apache.spark.sql.DataFrame.org $apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) at org.apache.spark.sql.DataFrame.org $apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544) at 
org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394) at org.apache.spark.sql.Dataset.show(Dataset.scala:228) at org.apache.spark.sql.Dataset.show(Dataset.scala:192) at org.apache.spark.sql.Dataset.show(Dataset.scala:200) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:71) at $iwC$$iwC$$iwC.<init>(<console>:73) at $iwC$$iwC.<init>(<console>:75) at $iwC.<init>(<console>:77) at <init>(<console>:79) at .<init>(<console>:83) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org $apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org $apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at scala_maven_executions.MainHelper.runMain(MainHelper.java:164) at scala_maven_executions.JavaMainCallerInProcess.runInternal(JavaMainCallerInProcess.java:94) at scala_maven_executions.JavaMainCallerInProcess.run(JavaMainCallerInProcess.java:54) at scala_maven_executions.JavaMainCallerSupport.run(JavaMainCallerSupport.java:110) 
at scala_maven.ScalaConsoleMojo.doExecute(ScalaConsoleMojo.java:102) at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80) at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193) at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106) at org.apache.maven.cli.MavenCli.execute(MavenCli.java:862) at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:286) at org.apache.maven.cli.MavenCli.main(MavenCli.java:197) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289) at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415) at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356) Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: 
TungstenExchange hashpartitioning(k#684,200), None +- ConvertToUnsafe +- SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Partial,isDistinct=false)], output=[k#684,value#731]) +- ConvertToSafe +- Sort [k#684 ASC], false, 0 +- Project [k#684,v#685,k#684] +- Scan ExistingRDD[k#684,v#685] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49) at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.Sort.doExecute(Sort.scala:64) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:72) at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:69) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) ... 
98 more Caused by: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706) at org.apache.spark.sql.execution.ConvertToUnsafe.doExecute(rowFormatConverters.scala:38) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:164) at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254) at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) ... 
116 more Caused by: java.io.NotSerializableException: scala.reflect.internal.Mirrors$Roots$RootClass$ Serialization stack: - object not serializable (class: scala.reflect.internal.Mirrors$Roots$RootClass$, value: package <root>) - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol) - object (class scala.reflect.internal.Types$UniqueThisType, <root>) - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type) - object (class scala.reflect.internal.Types$UniqueSingleType, $line142.type) - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type) - object (class scala.reflect.internal.Types$UniqueSingleType, type) - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type) - object (class scala.reflect.internal.Types$UniqueSingleType, INSTANCE.type) - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type) - object (class scala.reflect.internal.Types$UniqueSingleType, $iwC.type) - field (class: scala.reflect.internal.Types$SingleType, name: pre, type: class scala.reflect.internal.Types$Type) - object (class scala.reflect.internal.Types$UniqueSingleType, $iwC.$iwC.type) - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type) - object (class scala.reflect.internal.Types$TypeRef$$anon$6, KeyVal) - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, name: elementType$2, type: class scala.reflect.api.Types$TypeApi) - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, <function1>) - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1) - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(<function1>,input[0, 
ArrayType(StructType(StructField(k,IntegerType,false), StructField(v,IntegerType,false)),true)],StructField(k,IntegerType,false),StructField(v,IntegerType,false))) - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression) - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(<function1>,input[0, ArrayType(StructType(StructField(k,IntegerType,false), StructField(v,IntegerType,false)),true)],StructField(k,IntegerType,false),StructField(v,IntegerType,false)),array,ObjectType(class [Ljava.lang.Object;))) - writeObject data (class: scala.collection.immutable.$colon$colon) - object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(<function1>,input[0, ArrayType(StructType(StructField(k,IntegerType,false), StructField(v,IntegerType,false)),true)],StructField(k,IntegerType,false),StructField(v,IntegerType,false)),array,ObjectType(class [Ljava.lang.Object;)))) - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq) - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class scala.collection.mutable.WrappedArray$,ObjectType(interface scala.collection.Seq),make,invoke(mapobjects(<function1>,input[0, ArrayType(StructType(StructField(k,IntegerType,false), StructField(v,IntegerType,false)),true)],StructField(k,IntegerType,false),StructField(v,IntegerType,false)),array,ObjectType(class [Ljava.lang.Object;)),true)) - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression) - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[]) - field (class: org.apache.spark.sql.execution.aggregate.TypedAggregateExpression, name: bEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder) - object 
(class org.apache.spark.sql.execution.aggregate.TypedAggregateExpression, AggKV(k#684,v#685)) - field (class: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, name: aggregateFunction, type: class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction) - object (class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, (AggKV(k#684,v#685),mode=Partial,isDistinct=false)) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;) - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer((AggKV(k#684,v#685),mode=Partial,isDistinct=false))) - field (class: org.apache.spark.sql.execution.aggregate.SortBasedAggregate, name: nonCompleteAggregateExpressions, type: interface scala.collection.Seq) - object (class org.apache.spark.sql.execution.aggregate.SortBasedAggregate, SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Partial,isDistinct=false)], output=[k#684,value#731]) +- ConvertToSafe +- Sort [k#684 ASC], false, 0 +- Project [k#684,v#685,k#684] +- Scan ExistingRDD[k#684,v#685] ) - field (class: org.apache.spark.sql.execution.ConvertToUnsafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan) - object (class org.apache.spark.sql.execution.ConvertToUnsafe, ConvertToUnsafe +- SortBasedAggregate(key=[k#684], functions=[(AggKV(k#684,v#685),mode=Partial,isDistinct=false)], output=[k#684,value#731]) +- ConvertToSafe +- Sort [k#684 ASC], false, 0 +- Project [k#684,v#685,k#684] +- Scan ExistingRDD[k#684,v#685] ) - field (class: org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, name: $outer, type: class org.apache.spark.sql.execution.ConvertToUnsafe) - object (class org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, <function1>) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) ... 134 more