We're running PCA (selecting 100 principal components) on a dataset with ~29K columns, about 70G in size, stored in ~600 parts on HDFS. The matrix in question is mostly sparse: most rows have only tens of columns populated, but a few rows have thousands of columns populated. We're running Spark on Mesos with driver memory set to 40G and executor memory set to 80G. However, we're encountering an out-of-memory error (included at the end of this message) regardless of the number of RDD partitions or the degree of task parallelism being set.

I noticed a warning at the beginning of the PCA computation stage:

    WARN org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will require at least 7011 megabytes of memory!

I don't understand which memory this refers to. Is this the executor memory? The driver memory? Any other? The stack trace appears to indicate that a large array is probably being passed along with the task. Could this array have been passed as a broadcast variable instead? Any suggestions / workarounds other than re-implementing the algorithm?
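In case it helps to see the exact call path, here is a minimal, self-contained sketch of what our job does. The input construction below is synthetic and purely illustrative (our real rows are sparse vectors parsed from the ~600 HDFS parts), but the PCA call is the one from the stack trace:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.feature.PCA
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD

    object PcaRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("pca-repro"))

        // Synthetic stand-in for our real input: sparse rows with ~29K
        // columns and only a couple of populated entries each.
        val n = 29604
        val rows: RDD[Vector] = sc.parallelize(0 until 1000).map { i =>
          Vectors.sparse(n, Array(i, n - 1), Array(1.0, 2.0))
        }

        // The call that emits the RowMatrix warning and then OOMs for us:
        val model = new PCA(100).fit(rows)
        val projected: RDD[Vector] = model.transform(rows)
        println(projected.first())

        sc.stop()
      }
    }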
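And here is the back-of-the-envelope arithmetic that makes me suspect the task closure itself is the problem. The packed-triangle figure is my reading of RowMatrix.computeGramianMatrix (treeAggregate over an upper-triangular buffer), so treat that part as an assumption:

    object GramianSizeEstimate {
      def main(args: Array[String]): Unit = {
        val n = 29604L // number of columns, as reported in the warning

        // The warning's figure: a dense n x n Gramian of doubles.
        val denseBytes = n * n * 8L
        println(s"dense Gramian:   ~${denseBytes / 1000000} MB") // ~7011 MB

        // treeAggregate's zero value, if I'm reading computeGramianMatrix
        // correctly: the packed upper triangle, n*(n+1)/2 doubles.
        val triBytes = n * (n + 1) / 2 * 8L
        println(s"packed triangle: ~${triBytes / 1000000} MB")   // ~3505 MB

        // Either buffer, serialized into a single byte[], would exceed the
        // JVM's maximum array length, which is consistent with "Requested
        // array size exceeds VM limit" thrown from ByteArrayOutputStream
        // in the trace below.
        println(s"max JVM array length: ${Int.MaxValue} (~2147 MB)")
      }
    }

If that reading is right, it would at least explain why partition count and parallelism make no difference for us, since the failure happens while serializing the closure (ClosureCleaner.ensureSerializable in the trace).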
Thanks,
Bharath

----

Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1091)
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:124)
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:350)
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponents(RowMatrix.scala:386)
        at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:46)