I meant to write 'last task in stage'.


-------- Original message --------
From: Darren Govoni <dar...@ontrenet.com> 
Date: 02/16/2016  6:55 AM  (GMT-05:00) 
To: Abhishek Modi <abshkm...@gmail.com>, user@spark.apache.org 
Subject: RE: Unusually large deserialisation time 


    
I think this is part of the bigger issue of serious deadlock conditions
occurring in Spark that many of us have posted about.
Would the task in question be the past task of a stage, by chance?



-------- Original message --------
From: Abhishek Modi <abshkm...@gmail.com> 
Date: 02/16/2016  4:12 AM  (GMT-05:00) 
To: user@spark.apache.org 
Subject: Unusually large deserialisation time 

I'm doing a mapPartitions on an RDD cached in memory, followed by a reduce. Here
is my code snippet:

// myRdd is an RDD of Tuple2[Int, Long]
myRdd.mapPartitions(rangify).reduce( (x, y) => (x._1 + y._1, x._2 ++ y._2) )

// The rangify function: sums the Int values of a partition and collapses
// consecutive Long values into (begin, end) ranges
import scala.collection.mutable.ArrayBuffer

def rangify(l: Iterator[ Tuple2[Int, Long] ]): Iterator[ Tuple2[Long, List[ ArrayBuffer[ Tuple2[Long, Long] ] ] ] ] = {
  var sum = 0L
  val mylist = ArrayBuffer[ Tuple2[Long, Long] ]()

  if (l.isEmpty)
    return List( (0L, List[ ArrayBuffer[ Tuple2[Long, Long] ] ]()) ).toIterator

  var prev  = -1000L
  var begin = -1000L

  for (x <- l) {
    sum += x._1

    if (prev < 0) {              // first element of the partition
      prev  = x._2
      begin = x._2
    }
    else if (x._2 == prev + 1)   // still inside the current consecutive run
      prev = x._2
    else {                       // run ended: record it and start a new one
      mylist += ((begin, prev))
      prev  = x._2
      begin = x._2
    }
  }

  mylist += ((begin, prev))      // record the last open run

  List( (sum, List(mylist)) ).toIterator
}
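
For reference, a minimal self-contained driver around this snippet could look like
the sketch below. The app name, data source and partition count are assumptions for
illustration only, since the mail doesn't show how myRdd is actually built:

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

object RangifyJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rangify"))

    // Hypothetical input: (count, id) pairs, 60 partitions to match the 60 cached blocks
    val myRdd = sc.parallelize((0L until 600000L).map(i => (1, i)), 60).cache()
    myRdd.count()   // materialise the cache before running the real job

    val result = myRdd.mapPartitions(rangify).reduce( (x, y) => (x._1 + y._1, x._2 ++ y._2) )
    println(result._1)

    sc.stop()
  }

  // rangify defined exactly as above
}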


The RDD is cached in memory. I'm using 20 executors with 1 core each. The cached
RDD has 60 blocks. The problem is that every 2-3 runs of the job, there is a task
with an abnormally large deserialisation time. Screenshot attached.
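
In case it helps to see the numbers outside the web UI: per-task deserialisation time
is exposed through TaskMetrics, so a small SparkListener can log it for every finished
task. This is only a sketch, not something from the original mail; it assumes the
standard listener API and the executorDeserializeTime field:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Logs the executor-side deserialisation time (ms) of every finished task
class DeserTimeListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null)
      println(s"stage=${taskEnd.stageId} task=${taskEnd.taskInfo.taskId} " +
              s"deserialiseMs=${m.executorDeserializeTime}")
  }
}

// Register it on the SparkContext before running the job
sc.addSparkListener(new DeserTimeListener)

Tasks whose logged deserialisation time is far above the rest can then be correlated
with the executor and host they ran on.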

Thank you,
Abhishek

