[ https://issues.apache.org/jira/browse/SPARK-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15814239#comment-15814239 ]

Hyukjin Kwon commented on SPARK-13209:
--------------------------------------

It seems (at least on current master) that the query plans grow too large. If you use 
the checkpoint API, it does not appear to slow down.
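A minimal sketch of what that looks like inside the reporter's loop, assuming Spark 2.1+ where Dataset.checkpoint is available (the checkpoint directory path below is just a placeholder):

{code}
// Sketch only: checkpointing materializes the intermediate result and
// truncates the lineage, so the plan does not keep growing per iteration.
sc.setCheckpointDir("/tmp/spark-checkpoints")  // placeholder path

var i = 0
while (i < 300) {
  i = i + 1
  filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("end", "fEnd")
    .distinct()
    .checkpoint()  // eager by default; cuts the plan/lineage at this point
  filtered.show()
}
{code}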

> transitive closure on a dataframe
> ---------------------------------
>
>                 Key: SPARK-13209
>                 URL: https://issues.apache.org/jira/browse/SPARK-13209
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Placek
>
> When I run the following loop, the join gets slower and slower regardless of 
> caching. If I change the DataFrame to an RDD and then back again (uncomment the 
> last commented line), there seems to be no slowdown, but I get an error 
> after around 30 iterations. The code assumes that Edge is a simple case class 
> with start and end fields.
> {code}
>     val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9),
>       (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17),
>       (17, 18), (18, 19), (19, 20), (20, 21), (21, 22), (22, 23), (23, 24), (24, 25),
>       (25, 26), (26, 27), (27, 28), (23, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9),
>       (9, 10), (10, 11), (11, 12), (12, 13))
>     //val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7))
>     var edges = sc
>       .parallelize(e, N)
>       .map(p => Edge(p._1, p._2))
>       .toDF()
>       .cache()
>     //var edges = e.map(p => Edge(p._1, p._2)).toDF().cache()
>     var filtered = edges
>       .filter("start = 1")
>       .distinct()
>       .withColumnRenamed("start", "fStart")
>       .withColumnRenamed("end", "fEnd")
>       .cache()
>     var i = 0
>     while (i < 300) {
>       i = i + 1
>       println("\n i = " + i)
>       filtered = filtered
>         .join(edges, filtered("fEnd") === edges("start"))
>         .select(filtered("fStart"), edges("end"))
>         .withColumnRenamed("start", "fStart")
>         .withColumnRenamed("end", "fEnd")
>         .distinct
>         .cache()
>       //filtered.explain()
>       //filtered.explain(true)
>         
>       //filtered = sqlContext.createDataFrame(filtered.rdd, filtered.schema)
>       filtered.show
>     }
> {code}
> The error I get is shown below; it also causes the executor to be lost 
> (TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 121001 ms):
> {code}
> 16/02/07 01:55:59 ERROR Utils: Uncaught exception in thread driver-heartbeater
> java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
>       at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
>       at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
>       at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>       at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
>       at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436)
>       at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426)
>       at scala.Option.foreach(Option.scala:236)
>       at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426)
>       at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424)
>       at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468)
>       at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
>       at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
>       at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
>       at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
>       at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
>       at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
>       at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
>       at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
>       at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
>       ... 32 more
> {code}


