[ https://issues.apache.org/jira/browse/SPARK-12964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-12964:
---------------------------------
Labels: bulk-closed (was: )

SparkContext.localProperties leaked
-----------------------------------

Key: SPARK-12964
URL: https://issues.apache.org/jira/browse/SPARK-12964
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.6.0
Reporter: Daniel Darabos
Priority: Minor
Labels: bulk-closed

I have a non-deterministic but quite reliable reproduction for a case where {{spark.sql.execution.id}} is leaked. Operations then die with {{spark.sql.execution.id is already set}}. The affected threads never recover, because nothing ever unsets {{spark.sql.execution.id}}. (It is not a case of nested {{withNewExecutionId}} calls.)

I have figured out why this happens. We are inside a {{withNewExecutionId}} block. At some point we call back into user code; in our case this is a custom data source's {{buildScan}} method. The user code calls {{scala.concurrent.Await.result}}. Because our thread is a member of a {{ForkJoinPool}} (it is a Play HTTP serving thread), {{Await.result}} starts a new thread. {{SparkContext.localProperties}} is cloned for this new thread, which is then ready to serve an HTTP request.

The first thread then finishes waiting, finishes {{buildScan}}, and leaves {{withNewExecutionId}}, clearing {{spark.sql.execution.id}} in its {{finally}} block. All good. But some time later another HTTP request is served by the second thread. That thread is "poisoned" with a stale {{spark.sql.execution.id}}, so when it tries to use {{withNewExecutionId}}, it fails.

----

I don't know who's at fault here.

- I don't like the {{ThreadLocal}} properties anyway. Why not create an Execution object and let it wrap the operation? Then you could have two executions in parallel on the same thread, among other things. It would be much clearer than storing the execution ID in a kind-of-global variable.
- Why do we have to inherit the {{ThreadLocal}} properties? I'm sure there is a good reason, but this is essentially a bug-generator in my view. (It has already generated https://issues.apache.org/jira/browse/SPARK-10563.)
- {{Await.result}}: I never would have thought it starts threads.
- We probably shouldn't be calling {{Await.result}} inside {{buildScan}}.
- We probably shouldn't call Spark things from HTTP serving threads.

I'm not sure what could be done on the Spark side, but I thought I should mention this interesting issue. As supporting evidence, here is the stack trace from the point where {{localProperties}} is being cloned. Its contents at that point are:

{noformat}
{spark.sql.execution.id=0, spark.rdd.scope.noOverride=true, spark.rdd.scope={"id":"4","name":"ExecutedCommand"}}
{noformat}

{noformat}
at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:364) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:362) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:353) [na:1.7.0_91]
at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:261) [na:1.7.0_91]
at java.lang.ThreadLocal.createInheritedMap(ThreadLocal.java:236) [na:1.7.0_91]
at java.lang.Thread.init(Thread.java:416) [na:1.7.0_91]
at java.lang.Thread.init(Thread.java:349) [na:1.7.0_91]
at java.lang.Thread.<init>(Thread.java:508) [na:1.7.0_91]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:48) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.<init>(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory.newThread(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.tryCompensate(ForkJoinPool.java:2341) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3638) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:45) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.Await$.result(package.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
at com.lynxanalytics.biggraph.graph_api.SafeFuture.awaitResult(SafeFuture.scala:50) [biggraph.jar]
at com.lynxanalytics.biggraph.graph_api.DataManager.get(DataManager.scala:315) [biggraph.jar]
at com.lynxanalytics.biggraph.graph_api.Scripting$.getData(Scripting.scala:87) [biggraph.jar]
at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.AbstractTraversable.map(Traversable.scala:105) [org.scala-lang.scala-library-2.10.5.jar:na]
at com.lynxanalytics.biggraph.table.TableRelation.buildScan(DefaultSource.scala:46) [biggraph.jar]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:274) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:273) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:352) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:269) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:60) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) [org.scala-lang.scala-library-2.10.5.jar:na]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at com.lynxanalytics.biggraph.controllers.SQLController.exportSQLQuery(SQLController.scala:112) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
at scala.util.Try$.apply(Try.scala:161) [org.scala-lang.scala-library-2.10.5.jar:na]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:65) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:58) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.10.5.jar:na]
{noformat}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
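The cloning step at the top of the trace ({{SparkContext$$anon$2.childValue}}) is standard {{InheritableThreadLocal}} behavior: the parent thread's value is snapshotted into every thread it constructs. The minimal sketch below reproduces the "poisoned thread" effect outside Spark. The {{localProps}} object and the plain {{new Thread}} are illustrative stand-ins of my own, not Spark code; in the real failure the thread is created by the {{ForkJoinPool}} compensation mechanism under {{Await.result}}.

```scala
import java.util.Properties

object LeakDemo {
  // Hypothetical stand-in for SparkContext.localProperties: values are
  // cloned into every thread the current thread constructs, mirroring
  // SparkContext$$anon$2.childValue in the trace above.
  val localProps: InheritableThreadLocal[Properties] =
    new InheritableThreadLocal[Properties] {
      override def initialValue(): Properties = new Properties()
      override def childValue(parent: Properties): Properties =
        parent.clone().asInstanceOf[Properties] // snapshot at Thread construction
    }

  // Returns (what the child thread inherited, what the parent sees after cleanup).
  def demo(): (String, String) = {
    // "Inside withNewExecutionId": the execution id is set on this thread.
    localProps.get().setProperty("spark.sql.execution.id", "0")

    // Any thread constructed now (e.g. a pool thread spawned while we are
    // blocked in Await.result) inherits a copy of the properties.
    var inherited: String = null
    val child = new Thread(new Runnable {
      def run(): Unit =
        inherited = localProps.get().getProperty("spark.sql.execution.id")
    })
    child.start()
    child.join()

    // The finally block clears the id -- but only on the parent thread.
    localProps.get().remove("spark.sql.execution.id")

    // The child's copy still holds the id; a later operation on that thread
    // would fail with "spark.sql.execution.id is already set".
    (inherited, localProps.get().getProperty("spark.sql.execution.id"))
  }
}
```

Running {{LeakDemo.demo()}} returns {{("0", null)}}: the parent cleaned up after itself, yet the child thread keeps the stale execution id, which is exactly the state the second HTTP serving thread is left in.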