The various spark contexts generally aren't serializable because you can't
use them on the executors anyway.  We made SQLContext serializable just
because it gets pulled into scope more often due to the implicit
conversions its contains.  You should try marking the variable that holds
the context with the annotation @transient.

On Wed, Dec 24, 2014 at 7:04 PM, Tarun Garg <bigdat...@live.com> wrote:

> Thanks
>
> I debug this further and below is the cause
>
> Caused by: java.io.NotSerializableException:
> org.apache.spark.sql.api.java.JavaSQLContext
>         - field (class "com.basic.spark.NumberCount$2", name:
> "val$sqlContext", type: "class
> org.apache.spark.sql.api.java.JavaSQLContext")
>         - object (class "com.basic.spark.NumberCount$2",
> com.basic.spark.NumberCount$2@69ddbcc7)
>         - field (class "com.basic.spark.NumberCount$2$1", name: "this$0",
> type: "class com.basic.spark.NumberCount$2")
>         - object (class "com.basic.spark.NumberCount$2$1",
> com.basic.spark.NumberCount$2$1@2524beed)
>         - field (class
> "org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1", name:
> "fun$1", type: "interface org.apache.spark.api.java.function.Function")
>
> I tried this also
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-objects-declared-and-initialized-outside-the-call-method-of-JavaRDD-td17094.html#a17150
>
> Why there is difference SQLContext is Serializable but JavaSQLContext is
> not? Spark is designed like this.
>
> Thanks
>
> ------------------------------
> Date: Wed, 24 Dec 2014 16:23:30 +0800
> From: lian.cs....@gmail.com
> To: bigdat...@live.com; user@spark.apache.org
> Subject: Re: Not Serializable exception when integrating SQL and Spark
> Streaming
>
>  Generally you can use -Dsun.io.serialization.extendedDebugInfo=true to
> enable serialization debugging information when serialization exceptions
> are raised.
>
> On 12/24/14 1:32 PM, bigdata4u wrote:
>
>
>  I am trying to use sql over Spark streaming using Java. But i am getting
> Serialization Exception.
>
> public static void main(String args[]) {
>     SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
>     JavaSparkContext jc = new JavaSparkContext(sparkConf);
>     JavaStreamingContext jssc = new JavaStreamingContext(jc, new
> Duration(2000));
>     jssc.addStreamingListener(new WorkCountMonitor());
>     int numThreads = Integer.parseInt(args[3]);
>     Map<String,Integer> topicMap = new HashMap<String,Integer>();
>     String[] topics = args[2].split(",");
>     for (String topic : topics) {
>         topicMap.put(topic, numThreads);
>     }
>     JavaPairReceiverInputDStream<String,String> data =
> KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>     data.print();
>
>     JavaDStream<Person> streamData = data.map(new Function<Tuple2&lt;String,
> String>, Person>() {
>             public Person call(Tuple2<String,String> v1) throws Exception {
>                 String[] stringArray = v1._2.split(",");
>                 Person Person = new Person();
>                 Person.setName(stringArray[0]);
>                 Person.setAge(stringArray[1]);
>                 return Person;
>             }
>
>         });
>
>
>     final JavaSQLContext sqlContext = new JavaSQLContext(jc);
>     streamData.foreachRDD(new Function<JavaRDD&lt;Person>,Void>() {
>         public Void call(JavaRDD<Person> rdd) {
>
>             JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd,
> Person.class);
>
>             subscriberSchema.registerAsTable("people");
>             System.out.println("all data");
>             JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
>             System.out.println("afterwards");
>
>             List<String> males = new ArrayList<String>();
>
>             males = names.map(new Function<Row,String>() {
>                 public String call(Row row) {
>                     return row.getString(0);
>                 }
>             }).collect();
>             System.out.println("before for");
>             for (String name : males) {
>                 System.out.println(name);
>             }
>             return null;
>         }
>     });
>     jssc.start();
>     jssc.awaitTermination();
>
> Exception is
>
> 14/12/23 23:49:38 ERROR JobScheduler: Error running job streaming job
> 1419378578000 ms.1 org.apache.spark.SparkException: Task not serializable at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at
> org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at
> org.apache.spark.rdd.RDD.map(RDD.scala:271) at
> org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78) at
> org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42) at
> com.basic.spark.NumberCount$2.call(NumberCount.java:79) at
> com.basic.spark.NumberCount$2.call(NumberCount.java:67) at
> org.apache.spark.streaming.api.java.JavaDStreamLike$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
> at
> org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1.apply(DStream.scala:529)
> at
> org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1.apply(DStream.scala:529)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:40)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161) at
> org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:724)
> Caused by: java.io.NotSerializableException:
> org.apache.spark.sql.api.java.JavaSQLContext at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
> ... 20 more
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Not-Serializable-exception-when-integrating-SQL-and-Spark-Streaming-tp20845.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>  ​
>

Reply via email to