Oh, I think you are just choosing a number that is too small for your
number of partitions.  All of the data in "/dir/to/gzfiles" is going to be
sucked into one RDD, with the data divided into partitions.  So if you're
parsing 200 files, each about 2 GB, and then repartitioning down to 100
partitions, you would expect 4 GB per partition.  Though you're filtering
the data down some, there may also be some bloat from your parsed
objects.  Also, if you're not using Kryo for serialization, I'd strongly
recommend it over the default serialization, and try to register all your
classes.

I think you can get some information about how much data is in your RDDs
from the UI -- but it might depend on which version of Spark you are
running, and I think the info isn't saved for failed stages, so you might
just need to monitor it in the UI as it's happening (I am not 100% sure
about that ...)

So I'd suggest (a) using a lot more partitions (maybe 1k, given your data
size) and (b) turning on Kryo if you haven't already.
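
For example, a minimal sketch of both suggestions in Scala -- the class
name is just a placeholder for whatever your parsed records actually are:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // register the classes you actually serialize, e.g. your parsed records
  .registerKryoClasses(Array(classOf[MyParsedRecord]))
val sc = new SparkContext(conf)

// ...and use many more partitions when you repartition your filtered RDD:
val partitioned = filtered.repartition(1000)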



On Thu, Feb 19, 2015 at 9:36 AM, Joe Wass <jw...@crossref.org> wrote:

> Thanks for your detailed reply Imran. I'm writing this in Clojure (using
> Flambo which uses the Java API) but I don't think that's relevant. So
> here's the pseudocode (sorry I've not written Scala for a long time):
>
> val rawData = sc.hadoopFile("/dir/to/gzfiles") // NB multiple files.
> val parsedFiles = rawData.map(parseFunction)   // can return nil on failure
> val filtered = parsedFiles.filter(notNil)
> val partitioned = filtered.repartition(100) // guessed number
> val persisted = partitioned.persist(StorageLevels.DISK_ONLY)
>
> val resultA = stuffA(persisted)
> val resultB = stuffB(persisted)
> val resultC = stuffC(persisted)
>
> So, I think I'm already doing what you suggested. I would have assumed
> that partition size would be («size of expanded file» / «number of
> partitions»). In this case, 100 (which I picked out of the air).
>
> I wonder whether the «size of expanded file» is actually the size of all
> concatenated input files (probably about 800 GB)? In that case should I
> multiply it by the number of files? Or perhaps I'm barking up completely
> the wrong tree.
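>
> (Rough arithmetic, assuming it is indeed the concatenated total: 800 GB
> across 100 partitions would be about 8 GB per partition, well over the
> 2 GB limit.)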
>
> Joe
>
>
>
>
> On 19 February 2015 at 14:44, Imran Rashid <iras...@cloudera.com> wrote:
>
>> Hi Joe,
>>
>> The issue is not that you have input partitions that are bigger than 2GB
>> -- it's just that they are getting cached.  You can see in the stack
>> trace that the problem is when you try to read data out of the DiskStore:
>>
>> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>>
>> Also, just because you see this:
>>
>> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
>> tasks from Stage 1 (MappedRDD[17] at mapToPair at
>> NativeMethodAccessorImpl.java:-2)
>>
>> it doesn't *necessarily* mean that this is coming from your map.  It can
>> be pretty confusing how your operations on RDDs get turned into stages; a
>> stage could include a lot more than just your map.  And actually, it
>> might not even be your map at all -- some of the other operations you
>> invoke call map under the covers.  So it's hard to say what is going on
>> here without seeing more code.  Anyway, maybe you've already considered
>> all this (you did mention the lazy execution of the DAG), but I wanted to
>> make sure.  It might help to use rdd.setName() and also to look at
>> rdd.toDebugString.
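>>
>> For example (just a sketch, with placeholder names):
>>
>> val parsed = rawData.map(parseFunction).setName("parsed-data")
>> // toDebugString prints the lineage, so you can see which RDDs
>> // actually end up in each stage
>> println(parsed.toDebugString)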
>>
>> As far as what you can do about this -- it could be as simple as moving
>> your rdd.persist() to after you have compressed and repartitioned your
>> data.  E.g., I'm blindly guessing you have something like this:
>>
>> val rawData = sc.hadoopFile(...)
>> rawData.persist(StorageLevel.DISK_ONLY)
>> rawData.count()
>> val compressedData = rawData.map{...}
>> val repartitionedData = compressedData.repartition(N)
>> ...
>>
>> change it to something like:
>>
>> val rawData = sc.hadoopFile(...)
>> val compressedData = rawData.map{...}
>> val repartitionedData = compressedData.repartition(N)
>> repartitionedData.persist(StorageLevel.DISK_ONLY)
>> repartitionedData.count()
>> ...
>>
>>
>> The point is, you avoid caching any data until you have ensured that the
>> partitions are small.  You might have big partitions before that in
>> rawData, but that is OK.
>>
>> Imran
>>
>>
>> On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass <jw...@crossref.org> wrote:
>>
>>> Thanks for your reply Sean.
>>>
>>> Looks like it's happening in a map:
>>>
>>> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
>>> tasks from Stage 1 (MappedRDD[17] at mapToPair at
>>> NativeMethodAccessorImpl.java:-2)
>>>
>>> That's my initial 'parse' stage, done before repartitioning.  It reduces
>>> the data size significantly, so I thought it would be sensible to do it
>>> before repartitioning, which involves moving lots of data around.  That
>>> might be a stupid idea in hindsight!
>>>
>>> So the obvious thing to try would be repartitioning before the map, as
>>> the first transformation.  I would have done that if I could be sure
>>> that it would succeed or fail quickly.
>>>
>>> I'm not entirely clear about the lazy execution of transformations in
>>> the DAG.  It could be that the error is manifesting during the mapToPair
>>> but is caused by the earlier read-from-text-file stage.
>>>
>>> Thanks for the pointers to those compression formats.  I'll give them a
>>> go (although it's not trivial to re-encode 200 GB of data on S3, so if I
>>> can get this working reasonably with gzip I'd like to).
>>>
>>> Any advice on whether this error can be worked around with an early
>>> repartition?
>>>
>>> Cheers
>>>
>>> Joe
>>>
>>>
>>> On 19 February 2015 at 09:51, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> gzip and zip are not splittable compression formats; bzip2 and lzo are.
>>>> Ideally, use a splittable compression format.
>>>>
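>>>> A quick way to see the consequence of that (just a sketch): each
>>>> gzipped file comes in as a single partition, so
>>>>
>>>> val raw = sc.textFile("/dir/to/gzfiles")
>>>> println(raw.partitions.size)  // one (unsplittable) partition per .gz file
>>>>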
>>>> Repartitioning is not a great solution, since it typically means a
>>>> shuffle.
>>>>
>>>> This is not necessarily related to how big your partitions are.  The
>>>> question is: when does this happen, and in what operation?
>>>>
>>>> On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass <jw...@crossref.org> wrote:
>>>> > On the advice of some recent discussions on this list, I thought I
>>>> > would try and consume gz files directly. I'm reading them, doing a
>>>> > preliminary map, then repartitioning, then doing normal spark things.
>>>> >
>>>> > As I understand it, zip files aren't readable in partitions because of
>>>> > the format, so I thought that repartitioning would be the next best
>>>> > thing for parallelism. I have about 200 files, some about 1GB
>>>> > compressed and some over 2GB uncompressed.
>>>> >
>>>> > I'm hitting the 2GB maximum partition size. It's been discussed on
>>>> > this list (topic: "2GB limit for partitions?", tickets SPARK-1476 and
>>>> > SPARK-1391). Stack trace at the end. This happened at 10 hours in
>>>> > (probably when it saw its first file). I can't just re-run it quickly!
>>>> >
>>>> > Does anyone have any advice? Might I solve this by re-partitioning as
>>>> > the first step after reading the file(s)? Or is it effectively
>>>> > impossible to read a gz file that expands to over 2GB? Does anyone
>>>> > have any experience with this?
>>>> >
>>>> > Thanks in advance
>>>> >
>>>> > Joe
>>>> >
>>>> > Stack trace:
>>>> >
>>>> > Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
>>>> > Lost task 5.3 in stage 1.0 (TID 283) on executor:
>>>> > java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
>>>> > [duplicate 6]
>>>> > org.apache.spark.SparkException: Job aborted due to stage failure:
>>>> > Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3
>>>> > in stage 1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>>> >         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
>>>> >         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>>>> >         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>>>> >         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>>>> >         at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
>>>> >         at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
>>>> >         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>>>> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>>>
>>>
>>>
>>
>
