[ https://issues.apache.org/jira/browse/SPARK-32159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-32159:
------------------------------------

    Assignee:     (was: Apache Spark)

> New udaf(Aggregator) has an integration bug with UnresolvedMapObjects serialization
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-32159
>                 URL: https://issues.apache.org/jira/browse/SPARK-32159
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Erik Erlandson
>            Priority: Major
>
> The new user defined aggregator feature (SPARK-27296) based on calling
> 'functions.udaf(aggregator)' works fine when the aggregator input type is
> atomic, e.g. 'Aggregator[Double, _, _]', however if the input type is an
> array, like 'Aggregator[Array[Double], _, _]', it is tripping over the
> following:
>
> /**
>  * When constructing [[MapObjects]], the element type must be given, which may not be available
>  * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
>  * [[MapObjects]] during analysis after the input data is resolved.
>  * Note that, ideally we should not serialize and send unresolved expressions to executors, but
>  * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
>  * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
>  * not serializable. Then even users mistakenly reference unresolved expression and serialize it,
>  * it's just a performance issue(more network traffic), and will not fail.
>  */
> case class UnresolvedMapObjects(
>     @transient function: Expression => Expression,
>     child: Expression,
>     customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
>   override lazy val resolved = false
>
>   override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse {
>     throw new UnsupportedOperationException("not resolved")
>   }
> }
>
> *The '@transient' is causing the function to be unpacked as 'null' over on
> the executors, and it is causing a null-pointer exception here, when it tries
> to do 'function(loopVar)'*
>
> object MapObjects {
>   def apply(
>       function: Expression => Expression,
>       inputData: Expression,
>       elementType: DataType,
>       elementNullable: Boolean = true,
>       customCollectionCls: Option[Class[_]] = None): MapObjects = {
>     val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
>     MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
>   }
> }
>
> *I believe it may be possible to just use 'loopVar' instead of
> 'function(loopVar)', whenever 'function' is null, but need second opinion
> from catalyst developers on what a robust fix should be*

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
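
The '@transient' behaviour described in the issue can be reproduced outside Spark. The following is only a minimal sketch, assuming plain Java serialization is what carries the expression to the executors; `Placeholder`, `TransientDemo` and `roundTrip` are hypothetical names used for illustration and are not part of Spark or Catalyst.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for UnresolvedMapObjects: a case class whose
// function-typed constructor parameter is marked @transient.
case class Placeholder(@transient function: Int => Int, child: Int)

object TransientDemo {
  // Java-serialize a value and read it back, roughly what happens when an
  // object is shipped from the driver to an executor (an assumption here).
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = Placeholder(_ + 1, 41)
    val copy = roundTrip(original)

    println(copy.child)            // 41: the non-transient field survives
    println(copy.function == null) // true: the transient field is not restored
    // Calling copy.function(1) at this point would throw a NullPointerException,
    // which mirrors the 'function(loopVar)' failure reported above.
  }
}
```

The sketch only shows why the field arrives as 'null'; it does not say what a robust fix in Catalyst should look like, which is the open question in the report.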