I'm trying to use Neo4j with Apache Spark Streaming, but I'm running into a
serializability issue.

Basically, I want Apache Spark to parse and bundle my data in real time.
After the data has been bundled, it should be stored in the database,
Neo4j. However, I am getting this error:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
    at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
    at twoGrams.Main$4.call(Main.java:102)
    at twoGrams.Main$4.call(Main.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 17 more
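For reference, the core failure can be reproduced without Spark or Neo4j at all. This is a minimal, self-contained sketch (all class names are my own stand-ins): a Serializable closure that captures a reference to a non-serializable object fails under plain Java serialization exactly the way the Spark task above does, because Spark's ClosureCleaner ultimately uses ObjectOutputStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationRepro {

    // Stand-in for org.neo4j.kernel.EmbeddedGraphDatabase: it wraps native
    // resources and does not implement Serializable.
    static class FakeGraphDb { }

    // True if Java serialization accepts obj, false if it throws
    // (NotSerializableException is an IOException).
    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        final FakeGraphDb graphDb = new FakeGraphDb();

        // Stand-in for the anonymous VoidFunction: the closure itself is
        // Serializable, but it captures the non-serializable graphDb, so
        // serializing it fails just like the task serialization above.
        Runnable task = (Runnable & Serializable) () -> System.out.println(graphDb);

        System.out.println("closure serializes?    " + serializes(task));  // false
        System.out.println("plain data serializes? " + serializes("abc")); // true
    }
}
```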

Here is my code:

Here, output is a stream of type JavaPairDStream<String, ArrayList<String>>:

output.foreachRDD(
        new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

            @Override
            public Void call(JavaPairRDD<String, ArrayList<String>> rdd, Time time)
                    throws Exception {

                rdd.foreach(new VoidFunction<Tuple2<String, ArrayList<String>>>() {

                    @Override
                    public void call(Tuple2<String, ArrayList<String>> tuple)
                            throws Exception {
                        try (Transaction tx = graphDB.beginTx()) {
                            if (Neo4jOperations.getHMacFromValue(graphDB, tuple._1) != null)
                                System.out.println("Already in database: " + tuple._1);
                            else
                                Neo4jOperations.createHMac(graphDB, tuple._1);
                            tx.success();
                        }
                    }
                });
                return null;
            }
        });

Neo4jOperations Class:

public class Neo4jOperations {

    public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
        try (ResourceIterator<Node> hMacs = graphDB
                .findNodesByLabelAndProperty(DynamicLabel.label("HMac"), "value", value)
                .iterator()) {
            // Return null when no node matches, so callers can test for existence
            // (calling next() on an empty iterator would throw NoSuchElementException).
            return hMacs.hasNext() ? hMacs.next() : null;
        }
    }

    public static void createHMac(GraphDatabaseService graphDB, String value) {
        Node hMac = graphDB.createNode(DynamicLabel.label("HMac"));
        hMac.setProperty("value", value);
        hMac.setProperty("time",
                new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    }
}

I know that I have to serialize the Neo4jOperations class, but I'm not able
to figure out how. Or is there any other way to achieve this?
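One direction I've been looking at (sketch below; Spark and Neo4j are left out so it compiles on its own, and Object stands in for GraphDatabaseService) is to keep the database handle out of serialization entirely: mark the field transient and (re)open the handle lazily wherever the deserialized object lands, rather than trying to make the handle itself Serializable. Would that be the right approach here?

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyDbHolder implements Serializable {

    // transient: Java serialization skips this field, so the
    // non-serializable database handle never travels with the task.
    private transient Object graphDb;

    // Lazily (re)open the handle wherever the object ends up. In the real
    // job this would open the embedded database on the worker instead of
    // allocating a plain Object.
    public Object db() {
        if (graphDb == null) {
            graphDb = new Object();
        }
        return graphDb;
    }

    public boolean hasHandle() {
        return graphDb != null;
    }

    // Round-trip through Java serialization, the way Spark ships a task.
    public static LazyDbHolder roundTrip(LazyDbHolder holder) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(holder); // succeeds: the transient handle is ignored
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyDbHolder) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LazyDbHolder driverSide = new LazyDbHolder();
        driverSide.db(); // handle opened on the "driver"

        LazyDbHolder workerSide = roundTrip(driverSide);
        System.out.println("handle survived shipping? " + workerSide.hasHandle()); // false
        workerSide.db(); // the "worker" opens its own handle on first use
        System.out.println("handle after first use?   " + workerSide.hasHandle()); // true
    }
}
```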

-- 
You received this message because you are subscribed to the Google Groups 
"Neo4j" group.