John Conwell created SPARK-24569:
------------------------------------

             Summary: Spark Aggregator with output type Option[Boolean] creates 
column of type Row
                 Key: SPARK-24569
                 URL: https://issues.apache.org/jira/browse/SPARK-24569
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.1
         Environment: OSX
            Reporter: John Conwell


A Spark SQL Aggregator whose output type is Option[Boolean] produces an output 
column of type 
StructField(<col_name>,StructType(StructField(value,BooleanType,true)),true) 
instead of the expected StructField(<col_name>,BooleanType,true).  

In other words, it puts a Row instance into the new column.

 

Reproduction

 
{code:java}
/**
 * Reproduction spec: aggregates an Option[Boolean] column with
 * OptionBooleanAggregator and checks the data types of the resulting schema.
 */
class OptionBooleanAggregatorTest extends BaseFreeSpec {

  val ss: SparkSession = getSparkSession

  "test option" in {
    import ss.implicits._

    // Three rows sharing one key: a true, a false and a missing value.
    val things = List(
      Thing("bob", Some(true)),
      Thing("bob", Some(false)),
      Thing("bob", None))
      .toDF()

    // Aggregate column, aliased back to the name of the input column.
    val isGoodColumn = OptionBooleanAggregator("isGood").toColumn.alias("isGood")

    val aggregated = things
      .groupBy("name")
      .agg(isGoodColumn)
      .cache()

    val schema = aggregated.schema

    // The grouping key keeps its plain string type.
    assert(schema("name").dataType == StringType)

    // Reported to fail on Spark 2.3.1: the aggregated column comes back as a
    // struct wrapping the Boolean rather than a plain BooleanType.
    assert(schema("isGood").dataType == BooleanType)
  }
}

/**
 * Input record for the reproduction.
 *
 * @param name   grouping key
 * @param isGood optional flag that is aggregated per `name`; `None` models a missing value
 */
case class Thing(
  name: String,
  isGood: Option[Boolean]
)

/**
 * Aggregator that combines the Boolean column `colName` of the incoming rows
 * with a logical OR, ignoring null cells.
 *
 * The result is `Some(true)` if any non-null cell is true, `Some(false)` if
 * non-null cells were seen but none was true, and `None` if every cell was null.
 *
 * @param colName name of the Boolean column read from each input row
 */
case class OptionBooleanAggregator(colName: String)
  extends Aggregator[Row, Option[Boolean], Option[Boolean]] {

  /** Initial buffer: no value has been seen yet. */
  override def zero: Option[Boolean] = None

  /** Folds one row into the buffer, treating a null cell as `None`. */
  override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] = {
    val position = row.fieldIndex(colName)
    val cell: Option[Boolean] =
      if (row.isNullAt(position)) None
      else Some(row.getBoolean(position))
    merge(buffer, cell)
  }

  /** Combines two partial results: true wins, otherwise the first defined side is kept. */
  override def merge(b1: Option[Boolean], b2: Option[Boolean]): Option[Boolean] =
    (b1, b2) match {
      case (Some(true), _) | (_, Some(true)) => Some(true)
      case (Some(_), _)                      => b1
      case _                                 => b2
    }

  /** The buffer already has the output shape, so it is returned unchanged. */
  override def finish(reduction: Option[Boolean]): Option[Boolean] = reduction

  override def bufferEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder

  override def outputEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder

  /** Encoder for `Option[Boolean]`, shared by the buffer and the output. */
  def OptionalBoolEncoder: Encoder[Option[Boolean]] =
    org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Option[Boolean]]()
}
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to