Hello Alex,

Thanks for the response. There isn't much other data on the driver, so the
issue is probably inherent to this particular PCA implementation.  I'll try
the alternative approach that you suggested instead. Thanks again.

-Bharath

On Wed, Jan 13, 2016 at 11:24 PM, Alex Gittens <swift...@gmail.com> wrote:

> The PCA.fit function calls the RowMatrix PCA routine, which attempts to
> construct the covariance matrix locally on the driver, and then computes
> the SVD of that to get the PCs. I'm not sure what's causing the memory
> error: the allocation at RowMatrix.scala:124 only needs about 3.5 GB
> (n*(n+1)/2 doubles with n = 29604), so unless you're filling up the memory
> with other RDDs, you should have plenty of space on the driver.
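
For my own reference, here is the arithmetic behind both figures (the ~3.5 GB
above and the 7011 MB in the Spark warning I quoted below), assuming 8-byte
doubles; the names are mine:

    val n = 29604L
    val packedGramianBytes = n * (n + 1) / 2 * 8  // upper triangle only: ~3.5 GB
    val denseMatrixBytes   = n * n * 8            // full n x n buffer:  ~7.0 GB (~7011 MB)

The packed figure is what the Gramian/covariance strictly needs; the dense
n x n figure is what the RowMatrix warning reports.
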
>
> One alternative is to manually center your RDD (make one pass over it to
> compute the column means, then another to subtract them out and form a new
> RDD), then directly call the computeSVD routine in RowMatrix to compute the
> SVD of the Gramian matrix of this RDD (i.e., up to scaling, the covariance
> matrix of the original RDD) in a distributed manner, so the covariance
> matrix never needs to be formed explicitly on the driver. You can look at
> the getLowRankFactorization and convertLowRankFactorizationToEOFs routines
> at
>
> https://github.com/rustandruin/large-scale-climate/blob/master/src/main/scala/eofs.scala
> for an example of this approach (call the second on the results of the
> first to get the SVD of the input matrix to the first; EOF is another name
> for PCA).
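
To make sure I follow, here is a rough, untested sketch of what I plan to
try; the names are mine and not taken from the linked code:

    import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    import org.apache.spark.rdd.RDD
    import breeze.linalg.{DenseVector => BDV}

    // rows: the original data, one Vector of ~29.6K features per record
    def topPrincipalComponents(rows: RDD[Vector], k: Int): Matrix = {
      // Pass 1: column means from the column summary statistics
      val mean = new RowMatrix(rows).computeColumnSummaryStatistics().mean

      // Pass 2: subtract the means to form a centered RDD
      // (note: this densifies each row, which may be costly for sparse data)
      val meanBV = BDV(mean.toArray)
      val centered = rows.map(v => Vectors.dense((BDV(v.toArray) - meanBV).toArray))
      centered.cache()

      // Distributed SVD of the centered matrix; the right singular vectors V
      // are the principal components, so the n x n covariance matrix is never
      // materialized on the driver (as I understand it, for large n RowMatrix
      // uses distributed matrix-vector multiplies here).
      new RowMatrix(centered).computeSVD(k, computeU = false).V
    }

Projecting the data would then just be a matter of multiplying each centered
row by these components.
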
>
> This takes about 30 minutes to compute the top 20 PCs of a 46.7K-by-6.3M
> dense matrix of doubles (~2 TB), with most of the time spent in the
> distributed matrix-vector multiplies.
>
> Best,
> Alex
>
>
> On Tue, Jan 12, 2016 at 6:39 PM, Bharath Ravi Kumar <reachb...@gmail.com>
> wrote:
>
>> Any suggestion/opinion?
>> On 12-Jan-2016 2:06 pm, "Bharath Ravi Kumar" <reachb...@gmail.com> wrote:
>>
>>> We're running PCA (selecting 100 principal components) on a dataset that
>>> has ~29K columns and is 70G in size, stored in ~600 parts on HDFS. The
>>> matrix in question is mostly sparse, with tens of columns populated in
>>> most rows but a few rows having thousands of columns populated. We're
>>> running Spark on Mesos with driver memory set to 40G and executor memory
>>> set to 80G. However, we're encountering an out of memory error (included
>>> at the end of the message) regardless of the number of RDD partitions or
>>> the degree of task parallelism being set. I noticed a warning at the
>>> beginning of the PCA computation stage: "WARN
>>> org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will
>>> require at least 7011 megabytes of memory!"
>>> I don't understand which memory this refers to. Is it the executor
>>> memory? The driver memory? Something else?
>>> The stack trace appears to indicate that a large array is probably being
>>> passed along with the task. Could this array have been passed as a
>>> broadcast variable instead? Any suggestions or workarounds other than
>>> re-implementing the algorithm?
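
(For context in this thread: the failing call is essentially the stock MLlib
PCA API, roughly as below; the variable names are placeholders, not our
actual code.)

    import org.apache.spark.mllib.feature.PCA
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    val rows: RDD[Vector] = ???           // the ~29K-column dataset loaded from HDFS
    val model = new PCA(100).fit(rows)    // fails here: fit() builds the full covariance on the driver
    val projected = model.transform(rows) // project onto the top 100 components
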
>>>
>>> Thanks,
>>> Bharath
>>>
>>> ----
>>>
>>> Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>>>         at java.util.Arrays.copyOf(Arrays.java:2271)
>>>         at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>>         at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>>         at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>>         at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>>         at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>>>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>>>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>>>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>>>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
>>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
>>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
>>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>>>         at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
>>>         at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100)
>>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>>>         at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1091)
>>>         at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:124)
>>>         at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:350)
>>>         at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponents(RowMatrix.scala:386)
>>>         at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:46)
>>>
>>>
>
