[ https://issues.apache.org/jira/browse/SPARK-13747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16008526#comment-16008526 ]
Kagan Turgut edited comment on SPARK-13747 at 5/12/17 6:27 PM:
---------------------------------------------------------------

I am having the same exception. I am creating a new data source that reads batch files asynchronously into a temp folder and then returns them as a data frame.

Within the buildScan(): RDD[Row] method I have a loop that saves the results of each batch in a Parquet file:

    val df = spark.sparkContext.parallelize(batchResult.records, 200).toDF()
    df.write.mode(SaveMode.Overwrite).save(tempFile)

Then, once all the temp files are written, buildScan loads them in parallel and returns their union as an RDD like this:

    sqlContext.read
      .schema(schema)
      .load(files: _*)
      .queryExecution.executedPlan
      .execute().asInstanceOf[RDD[Row]]

I can see the concurrency issue, since I am writing the temp files at the same time as I am constructing the returned RDD. Is there a better way of doing this?

To work around the issue, I could save the temp files as regular CSV, or upgrade to 2.12 to see whether that fixes it, but I would prefer to save these files as Parquet using the Spark API.

was (Author: kagan):

I am having the same exception. I am creating a new data source that reads batch files asynchronously into a temp folder and then returns them as a data frame.

Within the buildScan(): RDD[Row] method I have a loop that saves the results of each batch in a Parquet file:

    val df = spark.sparkContext.parallelize(batchResult.records, 200).toDF()
    df.write.mode(SaveMode.Overwrite).save(s"$tempDir${tempFile}")

Then, once all the temp files are written, buildScan loads them in parallel and returns their union as an RDD like this:

    sqlContext.read
      .schema(schema)
      .load(files: _*)
      .queryExecution.executedPlan
      .execute().asInstanceOf[RDD[Row]]

I can see the concurrency issue, since I am writing the temp files at the same time as I am constructing the returned RDD.
Is there a better way of doing this?

To work around the issue, I could save the temp files as regular CSV, or upgrade to 2.12 to see whether that fixes it, but I would prefer to save these files as Parquet using the Spark API.

> Concurrent execution in SQL doesn't work with Scala ForkJoinPool
> ----------------------------------------------------------------
>
> Key: SPARK-13747
> URL: https://issues.apache.org/jira/browse/SPARK-13747
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
>
> Running the following code may fail:
> {code}
> (1 to 100).par.foreach { _ =>
>   println(sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count())
> }
> java.lang.IllegalArgumentException: spark.sql.execution.id is already set
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
>   at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
>   at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
> {code}
> This is because SparkContext.runJob can be suspended when using a
> ForkJoinPool (e.g., scala.concurrent.ExecutionContext.Implicits.global), as it
> calls Await.ready (introduced by https://github.com/apache/spark/pull/9264).
> When SparkContext.runJob is suspended, the ForkJoinPool will run another task
> in the same thread, but by then the thread's local properties have been polluted.
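
For illustration, one way to keep concurrent work off a ForkJoinPool is to submit it to a plain fixed-size thread pool, where a blocked thread simply waits instead of picking up another task. The sketch below is not taken from the issue or from Spark itself, and whether it avoids the exception on a given Spark version is an assumption that would need testing. The names `FixedPoolSketch`, `runBatch` and `runAll` are hypothetical, and `runBatch` is only a stand-in for a real Spark action such as the `df.write...save(tempFile)` call in the comment above.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FixedPoolSketch {
  // Hypothetical stand-in for one Spark action (for example writing one
  // batch to Parquet); it only doubles its input so the sketch runs anywhere.
  def runBatch(id: Int): Int = id * 2

  // Runs every batch on a dedicated fixed-size pool instead of the global
  // ForkJoinPool-backed ExecutionContext.
  def runAll(batchIds: Seq[Int], threads: Int): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = batchIds.map(id => Future(runBatch(id)))
      // Wait on the calling thread, which is not one of the pool's workers.
      Await.result(Future.sequence(futures), 1.minute)
    } finally pool.shutdown()
  }
}
```

A call such as `FixedPoolSketch.runAll(1 to 100, 8)` would take the place of the `(1 to 100).par.foreach { ... }` loop, with the body of `runBatch` swapped for the actual DataFrame work.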