I'm trying to use Neo4j with Apache Spark Streaming, but serializability is proving to be an issue.
Basically, I want Apache Spark to parse and bundle my data in real time. After the data has been bundled, it should be stored in the database, Neo4j. However, I am getting this error:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
    at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
    at twoGrams.Main$4.call(Main.java:102)
    at twoGrams.Main$4.call(Main.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 17 more

Here is my code. output is a stream of type JavaPairDStream<String, ArrayList<String>>:

output.foreachRDD(new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

    @Override
    public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1)
            throws Exception {
        arg0.foreach(new VoidFunction<Tuple2<String, ArrayList<String>>>() {

            @Override
            public void call(Tuple2<String, ArrayList<String>> arg0) throws Exception {
                try (Transaction tx = graphDB.beginTx()) {
                    if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
                        System.out.println("Already in Database: " + arg0._1);
                    else
                        Neo4jOperations.createHMac(graphDB, arg0._1);
                    tx.success();
                }
            }
        });
        return null;
    }
});

Neo4jOperations class:

public class Neo4jOperations {

    public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
        try (ResourceIterator<Node> HMacs = graphDB
                .findNodesByLabelAndProperty(DynamicLabel.label("HMac"), "value", value)
                .iterator()) {
            return HMacs.next();
        }
    }

    public static void createHMac(GraphDatabaseService graphDB, String value) {
        Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
        HMac.setProperty("value", value);
        HMac.setProperty("time",
                new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    }
}

I know that I have to serialize the Neo4jOperations class, but I'm not able to figure out how. Or is there any other way to achieve this?
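One idea I've been toying with, sketched below, is to avoid putting the GraphDatabaseService inside the closure at all and instead open the embedded database lazily on the worker side, once per JVM. This is only a sketch under my own assumptions: the DatabaseHolder class and the /path/to/graph.db path are placeholders I made up, it assumes foreachPartition is available in my Spark version, and it assumes the workers run on the same machine as the Neo4j store (an embedded database can't be shared across remote machines).

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;

// Hypothetical helper, not part of my current code: opens one embedded
// database per worker JVM so the GraphDatabaseService never has to be
// serialized along with a Spark closure.
public class DatabaseHolder {

    private static GraphDatabaseService graphDB;

    public static synchronized GraphDatabaseService get(String path) {
        if (graphDB == null) {
            // Placeholder path; would have to point at the actual store.
            graphDB = new GraphDatabaseFactory().newEmbeddedDatabase(path);
        }
        return graphDB;
    }
}

And then, inside foreachRDD, something like this instead of arg0.foreach(...):

arg0.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {

    @Override
    public void call(Iterator<Tuple2<String, ArrayList<String>>> tuples) throws Exception {
        // Looked up on the worker, so no Neo4j object is captured by the closure.
        GraphDatabaseService db = DatabaseHolder.get("/path/to/graph.db");
        while (tuples.hasNext()) {
            Tuple2<String, ArrayList<String>> t = tuples.next();
            try (Transaction tx = db.beginTx()) {
                if (Neo4jOperations.getHMacFromValue(db, t._1) != null)
                    System.out.println("Already in Database: " + t._1);
                else
                    Neo4jOperations.createHMac(db, t._1);
                tx.success();
            }
        }
    }
});

Would something like that remove the need to make Neo4jOperations serializable, or am I missing something?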