[ https://issues.apache.org/jira/browse/SPARK-24569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520042#comment-16520042 ]
Apache Spark commented on SPARK-24569: -------------------------------------- User 'viirya' has created a pull request for this issue: https://github.com/apache/spark/pull/21611 > Spark Aggregator with output type Option[Boolean] creates column of type Row > ---------------------------------------------------------------------------- > > Key: SPARK-24569 > URL: https://issues.apache.org/jira/browse/SPARK-24569 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.3.1 > Environment: OSX > Reporter: John Conwell > Priority: Major > > Spark SQL Aggregator that returns an output column of Option[Boolean] creates > a column of type > StructField(<col_name>,StructType(StructField(value,BooleanType,true)),true) > instead of StructField(<col_name>,BooleanType,true). > In other words, it puts a Row instance into the new column. > > Reproduction > > {code:java} > class OptionBooleanAggregatorTest extends BaseFreeSpec { > val ss: SparkSession = getSparkSession > "test option" in { > import ss.implicits._ > val df = List( > Thing("bob", Some(true)), > Thing("bob", Some(false)), > Thing("bob", None)) > .toDF() > val group = df > .groupBy("name") > .agg(OptionBooleanAggregator("isGood").toColumn.alias("isGood")) > .cache() > assert(group.schema("name").dataType == StringType) > //this will fail > assert(group.schema("isGood").dataType == BooleanType) > } > } > case class Thing(name: String, isGood: Option[Boolean]) > case class OptionBooleanAggregator(colName: String) extends Aggregator[Row, > Option[Boolean], Option[Boolean]] { > override def zero: Option[Boolean] = Option.empty[Boolean] > override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] = { > val index = row.fieldIndex(colName) > val value = if (row.isNullAt(index)) > Option.empty[Boolean] > else > Some(row.getBoolean(index)) > merge(buffer, value) > } > override def merge(b1: Option[Boolean], b2: Option[Boolean]): > Option[Boolean] = { > if ((b1.isDefined && b1.get) || (b2.isDefined 
&& b2.get)) { > Some(true) > } > else if (b1.isDefined) { > b1 > } > else > b2 > } > override def finish(reduction: Option[Boolean]): Option[Boolean] = reduction > override def bufferEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder > override def outputEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder > def OptionalBoolEncoder: org.apache.spark.sql.Encoder[Option[Boolean]] = > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder() > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org