Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/19317
  
    @jiangxb1987, @WeichenXu123, thanks for your review. This change is inspired by the `TODO` comments in the existing code. You can see the following code snippet:
    ```scala
    // TODO: Calling aggregateByKey and collect creates two stages, we can implement something
    // TODO: similar to reduceByKeyLocally to save one stage.
    val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
      .map { row =>
        (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
      }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(
      seqOp = {
        case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
          requireValues(features)
          BLAS.axpy(weight, features, featureSum)
          (weightSum + weight, featureSum)
      },
      combOp = {
        case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
          BLAS.axpy(1.0, featureSum2, featureSum1)
          (weightSum1 + weightSum2, featureSum1)
      }).collect().sortBy(_._1)
    ```
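
    For reference, a minimal sketch of the idea, modeled on `PairRDDFunctions.reduceByKeyLocally` (the signature and the `createZero` parameter are illustrative, not the actual PR code):
    ```scala
    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConverters._
    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // Illustrative helper: aggregate by key inside each partition, then merge
    // the per-partition maps with a single reduce, so only one stage is needed
    // instead of the two created by aggregateByKey(...).collect().
    def aggregateByKeyLocally[K: ClassTag, V, U: ClassTag](
        rdd: RDD[(K, V)], createZero: () => U)(
        seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
      rdd.mapPartitions { iter =>
        // One local hash map per partition; no shuffle is performed.
        val map = new JHashMap[K, U]
        iter.foreach { case (k, v) =>
          val old = map.get(k)
          // createZero() gives each key a fresh accumulator, which matters when
          // the zero value is mutable (e.g. a DenseVector updated by BLAS.axpy).
          map.put(k, seqOp(if (old == null) createZero() else old, v))
        }
        Iterator(map)
      }.reduce { (m1, m2) =>
        // Merge the per-partition maps pairwise.
        m2.asScala.foreach { case (k, u) =>
          val old = m1.get(k)
          m1.put(k, if (old == null) u else combOp(old, u))
        }
        m1
      }.asScala.toMap
    }
    ```
    With something like this, the snippet above could compute the per-label sums in a single job, avoiding the shuffle that `aggregateByKey` triggers.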
    
    - The `aggregateByKeyLocally` we implemented is similar to `reduceByKeyLocally` (see the sketch above).
    - I agree with your suggestion to use `OpenHashSet` instead of `JHashMap`. I can change it, and `reduceByKeyLocally` may need the same change too; see the sketch after this list.
    - Here we collect all the aggregated data to the driver and sort it. The data should be small (one entry per distinct label), and the amount of data collected should be almost the same for both implementations.
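
    On the `OpenHashSet` point: Spark's `private[spark]` `OpenHashMap` is built on `OpenHashSet`, so the per-partition step above could use it roughly like this (again a sketch; the helper name and `createZero` parameter are illustrative):
    ```scala
    import scala.reflect.ClassTag
    import org.apache.spark.util.collection.OpenHashMap

    // Per-partition aggregation with OpenHashMap instead of java.util.HashMap.
    // changeValue either installs a value for a new key (the by-name default is
    // evaluated only then) or merges into the existing accumulator in place,
    // replacing the get/put pair used above.
    def aggregatePartition[K: ClassTag, V, U: ClassTag](
        iter: Iterator[(K, V)], createZero: () => U)(
        seqOp: (U, V) => U): Iterator[OpenHashMap[K, U]] = {
      val map = new OpenHashMap[K, U]
      iter.foreach { case (k, v) =>
        map.changeValue(k, seqOp(createZero(), v), u => seqOp(u, v))
      }
      Iterator(map)
    }
    ```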

