I am trying to run Spark SQL queries concurrently from multiple threads. Some sample code:
```scala
val sqlContext = new SQLContext(sc)
val rdd_query = sc.parallelize(data, part)
rdd_query.registerTempTable("MyTable")
sqlContext.cacheTable("MyTable")

val serverPool = Executors.newFixedThreadPool(3)
val loopCnt = 10
for (i <- 1 to loopCnt) {
  serverPool.execute(new Runnable() {
    override def run() {
      if (someCondition) {  // placeholder for the real predicate
        sqlContext.sql("SELECT * FROM ...").collect().foreach(println)
      } else {
        // some other query
      }
    }
  })
}
```

This throws a `Task not serializable` exception, but if I run the same queries without the thread pool, everything works fine. As far as I can tell, none of the objects involved are non-serializable, so what is the problem?

```
java.lang.Error: org.apache.spark.SparkException: Task not serializable
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:853)
Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
    at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
    at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
    at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
    at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:853)
Caused by: java.lang.NullPointerException
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
    at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
    at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
    at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
    at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:853)
```

-- *Best Wishes!*