I try to use the multi-thread to use the Spark SQL query.
some sample code just like this:

val sqlContext = new SqlContext(sc)
val rdd_query = sc.parallelize(data,   part)
rdd_query.registerTempTable("MyTable")
sqlContext.cacheTable("MyTable")

val serverPool = Executors.newFixedThreadPool(3)
val loopCnt = 10

for(i <- 1 to loopCnt ){
    serverPool.execute(new Runnable(){
        override def run(){
            if( some condition){
                sqlContext.sql("SELECT * from
...").collect().foreach(println)
            }
            else{
                //some other query
            }

        }
    })
}

this will throw a Task serializable Exception, if I do not use the
multi-thread, it works well.
Since there is no object is not serializable? so what is the problem?


java.lang.Error: org.apache.spark.SparkException: Task not serializable
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)

Caused by: org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)

Caused by: java.lang.NullPointerException
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)

-- 
*Best Wishes!*

Reply via email to