I've downloaded a nightly build of Spark 2.0 (from today, 4/23) and am
attempting to create an Aggregator that builds up a Seq of rows,
specifically a Seq[C1], where C1 is my custom case class.
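
For reference, here is the shape of the Aggregator contract being
implemented, as I understand it from the 2.0 API docs:

  abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
    def zero: BUF                      // identity element for the buffer
    def reduce(b: BUF, a: IN): BUF     // fold one input value into the buffer
    def merge(b1: BUF, b2: BUF): BUF   // combine two partial buffers
    def finish(reduction: BUF): OUT    // turn the final buffer into the output
    def bufferEncoder: Encoder[BUF]    // how Spark serializes the buffer
    def outputEncoder: Encoder[OUT]    // how Spark serializes the output
  }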

When I attempt to run the following code in a spark-shell, it errors out:

Gist: https://gist.github.com/dondrake/be6b92aff71433e9fb627b478b78b839

Code:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder,Row}
import org.apache.spark.sql.functions._
import java.util.Calendar

case class C1(f1: String, f2: String, f3: String, f4: java.sql.Date, f5: Double)
val teams = sc.parallelize(Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),
  C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72)
)).toDS

// https://docs.cloud.databricks.com/docs/spark/1.6/examples/Dataset%20Aggregator.html
object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] {
  // use an empty Seq (rather than null) as the identity so that
  // reduce's b :+ a is well-defined on the first input
  def zero: Seq[C1] = Seq.empty
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r

  override def bufferEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
}

val g_c1 = teams.select(C1Agg.toColumn)
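
If it worked, I'd expect g_c1 to come back as a single-row
Dataset[Seq[C1]] holding all four records, i.e. roughly:

  g_c1: org.apache.spark.sql.Dataset[Seq[C1]]  // one row: a Seq of the 4 C1s

Instead, here is what happens in the shell: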


scala> val g_c1 = teams.select(C1Agg.toColumn)
scala.ScalaReflectionException: object $line37.$read not found.
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
  at $typecreator1$1.apply(<console>:45)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
  at org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
  ... 52 elided
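
That $line37.$read looks like the wrapper object spark-shell compiles
REPL definitions into, so the reflection step is apparently failing to
resolve the REPL-defined C1. One way to see the wrapper naming (the
exact $lineNN prefix will vary):

  scala> classOf[C1].getName
  // returns something like: $line14.$read$$iw$$iw$C1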

If I tweak teams to be a DataFrame instead of a Dataset, leaving
everything else the same, I get a different error. The only change is
the final conversion (same rows as above):
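
  val teams = sc.parallelize(Seq(
    C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
    // ... same remaining rows as above ...
  )).toDF  // was .toDS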

scala> val g_c1 = teams.select(C1Agg.toColumn)
org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate
[(C1Agg(unknown),mode=Complete,isDistinct=false) AS c1agg(staticinvoke(
    class scala.collection.mutable.WrappedArray$,
    ObjectType(interface scala.collection.Seq), make,
    mapobjects(
      lambdavariable(MapObjects_loopValue26, MapObjects_loopIsNull27,
        StructField(f1,StringType,true), StructField(f2,StringType,true),
        StructField(f3,StringType,true), StructField(f4,DateType,true),
        StructField(f5,DoubleType,false)),
      if (isnull(lambdavariable(MapObjects_loopValue26, MapObjects_loopIsNull27,
        StructField(f1,StringType,true), StructField(f2,StringType,true),
        StructField(f3,StringType,true), StructField(f4,DateType,true),
        StructField(f5,DoubleType,false)))) null else newInstance(class C1),
      upcast(value, ArrayType(StructType(StructField(f1,StringType,true),
        StructField(f2,StringType,true), StructField(f3,StringType,true),
        StructField(f4,DateType,true), StructField(f5,DoubleType,false)),true),
        - root class: "scala.collection.Seq")).array, true))#63];
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:54)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:289)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:51)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:51)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:54)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:61)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2443)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:935)
  ... 52 elided
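
For what it's worth, the first trace bottoms out in
SQLImplicits.newProductSeqEncoder, which suggests toColumn is resolving
an implicit Encoder[Seq[C1]] from SQLImplicits rather than using the
bufferEncoder/outputEncoder overrides. If that's right, the failing
piece can probably be exercised on its own in the shell (an untested
guess on my part):

  // should trigger the same implicit Encoder[Seq[C1]] derivation the
  // first stack trace shows, independent of the Aggregator itself
  implicitly[org.apache.spark.sql.Encoder[Seq[C1]]]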

I'm not sure how to diagnose those errors.  Thoughts?

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143
