[ https://issues.apache.org/jira/browse/SPARK-7792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14576681#comment-14576681 ]
Apache Spark commented on SPARK-7792: ------------------------------------- User 'navis' has created a pull request for this issue: https://github.com/apache/spark/pull/6699 > HiveContext registerTempTable not thread safe > --------------------------------------------- > > Key: SPARK-7792 > URL: https://issues.apache.org/jira/browse/SPARK-7792 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 1.3.1 > Reporter: Yana Kadiyska > > {code:java} > public class ThreadRepro { > public static void main(String[] args) throws Exception{ > new ThreadRepro().sparkPerfTest(); > } > public void sparkPerfTest(){ > final AtomicLong counter = new AtomicLong(); > SparkConf conf = new SparkConf(); > conf.setAppName("My Application"); > conf.setMaster("local[7]"); > SparkContext sc = new SparkContext(conf); > org.apache.spark.sql.hive.HiveContext hc = new > org.apache.spark.sql.hive.HiveContext(sc); > int poolSize = 10; > ExecutorService pool = Executors.newFixedThreadPool(poolSize); > for (int i=0; i<poolSize;i++ ) > pool.execute(new QueryJob(hc, i, counter)); > pool.shutdown(); > try { > pool.awaitTermination(60, TimeUnit.MINUTES); > }catch(Exception e){ > System.out.println("Thread interrupted"); > } > System.out.println("All jobs complete"); > System.out.println(" Counter is "+counter.get()); > } > } > class QueryJob implements Runnable{ > String threadId; > org.apache.spark.sql.hive.HiveContext sqlContext; > String key; > AtomicLong counter; > final AtomicLong local_counter = new AtomicLong(); > public QueryJob(org.apache.spark.sql.hive.HiveContext _sqlContext,int > id,AtomicLong ctr){ > threadId = "thread_"+id; > this.sqlContext= _sqlContext; > this.counter = ctr; > } > public void run() { > for (int i = 0; i < 100; i++) { > String tblName = threadId +"_"+i; > DataFrame df = sqlContext.emptyDataFrame(); > df.registerTempTable(tblName); > String _query = String.format("select count(*) from %s",tblName); > System.out.println(String.format(" registered table 
%s; catalog > (%s) ",tblName,debugTables())); > List<Row> res; > try { > res = sqlContext.sql(_query).collectAsList(); > }catch (Exception e){ > System.out.println("*Exception "+ debugTables() +"**"); > throw e; > } > sqlContext.dropTempTable(tblName); > System.out.println(" dropped table "+tblName); > try { > Thread.sleep(3000);//lets make this a not-so-tight loop > }catch(Exception e){ > System.out.println("Thread interrupted"); > } > } > } > private String debugTables(){ > String v = Joiner.on(',').join(sqlContext.tableNames()); > if (v==null)return ""; else return v; > } > } > {code} > This will periodically produce the following: > {quote} > registered table thread_0_50; catalog (thread_1_50) > registered table thread_4_50; catalog (thread_4_50,thread_1_50) > registered table thread_1_50; catalog (thread_1_50) > dropped table thread_1_50 > dropped table thread_4_50 > *Exception ** > Exception in thread "pool-6-thread-1" java.lang.Error: > org.apache.spark.sql.AnalysisException: no such table thread_0_50; line 1 pos > 21 > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.spark.sql.AnalysisException: no such table thread_0_50; > line 1 pos > 21 > at > org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:177) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:186) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:181) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188) > at > 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:187) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:208) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:238) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:181) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:171) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at > 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) > at > org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:1082) > at > org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:1082) > at > org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080) > at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) > at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) > at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101) > at test.unit.QueryJob.run(ThreadRepro.java:93) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > {quote} > Line 93 is the .sql call... -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org