I have an array 'dataAll' of key-value pairs where each value is an array of arrays. I would like to distribute a task over the elements of 'dataAll' across the workers. In the dummy example below, 'dataAll' has 3 elements, but in the real application it would have tens to hundreds.
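For scale, here is a quick back-of-the-envelope calculation. It is a minimal sketch assuming the dummy example's values nY = 5000 and nX = 400; the names `PayloadSize` and `rawBytesPerElement` are hypothetical and used only for illustration. Each element of 'dataAll' holds nY arrays of nX doubles, and a JVM double takes 8 bytes:

```scala
// Back-of-the-envelope size of one element of dataAll from the dummy example.
// PayloadSize and rawBytesPerElement are hypothetical names for illustration.
object PayloadSize {
  val nY = 5000          // rows per element, as in the example
  val nX = 400           // doubles per row, as in the example
  val bytesPerDouble = 8 // a JVM double is 64 bits

  // Raw numeric payload of one Array[Array[Double]] value, ignoring
  // per-array headers and any serialization overhead.
  def rawBytesPerElement: Long = nY.toLong * nX * bytesPerDouble

  def main(args: Array[String]): Unit = {
    // prints "raw payload per element: 16000000 bytes"
    println(s"raw payload per element: $rawBytesPerElement bytes")
    // prints "raw payload for 3 elements: 48000000 bytes"
    println(s"raw payload for 3 elements: ${3 * rawBytesPerElement} bytes")
  }
}
```

That is 16,000,000 bytes (about 15.3 MiB) per element, close to the 16154060 bytes reported for each serialized task in the shell output further down. This suggests that each task is shipping its whole one-element partition of 'dataAll'; the remaining ~154 KB is presumably array headers, the captured w1, and other serialization overhead.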
Without parallelizing dataAll, 'result' is calculated in less than a second:

    import org.jblas.DoubleMatrix

    val nY = 5000
    val nX = 400
    val dataAll = Array(
      (1, Array.fill(nY)(Array.fill(nX)(1.0))),
      (2, Array.fill(nY)(Array.fill(nX)(1.0))),
      (3, Array.fill(nY)(Array.fill(nX)(1.0))))
    val w1 = DoubleMatrix.ones(400)

    // This finishes in less than a second:
    val result = dataAll.map { dat =>
      val c = dat._1
      val dataArr = dat._2
      // Map over the Arrays within dataArr:
      val test = dataArr.map { arr =>
        val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
        val out = test2.dot(w1)
        out
      }
      (c, test)
    }

However, when I parallelize dataAll, the same task freezes:

    val dataAllRDD = sc.parallelize(dataAll, 3)

    // This doesn't finish in several minutes:
    val result = dataAllRDD.map { dat =>
      val c = dat._1
      val dataArr = dat._2
      // Map over the Arrays within dataArr:
      val test = dataArr.map { arr =>
        val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
        val out = test2.dot(w1)
        out
      }
      (c, test)
    }.collect

After sending the above task, nothing is written to the worker logs (as viewed through the web UI), but the following output is printed in the Spark shell where I'm running the task:

    14/08/11 18:17:31 INFO SparkContext: Starting job: collect at <console>:33
    14/08/11 18:17:31 INFO DAGScheduler: Got job 0 (collect at <console>:33) with 3 output partitions (allowLocal=false)
    14/08/11 18:17:31 INFO DAGScheduler: Final stage: Stage 0 (collect at <console>:33)
    14/08/11 18:17:31 INFO DAGScheduler: Parents of final stage: List()
    14/08/11 18:17:31 INFO DAGScheduler: Missing parents: List()
    14/08/11 18:17:31 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at <console>:23), which has no missing parents
    14/08/11 18:17:32 INFO DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:23)
    14/08/11 18:17:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
    14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: <executor_2_IP> (PROCESS_LOCAL)
    14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:0 as 16154060 bytes in 69 ms
    14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: <executor_1_IP> (PROCESS_LOCAL)
    14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:1 as 16154060 bytes in 81 ms
    14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on executor 0: <executor_0_IP> (PROCESS_LOCAL)
    14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:2 as 16154060 bytes in 66 ms

dataAllRDD.map does work with smaller arrays, though (e.g. with nY = 100 it finishes in less than a second). Why is dataAllRDD.map so much slower than dataAll.map, or why does it not execute at all? The Spark version I'm using is 0.9.0.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parallelizing-a-task-makes-it-freeze-tp11900.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.