[ https://issues.apache.org/jira/browse/SPARK-12964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-12964:
---------------------------------
    Labels: bulk-closed  (was: )

> SparkContext.localProperties leaked
> -----------------------------------
>
>                 Key: SPARK-12964
>                 URL: https://issues.apache.org/jira/browse/SPARK-12964
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0
>            Reporter: Daniel Darabos
>            Priority: Minor
>              Labels: bulk-closed
>
> I have a non-deterministic but quite reliable reproduction of a case where
> {{spark.sql.execution.id}} is leaked. Operations on the affected threads then
> die with {{spark.sql.execution.id is already set}}. These threads never
> recover, because nothing ever unsets {{spark.sql.execution.id}} for them.
> (It's not a case of nested {{withNewExecutionId}} calls.)
>
> I have figured out why this happens. We are inside a {{withNewExecutionId}}
> block. At some point we call back into user code. (In our case this is a
> custom data source's {{buildScan}} method.) The user code calls
> {{scala.concurrent.Await.result}}. Because our thread is a member of a
> {{ForkJoinPool}} (it is a Play HTTP serving thread), {{Await.result}} starts
> a new thread to compensate for the blocked one. {{SparkContext.localProperties}}
> is cloned for this new thread, which is then ready to serve HTTP requests.
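>
> Here is a minimal, Spark-free sketch of that mechanism, assuming Scala
> 2.10's global execution context (a {{ForkJoinPool}}). The property name is
> the real one; everything else is illustrative:
> {noformat}
> import java.util.Properties
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global // a ForkJoinPool
> import scala.concurrent.duration._
>
> object InheritedLocalsDemo {
>   // Stand-in for SparkContext.localProperties: child threads receive a
>   // copy of the parent thread's properties at thread-creation time.
>   val localProps = new InheritableThreadLocal[Properties] {
>     override protected def childValue(parent: Properties): Properties = {
>       val copy = new Properties()
>       copy.putAll(parent)
>       copy
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     val outer = Future {
>       // Thread A: set the property, as withNewExecutionId does.
>       localProps.set(new Properties())
>       localProps.get.setProperty("spark.sql.execution.id", "0")
>       // Blocking here lets the pool spawn a compensating worker (thread B),
>       // whose constructor copies thread A's inheritable locals.
>       Await.result(Future { Thread.sleep(100) }, 10.seconds)
>       // Thread A cleans up, like the finally block of withNewExecutionId...
>       localProps.get.remove("spark.sql.execution.id")
>     }
>     Await.result(outer, 10.seconds)
>     // ...but thread B keeps its stale copy. Whether this Future lands on
>     // thread B is timing-dependent, hence the non-determinism.
>     val leaked = Future {
>       Option(localProps.get).map(_.getProperty("spark.sql.execution.id"))
>     }
>     println(Await.result(leaked, 10.seconds)) // may print Some(0)
>   }
> }
> {noformat}
>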
> The first thread then finishes waiting, finishes {{buildScan}}, and leaves
> {{withNewExecutionId}}, clearing {{spark.sql.execution.id}} in the
> {{finally}} block. All good. But some time later the second thread serves
> another HTTP request. That thread is "poisoned" with a stale
> {{spark.sql.execution.id}}: whenever it tries to use {{withNewExecutionId}},
> it fails.
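>
> To see why the {{finally}} block cannot help the second thread, here is a
> paraphrased sketch of the Spark 1.6 logic (listener events and other details
> elided; the counter is illustrative):
> {noformat}
> import java.util.concurrent.atomic.AtomicLong
> import org.apache.spark.SparkContext
>
> object ExecutionIdSketch {
>   private val counter = new AtomicLong(0)
>
>   def withNewExecutionId[T](sc: SparkContext)(body: => T): T = {
>     if (sc.getLocalProperty("spark.sql.execution.id") != null) {
>       // A thread that inherited a stale id dies here on every attempt.
>       throw new IllegalArgumentException(
>         "spark.sql.execution.id is already set")
>     }
>     sc.setLocalProperty("spark.sql.execution.id",
>       counter.getAndIncrement.toString)
>     try body
>     finally {
>       // Clears the property on the *current* thread only; copies inherited
>       // by other threads are never cleared.
>       sc.setLocalProperty("spark.sql.execution.id", null)
>     }
>   }
> }
> {noformat}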
> ----
> I don't know who's at fault here. 
>  - I don't like the {{ThreadLocal}} properties anyway. Why not create an
> Execution object and let it wrap the operation? (See the sketch after this
> list.) Then you could have two executions in parallel on the same thread,
> among other benefits. It would be much clearer than storing the execution ID
> in a kind-of-global variable.
>  - Why do we have to inherit the {{ThreadLocal}} properties? I'm sure there 
> is a good reason, but this is essentially a bug-generator in my view. (It has 
> already generated https://issues.apache.org/jira/browse/SPARK-10563.)
>  - {{Await.result}}: I never would have thought it starts threads.
>  - We probably shouldn't be calling {{Await.result}} inside {{buildScan}}.
>  - We probably shouldn't call Spark things from HTTP serving threads.
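>
> To make the first suggestion concrete, here is a hypothetical shape it
> could take (none of this is existing Spark API):
> {noformat}
> import java.util.concurrent.atomic.AtomicLong
>
> // The execution travels explicitly with the operation instead of hiding in
> // a thread-local, so its scope is lexical rather than per-thread.
> final case class Execution(id: Long)
>
> object Execution {
>   private val counter = new AtomicLong(0)
>   def withNew[T](body: Execution => T): T =
>     body(Execution(counter.getAndIncrement))
> }
>
> // Nested (or parallel) executions on one thread become unremarkable:
> object ExecutionDemo extends App {
>   Execution.withNew { outer =>
>     Execution.withNew { inner =>
>       println(s"outer=${outer.id} inner=${inner.id}")
>     }
>   }
> }
> {noformat}
>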
> I'm not sure what could be done on the Spark side, but I thought I should
> mention this interesting issue. As supporting evidence, here is the stack
> trace from the moment {{localProperties}} is cloned. Its contents at that
> point are:
> {noformat}
> {spark.sql.execution.id=0, spark.rdd.scope.noOverride=true, spark.rdd.scope={"id":"4","name":"ExecutedCommand"}}
> {noformat}
> {noformat}
>   at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:364) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:362) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:353) [na:1.7.0_91]
>   at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:261) [na:1.7.0_91]
>   at java.lang.ThreadLocal.createInheritedMap(ThreadLocal.java:236) [na:1.7.0_91]
>   at java.lang.Thread.init(Thread.java:416) [na:1.7.0_91]
>   at java.lang.Thread.init(Thread.java:349) [na:1.7.0_91]
>   at java.lang.Thread.<init>(Thread.java:508) [na:1.7.0_91]
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:48) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.<init>(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory.newThread(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.forkjoin.ForkJoinPool.tryCompensate(ForkJoinPool.java:2341) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3638) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:45) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.Await$.result(package.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at com.lynxanalytics.biggraph.graph_api.SafeFuture.awaitResult(SafeFuture.scala:50) [biggraph.jar]
>   at com.lynxanalytics.biggraph.graph_api.DataManager.get(DataManager.scala:315) [biggraph.jar]
>   at com.lynxanalytics.biggraph.graph_api.Scripting$.getData(Scripting.scala:87) [biggraph.jar]
>   at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
>   at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at com.lynxanalytics.biggraph.table.TableRelation.buildScan(DefaultSource.scala:46) [biggraph.jar]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:274) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:273) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:352) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:269) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:60) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
>   at com.lynxanalytics.biggraph.controllers.SQLController.exportSQLQuery(SQLController.scala:112) [biggraph.jar]
>   at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
>   at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
>   at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
>   at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
>   at scala.util.Try$.apply(Try.scala:161) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:65) [biggraph.jar]
>   at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:58) [biggraph.jar]
>   at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
>   at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.10.5.jar:na]
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.10.5.jar:na]
> {noformat}



