Hi,

I have a query below. Please help me solve it.

I have 20,000 ids that I want to join to a table. The table contains
some BLOB data, so I cannot join all 20,000 ids to it in one step.

I'm planning to join this table in chunks. In each step I will
join 500 ids, so the total number of batches is 20000/500 = 40.
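As a sanity check on the batch math, the grouping itself can be done with plain Scala collections (a standalone sketch using placeholder ids; 20,000 and 500 are the numbers from above):

```scala
object BatchCheck {
  def main(args: Array[String]): Unit = {
    // 20,000 placeholder ids, grouped into batches of 500
    val listOfEqs = (1 to 20000).toList
    val listOfEqsSeq = listOfEqs.grouped(500).toList

    println(listOfEqsSeq.size)      // number of batches: 40
    println(listOfEqsSeq.head.size) // ids per batch: 500
  }
}
```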

I want to run these 40 batches in parallel. For that, I'm using the
*foreachAsync* method. Now I'm getting the exception below:

*An error has occured: Job aborted due to stage failure: Task 0 in stage
1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1,
localhost): java.lang.IllegalStateException: Cannot call methods on a
stopped SparkContext.*

*Code:*

var listOfEqs = // ListBuffer; number of elements is 20,000
var listOfEqsSeq = listOfEqs.grouped(500).toList
var listR = sc.parallelize(listOfEqsSeq)
var asyncRd = new AsyncRDDActions(listR)

val f = asyncRd.foreachAsync { x =>
  val r = sc.parallelize(x).toDF() // <== this line throws the exception above
  r.registerTempTable("r")
  val acc = sc.accumulator(0, "My Accumulator")
  var result = sqlContext.sql("SELECT r.id, t.data FROM r, t WHERE r.id = t.id")
  result.foreach { y =>
    acc += y
  }
  acc.value.foreach(f => // saving values to other db)
}

f.onComplete {
  case scala.util.Success(res) => println(res)
  case scala.util.Failure(e)   => println("An error has occured: " + e.getMessage)
}
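For context on what I have tried so far: since SparkContext methods can only be called on the driver, one alternative I am considering is running the batches in parallel on the driver with plain scala.concurrent.Futures instead of foreachAsync. A minimal standalone sketch of that pattern (processBatch is a hypothetical stand-in for the per-batch join-and-save work; here it just sums the ids so the sketch runs on its own, without Spark):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object DriverSideBatches {
  // Hypothetical stand-in for the per-batch work (join + save to the other db).
  def processBatch(batch: List[Int]): Long = batch.map(_.toLong).sum

  def main(args: Array[String]): Unit = {
    val listOfEqs = (1 to 20000).toList
    val batches = listOfEqs.grouped(500).toList // 40 batches of 500 ids

    // Kick off each batch as a Future on the driver; the futures run in
    // parallel on the default execution context, and each closure stays on
    // the driver, where sc/sqlContext would be safe to use.
    val futures = batches.map(b => Future(processBatch(b)))
    val results = Await.result(Future.sequence(futures), Duration.Inf)

    println(results.size) // one result per batch: 40
  }
}
```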

Please help me solve this issue.

Regards,
Rajesh
