Hi Rishi and Ted,

Thank you for the response. Now I'm using Accumulators and getting results.
I have another question: how can I run this code in parallel?

Example :-

var listOfIds is a ListBuffer with 20000 records

I'm creating batches of 500 ids each, which gives 40 batches in total.

listOfIds.grouped(500).foreach { x =>
  val r = sc.parallelize(x).toDF()
  r.registerTempTable("r")
  val acc = sc.accumulator(0, "My Accumulator")
  val result = sqlContext.sql("SELECT r.id, t.data FROM r, t WHERE r.id = t.id")
  result.foreach { y =>
    acc += y
  }
  acc.value.foreach { f => /* save the value to the other DB */ }
}

The code above processes the batches sequentially. I want to run these 40
batches in parallel.

*How can I start these 40 batches in parallel instead of in sequence?*

Could you please help me with this use case?

Regards,
Rajesh
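
One possible way to run the batches concurrently is to submit them from
several driver threads; Spark allows jobs submitted from separate threads of
the same SparkContext to run simultaneously. The sketch below shows only the
threading part in plain Scala. The names ParallelBatches, run, parallelism and
processBatch are hypothetical and not part of Spark or of the code in this
thread; processBatch stands in for whatever work is done per batch.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelBatches {
  // Splits `ids` into batches of `batchSize` and runs `processBatch` on up to
  // `parallelism` batches at a time. Results come back in batch order.
  // (Hypothetical helper: `processBatch` is where the per-batch job would go.)
  def run[A, B](ids: Seq[A], batchSize: Int, parallelism: Int)(processBatch: Seq[A] => B): Seq[B] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One Future per batch; the fixed pool caps how many run at once.
      val futures = ids.grouped(batchSize).toList.map(batch => Future(processBatch(batch)))
      // Block the calling thread until every batch has finished.
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally {
      pool.shutdown()
    }
  }
}
```

With the job from this thread, the call might look like
ParallelBatches.run(listOfIds.toList, 500, 4) { batch => ... }, with the join
and the save inside the block. Two caveats, both assumptions about the setup:
every thread registering a temp table under the same name "r" would overwrite
the others, so each batch would need its own table name or a direct DataFrame
join; and the useful degree of parallelism depends on how many cores the
cluster has free.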


On Wed, Dec 9, 2015 at 4:46 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> To add onto what Rishi said, you can use foreachPartition() on result
> where you can save values to DB.
>
> Cheers
>
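
The foreachPartition() suggestion above could look roughly like the following
sketch. It assumes `result` is the DataFrame returned by sqlContext.sql(...);
RowSink, SaveResult and openSink are hypothetical names standing in for a
client of the target database, which this thread does not describe.

```scala
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical stand-in for a connection to the target database.
trait RowSink extends AutoCloseable {
  def write(row: Row): Unit
}

object SaveResult {
  // `openSink` is an assumed factory. It is called on the executors, once per
  // partition, so the connection itself never has to be serialized.
  def save(result: DataFrame, openSink: () => RowSink): Unit =
    result.rdd.foreachPartition { rows =>
      val sink = openSink()
      try rows.foreach(row => sink.write(row))
      finally sink.close()
    }
}
```

Because the rows are written from the executors, nothing has to be sent back
to the driver, so neither a driver-side ListBuffer nor an accumulator is needed
for the save step.
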
> On Wed, Dec 9, 2015 at 12:51 AM, Rishi Mishra <rmis...@snappydata.io>
> wrote:
>
>> Your list is defined on the driver, whereas the function passed to foreach
>> is evaluated on each executor.
>> You might want to add an accumulator, or handle a sequence of lists, one
>> from each partition.
>>
>> On Wed, Dec 9, 2015 at 11:19 AM, Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have the following question. Please help me solve it.
>>>
>>> I have 20000 ids that I want to join to a table. This table
>>> contains some blob data, so I cannot join all 20000 ids to it in
>>> one step.
>>>
>>> I'm planning to join against this table in chunks. For example, each step
>>> will join 5000 ids.
>>>
>>> The code below is not working. I'm not able to add the results to the
>>> ListBuffer; its size is always ZERO.
>>>
>>> *Code Block :-*
>>>
>>> var listOfIds is a ListBuffer with 20000 records
>>>
>>> listOfIds.grouped(5000).foreach { x =>
>>>   var v1 = new ListBuffer[String]()
>>>   val r = sc.parallelize(x).toDF()
>>>   r.registerTempTable("r")
>>>   var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id = t.id")
>>>   result.foreach { y =>
>>>     v1 += y
>>>   }
>>>   println(" SIZE OF V1 === " + v1.size)  // ==> *THIS VALUE PRINTING AS ZERO*
>>>
>>>   // Save v1 values to other db
>>> }
>>>
>>> Please help me with this.
>>>
>>> Regards,
>>> Rajesh
>>>
>>
>>
>>
>> --
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>
>
