Re: PCA OutOfMemoryError

2016-01-13 Thread Alex Gittens
The PCA.fit function calls the RowMatrix PCA routine, which attempts to
construct the covariance matrix locally on the driver, and then computes
the SVD of that to get the PCs. I'm not sure what's causing the memory
error: the allocation at RowMatrix.scala:124 only requires about 3.5 GB of
memory (n*(n+1)/2 doubles with n = 29604), so unless you're filling up the
memory with other RDDs, you should have plenty of space on the driver.
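
For reference, here is a quick back-of-the-envelope check of those figures
(just arithmetic in the Scala REPL, using only the numbers quoted in this
thread):

val n = 29604L                          // number of columns
val packedBytes = n * (n + 1) / 2 * 8   // upper-triangular covariance, doubles
// packedBytes: Long = 3505705680  -> the ~3.5 GB figure cited above
val denseBytes = n * n * 8              // full n-by-n matrix of doubles
// denseBytes: Long = 7011174528   -> the "7011 megabytes" in the warning quoted below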

One alternative is to manually center your RDD (so make one pass over it to
compute the mean, then another to subtract it out and form a new RDD), then
directly call the computeSVD routine in RowMatrix to compute the SVD of the
Gramian matrix of this RDD (i.e., the covariance matrix of the original
RDD) in a distributed manner, so the covariance matrix doesn't need to be
formed explicitly. You can look at the getLowRankFactorization and
convertLowRankFactorizationToEOFs routines at
https://github.com/rustandruin/large-scale-climate/blob/master/src/main/scala/eofs.scala
for an example of this approach (call the second on the results of the first
to get the SVD of the input matrix to the first; EOF is another name for
PCA).
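
For the "center manually, then call computeSVD directly" route described
above, a minimal sketch might look like the following (my outline only, not
code from the linked repo; it assumes an existing RDD[Vector] of data rows
and the MLlib 1.x RowMatrix API):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

def centeredSVD(rows: RDD[Vector], k: Int) = {
  // Pass 1: column means via the standard colStats summarizer.
  val mean = Statistics.colStats(rows).mean.toArray

  // Pass 2: subtract the mean from every row to form the centered RDD.
  val centered = rows.map { v =>
    val a = v.toArray.clone()
    var i = 0
    while (i < a.length) { a(i) -= mean(i); i += 1 }
    Vectors.dense(a)
  }.cache()

  // The right singular vectors of the centered matrix are the PCs; for a
  // matrix this wide, computeSVD works through distributed matrix-vector
  // products, so the covariance matrix is never formed explicitly.
  new RowMatrix(centered).computeSVD(k, computeU = false)
}

Calling centeredSVD(rows, 100) would then return the top 100 PCs as the
columns of the V factor of the result.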

That EOF code takes about 30 minutes to compute the top 20 PCs of a
46.7K-by-6.3M dense matrix of doubles (~2 TB), with most of the time spent on the
distributed matrix-vector multiplies.

Best,
Alex


On Tue, Jan 12, 2016 at 6:39 PM, Bharath Ravi Kumar 
wrote:

> Any suggestion/opinion?
> On 12-Jan-2016 2:06 pm, "Bharath Ravi Kumar"  wrote:
>
>> We're running PCA (selecting 100 principal components) on a dataset that
>> has ~29K columns and is 70G in size stored in ~600 parts on HDFS. The
>> matrix in question is mostly sparse, with tens of columns populated in most
>> rows, but a few rows with thousands of columns populated. We're running
>> spark on mesos with driver memory set to 40G and executor memory set to
>> 80G. We're however encountering an out of memory error (included at the end
>> of the message) regardless of the number of rdd partitions or the degree of
>> task parallelism being set. I noticed a warning at the beginning of the PCA
>> computation stage: " WARN
>> org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will
>> require at least 7011 megabytes of memory!"
>> I don't understand which memory this refers to. Is this the executor
>> memory?  The driver memory? Any other?
>> The stacktrace appears to indicate that a large array is probably being
>> passed along with the task. Could this array have been passed as a
>> broadcast variable instead? Any suggestions / workarounds other than
>> re-implementing the algorithm?
>>
>> Thanks,
>> Bharath
>>
>> 
>>
>> Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>> at java.util.Arrays.copyOf(Arrays.java:2271)
>> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
>> at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1100)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>> at

Re: failure to parallelize an RDD

2016-01-12 Thread Alex Gittens
I'm using Spark 1.5.1

When I turned on DEBUG, I didn't see anything that looks useful. Other than
the INFO output, there is a ton of RPC-message-related logging, plus this bit:

16/01/13 05:53:43 DEBUG ClosureCleaner: +++ Cleaning closure 
(org.apache.spark.rdd.RDD$$anonfun$count$1) +++
16/01/13 05:53:43 DEBUG ClosureCleaner:  + declared fields: 1
16/01/13 05:53:43 DEBUG ClosureCleaner:  public static final long
org.apache.spark.rdd.RDD$$anonfun$count$1.serialVersionUID
16/01/13 05:53:43 DEBUG ClosureCleaner:  + declared methods: 2
16/01/13 05:53:43 DEBUG ClosureCleaner:  public final java.lang.Object
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(java.lang.Object)
16/01/13 05:53:43 DEBUG ClosureCleaner:  public final long
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(scala.collection.Iterator)
16/01/13 05:53:43 DEBUG ClosureCleaner:  + inner classes: 0
16/01/13 05:53:43 DEBUG ClosureCleaner:  + outer classes: 0
16/01/13 05:53:43 DEBUG ClosureCleaner:  + outer objects: 0
16/01/13 05:53:43 DEBUG ClosureCleaner:  + populating accessed fields
because this is the starting closure
16/01/13 05:53:43 DEBUG ClosureCleaner:  + fields accessed by starting
closure: 0
16/01/13 05:53:43 DEBUG ClosureCleaner:  + there are no enclosing objects!
16/01/13 05:53:43 DEBUG ClosureCleaner:  +++ closure 
(org.apache.spark.rdd.RDD$$anonfun$count$1) is now cleaned +++
16/01/13 05:53:43 DEBUG ClosureCleaner: +++ Cleaning closure 
(org.apache.spark.SparkContext$$anonfun$runJob$5) +++
16/01/13 05:53:43 DEBUG ClosureCleaner:  + declared fields: 2
16/01/13 05:53:43 DEBUG ClosureCleaner:  public static final long
org.apache.spark.SparkContext$$anonfun$runJob$5.serialVersionUID
16/01/13 05:53:43 DEBUG ClosureCleaner:  private final scala.Function1
org.apache.spark.SparkContext$$anonfun$runJob$5.cleanedFunc$1
16/01/13 05:53:43 DEBUG ClosureCleaner:  + declared methods: 2
16/01/13 05:53:43 DEBUG ClosureCleaner:  public final java.lang.Object
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(java.lang.Object,java.lang.Object)
16/01/13 05:53:43 DEBUG ClosureCleaner:  public final java.lang.Object
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(org.apache.spark.TaskContext,scala.collection.Iterator)
16/01/13 05:53:43 DEBUG ClosureCleaner:  + inner classes: 0
16/01/13 05:53:43 DEBUG ClosureCleaner:  + outer classes: 0
16/01/13 05:53:43 DEBUG ClosureCleaner:  + outer objects: 0
16/01/13 05:53:43 DEBUG ClosureCleaner:  + populating accessed fields
because this is the starting closure
16/01/13 05:53:43 DEBUG ClosureCleaner:  + fields accessed by starting
closure: 0
16/01/13 05:53:43 DEBUG ClosureCleaner:  + there are no enclosing objects!
16/01/13 05:53:43 DEBUG ClosureCleaner:  +++ closure 
(org.apache.spark.SparkContext$$anonfun$runJob$5) is now cleaned +++
16/01/13 05:53:43 INFO SparkContext: Starting job: count at
transposeAvroToAvroChunks.scala:129
16/01/13 05:53:43 INFO DAGScheduler: Got job 3 (count at
transposeAvroToAvroChunks.scala:129) with 928 output partitions
16/01/13 05:53:43 INFO DAGScheduler: Final stage: ResultStage 3(count at
transposeAvroToAvroChunks.scala:129)


On Tue, Jan 12, 2016 at 6:41 PM, Ted Yu  wrote:

> Which release of Spark are you using ?
>
> Can you turn on DEBUG logging to see if there is more clue ?
>
> Thanks
>
> On Tue, Jan 12, 2016 at 6:37 PM, AlexG  wrote:
>
>> I transpose a 200-by-54843210 matrix (colChunkOfA), stored as an array of
>> rows in Array[Array[Float]] format, into another matrix (rowChunk), also
>> stored row-wise as a 54843210-by-200 Array[Array[Float]], using the
>> following code:
>>
>> val rowChunk = new Array[Tuple2[Int,Array[Float]]](numCols)
>> val colIndices = (0 until colChunkOfA.length).toArray
>>
>> (0 until numCols).foreach( rowIdx => {
>>   rowChunk(rowIdx) = Tuple2(rowIdx, colIndices.map(colChunkOfA(_)(rowIdx)))
>> })
>>
>> This succeeds, but the following code which attempts to turn rowChunk into
>> an RDD fails silently: spark-submit just ends, and none of the executor
>> logs
>> indicate any errors occurring.
>>
>> val parallelRowChunkRDD = sc.parallelize(rowChunk).cache
>> parallelRowChunkRDD.count
>>
>> What is the culprit here?
>>
>> Here is the log output starting from the count instruction:
>>
>> 16/01/13 02:23:38 INFO SparkContext: Starting job: count at
>> transposeAvroToAvroChunks.scala:129
>> 16/01/13 02:23:38 INFO DAGScheduler: Got job 3 (count at
>> transposeAvroToAvroChunks.scala:129) with 928 output partitions
>> 16/01/13 02:23:38 INFO DAGScheduler: Final stage: ResultStage 3(count at
>> transposeAvroToAvroChunks.scala:129)
>> 16/01/13 02:23:38 INFO DAGScheduler: Parents of final stage: List()
>> 16/01/13 02:23:38 INFO DAGScheduler: Missing parents: List()
>> 16/01/13 02:23:38 INFO DAGScheduler: Submitting ResultStage 3
>> (ParallelCollectionRDD[2448] at parallelize at
>> transposeAvroToAvroChunks.scala:128), which has no missing parents
>> 16/01/13 02:23:38 INFO MemoryStore: 

Re: distcp suddenly broken with spark-ec2 script setup

2015-12-09 Thread Alex Gittens
BTW, yes the referenced s3 bucket does exist, and

hdfs dfs -ls s3n://agittens/CFSRArawtars

does list the entries, although it first prints the same warnings:

2015-12-10 00:26:53,815 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:26:53,909 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response
'/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
2015-12-10 00:26:54,243 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
Found 2306 items
-rwxrwxrwx   1 177408 2015-11-18 00:26
s3n://agittens/CFSRArawtars/completefilelist
-rwxrwxrwx   1 60 2015-11-18 00:26
s3n://agittens/CFSRArawtars/copyallfileshere.sh
-rwxrwxrwx   1 1814040064 2015-11-18 00:26
s3n://agittens/CFSRArawtars/pgbh02.gdas.19790101-19790105.tar
-rwxrwxrwx   1 1788727808 2015-11-18 00:27
s3n://agittens/CFSRArawtars/pgbh02.gdas.19790106-19790110.tar
-rw
...

On Wed, Dec 9, 2015 at 4:24 PM, AlexG  wrote:

> I've been using the same method to launch my clusters then pull my data
> from
> S3 to local hdfs:
>
> $SPARKHOME/ec2/spark-ec2 -k mykey -i ~/.ssh/mykey.pem -s 29
> --instance-type=r3.8xlarge --placement-group=pcavariants
> --copy-aws-credentials --hadoop-major-version=2 --spot-price=2.8 launch
> mycluster --region=us-west-2
>
> then
>
> ephemeral-hdfs/bin/hadoop distcp s3n://agittens/CFSRArawtars CFSRArawtars
>
> Previously this worked as I'd expect. Within the last several days, I've been
> getting this error when I run the distcp command:
> 2015-12-10 00:16:43,113 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
> Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,207 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response
> '/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,422 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
> Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,513 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response
> '/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,737 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
> Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,830 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response
> '/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
> 2015-12-10 00:16:44,015 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
> Unexpected response code 404, expected 200
> 2015-12-10 00:16:46,141 WARN  conf.Configuration
> (Configuration.java:warnOnceIfDeprecated(824)) - io.sort.mb is deprecated.
> Instead, use mapreduce.task.io.sort.mb
> 2015-12-10 00:16:46,141 WARN  conf.Configuration
> (Configuration.java:warnOnceIfDeprecated(824)) - io.sort.factor is
> deprecated. Instead, use mapreduce.task.io.sort.factor
> 2015-12-10 00:16:46,630 INFO  service.AbstractService
> (AbstractService.java:init(81)) -
> Service:org.apache.hadoop.yarn.client.YarnClientImpl is inited.
> 2015-12-10 00:16:46,630 INFO  service.AbstractService
> (AbstractService.java:start(94)) -
> Service:org.apache.hadoop.yarn.client.YarnClientImpl is started.
> 2015-12-10 00:16:47,135 INFO  mapreduce.JobSubmitter
> (JobSubmitter.java:submitJobInternal(368)) - number of splits:21
>
> Then the job hangs and does nothing until I kill it. Any idea what the
> problem is and how to fix it, or a work-around for getting my data off S3
> quickly? It is around 4 TB.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/distcp-suddenly-broken-with-spark-ec2-script-setup-tp25658.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Why does a 3.8 T dataset take up 11.59 Tb on HDFS

2015-11-25 Thread Alex Gittens
Thanks, the issue was indeed the dfs replication factor. To fix it without
entirely clearing out HDFS and rebooting, I first ran
hdfs dfs -setrep -R -w 1 /
to reduce all the current files' replication factor to 1 recursively from
the root, then I changed the dfs.replication factor in
ephemeral-hdfs/conf/hdfs-site.xml and ran ephemeral-hdfs/sbin/stop-all.sh
and start-all.sh

Alex

On Tue, Nov 24, 2015 at 10:43 PM, Ye Xianjin  wrote:

> Hi AlexG:
>
> Files (blocks, more specifically) have 3 copies on HDFS by default. So 3.8 *
> 3 = 11.4 TB.
>
> --
> Ye Xianjin
> Sent with Sparrow 
>
> On Wednesday, November 25, 2015 at 2:31 PM, AlexG wrote:
>
> I downloaded a 3.8 TB dataset from S3 to a freshly launched spark-ec2
> cluster with 16.73 TB of storage, using distcp. The dataset is a collection
> of tar files of about 1.7 GB each. Nothing else was stored in HDFS, but
> after completing the download, the namenode page says that 11.59 TB are in
> use. When I use hdfs du -h -s, I see that the dataset only takes up 3.8 TB
> as expected. I navigated through the
> entire HDFS hierarchy from /, and don't see where the missing space is. Any
> ideas what is going on and how to rectify it?
>
> I'm using the spark-ec2 script to launch, with the command
>
> spark-ec2 -k key -i ~/.ssh/key.pem -s 29 --instance-type=r3.8xlarge
> --placement-group=pcavariants --copy-aws-credentials
> --hadoop-major-version=yarn --spot-price=2.8 --region=us-west-2 launch
> conversioncluster
>
> and am not modifying any configuration files for Hadoop.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-does-a-3-8-T-dataset-take-up-11-59-Tb-on-HDFS-tp25471.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: spark hangs at broadcasting during a filter

2015-08-06 Thread Alex Gittens
Thanks. Repartitioning to a smaller number of partitions seems to fix my
issue, but I'll keep broadcasting in mind (droprows is an integer array
with about 4 million entries).
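
Putting that suggestion together with the pipeline from the quoted message,
the broadcast version would look roughly like the sketch below (an outline
only; sc, valsinpath, and droprows are assumed to exist as in the quoted
code):

// Broadcast droprows once so the ~4M-entry array is shipped to each executor
// by the broadcast mechanism instead of being serialized into every task closure.
val broadcastDropRows = sc.broadcast(droprows)

val valsrows = sc.textFile(valsinpath)
  .map(_.split(","))
  .map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble)))
  .filter(x => !broadcastDropRows.value.contains(x._1))
  .groupByKey()
  .map(x => (x._1, x._2.toSeq.sortBy(_._1)))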

On Wed, Aug 5, 2015 at 12:34 PM, Philip Weaver philip.wea...@gmail.com
wrote:

 How big is droprows?

 Try explicitly broadcasting it like this:

 val broadcastDropRows = sc.broadcast(dropRows)

 val valsrows = ...
 .filter(x => !broadcastDropRows.value.contains(x._1))

 - Philip


 On Wed, Aug 5, 2015 at 11:54 AM, AlexG swift...@gmail.com wrote:

 I'm trying to load a 1 TB file whose lines i,j,v represent the values of a
 matrix given as A_{ij} = v so I can convert it to a Parquet file. Only some
 of the rows of A are relevant, so the following code first loads the
 triplets as text, splits them into Tuple3[Int, Int, Double], drops triplets
 whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int,
 Double]]] for each row (if I'm judging datatypes correctly).

 val valsrows = sc.textFile(valsinpath).map(_.split(",")).
   map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble))).
   filter(x => !droprows.contains(x._1)).
   groupByKey.
   map(x => (x._1, x._2.toSeq.sortBy(_._1)))

 Spark hangs during a broadcast that occurs during the filter step
 (according
 to the Spark UI). The last two lines in the log before it pauses are:

 15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
 in
 memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
 15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
 in
 memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)

 I've left Spark running for up to 17 minutes one time, and it never
 continues past this point. I'm using a cluster of 30 r3.8xlarge EC2
 instances (244 GB, 32 cores) with Spark in standalone mode, with 220 GB of
 executor and driver memory, and using the Kryo serializer.

 Any ideas on what could be causing this hang?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-hangs-at-broadcasting-during-a-filter-tp24143.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: breeze.linalg.DenseMatrix not found

2015-07-01 Thread Alex Gittens
I think the issue was NOT with Spark. I was running a Spark program that
dumped output to a binary file and then calling a Scala program to read it
and write out Matrix Market format files. The issue seems to have been with
the classpath of the Scala program, and went away when I added the Spark
jar to the classpath. Thanks for your help!

Alex

On Tue, Jun 30, 2015 at 9:11 AM, Burak Yavuz brk...@gmail.com wrote:

 How does your build file look? Are you possibly using wrong Scala
 versions? Have you added Breeze as a dependency to your project? If so
 which version?

 Thanks,
 Burak

 On Mon, Jun 29, 2015 at 3:45 PM, AlexG swift...@gmail.com wrote:

 I get the same error even when I define covOperator not to use a matrix at
 all:

 def covOperator(v : BDV[Double]) :BDV[Double] = { v }




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/breeze-linalg-DenseMatrix-not-found-tp23537p23538.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Need clarification on spark on cluster set up instruction

2015-07-01 Thread Alex Gittens
I have a similar use case, so I wrote a Python script to fix the cluster
configuration that spark-ec2 uses when you use Hadoop 2. Start a cluster
with enough machines that HDFS can hold 1 TB (so use instance
types that have SSDs), then follow the instructions at
http://thousandfold.net/cz/2015/07/01/installing-spark-with-hadoop-2-using-spark-ec2/.
Let me know if you have any issues.

On Mon, Jun 29, 2015 at 4:32 PM, manish ranjan cse1.man...@gmail.com
wrote:


 Hi All

 Here goes my first question:
 Here is my use case:

 I have 1 TB of data that I want to process on EC2 using Spark.
 I have uploaded the data to an EBS volume.
 The instructions in the Amazon EC2 setup guide explain:
 *If your application needs to access large datasets, the fastest way to
 do that is to load them from Amazon S3 or an Amazon EBS device into an
 instance of the Hadoop Distributed File System (HDFS) on your nodes*

 Now the new Amazon instances don't have any physical volume:
 http://aws.amazon.com/ec2/instance-types/

 So do I need to set up HDFS separately on EC2 (the instructions also
 say "The spark-ec2 script already sets up a HDFS instance for you")? Any
 blog/write-up which can help me understand this better?

 ~Manish