[ https://issues.apache.org/jira/browse/SPARK-32813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Klyushnikov updated SPARK-32813:
-----------------------------------------
    Description: 
Reading a Parquet RDD in non-columnar mode (i.e. with list fields) fails if the Spark session was created in one thread and the RDD is read in another, because the InheritableThreadLocal holding the active session is not propagated to the second thread. The code below worked perfectly in Spark 2.x but fails in Spark 3.
{code:scala}
import java.util.concurrent.Executors

import org.apache.spark.sql.SparkSession

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object Main {

  final case class Data(list: List[Int])

  def main(args: Array[String]): Unit = {

    val executor1 = Executors.newSingleThreadExecutor()
    val executor2 = Executors.newSingleThreadExecutor()
    try {
      val ds = Await.result(Future {
        val session =
          SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        import session.implicits._

        session.createDataset(Data(1 :: Nil) :: Nil).write.parquet("test.parquet")
        session.read.parquet("test.parquet").as[Data]
      }(ExecutionContext.fromExecutorService(executor1)), 1.minute)

      Await.result(Future {
        ds.rdd.collect().foreach(println(_))
      }(ExecutionContext.fromExecutorService(executor2)), 1.minute)

    } finally {
      executor1.shutdown()
      executor2.shutdown()
    }
  }
}
{code}
This code fails with the following exception:
{noformat}
Exception in thread "main" java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
	at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:462)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3198)
	at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3196)
{noformat}
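The failing frame is FileSourceScanExec.needsUnsafeRowConversion, which apparently cannot find an active session in the thread that triggers the execution. Until this is fixed in Spark, a possible user-side workaround (a sketch only, assuming it is acceptable to create the session eagerly on the main thread and re-register it) is to call SparkSession.setActiveSession in the second thread before touching the Dataset:
{code:scala}
import java.util.concurrent.Executors

import org.apache.spark.sql.SparkSession

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MainWithWorkaround {

  final case class Data(list: List[Int])

  def main(args: Array[String]): Unit = {

    val executor1 = Executors.newSingleThreadExecutor()
    val executor2 = Executors.newSingleThreadExecutor()
    try {
      // Created eagerly on the main thread so both futures can reference it.
      val session =
        SparkSession.builder().appName("test").master("local[*]").getOrCreate()
      import session.implicits._

      val ds = Await.result(Future {
        session.createDataset(Data(1 :: Nil) :: Nil).write.parquet("test.parquet")
        session.read.parquet("test.parquet").as[Data]
      }(ExecutionContext.fromExecutorService(executor1)), 1.minute)

      Await.result(Future {
        // Re-register the session as active in this thread, since the
        // InheritableThreadLocal is not propagated to the executor's thread.
        SparkSession.setActiveSession(session)
        ds.rdd.collect().foreach(println(_))
      }(ExecutionContext.fromExecutorService(executor2)), 1.minute)

    } finally {
      executor1.shutdown()
      executor2.shutdown()
    }
  }
}
{code}
This only papers over the symptom in user code; the reproducer above should still work without it, as it did in Spark 2.x.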


> Reading parquet rdd in non columnar mode fails in multithreaded environment
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-32813
>                 URL: https://issues.apache.org/jira/browse/SPARK-32813
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Vladimir Klyushnikov
>            Priority: Blocker
>


