Jay Luan created SPARK-10821:
--------------------------------

             Summary: RandomForest serialization OOM during findBestSplits
                 Key: SPARK-10821
                 URL: https://issues.apache.org/jira/browse/SPARK-10821
             Project: Spark
          Issue Type: Bug
          Components: MLlib
    Affects Versions: 1.5.0, 1.4.0
         Environment: Amazon EC2 Linux
            Reporter: Jay Luan


I am getting an OOM error during serialization when training a RandomForest on
a relatively small dataset. Even with spark.serializer.objectStreamReset set to
1, it still runs out of memory when attempting to serialize my data.

Stack Trace:
Traceback (most recent call last):
  File "/root/random_forest/random_forest_spark.py", line 198, in <module>
    main()
  File "/root/random_forest/random_forest_spark.py", line 166, in main
    trainModel(dset)
  File "/root/random_forest/random_forest_spark.py", line 191, in trainModel
    impurity='gini', maxDepth=4, maxBins=32)
  File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 352, in trainClassifier
  File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 270, in _train
  File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc
  File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 123, in callJavaFunc
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: Done removing RDD 7, response is 0
15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: Sent response: 0 to AkkaRpcEndpointRef(Actor[akka://sparkDriver/temp/$Mj])
py4j.protocol.Py4JJavaError: An error occurred while calling o89.trainRandomForestModel.
: java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
        at org.apache.spark.mllib.tree.DecisionTree$.findBestSplits(DecisionTree.scala:625)
        at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:235)
        at org.apache.spark.mllib.tree.RandomForest$.trainClassifier(RandomForest.scala:291)
        at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRandomForestModel(PythonMLLibAPI.scala:742)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
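
A note on reading the trace: the top frame is
ByteArrayOutputStream.hugeCapacity, and the OutOfMemoryError carries no
message. In the JDK 7 sources that method appears to throw when the requested
capacity has overflowed a 32-bit int, so the error may point to the serialized
closure outgrowing the ~2GB limit of a single byte array rather than to the
heap being exhausted. The Python sketch below models that growth logic to show
where such an error would come from. It is an illustration only, not JDK or
Spark code: the function names and the use of MemoryError are assumptions made
for this example.

```python
INT_MAX = 2**31 - 1
MAX_ARRAY_SIZE = INT_MAX - 8  # largest array size the JDK code tries to allocate


def to_int32(x):
    """Wrap a Python int the way a Java int wraps on overflow."""
    return (x + 2**31) % 2**32 - 2**31


def huge_capacity(min_capacity):
    # A negative value means the requested size overflowed a 32-bit int.
    if min_capacity < 0:
        raise MemoryError("requested capacity overflowed a 32-bit int")
    return INT_MAX if min_capacity > MAX_ARRAY_SIZE else MAX_ARRAY_SIZE


def grow(old_capacity, min_capacity):
    # Double the buffer, then fall back to the exact or the maximum size.
    new_capacity = to_int32(old_capacity << 1)
    if to_int32(new_capacity - min_capacity) < 0:
        new_capacity = min_capacity
    if to_int32(new_capacity - MAX_ARRAY_SIZE) > 0:
        new_capacity = huge_capacity(min_capacity)
    return new_capacity


def ensure_capacity(buf_len, count, write_len):
    """Return the buffer capacity after writing write_len more bytes."""
    min_capacity = to_int32(count + write_len)
    if to_int32(min_capacity - buf_len) > 0:
        return grow(buf_len, min_capacity)
    return buf_len


if __name__ == "__main__":
    # A 1 GiB buffer that is full can still grow, up to MAX_ARRAY_SIZE.
    print(ensure_capacity(2**30, 2**30, 1024))
    # A buffer already at MAX_ARRAY_SIZE cannot grow, whatever the heap size.
    try:
        ensure_capacity(MAX_ARRAY_SIZE, MAX_ARRAY_SIZE, 1024)
    except MemoryError as err:
        print("MemoryError:", err)
```

If this reading is right, raising spark.driver.memory would not help, because
the limit is on the size of one array and not on the heap.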

Details:

My RDD consists of MLlib LabeledPoint objects, each holding a sparse vector.
The RDD has a total size of roughly 45MB. Each sparse vector has a total
length of ~15 million, of which only about 3000 entries are non-zero.
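
For scale, the figures above can be turned into a rough size estimate. The
Python sketch below is back-of-envelope arithmetic only. It assumes a 4-byte
index and an 8-byte value per non-zero entry and ignores object headers and
serialization overhead, so the real numbers will differ somewhat. The constant
and function names are made up for this example.

```python
# Figures taken from the report.
NUM_FEATURES = 15_000_000   # total length of each sparse vector
NUM_NONZEROS = 3_000        # approximate non-zero entries per vector
RDD_BYTES = 45 * 10**6      # approximate total RDD size (45MB)

# Assumptions for this estimate.
INDEX_BYTES = 4   # one 32-bit index per non-zero entry
VALUE_BYTES = 8   # one 64-bit double per stored value


def sparse_vector_bytes(nnz):
    """Payload of a sparse vector: one index and one value per non-zero."""
    return nnz * (INDEX_BYTES + VALUE_BYTES)


def dense_vector_bytes(length):
    """Payload of a dense array of doubles with one slot per feature."""
    return length * VALUE_BYTES


sparse = sparse_vector_bytes(NUM_NONZEROS)
dense = dense_vector_bytes(NUM_FEATURES)
approx_rows = RDD_BYTES // sparse

print("sparse vector payload:", sparse, "bytes")
print("dense array of the same length:", dense, "bytes")
print("rows implied by a 45MB RDD:", approx_rows)
```

Under these assumptions the data itself is small, but any structure sized by
the vector length rather than by the non-zero count is several thousand times
larger per instance.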

My cluster is set up on AWS with an r3.8xlarge master and two r3.4xlarge
workers. The driver has ~190GB allocated to it, while my RDD is ~45MB.

Configurations as follows:
spark version: 1.5.0 
----------------------------------- 
spark.executor.memory 32000m 
spark.driver.memory 230000m 
spark.driver.cores 10 
spark.executor.cores 5 
spark.executor.instances 17 
spark.driver.maxResultSize 0 
spark.storage.safetyFraction 1 
spark.storage.memoryFraction 0.9 
spark.storage.shuffleFraction 0.05 
spark.default.parallelism 128 
spark.serializer.objectStreamReset 1

My original code is in Python, which I tried on 1.4.0 and 1.5.0, so I thought
that running the equivalent in Scala might avoid the problem. I wrote a toy
Scala example and tested it on the same system, and it yields the same errors.
Note that the test code will most likely eventually throw an error, because
certain features are always 0 and MLlib currently errors out in that case.

test.scala:
--------------------------------------------------------------------------
import scala.util.Random
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

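val r = Random

// Each vector has `size` features, of which `count` are non-zero.
val size = 15000000
val count = 3000
// Evenly spaced indices of the non-zero entries (1, 5001, 10001, ...).
val indptr = (1 to size by size / count).toArray
val data = Array.fill(count)(r.nextDouble())

// Ten labeled points that all share the same sparse feature vector.
val dset = ArrayBuffer[LabeledPoint]()
for (i <- 1 to 10) {
  dset += LabeledPoint(r.nextInt(2).toDouble, Vectors.sparse(size, indptr, data))
}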

val distData = sc.parallelize(dset)
val splits = distData.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a RandomForest model.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "gini"
val maxDepth = 4
val maxBins = 32

val model = RandomForest.trainClassifier(trainingData, numClasses,
  categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity,
  maxDepth, maxBins)


