Hi!

I have a fairly simple Spark (1.6.1) Java RDD-based program that's scanning
through lines of about 1000 large text files of records and computing some
metrics about each line (record type, line length, etc.). Most lines are
identical, so I'm calling distinct() on each file's results.

In the loop over the list of files, I'm saving up the resulting RDDs into a
List. After the loop, I use the JavaSparkContext union(JavaRDD<T>... rdds)
method to collapse the tables into one.

Like this --

List<JavaRDD<String>> allMetrics = ...
for (int i = 0; i < files.size(); i++) {
   JavaPairRDD<...> lines = jsc.newAPIHadoopFile(...);
   JavaRDD<String> distinctFileMetrics =
        lines.flatMap(fn).distinct();

   allMetrics.add(distinctFileMetrics);
}

JavaRDD<String> finalOutput =
    jsc.union(allMetrics.toArray(...)).coalesce(10);
finalOutput.saveAsTextFile(...);

There are posts
<https://stackoverflow.com/questions/30522564/spark-when-union-a-lot-of-rdd-throws-stack-overflow-error>
suggesting that calling JavaRDD's union(JavaRDD<T> other) many times
creates a long lineage that results in a StackOverflowError.

However, I'm seeing the StackOverflowError even with JavaSparkContext
union(JavaRDD<T>... rdds).

Should this still be happening?
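For clarity, here's the difference between the two patterns as I understand
it (just a sketch, using the names from my code above). The pairwise form
nests one level of lineage per call, while the varargs form should build a
single flat UnionRDD:

```java
// Pairwise union: each call wraps the previous result, so the
// lineage grows one level deeper per file -- reportedly what
// triggers the StackOverflowError in the linked posts.
JavaRDD<String> merged = allMetrics.get(0);
for (int i = 1; i < allMetrics.size(); i++) {
    merged = merged.union(allMetrics.get(i));
}

// Varargs union: one UnionRDD with all inputs as direct parents --
// flat, so I expected it to avoid the deep lineage.
JavaRDD<String> flat = jsc.union(allMetrics.toArray(new JavaRDD[0]));
```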

I'm using the work-around from this 2014 thread
<http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tt5649.html#a5752>,
shown
below, which requires checkpointing to HDFS every N iterations, but it's
ugly and decreases performance.

Is there a lighter-weight way to compact the lineage? It looks like at some
point there might've been a "local checkpoint
<https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-checkpointing.html>"
feature?
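In case it helps to be concrete, this is roughly what I'd hope to write,
assuming the underlying Scala RDD's localCheckpoint() (which I believe
appeared around Spark 1.5) is reachable from Java via rdd() -- I haven't
verified that on 1.6.1:

```java
// Hypothetical sketch -- assumes localCheckpoint() on the underlying
// Scala RDD is callable from Java via rdd(); unverified on 1.6.1.
JavaRDD<String> dataSoFar =
    jsc.union(allMetrics.toArray(new JavaRDD[0]));
dataSoFar.rdd().localCheckpoint(); // truncate lineage, no HDFS write
dataSoFar.count();                 // materialize so truncation takes effect
```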

Work-around:

List<JavaRDD<String>> allMetrics = ...
for (int i = 0; i < files.size(); i++) {
   JavaPairRDD<...> lines = jsc.newAPIHadoopFile(...);
   JavaRDD<String> distinctFileMetrics =
        lines.flatMap(fn).distinct();
   allMetrics.add(distinctFileMetrics);

   // Union and checkpoint occasionally to reduce lineage
   if (i % tablesPerCheckpoint == 0) {
       JavaRDD<String> dataSoFar =
           jsc.union(allMetrics.toArray(...));
       dataSoFar.checkpoint();
       dataSoFar.count();
       allMetrics.clear();
       allMetrics.add(dataSoFar);
   }
}

When the StackOverflowError happens, it's a long trace starting with --

16/06/05 18:01:29 INFO YarnAllocator: Driver requested a total number
of 20823 executor(s).
16/06/05 18:01:29 WARN ApplicationMaster: Reporter thread fails 3
time(s) in a row.
java.lang.StackOverflowError
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)

...

Thanks!

- Everett
