Re: Using Neo4j with Apache Spark

2015-03-13 Thread Gautam Bajaj
I have been trying to do the same, but where exactly do you suggest
creating the static object? Creating it inside each foreachRDD will
ultimately result in the same problem, while creating it outside results in
the serializability issue.
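
For the archive, one common answer is a per-JVM singleton on the executor side,
created lazily on first use so it is never captured in a task closure. A minimal
Scala sketch only, assuming an embedded store path reachable from the executors;
the GraphDb object, the path, and the stream element type are all illustrative:

import org.apache.spark.streaming.dstream.DStream
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.graphdb.factory.GraphDatabaseFactory

// One database handle per executor JVM, created lazily on first use,
// so it never travels inside a serialized task closure.
object GraphDb {
  lazy val service: GraphDatabaseService =
    new GraphDatabaseFactory().newEmbeddedDatabase("/data/neo4j-store")  // assumed path
}

def save(output: DStream[(String, java.util.ArrayList[String])]): Unit =
  output.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val graphDb = GraphDb.service  // resolved on the executor, not on the driver
      records.foreach { case (key, values) =>
        // ... write (key, values) into Neo4j via graphDb ...
      }
    }
  }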

On Fri, Mar 13, 2015 at 11:47 AM, Tathagata Das t...@databricks.com wrote:

 Well, that's why I had also suggested using a pool of the GraphDBService
 objects :)
 It is also covered in the programming guide link I had given.

 TD


 On Thu, Mar 12, 2015 at 7:38 PM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Thanks a ton! That worked.

 However, this may have a performance issue: for each partition, I'd need
 to restart the server, which was the basic reason I was creating the graphDb
 object outside this loop.

 On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das t...@databricks.com
 wrote:

 (Putting user@spark back in the to list)

 In the gist, you are creating the graphDB object well outside the
 RDD.foreachPartition. As I said last time, create the graphDB object inside
 the RDD.foreachPartition. You are creating it outside DStream.foreachRDD
 and then using it from inside rdd.foreachPartition. That brings the graphDB
 object into the task closure, and hence the system tries to serialize the
 graphDB object when it serializes the closure. If you create the graphDB
 object inside the RDD.foreachPartition, the closure will not refer to any
 prior graphDB object and therefore will not serialize anything.

 On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab

 I'll add the flag and send you the stack trace; I have meetings now.

 On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Could you show us that version of the code?

 It also helps to turn on the Java flag for extended serialization debug info
 (-Dsun.io.serialization.extendedDebugInfo=true). That will show the lineage
 of objects leading to the non-serializable one.
 On Mar 12, 2015 1:32 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 I tried that too. It results in the same serializability issue.

 The GraphDatabaseService that I'm using is obtained from GraphDatabaseFactory():
 http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html

 On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das t...@databricks.com
 wrote:

 What is GraphDatabaseService object that you are using? Instead of
 creating them on the driver (outside foreachRDD), can you create them
 inside the RDD.foreach?

 In general, the right pattern for doing this is described in the programming
 guide:

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

 So you should be doing (sorry for writing it in Scala):

 dstream.foreachRDD { (rdd, time) =>
   rdd.foreachPartition { iterator =>
     // Create a GraphDatabaseService object, or fetch it from a pool of
     // GraphDatabaseService objects
     // Use it to send the whole partition to Neo4j
     // Destroy the object or release it to the pool
   }
 }


 On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Neo4j is running externally. It has nothing to do with Spark
 processes.

 Basically, the problem is that I'm unable to figure out a way to store the
 output of Spark in the database, as Spark Streaming requires the Neo4j Core
 Java API objects to be serializable as well.

 The answer points to using the REST API, but its performance is really poor
 compared to the Core Java API:
 http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/

 On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com
  wrote:

 Well the answers you got there are correct as well.
 Unfortunately I am not familiar with Neo4j enough to comment any
 more. Is the Neo4j graph database running externally (outside Spark
 cluster), or within the driver process, or on all the executors? Can 
 you
 clarify that?

 TD


 On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj 
 gautam1...@gmail.com wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das 
 t...@databricks.com wrote:

 I am not sure if you realized, but the code snippet is pretty
 mangled up in the email we received. It might be a good idea to put 
 the
 code in pastebin or gist, much much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r 
 gautam1...@gmail.com wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am
 finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in
 real time.
 After the data has been bundled, it should be stored in the
 database, Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Marcin Cylke
On Thu, 12 Mar 2015 00:48:12 -0700
d34th4ck3r gautam1...@gmail.com wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.
 
 Basically, I want Apache Spark to parse and bundle my data in real
 time. After the data has been bundled, it should be stored in the
 database, Neo4j. However, I am getting this error:

Hi

It seems some things in your task aren't serializable. A quick look at
the code suggests graphDB as a potential problem. 

If you want to create it in one place (the driver) and fetch it later in
the task, you can do something like this:

- create a container class that you will broadcast:

import org.neo4j.graphdb.factory.GraphDatabaseFactory

class LazyGraphDB(path: String) extends Serializable {
  @transient
  lazy val graphDB = new GraphDatabaseFactory().newEmbeddedDatabase(path)
}

- then in the driver code:

val graphDbBc = sc.broadcast(new LazyGraphDB("/path/to/db"))

- and in the task you'd like to use it, just write:

graphDbBc.value.graphDB...

Just remember the @transient and lazy modifiers.

Regards 
Marcin
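
For illustration, the broadcast wrapper above could be consumed in the streaming
job roughly like this. This is a sketch only: the stream element type and the
write itself are assumptions, and LazyGraphDB is the class defined above.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream

def save(output: DStream[(String, java.util.ArrayList[String])],
         graphDbBc: Broadcast[LazyGraphDB]): Unit =
  output.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val db = graphDbBc.value.graphDB  // opened lazily, once per executor JVM
      records.foreach { record =>
        // ... write `record` into Neo4j via db ...
      }
    }
  }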




Re: Using Neo4j with Apache Spark

2015-03-12 Thread Tathagata Das
What is the GraphDatabaseService object that you are using? Instead of creating
it on the driver (outside foreachRDD), can you create it inside RDD.foreach?

In general, the right pattern for doing this is described in the programming guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

So you should be doing (sorry for writing it in Scala):

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { iterator =>
    // Create a GraphDatabaseService object, or fetch it from a pool of
    // GraphDatabaseService objects
    // Use it to send the whole partition to Neo4j
    // Destroy the object or release it to the pool
  }
}
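
For the archive, a slightly more concrete sketch of that pattern (the embedded
store path and the stream element type are assumptions, and creating per
partition versus borrowing from a pool is exactly the trade-off discussed later
in this thread):

import org.apache.spark.streaming.dstream.DStream
import org.neo4j.graphdb.factory.GraphDatabaseFactory

def saveToNeo4j(dstream: DStream[(String, java.util.ArrayList[String])]): Unit =
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // Created inside the partition task, so nothing Neo4j-related is captured
      // by the serialized closure.
      val graphDb = new GraphDatabaseFactory().newEmbeddedDatabase("/path/to/db")
      try {
        val tx = graphDb.beginTx()
        try {
          partition.foreach { case (key, values) =>
            // ... create/update nodes for (key, values) via graphDb ...
          }
          tx.success()
        } finally {
          tx.close()
        }
      } finally {
        graphDb.shutdown()  // or release it back to a pool instead of shutting down
      }
    }
  }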


On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 Neo4j is running externally. It has nothing to do with Spark processes.

 Basically, the problem is that I'm unable to figure out a way to store the
 output of Spark in the database, as Spark Streaming requires the Neo4j Core
 Java API objects to be serializable as well.

 The answer points to using the REST API, but its performance is really poor
 compared to the Core Java API:
 http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/

 On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com
 wrote:

 Well the answers you got there are correct as well.
 Unfortunately I am not familiar with Neo4j enough to comment any more. Is
 the Neo4j graph database running externally (outside Spark cluster), or
 within the driver process, or on all the executors? Can you clarify that?

 TD


 On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
 wrote:

 I am not sure if you realized, but the code snippet is pretty mangled up
 in the email we received. It might be a good idea to put the code in
 pastebin or gist, much much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com
 wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in real
 time.
 After the data has been bundled, it should be stored in the database,
 Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at

 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at
 org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.NotSerializableException:
 org.neo4j.kernel.EmbeddedGraphDatabase
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at

 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at

 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Gautam Bajaj
I tried that too. It results in the same serializability issue.

The GraphDatabaseService that I'm using is obtained from GraphDatabaseFactory():
http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html
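
For readers of the archive, that factory opens the embedded service roughly like
this (the store path is an assumption). The returned implementation is the
org.neo4j.kernel.EmbeddedGraphDatabase named in the quoted stack trace, which is
not java.io.Serializable and therefore cannot be captured by a task closure:

import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.graphdb.factory.GraphDatabaseFactory

val graphDb: GraphDatabaseService =
  new GraphDatabaseFactory().newEmbeddedDatabase("/path/to/graph.db")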

On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das t...@databricks.com wrote:

 What is GraphDatabaseService object that you are using? Instead of
 creating them on the driver (outside foreachRDD), can you create them
 inside the RDD.foreach?

 In general, the right pattern for doing this is described in the programming
 guide:

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

 So you should be doing (sorry for writing it in Scala):

 dstream.foreachRDD { (rdd, time) =>
   rdd.foreachPartition { iterator =>
     // Create a GraphDatabaseService object, or fetch it from a pool of
     // GraphDatabaseService objects
     // Use it to send the whole partition to Neo4j
     // Destroy the object or release it to the pool
   }
 }


 On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Neo4j is running externally. It has nothing to do with Spark processes.

 Basically, the problem is that I'm unable to figure out a way to store the
 output of Spark in the database, as Spark Streaming requires the Neo4j Core
 Java API objects to be serializable as well.

 The answer points to using the REST API, but its performance is really poor
 compared to the Core Java API:
 http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/

 On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com
 wrote:

 Well the answers you got there are correct as well.
 Unfortunately I am not familiar with Neo4j enough to comment any more.
 Is the Neo4j graph database running externally (outside Spark cluster), or
 within the driver process, or on all the executors? Can you clarify that?

 TD


 On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
 wrote:

 I am not sure if you realized, but the code snippet is pretty mangled
 up in the email we received. It might be a good idea to put the code in
 pastebin or gist, much much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com
 wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in real
 time.
 After the data has been bundled, it should be stored in the database,
 Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at

 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at
 org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.NotSerializableException:
 org.neo4j.kernel.EmbeddedGraphDatabase
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at

 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at

 

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Tathagata Das
I am not sure if you realized, but the code snippet is pretty mangled up in
the email we received. It might be a good idea to put the code in pastebin
or gist, much much easier for everyone to read.


On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in real time.
 After the data has been bundled, it should be stored in the database,
 Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at
 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.NotSerializableException:
 org.neo4j.kernel.EmbeddedGraphDatabase
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 ... 17 more
 Here is my code:

 output is a stream of type JavaPairDStream<String, ArrayList<String>>

 output.foreachRDD(
     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

         @Override
         public Void call(
                 JavaPairRDD<String, ArrayList<String>> arg0,
                 Time arg1) throws Exception {

             arg0.foreach(
                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {

                     @Override
                     public void call(
                             Tuple2<String, ArrayList<String>> arg0)
                             throws Exception {
                         try (Transaction tx = graphDB.beginTx()) {
                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
                                 System.out.println("Already in Database: " + arg0._1);
                             else {
                                 Neo4jOperations.createHMac(graphDB, arg0._1);
                             }
 

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Gautam Bajaj
Alright, I have also asked this question in StackOverflow:
http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

The code there is pretty neat.

On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote:

 I am not sure if you realized, but the code snippet is pretty mangled up in
 the email we received. It might be a good idea to put the code in pastebin
 or gist, much much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in real time.
 After the data has been bundled, it should be stored in the database,
 Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at
 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.NotSerializableException:
 org.neo4j.kernel.EmbeddedGraphDatabase
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 ... 17 more
 Here is my code:

 output is a stream of type JavaPairDStream<String, ArrayList<String>>

 output.foreachRDD(
     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

         @Override
         public Void call(
                 JavaPairRDD<String, ArrayList<String>> arg0,
                 Time arg1) throws Exception {

             arg0.foreach(
                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {

                     @Override
                     public void call(
                             Tuple2<String, ArrayList<String>> arg0)
                             throws Exception {
                         try (Transaction tx = graphDB.beginTx()) {
                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
 

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Tathagata Das
Well, the answers you got there are correct as well. Unfortunately I am not
familiar enough with Neo4j to comment further. Is the Neo4j graph database
running externally (outside the Spark cluster), within the driver process, or
on all the executors? Can you clarify that?

TD


On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
 wrote:

 I am not sure if you realized, but the code snippet is pretty mangled up
 in the email we received. It might be a good idea to put the code in
 pastebin or gist, much much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com
 wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in real time.
 After the data has been bundled, it should be stored in the database,
 Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at

 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at
 org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.NotSerializableException:
 org.neo4j.kernel.EmbeddedGraphDatabase
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 ... 17 more
 Here is my code:

 output is a stream of type JavaPairDStream<String, ArrayList<String>>

 output.foreachRDD(
     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

         @Override
         public Void call(
                 JavaPairRDD<String, ArrayList<String>> arg0,
                 Time arg1) throws Exception {

             arg0.foreach(
                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {

                     @Override
                     public void call(
                             Tuple2<String,
 

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Gautam Bajaj
Neo4j is running externally. It has nothing to do with Spark processes.

Basically, the problem is that I'm unable to figure out a way to store the
output of Spark in the database, as Spark Streaming requires the Neo4j Core
Java API objects to be serializable as well.

The answer points to using the REST API, but its performance is really poor
compared to the Core Java API:
http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
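
To illustrate what the Core Java API write path looks like, a small hedged
sketch of a single write wrapped in a transaction, written in Scala against the
embedded API. The label and property names are made up, loosely mirroring the
createHMac helper quoted elsewhere in this thread:

import org.neo4j.graphdb.{DynamicLabel, GraphDatabaseService}

def storeValue(graphDb: GraphDatabaseService, value: String): Unit = {
  val tx = graphDb.beginTx()
  try {
    val node = graphDb.createNode(DynamicLabel.label("HMac"))  // assumed label
    node.setProperty("value", value)                           // assumed property
    tx.success()
  } finally {
    tx.close()
  }
}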

On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com wrote:

 Well the answers you got there are correct as well.
 Unfortunately I am not familiar with Neo4j enough to comment any more. Is
 the Neo4j graph database running externally (outside Spark cluster), or
 within the driver process, or on all the executors? Can you clarify that?

 TD


 On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
 wrote:

 I am not sure if you realized, but the code snippet is pretty mangled up
 in the email we received. It might be a good idea to put the code in
 pastebin or gist, much much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com
 wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in real time.
 After the data has been bundled, it should be stored in the database,
 Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at

 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at
 org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.NotSerializableException:
 org.neo4j.kernel.EmbeddedGraphDatabase
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 ... 17 more
 Here is my code:

 output is a stream of type JavaPairDStream<String, ArrayList<String>>

 output.foreachRDD(
     new
 

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Tathagata Das
Well, that's why I had also suggested using a pool of the GraphDBService
objects :)
It is also covered in the programming guide link I had given.

TD
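
Purely as an illustration of that suggestion, a minimal sketch of what a
lazily-filled, per-executor pool could look like. This is an assumption-laden
sketch, not anything from the guide: the pool type and the way a service gets
opened are placeholders.

import java.util.concurrent.ConcurrentLinkedQueue
import org.neo4j.graphdb.GraphDatabaseService

object GraphDbPool {
  private val pool = new ConcurrentLinkedQueue[GraphDatabaseService]()

  // 'open' is whatever creates a service appropriate for the deployment.
  def borrow(open: () => GraphDatabaseService): GraphDatabaseService =
    Option(pool.poll()).getOrElse(open())

  def release(db: GraphDatabaseService): Unit = {
    pool.offer(db)
  }
}

Inside rdd.foreachPartition, a task would then borrow() a service, write its
partition, and release() the service in a finally block instead of shutting it
down after every partition.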


On Thu, Mar 12, 2015 at 7:38 PM, Gautam Bajaj gautam1...@gmail.com wrote:

 Thanks a ton! That worked.

 However, this may have a performance issue: for each partition, I'd need
 to restart the server, which was the basic reason I was creating the graphDb
 object outside this loop.

 On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das t...@databricks.com
 wrote:

 (Putting user@spark back in the to list)

 In the gist, you are creating the graphDB object well outside the
 RDD.foreachPartition. As I said last time, create the graphDB object inside
 the RDD.foreachPartition. You are creating it outside DStream.foreachRDD
 and then using it from inside rdd.foreachPartition. That brings the graphDB
 object into the task closure, and hence the system tries to serialize the
 graphDB object when it serializes the closure. If you create the graphDB
 object inside the RDD.foreachPartition, the closure will not refer to any
 prior graphDB object and therefore will not serialize anything.

 On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab

 I'll add the flag and send you the stack trace; I have meetings now.

 On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Could you show us that version of the code?

 It also helps to turn on the Java flag for extended serialization debug info
 (-Dsun.io.serialization.extendedDebugInfo=true). That will show the lineage
 of objects leading to the non-serializable one.
 On Mar 12, 2015 1:32 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 I tried that too. It results in the same serializability issue.

 The GraphDatabaseService that I'm using is obtained from GraphDatabaseFactory():
 http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html

 On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das t...@databricks.com
 wrote:

 What is GraphDatabaseService object that you are using? Instead of
 creating them on the driver (outside foreachRDD), can you create them
 inside the RDD.foreach?

 In general, the right pattern for doing this is described in the programming
 guide:

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

 So you should be doing (sorry for writing it in Scala):

 dstream.foreachRDD { (rdd, time) =>
   rdd.foreachPartition { iterator =>
     // Create a GraphDatabaseService object, or fetch it from a pool of
     // GraphDatabaseService objects
     // Use it to send the whole partition to Neo4j
     // Destroy the object or release it to the pool
   }
 }


 On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Neo4j is running externally. It has nothing to do with Spark
 processes.

 Basically, the problem is that I'm unable to figure out a way to store the
 output of Spark in the database, as Spark Streaming requires the Neo4j Core
 Java API objects to be serializable as well.

 The answer points to using the REST API, but its performance is really poor
 compared to the Core Java API:
 http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/

 On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com
 wrote:

 Well the answers you got there are correct as well.
 Unfortunately I am not familiar with Neo4j enough to comment any
 more. Is the Neo4j graph database running externally (outside Spark
 cluster), or within the driver process, or on all the executors? Can 
 you
 clarify that?

 TD


 On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj 
 gautam1...@gmail.com wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das 
 t...@databricks.com wrote:

 I am not sure if you realized, but the code snippet is pretty
 mangled up in the email we received. It might be a good idea to put 
 the
 code in pastebin or gist, much much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r 
 gautam1...@gmail.com wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am
 finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in
 real time.
 After the data has been bundled, it should be stored in the
 database, Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at
 org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at

 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at
 org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at 

Re: Using Neo4j with Apache Spark

2015-03-12 Thread Gautam Bajaj
Thanks a ton! That worked.

However, this may have a performance issue: for each partition, I'd need
to restart the server, which was the basic reason I was creating the graphDb
object outside this loop.

On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das t...@databricks.com wrote:

 (Putting user@spark back in the to list)

 In the gist, you are creating the graphDB object well outside the
 RDD.foreachPartition. As I said last time, create the graphDB object inside
 the RDD.foreachPartition. You are creating it outside DStream.foreachRDD
 and then using it from inside rdd.foreachPartition. That brings the graphDB
 object into the task closure, and hence the system tries to serialize the
 graphDB object when it serializes the closure. If you create the graphDB
 object inside the RDD.foreachPartition, the closure will not refer to any
 prior graphDB object and therefore will not serialize anything.

 On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab

 I'll add the flag and send you the stack trace; I have meetings now.

 On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Could you show us that version of the code?

 It also helps to turn on the Java flag for extended serialization debug info
 (-Dsun.io.serialization.extendedDebugInfo=true). That will show the lineage
 of objects leading to the non-serializable one.
 On Mar 12, 2015 1:32 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 I tried that too. It results in the same serializability issue.

 The GraphDatabaseService that I'm using is obtained from GraphDatabaseFactory():
 http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html

 On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das t...@databricks.com
 wrote:

 What is GraphDatabaseService object that you are using? Instead of
 creating them on the driver (outside foreachRDD), can you create them
 inside the RDD.foreach?

 In general, the right pattern for doing this is described in the programming
 guide:

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

 So you should be doing (sorry for writing it in Scala):

 dstream.foreachRDD { (rdd, time) =>
   rdd.foreachPartition { iterator =>
     // Create a GraphDatabaseService object, or fetch it from a pool of
     // GraphDatabaseService objects
     // Use it to send the whole partition to Neo4j
     // Destroy the object or release it to the pool
   }
 }


 On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Neo4j is running externally. It has nothing to do with Spark
 processes.

 Basically, the problem is that I'm unable to figure out a way to store the
 output of Spark in the database, as Spark Streaming requires the Neo4j Core
 Java API objects to be serializable as well.

 The answer points to using the REST API, but its performance is really poor
 compared to the Core Java API:
 http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/

 On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com
 wrote:

 Well the answers you got there are correct as well.
 Unfortunately I am not familiar with Neo4j enough to comment any
 more. Is the Neo4j graph database running externally (outside Spark
 cluster), or within the driver process, or on all the executors? Can you
 clarify that?

 TD


 On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com
  wrote:

 Alright, I have also asked this question in StackOverflow:
 http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

 The code there is pretty neat.

 On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
  wrote:

 I am not sure if you realized, but the code snippet is pretty
 mangled up in the email we received. It might be a good idea to put 
 the
 code in pastebin or gist, much much easier for everyone to read.


 On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com
  wrote:

 I'm trying to use Neo4j with Apache Spark Streaming but I am
 finding
 serializability as an issue.

 Basically, I want Apache Spark to parse and bundle my data in
 real time.
 After the data has been bundled, it should be stored in the
 database, Neo4j.
 However, I am getting this error:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at
 org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at

 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
 at
 org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
 at twoGrams.Main$4.call(Main.java:102)
 at twoGrams.Main$4.call(Main.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
 at