I have an array 'dataAll' of key-value pairs, where each value is an array of
arrays. I would like to parallelize a task over the elements of 'dataAll'
across the workers. In the dummy example below 'dataAll' has 3 elements, but
in the real application it would have tens to hundreds.

Without parallelizing dataAll, 'result' is calculated in less than a second: 

import org.jblas.DoubleMatrix  

val nY = 5000
val nX = 400

val dataAll = Array((1, Array.fill(nY)(Array.fill(nX)(1.0))),
                    (2, Array.fill(nY)(Array.fill(nX)(1.0))),
                    (3, Array.fill(nY)(Array.fill(nX)(1.0))))

val w1 = DoubleMatrix.ones(nX)

// This finishes in less than a second: 
val result = dataAll.map { dat =>
  val c       = dat._1
  val dataArr = dat._2
  // Map over the Arrays within dataArr: 
  val test = dataArr.map { arr =>
    val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
    val out = test2.dot(w1)
    out
  }
  (c, test)
}

However, when I parallelize dataAll, the same task freezes: 

val dataAllRDD = sc.parallelize(dataAll, 3)

// This doesn't finish in several minutes: 
val result = dataAllRDD.map { dat =>
  val c       = dat._1
  val dataArr = dat._2
  // Map over the Arrays within dataArr: 
  val test = dataArr.map { arr =>
    val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
    val out = test2.dot(w1)
    out
  }
  (c, test)
}.collect

After submitting the above job, nothing is written to the worker logs (as
viewed through the web UI), but the following output is printed in the Spark
shell where I'm running the job:

14/08/11 18:17:31 INFO SparkContext: Starting job: collect at <console>:33
14/08/11 18:17:31 INFO DAGScheduler: Got job 0 (collect at <console>:33)
with 3 output partitions (allowLocal=false)
14/08/11 18:17:31 INFO DAGScheduler: Final stage: Stage 0 (collect at
<console>:33)
14/08/11 18:17:31 INFO DAGScheduler: Parents of final stage: List()
14/08/11 18:17:31 INFO DAGScheduler: Missing parents: List()
14/08/11 18:17:31 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map
at <console>:23), which has no missing parents
14/08/11 18:17:32 INFO DAGScheduler: Submitting 3 missing tasks from Stage 0
(MappedRDD[1] at map at <console>:23)
14/08/11 18:17:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor 2: <executor_2_IP> (PROCESS_LOCAL)
14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:0 as 16154060
bytes in 69 ms
14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on
executor 1: <executor_1_IP> (PROCESS_LOCAL)
14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:1 as 16154060
bytes in 81 ms
14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on
executor 0: <executor_0_IP> (PROCESS_LOCAL)
14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:2 as 16154060
bytes in 66 ms


dataAllRDD.map does work with a smaller array, though (e.g. with nY = 100 it
finishes in less than a second).
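
For concreteness, the scaled-down case that does finish looks like this (just
a sketch of the same computation; the "small" variable names are only for
illustration):

val nYsmall = 100
val dataSmall = Array((1, Array.fill(nYsmall)(Array.fill(nX)(1.0))),
                      (2, Array.fill(nYsmall)(Array.fill(nX)(1.0))),
                      (3, Array.fill(nYsmall)(Array.fill(nX)(1.0))))
val dataSmallRDD = sc.parallelize(dataSmall, 3)

// Same dot products as above, just on the smaller arrays:
val resultSmall = dataSmallRDD.map { dat =>
  val (c, dataArr) = dat
  (c, dataArr.map(arr => new DoubleMatrix(arr.length, 1, arr:_*).dot(w1)))
}.collect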

Why is dataAllRDD.map so much slower than dataAll.map, or seemingly not
executing at all?

The Spark version I'm using is 0.9.0. 
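
If it helps with diagnosis, I could also run an instrumented variant like the
one below (just a sketch; the timing/println lines are only there to check,
via the executors' stdout, whether the map body is reached at all):

val resultTimed = dataAllRDD.map { dat =>
  val (c, dataArr) = dat
  val start = System.nanoTime
  val test = dataArr.map { arr =>
    new DoubleMatrix(arr.length, 1, arr:_*).dot(w1)
  }
  // Printed on the executor's stdout, not in the driver shell:
  println("key " + c + ": inner map took " + (System.nanoTime - start) / 1e9 + " s")
  (c, test)
}.collect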


