Well, that's why I had also suggested using a pool of GraphDatabaseService
objects :) It's also covered in the programming guide link I had given.

TD
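A minimal sketch of what such an executor-side pool could look like, in Java.
The names here (GraphDBPool, STORE_DIR) are illustrative, not an existing API.
Since an embedded Neo4j store permits only one GraphDatabaseService per store
directory per JVM, the "pool" degenerates to a lazily created singleton per
executor; a queue of multiple client objects would only make sense for a
server-mode Neo4j.

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;

// Executor-side holder: static state lives in each executor's JVM and is
// never serialized into a task closure.
public final class GraphDBPool {

    private static final String STORE_DIR = "/data/graph.db"; // placeholder path

    private static GraphDatabaseService instance;

    public static synchronized GraphDatabaseService get() {
        if (instance == null) {
            instance = new GraphDatabaseFactory().newEmbeddedDatabase(STORE_DIR);
            // Release the store lock when the executor JVM exits.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    instance.shutdown();
                }
            });
        }
        return instance;
    }
}

The embedded database supports concurrent transactions, so all partitions
processed by one executor can safely share this single handle.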
On Thu, Mar 12, 2015 at 7:38 PM, Gautam Bajaj <gautam1...@gmail.com> wrote:

> Thanks a ton! That worked.
>
> However, this may have a performance issue: for each partition, I'd need
> to restart the server, which was the basic reason I was creating the
> graphDb object outside this loop.
>
> On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> (Putting user@spark back in the to list)
>>
>> In the gist, you are creating the graphDB object way outside
>> RDD.foreachPartition. As I said last time, create the graphDB object
>> inside RDD.foreachPartition. You are creating it outside
>> DStream.foreachRDD and then using it from inside rdd.foreachPartition.
>> That brings the graphDB object into the task closure, and hence the
>> system tries to serialize the graphDB object when it serializes the
>> closure. If you create the graphDB object inside RDD.foreachPartition,
>> the closure will not refer to any prior graphDB object and therefore
>> will not serialize anything.
>>
>> On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj <gautam1...@gmail.com>
>> wrote:
>>
>>> Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab
>>>
>>> I'll add the flag and send you the stack trace; I have meetings now.
>>>
>>> On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Could you show us that version of the code?
>>>>
>>>> It also helps to turn on the Java flag for extended serialization
>>>> debug info (-Dsun.io.serialization.extendedDebugInfo=true). That will
>>>> show the lineage of objects leading to the non-serializable one.
>>>>
>>>> On Mar 12, 2015 1:32 AM, "Gautam Bajaj" <gautam1...@gmail.com> wrote:
>>>>
>>>>> I tried that too. It results in the same serializability issue.
>>>>>
>>>>> The GraphDatabaseService I'm using comes from GraphDatabaseFactory():
>>>>> http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html
>>>>>
>>>>> On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> What is the GraphDatabaseService object that you are using? Instead
>>>>>> of creating it on the driver (outside foreachRDD), can you create
>>>>>> it inside RDD.foreach?
>>>>>>
>>>>>> In general, the right pattern for doing this is in the programming
>>>>>> guide:
>>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>>>>
>>>>>> So you should be doing (sorry for writing in Scala):
>>>>>>
>>>>>> dstream.foreachRDD((rdd: RDD, time: Time) => {
>>>>>>   rdd.foreachPartition(iterator => {
>>>>>>     // Create a GraphDatabaseService object, or fetch it from a
>>>>>>     // pool of GraphDatabaseService objects
>>>>>>     // Use it to send the whole partition to Neo4j
>>>>>>     // Destroy the object or release it to the pool
>>>>>>   })
>>>>>> })
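For reference, a Java rendering of this sketch, since the code in this thread
is Java. It is illustrative only: output is the
JavaPairDStream<String, ArrayList<String>> from the original question below,
and GraphDBPool is the hypothetical helper sketched at the top of the thread.

import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Time;
import org.neo4j.graphdb.GraphDatabaseService;

import scala.Tuple2;

output.foreachRDD(new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, ArrayList<String>> rdd, Time time) throws Exception {
        // Runs on the driver, once per batch.
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
            @Override
            public void call(Iterator<Tuple2<String, ArrayList<String>>> partition) throws Exception {
                // Runs on an executor. The handle is fetched here, so the task
                // closure never captures a GraphDatabaseService object.
                GraphDatabaseService graphDb = GraphDBPool.get();
                // ... use graphDb to write the whole partition to Neo4j ...
            }
        });
        return null;
    }
});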
>>>>>>
>>>>>> On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <gautam1...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Neo4j is running externally. It has nothing to do with the Spark
>>>>>>> processes.
>>>>>>>
>>>>>>> Basically, the problem is that I'm unable to figure out a way to
>>>>>>> store the output of Spark in the database, as Spark Streaming
>>>>>>> requires the Neo4j Core Java API to be serializable as well.
>>>>>>>
>>>>>>> The answer points to using the REST API, but its performance is
>>>>>>> really poor compared to the Core Java API:
>>>>>>> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>>>>>>>
>>>>>>> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <t...@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Well, the answers you got there are correct as well.
>>>>>>>> Unfortunately, I am not familiar enough with Neo4j to comment any
>>>>>>>> further. Is the Neo4j graph database running externally (outside
>>>>>>>> the Spark cluster), within the driver process, or on all the
>>>>>>>> executors? Can you clarify that?
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj
>>>>>>>> <gautam1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Alright, I have also asked this question on Stack Overflow:
>>>>>>>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>>>>>>>
>>>>>>>>> The code there is pretty neat.
>>>>>>>>>
>>>>>>>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das
>>>>>>>>> <t...@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> I am not sure if you realized, but the code snippet got pretty
>>>>>>>>>> mangled in the email we received. It might be a good idea to put
>>>>>>>>>> the code in a pastebin or gist; that is much, much easier for
>>>>>>>>>> everyone to read.
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r
>>>>>>>>>> <gautam1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm trying to use Neo4j with Apache Spark Streaming, but I am
>>>>>>>>>>> finding serializability to be an issue.
>>>>>>>>>>>
>>>>>>>>>>> Basically, I want Apache Spark to parse and bundle my data in
>>>>>>>>>>> real time. After the data has been bundled, it should be stored
>>>>>>>>>>> in the database, Neo4j.
>>>>>>>>>>>
>>>>>>>>>>> However, I am getting this error:
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkException: Task not serializable
>>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>>>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>>>>>>>>>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>>>>>>>>>>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:102)
>>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:1)
>>>>>>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>>>>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>>>>>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>>>>>>>>>>     ... 17 more
>>>>>>>>>>>
>>>>>>>>>>> Here is my code. output is a stream of type
>>>>>>>>>>> JavaPairDStream<String, ArrayList<String>>:
>>>>>>>>>>>
>>>>>>>>>>> output.foreachRDD(
>>>>>>>>>>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public Void call(JavaPairRDD<String, ArrayList<String>> arg0,
>>>>>>>>>>>                          Time arg1) throws Exception {
>>>>>>>>>>>             arg0.foreach(
>>>>>>>>>>>                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>>>>>>>>>>>
>>>>>>>>>>>                     @Override
>>>>>>>>>>>                     public void call(Tuple2<String, ArrayList<String>> arg0)
>>>>>>>>>>>                             throws Exception {
>>>>>>>>>>>                         try (Transaction tx = graphDB.beginTx()) {
>>>>>>>>>>>                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1()) != null) {
>>>>>>>>>>>                                 System.out.println("Already in database: " + arg0._1());
>>>>>>>>>>>                             } else {
>>>>>>>>>>>                                 Neo4jOperations.createHMac(graphDB, arg0._1());
>>>>>>>>>>>                             }
>>>>>>>>>>>                             tx.success();
>>>>>>>>>>>                         }
>>>>>>>>>>>                     }
>>>>>>>>>>>                 });
>>>>>>>>>>>             return null;
>>>>>>>>>>>         }
>>>>>>>>>>>     });
>>>>>>>>>>>
>>>>>>>>>>> The Neo4jOperations class:
>>>>>>>>>>>
>>>>>>>>>>> public class Neo4jOperations {
>>>>>>>>>>>
>>>>>>>>>>>     public static Node getHMacFromValue(GraphDatabaseService graphDB,
>>>>>>>>>>>                                         String value) {
>>>>>>>>>>>         try (ResourceIterator<Node> HMacs = graphDB
>>>>>>>>>>>                 .findNodesByLabelAndProperty(DynamicLabel.label("HMac"),
>>>>>>>>>>>                         "value", value).iterator()) {
>>>>>>>>>>>             // Return null when no node matches, instead of letting
>>>>>>>>>>>             // next() throw NoSuchElementException.
>>>>>>>>>>>             return HMacs.hasNext() ? HMacs.next() : null;
>>>>>>>>>>>         }
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>>>>>>>>>>>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>>>>>>>>>>>         HMac.setProperty("value", value);
>>>>>>>>>>>         HMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss")
>>>>>>>>>>>                 .format(Calendar.getInstance().getTime()));
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> I know that I have to serialize the class Neo4jOperations, but I'm
>>>>>>>>>>> unable to figure out how. Or is there any other way to achieve this?
>>>>>>>>>>>
>>>>>>>>>>> Also, how can I store the output of Spark Streaming in a database?
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> View this message in context:
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
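To close the loop on the question above: Neo4jOperations itself does not need
to be serialized at all. Its methods are static and take the database handle
as a parameter; the only non-serializable object is the GraphDatabaseService,
and the fix discussed in this thread is to obtain it on the executor, inside
foreachPartition, instead of capturing it from the driver. A sketch of the
rewritten write path (same imports as the earlier Java sketch, plus
org.neo4j.graphdb.Transaction; GraphDBPool remains the illustrative helper):

output.foreachRDD(new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, ArrayList<String>> rdd, Time time) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
            @Override
            public void call(Iterator<Tuple2<String, ArrayList<String>>> records) throws Exception {
                // Obtained on the executor; never part of the serialized closure.
                GraphDatabaseService graphDB = GraphDBPool.get();
                // One transaction per partition instead of one per record.
                try (Transaction tx = graphDB.beginTx()) {
                    while (records.hasNext()) {
                        Tuple2<String, ArrayList<String>> record = records.next();
                        if (Neo4jOperations.getHMacFromValue(graphDB, record._1()) != null) {
                            System.out.println("Already in database: " + record._1());
                        } else {
                            Neo4jOperations.createHMac(graphDB, record._1());
                        }
                    }
                    tx.success();
                }
            }
        });
        return null;
    }
});

Batching each partition into a single transaction also addresses the
performance concern raised earlier in the thread: the database handle is
created once per executor, not once per partition or per record.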