    --- Diff: mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala ---
    @@ -0,0 +1,633 @@
    +import org.apache.spark.annotation.Since
    +import org.apache.spark.internal.Logging
    +import{Vector, Vectors, VectorUDT}
    +import org.apache.spark.sql.Column
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.{Expression, 
Complete, TypedImperativeAggregate}
    +import org.apache.spark.sql.catalyst.util.ArrayData
    +import org.apache.spark.sql.functions.lit
    +import org.apache.spark.sql.types._
    + * A builder object that provides summary statistics about a given column.
    + *
    + * Users should not directly create such builders, but instead use one of 
the methods in
    + * [[Summarizer]].
    + */
    +abstract class SummaryBuilder {
    +  /**
    +   * Returns an aggregate object that contains the summary of the column with the requested
    +   * metrics.
    +   *
    +   * @param featuresCol a column that contains features Vector object.
    +   * @param weightCol a column that contains the weight of each vector (must be non-negative).
    +   * @return an aggregate column that contains the statistics. The exact content of this
    +   *         structure is determined during the creation of the builder.
    +   */
    +  @Since("2.2.0")
    +  def summary(featuresCol: Column, weightCol: Column): Column
    +  /**
    +   * Same as `summary(featuresCol, weightCol)`, but every vector is given a weight of 1.0.
    +   *
    +   * @param featuresCol a column that contains features Vector object.
    +   * @return an aggregate column that contains the statistics. The exact content of this
    +   *         structure is determined during the creation of the builder.
    +   */
    +  @Since("2.2.0")
    +  def summary(featuresCol: Column): Column = summary(featuresCol, lit(1.0))
    + * Tools for vectorized statistics on MLlib Vectors.
    + *
    + * The methods in this object provide various statistics for Vectors contained inside
    + * DataFrames.
    + *
    + * This object lets users pick the statistics they would like to extract for a given column.
    + * Here is an example in Scala:
    + * {{{
    + *   val dataframe = ... // Some dataframe containing a feature column
    + *   val allStats = dataframe.select(Summarizer.metrics("min", "max").summary($"features"))
    + *   val Row(Row(min_, max_)) = allStats.first()
    + * }}}
    + *
    + * If one wants to get a single metric, shortcuts are also available:
    + * {{{
    + *   val meanDF = dataframe.select(Summarizer.mean($"features"))
    + *   val Row(mean_) = meanDF.first()
    + * }}}
    + */
    +object Summarizer extends Logging {
    +  import SummaryBuilderImpl._
    +  /**
    +   * Given a list of metrics, provides a builder that in turn computes metrics from a column.
    +   *
    +   * See the documentation of [[Summarizer]] for an example.
    +   *
    +   * The following metrics are accepted (case sensitive):
    +   *  - mean: a vector that contains the coefficient-wise mean.
    +   *  - variance: a vector that contains the coefficient-wise variance.
    +   *  - count: the count of all vectors seen (vectors with a zero weight are skipped).
    +   *  - numNonZeros: a vector with the number of non-zeros for each coefficient.
    +   *  - max: the maximum for each coefficient.
    +   *  - min: the minimum for each coefficient.
    +   *  - normL2: the Euclidean norm for each coefficient.
    +   *  - normL1: the L1 norm of each coefficient (sum of the absolute values).
    +   *
    +   * @param firstMetric the metric being provided
    +   * @param metrics additional metrics that can be provided.
    +   * @return a builder.
    +   * @throws IllegalArgumentException if one of the metric names is not understood.
    +   */
    +  @Since("2.2.0")
    +  def metrics(firstMetric: String, metrics: String*): SummaryBuilder = {
    +    // Taking the first metric separately guarantees the requested list is never empty.
    +    val (typedMetrics, computeMetrics) = getRelevantMetrics(firstMetric +: metrics)
    +    new SummaryBuilderImpl(typedMetrics, computeMetrics)
    +  }
    +  /**
    +   * Returns a column with the coefficient-wise mean of the vectors in `col`, each with weight
    +   * 1.0. Shortcut for the "mean" field of `metrics("mean").summary(col)`.
    +   */
    +  @Since("2.2.0")
    +  def mean(col: Column): Column = getSingleMetric(col, "mean")
    +  /** Returns a column with the coefficient-wise variance of the vectors in `col`. */
    +  @Since("2.2.0")
    +  def variance(col: Column): Column = getSingleMetric(col, "variance")
    +  /** Returns a column with the number of vectors in `col`. */
    +  @Since("2.2.0")
    +  def count(col: Column): Column = getSingleMetric(col, "count")
    +  def numNonZeros(col: Column): Column = getSingleMetric(col, 
    +  /** Returns a column with the maximum of each coefficient over the vectors in `col`. */
    +  @Since("2.2.0")
    +  def max(col: Column): Column = getSingleMetric(col, "max")
    +  /** Returns a column with the minimum of each coefficient over the vectors in `col`. */
    +  @Since("2.2.0")
    +  def min(col: Column): Column = getSingleMetric(col, "min")
    +  /** Returns a column with the L1 norm (sum of absolute values) of each coefficient. */
    +  @Since("2.2.0")
    +  def normL1(col: Column): Column = getSingleMetric(col, "normL1")
    +  /** Returns a column with the L2 (Euclidean) norm of each coefficient. */
    +  @Since("2.2.0")
    +  def normL2(col: Column): Column = getSingleMetric(col, "normL2")
    +  /**
    +   * Builds a summary that requests only `metric`, then pulls that single field out of the
    +   * resulting struct and aliases the column as `metric(col)`.
    +   */
    +  private def getSingleMetric(col: Column, metric: String): Column =
    +    metrics(metric)
    +      .summary(col)
    +      .getField(metric)
    +      .as(s"$metric($col)")
    +private[ml] class SummaryBuilderImpl(
    +    requestedMetrics: Seq[SummaryBuilderImpl.Metrics],
    +    requestedCompMetrics: Seq[SummaryBuilderImpl.ComputeMetrics]
    +  ) extends SummaryBuilder {
    +  override def summary(featuresCol: Column, weightCol: Column): Column = {
    +    val agg = SummaryBuilderImpl.MetricsAggregate(
    +      requestedMetrics,
    +      requestedCompMetrics,
    +      featuresCol.expr,
    +      weightCol.expr,
    +      mutableAggBufferOffset = 0,
    +      inputAggBufferOffset = 0)
    +    new Column(AggregateExpression(agg, mode = Complete, isDistinct = 
    +  }
    +object SummaryBuilderImpl extends Logging {
    +  def implementedMetrics: Seq[String] =
    +  @throws[IllegalArgumentException]("When the list is empty or not a 
subset of known metrics")
    +  def getRelevantMetrics(requested: Seq[String]): (Seq[Metrics], 
Seq[ComputeMetrics]) = {
    +    val all = { req =>
    +      val (_, metric, _, deps) = allMetrics.find(tup => tup._1 == 
req).getOrElse {
    +        throw new IllegalArgumentException(s"Metric $req cannot be found." 
    +          s" Valid metrics are $implementedMetrics")
    +      }
    +      metric -> deps
    +    }
    +    // Do not sort: otherwise the user has to look at the schema to see the order in which
    +    // the metrics are going to be given.
    +    val metrics =
    +    val computeMetrics = all.flatMap(_._2).distinct.sortBy(_.toString)
    +    metrics -> computeMetrics
    +  }
    +  def structureForMetrics(metrics: Seq[Metrics]): StructType = {
    +    val dct = { case (n, m, dt, _) => (m, (n, dt)) }.toMap
    +    val fields = { case (n, dt) =>
    +        StructField(n, dt, nullable = false)
    +    }
    +    StructType(fields)
    +  }
    +  // Output type of every vector-valued double metric (mean, variance, max, min, norms).
    +  private val arrayDType: ArrayType = ArrayType(elementType = DoubleType, containsNull = false)
    +  // Output type of the numNonZeros metric.
    +  private val arrayLType: ArrayType = ArrayType(elementType = LongType, containsNull = false)
    +  /**
    +   * All the metrics that can be currently computed by Spark for vectors.
    +   *
    +   * This list associates the user name, the internal (typed) name, the output data type, and
    +   * the list of computation metrics that need to be computed internally to get the final result.
    +   */
    +  private val allMetrics: Seq[(String, Metrics, DataType, 
Seq[ComputeMetrics])] = Seq(
    +    ("mean", Mean, arrayDType, Seq(ComputeMean, ComputeWeightSum)),
    +    ("variance", Variance, arrayDType, Seq(ComputeWeightSum, ComputeMean, 
    +    ("count", Count, LongType, Seq()),
    +    ("numNonZeros", NumNonZeros, arrayLType, Seq(ComputeNNZ)),
    +    ("max", Max, arrayDType, Seq(ComputeMax, ComputeNNZ)),
    +    ("min", Min, arrayDType, Seq(ComputeMin, ComputeNNZ)),
    +    ("normL2", NormL2, arrayDType, Seq(ComputeM2)),
    +    ("normL1", NormL1, arrayDType, Seq(ComputeL1))
    +  )
    +  /**
    +   * The metrics that are currently implemented, i.e. the ones users can request by name.
    +   */
    +  sealed trait Metrics extends Serializable
    +  case object Mean extends Metrics
    +  case object Variance extends Metrics
    +  case object Count extends Metrics
    +  case object NumNonZeros extends Metrics
    +  case object Max extends Metrics
    +  case object Min extends Metrics
    +  case object NormL2 extends Metrics
    +  case object NormL1 extends Metrics
    +  /**
    +   * The running metrics that are going to be computed.
    +   *
    +   * There is a bipartite graph between the metrics and the computed metrics: each user-facing
    +   * metric depends on zero or more computed metrics (see `allMetrics`), and a computed metric
    +   * can be shared by several user-facing metrics, in which case it is only maintained once.
    +   */
    +  sealed trait ComputeMetrics extends Serializable
    +  // Running per-coefficient weighted mean of the non-zero entries.
    +  case object ComputeMean extends ComputeMetrics
    +  // Running per-coefficient sum of weighted squared deviations from the mean.
    +  case object ComputeM2n extends ComputeMetrics
    +  // Per-coefficient sum of weight * value * value.
    +  case object ComputeM2 extends ComputeMetrics
    +  // Per-coefficient sum of weight * |value|.
    +  case object ComputeL1 extends ComputeMetrics
    +  // Per-coefficient sum of the weights of the samples whose entry is non-zero.
    +  case object ComputeWeightSum extends ComputeMetrics
    +  // Per-coefficient count of non-zero entries.
    +  case object ComputeNNZ extends ComputeMetrics
    +  // Per-coefficient running maximum / minimum of the non-zero entries.
    +  case object ComputeMax extends ComputeMetrics
    +  case object ComputeMin extends ComputeMetrics
    +  class SummarizerBuffer(
    +      requestedMetrics: Seq[Metrics],
    +      requestedCompMetrics: Seq[ComputeMetrics]
    +  ) extends Serializable {
    +    // Vector dimension; set by the first sample added with a positive weight (or by merge).
    +    private var n: Int = 0
    +    // The arrays below stay null until the first positive-weight sample arrives, and are only
    +    // allocated when the corresponding compute metric was requested.
    +    // Running per-coefficient weighted mean of the non-zero entries.
    +    private var currMean: Array[Double] = null
    +    // Running per-coefficient sum of weighted squared deviations from the mean.
    +    private var currM2n: Array[Double] = null
    +    // Per-coefficient sum of weight * value * value.
    +    private var currM2: Array[Double] = null
    +    // Per-coefficient sum of weight * |value|.
    +    private var currL1: Array[Double] = null
    +    // Number of samples added with a positive weight.
    +    private var totalCnt: Long = 0L
    +    // Sum of the weights of all added samples.
    +    private var totalWeightSum: Double = 0.0
    +    // Sum of the squared weights of all added samples.
    +    private var weightSquareSum: Double = 0.0
    +    // Per-coefficient sum of the weights of the samples whose entry was non-zero.
    +    private var weightSum: Array[Double] = null
    +    // Per-coefficient count of non-zero entries.
    +    private var nnz: Array[Long] = null
    +    // Per-coefficient running maximum / minimum of the non-zero entries.
    +    private var currMax: Array[Double] = null
    +    private var currMin: Array[Double] = null
    +    /**
    +     * Creates a buffer that tracks every metric, and therefore every compute metric.
    +     */
    +    def this() = this(
    +      Seq(Mean, Variance, Count, NumNonZeros, Max, Min, NormL2, NormL1),
    +      Seq(ComputeMean, ComputeM2n, ComputeM2, ComputeL1,
    +        ComputeWeightSum, ComputeNNZ, ComputeMax, ComputeMin))
    +    /**
    +     * Add a new sample to this summarizer, and update the statistical summary in place.
    +     *
    +     * Only the running statistics whose compute metric was requested are maintained. Their
    +     * arrays are allocated when the first sample with a positive weight arrives, and that
    +     * sample also fixes the expected dimension.
    +     *
    +     * @param instance the sample, visited through `foreachActive`; explicit zero values are
    +     *                 skipped.
    +     * @param weight the weight of the sample; must be non-negative. Zero-weight samples are
    +     *               ignored entirely (not counted, and their dimension is not checked).
    +     * @return this summarizer, after the update.
    +     * @throws IllegalArgumentException if the weight is negative or NaN, if the first sample
    +     *                                  has dimension zero, or if the dimension differs from
    +     *                                  the one seen before.
    +     */
    +    def addRaw(instance: TraversableIndexedSeq, weight: Double): this.type = {
    +      require(weight >= 0.0, s"sample weight, ${weight} has to be >= 0.0")
    +      if (weight == 0.0) return this
    +      if (n == 0) {
    +        require(instance.size > 0, s"Vector should have dimension larger than zero.")
    +        n = instance.size
    +        // Allocate only the running statistics that some requested metric depends on.
    +        if (requestedCompMetrics.contains(ComputeMean)) { currMean = Array.ofDim[Double](n) }
    +        if (requestedCompMetrics.contains(ComputeM2n)) { currM2n = Array.ofDim[Double](n) }
    +        if (requestedCompMetrics.contains(ComputeM2)) { currM2 = Array.ofDim[Double](n) }
    +        if (requestedCompMetrics.contains(ComputeL1)) { currL1 = Array.ofDim[Double](n) }
    +        if (requestedCompMetrics.contains(ComputeWeightSum)) { weightSum = Array.ofDim[Double](n) }
    +        if (requestedCompMetrics.contains(ComputeNNZ)) { nnz = Array.ofDim[Long](n) }
    +        if (requestedCompMetrics.contains(ComputeMax)) {
    +          currMax = Array.fill[Double](n)(Double.MinValue)
    +        }
    +        if (requestedCompMetrics.contains(ComputeMin)) {
    +          currMin = Array.fill[Double](n)(Double.MaxValue)
    +        }
    +      }
    +      require(n == instance.size, s"Dimensions mismatch when adding new sample." +
    +        s" Expecting $n but got ${instance.size}.")
    +      // NOTE(review): fields are copied into locals, presumably so the closure below does not
    +      // dereference `this` on every entry.
    +      val localCurrMean = currMean
    +      val localCurrM2n = currM2n
    +      val localCurrM2 = currM2
    +      val localCurrL1 = currL1
    +      val localWeightSum = weightSum
    +      val localNumNonzeros = nnz
    +      val localCurrMax = currMax
    +      val localCurrMin = currMin
    +      instance.foreachActive { (index, value) =>
    +        // Zero entries are skipped here; implicit zeros are accounted for when results are
    +        // read (via weightSum / totalWeightSum for the mean, nnz / totalCnt for max and min).
    +        if (value != 0.0) {
    +          if (localCurrMax != null && localCurrMax(index) < value) {
    +            localCurrMax(index) = value
    +          }
    +          if (localCurrMin != null && localCurrMin(index) > value) {
    +            localCurrMin(index) = value
    +          }
    +          if (localWeightSum != null) {
    +            if (localCurrMean != null) {
    +              // Incremental weighted update. It must run before localWeightSum(index) is
    +              // incremented, because it divides by the previous weight sum plus `weight`.
    +              val prevMean = localCurrMean(index)
    +              val diff = value - prevMean
    +              localCurrMean(index) = prevMean + weight * diff / (localWeightSum(index) + weight)
    +              if (localCurrM2n != null) {
    +                localCurrM2n(index) += weight * (value - localCurrMean(index)) * diff
    +              }
    +            }
    +            localWeightSum(index) += weight
    +          }
    +          if (localCurrM2 != null) {
    +            localCurrM2(index) += weight * value * value
    +          }
    +          if (localCurrL1 != null) {
    +            localCurrL1(index) += weight * math.abs(value)
    +          }
    +          if (localNumNonzeros != null) {
    +            localNumNonzeros(index) += 1
    +          }
    +        }
    +      }
    +      totalWeightSum += weight
    +      weightSquareSum += weight * weight
    +      totalCnt += 1
    +      this
    +    }
    +    /** Adds a sample with a weight of 1.0; see `addRaw(instance, weight)`. */
    +    def addRaw(instance: TraversableIndexedSeq): this.type = addRaw(instance, weight = 1.0)
    +    // Vector-based entry points, for test.
    +    /**
    +     * Adds a [[Vector]] sample with the given weight by exposing it as a
    +     * [[TraversableIndexedSeq]] and delegating to `addRaw`.
    +     */
    +    def add(sample: Vector, weight: Double): this.type = {
    +      val adapter = new TraversableIndexedSeq {
    +        override def size: Int = sample.size
    +        override def foreachActive(f: (Int, Double) => Unit): Unit = sample.foreachActive(f)
    +      }
    +      addRaw(adapter, weight)
    +    }
    +    /** Adds a [[Vector]] sample with a weight of 1.0. */
    +    def add(sample: Vector): this.type = add(sample, weight = 1.0)
    +    /**
    +     * Merge another SummarizerBuffer, and update the statistical summary.
    +     * (Note that it's in place merging; as a result, `this` object will be modified, while
    +     * `other` is only read.)
    +     *
    +     * @param other The other SummarizerBuffer to be merged.
    +     * @return this buffer, after the merge.
    +     * @throws IllegalArgumentException if both buffers are non-empty and their dimensions differ.
    +     */
    +    def merge(other: SummarizerBuffer): this.type = {
    +      if (this.totalWeightSum != 0.0 && other.totalWeightSum != 0.0) {
    +        require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " +
    +          s"Expecting $n but got ${other.n}.")
    +        totalCnt += other.totalCnt
    +        totalWeightSum += other.totalWeightSum
    +        weightSquareSum += other.weightSquareSum
    +        var i = 0
    +        while (i < n) {
    +          if (weightSum != null) {
    +            val thisWeightSum = weightSum(i)
    +            val otherWeightSum = other.weightSum(i)
    +            // Named so that it does not shadow the `totalWeightSum` field.
    +            val mergedWeightSum = thisWeightSum + otherWeightSum
    +            if (mergedWeightSum != 0.0) {
    +              if (currMean != null) {
    +                val deltaMean = other.currMean(i) - currMean(i)
    +                // merge mean together
    +                currMean(i) += deltaMean * otherWeightSum / mergedWeightSum
    +                if (currM2n != null) {
    +                  // merge m2n together
    +                  currM2n(i) += other.currM2n(i) +
    +                    deltaMean * deltaMean * thisWeightSum * otherWeightSum / mergedWeightSum
    +                }
    +              }
    +            }
    +            weightSum(i) = mergedWeightSum
    +          }
    +          // merge m2 together
    +          if (currM2 != null) { currM2(i) += other.currM2(i) }
    +          // merge l1 together
    +          if (currL1 != null) { currL1(i) += other.currL1(i) }
    +          // merge max and min
    +          if (currMax != null) { currMax(i) = math.max(currMax(i), other.currMax(i)) }
    +          if (currMin != null) { currMin(i) = math.min(currMin(i), other.currMin(i)) }
    +          if (nnz != null) { nnz(i) += other.nnz(i) }
    +          i += 1
    +        }
    +      } else if (totalWeightSum == 0.0 && other.totalWeightSum != 0.0) {
    +        // This buffer is empty: adopt a deep copy of `other`'s state.
    +        this.n = other.n
    +        if (other.currMean != null) { this.currMean = other.currMean.clone() }
    +        if (other.currM2n != null) { this.currM2n = other.currM2n.clone() }
    +        if (other.currM2 != null) { this.currM2 = other.currM2.clone() }
    +        if (other.currL1 != null) { this.currL1 = other.currL1.clone() }
    +        this.totalCnt = other.totalCnt
    +        this.totalWeightSum = other.totalWeightSum
    +        this.weightSquareSum = other.weightSquareSum
    +        if (other.weightSum != null) { this.weightSum = other.weightSum.clone() }
    +        if (other.nnz != null) { this.nnz = other.nnz.clone() }
    +        if (other.currMax != null) { this.currMax = other.currMax.clone() }
    +        if (other.currMin != null) { this.currMin = other.currMin.clone() }
    +      }
    +      // If `other` is empty there is nothing to merge.
    +      this
    +    }
    +    /**
    +     * Sample mean of each dimension.
    +     */
    +    def mean: Vector = {
    +      require(requestedMetrics.contains(Mean))
    +      require(totalWeightSum > 0, s"Nothing has been added to this 
    +      val realMean = Array.ofDim[Double](n)
    +      var i = 0
    +      while (i < n) {
    +        realMean(i) = currMean(i) * (weightSum(i) / totalWeightSum)
    +        i += 1
    +      }
    +      Vectors.dense(realMean)
    +    }
    +    /**
    +     * Unbiased estimate of sample variance of each dimension.
    +     */
    +    def variance: Vector = {
    +      require(requestedMetrics.contains(Variance))
    +      require(totalWeightSum > 0, s"Nothing has been added to this 
    +      val realVariance = Array.ofDim[Double](n)
    +      val denominator = totalWeightSum - (weightSquareSum / totalWeightSum)
    +      // Sample variance is computed, if the denominator is less than 0, 
the variance is just 0.
    +      if (denominator > 0.0) {
    +        val deltaMean = currMean
    +        var i = 0
    +        val len = currM2n.length
    +        while (i < len) {
    +          realVariance(i) = (currM2n(i) + deltaMean(i) * deltaMean(i) * 
weightSum(i) *
    +            (totalWeightSum - weightSum(i)) / totalWeightSum) / denominator
    +          i += 1
    +        }
    +      }
    +      Vectors.dense(realVariance)
    +    }
    +    /**
    +     * Number of samples added so far, including those merged in. Samples with a zero weight
    +     * are dropped by `addRaw` and are therefore not counted.
    +     */
    +    def count: Long = this.totalCnt
    +    /**
    +     * Number of nonzero elements in each dimension.
    +     *
    +     */
    +    def numNonzeros: Vector = {
    +      require(requestedMetrics.contains(NumNonZeros))
    +      require(totalCnt > 0, s"Nothing has been added to this summarizer.")
    +      Vectors.dense(
    +    }
    +    /**
    +     * Maximum value of each dimension.
    +     */
    +    def max: Vector = {
    +      require(requestedMetrics.contains(Max))
    +      require(totalWeightSum > 0, s"Nothing has been added to this 
    +      var i = 0
    +      while (i < n) {
    +        if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0
    +        i += 1
    +      }
    +      Vectors.dense(currMax)
    +    }
    +    /**
    +     * Minimum value of each dimension.
    +     */
    +    def min: Vector = {
    +      require(requestedMetrics.contains(Min))
    +      require(totalWeightSum > 0, s"Nothing has been added to this 
    +      var i = 0
    +      while (i < n) {
    +        if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0
    +        i += 1
    +      }
    +      Vectors.dense(currMin)
    +    }
    +    /**
    +     * L2 (Euclidian) norm of each dimension.
    +     */
    +    def normL2: Vector = {
    +      require(requestedMetrics.contains(NormL2))
    +      require(totalWeightSum > 0, s"Nothing has been added to this 
    +      val realMagnitude = Array.ofDim[Double](n)
    +      var i = 0
    +      val len = currM2.length
    +      while (i < len) {
    +        realMagnitude(i) = math.sqrt(currM2(i))
    +        i += 1
    +      }
    +      Vectors.dense(realMagnitude)
    +    }
    +    /**
    +     * L1 norm of each dimension.
    +     */
    +    def normL1: Vector = {
    +      require(requestedMetrics.contains(NormL1))
    +      require(totalWeightSum > 0, s"Nothing has been added to this 
    +      Vectors.dense(currL1)
    +    }
    +  }
    +  private case class MetricsAggregate(
    +      requestedMetrics: Seq[Metrics],
    +      requestedComputeMetrics: Seq[ComputeMetrics],
    +      featuresExpr: Expression,
    +      weightExpr: Expression,
    +      mutableAggBufferOffset: Int,
    +      inputAggBufferOffset: Int)
    +    extends TypedImperativeAggregate[SummarizerBuffer] {
    +    override def eval(state: SummarizerBuffer): InternalRow = {
    +      val metrics ={
    +        case Mean => UnsafeArrayData.fromPrimitiveArray(state.mean.toArray)
    +        case Variance => 
    +        case Count => state.count
    +        case NumNonZeros => UnsafeArrayData.fromPrimitiveArray(
    +        case Max => UnsafeArrayData.fromPrimitiveArray(state.max.toArray)
    +        case Min => UnsafeArrayData.fromPrimitiveArray(state.min.toArray)
    +        case NormL2 => 
    +        case NormL1 => 
    +      })
    +      InternalRow.apply(metrics: _*)
    +    }
    +    override def children: Seq[Expression] = featuresExpr :: weightExpr :: 
    +    override def update(state: SummarizerBuffer, row: InternalRow): 
SummarizerBuffer = {
    +      // val features = udt.deserialize(featuresExpr.eval(row))
    +      val featuresDatum = featuresExpr.eval(row).asInstanceOf[InternalRow]
    +      val isDense = featuresDatum.getByte(0) == 1
    +      val indices: ArrayData = if (isDense) null else 
    +      val values: ArrayData = featuresDatum.getArray(3)
    +      val _size = if (isDense) values.numElements() else 
    +      val features = new TraversableIndexedSeq {
    +        override def foreachActive(f: (Int, Double) => Unit): Unit = {
    +          var i = 0
    +          if (isDense) {
    +            while (i < _size) {
    +              f(i, values.getDouble(i))
    +              i += 1
    +            }
    +          } else {
    +            while (i < indices.numElements()) {
    +              f(indices.getInt(i), values.getDouble(i))
    +              i += 1
    +            }
    +          }
    +        }
    +        override def size: Int = _size
    +      }
    +      val weight = weightExpr.eval(row).asInstanceOf[Double]
    +      state.addRaw(features, weight)
    +      state
    +    }
    +    /** Folds `other` into `state` in place and hands back the updated buffer. */
    +    override def merge(state: SummarizerBuffer, other: SummarizerBuffer): SummarizerBuffer =
    +      state.merge(other)
    +    /** The result column is declared non-nullable. */
    +    override def nullable: Boolean = false
    +    /** Starts a fresh buffer that tracks only the requested (compute) metrics. */
    +    override def createAggregationBuffer(): SummarizerBuffer =
    +      new SummarizerBuffer(requestedMetrics, requestedComputeMetrics)
    +    /**
    +     * Serializes the buffer with Java serialization.
    +     *
    +     * The object stream is closed before the bytes are read, so everything it buffered is
    +     * guaranteed to have reached the underlying byte array.
    +     */
    +    override def serialize(state: SummarizerBuffer): Array[Byte] = {
    +      // TODO: Use ByteBuffer to optimize
    +      val bos = new ByteArrayOutputStream()
    +      val oos = new ObjectOutputStream(bos)
    +      try {
    +        oos.writeObject(state)
    +      } finally {
    +        oos.close()
    +      }
    +      bos.toByteArray
    +    }
    +    /**
    +     * Deserializes a buffer with Java deserialization.
    +     *
    +     * NOTE(review): presumably only ever given bytes produced by `serialize`; Java
    +     * deserialization must not be fed untrusted input.
    +     */
    +    override def deserialize(bytes: Array[Byte]): SummarizerBuffer = {
    +      // TODO: Use ByteBuffer to optimize
    +      val bis = new ByteArrayInputStream(bytes)
    +      val ois = new ObjectInputStream(bis)
    +      try {
    +        ois.readObject().asInstanceOf[SummarizerBuffer]
    +      } finally {
    +        ois.close()
    +      }
    +    }
    +    /** Returns a copy of this aggregate that uses `newMutableAggBufferOffset`. */
    +    override def withNewMutableAggBufferOffset(
    +        newMutableAggBufferOffset: Int): MetricsAggregate =
    +      copy(mutableAggBufferOffset = newMutableAggBufferOffset)
    +    /** Returns a copy of this aggregate that uses `newInputAggBufferOffset`. */
    +    override def withNewInputAggBufferOffset(
    +        newInputAggBufferOffset: Int): MetricsAggregate =
    +      copy(inputAggBufferOffset = newInputAggBufferOffset)
    +    override lazy val dataType: DataType = 
    +    // Display name of this aggregate expression (presumably what Spark shows in plans; confirm).
    +    override def prettyName: String = "aggregate_metrics"
    +  }
    +  /**
    +   * Minimal read-only view of a vector consumed by `SummarizerBuffer.addRaw`: its dimension
    +   * and a way to visit its stored entries as (index, value) pairs.
    +   */
    +  trait TraversableIndexedSeq {
    +    /** The dimension of the vector. */
    +    def size: Int
    +    /** Calls `f` on every stored (index, value) entry; zero values may be included. */
    +    def foreachActive(f: (Int, Double) => Unit): Unit
    +  }
    +  // NOTE(review): in the visible code this is only referenced by the commented-out
    +  // deserialization line in `MetricsAggregate.update`.
    +  private[this] val udt: VectorUDT = new VectorUDT()
    --- End diff --
    Is there a better way to obtain the `VectorUDT` instance? cc @cloud-fan

