[ 
https://issues.apache.org/jira/browse/SPARK-13747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16008526#comment-16008526
 ] 

Kagan Turgut commented on SPARK-13747:
--------------------------------------

I am having the same exception.

I am writing a new data source that reads batch files asynchronously into a 
temp folder and then returns them as a DataFrame.

Within the buildScan(): RDD[Row] method, I have a loop that saves the results 
of each batch to a Parquet file:

 val df = spark.sparkContext.parallelize(batchResult.records, 200).toDF()
 df.write.mode(SaveMode.Overwrite).save(s"$tempDir${tempFile}")

Once the temp files are all written, buildScan loads all of them in parallel 
and returns their union as an RDD, like this:
sqlContext.read
      .schema(schema)
      .load(files: _*)
      .queryExecution.executedPlan.execute().asInstanceOf[RDD[Row]]

I can see where the concurrency issue comes from: I am writing the temp files 
at the same time as I am constructing the RDD to return.
Is there a better way of doing this?
As a workaround, I could save the temp files as regular CSV, or upgrade to 
2.12 to see if that fixes it, but I would prefer to save these files as 
Parquet using the Spark API.
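
One possible restructuring, given only as a sketch: finish every write on the 
calling thread first, and only then build the RDD to return, so that no Spark 
SQL action runs while another is suspended on the same thread. The helper 
name writeThenRead, the batches parameter, and the "batch-<index>" file naming 
are hypothetical and not taken from the data source described above. The 
sketch also swaps in the public .rdd accessor in place of 
queryExecution.executedPlan.execute(). It has not been verified against this 
issue.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.StructType

object SequentialTempFiles {
  // Hypothetical helper: writes each batch to its own Parquet directory,
  // one after another, then reads all of them back as a single RDD[Row].
  def writeThenRead(
      spark: SparkSession,
      batches: Seq[DataFrame],
      tempDir: String,
      schema: StructType): RDD[Row] = {
    // Phase 1: sequential writes on the calling thread (no thread pool).
    val paths = batches.zipWithIndex.map { case (df, i) =>
      val path = s"$tempDir/batch-$i" // assumed naming scheme
      df.write.mode(SaveMode.Overwrite).parquet(path)
      path
    }
    // Phase 2: one read over all temp files, after every write has finished.
    spark.read.schema(schema).parquet(paths: _*).rdd
  }
}
```

The trade-off is that the writes are no longer concurrent; whether that is 
acceptable depends on how long each batch takes to fetch and save.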



> Concurrent execution in SQL doesn't work with Scala ForkJoinPool
> ----------------------------------------------------------------
>
>                 Key: SPARK-13747
>                 URL: https://issues.apache.org/jira/browse/SPARK-13747
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Shixiong Zhu
>            Assignee: Shixiong Zhu
>
> Running the following code may fail:
> {code}
> (1 to 100).par.foreach { _ =>
>   println(sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count())
> }
> java.lang.IllegalArgumentException: spark.sql.execution.id is already set
>         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
>         at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
>         at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
> {code}
> This is because SparkContext.runJob can be suspended when it runs on a 
> ForkJoinPool (e.g., scala.concurrent.ExecutionContext.Implicits.global), as it 
> calls Await.ready (introduced by https://github.com/apache/spark/pull/9264).
> While SparkContext.runJob is suspended, the ForkJoinPool may run another task 
> on the same thread, but by then that thread's local properties have already 
> been polluted.
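
Following the explanation above, one workaround that may help is to run the 
concurrent work on a plain fixed-size thread pool instead of a ForkJoinPool. 
A thread in such a pool simply blocks inside Await.ready and does not pick up 
another task in the meantime. The sketch below uses only the Scala and Java 
standard libraries. The names FixedPoolExample and runAll are hypothetical, 
and the tasks stand in for whatever Spark SQL actions need to run 
concurrently. It has not been tested against Spark itself.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FixedPoolExample {
  // Runs every task on a dedicated fixed-size pool and returns the results
  // in the same order as the input tasks.
  def runAll[A](tasks: Seq[() => A], threads: Int): Seq[A] = {
    val pool = Executors.newFixedThreadPool(threads)
    // Back the ExecutionContext with a ThreadPoolExecutor, not a ForkJoinPool.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = tasks.map(task => Future(task()))
      // The timeout is an arbitrary choice for this sketch.
      Await.result(Future.sequence(futures), 10.minutes)
    } finally {
      pool.shutdown()
    }
  }
}
```

For the snippet in the issue description, each task would wrap one 
toDF(...).count() call, in place of (1 to 100).par.foreach.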



