Thanks very much, you're right.
I was calling sc.stop() before shutting down the executor pool.
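A minimal sketch of the ordering fix described above, with no Spark dependency: the pool is shut down and drained before the context is stopped. `runQuery` and the `contextStopped` flag are stand-ins for the real `sqlContext.sql(...).collect()` call and `sc.stop()`, not Spark APIs.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Stand-in for the SparkContext lifecycle: the fix is to call
// sc.stop() only after the query pool has fully drained.
val contextStopped = new AtomicBoolean(false)
val completed = new AtomicInteger(0)

def runQuery(i: Int): Unit = {
  // In the real code this would be sqlContext.sql("...").collect()
  assert(!contextStopped.get, s"query $i ran after the context was stopped")
  completed.incrementAndGet()
}

val serverPool = Executors.newFixedThreadPool(3)
for (i <- 1 to 10) serverPool.execute(new Runnable {
  override def run(): Unit = runQuery(i)
})

serverPool.shutdown()                             // stop accepting new tasks
serverPool.awaitTermination(1, TimeUnit.MINUTES)  // wait for in-flight queries
contextStopped.set(true)                          // only now is sc.stop() safe

println(s"completed ${completed.get} queries before stopping the context")
```

With the original ordering, queries still queued in the pool run against a stopped context, which is what produced the NullPointerException inside ClosureCleaner below.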
On Fri, Feb 13, 2015 at 7:04 AM, Michael Armbrust mich...@databricks.com
wrote:
It looks to me like perhaps your SparkContext has shut down due to too
many failures. I'd look in the logs of your executors for more information.
On Thu, Feb 12, 2015 at 2:34 AM, lihu lihu...@gmail.com wrote:
I am trying to run Spark SQL queries from multiple threads.
Some sample code, just like this:
val sqlContext = new SQLContext(sc)
val rdd_query = sc.parallelize(data, part)
rdd_query.registerTempTable("MyTable")
sqlContext.cacheTable("MyTable")

val serverPool = Executors.newFixedThreadPool(3)
val loopCnt = 10
for (i <- 1 to loopCnt) {
  serverPool.execute(new Runnable() {
    override def run() {
      if (/* some condition */) {
        sqlContext.sql("SELECT * FROM ...").collect().foreach(println)
      } else {
        // some other query
      }
    }
  })
}
This throws a Task not serializable exception; if I do not use multiple threads, it works well.
Since no object here should be non-serializable, what is the problem?
java.lang.Error: org.apache.spark.SparkException: Task not serializable
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:853)
Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
    at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
    at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
    at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
    at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:853)
Caused by: java.lang.NullPointerException
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
    at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
    at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
    at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
    at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:853)
--
*Best Wishes!*