I am not sure if you realized, but the code snippet is pretty mangled in
the email we received. It might be a good idea to put the code in a pastebin
or gist; that is much easier for everyone to read.
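
In the meantime, a quick note on the error itself: the VoidFunction you pass
to foreach captures graphDB, and Spark has to serialize that closure to ship
it to the executors. That is what fails, because EmbeddedGraphDatabase is not
serializable. Making Neo4jOperations serializable won't help: its methods are
static and are not the problem, the captured database handle is, and a handle
to an embedded store cannot cross JVM boundaries anyway. The usual workaround
is to keep the handle out of any distributed closure, for example by
collecting each micro-batch to the driver and writing it there. Below is a
minimal, untested sketch of that approach; it reuses output, graphDB and the
Neo4jOperations methods from your snippet and assumes each micro-batch is
small enough to collect:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.neo4j.graphdb.Transaction;

import scala.Tuple2;

output.foreachRDD(
        new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

            @Override
            public Void call(JavaPairRDD<String, ArrayList<String>> rdd, Time time) {
                // collect() brings the micro-batch back to the driver JVM;
                // graphDB is only touched here, so no closure that Spark
                // ships to the executors ever captures it.
                List<Tuple2<String, ArrayList<String>>> batch = rdd.collect();
                try (Transaction tx = graphDB.beginTx()) {
                    for (Tuple2<String, ArrayList<String>> pair : batch) {
                        if (Neo4jOperations.getHMacFromValue(graphDB, pair._1) != null)
                            System.out.println("Already in database: " + pair._1);
                        else
                            Neo4jOperations.createHMac(graphDB, pair._1);
                    }
                    tx.success();
                }
                return null;
            }
        });

If you later move to a Neo4j server instead of an embedded store, the other
common pattern is rdd.foreachPartition(...), opening and closing a fresh
connection inside each partition so nothing stateful is captured from the
driver. That also answers your broader question: output is normally written
to a database from inside foreachRDD, either on the driver after a collect()
or per partition on the workers.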


On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <gautam1...@gmail.com> wrote:

> I'm trying to use Neo4j with Apache Spark Streaming, but serializability is
> proving to be an issue.
>
> Basically, I want Apache Spark to parse and bundle my data in real time.
> After the data has been bundled, it should be stored in the database, Neo4j.
> However, I am getting this error:
>
> org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>     at twoGrams.Main$4.call(Main.java:102)
>     at twoGrams.Main$4.call(Main.java:1)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at scala.util.Try$.apply(Try.scala:161)
>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>     ... 17 more
> Here is my code:
>
> output is a stream of type JavaPairDStream<String, ArrayList<String>>:
>
> output.foreachRDD(
>         new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>
>             @Override
>             public Void call(JavaPairRDD<String, ArrayList<String>> rdd,
>                     Time time) throws Exception {
>                 rdd.foreach(
>                         new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>
>                             @Override
>                             public void call(Tuple2<String, ArrayList<String>> tuple)
>                                     throws Exception {
>                                 // graphDB is captured from the enclosing class
>                                 try (Transaction tx = graphDB.beginTx()) {
>                                     if (Neo4jOperations.getHMacFromValue(graphDB, tuple._1) != null)
>                                         System.out.println("Already in database: " + tuple._1);
>                                     else
>                                         Neo4jOperations.createHMac(graphDB, tuple._1);
>                                     tx.success();
>                                 }
>                             }
>                         });
>                 return null;
>             }
>         });
> The Neo4jOperations class:
>
> public class Neo4jOperations {
>
>     public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
>         try (ResourceIterator<Node> hMacs = graphDB
>                 .findNodesByLabelAndProperty(DynamicLabel.label("HMac"), "value", value)
>                 .iterator()) {
>             // hasNext() guard so an empty result yields null (as the caller
>             // expects) instead of throwing NoSuchElementException
>             return hMacs.hasNext() ? hMacs.next() : null;
>         }
>     }
>
>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>         Node hMac = graphDB.createNode(DynamicLabel.label("HMac"));
>         hMac.setProperty("value", value);
>         hMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss")
>                 .format(Calendar.getInstance().getTime()));
>     }
> }
> I know that I have to serialize the Neo4jOperations class, but I'm not able
> to figure out how. Or is there another way to achieve this?
>
> Also, how can I store the output of Spark Streaming in a database?
>
>
>
>