There isn't a cycle in your graph: although you reuse the reference
variables A and B in your code, each operation in fact creates a new
RDD. You have some other problem, and you'd have to provide detail on
why you think something is deadlocked, such as a thread dump.
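As an aside, the update described below appears equivalent to A <- M*B
followed by B <- transpose(M)*A, reading rows of M as A's dependencies
and columns as B's. Here is a minimal plain-Python sketch of that
computation, with no Spark involved (the dicts and loop are
illustrative stand-ins for the pair RDDs, not the poster's actual code):

```python
# Plain-Python simulation of the coupled update A <- M.B, B <- M^T.A.
# Dicts stand in for the pair RDDs; this is an illustration, not Spark code.

A = {0: 6, 1: 7, 2: 8}
B = {0: 4, 1: 6, 2: 1}
M = [[0, 1, 1],
     [1, 1, 0],
     [0, 1, 0]]  # rows index elements of A, columns index elements of B

for it in range(10):
    # Compute all of A from the current B ...
    A = {i: sum(B[j] for j in B if M[i][j] == 1) for i in A}
    # ... then all of B from the just-computed A.
    B = {j: sum(A[i] for i in A if M[i][j] == 1) for j in B}
```

After the first pass through the loop this gives A = {0: 7, 1: 10, 2: 6}
and B = {0: 10, 1: 23, 2: 7}. Note also that calling B.aggregate inside
A.map, as in the quoted code, invokes an RDD operation from within
another RDD's closure, which Spark does not support; the usual workaround
is to collect or broadcast one side to the driver/executors first.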

On Mon, Sep 14, 2015 at 10:42 AM, petranidis <pnyfan...@gmail.com> wrote:
> Hi all,
>
> I am new to Spark and I have written a few Spark programs, mostly
> around machine learning applications.
>
> I am trying to solve a particular problem where two RDDs should be
> updated using elements of each other. More specifically, the two pair
> RDDs are called A and B, and M is a matrix that specifies which
> elements of each RDD should be taken into account when computing the
> other, with rows of M corresponding to elements of A and columns to
> elements of B, e.g.
>
> A = (0, 6), (1, 7), (2, 8)
> B = (0, 4), (1, 6), (2, 1)
> and
> M =
> 0 1 1
> 1 1 0
> 0 1 0
>
> Then
>
> for (it = 0; it < 10; it++) {
>   A(0) = B(1) + B(2)
>   A(1) = B(0) + B(1)
>   A(2) = B(1)
>   B(0) = A(1)
>   B(1) = A(0) + A(1) + A(2)
>   B(2) = A(0)
> }
>
> To do such a computation in Spark, I used
> A = A.map( (key,val) => { B.aggregate(...) })
> B = B.map( (key,val) => { A.aggregate(...) })
>
> where the key keyA of each mapped element is passed to the aggregate
> function as an initialization parameter, and then for each B element
> with key keyB, if M(keyA, keyB) == 1, that B element is taken into
> account in the summation.
>
> The calculation of A is done successfully and correctly, but then the
> DAG scheduler seems to deadlock when the calculation of B happens.
> This behaviour goes away when I remove the A.aggregate bit from my
> code. Apparently, according to the logs, the scheduler is expecting
> some results before it can go on, but those results should already
> have been calculated.
>
> I assume this has to do with the DAG scheduler not handling cyclical
> dependencies. Is there a way I can force each iteration or update of A
> and B to be seen as a separate stage? Otherwise, how can I implement
> this type of aggregation in another way? (The equivalent would be
> mapping each A element to a list of all the B elements for which the M
> matrix entry is 1 and then mapping again to their sum, but this needs
> a lot of space, especially when the problem at hand could be very
> large, so it is infeasible and I need to avoid it.)
>
> Thanks in advance for your help!
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/DAG-Scheduler-deadlock-when-two-RDDs-reference-each-other-force-Stages-manually-tp24684.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

