Re: Get the previous state string in Spark streaming

2015-10-16 Thread Tathagata Das
Its hard to help without any stacktrace associated
with UnsupportedOperationException.

On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan <
ananda.muru...@honeywell.com> wrote:

> One of my co-worker(Yogesh) was trying to get this posted in spark mailing
> and it seems it did not get posted. So I am reposting it here. Please help.
>
>
>
>
>
> Hi,
>
> I am new to Spark and was trying to do some experiments with it.
>
>
>
> I had a JavaPairDStream RDD.
>
> I want to get the list of string from its previous state. For that I use
> updateStateByKey function as follows:
>
>
>
> final Function2,
> Optional> updateFunc =
>
>new Function2,
>
> Optional>() {
>
>
>
> public Optional call(List arg0,
> Optional arg1) throws Exception {
>
> // TODO Auto-generated method stub
>
> if(arg1.toString()==null)
>
>return Optional.of(arg0);
>
> else {
>
>arg0.add(arg1.toString());
>
>return Optional.of(arg0);
>
> }
>
>}
>
> };
>
>
>
> I want the function to append the new list of string to the previous list
> and return the new list. But I am not able to do so. I am getting the "
> java.lang.UnsupportedOperationException" error.
>
> Can anyone which help me out in getting the desired output?
>
>
>


RE: Get the previous state string in Spark streaming

2015-10-16 Thread Chandra Mohan, Ananda Vel Murugan
Hi,

Thanks for the response. We are trying to implement something similar as 
discussed in the following SFO post.

http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation

We are doing it in java while accepted answer(second answer) in this post is in 
Scala.


We wrote our java code taking this scala code as reference. But we are getting 
exception in highlighted line i.e. in  return 
Optional.of(events.add(state.toString());); Specifically, it happens when we 
call events.add()



final Function2<List, Optional<List>, Optional<List>> 
updateFunc =

   new Function2<List, Optional<List>,

Optional<List>>() {



public Optional<List> call(List events, 
Optional<List> state) throws Exception {

// TODO Auto-generated method stub

if(state.toString()==null)

   return Optional.of(events);

else {

//UnsupportedOperationException here

   return Optional.of(events.add(state.toString()););

}

   }

};

Please let us know if you need more details. Unfortunately we are not in a 
position to share whole code.

Thanks



Regards,

Anand/Yogesh
From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, October 16, 2015 1:22 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: user
Subject: Re: Get the previous state string in Spark streaming

Its hard to help without any stacktrace associated with 
UnsupportedOperationException.

On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan 
<ananda.muru...@honeywell.com<mailto:ananda.muru...@honeywell.com>> wrote:

One of my co-worker(Yogesh) was trying to get this posted in spark mailing and 
it seems it did not get posted. So I am reposting it here. Please help.





Hi,

I am new to Spark and was trying to do some experiments with it.



I had a JavaPairDStream<String, List> RDD.

I want to get the list of string from its previous state. For that I use 
updateStateByKey function as follows:



final Function2<List, Optional<List>, Optional<List>> 
updateFunc =

   new Function2<List, Optional<List>,

Optional<List>>() {



public Optional<List> call(List arg0, 
Optional<List> arg1) throws Exception {

// TODO Auto-generated method stub

if(arg1.toString()==null)

   return Optional.of(arg0);

else {

   arg0.add(arg1.toString());

   return Optional.of(arg0);

}

   }

};



I want the function to append the new list of string to the previous list and 
return the new list. But I am not able to do so. I am getting the C error.

Can anyone which help me out in getting the desired output?




Re: Get the previous state string in Spark streaming

2015-10-16 Thread Tathagata Das
A simple Javadoc look up in Java List
<http://docs.oracle.com/javase/7/docs/api/java/util/List.html#add(E)> shows
that List.add can throw UnsupportedOperationException if the implementation
of the List interface does not support that operation (example, does not
support adding null). A good place to start would be to print the parameter
to the List.add(), and the type of class used for List.add(). Even better
if you can reproduce it locally, then you will get the prints in the driver
logs. Otherwise you will have to hunt for that print statement in the
executor logs.

