Thanks. I marked the variable as transient and got past that error, but now I am getting 
an exception while executing the query.
    final static transient SparkConf sparkConf = new 
SparkConf().setAppName("NumberCount");    final static transient 
JavaSparkContext jc = new JavaSparkContext(sparkConf);    static transient 
JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));   
 final static transient JavaSQLContext sqlContext = new JavaSQLContext(jc);    
public static void main(String args[]) {//        List<String> males = new 
ArrayList<String>();        // val ssc = new StreamingContext(...)        
jssc.addStreamingListener(new WorkCountMonitor());//        JavaDStream<String> 
data = jssc.textFileStream("/home/tgarg/data/employee.txt");        int 
numThreads = Integer.parseInt(args[3]);        Map<String,Integer> topicMap = 
new HashMap<String,Integer>();        String[] topics = args[2].split(",");     
   for (String topic : topics) {            topicMap.put(topic, numThreads);    
    }        JavaPairReceiverInputDStream<String,String> data = 
KafkaUtils.createStream(jssc, args[0], args[1], topicMap);//        
data.window(Duration.apply(1000));        data.print();                
JavaDStream<String> streamData = data.map(new Function<Tuple2<String, String>, 
String>() {            public String call(Tuple2<String,String> tuple2) throws 
Exception {                return tuple2._2();            }
        });               streamData.foreachRDD(new 
Function<JavaRDD<String>,Void>() {            public Void call(JavaRDD<String> 
rdd) {                                if (rdd.count()<1)                    
return null;                                try {                    
rdd.map(new Function<String, Person>() {                        public Person 
call(String v1) throws Exception {                            String[] 
stringArray = v1.split(",");                            Person person = new 
Person();                            person.setName(stringArray[1]);            
                person.setAge(stringArray[0]);                            
person.setNumber(stringArray[2]);                            return person;     
                   }
                    });                                        for (String txt: 
rdd.collect())                        System.out.println(txt);                  
                      JavaSchemaRDD subscriberSchema = 
sqlContext.applySchema(rdd, Person.class);                    
subscriberSchema.registerAsTable("people");                    
System.out.println("all data");                    JavaSchemaRDD names = 
sqlContext.sql("SELECT name FROM people");                    
System.out.println("afterwards");
                    List<String> males = new ArrayList<String>();
                    males = names.map(new Function<Row,String>() {              
          public String call(Row row) {                            return 
row.getString(0);                        }                    }).collect();     
               System.out.println("before for");                    for (String 
name : males) {                        System.out.println(name);                
    }                } catch (Exception e) {                    // TODO 
Auto-generated catch block                    e.printStackTrace();              
  }                return null;            }        });        jssc.start();    
    jssc.awaitTermination();    }
But now I am getting this exception:
java.lang.IllegalArgumentException: object is not an instance of declaring 
class        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)     
   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)   
     at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)        at 
org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(JavaSQLContext.scala:112)
        at 
org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(JavaSQLContext.scala:111)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)  
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)  
      at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)        
at 
org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1.apply(JavaSQLContext.scala:111)
        at 
org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1.apply(JavaSQLContext.scala:109)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)        
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)        at 
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)        at 
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)        at 
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)        at 
scala.collection.Iterator$class.foreach(Iterator.scala:727)        at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)        at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)        
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)    
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)  
      at scala.collection.AbstractIterator.to(Iterator.scala:1157)        at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)      
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)        at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)       
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)        at 
org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)        at 
org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)        at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)  
      at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)  
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)     
   at org.apache.spark.scheduler.Task.run(Task.scala:56)        at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
       at java.lang.Thread.run(Thread.java:724)

From: mich...@databricks.com
Date: Thu, 25 Dec 2014 00:06:45 -0500
Subject: Re: Not Serializable exception when integrating SQL and Spark Streaming
To: bigdat...@live.com
CC: lian.cs....@gmail.com; user@spark.apache.org

The various Spark contexts generally aren't serializable because you can't use 
them on the executors anyway.  We made SQLContext serializable just because it 
gets pulled into scope more often due to the implicit conversions it contains. 
 You should try marking the variable that holds the context with the annotation 
