Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7788#discussion_r35949349
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
    @@ -632,3 +634,123 @@ private[hive] case class HiveUDAFFunction(
       }
     }
     
    +private[hive] case class HiveUdaf2(
    +    funcWrapper: HiveFunctionWrapper,
    +    children: Seq[Expression],
    +    isUDAF: Boolean) extends AggregateFunction2 with HiveInspectors {
    +  type UDFType = AbstractGenericUDAFResolver
    +
    +  protected def createEvaluator = resolver.getEvaluator(
    +    new SimpleGenericUDAFParameterInfo(inspectors, false, false))
    +
    +  // Hive UDAF evaluator
    +  @transient
    +  lazy val evaluator = createEvaluator
    +
    +  @transient
    +  protected lazy val resolver: AbstractGenericUDAFResolver = if (isUDAF) {
    +    // if it's UDAF, we need the UDAF bridge
    +    new GenericUDAFBridge(funcWrapper.createFunction())
    +  } else {
    +    funcWrapper.createFunction()
    +  }
    +
    +  // Output data object inspector, associated with the result of Hive UDAF.
    +  @transient
    +  lazy val objectInspector = 
createEvaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
    +
    +  // Partial Evaluator of Hive UDAF, which will be used when converting 
the Hive Aggregate Buffer
    +  // to Hive Object, right before the shuffling start.
    +  @transient
    +  lazy val partialEvaluator = createEvaluator
    +
    +  // The Aggregation Buffer Inspector, associated with the partial result 
of Hive UDAF.
    +  @transient
    +  lazy val bufferObjectInspector = {
    +    createEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inspectors)
    +  }
    +
    +  // Input arguments object inspectors
    +  @transient
    +  lazy val inspectors = children.map(toInspector).toArray
    +
    +  override def toString: String =
    +    
s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
    +
    +  // Aggregation Buffer Data Type, We assume only 1 element for the Hive 
Aggregation Buffer
    +  // It will be StructType if more than 1 element (Actually will be 
StructSettableObjectInspector)
    +  @transient
    +  lazy val bufferDataType = inspectorToDataType(bufferObjectInspector)
    +  override def bufferAttributes: Seq[AttributeReference] =
    +    AttributeReference("soi", bufferDataType)() :: Nil
    +
    +  // Output data type
    +  override def dataType: DataType = inspectorToDataType(objectInspector)
    +
    +  override def initialize(m: AggregateMode, buffer: MutableRow): Unit = {
    +    m match {
    +      case Final => evaluator.init(GenericUDAFEvaluator.Mode.FINAL, 
Array(bufferObjectInspector))
    +      case Complete => evaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, 
inspectors)
    +      case Partial => evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, 
inspectors)
    +      case x => throw new IllegalArgumentException(s"We don't support $x 
yet.")
    --- End diff --
    
    PartialMerge will be GenericUDAFEvaluator.Mode.PARTIAL2, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to