Repository: spark
Updated Branches:
  refs/heads/master 25f35806e -> 005d1c5f2


[SPARK-6269] [CORE] Use ScalaRunTime's array methods instead of 
java.lang.reflect.Array in size estimation

This patch switches the use of java.lang.reflect.Array in size estimation to 
Scala's ScalaRunTime array-getter methods. The notes on 
https://bugs.openjdk.java.net/browse/JDK-8051447 tipped me off to the fact that 
using java.lang.reflect.Array was not ideal. At first, I used the code from 
that ticket, but it turns out that ScalaRunTime's array-related methods avoid 
the bottleneck of invoking native code anyway, so that alone was sufficient to 
boost performance in size estimation.

The idea is that these methods are implemented in pure JVM code, as opposed to 
relying on native C code, which ends up performing poorly here. This improves 
the performance of estimating the size of arrays when we are checking for 
spilling in Spark.
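
To make the difference concrete, here is a minimal sketch (ArrayAccessSketch 
and applyLikeScalaRunTime are illustrative names, not Spark code; the 
match-based dispatch only approximates what ScalaRunTime.array_apply does 
internally):
```
import scala.runtime.ScalaRunTime

object ArrayAccessSketch {
  // Reflective access: java.lang.reflect.Array.get goes through the
  // JVM's reflection machinery, which bottoms out in native code.
  def viaReflection(arr: AnyRef, i: Int): Any =
    java.lang.reflect.Array.get(arr, i)

  // ScalaRunTime.array_apply stays in JVM bytecode; its dispatch is
  // roughly a pattern match on the array's runtime type, like this
  // illustrative stand-in (primitive cases abbreviated):
  def applyLikeScalaRunTime(arr: AnyRef, i: Int): Any = arr match {
    case x: Array[AnyRef] => x(i)
    case x: Array[Int]    => x(i)
    case x: Array[Long]   => x(i)
    case x: Array[Double] => x(i)
    // ... remaining primitive array types elided
    case _ => throw new IllegalArgumentException("not an array: " + arr)
  }

  def main(args: Array[String]): Unit = {
    val arr: AnyRef = Array(1, 2, 3)
    println(viaReflection(arr, 0))            // 1, via native reflection
    println(ScalaRunTime.array_apply(arr, 0)) // 1, pure JVM dispatch
    println(applyLikeScalaRunTime(arr, 0))    // 1
  }
}
```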

Here's the benchmark discussion from the ticket:

I did two tests. The first, less convincing, take-with-a-grain-of-salt test 
was a simple groupByKey operation that collected the objects in a 4.0 GB text 
file RDD into 30,000 buckets. I ran 1 Master and 4 Spark Worker JVMs on my Mac, 
reading the RDD from a text file stored on local disk and saving it out to 
another file, also on local disk. The wall clock times I got back before and 
after the change were:

Before: 352.195 s, 343.871 s, 359.080 s
After (using code directly from the JDK ticket, not the Scala code in this PR): 
342.929583 s, 329.456623 s, 326.151481 s

So, there is a bit of an improvement after the change. I also did some YourKit 
profiling of the executors to get an idea of how much time was spent in size 
estimation before and after the change. Size estimation roughly took up less 
of the time after my change, but YourKit's profiling can be inconsistent, and 
I cannot be sure I was profiling executors that held the same data between 
runs.

The more convincing test I did was to run the size-estimation logic itself in 
an isolated unit test. I ran the following code:
```
// Runs inside a ScalaTest suite. bigArray is a 1000 x 1000 array of
// random UUID strings: every element is distinct (no String re-use),
// but all elements are the same size.
val bigArray =
  Array.fill(1000)(Array.fill(1000)(java.util.UUID.randomUUID().toString()))

test("String arrays only perf testing") {
  val startTime = System.currentTimeMillis()
  for (i <- 1 to 50000) {
    SizeEstimator.estimate(bigArray)
  }
  println("Runtime: " + (System.currentTimeMillis() - startTime) / 1000.0)
}
```
I used a 2D array specifically because I wanted to measure the performance of 
repeatedly calling Array.getLength. I used UUID strings to ensure that the 
strings were randomized (so no String object re-use happens) while all being 
the same size. The results were as follows:

Before PR: 222.681 s, 218.34 s, 211.739 s
After latest change: 170.715 s, 176.775 s, 180.298 s

Author: mcheah <mch...@palantir.com>
Author: Justin Uang <justin.u...@gmail.com>

Closes #4972 from mccheah/feature/spark-6269-reflect-array and squashes the 
following commits:

8527852 [mcheah] Respect CamelCase for numElementsDrawn
18d4b50 [mcheah] Addressing style comments - while loops instead of for loops
16ce534 [mcheah] Organizing imports properly
db890ea [mcheah] Removing CastedArray and just using ScalaRunTime.
cb67ce2 [mcheah] Fixing a scalastyle error - line too long
5d53c4c [mcheah] Removing unused parameter in visitArray.
6467759 [mcheah] Including primitive size information inside CastedArray.
93f4b05 [mcheah] Using Scala instead of Java for the array-reflection 
implementation.
a557ab8 [mcheah] Using a wrapper around arrays to do casting only once
ca063fc [mcheah] Fixing a compiler error made while refactoring style
1fe09de [Justin Uang] [SPARK-6269] Use a different implementation of 
java.lang.reflect.Array


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/005d1c5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/005d1c5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/005d1c5f

Branch: refs/heads/master
Commit: 005d1c5f290decc606a0be59fb191136dafc0c9d
Parents: 25f3580
Author: mcheah <mch...@palantir.com>
Authored: Tue Mar 17 11:20:20 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Mar 17 11:20:20 2015 +0000

----------------------------------------------------------------------
 .../org/apache/spark/util/SizeEstimator.scala   | 28 +++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/005d1c5f/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index bce3b3a..26ffbf9 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -18,18 +18,16 @@
 package org.apache.spark.util
 
 import java.lang.management.ManagementFactory
-import java.lang.reflect.{Array => JArray}
-import java.lang.reflect.Field
-import java.lang.reflect.Modifier
-import java.util.IdentityHashMap
-import java.util.Random
+import java.lang.reflect.{Field, Modifier}
+import java.util.{IdentityHashMap, Random}
 import java.util.concurrent.ConcurrentHashMap
-
 import scala.collection.mutable.ArrayBuffer
+import scala.runtime.ScalaRunTime
 
 import org.apache.spark.Logging
 import org.apache.spark.util.collection.OpenHashSet
 
+
 /**
  * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
  * memory-aware caches.
@@ -184,9 +182,9 @@ private[spark] object SizeEstimator extends Logging {
   private val ARRAY_SIZE_FOR_SAMPLING = 200
   private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
 
-  private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
-    val length = JArray.getLength(array)
-    val elementClass = cls.getComponentType
+  private def visitArray(array: AnyRef, arrayClass: Class[_], state: SearchState) {
+    val length = ScalaRunTime.array_length(array)
+    val elementClass = arrayClass.getComponentType()
 
     // Arrays have object header and length field which is an integer
     var arrSize: Long = alignSize(objectSize + INT_SIZE)
@@ -199,22 +197,26 @@ private[spark] object SizeEstimator extends Logging {
       state.size += arrSize
 
       if (length <= ARRAY_SIZE_FOR_SAMPLING) {
-        for (i <- 0 until length) {
-          state.enqueue(JArray.get(array, i))
+        var arrayIndex = 0
+        while (arrayIndex < length) {
+          state.enqueue(ScalaRunTime.array_apply(array, arrayIndex).asInstanceOf[AnyRef])
+          arrayIndex += 1
         }
       } else {
         // Estimate the size of a large array by sampling elements without replacement.
         var size = 0.0
         val rand = new Random(42)
         val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
-        for (i <- 0 until ARRAY_SAMPLE_SIZE) {
+        var numElementsDrawn = 0
+        while (numElementsDrawn < ARRAY_SAMPLE_SIZE) {
           var index = 0
           do {
             index = rand.nextInt(length)
           } while (drawn.contains(index))
           drawn.add(index)
-          val elem = JArray.get(array, index)
+          val elem = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef]
           size += SizeEstimator.estimate(elem, state.visited)
+          numElementsDrawn += 1
         }
         state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
       }
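
For context, the large-array branch above samples ARRAY_SAMPLE_SIZE distinct 
indices and scales the sampled total up to the full length. Here is a 
self-contained sketch of that logic (SamplingSketch, sampledArraySize, and 
estimateElem are illustrative names, not part of the patch; a mutable.Set 
stands in for Spark's OpenHashSet, and a constant per-element size stands in 
for SizeEstimator.estimate):
```
import java.util.Random

import scala.collection.mutable
import scala.runtime.ScalaRunTime

object SamplingSketch {
  def sampledArraySize(array: AnyRef, sampleSize: Int)
                      (estimateElem: AnyRef => Long): Long = {
    val length = ScalaRunTime.array_length(array)
    val rand = new Random(42)        // fixed seed, as in the patch
    val drawn = mutable.Set[Int]()   // indices sampled so far
    var size = 0.0
    var numElementsDrawn = 0
    while (numElementsDrawn < sampleSize) {
      // Redraw until we hit an index we have not seen yet
      // (sampling without replacement).
      var index = 0
      do {
        index = rand.nextInt(length)
      } while (drawn.contains(index))
      drawn.add(index)
      size += estimateElem(
        ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef])
      numElementsDrawn += 1
    }
    // Extrapolate the sampled total to the whole array.
    ((length / (sampleSize * 1.0)) * size).toLong
  }

  def main(args: Array[String]): Unit = {
    val arr: AnyRef = Array.fill(10000)(new Object)
    // Pretend every element is 16 bytes; prints roughly 160000.
    println(sampledArraySize(arr, sampleSize = 100)(_ => 16L))
  }
}
```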

