[ 
https://issues.apache.org/jira/browse/SPARK-13747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976580#comment-15976580
 ] 

Mousa HAMAD commented on SPARK-13747:
-------------------------------------

I am also running into this issue *sporadically* when collecting the results of 
joining two dataframes.

The code that *sporadically* generates this issue is:
{code}
val spark = 
SparkSession.builder().appName("application").master("local[*]").getOrCreate()
val itemCountry = spark.read.format("csv")
  .option("header", "true")
  .schema(StructType(Array(
    StructField("itemId", IntegerType, false),
    StructField("countryId", IntegerType, false))))
  .csv("/item_country.csv") // This file matches the schema provided
val itemPerformance = spark.read.format("csv")
  .option("header", "true")
  .schema(StructType(Array(
    StructField("itemId", IntegerType, false),
    StructField("date", TimestampType, false),
    StructField("performance", IntegerType, false))))
  .csv("/item_performance.csv") // This file matches the schema provided

itemCountry.join(itemPerformance, itemCountry("itemId") === 
itemPerformance("itemId"))
  .groupBy("countryId")
  .agg(sum(when(to_date(itemPerformance("date")) > to_date(lit("2017-01-01")), 
itemPerformance("performance")).otherwise(0)).alias("performance")).show()
{code}

The stack trace is:
{code}
java.lang.IllegalArgumentException: spark.sql.execution.id is already set
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:81)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
at 
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2375)
at 
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2375)
at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2778)
at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2375)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2351)
at .... [Custom caller functions]
{code}

> Concurrent execution in SQL doesn't work with Scala ForkJoinPool
> ----------------------------------------------------------------
>
>                 Key: SPARK-13747
>                 URL: https://issues.apache.org/jira/browse/SPARK-13747
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Shixiong Zhu
>            Assignee: Shixiong Zhu
>             Fix For: 2.2.0
>
>
> Run the following codes may fail
> {code}
> (1 to 100).par.foreach { _ =>
>   println(sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count())
> }
> java.lang.IllegalArgumentException: spark.sql.execution.id is already set 
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
>  
>         at 
> org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904) 
>         at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385) 
> {code}
> This is because SparkContext.runJob can be suspended when using a 
> ForkJoinPool (e.g.,scala.concurrent.ExecutionContext.Implicits.global) as it 
> calls Await.ready (introduced by https://github.com/apache/spark/pull/9264).
> So when SparkContext.runJob is suspended, ForkJoinPool will run another task 
> in the same thread, however, the local properties has been polluted.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to