Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/19317

@jiangxb1987, @WeichenXu123, thanks for your review. This change is inspired by the `TODO` comments in the existing code. You can see the following code snippet:

```scala
// TODO: Calling aggregateByKey and collect creates two stages, we can implement something
// TODO: similar to reduceByKeyLocally to save one stage.
val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
  .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
  }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(
  seqOp = {
     case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
       requireValues(features)
       BLAS.axpy(weight, features, featureSum)
       (weightSum + weight, featureSum)
  },
  combOp = {
     case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
       BLAS.axpy(1.0, featureSum2, featureSum1)
       (weightSum1 + weightSum2, featureSum1)
  }).collect().sortBy(_._1)
```

- The `aggregateByKeyLocally` we implemented is similar to `reduceByKeyLocally`.
- I agree with your suggestion to use `OpenHashSet` instead of `JHashMap`. I can change it, and `reduceByKeyLocally` may need the same change too.
- Because we collect all the aggregated data to the driver and sort it there, I think the data should be small, and the amount of data collected by the two implementations should be nearly equal.
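For reference, here is a minimal sketch of what an `aggregateByKeyLocally` helper in the spirit of `reduceByKeyLocally` could look like. This is not the PR's actual code: the standalone method name, the `zeroValue` factory parameter, and the use of `scala.collection.mutable.HashMap` (standing in for the `JHashMap`/`OpenHashSet` choice discussed above) are all illustrative assumptions.

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Hypothetical sketch, not the PR implementation. Aggregates an RDD of
// key-value pairs into a single local map without a shuffle stage.
def aggregateByKeyLocally[K, V, U](rdd: RDD[(K, V)], zeroValue: () => U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): mutable.Map[K, U] = {
  // Fold each partition into a local map in one pass, no shuffle.
  val aggregatePartition = (iter: Iterator[(K, V)]) => {
    val map = mutable.HashMap.empty[K, U]
    iter.foreach { case (k, v) =>
      val acc = map.getOrElse(k, zeroValue())
      map.put(k, seqOp(acc, v))
    }
    Iterator(map)
  }
  // Merge the per-partition maps with reduce (partial merges on the
  // executors, final merge at the driver), so the whole computation
  // runs as a single stage, unlike aggregateByKey + collect.
  rdd.mapPartitions(aggregatePartition).reduce { (m1, m2) =>
    m2.foreach { case (k, u) =>
      m1.put(k, m1.get(k).map(combOp(_, u)).getOrElse(u))
    }
    m1
  }
}
```

One detail worth noting in this sketch: `zeroValue` is a factory (`() => U`) rather than a plain value, so each key gets a fresh accumulator. With a plain value, a mutable accumulator such as the `DenseVector` in the snippet above would be shared across keys; Spark's own `aggregateByKey` avoids this by deserializing a fresh copy of the zero value for each key.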