[ https://issues.apache.org/jira/browse/SPARK-10821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jay Luan updated SPARK-10821: ----------------------------- Description: I am getting OOM during serialization for a relatively small dataset for a RandomForest. Even with spark.serializer.objectStreamReset at 1, It is still running out of memory when attempting to serialize my data. Stack Trace: Traceback (most recent call last): File "/root/random_forest/random_forest_spark.py", line 198, in <module> main() File "/root/random_forest/random_forest_spark.py", line 166, in main trainModel(dset) File "/root/random_forest/random_forest_spark.py", line 191, in trainModel impurity='gini', maxDepth=4, maxBins=32) File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 352, in trainClassifier File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 270, in _train File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 123, in callJavaFunc File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: Done removing RDD 7, response is 0 15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: Sent response: 0 to AkkaRpcEndpointRef(Actor[akka://sparkDriver/temp/$Mj]) : An error occurred while calling o89.trainRandomForestModel. : java.lang.OutOfMemoryError at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2021) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702) at org.apache.spark.mllib.tree.DecisionTree$.findBestSplits(DecisionTree.scala:625) at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:235) at org.apache.spark.mllib.tree.RandomForest$.trainClassifier(RandomForest.scala:291) at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRandomForestModel(PythonMLLibAPI.scala:742) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) Details: My RDD is type MLLIB LabeledPoint objects, with each holding sparse vectors inside. This RDD has a total size of roughly 45MB. My sparse vector has a total length of ~15 million while only about 3000 or so are non-zeros. Works fine for up to sparse vector size 10 million. My cluster is setup on AWS such that my master is a r3.8xlarge along with two r3.4xlarge workers. Driver has ~190GB allocated to it while my RDD is ~45MB. Configurations as follows: spark version: 1.5.0 ----------------------------------- spark.executor.memory 32000m spark.driver.memory 230000m spark.driver.cores 10 spark.executor.cores 5 spark.executor.instances 17 spark.driver.maxResultSize 0 spark.storage.safetyFraction 1 spark.storage.memoryFraction 0.9 spark.storage.shuffleFraction 0.05 spark.default.parallelism 128 spark.serializer.objectStreamReset 1 My original code is in python which I tried on 1.4.0 and 1.5.0, so I thought that maybe running something in scala may resolve the problem. I wrote a toy scala example and tested it on the same system yielding the same errors. Note the test code will most likely eventually throw an error due to the fact certain features are always 0 and MLLIB currently errors out during this operation. Running the following using spark-shell with my spark configuration gives me the OOM: -------------------------------------------------------------------------- import scala.util.Random import scala.collection.mutable.ArrayBuffer import org.apache.spark.mllib.tree.RandomForest import org.apache.spark.mllib.tree.model.RandomForestModel import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint val r = Random var size = 15000000 var count = 3000 val indptr = (1 to size by size/count).toArray val data = Seq.fill(count)(r.nextDouble()).toArray var dset = ArrayBuffer[LabeledPoint]() for (i <- 1 to 10) { dset += LabeledPoint(r.nextInt(2), Vectors.sparse(size, indptr, data)); } val distData = sc.parallelize(dset) val splits = distData.randomSplit(Array(0.7, 0.3)) val (trainingData, testData) = (splits(0), splits(1)) // Train a RandomForest model. // Empty categoricalFeaturesInfo indicates all features are continuous. val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val numTrees = 3 // Use more in practice. val featureSubsetStrategy = "auto" // Let the algorithm choose. val impurity = "gini" val maxDepth = 4 val maxBins = 32 val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) was: I am getting OOM during serialization for a relatively small dataset for a RandomForest. Even with spark.serializer.objectStreamReset at 1, It is still running out of memory when attempting to serialize my data. Stack Trace: Traceback (most recent call last): File "/root/random_forest/random_forest_spark.py", line 198, in <module> main() File "/root/random_forest/random_forest_spark.py", line 166, in main trainModel(dset) File "/root/random_forest/random_forest_spark.py", line 191, in trainModel impurity='gini', maxDepth=4, maxBins=32) File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 352, in trainClassifier File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 270, in _train File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 123, in callJavaFunc File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: Done removing RDD 7, response is 0 15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: Sent response: 0 to AkkaRpcEndpointRef(Actor[akka://sparkDriver/temp/$Mj]) : An error occurred while calling o89.trainRandomForestModel. : java.lang.OutOfMemoryError at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2021) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702) at org.apache.spark.mllib.tree.DecisionTree$.findBestSplits(DecisionTree.scala:625) at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:235) at org.apache.spark.mllib.tree.RandomForest$.trainClassifier(RandomForest.scala:291) at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRandomForestModel(PythonMLLibAPI.scala:742) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) Details: My RDD is type MLLIB LabeledPoint objects, with each holding sparse vectors inside. This RDD has a total size of roughly 45MB. My sparse vector has a total length of ~15 million while only about 3000 or so are non-zeros. My cluster is setup on AWS such that my master is a r3.8xlarge along with two r3.4xlarge workers. Driver has ~190GB allocated to it while my RDD is ~45MB. Configurations as follows: spark version: 1.5.0 ----------------------------------- spark.executor.memory 32000m spark.driver.memory 230000m spark.driver.cores 10 spark.executor.cores 5 spark.executor.instances 17 spark.driver.maxResultSize 0 spark.storage.safetyFraction 1 spark.storage.memoryFraction 0.9 spark.storage.shuffleFraction 0.05 spark.default.parallelism 128 spark.serializer.objectStreamReset 1 My original code is in python which I tried on 1.4.0 and 1.5.0, so I thought that maybe running something in scala may resolve the problem. I wrote a toy scala example and tested it on the same system yielding the same errors. Note the test code will most likely eventually throw an error due to the fact certain features are always 0 and MLLIB currently errors out during this operation. test.scala: -------------------------------------------------------------------------- import scala.util.Random import scala.collection.mutable.ArrayBuffer import org.apache.spark.mllib.tree.RandomForest import org.apache.spark.mllib.tree.model.RandomForestModel import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint val r = Random var size = 15000000 var count = 3000 val indptr = (1 to size by size/count).toArray val data = Seq.fill(count)(r.nextDouble()).toArray var dset = ArrayBuffer[LabeledPoint]() for (i <- 1 to 10) { dset += LabeledPoint(r.nextInt(2), Vectors.sparse(size, indptr, data)); } val distData = sc.parallelize(dset) val splits = distData.randomSplit(Array(0.7, 0.3)) val (trainingData, testData) = (splits(0), splits(1)) // Train a RandomForest model. // Empty categoricalFeaturesInfo indicates all features are continuous. val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val numTrees = 3 // Use more in practice. val featureSubsetStrategy = "auto" // Let the algorithm choose. val impurity = "gini" val maxDepth = 4 val maxBins = 32 val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) > RandomForest serialization OOM during findBestSplits > ---------------------------------------------------- > > Key: SPARK-10821 > URL: https://issues.apache.org/jira/browse/SPARK-10821 > Project: Spark > Issue Type: Bug > Components: MLlib > Affects Versions: 1.4.0, 1.5.0 > Environment: Amazon EC2 Linux > Reporter: Jay Luan > Labels: OOM, out-of-memory > > I am getting OOM during serialization for a relatively small dataset for a > RandomForest. Even with spark.serializer.objectStreamReset at 1, It is still > running out of memory when attempting to serialize my data. > Stack Trace: > Traceback (most recent call last): > File "/root/random_forest/random_forest_spark.py", line 198, in <module> > main() > File "/root/random_forest/random_forest_spark.py", line 166, in main > trainModel(dset) > File "/root/random_forest/random_forest_spark.py", line 191, in trainModel > impurity='gini', maxDepth=4, maxBins=32) > File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 352, > in trainClassifier > File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 270, > in _train > File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line > 130, in callMLlibFunc > File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line > 123, in callJavaFunc > File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", > line 538, in __call__ > File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line > 300, in get_return_value > py4j.protocol.Py4JJavaError15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: > Done removing RDD 7, response is 0 > 15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: Sent response: 0 to > AkkaRpcEndpointRef(Actor[akka://sparkDriver/temp/$Mj]) > : An error occurred while calling o89.trainRandomForestModel. > : java.lang.OutOfMemoryError > at > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) > at > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) > at > org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) > at > org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2021) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) > at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702) > at > org.apache.spark.mllib.tree.DecisionTree$.findBestSplits(DecisionTree.scala:625) > at > org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:235) > at > org.apache.spark.mllib.tree.RandomForest$.trainClassifier(RandomForest.scala:291) > at > org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRandomForestModel(PythonMLLibAPI.scala:742) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) > at py4j.Gateway.invoke(Gateway.java:259) > at > py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:207) > at java.lang.Thread.run(Thread.java:745) > Details: > My RDD is type MLLIB LabeledPoint objects, with each holding sparse vectors > inside. This RDD has a total size of roughly 45MB. My sparse vector has a > total length of ~15 million while only about 3000 or so are non-zeros. Works > fine for up to sparse vector size 10 million. > My cluster is setup on AWS such that my master is a r3.8xlarge along with two > r3.4xlarge workers. Driver has ~190GB allocated to it while my RDD is ~45MB. > Configurations as follows: > spark version: 1.5.0 > ----------------------------------- > spark.executor.memory 32000m > spark.driver.memory 230000m > spark.driver.cores 10 > spark.executor.cores 5 > spark.executor.instances 17 > spark.driver.maxResultSize 0 > spark.storage.safetyFraction 1 > spark.storage.memoryFraction 0.9 > spark.storage.shuffleFraction 0.05 > spark.default.parallelism 128 > spark.serializer.objectStreamReset 1 > My original code is in python which I tried on 1.4.0 and 1.5.0, so I thought > that maybe running something in scala may resolve the problem. I wrote a toy > scala example and tested it on the same system yielding the same errors. Note > the test code will most likely eventually throw an error due to the fact > certain features are always 0 and MLLIB currently errors out during this > operation. > Running the following using spark-shell with my spark configuration gives me > the OOM: > -------------------------------------------------------------------------- > import scala.util.Random > import scala.collection.mutable.ArrayBuffer > import org.apache.spark.mllib.tree.RandomForest > import org.apache.spark.mllib.tree.model.RandomForestModel > import org.apache.spark.mllib.util.MLUtils > import org.apache.spark.mllib.linalg.Vectors > import org.apache.spark.mllib.regression.LabeledPoint > val r = Random > var size = 15000000 > var count = 3000 > val indptr = (1 to size by size/count).toArray > val data = Seq.fill(count)(r.nextDouble()).toArray > var dset = ArrayBuffer[LabeledPoint]() > for (i <- 1 to 10) { > dset += LabeledPoint(r.nextInt(2), Vectors.sparse(size, indptr, data)); > } > val distData = sc.parallelize(dset) > val splits = distData.randomSplit(Array(0.7, 0.3)) > val (trainingData, testData) = (splits(0), splits(1)) > // Train a RandomForest model. > // Empty categoricalFeaturesInfo indicates all features are continuous. > val numClasses = 2 > val categoricalFeaturesInfo = Map[Int, Int]() > val numTrees = 3 // Use more in practice. > val featureSubsetStrategy = "auto" // Let the algorithm choose. > val impurity = "gini" > val maxDepth = 4 > val maxBins = 32 > val model = RandomForest.trainClassifier(trainingData, numClasses, > categoricalFeaturesInfo, > numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org