Hi Rishi and Ted,

Thank you for the response. I'm now using accumulators and getting results. I have another question: how can I run this code in parallel?

Example:

var listOfIds is a ListBuffer with 20000 records

I'm creating batches of 500 ids each, so there are 40 batches in total.

listOfIds.grouped(500).foreach { x =>
  {
    val r = sc.parallelize(x).toDF()
    r.registerTempTable("r")
    val acc = sc.accumulator(0, "My Accumulator")
    var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id = t.id ")
    result.foreach { y =>
      {
        acc += y
      }
    }
    acc.value.foreach(f => // saving values to other db)
  }
}

The code above runs the batches sequentially. I want to run these 40 batches in parallel.

*How can I start these 40 batches in parallel instead of in sequence?*

Could you please help me with this use case?

Regards,
Rajesh

On Wed, Dec 9, 2015 at 4:46 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> To add onto what Rishi said, you can use foreachPartition() on result
> where you can save values to DB.
>
> Cheers
>
> On Wed, Dec 9, 2015 at 12:51 AM, Rishi Mishra <rmis...@snappydata.io>
> wrote:
>
>> Your list is defined on the driver, whereas function specified in forEach
>> will be evaluated on each executor.
>> You might want to add an accumulator or handle a Sequence of list from
>> each partition.
>>
>> On Wed, Dec 9, 2015 at 11:19 AM, Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a below query. Please help me to solve this
>>>
>>> I have a 20000 ids. I want to join these ids to table. This table
>>> contains some blob data. So i can not join these 2000 ids to this table in
>>> one step.
>>>
>>> I'm planning to join this table in a chunks. For example, each step I
>>> will join 5000 ids.
>>>
>>> Below code is not working. I'm not able to add result to ListBuffer.
>>> Result s giving always ZERO
>>>
>>> *Code Block :-*
>>>
>>> var listOfIds is a ListBuffer with 20000 records
>>>
>>> listOfIds.grouped(5000).foreach { x =>
>>> {
>>> var v1 = new ListBuffer[String]()
>>> val r = sc.parallelize(x).toDF()
>>> r.registerTempTable("r")
>>> var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id =
>>> t.id")
>>> result.foreach{ y =>
>>> {
>>> v1 += y
>>> }
>>> }
>>> println(" SIZE OF V1 === "+ v1.size) ==>
>>>
>>> *THIS VALUE PRINTING AS ZERO*
>>>
>>> *// Save v1 values to other db*
>>> }
>>>
>>> Please help me on this.
>>>
>>> Regards,
>>> Rajesh
>>>
>>
>>
>>
>> --
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>
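
One possible way to run the 40 batches concurrently, as asked at the top of this message, is to submit each batch from its own driver-side thread. Spark's scheduler accepts jobs submitted from multiple threads of the same application. The sketch below shows only the threading pattern in plain Scala, with no Spark dependency. `ParallelBatches`, `processBatch`, `runAll`, and `maxConcurrent` are hypothetical names, and `processBatch` is a placeholder for the per-batch join and save. Note that the batch body in this thread registers every batch under the same temp table name "r"; with concurrent batches that name would be shared, so each batch would probably need a unique name (for example one derived from `batchIndex`) or a direct DataFrame join.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelBatches {
  // Hypothetical placeholder for one batch of work. In the Spark job this is
  // where the batch DataFrame would be built, joined with t, and saved.
  // Here it only returns the batch size so the sketch runs without Spark.
  def processBatch(batchIndex: Int, ids: scala.collection.Seq[Int]): Int = ids.size

  // Splits the ids into batches and runs at most maxConcurrent of them at once.
  def runAll(listOfIds: scala.collection.Seq[Int],
             batchSize: Int,
             maxConcurrent: Int): List[Int] = {
    val pool = Executors.newFixedThreadPool(maxConcurrent)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // toList forces the lazy iterator, so every Future is submitted here.
      val futures = listOfIds.grouped(batchSize).zipWithIndex.map {
        case (batch, i) => Future(processBatch(i, batch))
      }.toList
      // Block the driver thread until all batches finish (timeout is arbitrary).
      Await.result(Future.sequence(futures), 10.minutes)
    } finally {
      pool.shutdown()
    }
  }
}
```

Calling `ParallelBatches.runAll(listOfIds, 500, 4)` would process the 40 batches with up to four in flight at a time. The pool size is a tuning choice: more threads only help if the cluster has free cores for the extra jobs.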