Thanks.
I debugged this further, and below is the cause:
Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
    - field (class "com.basic.spark.NumberCount$2", name: "val$sqlContext", type: "class org.apache.spark.sql.api.java.JavaSQLContext")
    - object (class "com.basic.spark.NumberCount$2", com.basic.spark.NumberCount$2@69ddbcc7)
    - field (class "com.basic.spark.NumberCount$2$1", name: "this$0", type: "class com.basic.spark.NumberCount$2")
    - object (class "com.basic.spark.NumberCount$2$1", com.basic.spark.NumberCount$2$1@2524beed)
    - field (class "org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1", name: "fun$1", type: "interface org.apache.spark.api.java.function.Function")
I also tried the approach described in
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-objects-declared-and-initialized-outside-the-call-method-of-JavaRDD-td17094.html#a17150
Why is there a difference: SQLContext is Serializable, but JavaSQLContext is not? Is Spark designed like this?
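For what it's worth, the difference can be checked directly against the jars on the classpath; a throwaway probe (class name mine):

    import java.io.Serializable;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.api.java.JavaSQLContext;

    public class SerializableProbe {
        public static void main(String[] args) {
            // On Spark 1.x this prints true for SQLContext and false for
            // JavaSQLContext, which matches the exception above.
            System.out.println(Serializable.class.isAssignableFrom(SQLContext.class));
            System.out.println(Serializable.class.isAssignableFrom(JavaSQLContext.class));
        }
    }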
Thanks
Date: Wed, 24 Dec 2014 16:23:30 +0800
From: lian.cs....@gmail.com
To: bigdat...@live.com; user@spark.apache.org
Subject: Re: Not Serializable exception when integrating SQL and Spark Streaming

Generally you can use -Dsun.io.serialization.extendedDebugInfo=true to enable serialization debugging information when serialization exceptions are raised.
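For example, with spark-submit the flag can be passed to the driver (where the "Task not serializable" check runs) and, if needed, to the executors; a sketch, with the jar name as a placeholder:

    spark-submit \
      --class com.basic.spark.NumberCount \
      --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true" \
      --conf "spark.executor.extraJavaOptions=-Dsun.io.serialization.extendedDebugInfo=true" \
      numbercount.jar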
On 12/24/14 1:32 PM, bigdata4u wrote:
I am trying to use SQL over Spark Streaming using Java, but I am getting a serialization exception.

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
    JavaSparkContext jc = new JavaSparkContext(sparkConf);
    JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));
    jssc.addStreamingListener(new WorkCountMonitor());
    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }
    JavaPairReceiverInputDStream<String, String> data =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    data.print();

    JavaDStream<Person> streamData = data.map(
        new Function<Tuple2<String, String>, Person>() {
            public Person call(Tuple2<String, String> v1) throws Exception {
                String[] stringArray = v1._2.split(",");
                Person person = new Person();
                person.setName(stringArray[0]);
                person.setAge(stringArray[1]);
                return person;
            }
        });

    // This context is captured by the anonymous function passed to
    // foreachRDD below.
    final JavaSQLContext sqlContext = new JavaSQLContext(jc);
    streamData.foreachRDD(new Function<JavaRDD<Person>, Void>() {
        public Void call(JavaRDD<Person> rdd) {
            JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, Person.class);
            subscriberSchema.registerAsTable("people");
            System.out.println("all data");
            JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
            System.out.println("afterwards");

            List<String> males = names.map(new Function<Row, String>() {
                public String call(Row row) {
                    return row.getString(0);
                }
            }).collect();
            System.out.println("before for");
            for (String name : males) {
                System.out.println(name);
            }
            return null;
        }
    });

    jssc.start();
    jssc.awaitTermination();
}

The exception is:

14/12/23 23:49:38 ERROR JobScheduler: Error running job streaming job 1419378578000 ms.1
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.rdd.RDD.map(RDD.scala:271)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
    at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:79)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:67)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 20 more
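The top frames show where serialization is triggered: JavaSchemaRDD.map at NumberCount.java:79 is the names.map(...) call inside foreachRDD. Besides the static-class fix above, another pattern (a sketch for the 1.x Java API; the holder class name is mine) is to fetch a lazily created context inside foreachRDD instead of capturing one, so no closure ever holds it:

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.api.java.JavaSQLContext;

    // Per-JVM lazy holder: the context lives in a static field, which is
    // never serialized as part of a closure.
    public class JavaSQLContextSingleton {
        private static JavaSQLContext instance;
        public static synchronized JavaSQLContext getInstance(JavaSparkContext jc) {
            if (instance == null) {
                instance = new JavaSQLContext(jc);
            }
            return instance;
        }
    }

Inside call(JavaRDD<Person> rdd) one would then obtain the context via JavaSQLContextSingleton.getInstance(new JavaSparkContext(rdd.context())) rather than referencing a captured variable.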



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Not-Serializable-exception-when-integrating-SQL-and-Spark-Streaming-tp20845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
