Hi Joe,

The issue is not that you have input partitions that are bigger than 2GB --
it's just that they are getting cached.  You can see in the stack trace that
the problem occurs when Spark tries to read data back out of the DiskStore:

org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
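
In case it helps to see where that message comes from: the frame just above
DiskStore.getBytes in your trace is FileChannelImpl.map, and the JDK refuses to
memory-map more than Integer.MAX_VALUE bytes in a single call.  Here is a
minimal sketch that reproduces the same exception with plain JVM code and no
Spark at all.  The object name, method name and temp file are made up for
illustration; this is not what DiskStore itself does internally.

```scala
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode
import java.nio.file.{Files, StandardOpenOption}

object MapLimitDemo {
  // Tries to map `size` bytes of an empty scratch file and returns the
  // IllegalArgumentException thrown by FileChannel.map, if there is one.
  def tryMap(size: Long): Option[Throwable] = {
    val path = Files.createTempFile("map-limit-demo", ".bin") // hypothetical scratch file
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    try {
      channel.map(MapMode.READ_ONLY, 0L, size)
      None
    } catch {
      case e: IllegalArgumentException => Some(e)
    } finally {
      channel.close()
      Files.deleteIfExists(path)
    }
  }

  def main(args: Array[String]): Unit = {
    // One byte more than a single mapping can hold.
    println(tryMap(Integer.MAX_VALUE.toLong + 1L))
  }
}
```

So any cached block that ends up larger than that on disk will fail when it is
read back this way, regardless of how the data was read in originally.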

Also, just because you see this:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:
-2)

it doesn't *necessarily* mean that this is coming from your map.  It can be
pretty confusing how your operations on RDDs get turned into stages; a stage
could include a lot more than just your map.  And actually, it might not even
be your map at all -- some of the other operations you invoke call map
under the covers.  So it's hard to say what is going on here without
seeing more code.  Anyway, maybe you've already considered all this (you
did mention the lazy execution of the DAG), but I wanted to make sure.  It
might help to use rdd.setName() and also to look at rdd.toDebugString.
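
Here is a rough sketch of what I mean by naming RDDs and printing the lineage.
Everything in it is a stand-in: the data comes from sc.parallelize rather than
your gz files, the names "raw", "parsed" and "repartitioned" are invented, and
it assumes a local master just so it can run on its own.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LineageDemo {
  // Builds a small pipeline, names each RDD, and returns the lineage string.
  def lineage(sc: SparkContext): String = {
    val raw = sc.parallelize(1 to 1000, 4).setName("raw")          // stand-in for the file read
    val parsed = raw.map(i => (i % 10, i)).setName("parsed")       // stand-in for the parse step
    val repartitioned = parsed.repartition(8).setName("repartitioned")
    repartitioned.toDebugString
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lineage-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      println(lineage(sc))
    } finally {
      sc.stop()
    }
  }
}
```

The names you set show up in the toDebugString output (and in the web UI
storage tab for persisted RDDs), which makes it easier to tell which RDD a
stage or a cached block actually belongs to.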

As far as what you can do about this -- it could be as simple as moving
your rdd.persist() to after you have compressed and repartitioned your
data.  E.g., I'm blindly guessing you have something like this:

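import org.apache.spark.storage.StorageLevel

val rawData = sc.hadoopFile(...)
rawData.persist(StorageLevel.DISK_ONLY)  // caches the big, unsplit partitions
rawData.count()
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
...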

change it to something like:

import org.apache.spark.storage.StorageLevel

val rawData = sc.hadoopFile(...)
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
repartitionedData.persist(StorageLevel.DISK_ONLY)  // cache only after repartitioning
repartitionedData.count()
...


The point is, you avoid caching any data until you have ensured that the
partitions are small.  You might have big partitions before that in
rawData, but that is OK.
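
If you want to check that the partitions really are small before you persist,
one option is to count the records in each partition first.  The sketch below
is only an illustration: the helper name recordsPerPartition is made up, the
data is generated with sc.parallelize instead of being read from your files,
and a record count is only a rough proxy for size in bytes.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object PartitionCheck {
  // Returns (partition index, number of records) for every partition.
  def recordsPerPartition[T](rdd: RDD[T]): Array[(Int, Long)] =
    rdd.mapPartitionsWithIndex { (idx, it) =>
      Iterator((idx, it.foldLeft(0L)((n, _) => n + 1L)))
    }.collect()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partition-check").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val raw = sc.parallelize(1 to 100000, 2)   // stand-in for a few big input partitions
      val repartitioned = raw.repartition(8)
      recordsPerPartition(repartitioned).foreach { case (idx, n) =>
        println(s"partition $idx: $n records")
      }
      // Persist only once the partitions look reasonably small.
      repartitioned.persist(StorageLevel.DISK_ONLY)
      println(repartitioned.count())
    } finally {
      sc.stop()
    }
  }
}
```

Note that the check itself runs the pipeline once, so on a job as long as
yours it is probably worth trying it on a small subset of the input first.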

Imran


On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass <jw...@crossref.org> wrote:

> Thanks for your reply Sean.
>
> Looks like it's happening in a map:
>
> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
> tasks from Stage 1 (MappedRDD[17] at mapToPair at
> NativeMethodAccessorImpl.java:-2)
>
> That's my initial 'parse' stage, done before repartitioning. It reduces
> the data size significantly so I thought it would be sensible to do before
> repartitioning, which involves moving lots of data around. That might be a
> stupid idea in hindsight!
>
> So the obvious thing to try would be to try repartitioning before the map
> as the first transformation. I would have done that if I could be sure that
> it would succeed or fail quickly.
>
> I'm not entirely clear about the lazy execution of transformations in DAG.
> It could be that the error is manifesting during the mapToPair, but caused
> by the earlier read from text file stage.
>
> Thanks for pointers to those compression formats. I'll give them a go
> (although it's not trivial to re-encode 200 GB of data on S3, so if I can
> get this working reasonably with gzip I'd like to).
>
> Any advice about whether this error can be worked round with an early
> partition?
>
> Cheers
>
> Joe
>
>
> On 19 February 2015 at 09:51, Sean Owen <so...@cloudera.com> wrote:
>
>> gzip and zip are not splittable compression formats; bzip and lzo are.
>> Ideally, use a splittable compression format.
>>
>> Repartitioning is not a great solution since it means a shuffle,
>> typically.
>>
>> This is not necessarily related to how big your partitions are. The
>> question is, when does this happen? what operation?
>>
>> On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass <jw...@crossref.org> wrote:
>> > On the advice of some recent discussions on this list, I thought I
>> would try
>> > and consume gz files directly. I'm reading them, doing a preliminary
>> map,
>> > then repartitioning, then doing normal spark things.
>> >
>> > As I understand it, zip files aren't readable in partitions because of
>> the
>> > format, so I thought that repartitioning would be the next best thing
>> for
>> > parallelism. I have about 200 files, some about 1GB compressed and some
>> over
>> > 2GB uncompressed.
>> >
>> > I'm hitting the 2GB maximum partition size. It's been discussed on this
>> list
>> > (topic: "2GB limit for partitions?", tickets SPARK-1476 and SPARK-1391).
>> > Stack trace at the end. This happened at 10 hours in (probably when it
>> saw
>> > its first file). I can't just re-run it quickly!
>> >
>> > Does anyone have any advice? Might I solve this by re-partitioning as
>> the
>> > first step after reading the file(s)? Or is it effectively impossible to
>> > read a gz file that expands to over 2GB? Does anyone have any experience
>> > with this?
>> >
>> > Thanks in advance
>> >
>> > Joe
>> >
>> > Stack trace:
>> >
>> > Exception in thread "main" 15/02/18 20:44:25 INFO
>> scheduler.TaskSetManager:
>> > Lost task 5.3 in stage 1.0 (TID 283) on executor:
>> > java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
>> > [duplicate 6]
>> > org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 2 in
>> > stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
>> 1.0:
>> > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>> >         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
>> >         at
>> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>> >         at
>> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>> >         at
>> > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>> >         at
>> > org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
>> >         at
>> org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
>> >         at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>
>
>
