[ 
https://issues.apache.org/jira/browse/SPARK-19208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833345#comment-15833345
 ] 

zhengruifeng commented on SPARK-19208:
--------------------------------------

After digging into Spark SQL's UDAF support, I designed the new API like this:

New {{MultivariateOnlineSummarizer}} in {{org.apache.spark.ml.stat}}:
{code}
class MultivariateOnlineSummarizer(private var metrics: Seq[String]) extends UserDefinedAggregateFunction {
    def setMetrics(metrics: Seq[String]): this.type = ...
    def setMetrics(metric: String, others: String*): this.type = ...
    // Column order matches the call sites: agg(col("features"), col("weight"))
    override def inputSchema: StructType = new StructType().add("features", new VectorUDT).add("weight", DoubleType)
    override def bufferSchema: StructType = ...
    override def dataType: DataType = DataTypes.createMapType(StringType, new VectorUDT)
    override def deterministic: Boolean = true
    // Required by UserDefinedAggregateFunction
    override def initialize(buffer: MutableAggregationBuffer): Unit = ...
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ...
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ...
    // Returns a Map[String, Vector], consistent with dataType above
    override def evaluate(buffer: Row): Map[String, Vector] = ...
}
{code}

Usage examples:
{code}
// in MinMaxScaler
val minMaxAgg = new MultivariateOnlineSummarizer().setMetrics("min", "max")
val summary = dataset.groupBy().agg(minMaxAgg(col("features"), col("weight")))
summary.show
// +----------------------------------------------+
// |multivariateonlinesummarizer(features, weight)|
// +----------------------------------------------+
// |                          Map(min -> [1.0,0...|
// +----------------------------------------------+
summary.first
// res2: org.apache.spark.sql.Row = [Map(min -> [1.0,0.2,0.2], max -> [1.0,0.2,0.2])]
val result = summary.first.getAs[Map[String,Vector]](0)
val min: Vector = result("min")
val max: Vector = result("max")

// in LinearRegression
val featuresAgg = new MultivariateOnlineSummarizer().setMetrics("mean", "variance")
val labelAgg = new MultivariateOnlineSummarizer().setMetrics("mean", "variance")

val result = dataset.map{...}.groupBy().agg(
  featuresAgg(col("features"), col("weight")),
  labelAgg(col("labelVec"), col("weight"))).first
val featuresMetrics = result.getAs[Map[String,Vector]](0)
val labelMetrics = result.getAs[Map[String,Vector]](1)
{code}

I have not found a way to output multiple columns from a UDAF, so I am using 
{{Map[String,Vector]}} as the output type for now. If there is a way to do so, 
I'll be happy to change this.
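
The intent behind {{setMetrics}}, allocating and updating only the statistics that were requested, can be sketched without Spark. The following is a minimal, hypothetical illustration: {{SelectiveSummarizer}}, its constructor, and its {{add}}/{{merge}}/{{result}} methods are assumptions made for this example, not part of the proposed API or of Spark. It handles only dense samples and only min and max, and it returns a Map keyed by metric name to mirror the {{Map[String,Vector]}} output type described above.

```scala
// Hypothetical sketch: names and structure are illustrative, not Spark's implementation.
final class SelectiveSummarizer(numFeatures: Int, metrics: Set[String]) {
  require(metrics.subsetOf(Set("min", "max")), s"unsupported metrics: $metrics")

  // Allocate an array only when its metric was requested.
  private val currMin: Option[Array[Double]] =
    if (metrics("min")) Some(Array.fill(numFeatures)(Double.PositiveInfinity)) else None
  private val currMax: Option[Array[Double]] =
    if (metrics("max")) Some(Array.fill(numFeatures)(Double.NegativeInfinity)) else None

  /** Fold one dense sample into the running statistics. */
  def add(sample: Array[Double]): this.type = {
    require(sample.length == numFeatures, "dimension mismatch")
    var i = 0
    while (i < numFeatures) {
      currMin.foreach(m => m(i) = math.min(m(i), sample(i)))
      currMax.foreach(m => m(i) = math.max(m(i), sample(i)))
      i += 1
    }
    this
  }

  /** Combine with another partial result; a metric is merged only if both sides track it. */
  def merge(other: SelectiveSummarizer): this.type = {
    for (a <- currMin; b <- other.currMin; i <- 0 until numFeatures) a(i) = math.min(a(i), b(i))
    for (a <- currMax; b <- other.currMax; i <- 0 until numFeatures) a(i) = math.max(a(i), b(i))
    this
  }

  /** Only the requested metrics appear in the output. */
  def result: Map[String, Array[Double]] =
    Seq(currMin.map("min" -> _), currMax.map("max" -> _)).flatten.toMap
}

// Two partial summaries, as if computed on separate partitions, then merged.
val left = new SelectiveSummarizer(3, Set("min", "max"))
  .add(Array(1.0, 0.2, 0.2))
  .add(Array(0.5, 0.9, 0.1))
val right = new SelectiveSummarizer(3, Set("min", "max"))
  .add(Array(2.0, 0.0, 0.3))
val stats = left.merge(right).result
// stats("min") holds 0.5, 0.0, 0.1 and stats("max") holds 2.0, 0.9, 0.3
```

A summarizer built with {{Set("max")}} alone allocates a single array, which is the saving the issue is after. A real implementation would also have to handle sparse vectors and weights, which this sketch leaves out.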

> MultivariateOnlineSummarizer performance optimization
> -----------------------------------------------------
>
>                 Key: SPARK-19208
>                 URL: https://issues.apache.org/jira/browse/SPARK-19208
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>            Reporter: zhengruifeng
>         Attachments: Tests.pdf, WechatIMG2621.jpeg
>
>
> Currently, {{MaxAbsScaler}} and {{MinMaxScaler}} use 
> {{MultivariateOnlineSummarizer}} to compute the min/max.
> However, {{MultivariateOnlineSummarizer}} also computes additional statistics 
> that go unused. This slows down the task and makes it more prone to OOM.
> For example:
> env: --driver-memory 4G --executor-memory 1G --num-executors 4
> data: 
> [http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#kdd2010%20(bridge%20to%20algebra)]
> 748,401 instances and 29,890,095 features
> {{MaxAbsScaler.fit}} fails because of OOM.
> {{MultivariateOnlineSummarizer}} maintains 8 arrays:
> {code}
>   private var currMean: Array[Double] = _
>   private var currM2n: Array[Double] = _
>   private var currM2: Array[Double] = _
>   private var currL1: Array[Double] = _
>   private var totalCnt: Long = 0
>   private var totalWeightSum: Double = 0.0
>   private var weightSquareSum: Double = 0.0
>   private var weightSum: Array[Double] = _
>   private var nnz: Array[Long] = _
>   private var currMax: Array[Double] = _
>   private var currMin: Array[Double] = _
> {code}
> For {{MaxAbsScaler}}, only 1 array is needed (max of abs value).
> For {{MinMaxScaler}}, only 3 arrays are needed (max, min, nnz).
> After the modification in the PR, the above example runs successfully.
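
As a rough check on the numbers in the description, the arithmetic below estimates the raw size of the summarizer's arrays for 29,890,095 features. It is a back-of-the-envelope sketch only: it assumes 8 bytes per element (both {{Double}} and {{Long}} on the JVM) and ignores object headers, the input data, and any copies made during serialization or merging. The helper names {{bufferBytes}} and {{toGiB}} are made up for this example.

```scala
// Back-of-the-envelope estimate; real memory use will be higher.
val numFeatures = 29890095L   // feature count from the example in the description
val bytesPerElement = 8L      // Double and Long both take 8 bytes on the JVM

// Raw payload of `numArrays` arrays with one element per feature.
def bufferBytes(numArrays: Int): Long = numArrays * numFeatures * bytesPerElement
def toGiB(bytes: Long): Double = bytes.toDouble / (1L << 30)

val allArrays = bufferBytes(8)     // all 8 arrays:         1,912,966,080 bytes
val minMaxArrays = bufferBytes(3)  // MinMaxScaler needs 3:   717,362,280 bytes
val maxAbsArray = bufferBytes(1)   // MaxAbsScaler needs 1:   239,120,760 bytes

println(f"8 arrays: ${toGiB(allArrays)}%.2f GiB")
println(f"3 arrays: ${toGiB(minMaxArrays)}%.2f GiB")
println(f"1 array:  ${toGiB(maxAbsArray)}%.2f GiB")
```

Under these assumptions a single full summarizer already needs more than the 1G of executor memory in the example, while the one-array and three-array variants stay below it.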



