Thanks. I marked the variables as transient and moved ahead; now I am getting an exception while executing the query.

final static transient SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
final static transient JavaSparkContext jc = new JavaSparkContext(sparkConf);
static transient JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));
final static transient JavaSQLContext sqlContext = new JavaSQLContext(jc);

public static void main(String args[]) {
    // List<String> males = new ArrayList<String>();
    // val ssc = new StreamingContext(...)
    jssc.addStreamingListener(new WorkCountMonitor());
    // JavaDStream<String> data = jssc.textFileStream("/home/tgarg/data/employee.txt");
    int numThreads = Integer.parseInt(args[3]);
    Map<String,Integer> topicMap = new HashMap<String,Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }
    JavaPairReceiverInputDStream<String,String> data =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    // data.window(Duration.apply(1000));
    data.print();
    JavaDStream<String> streamData = data.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String,String> tuple2) throws Exception {
            return tuple2._2();
        }
    });
    streamData.foreachRDD(new Function<JavaRDD<String>,Void>() {
        public Void call(JavaRDD<String> rdd) {
            if (rdd.count() < 1)
                return null;
            try {
                rdd.map(new Function<String, Person>() {
                    public Person call(String v1) throws Exception {
                        String[] stringArray = v1.split(",");
                        Person person = new Person();
                        person.setName(stringArray[1]);
                        person.setAge(stringArray[0]);
                        person.setNumber(stringArray[2]);
                        return person;
                    }
                });
                for (String txt : rdd.collect())
                    System.out.println(txt);
                JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, Person.class);
                subscriberSchema.registerAsTable("people");
                System.out.println("all data");
                JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
                System.out.println("afterwards");
                List<String> males = new ArrayList<String>();
                males = names.map(new Function<Row,String>() {
                    public String call(Row row) {
                        return row.getString(0);
                    }
                }).collect();
                System.out.println("before for");
                for (String name : males) {
                    System.out.println(name);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    });
    jssc.start();
    jssc.awaitTermination();
}

But now I am getting this exception:

java.lang.IllegalArgumentException: object is not an instance of declaring class
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(JavaSQLContext.scala:112)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(JavaSQLContext.scala:111)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1.apply(JavaSQLContext.scala:111)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1.apply(JavaSQLContext.scala:109)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
    at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
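[Editor's note: an observation on the code above, not a confirmed diagnosis. applySchema(rdd, Person.class) is invoked on the original JavaRDD<String>, while the result of the rdd.map(...) that builds Person objects is never assigned to anything. applySchema reads each row's fields via reflection against the declared bean class, so handing it an RDD of plain strings would produce exactly this "object is not an instance of declaring class" error at JavaSQLContext.scala:112. A minimal sketch of the presumably intended wiring, reusing the names from the code above:

    // Keep the mapped RDD: applySchema reflects on Person, so the RDD
    // passed to it must actually contain Person instances, not raw CSV lines.
    JavaRDD<Person> people = rdd.map(new Function<String, Person>() {
        public Person call(String line) throws Exception {
            String[] fields = line.split(",");
            Person person = new Person();
            person.setName(fields[1]);
            person.setAge(fields[0]);
            person.setNumber(fields[2]);
            return person;
        }
    });
    JavaSchemaRDD subscriberSchema = sqlContext.applySchema(people, Person.class);
    subscriberSchema.registerAsTable("people");
]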
From: mich...@databricks.com
Date: Thu, 25 Dec 2014 00:06:45 -0500
Subject: Re: Not Serializable exception when integrating SQL and Spark Streaming
To: bigdat...@live.com
CC: lian.cs....@gmail.com; user@spark.apache.org

The various Spark contexts generally aren't serializable because you can't use them on the executors anyway. We made SQLContext serializable just because it gets pulled into scope more often due to the implicit conversions it contains. You should try marking the variable that holds the context with the annotation @transient.

On Wed, Dec 24, 2014 at 7:04 PM, Tarun Garg <bigdat...@live.com> wrote:

Thanks. I debugged this further and below is the cause:

Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
    - field (class "com.basic.spark.NumberCount$2", name: "val$sqlContext", type: "class org.apache.spark.sql.api.java.JavaSQLContext")
    - object (class "com.basic.spark.NumberCount$2", com.basic.spark.NumberCount$2@69ddbcc7)
    - field (class "com.basic.spark.NumberCount$2$1", name: "this$0", type: "class com.basic.spark.NumberCount$2")
    - object (class "com.basic.spark.NumberCount$2$1", com.basic.spark.NumberCount$2$1@2524beed)
    - field (class "org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1", name: "fun$1", type: "interface org.apache.spark.api.java.function.Function")

I also tried this:
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-objects-declared-and-initialized-outside-the-call-method-of-JavaRDD-td17094.html#a17150

Why is there a difference: SQLContext is serializable but JavaSQLContext is not? Is Spark designed like this?

Thanks

Date: Wed, 24 Dec 2014 16:23:30 +0800
From: lian.cs....@gmail.com
To: bigdat...@live.com; user@spark.apache.org
Subject: Re: Not Serializable exception when integrating SQL and Spark Streaming

Generally you can use -Dsun.io.serialization.extendedDebugInfo=true to enable serialization debugging information when serialization exceptions are raised.
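[Editor's note: for a Spark application, that JVM flag has to reach the right JVM. The "Task not serializable" error in the original message below is raised on the driver during closure cleaning, so passing the flag to the driver is what matters here. A hedged example using spark-submit; the jar path is a placeholder and the program arguments follow the code below (zookeeper quorum, consumer group, topics, thread count):

    spark-submit \
      --class com.basic.spark.NumberCount \
      --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true" \
      path/to/app.jar <zkQuorum> <group> <topics> <numThreads>
]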
On 12/24/14 1:32 PM, bigdata4u wrote:

I am trying to use SQL over Spark Streaming using Java, but I am getting a Serialization Exception.

public static void main(String args[]) {
    SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
    JavaSparkContext jc = new JavaSparkContext(sparkConf);
    JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));
    jssc.addStreamingListener(new WorkCountMonitor());
    int numThreads = Integer.parseInt(args[3]);
    Map<String,Integer> topicMap = new HashMap<String,Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }
    JavaPairReceiverInputDStream<String,String> data =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    data.print();
    JavaDStream<Person> streamData = data.map(new Function<Tuple2<String, String>, Person>() {
        public Person call(Tuple2<String,String> v1) throws Exception {
            String[] stringArray = v1._2.split(",");
            Person person = new Person();
            person.setName(stringArray[0]);
            person.setAge(stringArray[1]);
            return person;
        }
    });
    final JavaSQLContext sqlContext = new JavaSQLContext(jc);
    streamData.foreachRDD(new Function<JavaRDD<Person>,Void>() {
        public Void call(JavaRDD<Person> rdd) {
            JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, Person.class);
            subscriberSchema.registerAsTable("people");
            System.out.println("all data");
            JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
            System.out.println("afterwards");
            List<String> males = new ArrayList<String>();
            males = names.map(new Function<Row,String>() {
                public String call(Row row) {
                    return row.getString(0);
                }
            }).collect();
            System.out.println("before for");
            for (String name : males) {
                System.out.println(name);
            }
            return null;
        }
    });
    jssc.start();
    jssc.awaitTermination();
}

The exception is:

14/12/23 23:49:38 ERROR JobScheduler: Error running job streaming job 1419378578000 ms.1
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.rdd.RDD.map(RDD.scala:271)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
    at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:79)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:67)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 20 more
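[Editor's note: on Michael's @transient suggestion earlier in the thread, a pattern that sidesteps the capture problem entirely is a lazily instantiated singleton context, obtained inside foreachRDD on the driver rather than pulled in from an enclosing scope. A minimal sketch against the Spark 1.x Java API used in this thread; the helper class is illustrative and not part of Spark:

    class JavaSQLContextSingleton {
        // transient so the holder never drags the context into a
        // serialized object graph.
        private static transient JavaSQLContext instance = null;

        static JavaSQLContext getInstance(SparkContext sc) {
            if (instance == null) {
                instance = new JavaSQLContext(new JavaSparkContext(sc));
            }
            return instance;
        }
    }

    streamData.foreachRDD(new Function<JavaRDD<Person>, Void>() {
        public Void call(JavaRDD<Person> rdd) {
            // This body runs on the driver; only the inner map/filter
            // closures are serialized and shipped to the executors, and
            // none of them reference the context.
            JavaSQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
            JavaSchemaRDD people = sqlContext.applySchema(rdd, Person.class);
            people.registerAsTable("people");
            return null;
        }
    });
]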