Hi, I have a query below; please help me solve it.
I have 20000 ids that I want to join to a table. This table contains some BLOB data, so I cannot join all 20000 ids to it in one step. I am planning to join this table in chunks: in each step I will join 500 ids, so the total number of batches is 20000/500 = 40. I want to run these 40 batches in parallel, and for that I am using *foreachAsync*. Now I am getting the exception below:

*An error has occured: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.*

*Code :-*

    var listOfEqs = // ListBuffer; number of elements is 20000
    var listOfEqsSeq = listOfEqs.grouped(500).toList
    var listR = sc.parallelize(listOfEqsSeq)
    var asyncRd = new AsyncRDDActions(listR)

    val f = asyncRd.foreachAsync { x =>
      val r = sc.parallelize(x).toDF() // ======> on this line I get the exception mentioned above
      r.registerTempTable("r")
      val acc = sc.accumulator(0, "My Accumulator")
      var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id = t.id")
      result.foreach { y =>
        acc += y
      }
      acc.value.foreach(f => /* saving values to other db */ ())
    }

    f.onComplete {
      case scala.util.Success(res) => println(res)
      case scala.util.Failure(e)   => println("An error has occured: " + e.getMessage)
    }

Please help me to solve this issue.

Regards,
Rajesh
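P.S. I suspect the problem is that the closure passed to foreachAsync runs on the executors, where `sc` cannot be used. One alternative I have been sketching is to iterate over the batches on the driver and submit each one concurrently with plain Scala Futures. This is a Spark-free sketch of just the batching pattern; `processBatch` is a placeholder for where the real per-batch DataFrame join against the BLOB table would go. Would this be the right direction?

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Placeholder for the per-batch work; in the real job this would build
// a DataFrame from the batch ON THE DRIVER and run the SQL join there.
def processBatch(batch: Seq[Int]): Int = batch.size

val ids = (1 to 20000).toList          // stand-in for the 20000 ids
val batches = ids.grouped(500).toList  // 20000 / 500 = 40 batches

// Launch each batch as a Future so the 40 joins are submitted
// concurrently from the driver, instead of calling sc inside a closure.
val futures = batches.map(b => Future(processBatch(b)))
val total = Await.result(Future.sequence(futures), Duration.Inf)

println(total.sum) // 20000 ids processed across 40 batches
```

Each Future only submits a Spark job from the driver, so Spark itself schedules the underlying work; no SparkContext methods are ever called on an executor.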