Thanks for your reply, Sean.
Looks like it's happening in a map:
15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:-2)
That's my initial 'parse' stage, done before repartitioning. It reduces the
On the advice of some recent discussions on this list, I thought I would
try to consume gz files directly. I'm reading them, doing a preliminary
map, then repartitioning, then doing normal Spark things.
As I understand it, zip files can't be split into partitions because of the
format, so I
gzip and zip are not splittable compression formats; bzip2 is, and LZO is
once you build an index for it.
Ideally, use a splittable compression format.
Repartitioning is not a great solution, since it typically means a shuffle.
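If it helps, here's a minimal sketch of the difference; the paths and
partition counts are made up for illustration:

val gzData = sc.textFile("/data/logs-gz")         // gzip: exactly one partition per file
val gzSpread = gzData.repartition(400)            // forced shuffle to regain parallelism

val bz2Data = sc.textFile("/data/logs-bz2", 400)  // bzip2: splittable, so the
                                                  // minPartitions hint can actually
                                                  // take effect, with no shuffle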
This is not necessarily related to how big your partitions are. The
question is, when does this happen?
Thanks for your detailed reply, Imran. I'm writing this in Clojure (using
Flambo, which uses the Java API), but I don't think that's relevant. So
here's the pseudocode (sorry, I've not written Scala for a long time):
val rawData = sc.hadoopFile("/dir/to/gzfiles") // NB multiple files.
val parsedFiles = rawData.map(parse)           // 'parse' stands in for the real per-record parse
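For what it's worth, a runnable version of that pipeline might look like
the following; parse, the partition count, and the tail of the job are
placeholders, since I don't know your actual code:

def parse(line: String): (String, Int) = (line.split("\t")(0), 1) // placeholder parser

val rawData = sc.textFile("/dir/to/gzfiles")   // one partition per .gz file
val parsed = rawData.map(parse)                // the preliminary map
val spread = parsed.repartition(400)           // shuffle into more, smaller partitions
val counts = spread.reduceByKey(_ + _)         // "normal Spark things" from here on
counts.saveAsTextFile("/dir/to/output")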
Hi Joe,
The issue is not that you have input partitions that are bigger than 2GB --
it's just that they are getting cached. You can see in the stack trace that
the problem is when you try to read data out of the DiskStore:
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
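If that diagnosis is right, one way around it is to split the data finer
before persisting, so that no single cached block approaches 2 GB. A
sketch, assuming the parsedFiles RDD from earlier and a made-up count of 400:

import org.apache.spark.storage.StorageLevel

// Each cached partition becomes one block, and a block has to fit in a
// ByteBuffer, i.e. stay under ~2 GB. More partitions -> smaller blocks.
val fine = parsedFiles.repartition(400)
fine.persist(StorageLevel.MEMORY_AND_DISK)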
Also, just
Oh, I think you are just choosing a number that is too small for your
number of partitions. All of the data in /dir/to/gzfiles is going to be
sucked into one RDD, with the data divided into partitions. So if you're
parsing 200 files, each about 2 GB, and then repartitioning down to 100
partitions, each partition ends up around 4 GB, which is over the 2 GB
limit.
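Spelling out the arithmetic with a hypothetical helper (totalInputBytes is
a stand-in for whatever bookkeeping tells you the total data size):

// 200 files x ~2 GB = ~400 GB; 400 GB / 100 partitions = ~4 GB each -- too big.
// Aim for a few hundred MB per partition instead:
val targetPartitionBytes = 512L << 20  // ~512 MB per partition
val numPartitions = math.ceil(totalInputBytes.toDouble / targetPartitionBytes).toInt
val spread = parsedFiles.repartition(numPartitions)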