@transient (in Java, the `transient` keyword).
On Wed, Dec 24, 2014 at 7:04 PM, Tarun Garg <bigdat...@live.com> wrote:



Thanks.
I debugged this further, and the cause is shown below:
Caused by: java.io.NotSerializableException: 
org.apache.spark.sql.api.java.JavaSQLContext        - field (class 
"com.basic.spark.NumberCount$2", name: "val$sqlContext", type: "class 
org.apache.spark.sql.api.java.JavaSQLContext")        - object (class 
"com.basic.spark.NumberCount$2", com.basic.spark.NumberCount$2@69ddbcc7)        
- field (class "com.basic.spark.NumberCount$2$1", name: "this$0", type: "class 
com.basic.spark.NumberCount$2")        - object (class 
"com.basic.spark.NumberCount$2$1", com.basic.spark.NumberCount$2$1@2524beed)    
    - field (class 
"org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1", name: 
"fun$1", type: "interface org.apache.spark.api.java.function.Function")
I also tried this: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-objects-declared-and-initialized-outside-the-call-method-of-JavaRDD-td17094.html#a17150
Why is there a difference: SQLContext is Serializable but JavaSQLContext is not? 
Is Spark designed like this?
Thanks
Date: Wed, 24 Dec 2014 16:23:30 +0800
From: lian.cs....@gmail.com
To: bigdat...@live.com; user@spark.apache.org
Subject: Re: Not Serializable exception when integrating SQL and Spark Streaming


  
    
  
  
    
      Generally you can use -Dsun.io.serialization.extendedDebugInfo=true
        to enable serialization debugging information when serialization
        exceptions are raised.

      On 12/24/14 1:32 PM,
        bigdata4u wrote:

      

      
        

        
          I am trying to use SQL over Spark Streaming using Java, but I am 
getting a
serialization exception.

public static void main(String args[]) {
    SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
    JavaSparkContext jc = new JavaSparkContext(sparkConf);
    JavaStreamingContext jssc = new JavaStreamingContext(jc, new
Duration(2000));
    jssc.addStreamingListener(new WorkCountMonitor());
    int numThreads = Integer.parseInt(args[3]);
    Map<String,Integer> topicMap = new HashMap<String,Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }
    JavaPairReceiverInputDStream<String,String> data =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    data.print();

    JavaDStream<Person> streamData = data.map(new Function<Tuple2<String,
String>, Person>() {
            public Person call(Tuple2<String,String> v1) throws Exception {
                String[] stringArray = v1._2.split(",");
                Person Person = new Person();
                Person.setName(stringArray[0]);
                Person.setAge(stringArray[1]);
                return Person;
            }

        });


    final JavaSQLContext sqlContext = new JavaSQLContext(jc);
    streamData.foreachRDD(new Function<JavaRDD<Person>,Void>() {
        public Void call(JavaRDD<Person> rdd) {

            JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd,
Person.class);

            subscriberSchema.registerAsTable("people");
            System.out.println("all data");
            JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
            System.out.println("afterwards");

            List<String> males = new ArrayList<String>();

            males = names.map(new Function<Row,String>() {
                public String call(Row row) {
                    return row.getString(0);
                }
            }).collect();
            System.out.println("before for");
            for (String name : males) {
                System.out.println(name);
            }
            return null;
        }
    });
    jssc.start();
    jssc.awaitTermination();

The exception is:

14/12/23 23:49:38 ERROR JobScheduler: Error running job streaming job
1419378578000 ms.1 org.apache.spark.SparkException: Task not serializable at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at
org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at
org.apache.spark.rdd.RDD.map(RDD.scala:271) at
org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78) at
org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42) at
com.basic.spark.NumberCount$2.call(NumberCount.java:79) at
com.basic.spark.NumberCount$2.call(NumberCount.java:67) at
org.apache.spark.streaming.api.java.JavaDStreamLike$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
at
org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1.apply(DStream.scala:529)
at
org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1.apply(DStream.scala:529)
at
org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at
org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161) at
org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724) 
Caused by: java.io.NotSerializableException:
org.apache.spark.sql.api.java.JavaSQLContext at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 20 more



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Not-Serializable-exception-when-integrating-SQL-and-Spark-Streaming-tp20845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



        
        

      
      

      ​
                                          

                                          

Reply via email to