The PCA.fit function calls the RowMatrix PCA routine, which attempts to
construct the covariance matrix locally on the driver and then computes
the SVD of that matrix to get the PCs. I'm not sure what's causing the
memory error: the aggregation at RowMatrix.scala:124 only needs about
3.5 GB of memory (n*(n+1)/2 doubles with n = 29604), so unless you're
filling up the memory with other RDDs, you should have plenty of space
on the driver.
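
For what it's worth, here's the arithmetic behind that 3.5 GB figure
(this just restates the formula above; the packed upper-triangular
layout is how the Gramian aggregation stores the matrix):

    val n = 29604L
    val entries = n * (n + 1) / 2     // 438,213,210 packed upper-triangular entries
    val gigabytes = entries * 8 / 1e9 // ~3.5 GB at 8 bytes per double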

One alternative is to manually center your RDD (make one pass over it to
compute the column means, then another to subtract them out and form a
new RDD), then directly call the computeSVD routine in RowMatrix. That
computes the SVD of the centered RDD (whose Gramian is, up to scaling,
the covariance matrix of the original RDD) in a distributed manner, so
the covariance matrix doesn't need to be formed explicitly. For an
example of this approach, see the getLowRankFactorization and
convertLowRankFactorizationToEOFs routines at
https://github.com/rustandruin/large-scale-climate/blob/master/src/main/scala/eofs.scala
(call the second on the results of the first to get the SVD of the input
matrix to the first; EOF is another name for PCA).
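
If it helps, here is a rough sketch of the center-then-computeSVD
approach (untested; `rows` is a stand-in for your RDD[Vector] and `sc`
for the SparkContext):

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    import org.apache.spark.mllib.stat.Statistics
    import org.apache.spark.rdd.RDD

    // Pass 1: column means.
    val mean: Array[Double] = Statistics.colStats(rows).mean.toArray
    val meanB = sc.broadcast(mean)

    // Pass 2: subtract the means to form the centered RDD. The centered
    // rows are dense, so a mostly-sparse input grows considerably here.
    val centered: RDD[Vector] = rows.map { row =>
      val v = row.toArray.clone() // defensive copy before mutating in place
      val m = meanB.value
      var i = 0
      while (i < v.length) { v(i) -= m(i); i += 1 }
      Vectors.dense(v)
    }.cache()

    // With ~29.6K columns, computeSVD should take its distributed code
    // path, which only touches the Gramian through distributed
    // matrix-vector products; the columns of V are the top 100 PCs.
    val svd = new RowMatrix(centered).computeSVD(k = 100, computeU = false)
    val pcs = svd.V

Note that centering destroys sparsity, so expect the cached centered RDD
to be substantially larger than your 70G input.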

For reference, this approach takes about 30 minutes to compute the top
20 PCs of a 46.7K-by-6.3M dense matrix of doubles (~2 TB), with most of
the time spent in the distributed matrix-vector multiplies.

Best,
Alex


On Tue, Jan 12, 2016 at 6:39 PM, Bharath Ravi Kumar <reachb...@gmail.com>
wrote:

> Any suggestion/opinion?
> On 12-Jan-2016 2:06 pm, "Bharath Ravi Kumar" <reachb...@gmail.com> wrote:
>
>> We're running PCA (selecting 100 principal components) on a dataset that
>> has ~29K columns and is 70G in size, stored in ~600 parts on HDFS. The
>> matrix in question is mostly sparse, with tens of columns populated in
>> most rows but a few rows with thousands of columns populated. We're
>> running Spark on Mesos with driver memory set to 40G and executor memory
>> set to 80G. However, we're encountering an out of memory error (included
>> at the end of the message) regardless of the number of RDD partitions or
>> the degree of task parallelism being set. I noticed a warning at the
>> beginning of the PCA computation stage: "WARN
>> org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will
>> require at least 7011 megabytes of memory!"
>> I don't understand which memory this refers to. Is this the executor
>> memory?  The driver memory? Any other?
>> The stack trace appears to indicate that a large array is probably being
>> passed along with the task. Could this array have been passed as a
>> broadcast variable instead? Any suggestions / workarounds other than
>> re-implementing the algorithm?
>>
>> Thanks,
>> Bharath
>>
>> ----
>>
>> Exception in thread "main" java.lang.OutOfMemoryError: Requested array
>> size exceeds VM limit
>>         at java.util.Arrays.copyOf(Arrays.java:2271)
>>         at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>         at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>         at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>         at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>         at
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>>         at
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>>         at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>>         at
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>>         at
>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
>>         at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
>>         at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>>         at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
>>         at
>> org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>>         at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1091)
>>         at
>> org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:124)
>>         at
>> org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:350)
>>         at
>> org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponents(RowMatrix.scala:386)
>>         at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:46)
>>
>>
