I use a simple map/reduce step in a Java/Spark program to remove duplicate documents from a large (10 TB compressed) sequence file of HTML pages. Here is the partial code:
    JavaPairRDD<BytesWritable, NullWritable> inputRecords =
        sc.sequenceFile(args[0], BytesWritable.class, NullWritable.class).coalesce(numMaps);

    JavaPairRDD<String, AteshProtos.CacheDoc> hashDocs = inputRecords.mapToPair(t ->
        cacheDocs.add(new Tuple2<>(BaseEncoding.base64()
            .encode(Hashing.sha1().hashString(doc.getUrl(), Charset.defaultCharset()).asBytes()), doc));
    });

    JavaPairRDD<BytesWritable, NullWritable> byteArrays =
        hashDocs.reduceByKey((a, b) -> a.getUrl().length() < b.getUrl().length() ? a : b, numReds)
            .mapToPair(t -> new Tuple2<>(
                new BytesWritable(PentV3.buildFromMessage(t._2).serializeUncompressed()),
                NullWritable.get()));

The logic is simple. The map generates a SHA-1 signature from the HTML, and in the reduce phase we keep the HTML that has the shortest URL. However, after running for 2-3 hours the application crashes with an out-of-memory error. Here is the exception:

    15/07/15 18:24:05 WARN scheduler.TaskSetManager: Lost task 267.0 in stage 0.0 (TID 267, psh-11.nse.ir): java.lang.OutOfMemoryError: GC overhead limit exceeded

It seems that the map function keeps the hashDocs RDD in memory, and when an executor's memory fills up, the application crashes. Persisting the map output to disk solves the problem. Adding the following line between the map and the reduce is enough:

    hashDocs.persist(StorageLevel.DISK_ONLY());

Is this a bug in Spark? How can I tell Spark not to keep any part of the RDD in memory?

Thanks
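For reference, the dedup rule described above (key each page by a SHA-1 of its HTML, then keep the page with the shortest URL per key) can be sketched in plain Java without Spark. This is only an illustration under assumptions: `Doc` is a hypothetical stand-in for `AteshProtos.CacheDoc`, the class and method names are made up for this sketch, and the JDK's `MessageDigest` and `Base64` replace the Guava calls used in the job.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the dedup rule; not the Spark job itself.
final class DedupSketch {

    // Hypothetical stand-in for AteshProtos.CacheDoc (assumption).
    static final class Doc {
        final String url;
        final String html;

        Doc(String url, String html) {
            this.url = url;
            this.html = html;
        }
    }

    // Base64-encoded SHA-1 of the HTML, used as the grouping key.
    static String sha1Key(String html) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            byte[] digest = md.digest(html.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a required algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    // Same shape as the reduceByKey function: keep the document with the shorter URL.
    static Doc shorterUrl(Doc a, Doc b) {
        return a.url.length() <= b.url.length() ? a : b;
    }

    // In-memory equivalent of map + reduceByKey for a small list of documents.
    static Map<String, Doc> dedup(List<Doc> docs) {
        Map<String, Doc> byHash = new HashMap<>();
        for (Doc d : docs) {
            byHash.merge(sha1Key(d.html), d, DedupSketch::shorterUrl);
        }
        return byHash;
    }

    public static void main(String[] args) {
        List<Doc> docs = Arrays.asList(
            new Doc("http://example.com/a/long/path", "<html>same</html>"),
            new Doc("http://example.com/a", "<html>same</html>"),
            new Doc("http://example.com/b", "<html>other</html>"));
        // Prints one URL per distinct HTML body (iteration order is unspecified).
        for (Doc d : dedup(docs).values()) {
            System.out.println(d.url);
        }
    }
}
```

The reduce function is associative and returns one of its inputs, so it only ever needs to hold one document per key at a time.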