Thanks very much, you're right.

I called sc.stop() before the executor pool had shut down.
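
For anyone who hits the same error, here is a minimal sketch of the ordering that avoids it: submit the work, shut the pool down, wait for it to drain, and only then stop the context. The helper `PoolShutdown.runAllThenStop` and its parameters are hypothetical names made up for illustration; they are not part of Spark or of my original program. The `stop` argument stands in for `sc.stop()`, so the sketch itself has no Spark dependency.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object PoolShutdown {
  // Hypothetical helper (not from the original thread): runs every task on a
  // fixed-size pool, waits until all of them have finished, then calls `stop`.
  def runAllThenStop(tasks: Seq[() => Unit], stop: () => Unit, threads: Int = 3): Unit = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      tasks.foreach { task =>
        pool.execute(new Runnable {
          override def run(): Unit = task()
        })
      }
      // Stop accepting new tasks; already submitted tasks still run.
      pool.shutdown()
      // Block until every submitted task has completed.
      while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
        // Tasks are still running; keep waiting.
      }
    } finally {
      // Only now is it safe to release the shared resource (e.g. sc.stop()).
      stop()
    }
  }
}
```

With a real context the call would look something like `PoolShutdown.runAllThenStop(queries, () => sc.stop())`, where `queries` is an assumed list of closures wrapping the `sqlContext.sql(...)` calls.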

On Fri, Feb 13, 2015 at 7:04 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> It looks to me like perhaps your SparkContext has shut down due to too
> many failures.  I'd look in the logs of your executors for more information.
>
> On Thu, Feb 12, 2015 at 2:34 AM, lihu <lihu...@gmail.com> wrote:
>
>> I am trying to run Spark SQL queries from multiple threads.
>> Some sample code looks like this:
>>
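>> import java.util.concurrent.Executors
>> import org.apache.spark.sql.SQLContext
>>
>> val sqlContext = new SQLContext(sc)
>> // Implicit RDD -> SchemaRDD conversion, needed for registerTempTable (Spark 1.2)
>> import sqlContext.createSchemaRDD
>>
>> val rdd_query = sc.parallelize(data, part)
>> rdd_query.registerTempTable("MyTable")
>> sqlContext.cacheTable("MyTable")
>>
>> val serverPool = Executors.newFixedThreadPool(3)
>> val loopCnt = 10
>>
>> // Submit loopCnt query tasks to the 3-thread pool
>> for (i <- 1 to loopCnt) {
>>     serverPool.execute(new Runnable() {
>>         override def run(): Unit = {
>>             if (some condition) {
>>                 sqlContext.sql("SELECT * from ...").collect().foreach(println)
>>             } else {
>>                 // some other query
>>             }
>>         }
>>     })
>> }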
>>
>> This throws a "Task not serializable" exception. If I do not use
>> multiple threads, it works well.
>> As far as I can tell, none of the objects involved is non-serializable, so
>> what is the problem?
>>
>>
>> java.lang.Error: org.apache.spark.SparkException: Task not serializable
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>> at java.lang.Thread.run(Thread.java:853)
>>
>> Caused by: org.apache.spark.SparkException: Task not serializable
>> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>> at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
>> at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
>> at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
>> at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
>> at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>> at java.lang.Thread.run(Thread.java:853)
>>
>> Caused by: java.lang.NullPointerException
>> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>> at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
>> at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
>> at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
>> at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
>> at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>> at java.lang.Thread.run(Thread.java:853)
>>
>> --
>> *Best Wishes!*
>>
>>
>>
>


-- 
*Best Wishes!*
