[ https://issues.apache.org/jira/browse/SPARK-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15814239#comment-15814239 ]
Hyukjin Kwon commented on SPARK-13209:
--------------------------------------

It seems (at least on the current master) that the plans become too large. If you use the checkpoint API, it does not look like it slows down (see the sketch appended after the quoted issue below).

> transitive closure on a dataframe
> ---------------------------------
>
>                 Key: SPARK-13209
>                 URL: https://issues.apache.org/jira/browse/SPARK-13209
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Placek
>
> When I run the following loop, the join gets slower and slower regardless of
> caching. If I convert the DataFrame to an RDD and then back again (uncomment
> the last commented line), there seems to be no slowdown, but I get an error
> after around 30 iterations. The code assumes that Edge is a simple case class
> with start and end.
> {code}
> val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9),
>   (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17),
>   (17, 18), (18, 19), (19, 20), (20, 21), (21, 22), (22, 23), (23, 24), (24, 25),
>   (25, 26), (26, 27), (27, 28), (23, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9),
>   (9, 10), (10, 11), (11, 12), (12, 13))
> //val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7))
> var edges = sc
>   .parallelize(e, N)
>   .map(p => Edge(p._1, p._2))
>   .toDF()
>   .cache()
> //var edges = e.map(p => Edge(p._1, p._2)).toDF().cache()
> var filtered = edges
>   .filter("start = 1")
>   .distinct()
>   .withColumnRenamed("start", "fStart")
>   .withColumnRenamed("end", "fEnd")
>   .cache()
> var i = 0
> while (i < 300) {
>   i = i + 1
>   println("\n i = " + i)
>   filtered = filtered
>     .join(edges, filtered("fEnd") === edges("start"))
>     .select(filtered("fStart"), edges("end"))
>     .withColumnRenamed("start", "fStart")
>     .withColumnRenamed("end", "fEnd")
>     .distinct
>     .cache()
>   //filtered.explain()
>   //filtered.explain(true)
>
>   //filtered = sqlContext.createDataFrame(filtered.rdd, filtered.schema)
>   filtered.show
> }
> {code}
> The error I get is shown below; it also causes "TaskSchedulerImpl: Lost executor
> driver on localhost: Executor heartbeat timed out after 121001 ms":
> {code}
> 16/02/07 01:55:59 ERROR Utils: Uncaught exception in thread driver-heartbeater
> java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
>   at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
>   at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>   at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
>   at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436)
>   at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426)
>   at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424)
>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468)
>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
>   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
>   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
>   at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
>   at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
>   ... 32 more
> {code}
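For illustration, here is a minimal sketch of the checkpoint-based workaround mentioned in the comment above. It is not from the original report: it assumes Spark 2.1 or later (where Dataset.checkpoint is available), a spark-shell session with spark/sc in scope, a shortened edge list for brevity, and an illustrative checkpoint directory path.

{code}
// Sketch only: assumes spark-shell on Spark 2.1+, illustrative checkpoint dir.
import spark.implicits._

case class Edge(start: Int, end: Int)

// checkpoint() needs a reliable directory to write the materialized data to.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

// Shortened edge list, only to keep the sketch small.
val edges = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7))
  .map(p => Edge(p._1, p._2))
  .toDF()
  .cache()

var filtered = edges
  .filter("start = 1")
  .withColumnRenamed("start", "fStart")
  .withColumnRenamed("end", "fEnd")
  .distinct()

var i = 0
while (i < 300) {
  i += 1
  filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("start", "fStart")
    .withColumnRenamed("end", "fEnd")
    .distinct()
    // Materialize the result and truncate the logical plan here, so the plan
    // does not keep growing (and analysis does not keep getting slower) with
    // each iteration.
    .checkpoint()
  filtered.show()
}
{code}

Checkpointing writes the intermediate result out and rebuilds the DataFrame from that data, which is roughly what the sqlContext.createDataFrame(filtered.rdd, filtered.schema) round trip in the report was emulating, but without accumulating lineage on the driver.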