Indeed! I wasn't able to get this to work in cluster mode yet, but increasing the driver and executor stack sizes in client mode (still running on a YARN EMR cluster) got it to work! I'll fiddle with it more.

FWIW, I used:

spark-submit --deploy-mode client \
    --conf "spark.executor.extraJavaOptions=-XX:ThreadStackSize=81920" \
    --conf "spark.driver.extraJavaOptions=-XX:ThreadStackSize=81920" \
    ...

Thank you so much!
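P.S. On my question below about a lighter-weight way to compact the lineage: I haven't tried it yet, but RDD.localCheckpoint() (added in Spark 1.5, I believe) looks like it truncates lineage using the executors' local block storage instead of HDFS. I don't think JavaRDD exposes it directly, so going through rdd() might be necessary. A rough, untested sketch of the work-around below with the HDFS checkpoint swapped out (same variables as in the work-around, e.g. allMetrics and tablesPerCheckpoint):

    // Same occasional-compaction loop as the work-around below, but using
    // localCheckpoint() instead of checkpoint(). localCheckpoint() persists
    // the partitions to executor storage (memory/disk) rather than HDFS,
    // which avoids the HDFS round trip, but the checkpointed partitions
    // can't be recomputed if an executor is lost.
    if (i % tablesPerCheckpoint == 0) {
        JavaRDD<String> dataSoFar =
            jsc.union(allMetrics.get(0),
                      allMetrics.subList(1, allMetrics.size()));
        // Assumption: localCheckpoint() isn't on JavaRDD, so call it on the
        // underlying RDD.
        dataSoFar.rdd().localCheckpoint();
        dataSoFar.count();  // action to materialize and truncate the lineage
        allMetrics.clear();
        allMetrics.add(dataSoFar);
    }

If that works, it should keep the lineage short without paying for the HDFS writes.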
On Sun, Jun 5, 2016 at 2:34 PM, Eugene Morozov <evgeny.a.moro...@gmail.com> wrote:

> Everett,
>
> Try increasing the thread stack size. To do that, run your application
> with the following options (my app is a web application, so you might
> need to adjust something):
>
> -XX:ThreadStackSize=81920
> -Dspark.executor.extraJavaOptions="-XX:ThreadStackSize=81920"
>
> The number 81920 is memory in KB. You could try something smaller. It's
> pretty memory-consuming to have 80 MB for each thread (there might
> easily be 100 of them), but this is just a workaround. This is the
> configuration I use to train a random forest on an input of 400k
> samples.
>
> Hope this helps.
>
> --
> Be well!
> Jean Morozov
>
> On Sun, Jun 5, 2016 at 11:17 PM, Everett Anderson <ever...@nuna.com.invalid> wrote:
>
>> Hi!
>>
>> I have a fairly simple Spark (1.6.1) Java RDD-based program that's
>> scanning through the lines of about 1000 large text files of records
>> and computing some metrics about each line (record type, line length,
>> etc). Most are identical, so I'm calling distinct().
>>
>> In the loop over the list of files, I'm saving up the resulting RDDs
>> into a List. After the loop, I use the JavaSparkContext
>> union(JavaRDD<T>... rdds) method to collapse the tables into one.
>>
>> Like this --
>>
>> List<JavaRDD<String>> allMetrics = ...
>> for (int i = 0; i < files.size(); i++) {
>>     JavaPairRDD<...> lines = jsc.newAPIHadoopFile(...);
>>     JavaRDD<String> distinctFileMetrics = lines.flatMap(fn).distinct();
>>     allMetrics.add(distinctFileMetrics);
>> }
>>
>> JavaRDD<String> finalOutput = jsc.union(allMetrics.toArray(...)).coalesce(10);
>> finalOutput.saveAsTextFile(...);
>>
>> There are posts suggesting
>> <https://stackoverflow.com/questions/30522564/spark-when-union-a-lot-of-rdd-throws-stack-overflow-error>
>> that using JavaRDD union(JavaRDD<T> other) many times creates a long
>> lineage that results in a StackOverflowError.
>>
>> However, I'm seeing the StackOverflowError even with JavaSparkContext
>> union(JavaRDD<T>... rdds).
>>
>> Should this still be happening?
>>
>> I'm using the work-around from this 2014 thread
>> <http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tt5649.html#a5752>,
>> shown below, which requires checkpointing to HDFS every N iterations,
>> but it's ugly and decreases performance.
>>
>> Is there a lighter-weight way to compact the lineage? It looks like at
>> some point there might've been a "local checkpoint
>> <https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-checkpointing.html>"
>> feature?
>>
>> Work-around:
>>
>> List<JavaRDD<String>> allMetrics = ...
>> for (int i = 0; i < files.size(); i++) {
>>     JavaPairRDD<...> lines = jsc.newAPIHadoopFile(...);
>>     JavaRDD<String> distinctFileMetrics = lines.flatMap(fn).distinct();
>>     allMetrics.add(distinctFileMetrics);
>>
>>     // Union and checkpoint occasionally to reduce lineage
>>     if (i % tablesPerCheckpoint == 0) {
>>         JavaRDD<String> dataSoFar = jsc.union(allMetrics.toArray(...));
>>         dataSoFar.checkpoint();
>>         dataSoFar.count();
>>         allMetrics.clear();
>>         allMetrics.add(dataSoFar);
>>     }
>> }
>>
>> When the StackOverflowError happens, it's a long trace starting with --
>>
>> 16/06/05 18:01:29 INFO YarnAllocator: Driver requested a total number of 20823 executor(s).
>> 16/06/05 18:01:29 WARN ApplicationMaster: Reporter thread fails 3 time(s) in a row.
>> java.lang.StackOverflowError
>>     at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>>     at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>     at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> ...
>>
>> Thanks!
>>
>> - Everett