[ https://issues.apache.org/jira/browse/SPARK-19208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833345#comment-15833345 ]
zhengruifeng commented on SPARK-19208:
--------------------------------------

After digging into Spark SQL's UDAF support, I designed the new API as follows.

New MultivariateOnlineSummarizer in org.apache.spark.ml.stat:
{code}
// The default value lets callers write `new MultivariateOnlineSummarizer()` as in the usage below.
class MultivariateOnlineSummarizer(private var metrics: Seq[String] = Seq.empty)
  extends UserDefinedAggregateFunction {

  // Setters return this.type so they can be chained onto the constructor call.
  def setMetrics(metrics: Seq[String]): this.type = ...
  def setMetrics(metric: String, others: String*): this.type = ...

  // Column order matches the call sites: features first, then weight.
  override def inputSchema: StructType =
    new StructType().add("features", new VectorUDT).add("weight", DoubleType)
  override def bufferSchema: StructType = ...
  override def dataType: DataType = DataTypes.createMapType(StringType, new VectorUDT)
  override def deterministic: Boolean = true
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ...
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ...
  // Returns a map from metric name to vector, consistent with dataType above.
  override def evaluate(buffer: Row): Map[String, Vector] = ...
}
{code}

Usage examples:
{code}
// in MinMaxScaler
val maxAbsAgg = new MultivariateOnlineSummarizer().setMetrics("min", "max")
val summary = dataset.groupBy().agg(maxAbsAgg(col("features"), col("weight")))

summary.show
+-----------------------------+
|multivariateonlinesummarizer(features, weight)|
+-----------------------------+
| Map(min -> [1.0,0...|
+-----------------------------+

summary.first
res2: org.apache.spark.sql.Row = [Map(min -> [1.0,0.2,0.2], max -> [1.0,0.2,0.2])]

val result = summary.first.getAs[Map[String,Vector]](0)
val min: Vector = result("min")
val max: Vector = result("max")

// in LinearRegression
val featuresAgg = new MultivariateOnlineSummarizer().setMetrics("mean", "variance")
val labelAgg = new MultivariateOnlineSummarizer().setMetrics("mean", "variance")
val result = dataset.map{...}.groupBy().agg(
  featuresAgg(col("features"), col("weight")),
  labelAgg(col("labelVec"), col("weight"))).first
val featuresMetrics = result.getAs[Map[String,Vector]](0)
val labelMetrics = result.getAs[Map[String,Vector]](1)
{code}

I have not found a way to output multiple
columns from a UDAF, so for now I use {{Map[String,Vector]}} as the output type. If there is a way to do it, I'll be happy to change this.

> MultivariateOnlineSummarizer performance optimization
> -----------------------------------------------------
>
>                 Key: SPARK-19208
>                 URL: https://issues.apache.org/jira/browse/SPARK-19208
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>            Reporter: zhengruifeng
>         Attachments: Tests.pdf, WechatIMG2621.jpeg
>
>
> Currently, {{MaxAbsScaler}} and {{MinMaxScaler}} use {{MultivariateOnlineSummarizer}} to compute the min/max.
> However, {{MultivariateOnlineSummarizer}} also computes extra statistics that go unused. This slows down the task and makes it more prone to OOM.
> For example:
> env : --driver-memory 4G --executor-memory 1G --num-executors 4
> data: [http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#kdd2010%20(bridge%20to%20algebra)]
> 748401 instances, and 29,890,095 features
> {{MaxAbsScaler.fit}} fails because of OOM
> {{MultivariateOnlineSummarizer}} maintains 8 arrays:
> {code}
> private var currMean: Array[Double] = _
> private var currM2n: Array[Double] = _
> private var currM2: Array[Double] = _
> private var currL1: Array[Double] = _
> private var totalCnt: Long = 0
> private var totalWeightSum: Double = 0.0
> private var weightSquareSum: Double = 0.0
> private var weightSum: Array[Double] = _
> private var nnz: Array[Long] = _
> private var currMax: Array[Double] = _
> private var currMin: Array[Double] = _
> {code}
> For {{MaxAbsScaler}}, only 1 array is needed (max of abs value)
> For {{MinMaxScaler}}, only 3 arrays are needed (max, min, nnz)
> After the modification in the PR, the above example runs successfully.
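
To make the memory figures in the description concrete, here is a rough back-of-the-envelope sketch. It is an estimate only: it assumes each per-feature array is dense, that every element takes 8 bytes (both Double and Long do on the JVM), and it ignores array headers and the three scalar fields. The object and method names are hypothetical and not part of Spark.

{code}
// Hypothetical helper, not part of Spark: estimates the size of one summarizer buffer.
object SummarizerMemoryEstimate {
  // Feature count taken from the kdd2010 (bridge to algebra) example in the description.
  val numFeatures: Long = 29890095L
  // Assumption: 8 bytes per element (Double or Long), array headers ignored.
  val bytesPerElement: Long = 8L

  /** Approximate bytes held by `numArrays` dense arrays of length `numFeatures`. */
  def bufferBytes(numArrays: Int): Long = numArrays * numFeatures * bytesPerElement

  /** Converts a byte count to mebibytes. */
  def toMiB(bytes: Long): Double = bytes.toDouble / (1024 * 1024)

  def main(args: Array[String]): Unit = {
    val cases = Seq(
      "MultivariateOnlineSummarizer (8 arrays)" -> 8,
      "MinMaxScaler (3 arrays)" -> 3,
      "MaxAbsScaler (1 array)" -> 1)
    for ((name, n) <- cases) {
      println(f"$name%-42s ~${toMiB(bufferBytes(n))}%.0f MiB")
    }
  }
}
{code}

Under these assumptions a single full summarizer buffer is about 1.9 GB, which already exceeds the 1G executor memory in the example, while a single max-abs array is about 0.24 GB.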