I use a simple map/reduce step in a Java/Spark program to remove duplicate
documents from a large (10 TB compressed) sequence file containing HTML
pages. Here is the partial code:

JavaPairRDD<BytesWritable, NullWritable> inputRecords =
    sc.sequenceFile(args[0], BytesWritable.class, NullWritable.class)
        .coalesce(numMaps);


JavaPairRDD<String, AteshProtos.CacheDoc> hashDocs =
    inputRecords.mapToPair(t -> {
        // assumption: each record's bytes decode into a single CacheDoc;
        // the actual decoding was left out of this snippet
        AteshProtos.CacheDoc doc = AteshProtos.CacheDoc.parseFrom(t._1.copyBytes());
        // key: base64-encoded SHA-1 of the document URL
        return new Tuple2<>(
            BaseEncoding.base64().encode(
                Hashing.sha1()
                    .hashString(doc.getUrl(), Charset.defaultCharset())
                    .asBytes()),
            doc);
    });


JavaPairRDD<BytesWritable, NullWritable> byteArrays =
    hashDocs
        // per hash, keep the document with the shortest URL
        .reduceByKey((a, b) -> a.getUrl().length() < b.getUrl().length() ? a : b, numReds)
        .mapToPair(t -> new Tuple2<>(
            new BytesWritable(PentV3.buildFromMessage(t._2).serializeUncompressed()),
            NullWritable.get()));


The logic is simple: the map generates a SHA-1 signature for each document,
and in the reduce phase we keep the document with the shortest URL.
However, after running for 2-3 hours the application crashes with a
memory error. Here is the exception:

15/07/15 18:24:05 WARN scheduler.TaskSetManager: Lost task 267.0 in
stage 0.0 (TID 267, psh-11.nse.ir): java.lang.OutOfMemoryError: GC
overhead limit exceeded


It seems that the map function keeps the hashDocs RDD in memory, and when
an executor's memory fills up the application crashes. Persisting the map
output to disk solves the problem: adding the following line between the
map and the reduce fixes it:

hashDocs.persist(StorageLevel.DISK_ONLY());

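For reference, below is a stripped-down, self-contained sketch of the same
map -> persist -> reduce pattern, using toy String data instead of the
CacheDoc protos (the class name, sample URLs, and local master are made up
purely for illustration); it just shows where the DISK_ONLY persist sits
between mapToPair and reduceByKey:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

public class DiskOnlyPersistExample {
    public static void main(String[] args) {
        // local master only so this toy example runs standalone
        SparkConf conf = new SparkConf()
            .setAppName("disk-only-persist-example")
            .setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // toy stand-in for the (hash, doc) pairs produced by the map stage:
        // key = host name, value = full URL
        JavaPairRDD<String, String> hashDocs = sc
            .parallelize(Arrays.asList(
                "http://a.example/x",
                "http://a.example/x/longer",
                "http://b.example/y"))
            .mapToPair(url -> new Tuple2<>(url.split("/")[2], url));

        // the workaround: spill the map output to disk instead of
        // caching it in executor memory
        hashDocs.persist(StorageLevel.DISK_ONLY());

        // reduce: per key, keep the shortest URL
        JavaPairRDD<String, String> deduped =
            hashDocs.reduceByKey((a, b) -> a.length() < b.length() ? a : b);

        for (Tuple2<String, String> t : deduped.collect()) {
            System.out.println(t._1 + " -> " + t._2);
        }
        sc.stop();
    }
}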

Is this a bug in Spark?

How can I tell Spark not to keep any part of an RDD in memory?


Thanks
