You can't serialize nodes or relationships themselves; they were not meant for that.
You could use a helper that serializes the node as id + labels + properties, and the relationship as id + type + properties + start-node-id + end-node-id. Currently, node and relationship objects have several internal references that can't be serialized, and which also can't be repopulated on deserialization, as they are not statically available.

Michael

> On 12.03.2015 at 08:39, Gautam Bajaj <gautam1...@gmail.com> wrote:
>
> I'm trying to use Neo4j with Apache Spark Streaming, but I am finding serializability to be an issue.
>
> Basically, I want Apache Spark to parse and bundle my data in real time. After the data has been bundled, it should be stored in the database, Neo4j. However, I am getting this error:
>
> org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>     at twoGrams.Main$4.call(Main.java:102)
>     at twoGrams.Main$4.call(Main.java:1)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at scala.util.Try$.apply(Try.scala:161)
>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>     ... 17 more
>
> Here is my code:
>
> output is a stream of type: JavaPairDStream<String, ArrayList<String>>
>
> output.foreachRDD(
>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>
>         @Override
>         public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1)
>                 throws Exception {
>             arg0.foreach(
>                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>
>                     @Override
>                     public void call(Tuple2<String, ArrayList<String>> arg0)
>                             throws Exception {
>                         try (Transaction tx = graphDB.beginTx()) {
>                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
>                                 System.out.println("Already in database: " + arg0._1);
>                             else {
>                                 Neo4jOperations.createHMac(graphDB, arg0._1);
>                             }
>                             tx.success();
>                         }
>                     }
>                 });
>             return null;
>         }
>     });
>
> Neo4jOperations class:
>
> public class Neo4jOperations {
>
>     public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
>         try (ResourceIterator<Node> HMacs =
>                 graphDB.findNodesByLabelAndProperty(DynamicLabel.label("HMac"),
>                         "value", value).iterator()) {
>             return HMacs.next();
>         }
>     }
>
>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>         HMac.setProperty("value", value);
>         HMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss")
>                 .format(Calendar.getInstance().getTime()));
>     }
> }
>
> I know that I have to serialize the class Neo4jOperations, but I'm not able to figure out how. Or is there any other way to achieve this?
>
> --
> You received this message because you are subscribed to the Google Groups "Neo4j" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to neo4j+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.
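For what it's worth, the helper approach suggested at the top of the thread (node as id + labels + properties, relationship as id + type + properties + start/end node ids) could be sketched roughly like this. This is only a sketch under my own assumptions: the class and field names (NodeData, RelationshipData) are invented for illustration and are not part of any Neo4j or Spark API, and I'm assuming plain Java serialization. Inside a transaction you would copy node.getId(), the labels, and the property key/value pairs into the DTO; on the worker side you can look the node up again by its id.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical serializable snapshot of a node: id + labels + properties.
// Holds only plain, serializable values, so it can safely cross Spark's
// closure/task serialization, unlike a live Node object.
class NodeData implements Serializable {
    private static final long serialVersionUID = 1L;

    final long id;
    final List<String> labels;
    final Map<String, Object> properties;

    NodeData(long id, List<String> labels, Map<String, Object> properties) {
        this.id = id;
        this.labels = labels;
        this.properties = properties;
    }
}

// Same idea for a relationship: id + type + properties + start/end node ids.
class RelationshipData implements Serializable {
    private static final long serialVersionUID = 1L;

    final long id;
    final String type;
    final Map<String, Object> properties;
    final long startNodeId;
    final long endNodeId;

    RelationshipData(long id, String type, Map<String, Object> properties,
                     long startNodeId, long endNodeId) {
        this.id = id;
        this.type = type;
        this.properties = properties;
        this.startNodeId = startNodeId;
        this.endNodeId = endNodeId;
    }
}

public class SerializationSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> props = new HashMap<>();
        props.put("value", "abc123");

        NodeData node = new NodeData(42L, Arrays.asList("HMac"), props);

        // Round-trip through Java serialization to show the DTO survives
        // what the live Node/Relationship objects cannot.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(node);
        NodeData copy = (NodeData) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(copy.id + " " + copy.labels + " " + copy.properties);
    }
}
```

Note this does not fix the exception in the posted code by itself: there, the closure captures the EmbeddedGraphDatabase instance (`graphDB`) itself, which Spark then tries to serialize. The DTOs only help for shipping node/relationship *data* through Spark; the database handle still has to be obtained on the worker side rather than captured.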