I'm using the following function to compute B*A, where B is a 32-by-8-million
Breeze DenseMatrix and A is an 8-million-by-100K IndexedRowMatrix.

// This lives in the spark.mllib package (see the stack trace), so the
// private[mllib] toBreeze/fromBreeze Breeze converters are accessible.
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// Computes B*A where B is a local matrix and A is distributed, with avg
// subtracted from each row of A: letting b_i denote the ith column of B and
// a_i the ith centered row of A, B*A = sum_i (b_i * a_i^T).
def leftMultiplyCenteredMatrixBy(mat: IndexedRowMatrix, lhs: DenseMatrix,
    avg: BDV[Double]): DenseMatrix = {
  val lhsBrz = lhs.toBreeze.asInstanceOf[BDM[Double]]
  val result =
    mat.rows.treeAggregate(BDM.zeros[Double](lhs.numRows.toInt, mat.numCols.toInt))(
      seqOp = (U: BDM[Double], row: IndexedRow) => {
        // center the row, then accumulate the outer product b_i * a_i^T
        val rowBrz = row.vector.toBreeze.asInstanceOf[BSV[Double]] - avg
        U += lhsBrz(::, row.index.toInt) * rowBrz.t
      },
      combOp = (U1, U2) => U1 += U2
    )
  fromBreeze(result)
}
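
For what it's worth, here's a tiny self-contained Breeze check of the identity the
comment relies on, using made-up 2x3 and 3x4 matrices (the names and values are
purely illustrative, nothing from the real code):

import breeze.linalg.{DenseMatrix => BDM}

object OuterProductCheck {
  def main(args: Array[String]): Unit = {
    val b = BDM((1.0, 2.0, 3.0), (4.0, 5.0, 6.0))                                 // 2 x 3
    val a = BDM((1.0, 0.0, 2.0, 1.0), (0.0, 3.0, 1.0, 2.0), (2.0, 1.0, 0.0, 1.0)) // 3 x 4
    // Accumulate the outer product of the ith column of B with the ith row of A.
    val sum = BDM.zeros[Double](2, 4)
    for (i <- 0 until 3) {
      sum += b(::, i) * a(i, ::)
    }
    println(sum - b * a)  // should print the 2x4 zero matrix
  }
}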

The accumulator used by the treeAggregate call is only 32-by-100K (about 25 MB),
and B itself works out to roughly 2 GB (rough size arithmetic follows the stack
trace below). The executors have 64 GB of RAM, yet the call fails with the
following error:

Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
  at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
  at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
  at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1072)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
  at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1067)
  at org.apache.spark.mllib.linalg.distributed.SVDVariants$.leftMultiplyCenteredMatrixBy(SVDVariants.scala:120)
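
For reference, the rough sizes mentioned above (a back-of-envelope check in plain
Scala; it assumes 8-byte Double entries and ignores JVM object overhead):

object SizeCheck {
  def main(args: Array[String]): Unit = {
    // treeAggregate zero value: a 32 x 100,000 dense Double matrix
    val accumulatorBytes = 32L * 100000L * 8L    // 25,600,000 bytes ~ 25.6 MB
    // the local matrix B: 32 x 8,000,000 dense Doubles
    val bBytes = 32L * 8000000L * 8L             // 2,048,000,000 bytes ~ 2.05 GB
    println(f"accumulator ~ ${accumulatorBytes / 1e6}%.1f MB, B ~ ${bBytes / 1e9}%.2f GB")
  }
}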

Any idea what's going on/how to fix it?


