John Conwell created SPARK-24569:
------------------------------------

Summary: Spark Aggregator with output type Option[Boolean] creates column of type Row
Key: SPARK-24569
URL: https://issues.apache.org/jira/browse/SPARK-24569
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.1
Environment: OSX
Reporter: John Conwell
A Spark SQL Aggregator that returns an output column of type Option[Boolean] creates a column of type StructField(<col_name>,StructType(StructField(value,BooleanType,true)),true) instead of StructField(<col_name>,BooleanType,true). In other words, it puts a Row instance into the new column.

Reproduction:
{code:java}
class OptionBooleanAggregatorTest extends BaseFreeSpec {
  val ss: SparkSession = getSparkSession

  "test option" in {
    import ss.implicits._
    val df = List(
      Thing("bob", Some(true)),
      Thing("bob", Some(false)),
      Thing("bob", None))
      .toDF()

    val group = df
      .groupBy("name")
      .agg(OptionBooleanAggregator("isGood").toColumn.alias("isGood"))
      .cache()

    assert(group.schema("name").dataType == StringType)

    //this will fail
    assert(group.schema("isGood").dataType == BooleanType)
  }
}

case class Thing(name: String, isGood: Option[Boolean])

case class OptionBooleanAggregator(colName: String)
    extends Aggregator[Row, Option[Boolean], Option[Boolean]] {

  override def zero: Option[Boolean] = Option.empty[Boolean]

  override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] = {
    val index = row.fieldIndex(colName)
    val value = if (row.isNullAt(index)) Option.empty[Boolean] else Some(row.getBoolean(index))
    merge(buffer, value)
  }

  override def merge(b1: Option[Boolean], b2: Option[Boolean]): Option[Boolean] = {
    if ((b1.isDefined && b1.get) || (b2.isDefined && b2.get)) {
      Some(true)
    } else if (b1.isDefined) {
      b1
    } else b2
  }

  override def finish(reduction: Option[Boolean]): Option[Boolean] = reduction

  override def bufferEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder

  override def outputEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder

  def OptionalBoolEncoder: org.apache.spark.sql.Encoder[Option[Boolean]] =
    org.apache.spark.sql.catalyst.encoders.ExpressionEncoder()
}
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org