Martin Loncaric created SPARK-26555:
---------------------------------------

             Summary: Thread safety issue causes createDataset to fail with misleading errors
                 Key: SPARK-26555
                 URL: https://issues.apache.org/jira/browse/SPARK-26555
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: Martin Loncaric


This can be reproduced (in roughly 2% of runs) with the following program:

{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

/**
 * Reproducer for SPARK-26555: builds Datasets from a case class with `Option`
 * fields inside tasks submitted to a thread pool. Per the report, this
 * intermittently fails with misleading `MatchError` /
 * `UnsupportedOperationException` errors.
 */
object Main {

  /**
   * Submits three tasks that each create a one-row Dataset of [[MyClass]] with
   * randomly populated optional fields. It waits for all of them, then shuts
   * down the pool and stops the session.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .getOrCreate()
    import sparkSession.implicits._

    // NOTE(review): a pool of size 1 runs the tasks one at a time; the report says
    // this still fails ~2% of the time — confirm whether a larger pool was intended.
    val executor = Executors.newFixedThreadPool(1)
    try {
      // Range.map is strict, so all three tasks are submitted right here.
      val futures: Seq[Future[_]] = (1 to 3).map { _ =>
        executor.submit(new Runnable {
          override def run(): Unit = {
            // Randomize which Option fields are populated so that different
            // Some/None combinations are tried across runs.
            val d = if (Random.nextInt(2) == 0) Some("d value") else None
            val e = if (Random.nextInt(2) == 0) Some(5.0) else None
            val f = if (Random.nextInt(2) == 0) Some(6.0) else None
            // Explicit tuple: same output as before, without deprecated auto-tupling.
            println(("DEBUG", d, e, f))
            sparkSession.createDataset(Seq(
              MyClass(new Timestamp(1L), "b", "c", d, e, f)
            ))
          }
        })
      }

      // get() rethrows a task's failure wrapped in an ExecutionException.
      futures.foreach(_.get())
    } finally {
      println("SHUTDOWN")
      executor.shutdown()
      sparkSession.stop()
    }
  }

  /**
   * Row type for the reproducer. The reported errors name its
   * `Option[String]` / `Option[Double]` field types.
   */
  final case class MyClass(
    a: Timestamp,
    b: String,
    c: String,
    d: Option[String],
    e: Option[Double],
    f: Option[Double]
  )
}
{code}

which fails with a variety of errors, for example:

{noformat}
Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)
{noformat}

or

{noformat}
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to