Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21611#discussion_r198342219
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---
    @@ -148,6 +148,79 @@ object VeryComplexResultAgg extends Aggregator[Row, 
String, ComplexAggData] {
     }
     
     
    +case class OptionBooleanData(name: String, isGood: Option[Boolean])
    +case class OptionBooleanIntData(name: String, isGood: Option[(Boolean, 
Int)])
    +
    +case class OptionBooleanAggregator(colName: String)
    +    extends Aggregator[Row, Option[Boolean], Option[Boolean]] {
    +
    +  override def zero: Option[Boolean] = None
    +
    +  override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] 
= {
    +    val index = row.fieldIndex(colName)
    +    val value = if (row.isNullAt(index)) {
    +      Option.empty[Boolean]
    +    } else {
    +      Some(row.getBoolean(index))
    +    }
    +    merge(buffer, value)
    +  }
    +
    +  override def merge(b1: Option[Boolean], b2: Option[Boolean]): 
Option[Boolean] = {
    +    if ((b1.isDefined && b1.get) || (b2.isDefined && b2.get)) {
    +      Some(true)
    +    } else if (b1.isDefined) {
    +      b1
    +    } else {
    +      b2
    +    }
    +  }
    +
    +  override def finish(reduction: Option[Boolean]): Option[Boolean] = 
reduction
    +
    +  override def bufferEncoder: Encoder[Option[Boolean]] = 
OptionalBoolEncoder
    +  override def outputEncoder: Encoder[Option[Boolean]] = 
OptionalBoolEncoder
    +
    +  def OptionalBoolEncoder: Encoder[Option[Boolean]] = ExpressionEncoder()
    +}
    +
    +case class OptionBooleanIntAggregator(colName: String)
    +    extends Aggregator[Row, Option[(Boolean, Int)], Option[(Boolean, 
Int)]] {
    +
    +  override def zero: Option[(Boolean, Int)] = None
    +
    +  override def reduce(buffer: Option[(Boolean, Int)], row: Row): 
Option[(Boolean, Int)] = {
    +    val index = row.fieldIndex(colName)
    +    val value = if (row.isNullAt(index)) {
    +      Option.empty[(Boolean, Int)]
    +    } else {
    +      val nestedRow = row.getStruct(index)
    +      Some((nestedRow.getBoolean(0), nestedRow.getInt(1)))
    +    }
    +    merge(buffer, value)
    +  }
    +
    +  override def merge(
    +      b1: Option[(Boolean, Int)],
    +      b2: Option[(Boolean, Int)]): Option[(Boolean, Int)] = {
    +    if ((b1.isDefined && b1.get._1) || (b2.isDefined && b2.get._1)) {
    +      val newInt = b1.map(_._2).getOrElse(0) + b2.map(_._2).getOrElse(0)
    +      Some((true, newInt))
    +    } else if (b1.isDefined) {
    +      b1
    +    } else {
    +      b2
    +    }
    +  }
    +
    +  override def finish(reduction: Option[(Boolean, Int)]): Option[(Boolean, 
Int)] = reduction
    +
    +  override def bufferEncoder: Encoder[Option[(Boolean, Int)]] = 
OptionalBoolIntEncoder
    +  override def outputEncoder: Encoder[Option[(Boolean, Int)]] = 
OptionalBoolIntEncoder
    +
    +  def OptionalBoolIntEncoder: Encoder[Option[(Boolean, Int)]] = 
ExpressionEncoder(topLevel = false)
    --- End diff --
    
    Creating a top-level encoder for an Option of a Product type is now 
forbidden. If you want to manually create an encoder, as in this case, you 
need an API like this one to allow it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to