Thanks, I debugged this further, and below is the cause:

    Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
        - field (class "com.basic.spark.NumberCount$2", name: "val$sqlContext",
            type: "class org.apache.spark.sql.api.java.JavaSQLContext")
        - object (class "com.basic.spark.NumberCount$2", com.basic.spark.NumberCount$2@69ddbcc7)
        - field (class "com.basic.spark.NumberCount$2$1", name: "this$0",
            type: "class com.basic.spark.NumberCount$2")
        - object (class "com.basic.spark.NumberCount$2$1", com.basic.spark.NumberCount$2$1@2524beed)
        - field (class "org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1",
            name: "fun$1", type: "interface org.apache.spark.api.java.function.Function")

I also tried the approach described in this thread:
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-objects-declared-and-initialized-outside-the-call-method-of-JavaRDD-td17094.html#a17150

Why is there a difference: SQLContext is Serializable, but JavaSQLContext is not? Is Spark designed like this?

Thanks
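Whatever the reason for that asymmetry, a common way around the error is to keep the context out of the serialized closure entirely. The chain in the trace above shows why it fails: the anonymous Function<Row, String> passed to names.map (NumberCount$2$1) holds a this$0 reference to the enclosing foreachRDD closure (NumberCount$2), whose captured val$sqlContext field is the JavaSQLContext. A minimal sketch, assuming the same Spark 1.x Java API used in this thread; the class name ExtractName is made up for illustration:

    // Hypothetical helper (not in the original code): a standalone top-level
    // class has no enclosing instance, so serializing the map closure no
    // longer drags in the foreachRDD closure and its JavaSQLContext field.
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.api.java.Row;

    public class ExtractName implements Function<Row, String> {
        public String call(Row row) {
            // Same logic as the anonymous Function<Row, String> in the quoted code.
            return row.getString(0);
        }
    }

Inside foreachRDD, names.map(new ExtractName()).collect() then serializes only ExtractName itself. Another common variant is to obtain the context inside call() (for example from a lazily initialized static holder) rather than capturing a final local variable.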
Date: Wed, 24 Dec 2014 16:23:30 +0800
From: lian.cs....@gmail.com
To: bigdat...@live.com; user@spark.apache.org
Subject: Re: Not Serializable exception when integrating SQL and Spark Streaming

Generally you can use -Dsun.io.serialization.extendedDebugInfo=true to enable serialization debugging information when serialization exceptions are raised.

On 12/24/14 1:32 PM, bigdata4u wrote:

I am trying to use SQL over Spark Streaming using Java, but I am getting a serialization exception.

    public static void main(String args[]) {
        SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
        JavaSparkContext jc = new JavaSparkContext(sparkConf);
        JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));
        jssc.addStreamingListener(new WorkCountMonitor());

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> data =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        data.print();

        JavaDStream<Person> streamData = data.map(new Function<Tuple2<String, String>, Person>() {
            public Person call(Tuple2<String, String> v1) throws Exception {
                String[] stringArray = v1._2.split(",");
                Person person = new Person();
                person.setName(stringArray[0]);
                person.setAge(stringArray[1]);
                return person;
            }
        });

        final JavaSQLContext sqlContext = new JavaSQLContext(jc);
        streamData.foreachRDD(new Function<JavaRDD<Person>, Void>() {
            public Void call(JavaRDD<Person> rdd) {
                JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, Person.class);
                subscriberSchema.registerAsTable("people");
                System.out.println("all data");
                JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
                System.out.println("afterwards");

                List<String> males = new ArrayList<String>();
                males = names.map(new Function<Row, String>() {
                    public String call(Row row) {
                        return row.getString(0);
                    }
                }).collect();

                System.out.println("before for");
                for (String name : males) {
                    System.out.println(name);
                }
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }

The exception is:

    14/12/23 23:49:38 ERROR JobScheduler: Error running job streaming job 1419378578000 ms.1
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
        at org.apache.spark.rdd.RDD.map(RDD.scala:271)
        at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
        at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
        at com.basic.spark.NumberCount$2.call(NumberCount.java:79)
        at com.basic.spark.NumberCount$2.call(NumberCount.java:67)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1.apply(DStream.scala:529)
        at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1.apply(DStream.scala:529)
        at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 20 more
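A usage note on the -Dsun.io.serialization.extendedDebugInfo=true flag suggested above: it is a plain JVM system property, and the "Task not serializable" error here is raised on the driver (in ClosureCleaner), so passing it to the driver JVM is enough to get the extended field/object chain shown at the top of this thread. A sketch, assuming the job is launched with spark-submit; the jar name and application arguments are placeholders:

    spark-submit \
      --class com.basic.spark.NumberCount \
      --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true" \
      --conf spark.executor.extraJavaOptions=-Dsun.io.serialization.extendedDebugInfo=true \
      numbercount.jar <zkQuorum> <group> <topics> <numThreads>

The spark.executor.extraJavaOptions line is only needed if a serialization failure happens on the executors rather than during driver-side closure checking.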