Nick Pritchard created SPARK-9141:
-------------------------------------

             Summary: DataFrame recomputed instead of using cached parent.
                 Key: SPARK-9141
                 URL: https://issues.apache.org/jira/browse/SPARK-9141
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.4.0, 1.4.1
            Reporter: Nick Pritchard


As I understand it, DataFrame.cache() is supposed to work the same way as
RDD.cache(), so that repeated operations on the DataFrame use the cached
results and do not recompute the entire lineage. However, it seems that some
DataFrame operations (e.g. withColumn) change the underlying RDD lineage so
that cache() doesn't work as expected.

Below is a Scala example that demonstrates this. First, I define two UDFs that
use println so that it is easy to see when they are being called. Next, I
create a simple data frame with one row and two columns. Then I add a column,
cache the result, and call count() to force the computation. Lastly, I add
another column, cache the result, and call count() again.

I would have expected the last statement to only compute the last column, since 
everything else was cached. However, because withColumn() changes the lineage, 
the whole data frame is recomputed.

{code:scala}
    // Assumes a spark-shell session where sc and sqlContext are predefined
    import org.apache.spark.sql.functions.udf
    import sqlContext.implicits._

    // Example UDFs that println when called
    val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
    val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }

    // Initial dataset
    val df1 = sc.parallelize(Seq(("a", 1))).toDF("name", "value")

    // Add column by applying twice udf
    val df2 = df1.withColumn("twice", twice($"value"))
    df2.cache()
    df2.count() // prints Computed: twice(1)

    // Add column by applying triple udf
    val df3 = df2.withColumn("triple", triple($"value"))
    df3.cache()
    df3.count() // prints Computed: twice(1), then Computed: triple(1)
{code}

I found a workaround, which helped me understand what was going on behind the
scenes, but it doesn't seem like an ideal solution. Basically, I convert the
DataFrame to an RDD and then back to a DataFrame, which seems to freeze the
lineage. The code below shows the workaround for creating the second data
frame so that cache() works as expected.

{code:scala}
    val df2 = {
      val tmp = df1.withColumn("twice", twice($"value"))
      sqlContext.createDataFrame(tmp.rdd, tmp.schema)
    }
{code}
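
One way to check whether a derived DataFrame reads from the cache, with or
without the workaround, is to print its plans with explain(true). The snippet
below is only a sketch: it assumes a spark-shell session (so sc and sqlContext
already exist), and the exact name of the in-memory scan operator in the
printed plan is an assumption that may differ between Spark versions.

{code:scala}
    // Assumes a spark-shell session where sc and sqlContext are predefined
    import org.apache.spark.sql.functions.udf
    import sqlContext.implicits._

    val twice = udf { (x: Int) => x * 2 }
    val triple = udf { (x: Int) => x * 3 }

    val df1 = sc.parallelize(Seq(("a", 1))).toDF("name", "value")

    // Cache the parent and materialize it
    val df2 = df1.withColumn("twice", twice($"value"))
    df2.cache()
    df2.count()

    // Derive a child and print its logical and physical plans.
    // If the cached parent is reused, the physical plan should show an
    // in-memory scan over df2 instead of evaluating the twice UDF again.
    val df3 = df2.withColumn("triple", triple($"value"))
    df3.explain(true)
{code}

Running the same check with df2 built through the RDD round trip should make
it possible to compare the two plans side by side.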




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

