Thanks very much, you're right. I was calling sc.stop() before the executor thread pool had shut down, so the queued queries ran against a SparkContext that was already stopped.
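
For anyone who hits the same error, here is a minimal sketch of the shutdown order that avoids it: submit the work, shut the pool down, wait for it to drain, and only then stop the context. The object name `ShutdownOrder`, the helper `runAllThenStop`, and its parameters are hypothetical names chosen for illustration, and the `stop` callback stands in for `sc.stop()` so that the sketch runs without a Spark dependency.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ShutdownOrder {
  // Hypothetical helper, not part of Spark: runs every task on a fixed-size
  // pool, waits for the pool to drain, and only then invokes `stop`.
  // In a Spark job, `stop` would be `() => sc.stop()`.
  def runAllThenStop(
      tasks: Seq[() => Unit],
      threads: Int,
      timeoutSeconds: Long,
      stop: () => Unit): Boolean = {
    val pool = Executors.newFixedThreadPool(threads)
    tasks.foreach { task =>
      pool.execute(new Runnable {
        override def run(): Unit = task()
      })
    }
    // shutdown() only stops new submissions; already queued tasks still run.
    pool.shutdown()
    // Block until the queued tasks finish or the timeout expires.
    val finished = pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)
    // On timeout, interrupt whatever is still running before stopping.
    if (!finished) pool.shutdownNow()
    stop()
    finished
  }
}
```

In the job from this thread, each task would wrap one `sqlContext.sql(...).collect()` call and the last argument would be `() => sc.stop()`. Calling `sc.stop()` right after the submission loop, without waiting, is what let the queries run against a stopped context.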

On Fri, Feb 13, 2015 at 7:04 AM, Michael Armbrust <mich...@databricks.com> wrote:

> It looks to me like perhaps your SparkContext has shut down due to too
> many failures. I'd look in the logs of your executors for more information.
>
> On Thu, Feb 12, 2015 at 2:34 AM, lihu <lihu...@gmail.com> wrote:
>
>> I try to use the multi-thread to use the Spark SQL query.
>> some sample code just like this:
>>
>> val sqlContext = new SqlContext(sc)
>> val rdd_query = sc.parallelize(data, part)
>> rdd_query.registerTempTable("MyTable")
>> sqlContext.cacheTable("MyTable")
>>
>> val serverPool = Executors.newFixedThreadPool(3)
>> val loopCnt = 10
>>
>> for(i <- 1 to loopCnt ){
>>   serverPool.execute(new Runnable(){
>>     override def run(){
>>       if( some condition){
>>         sqlContext.sql("SELECT * from ...").collect().foreach(println)
>>       }
>>       else{
>>         //some other query
>>       }
>>
>>     }
>>   })
>> }
>>
>> this will throw a Task serializable Exception, if I do not use the
>> multi-thread, it works well.
>> Since there is no object is not serializable? so what is the problem?
>>
>>
>> java.lang.Error: org.apache.spark.SparkException: Task not serializable
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>>   at java.lang.Thread.run(Thread.java:853)
>>
>> Caused by: org.apache.spark.SparkException: Task not serializable
>>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>   at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>>   at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
>>   at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
>>   at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
>>   at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
>>   at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>>   at java.lang.Thread.run(Thread.java:853)
>>
>> Caused by: java.lang.NullPointerException
>>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>   at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>>   at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
>>   at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
>>   at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
>>   at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
>>   at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>>   at java.lang.Thread.run(Thread.java:853)
>>
>> --
>> *Best Wishes!*
>>
>

--
*Best Wishes!*