Hi,

Have you tried creating more (smaller) column blocks? Judging by the
DenseMatrix$.zeros frame in your trace, multiply() allocates a dense result
matrix for every pair of blocks it multiplies, so smaller blocks mean
smaller per-task allocations.

BlockMatrix matrix = cmatrix.toBlockMatrix(100, 100);

for example.
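
For context, here's a rough sketch of how that would fit into your pipeline.
The 100x100 block size is only a starting point to experiment with, not a
verified optimum for your cluster:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;

JavaRDD<MatrixEntry> entries = ...; // your existing entries RDD

CoordinateMatrix cmatrix = new CoordinateMatrix(entries.rdd());

// 100x100 blocks instead of 100x1000: multiply() materializes a dense
// result block per block product, so each allocation shrinks from
// 1000x1000 doubles (~8 MB) to 100x100 doubles (~80 KB).
BlockMatrix matrix = cmatrix.toBlockMatrix(100, 100);
BlockMatrix cooc = matrix.transpose().multiply(matrix);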


Is your data randomly spread out, or do you generally have clusters of
data points together?


On Wed, Jan 25, 2017 at 4:23 AM, Petr Shestov <petya.os...@gmail.com> wrote:

> Hi all!
>
> I'm using Spark 2.0.1 with two workers (one executor each, 20 GB of memory
> each) and run the following code:
>
> JavaRDD<MatrixEntry> entries = ...; // filling the data
> CoordinateMatrix cmatrix = new CoordinateMatrix(entries.rdd());
> BlockMatrix matrix = cmatrix.toBlockMatrix(100, 1000);
> BlockMatrix cooc = matrix.transpose().multiply(matrix);
>
> My matrix is approx 8,000,000 x 3,000, but only about 10,000,000 cells
> hold meaningful values. During the multiplication I always get:
>
> 17/01/24 08:03:10 WARN TaskMemoryManager: leak 1322.6 MB memory from
> org.apache.spark.util.collection.ExternalAppendOnlyMap@649e7019
> 17/01/24 08:03:10 ERROR Executor: Exception in task 1.0 in stage 57.0 (TID 83664)
> java.lang.OutOfMemoryError: Java heap space
>         at org.apache.spark.mllib.linalg.DenseMatrix$.zeros(Matrices.scala:453)
>         at org.apache.spark.mllib.linalg.Matrix$class.multiply(Matrices.scala:101)
>         at org.apache.spark.mllib.linalg.SparseMatrix.multiply(Matrices.scala:565)
>         at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:483)
>         at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:480)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.immutable.List.map(List.scala:285)
>         at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:480)
>         at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:479)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>         at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>         at org.apache.spark.util.collection.CompactBuffer.flatMap(CompactBuffer.scala:30)
>         at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:479)
>         at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:478)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>         at org.apache.spark.scheduler.Task.run(Task.scala:86)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> Now I'm even trying to use only one core per executor. What could the
> problem be? How can I debug it and find the root cause? What could I be
> missing in the Spark configuration?
>
> I've already tried increasing spark.default.parallelism and decreasing the
> block size for the BlockMatrix.
>
> Thanks.
>
