Re: Help Required on Spark - Convert DataFrame to List with out using collect

2017-12-22 Thread Naresh Dulam
Hi Sunitha,

Make the class that contains the common function you are calling implement
Serializable.
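
A minimal sketch, assuming the common function lives in a class called, say,
CommonService (the class and method names below are hypothetical):

import java.io.Serializable;

// Hypothetical holder of the common function. Implementing Serializable lets
// Spark ship an instance to the executors inside the map/foreach closure.
public class CommonService implements Serializable {
    private static final long serialVersionUID = 1L;

    public void process(long id, String name) {
        // ... your common logic goes here ...
        System.out.println("Processing " + id + " / " + name);
    }
}

Alternatively, construct the object inside the map/foreach function itself so
it never needs to be serialized, or mark any non-serializable fields as
transient.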


Thank you,
Naresh

On Wed, Dec 20, 2017 at 9:58 PM Sunitha Chennareddy <
chennareddysuni...@gmail.com> wrote:

> Hi,
>
> Thank You All..
>
> Here is my requirement: I have a dataframe which contains a list of rows
> retrieved from an Oracle table.
> I need to iterate over the dataframe, fetch each record, and call a common
> function, passing a few parameters.
>
> The issue I am facing is that I am not able to call the common function.
>
> JavaRDD<Person> personRDD = person_dataframe.toJavaRDD().map(new
> Function<Row, Person>() {
>     @Override
>     public Person call(Row row) throws Exception {
>         Person person = new Person();
>         person.setId(row.getDecimal(0).longValue());
>         person.setName(row.getString(1));
>
>         personLst.add(person);
>         return person;
>     }
> });
>
> personRDD.foreach(new VoidFunction<Person>() {
>     private static final long serialVersionUID = 1123456L;
>
>     @Override
>     public void call(Person person) throws Exception {
>         System.out.println(person.getId());
>         // Here I tried to call the common function
>     }
> });
>
> I am able to print the data in the foreach loop; however, when I try to call
> the common function it gives me the error below.
> Error message: org.apache.spark.SparkException: Task not serializable
>
> I kindly request you to share some ideas (sample code / a link to refer to)
> on how to call a common function/interface method, passing values from each
> record of the dataframe.
>
> Regards,
> Sunitha
>
>
> On Tue, Dec 19, 2017 at 1:20 PM, Weichen Xu 
> wrote:
>
>> Hi Sunitha,
>>
>> In the mapper function you cannot update outer variables such as
>> `personLst` (the `personLst.add(person)` call runs on the executors, not on
>> the driver), which is why you ended up with an empty list.
>>
>> You can use `rdd.collect()` to get a local list of `Person` objects
>> first, then you can safely iterate on the local list and do any update you
>> want.
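>>
>> A minimal sketch of that approach (personRDD is the RDD from the snippet
>> above; commonService here is just a placeholder for whatever object holds
>> your common function):
>>
>> // collect() brings the Person objects back to the driver as a local list
>> List<Person> personLst = personRDD.collect();
>> for (Person person : personLst) {
>>     // this loop runs on the driver, so calling any local method is safe
>>     commonService.process(person.getId(), person.getName());
>> }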
>>
>> Thanks.
>>
>> On Tue, Dec 19, 2017 at 2:16 PM, Sunitha Chennareddy <
>> chennareddysuni...@gmail.com> wrote:
>>
>>> Hi Deepak,
>>>
>>> I am able to map a row to the Person class; the issue is that I want to
>>> call another method.
>>> I tried converting to a list, and it is not working without using collect.
>>>
>>> Regards
>>> Sunitha
>>> On Tuesday, December 19, 2017, Deepak Sharma 
>>> wrote:
>>>
 I am not sure about Java, but in Scala it would be something like
 df.rdd.map{ x => MyClass(x.getString(0), ...) }

 HTH

 --Deepak

 On Dec 19, 2017 09:25, "Sunitha Chennareddy" wrote:

 Hi All,

 I am new to Spark. I want to convert a DataFrame to a List without
 using collect().

 The main requirement is that I need to iterate through the rows of the
 dataframe and call another function, passing the column value of each row
 (person.getId()).

 Here is the snippet I have tried. Kindly help me resolve the issue;
 personLst is returning 0:

 List<Person> personLst = new ArrayList<Person>();
 JavaRDD<Person> personRDD = person_dataframe.toJavaRDD().map(new
 Function<Row, Person>() {
     public Person call(Row row) throws Exception {
         Person person = new Person();
         person.setId(row.getDecimal(0).longValue());
         person.setName(row.getString(1));

         personLst.add(person);
         // here I tried to call another function but control never passed
         return person;
     }
 });
 logger.info("personLst size =="+personLst.size());
 logger.info("personRDD count ==="+personRDD.count());

 //output is
 personLst size == 0
 personRDD count === 3



>>
>


Re: How Fault Tolerance is achieved in Spark ??

2017-12-12 Thread Naresh Dulam
Hi Nikhil,


Fault tolerance means that data is not lost in case of failures, and it is
achieved in different ways in different systems.
In HDFS, fault tolerance is achieved by replicating blocks across different
nodes.
In Spark, fault tolerance is achieved through the DAG (lineage). Let me put it
in simple words:
You have created RDD1 by reading data from HDFS, then applied a couple of
transformations and created two new RDDs:

RDD1 --> RDD2 --> RDD3

Let's assume you have cached RDD3, and after some time RDD3 is evicted from
the cache to make room for a new RDD4 that is created and cached.

Now if you want to access RDD3, it is no longer available in the cache, so
Spark will use the DAG (the lineage RDD1 --> RDD2 --> RDD3) to recompute it.
In this way the data in RDD3 is always available.
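
A minimal Java sketch of this lineage behaviour (the input path and the
transformations are placeholders, and unpersist() is used here just to
simulate the cache eviction):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LineageExample {
  public static void main(String[] args) {
    JavaSparkContext sc =
        new JavaSparkContext(new SparkConf().setAppName("lineage-example"));

    // RDD1: read from HDFS (path is a placeholder)
    JavaRDD<String> rdd1 = sc.textFile("hdfs:///data/input.txt");

    // RDD2, RDD3: a couple of transformations; nothing is computed yet
    JavaRDD<String> rdd2 = rdd1.filter(line -> !line.isEmpty());
    JavaRDD<Integer> rdd3 = rdd2.map(String::length);

    rdd3.cache();      // ask Spark to keep RDD3 in memory
    rdd3.count();      // first action: runs RDD1 -> RDD2 -> RDD3, caches RDD3

    rdd3.unpersist();  // simulate RDD3 being dropped from the cache

    // The next action simply re-runs the lineage to rebuild RDD3
    rdd3.count();

    sc.stop();
  }
}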


Hope this answers your question.

Thank you,
Naresh


On Tue, Dec 12, 2017 at 12:51 AM  wrote:

> Hello Techie’s,
>
>
>
> How is fault tolerance achieved in Spark when data is read from HDFS and
> is in the form of an RDD (in memory)?
>
>
>
> Regards
>
> Nikhil
>
>
> "*Confidentiality Warning*: This message and any attachments are intended
> only for the use of the intended recipient(s), are confidential and may be
> privileged. If you are not the intended recipient, you are hereby notified
> that any review, re-transmission, conversion to hard copy, copying,
> circulation or other use of this message and any attachments is strictly
> prohibited. If you are not the intended recipient, please notify the sender
> immediately by return email and delete this message and any attachments
> from your system.
>
> *Virus Warning:* Although the company has taken reasonable precautions to
> ensure no viruses are present in this email. The company cannot accept
> responsibility for any loss or damage arising from the use of this email or
> attachment."
>