Hi Sean

Thanks a lot for your reply. Yes, I understand
that, as Scala is a functional language, maps
correspond to transforms of immutable objects,
but the behavior of the program looks like
a deadlock, as it simply does not proceed beyond
the B = B.map (A.aggregate) stage.

My Spark web interface shows a pure
scheduler delay bar when I click one
of the still-active jobs and expand the
event timeline.

A snippet of the thread dump follows my
message; the threads that correspond
to calls from my code appear there and they are
all in WAITING. Like I said, when I remove
the second map and the nested aggregate,
the problem vanishes.

Thanks again,
Petros

2015-09-14 20:00:53
Full thread dump OpenJDK 64-Bit Server VM (24.79-b02 mixed mode):

"Attach Listener" daemon prio=10 tid=0x00007f9aa4001000 nid=0x674c waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"qtp1523343281-98" daemon prio=10 tid=0x00007f9a2c001000 nid=0x6675 waiting on condition [0x00007f9ab48a0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000000e0389d80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
        at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

"qtp1523343281-97" daemon prio=10 tid=0x00007f9a28002000 nid=0x6649 waiting on condition [0x00007f99f7baa000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000000e0389d80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
        at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

"Executor task launch worker-7" daemon prio=10 tid=0x00007f9a8400a800 nid=0x661a in Object.wait() [0x00007f99f6e9c000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000fc9ba760> (a org.apache.spark.scheduler.JobWaiter)
        at java.lang.Object.wait(Object.java:503)
        at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
        - locked <0x00000000fc9ba760> (a org.apache.spark.scheduler.JobWaiter)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:530)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1734)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1804)
        at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1058)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
        at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1051)
        at DistributedCholesky$$anonfun$compute_sigma_A$1.apply(DistributedCholesky.scala:323)
        at DistributedCholesky$$anonfun$compute_sigma_A$1.apply(DistributedCholesky.scala:321)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
        at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1056)
        at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1056)
        at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1803)
        at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1803)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - <0x00000000fc9bac50> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"qtp1523343281-40 Acceptor0 SelectChannelConnector@0.0.0.0:4040" daemon prio=10 tid=0x00007f9adc95c800 nid=0x65f4 runnable [0x00007f9ab4ba3000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        - locked <0x00000000e075b488> (a java.lang.Object)
        at org.eclipse.jetty.server.nio.SelectChannelConnector.accept(SelectChannelConnector.java:109)
        at org.eclipse.jetty.server.AbstractConnector$Acceptor.run(AbstractConnector.java:938)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

"qtp1523343281-39 Selector1" daemon prio=10 tid=0x00007f9adc95b000 nid=0x65f3 runnable [0x00007f9ab4ca4000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x00000000e07a9a50> (a sun.nio.ch.Util$2)
        - locked <0x00000000e07a9a60> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000e038a2b8> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at org.eclipse.jetty.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:569)
        at org.eclipse.jetty.io.nio.SelectorManager$1.run(SelectorManager.java:290)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None



"VM Thread" prio=10 tid=0x00007f9adc07e800 nid=0x65d2 runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f9adc020000 nid=0x65ca runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f9adc022000 nid=0x65cb runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f9adc023800 nid=0x65cc runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f9adc025800 nid=0x65cd runnable

"GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f9adc027800 nid=0x65ce runnable

"GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f9adc029800 nid=0x65cf runnable

"GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f9adc02b000 nid=0x65d0 runnable

"GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f9adc02d000 nid=0x65d1 runnable

"VM Periodic Task Thread" prio=10 tid=0x00007f9adc0c1800 nid=0x65d9 waiting on condition

JNI global references: 208


On 14/09/15 19:45, Sean Owen wrote:
There isn't a cycle in your graph, since although you reuse reference
variables in your code called A and B you are in fact creating new
RDDs at each operation. You have some other problem, and you'd have to
provide detail on why you think something is deadlocked, like a thread
dump.

On Mon, Sep 14, 2015 at 10:42 AM, petranidis <pnyfan...@gmail.com> wrote:
Hi all,

I am new to Spark and I have written a few Spark programs, mostly around
machine learning applications.

I am trying to resolve a particular problem where there are two RDDs that
should be updated by using elements of each other. More specifically, if
the two pair RDDs are called A and B, M is a matrix that specifies which
elements of each RDD should be taken into account when computing the
other, with rows of M corresponding to elements of A and columns to
elements of B, e.g.

A = (0, 6), (1,7), (2,8)
B = (0, 4), (1,6), (2,1)
and
M =
  0 1 1
  1 1 0
  0 1 0

Then

for (it = 0; it < 10; it++) {
  A(0) = B(1) + B(2)
  A(1) = B(0) + B(1)
  A(2) = B(1)
  B(0) = A(1)
  B(1) = A(0) + A(1) + A(2)
  B(2) = A(0)
}
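
For reference, one pass of the loop above can be sketched on plain Scala
collections, with no Spark involved. This is only an illustration under
assumptions: the object name MaskedUpdate and the helper step are made up
here, and A and B are held as vectors indexed by key rather than as pair
RDDs.

```scala
object MaskedUpdate {
  // M(i)(j) == 1 means A(i) takes B(j) into account, and B(j) takes A(i).
  val M: Vector[Vector[Int]] = Vector(
    Vector(0, 1, 1),
    Vector(1, 1, 0),
    Vector(0, 1, 0)
  )

  // One pass of the loop body: A is rebuilt from the old B,
  // then B is rebuilt from the new A.
  def step(a: Vector[Int], b: Vector[Int]): (Vector[Int], Vector[Int]) = {
    val newA = a.indices.toVector.map { i =>
      b.indices.collect { case j if M(i)(j) == 1 => b(j) }.sum
    }
    val newB = b.indices.toVector.map { j =>
      newA.indices.collect { case i if M(i)(j) == 1 => newA(i) }.sum
    }
    (newA, newB)
  }

  def main(args: Array[String]): Unit = {
    // Values taken from the example: A = (6, 7, 8), B = (4, 6, 1).
    val (a1, b1) = step(Vector(6, 7, 8), Vector(4, 6, 1))
    println(s"A = $a1, B = $b1")
  }
}
```

With the example values, the first pass gives A = (7, 10, 6) and
B = (10, 23, 7), which is handy as a check against the distributed version.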

To do such a computation in Spark, I used
A = A.map( (key,val) => { B.aggregate(...) })
B = B.map( (key,val) => { A.aggregate(...) })

where the key of each mapped element, keyA, is passed to the aggregate
function as an initialization parameter, and then, for each B element with
key keyB, the B element is included in the summation if
M(keyA, keyB) == 1.

The calculation of A is done successfully and correctly, but then the DAG
scheduler seems to deadlock when the calculation of B happens. This
behaviour goes away when I remove the A.aggregate bit in my code.
Apparently, according to the logs, the scheduler is expecting some results
before it can go on, but the results should already have been calculated.

I assume that this has to do with the DAG scheduling not handling cyclical
dependencies. Is there a way I can force each iteration or update of A and
B to be seen as a separate stage? Otherwise, how can I implement this type
of aggregation in another way? (It could be the equivalent of mapping the
A elements to a List of all the B elements for which the M matrix entry is
1 and then mapping again to their sum, but this means I need a lot of
space, especially when the problem at hand could be very large, which is
infeasible, so I need to avoid this.)
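
One possible way to phrase the aggregation without calling one collection
from inside a map over the other is to store only the nonzero entries of M
as (row, column) pairs, attach the source value to each pair, and sum per
target key. The sketch below does this on local Scala collections only; the
names EdgeListUpdate, edges and update are hypothetical. On pair RDDs the
analogous operations would be join and reduceByKey, but that mapping is an
assumption here and has not been run on a cluster.

```scala
object EdgeListUpdate {
  // Sparse form of M: one (i, j) pair for every entry equal to 1.
  val edges: Seq[(Int, Int)] = Seq((0, 1), (0, 2), (1, 0), (1, 1), (2, 1))

  // links holds (targetKey, sourceKey) pairs. For every target key, sum the
  // source values it is linked to. Keys with no links do not appear in the
  // result.
  def update(links: Seq[(Int, Int)], source: Map[Int, Int]): Map[Int, Int] =
    links
      .collect { case (target, src) if source.contains(src) =>
        (target, source(src))                  // look up the source value
      }
      .groupBy(_._1)                           // group by target key
      .map { case (target, vs) => target -> vs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val a0 = Map(0 -> 6, 1 -> 7, 2 -> 8)
    val b0 = Map(0 -> 4, 1 -> 6, 2 -> 1)
    val a1 = update(edges, b0)                 // rows of M select B elements
    val b1 = update(edges.map(_.swap), a1)     // columns of M select A elements
    println(s"A = $a1, B = $b1")
  }
}
```

The intermediate data in this formulation grows with the number of nonzero
entries of M rather than with a full list of B elements per A element.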

Thanks in advance for your help!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DAG-Scheduler-deadlock-when-two-RDDs-reference-each-other-force-Stages-manually-tp24684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


