This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c9b6271  [SPARK-33043][ML] Handle spark.driver.maxResultSize=0 in 
RowMatrix heuristic computation
c9b6271 is described below

commit c9b62711fdec24160c4bdeff8fc09eedb0b75ee0
Author: Sean Owen <sro...@gmail.com>
AuthorDate: Sat Oct 3 13:12:55 2020 -0500

    [SPARK-33043][ML] Handle spark.driver.maxResultSize=0 in RowMatrix 
heuristic computation
    
    ### What changes were proposed in this pull request?
    
    RowMatrix contains a computation based on spark.driver.maxResultSize. 
However, when this value is set to 0, the computation fails (log of 0). The fix 
is simply to correctly handle this setting, which means unlimited result size, 
by using a tree depth of 1 in the RowMatrix method.
    
    ### Why are the changes needed?
    
    Simple bug fix to make several Spark ML functions which use RowMatrix run 
correctly in this case.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Not other than the bug fix of course.
    
    ### How was this patch tested?
    
    Existing RowMatrix tests plus a new test.
    
    Closes #29925 from srowen/SPARK-33043.
    
    Authored-by: Sean Owen <sro...@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit f86171aea43479f54ac2bbbca8f128baa3fc4a8c)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../apache/spark/mllib/linalg/distributed/RowMatrix.scala |  6 +++++-
 .../spark/mllib/linalg/distributed/RowMatrixSuite.scala   | 15 +++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 20e26ce..07b9d91 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -786,11 +786,15 @@ class RowMatrix @Since("1.0.0") (
    * Based on the formulae: (numPartitions)^(1/depth) * objectSize <= 
DriverMaxResultSize
    * @param aggregatedObjectSizeInBytes the size, in megabytes, of the object 
being tree aggregated
    */
-  private[spark] def getTreeAggregateIdealDepth(aggregatedObjectSizeInBytes: 
Long) = {
+  private[spark] def getTreeAggregateIdealDepth(aggregatedObjectSizeInBytes: 
Long): Int = {
     require(aggregatedObjectSizeInBytes > 0,
       "Cannot compute aggregate depth heuristic based on a zero-size object to 
aggregate")
 
     val maxDriverResultSizeInBytes = rows.conf.get[Long](MAX_RESULT_SIZE)
+    if (maxDriverResultSizeInBytes <= 0) {
+      // Unlimited result size, so 1 is OK
+      return 1
+    }
 
     require(maxDriverResultSizeInBytes > aggregatedObjectSizeInBytes,
       s"Cannot aggregate object of size $aggregatedObjectSizeInBytes Bytes, "
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index 0a4b119..adc4eee 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -25,6 +25,7 @@ import breeze.linalg.{norm => brzNorm, svd => brzSvd, 
DenseMatrix => BDM, DenseV
 import breeze.numerics.abs
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
 import org.apache.spark.mllib.random.RandomRDDs
 import org.apache.spark.mllib.util.{LocalClusterSparkContext, 
MLlibTestSparkContext}
@@ -121,6 +122,20 @@ class RowMatrixSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     assert(objectBiggerThanResultSize.getMessage.contains("it's bigger than 
maxResultSize"))
   }
 
+  test("SPARK-33043: getTreeAggregateIdealDepth with unlimited driver size") {
+    val originalMaxResultSize = sc.conf.get[Long](MAX_RESULT_SIZE)
+    sc.conf.set(MAX_RESULT_SIZE, 0L)
+    try {
+      val nbPartitions = 100
+      val vectors = sc.emptyRDD[Vector]
+        .repartition(nbPartitions)
+      val rowMat = new RowMatrix(vectors)
+      assert(rowMat.getTreeAggregateIdealDepth(700 * 1024 * 1024) === 1)
+    } finally {
+      sc.conf.set(MAX_RESULT_SIZE, originalMaxResultSize)
+    }
+  }
+
   test("similar columns") {
     val colMags = Vectors.dense(math.sqrt(126), math.sqrt(66), math.sqrt(94))
     val expected = BDM(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to