Also, I am not sure what you are trying to do but using state.toString()
(which is, Optional.toString) to compare with null, and then trying to add
that to a list, both seem pretty weird to me. Are you what you want to do
works as you intend to. Might be worth trying this out locally first with
println() to see whether the logic actually works as you think it should.

Hope this helps.

On Fri, Oct 16, 2015 at 2:23 AM, Chandra Mohan, Ananda Vel Murugan <
ananda.muru...@honeywell.com> wrote:

> Hi,
>
>
>
> Thanks for the response. We are trying to implement something similar as
> discussed in the following SFO post.
>
>
>
>
> http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation
>
>
>
> We are doing it in java while accepted answer(second answer) in this post
> is in Scala.
>
>
>
> We wrote our java code taking this scala code as reference. But we are
> getting exception in highlighted line i.e. in  return Optional.of(
> events.add(state.toString());); Specifically, it happens when we call
> events.add()
>
>
>
> final Function2<List, Optional<List>,
> Optional<List>> updateFunc =
>
>new Function2<List, Optional<List>,
>
> Optional<List>>() {
>
>
>
> public Optional<List> call(List events,
> Optional<List> state) throws Exception {
>
> // TODO Auto-generated method stub
>
> if(state.toString()==null)
>
>return Optional.of(events);
>
> else {
>
> //UnsupportedOperationException here
>
>return Optional.of(events.add(state.toString()););
>
> }
>
>}
>
> };
>
> Please let us know if you need more details. Unfortunately we are not in a
> position to share whole code.
>
> Thanks
>
>
>
> Regards,
>
> Anand/Yogesh
>
> *From:* Tathagata Das [mailto:t...@databricks.com]
> *Sent:* Friday, October 16, 2015 1:22 PM
> *To:* Chandra Mohan, Ananda Vel Murugan
> *Cc:* user
> *Subject:* Re: Get the previous state string in Spark streaming
>
>
>
> Its hard to help without any stacktrace associated
> with UnsupportedOperationException.
>
>
>
> On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan <
> ananda.muru...@honeywell.com> wrote:
>
> One of my co-worker(Yogesh) was trying to get this posted in spark mailing
> and it seems it did not get posted. So I am reposting it here. Please help.
>
>
>
>
>
> Hi,
>
> I am new to Spark and was trying to do some experiments with it.
>
>
>
> I had a JavaPairDStream<String, List> RDD.
>
> I want to get the list of string from its previous state. For that I use
> updateStateByKey function as follows:
>
>
>
> final Function2<List, Optional<List>,
> Optional<List>> updateFunc =
>
>new Function2<List, Optional<List>,
>
> Optional<List>>() {
>
>
>
> public Optional<List> call(List arg0,
> Optional<List> arg1) throws Exception {
>
> // TODO Auto-generated method stub
>
> if(arg1.toString()==null)
>
>return Optional.of(arg0);
>
> else {
>
>arg0.add(arg1.toString());
>
>return Optional.of(arg0);
>
> }
>
>}
>
> };
>
>
>
> I want the function to append the new list of string to the previous list
> and return the new list. But I am not able to do so. I am getting the C
> error.
>
> Can anyone which help me out in getting the desired output?
>
>
>
>
>


Get the previous state string in Spark streaming

2015-10-15 Thread Chandra Mohan, Ananda Vel Murugan
One of my co-worker(Yogesh) was trying to get this posted in spark mailing and 
it seems it did not get posted. So I am reposting it here. Please help.





Hi,

I am new to Spark and was trying to do some experiments with it.



I had a JavaPairDStream RDD.

I want to get the list of string from its previous state. For that I use 
updateStateByKey function as follows:



final Function2, Optional> 
updateFunc =

   new Function2,

Optional>() {



public Optional call(List arg0, 
Optional arg1) throws Exception {

// TODO Auto-generated method stub

if(arg1.toString()==null)

   return Optional.of(arg0);

else {

   arg0.add(arg1.toString());

   return Optional.of(arg0);

}

   }

};



I want the function to append the new list of string to the previous list and 
return the new list. But I am not able to do so. I am getting the " 
java.lang.UnsupportedOperationException" error.

Can anyone which help me out in getting the desired output?