[ 
https://issues.apache.org/jira/browse/SPARK-24569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520042#comment-16520042
 ] 

Apache Spark commented on SPARK-24569:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/21611

> Spark Aggregator with output type Option[Boolean] creates column of type Row
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-24569
>                 URL: https://issues.apache.org/jira/browse/SPARK-24569
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1
>         Environment: OSX
>            Reporter: John Conwell
>            Priority: Major
>
> A Spark SQL Aggregator that returns an output column of Option[Boolean] creates 
> a column of type 
> StructField(<col_name>,StructType(StructField(value,BooleanType,true)),true) 
> instead of StructField(<col_name>,BooleanType,true).  
> In other words, it puts a Row instance into the new column.
>  
> Reproduction
>  
> {code:java}
> class OptionBooleanAggregatorTest extends BaseFreeSpec {
>   val ss: SparkSession = getSparkSession
>   "test option" in {
>     import ss.implicits._
>     val df = List(
>       Thing("bob", Some(true)),
>       Thing("bob", Some(false)),
>       Thing("bob", None))
>       .toDF()
>     val group = df
>       .groupBy("name")
>       .agg(OptionBooleanAggregator("isGood").toColumn.alias("isGood"))
>       .cache()
>     assert(group.schema("name").dataType == StringType)
>     //this will fail
>     assert(group.schema("isGood").dataType == BooleanType)
>   }
> }
> case class Thing(name: String, isGood: Option[Boolean])
> case class OptionBooleanAggregator(colName: String) extends Aggregator[Row, 
> Option[Boolean], Option[Boolean]] {
>   override def zero: Option[Boolean] = Option.empty[Boolean]
>   override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] = {
>     val index = row.fieldIndex(colName)
>     val value = if (row.isNullAt(index))
>       Option.empty[Boolean]
>     else
>       Some(row.getBoolean(index))
>     merge(buffer, value)
>   }
>   override def merge(b1: Option[Boolean], b2: Option[Boolean]): 
> Option[Boolean] = {
>     if ((b1.isDefined && b1.get) || (b2.isDefined && b2.get)) {
>       Some(true)
>     }
>     else if (b1.isDefined) {
>       b1
>     }
>     else
>       b2
>   }
>   override def finish(reduction: Option[Boolean]): Option[Boolean] = reduction
>   override def bufferEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
>   override def outputEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
>   def OptionalBoolEncoder: org.apache.spark.sql.Encoder[Option[Boolean]] = 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder()
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to