Placek created SPARK-13209:
------------------------------

             Summary: transitive closure on a dataframe
                 Key: SPARK-13209
                 URL: https://issues.apache.org/jira/browse/SPARK-13209
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.0
            Reporter: Placek


When I run the following loop, the join gets slower and slower regardless of 
caching. If I convert the DataFrame to an RDD and back again on each iteration 
(uncomment the last commented line in the loop), the slowdown disappears, but 
I get an error after around 30 iterations. The code assumes that Edge is a 
simple case class with start and end fields.

    val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), 
(9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17), 
(17, 18), (18, 19), (19, 20), (20, 21), (21, 22), (22, 23), (23, 24), (24, 25), 
(25, 26), (26, 27), (27, 28), (23, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), 
(9, 10), (10, 11), (11, 12), (12, 13))
    //val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7))
    // Edge is the simple case class mentioned above.
    case class Edge(start: Int, end: Int)
    import sqlContext.implicits._  // needed for .toDF()
    val N = 4  // partition count; not specified in the original report
    var edges = sc
      .parallelize(e, N)
      .map(p => Edge(p._1, p._2))
      .toDF()
      .cache()
    //var edges = e.map(p => Edge(p._1, p._2)).toDF().cache()
    // Seed the closure with the edges starting at node 1.
    var filtered = edges
      .filter("start = 1")
      .distinct()
      .withColumnRenamed("start", "fStart")
      .withColumnRenamed("end", "fEnd")
      .cache()

    var i = 0
    while (i < 300) {
      i = i + 1
      println("\n i = " + i)
      // Extend each partial path by one edge: join on fEnd = start and
      // keep the resulting (fStart, fEnd) pairs.
      filtered = filtered
        .join(edges, filtered("fEnd") === edges("start"))
        .select(filtered("fStart"), edges("end"))
        .withColumnRenamed("end", "fEnd")
        .distinct()
        .cache()
      //filtered.explain()
      //filtered.explain(true)

      // Work-around: rebuild the DataFrame from its RDD to truncate lineage
      // (avoids the slowdown but triggers the exception below).
      //filtered = sqlContext.createDataFrame(filtered.rdd, filtered.schema)

      filtered.show()
    }
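For reference, here is the round-trip work-around in isolation (a minimal 
sketch; it assumes the sqlContext and the filtered variable from the loop are 
in scope):

    // Rebuilding the DataFrame from its own RDD gives it a fresh, one-step
    // logical plan instead of the lineage accumulated over all previous
    // joins, which is why the per-iteration slowdown disappears.
    filtered = sqlContext.createDataFrame(filtered.rdd, filtered.schema)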

The error I get is:

    ERROR Utils: Uncaught exception in thread driver-heartbeater
    java.io.IOException: java.lang.ClassCastException: cannot assign instance
      of scala.collection.immutable.HashMap$SerializationProxy to field
      org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type
      scala.collection.immutable.Map in instance of
      org.apache.spark.executor.TaskMetrics
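
A checkpoint-based variant of the same lineage-truncation idea may also be 
worth trying (a sketch only, untested; the checkpoint directory is a 
hypothetical path):

    sc.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical directory
    val truncated = filtered.rdd
    truncated.checkpoint()
    truncated.count()  // action to force the checkpoint to materialize
    filtered = sqlContext.createDataFrame(truncated, filtered.schema).cache()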


