You can't serialize nodes or relationships themselves; they were not meant for 
that.

You could use a helper that serializes a node as id + labels + properties, and 
a relationship as id + type + properties + start-node-id + end-node-id.

Currently, node and relationship objects hold several internal references that 
can't be serialized, and that can't be repopulated on deserialization because 
they are not statically available.
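
A minimal sketch of such a helper for the node case, assuming plain Java serialization (which is also what Spark's default closure serializer uses). The class and field names here (NodeDto etc.) are illustrative, not part of the Neo4j API; in real code you would populate the DTO from node.getId(), node.getLabels(), and the node's properties inside a transaction:

```java
import java.io.*;
import java.util.*;

// A plain serializable value object capturing a node as
// id + labels + properties. Property values must themselves
// be serializable (Neo4j property values such as String,
// numbers, and booleans are).
public class NodeDto implements Serializable {
    private static final long serialVersionUID = 1L;

    public final long id;
    public final List<String> labels;
    public final Map<String, Object> properties;

    public NodeDto(long id, List<String> labels, Map<String, Object> properties) {
        this.id = id;
        this.labels = new ArrayList<>(labels);
        this.properties = new HashMap<>(properties);
    }

    // Round-trips the DTO through Java serialization to show it survives.
    public static NodeDto roundTrip(NodeDto dto) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(dto);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (NodeDto) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        NodeDto original = new NodeDto(42L,
                Arrays.asList("HMac"),
                Collections.singletonMap("value", (Object) "abc"));
        NodeDto copy = roundTrip(original);
        System.out.println(copy.id + " " + copy.labels + " " + copy.properties);
    }
}
```

A relationship helper would look the same, with additional type, start-node-id, and end-node-id fields. On the receiving side you have to re-resolve the ids against the database; the DTO only carries the data, not live graph handles.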

Michael

> Am 12.03.2015 um 08:39 schrieb Gautam Bajaj <gautam1...@gmail.com>:
> 
> I'm trying to use Neo4j with Apache Spark Streaming but I am finding 
> serializability as an issue.
> 
> Basically, I want Apache Spark to parse and bundle my data in real time. 
> After, the data has been bundled it should be stored in the database, Neo4j. 
> However, I am getting this error:
> 
> org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>     at twoGrams.Main$4.call(Main.java:102)
>     at twoGrams.Main$4.call(Main.java:1)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at scala.util.Try$.apply(Try.scala:161)
>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>     ... 17 more
> Here is my code:
> 
> output is a stream of type JavaPairDStream<String, ArrayList<String>>:
> 
> output.foreachRDD(new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
> 
>     @Override
>     public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
>         arg0.foreach(new VoidFunction<Tuple2<String, ArrayList<String>>>() {
> 
>             @Override
>             public void call(Tuple2<String, ArrayList<String>> arg0) throws Exception {
>                 try (Transaction tx = graphDB.beginTx()) {
>                     if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
>                         System.out.println("Already in database: " + arg0._1);
>                     else
>                         Neo4jOperations.createHMac(graphDB, arg0._1);
>                     tx.success();
>                 }
>             }
>         });
>         return null;
>     }
> });
> Neo4jOperations class:
> 
> public class Neo4jOperations {
> 
>     public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
>         try (ResourceIterator<Node> HMacs = graphDB.findNodesByLabelAndProperty(
>                 DynamicLabel.label("HMac"), "value", value).iterator()) {
>             // return null instead of throwing when no node matches
>             return HMacs.hasNext() ? HMacs.next() : null;
>         }
>     }
> 
>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>         HMac.setProperty("value", value);
>         HMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
>     }
> }
> I know that I have to serialize the Neo4jOperations class, but I'm unable to 
> figure out how. Or is there any other way to achieve this?
> 
> 
> 
> -- 
> You received this message because you are subscribed to the Google Groups 
> "Neo4j" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to neo4j+unsubscr...@googlegroups.com 
> <mailto:neo4j+unsubscr...@googlegroups.com>.
> For more options, visit https://groups.google.com/d/optout 
> <https://groups.google.com/d/optout>.
