Gal Topper created SPARK-19476:
----------------------------------

             Summary: Running threads in Spark DataFrame foreachPartition() causes NullPointerException
                 Key: SPARK-19476
                 URL: https://issues.apache.org/jira/browse/SPARK-19476
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3, 1.6.2, 1.6.1, 1.6.0
            Reporter: Gal Topper


First reported on [Stack Overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].

I use multiple threads inside foreachPartition(), which works well for me 
except when the underlying iterator is a TungstenAggregationIterator. Here is 
a minimal snippet that reproduces the problem:
{code:title=Reproduce.scala|borderStyle=solid}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    // Uses main() rather than extending App, as the Spark docs recommend.
    object Reproduce {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "reproduce")
        val sqlContext = new SQLContext(sc)

        import sqlContext.implicits._

        // groupBy(...).count() triggers the aggregation path that fails below.
        val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()

        df.foreachPartition { iterator =>
          // Consume the partition iterator on a thread other than the task thread.
          val f = Future(iterator.toVector)
          Await.result(f, Duration.Inf)
        }

        sc.stop()
      }
    }
{code}

When I run this, I get:
{noformat}
    java.lang.NullPointerException
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
{noformat}

I believe I understand why this happens: TungstenAggregationIterator uses a 
ThreadLocal variable that returns null when it is read from any thread other 
than the one that received the iterator from Spark. From examining the code, 
this behavior does not appear to differ between recent Spark versions.
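The ThreadLocal behavior described above can be illustrated without Spark. This is a minimal sketch that assumes plain `java.lang.ThreadLocal` semantics are what matter; `ThreadLocalDemo` and its methods are hypothetical names, and the sketch makes no claim about exactly which value TungstenAggregationIterator keeps in its ThreadLocal.

```scala
import java.util.concurrent.atomic.AtomicReference

object ThreadLocalDemo {
  // A ThreadLocal holds a separate value per thread; a thread that never
  // called set() sees the initial value, which is null by default.
  private val current = new ThreadLocal[String]

  /** Sets the value on the calling thread, then reads it back on that same thread. */
  def valueSeenFromSameThread(): String = {
    current.set("task context")
    current.get()
  }

  /** Sets the value on the calling thread, then reads it from a freshly started thread. */
  def valueSeenFromOtherThread(): String = {
    current.set("task context")
    // Start with a non-null marker so a null result clearly came from the worker.
    val seen = new AtomicReference[String]("unset")
    val worker = new Thread(new Runnable {
      def run(): Unit = seen.set(current.get())
    })
    worker.start()
    worker.join()
    seen.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"same thread:  ${valueSeenFromSameThread()}")  // same thread:  task context
    println(s"other thread: ${valueSeenFromOtherThread()}") // other thread: null
  }
}
```

A plain `ThreadLocal` value set on one thread is not visible from another, which is consistent with the `Future` in the reproduction (running on a pool thread) hitting a null.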

However, as far as I'm aware, this limitation is specific to 
TungstenAggregationIterator and is not documented.
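Until the limitation is lifted or documented, one possible workaround is to drain the iterator on the thread Spark handed it to and pass only the elements to other threads. This sketch assumes that only the rows, not the iterator itself, need to cross thread boundaries; `PartitionWorkaround` and `processInParallel` are hypothetical names, not Spark API.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PartitionWorkaround {
  /**
   * Hypothetical helper: drains `iterator` on the calling thread (in Spark,
   * the task thread), then runs `work` on each element concurrently.
   * Only the elements, never the iterator, are handed to other threads.
   */
  def processInParallel[T, R](iterator: Iterator[T])(work: T => R)(
      implicit ec: ExecutionContext): Vector[R] = {
    // Materialize on the current thread, so any ThreadLocal state the
    // iterator depends on is still visible while it is being consumed.
    val rows = iterator.toVector
    val futures = rows.map(row => Future(work(row)))
    // Future.sequence preserves the input order of the results.
    Await.result(Future.sequence(futures), Duration.Inf)
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val doubled = processInParallel(Iterator(1, 2, 3))(_ * 2)
    println(doubled) // Vector(2, 4, 6)
  }
}
```

Inside `df.foreachPartition { iterator => ... }`, one would call `PartitionWorkaround.processInParallel(iterator)(row => ...)` instead of wrapping `iterator.toVector` in a `Future`. The trade-off is that the whole partition is held in memory before any parallel work starts; consuming the iterator in bounded batches on the task thread would cap that, at the cost of more code.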



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
