Re: How to use collections inside foreach block

2015-12-10 Thread Madabhattula Rajesh Kumar
Hi Rishi and Ted,

Thank you for the response. I'm now using accumulators and getting results.
I have another question: how can I run this code in parallel?

Example:

listOfIds is a ListBuffer with 20000 records.

I'm creating batches of 500 ids each, which gives 40 batches in total.

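import org.apache.spark.sql.Row
import scala.collection.mutable.ArrayBuffer

listOfIds.grouped(500).foreach { x =>
  val r = sc.parallelize(x).toDF()
  r.registerTempTable("r")
  // Assumption: a collection accumulator (Spark 1.x API), so that rows can be
  // appended on the executors and read back on the driver.
  val acc = sc.accumulableCollection(ArrayBuffer[Row]())
  val result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id = t.id")
  result.foreach { y =>
    acc += y
  }
  acc.value.foreach { f =>
    // saving values to other db
  }
}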

The code above runs the batches one after another. I want to run these 40
batches in parallel.

*How can I start these 40 batches in parallel instead of in sequence?*

Could you please help me with this use case?

Regards,
Rajesh
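
A possible approach to the question above, offered as a sketch rather than a tested answer: a single SparkContext can run several jobs at once when they are submitted from separate threads, so each batch can be started from its own Future. The example below shows only the threading pattern in plain Scala. The names ParallelBatches, processBatch and runAll, and the pool size of 4, are hypothetical, and processBatch merely stands in for the per-batch join and save. With the code above, each concurrent batch would also need its own temp table name instead of sharing "r".

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelBatches {
  // Hypothetical stand-in for the real per-batch work (join + save to the DB).
  // Here it only reports how many ids the batch contained.
  def processBatch(batch: Seq[Int]): Int = batch.size

  // Splits ids into batches and processes up to `parallelism` batches at a time.
  def runAll(ids: Seq[Int], batchSize: Int, parallelism: Int): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One Future per batch; each would submit its own Spark job.
      val futures = ids.grouped(batchSize).toList.map(b => Future(processBatch(b)))
      // Block until every batch has finished (the timeout is an arbitrary choice).
      Await.result(Future.sequence(futures), 1.hour)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val sizes = runAll(1 to 20000, 500, 4)
    println(s"batches: ${sizes.size}, ids processed: ${sizes.sum}")
  }
}
```

A fixed-size pool is used so that the number of batches in flight stays bounded; how many batches can usefully overlap depends on the cluster resources.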




Re: How to use collections inside foreach block

2015-12-09 Thread Rishi Mishra
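Your list is defined on the driver, whereas the function passed to foreach
is evaluated on each executor, so each executor appends to its own copy of
the list and the driver's copy stays empty.
You might want to use an accumulator, or return a sequence from each
partition and combine them on the driver.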
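
The second suggestion (getting a sequence back from each partition) could look roughly like the sketch below. It assumes a plain RDD of integer ids rather than the DataFrame from the question, and the object and method names are made up for illustration. Instead of mutating a driver-side ListBuffer inside foreach, the values are returned to the driver with collect(), or grouped into one list per partition with mapPartitions and then collected.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DriverSideResults {
  // Brings every element back to the driver as a local sequence.
  def collectIds(sc: SparkContext, ids: Seq[Int]): Seq[Int] =
    sc.parallelize(ids).collect().toSeq

  // Builds one list per partition on the executors, then collects the lists.
  def listsPerPartition(sc: SparkContext, ids: Seq[Int], partitions: Int): Seq[List[Int]] =
    sc.parallelize(ids, partitions)
      .mapPartitions(part => Iterator(part.toList))
      .collect()
      .toSeq

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption so that the sketch can run without a cluster.
    val conf = new SparkConf().setAppName("driver-side-results").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      println(collectIds(sc, 1 to 10).size)
      println(listsPerPartition(sc, 1 to 10, 2).map(_.size))
    } finally sc.stop()
  }
}
```

Note that collect() pulls all results into driver memory, so for large blob data it may be preferable to save the values from the executors instead.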




-- 
Regards,
Rishitesh Mishra,
SnappyData (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: How to use collections inside foreach block

2015-12-09 Thread Ted Yu
To add to what Rishi said, you can call foreachPartition() on result and
save the values to the DB from there.
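
A minimal sketch of that idea, assuming the rows have already been mapped to (id, data) string pairs in an RDD. FakeConnection is a hypothetical stand-in for a real database client, and PartitionWriter, writePartition and saveAll are illustrative names, not part of any library. The point is that the connection is created on the executor, once per partition, rather than once per row or on the driver.

```scala
import org.apache.spark.rdd.RDD

object PartitionWriter {
  // Hypothetical stand-in for a real database client; not a library API.
  class FakeConnection {
    private var written = 0
    def write(id: String, data: String): Unit = written += 1
    def count: Int = written
    def close(): Unit = ()
  }

  // Writes every row of one partition through a single connection and
  // returns how many rows that connection has written.
  def writePartition(rows: Iterator[(String, String)], conn: FakeConnection): Int = {
    rows.foreach { case (id, data) => conn.write(id, data) }
    conn.count
  }

  // Opens one connection per partition, on the executor that holds it.
  def saveAll(rows: RDD[(String, String)]): Unit =
    rows.foreachPartition { part =>
      val conn = new FakeConnection()
      try {
        writePartition(part, conn)
        ()
      } finally conn.close()
    }
}
```

Because the write happens on the executors, nothing has to be copied back into a driver-side collection first.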

Cheers



How to use collections inside foreach block

2015-12-08 Thread Madabhattula Rajesh Kumar
Hi,

I have the following question. Please help me solve it.

I have 20000 ids. I want to join these ids to a table. This table contains
some blob data, so I cannot join all 20000 ids to the table in one step.

I'm planning to join to this table in chunks. For example, in each step I
will join 5000 ids.

The code below is not working. I'm not able to add the results to the
ListBuffer. Its size is always ZERO.

*Code Block:*

listOfIds is a ListBuffer with 20000 records.

listOfIds.grouped(5000).foreach { x =>
  var v1 = new ListBuffer[String]()
  val r = sc.parallelize(x).toDF()
  r.registerTempTable("r")
  var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id = t.id")
  result.foreach { y =>
    v1 += y
  }
  println(" SIZE OF V1 === " + v1.size)  // ==> THIS VALUE IS PRINTED AS ZERO

  // Save v1 values to other db
}

Please help me with this.

Regards,
Rajesh