We're running PCA (selecting 100 principal components) on a dataset that
has ~29K columns and is 70G in size, stored in ~600 parts on HDFS. The
matrix in question is mostly sparse, with tens of columns populated in most
rows but a few rows with thousands of columns populated. We're running
Spark on Mesos with driver memory set to 40G and executor memory set to
80G. However, we're encountering an out of memory error (included at the
end of this message) regardless of the number of RDD partitions or the
degree of task parallelism set. I noticed a warning at the beginning of the
PCA computation stage:

"WARN org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns
will require at least 7011 megabytes of memory!"
I don't understand which memory this refers to. Is it the executor
memory? The driver memory? Something else?
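For what it's worth, the 7011 MB figure in the warning is consistent with a
fully dense n x n matrix of doubles, which suggests the Gramian/covariance
matrix is materialized densely. This is my own back-of-the-envelope check,
not something I found stated in the docs:

```python
# Check: does the warning's 7011 MB match a dense n x n matrix of
# 8-byte doubles? (my assumption about what the warning measures)
n = 29604                          # column count reported in the warning
dense_bytes = n * n * 8            # full n x n matrix, 8 bytes per double
dense_mb = dense_bytes // 1_000_000
print(dense_mb)                    # -> 7011, matching the warning exactly
```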
The stacktrace appears to indicate that a large array is probably being
serialized along with the task closure. Could this array have been passed
as a broadcast variable instead? Any suggestions or workarounds other than
re-implementing the algorithm?
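A rough calculation that may explain why the failure is independent of the
memory settings: if (my reading of the stacktrace, not confirmed) the
treeAggregate zero value is a packed upper-triangular vector of
n*(n+1)/2 doubles captured in the closure, its serialized form alone
exceeds the JVM's maximum array size, so "Requested array size exceeds VM
limit" would be thrown no matter how large the heap is:

```python
# Sketch, assuming the aggregation zero value is a packed
# upper-triangular buffer of n*(n+1)/2 doubles serialized with the task.
n = 29604
packed_elems = n * (n + 1) // 2      # upper-triangular element count
closure_bytes = packed_elems * 8     # 8 bytes per double, ~3.5 GB
jvm_max_array = 2**31 - 1            # Java arrays are int-indexed, ~2.1 GB
print(closure_bytes > jvm_max_array) # -> True: over the VM array limit
```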

Thanks,
Bharath

----

Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1091)
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:124)
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:350)
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponents(RowMatrix.scala:386)
        at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:46)
