Re: Spark Akka/actor failures.

2014-08-14 Thread Xiangrui Meng
Could you try mapping it to row-major format first? Your approach may
generate multiple copies of the data. The code should look something like this:

~~~
import org.apache.spark.SparkContext._   // pair-RDD ops (groupByKey) in Spark 1.x
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// rdd: the column-major input, e.g. RDD[(Int, Array[Float])] keyed by column index j
val rows = rdd.flatMap { case (j, values) =>
  // emit each value keyed by its row index i
  values.view.zipWithIndex.map { case (v, i) => (i, (j, v)) }
}.groupByKey().map { case (i, entries) =>
  // sort each row's entries by column index; Float values become Double for Vectors.dense
  Vectors.dense(entries.toSeq.sortBy(_._1).map(_._2.toDouble).toArray)
}

val mat = new RowMatrix(rows)
val cov = mat.computeCovariance()
~~~

On Wed, Aug 13, 2014 at 3:56 PM, ldmtwo ldm...@gmail.com wrote:
 Need help getting around these errors.

 I have a program that runs fine on smaller input sizes. As the input gets
 larger, Spark has increasing difficulty running efficiently and without
 errors. We have about 46GB free on each node, and the workers and executors
 are configured to use all of it (the only way to avoid Heap Space or GC
 overhead errors). On the driver, the data only uses 1.2GB of RAM and is in
 the form of matrix: RDD[(Integer, Array[Float])]. It is a column-major
 matrix with dimensions of 15k rows x 20k columns. Each column takes about
 4 bytes * 15k = 60KB, and 60KB * 20k = 1.2GB, so the data is not even that
 large. Eventually, I want to test 60k x 70k.

 The covariance matrix algorithm we are using is basically O(N^3). At minimum,
 the outer loop needs to be parallelized:

   for each column i in matrix
     for each column j in matrix
       get the covariance between columns i and j

 Covariance for a single pair of columns is essentially this (no need to
 parallelize it, since the loops above already give us enough work and each
 pair is small): for the two columns, compute the sum of products of their
 values, which is O(N).
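
 As a sketch of that per-pair step (assuming Float columns and the textbook
 two-pass formula; this helper is illustrative, not from the original post):

 ~~~
 // two-pass sample covariance of two equal-length columns
 def pairCovariance(x: Array[Float], y: Array[Float]): Double = {
   val n = x.length
   val meanX = x.map(_.toDouble).sum / n
   val meanY = y.map(_.toDouble).sum / n
   var acc = 0.0
   var k = 0
   while (k < n) {              // single O(N) pass over the pair
     acc += (x(k) - meanX) * (y(k) - meanY)
     k += 1
   }
   acc / (n - 1)                // sample covariance
 }
 ~~~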


 Since I can't figure out any other way to do a permutation or nested for
 loop over an RDD, I had to call matrix.cartesian(matrix).map { pair => ... }.
 I could do 5k x 5k (1/4th of the work) using a HashMap instead of an RDD and
 finish in 10 seconds. With 3,000 partitions it takes 18 hours; 300 takes 12
 hours; 200 fails (error #1); and 16 would be ideal but fails as well (error #2).
 Note that I set the Akka frame size (spark.akka.frameSize in
 spark-defaults.conf) to 15 to address some of the other Akka errors.
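
 As a rough sketch of what that cartesian-based version might look like
 (variable names are illustrative, and it reuses the pairCovariance helper
 sketched above; the lower-triangle filter keeps each pair from being
 computed twice):

 ~~~
 // matrix: RDD[(Int, Array[Float])], keyed by column index
 val covEntries = matrix.cartesian(matrix)
   .filter { case ((i, _), (j, _)) => j <= i }   // lower triangle only
   .map { case ((i, colI), (j, colJ)) =>
     ((i, j), pairCovariance(colI, colJ))        // O(N) work per pair
   }
 ~~~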





 This is error #1:

 [error text not preserved in the archive]

 This is error #2:

 [error text not preserved in the archive]


Re: Spark Akka/actor failures.

2014-08-14 Thread ldmtwo
The reason we are not using MLlib and Breeze is the lack of control over the
data and over performance. After computing the covariance matrix, there isn't
much more we can do with it, since many of the methods are private. For now,
we need the maximum value and the corresponding pair of columns; later, we may
run other algorithms. The MLlib covariance computes the means and the Gramian
matrix in parallel, but after that, I believe it's back to single-node
computation. We have to bring everything back to a single node to get the max,
and making it parallel again hasn't worked well either.
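
If the covariance does come back as a local MLlib Matrix from
computeCovariance(), one way to get the max value and its column pair is a
plain scan over the column-major array it exposes. A rough sketch, assuming
we want to skip the diagonal (the variances):

~~~
// cov: org.apache.spark.mllib.linalg.Matrix from mat.computeCovariance()
// toArray is column-major, so entry (i, j) is at index i + j * numRows
val n = cov.numRows
val a = cov.toArray
var best = Double.NegativeInfinity
var bestPair = (0, 0)
for (j <- 0 until n; i <- 0 until j) {   // upper triangle, diagonal excluded
  val v = a(i + j * n)
  if (v > best) { best = v; bestPair = (i, j) }
}
~~~

This is still a single driver-side scan over an n x n local matrix, so it only
helps if that matrix fits comfortably on the driver.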

The reason we are using Spark is that we want a simple way to distribute
data and work in parallel. I would prefer a SIMD/MPI style of approach, but I
have to work within this framework, which is more of a MapReduce style.

I'm looking into getting the code you sent working. It won't allow me to
reduce by key.
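
One common cause of "reduceByKey is not a member of RDD" errors in Spark 1.x
is a missing implicit import rather than anything about the data itself; if
that is what's happening here, adding this import usually fixes it:

~~~
// brings the implicit conversion to PairRDDFunctions into scope,
// which is what provides reduceByKey / groupByKey on RDDs of pairs
import org.apache.spark.SparkContext._
~~~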

RE: cartesian: I agree that it is generating many copies of the data. That
was a last resort. It would be a huge benefit to everyone if we could access
RDDs like a list, array or hash map. 

Here is the covariance computation that works fast for us. We get the
averages first, O(N^2). Then the differences (Vi - Avgi), O(N^2). Then we
compute the covariance in O(N^3) without having to redo the above steps
inside the inner loop. You can see that I'm using Java code to compute the
covariance efficiently; the Scala code was very slow in comparison, and we
can later use JNI to add hardware acceleration. Matrix is a HashMap here.
Also note that I am using only the lower triangle. I'm sure that
MLlib/Breeze is making similar optimizations.
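
A rough sketch of the precompute-then-accumulate structure described above
(illustrative only, not the actual Java code referred to; types and names are
assumptions):

~~~
// sketch: center the columns once, then fill the lower triangle in O(N^3)
def covarianceLowerTriangle(cols: Array[Array[Float]]): Array[Array[Double]] = {
  val nCols = cols.length
  val nRows = cols(0).length
  // pass 1: per-column means, then centered copies (Vi - Avgi)
  val centered = cols.map { c =>
    val mean = c.map(_.toDouble).sum / nRows
    c.map(_ - mean)
  }
  // pass 2: lower triangle of the covariance matrix
  val cov = Array.ofDim[Double](nCols, nCols)
  for (i <- 0 until nCols; j <- 0 to i) {
    var acc = 0.0
    var k = 0
    while (k < nRows) { acc += centered(i)(k) * centered(j)(k); k += 1 }
    cov(i)(j) = acc / (nRows - 1)
  }
  cov
}
~~~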

This covariance is based on the two-pass algorithm, but we may change to a
one-pass approximation (a single-pass variant is sketched after the links below).
http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
http://commons.apache.org/proper/commons-math/jacoco/org.apache.commons.math3.stat.correlation/Covariance.java.html
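
For reference, the single-pass update from the first link above (the online
co-moment formulation), sketched for one pair of columns; whether it is
actually faster here is untested:

~~~
// sketch: online (single-pass) covariance of two columns
def onlineCovariance(x: Array[Float], y: Array[Float]): Double = {
  var meanX = 0.0
  var meanY = 0.0
  var comoment = 0.0
  var k = 0
  while (k < x.length) {
    val n = k + 1
    val dx = x(k) - meanX               // deviation from the old mean of x
    meanX += dx / n
    meanY += (y(k) - meanY) / n
    comoment += dx * (y(k) - meanY)     // uses the updated mean of y
    k += 1
  }
  comoment / (x.length - 1)             // sample covariance
}
~~~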






