Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
On the advice of some recent discussions on this list, I thought I would
try to consume gz files directly. I'm reading them, doing a preliminary
map, then repartitioning, then doing normal Spark things.

As I understand it, gzip files can't be read in partitions because of the
format, so I thought that repartitioning would be the next best thing for
parallelism. I have about 200 files, some about 1GB compressed and some
over 2GB uncompressed.

I'm hitting the 2GB maximum partition size. It's been discussed on this
list (topic: "2GB limit for partitions?", tickets SPARK-1476 and
SPARK-1391). Stack trace at the end. This happened about 10 hours in
(probably when it saw its first file). I can't just re-run it quickly!

Does anyone have any advice? Might I solve this by re-partitioning as the
first step after reading the file(s)? Or is it effectively impossible to
read a gz file that expands to over 2GB? Does anyone have any experience
with this?

Thanks in advance

Joe

Stack trace:

Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
Lost task 5.3 in stage 1.0 (TID 283) on executor:
java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
[duplicate 6]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)


Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Sean Owen
gzip and zip are not splittable compression formats; bzip2 and LZO are.
Ideally, use a splittable compression format.

Repartitioning is not a great solution, since it typically means a shuffle.

This is not necessarily related to how big your partitions are. The
question is: when does this happen, and during what operation?
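
For illustration, a rough sketch of the difference (the paths are made up):

// Each .gz file arrives as exactly one partition, however large it is:
val gz = sc.textFile("s3n://my-bucket/input/*.gz")

// A splittable codec such as bzip2 can be split on read, so a requested
// minimum partition count is honored and no shuffle is needed for parallelism:
val bz = sc.textFile("s3n://my-bucket/input/*.bz2", 1000)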





Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your reply Sean.

Looks like it's happening in a map:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at
NativeMethodAccessorImpl.java:-2)

That's my initial 'parse' stage, done before repartitioning. It reduces the
data size significantly, so I thought it would be sensible to do it before
repartitioning, which involves moving lots of data around. That might be a
stupid idea in hindsight!

So the obvious thing to try would be repartitioning before the map, as the
first transformation. I would have done that already if I could be sure it
would succeed or fail quickly.

I'm not entirely clear about the lazy execution of transformations in the
DAG. It could be that the error is manifesting during the mapToPair but is
caused by the earlier read-from-text-file stage.

Thanks for the pointers to those compression formats. I'll give them a go
(although it's not trivial to re-encode 200 GB of data on S3, so if I can
get this working reasonably with gzip, I'd rather stick with it).

Any advice on whether this error can be worked around with an early
repartition?

Cheers

Joe




Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
Hi Joe,

The issue is not that you have input partitions that are bigger than 2GB --
it's just that they are getting cached.  You can see in the stack trace that
the problem is when you try to read data out of the DiskStore:

org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

Also, just because you see this:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:
-2)

it doesn't *necessarily* mean that this is coming from your map.  It can be
pretty confusing how your operations on RDDs get turned into stages; the
stage could contain a lot more than just your map.  And actually, it might
not even be your map at all -- some of the other operations you invoke call
map under the covers.  So it's hard to say what is going on here without
seeing more code.  Anyway, maybe you've already considered all this (you
did mention the lazy execution of the DAG), but I wanted to make sure.  It
might help to use rdd.setName() and also to look at rdd.toDebugString.
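
For example, a quick sketch (the RDD names here are placeholders for
whatever you actually have):

parsed.setName("parsed")              // the name shows up in the web UI and in toDebugString
repartitioned.setName("repartitioned")
println(repartitioned.toDebugString)  // prints the lineage, so you can see which RDDs feed stage 1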

As far as what you can do about this -- it could be as simple as moving
your rdd.persist() to after you have compressed and repartitioned your
data.  E.g., I'm blindly guessing you have something like this:

val rawData = sc.hadoopFile(...)
rawData.persist(StorageLevel.DISK_ONLY)   // cached while the gzip partitions are still huge
rawData.count()
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
...

change it to something like:

val rawData = sc.hadoopFile(...)
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
repartitionedData.persist(StorageLevel.DISK_ONLY)   // cache only once partitions are small
repartitionedData.count()
...


The point is, you avoid caching any data until you have ensured that the
partitions are small.  You might have big partitions before that in
rawData, but that is OK.

Imran



Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your detailed reply, Imran. I'm writing this in Clojure (using
Flambo, which uses the Java API), but I don't think that's relevant. So
here's the pseudocode (sorry, I've not written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles") // NB multiple files.
val parsedFiles = rawData.map(parseFunction)   // can return nil on failure
val filtered = parsedFiles.filter(notNil)
val partitioned = filtered.repartition(100) // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)

So, I think I'm already doing what you suggested. I would have assumed that
partition size would be («size of expanded file» / «number of partitions») --
in this case with 100 partitions (a number I picked out of the air).

I wonder whether the «size of expanded file» is actually the size of all the
concatenated input files (probably about 800 GB). In that case, should I
multiply it by the number of files? Or perhaps I'm barking up completely
the wrong tree.

Joe





Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
Oh, I think you are just choosing a number of partitions that is too small.
All of the data in "/dir/to/gzfiles" is going to be sucked into one RDD,
with the data divided into partitions.  So if you're parsing 200 files, each
about 2 GB, and then repartitioning down to 100 partitions, you would expect
about 4 GB per partition.  You're filtering the data down some, but there may
also be some bloat from your parsed objects.  Also, if you're not using Kryo
for serialization, I'd strongly recommend it over the default serialization,
and try to register all your classes.
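
For example, a minimal sketch of turning Kryo on (ParsedRecord is just a
placeholder for whatever your parse step produces):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[ParsedRecord]))  // placeholder class
val sc = new SparkContext(conf)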

I think you can get some information about how much data is in your RDDs
from the UI -- but it might depend on which version of Spark you are
running, and I think the info isn't saved for failed stages, so you might
just need to watch it in the UI as it's happening (I am not 100% sure
about that ...).

So I'd suggest (a) using a lot more partitions (maybe 1k, given your data
size) and (b) turning on Kryo if you haven't already.
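
Concretely, sticking with your pseudocode from earlier, that might look
something like this (1000 is only an example -- anything that keeps each
partition comfortably under 2GB should do):

val partitioned = filtered.repartition(1000) // ~800 GB total / 1000 ≈ 800 MB per partition
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)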


