RE: Get the previous state string in Spark streaming
Thank you, we got it fixed.

Regards,
Anand.C

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, October 16, 2015 3:38 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: user
Subject: Re: Get the previous state string in Spark streaming

A simple Javadoc lookup for Java List <http://docs.oracle.com/javase/7/docs/api/java/util/List.html#add(E)> shows that List.add can throw UnsupportedOperationException if the implementation of the List interface does not support that operation (for example, a fixed-size or read-only list).

A good place to start would be to print the argument passed to List.add() and the runtime class of the list it is called on. Even better, if you can reproduce it locally, you will get the prints in the driver logs. Otherwise you will have to hunt for that print statement in the executor logs.

Also, I am not sure what you are trying to do, but using state.toString() (which is Optional.toString) to compare with null, and then trying to add that to a list, both seem pretty weird to me. Are you sure that what you want to do works as you intend? It might be worth trying this out locally first with println() to see whether the logic actually works as you think it should.

Hope this helps.
Re: Get the previous state string in Spark streaming
A simple Javadoc lookup for Java List <http://docs.oracle.com/javase/7/docs/api/java/util/List.html#add(E)> shows that List.add can throw UnsupportedOperationException if the implementation of the List interface does not support that operation (for example, a fixed-size or read-only list).

A good place to start would be to print the argument passed to List.add() and the runtime class of the list it is called on. Even better, if you can reproduce it locally, you will get the prints in the driver logs. Otherwise you will have to hunt for that print statement in the executor logs.

Also, I am not sure what you are trying to do, but using state.toString() (which is Optional.toString) to compare with null, and then trying to add that to a list, both seem pretty weird to me. Are you sure that what you want to do works as you intend? It might be worth trying this out locally first with println() to see whether the logic actually works as you think it should.

Hope this helps.

On Fri, Oct 16, 2015 at 2:23 AM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:

> Hi,
>
> Thanks for the response. We are trying to implement something similar to
> what is discussed in the following Stack Overflow post:
>
> http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation
>
> We are doing it in Java, while the accepted answer (the second answer) in
> that post is in Scala.
>
> We wrote our Java code taking that Scala code as a reference, but we are
> getting an exception in the highlighted line, i.e. in
>
>     return Optional.of(events.add(state.toString()););
>
> Specifically, it happens when we call events.add().
>
>     final Function2<List<String>, Optional<List<String>>, Optional<List<String>>> updateFunc =
>         new Function2<List<String>, Optional<List<String>>, Optional<List<String>>>() {
>             public Optional<List<String>> call(List<String> events,
>                     Optional<List<String>> state) throws Exception {
>                 // TODO Auto-generated method stub
>                 if (state.toString() == null)
>                     return Optional.of(events);
>                 else {
>                     // UnsupportedOperationException here
>                     return Optional.of(events.add(state.toString()););
>                 }
>             }
>         };
>
> Please let us know if you need more details. Unfortunately, we are not in
> a position to share the whole code.
>
> Thanks,
> Regards,
> Anand/Yogesh
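The debugging advice in the reply above (print the argument to List.add() and the runtime class of the list) can be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions: the class name ListAddDiagnostics and the helper tryAdd are hypothetical, and standard-library lists stand in for whatever list Spark actually passes to the update function, which the thread does not identify.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper class, not part of Spark or of the original code.
class ListAddDiagnostics {

    // Prints the argument and the runtime class of the list, then attempts
    // the add and reports whether the list implementation supports it.
    static String tryAdd(List<String> list, String value) {
        System.out.println("adding '" + value + "' to " + list.getClass().getName());
        try {
            list.add(value);
            return "ok";
        } catch (UnsupportedOperationException e) {
            return "unsupported";
        }
    }

    public static void main(String[] args) {
        // A plain ArrayList supports add.
        System.out.println(tryAdd(new ArrayList<String>(Arrays.asList("a")), "b"));
        // Arrays.asList returns a fixed-size list, so add is rejected.
        System.out.println(tryAdd(Arrays.asList("a"), "b"));
        // An unmodifiable view rejects add as well.
        System.out.println(tryAdd(Collections.unmodifiableList(new ArrayList<String>()), "b"));
    }
}
```

Run locally, the printed class name shows at a glance whether the list is a mutable implementation such as java.util.ArrayList or some read-only wrapper.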
RE: Get the previous state string in Spark streaming
Hi,

Thanks for the response. We are trying to implement something similar to what is discussed in the following Stack Overflow post:

http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation

We are doing it in Java, while the accepted answer (the second answer) in that post is in Scala.

We wrote our Java code taking that Scala code as a reference, but we are getting an exception in the highlighted line, i.e. in

    return Optional.of(events.add(state.toString()););

Specifically, it happens when we call events.add().

    final Function2<List<String>, Optional<List<String>>, Optional<List<String>>> updateFunc =
        new Function2<List<String>, Optional<List<String>>, Optional<List<String>>>() {
            public Optional<List<String>> call(List<String> events,
                    Optional<List<String>> state) throws Exception {
                // TODO Auto-generated method stub
                if (state.toString() == null)
                    return Optional.of(events);
                else {
                    // UnsupportedOperationException here
                    return Optional.of(events.add(state.toString()););
                }
            }
        };

Please let us know if you need more details. Unfortunately, we are not in a position to share the whole code.

Thanks,
Regards,
Anand/Yogesh

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, October 16, 2015 1:22 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: user
Subject: Re: Get the previous state string in Spark streaming

It's hard to help without any stack trace associated with the UnsupportedOperationException.
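Two details of the posted function can be checked without Spark: Optional.toString() does not return null even when no value is present, and List.add() returns a boolean rather than the list. The sketch below uses java.util.Optional purely as a stand-in, which is an assumption; the Optional class used by the Spark Java API differs by version, although it also offers isPresent(). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration class; java.util.Optional stands in for the
// Optional type used by the Spark Java API (an assumption).
class OptionalCheckSketch {

    // Mirrors the posted condition: compares Optional.toString() with null.
    static boolean postedCheck(Optional<List<String>> state) {
        return state.toString() == null;
    }

    // The usual way to detect that there is no previous state.
    static boolean hasNoState(Optional<List<String>> state) {
        return !state.isPresent();
    }

    public static void main(String[] args) {
        Optional<List<String>> empty = Optional.empty();
        // The posted check is false even for an empty Optional,
        // so the else branch always runs.
        System.out.println("posted check: " + postedCheck(empty));
        System.out.println("isPresent-based check: " + hasNoState(empty));

        // List.add returns a boolean, so wrapping its result in Optional.of
        // would not produce an Optional holding the list.
        List<String> events = new ArrayList<String>();
        boolean added = events.add("x");
        System.out.println("add returned " + added + ", list is " + events);
    }
}
```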
Re: Get the previous state string in Spark streaming
It's hard to help without any stack trace associated with the UnsupportedOperationException.

On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:

> One of my co-workers (Yogesh) was trying to get this posted to the Spark
> mailing list, and it seems it did not get posted. So I am reposting it
> here. Please help.
>
> Hi,
>
> I am new to Spark and was trying to do some experiments with it.
>
> I had a JavaPairDStream RDD. I want to get the list of strings from its
> previous state. For that I use the updateStateByKey function as follows:
>
>     final Function2<List<String>, Optional<List<String>>, Optional<List<String>>> updateFunc =
>         new Function2<List<String>, Optional<List<String>>, Optional<List<String>>>() {
>             public Optional<List<String>> call(List<String> arg0,
>                     Optional<List<String>> arg1) throws Exception {
>                 // TODO Auto-generated method stub
>                 if (arg1.toString() == null)
>                     return Optional.of(arg0);
>                 else {
>                     arg0.add(arg1.toString());
>                     return Optional.of(arg0);
>                 }
>             }
>         };
>
> I want the function to append the new list of strings to the previous list
> and return the new list. But I am not able to do so. I am getting the
> "java.lang.UnsupportedOperationException" error.
>
> Can anyone help me out in getting the desired output?
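The thread does not say how the problem was eventually fixed. One way to get the behaviour described in the question (previous list followed by the new strings) without calling add() on the incoming list is to copy both into a fresh ArrayList. The sketch below is a guess at such a fix, not the posters' actual solution: StateUpdateSketch and update are hypothetical names, java.util.Optional stands in for the Optional type of the Spark Java API, and the read-only input list only simulates the assumption that the list passed to the update function is not modifiable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; not taken from the thread or from Spark itself.
class StateUpdateSketch {

    // Returns the previous state (if any) followed by the new events.
    // Both inputs are only read, so it does not matter whether they
    // support add().
    static Optional<List<String>> update(List<String> events, Optional<List<String>> state) {
        List<String> merged = new ArrayList<String>();
        if (state.isPresent()) {
            merged.addAll(state.get());
        }
        merged.addAll(events);
        return Optional.of(merged);
    }

    public static void main(String[] args) {
        // Simulated read-only batch (assumption about what is passed in).
        List<String> batch = Collections.unmodifiableList(Arrays.asList("c", "d"));
        Optional<List<String>> previous = Optional.of(Arrays.asList("a", "b"));

        System.out.println(update(batch, previous).get());                        // [a, b, c, d]
        System.out.println(update(batch, Optional.<List<String>>empty()).get());  // [c, d]
    }
}
```

Inside a real updateStateByKey function the same body would go into call(), with the Optional import swapped for the one the Spark version in use expects.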