Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your reply, Sean.

Looks like it's happening in a map:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at
NativeMethodAccessorImpl.java:-2)

That's my initial 'parse' stage, done before repartitioning. It reduces the
data size significantly so I thought it would be sensible to do before
repartitioning, which involves moving lots of data around. That might be a
stupid idea in hindsight!

So the obvious thing would be to try repartitioning before the map as the
first transformation. I would have done that if I could be sure it would
succeed or fail quickly.

I'm not entirely clear about the lazy execution of transformations in the
DAG. It could be that the error is manifesting during the mapToPair, but
caused by the earlier read-from-text-file stage.

Thanks for the pointers to those compression formats. I'll give them a go
(although it's not trivial to re-encode 200 GB of data on S3, so if I can
get this working reasonably with gzip, I'd like to).

Any advice about whether this error can be worked around with an early
repartition?

Cheers

Joe


On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote:

 gzip and zip are not splittable compression formats; bzip2 and LZO are.
 Ideally, use a splittable compression format.

 Repartitioning is not a great solution since it means a shuffle, typically.

 This is not necessarily related to how big your partitions are. The
 question is: when does this happen? What operation?

 On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
  On the advice of some recent discussions on this list, I thought I would
  try and consume gz files directly. I'm reading them, doing a preliminary
  map, then repartitioning, then doing normal Spark things.

  As I understand it, zip files aren't readable in partitions because of the
  format, so I thought that repartitioning would be the next best thing for
  parallelism. I have about 200 files, some about 1GB compressed and some
  over 2GB uncompressed.

  I'm hitting the 2GB maximum partition size. It's been discussed on this
  list (topic: "2GB limit for partitions?", tickets SPARK-1476 and SPARK-1391).
  Stack trace at the end. This happened at 10 hours in (probably when it saw
  its first file). I can't just re-run it quickly!

  Does anyone have any advice? Might I solve this by re-partitioning as the
  first step after reading the file(s)? Or is it effectively impossible to
  read a gz file that expands to over 2GB? Does anyone have any experience
  with this?
 
  Thanks in advance
 
  Joe
 
  Stack trace:
 
  Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
  Lost task 5.3 in stage 1.0 (TID 283) on executor:
  java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
  [duplicate 6]
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in
  stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0:
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
  at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
  at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)



Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
On the advice of some recent discussions on this list, I thought I would
try and consume gz files directly. I'm reading them, doing a preliminary
map, then repartitioning, then doing normal Spark things.

As I understand it, zip files aren't readable in partitions because of the
format, so I thought that repartitioning would be the next best thing for
parallelism. I have about 200 files, some about 1GB compressed and some
over 2GB uncompressed.

I'm hitting the 2GB maximum partition size. It's been discussed on this
list (topic: "2GB limit for partitions?", tickets SPARK-1476 and
SPARK-1391). Stack trace at the end. This happened at 10 hours in
(probably when it saw its first file). I can't just re-run it quickly!

Does anyone have any advice? Might I solve this by re-partitioning as the
first step after reading the file(s)? Or is it effectively impossible to
read a gz file that expands to over 2GB? Does anyone have any experience
with this?

Thanks in advance

Joe

Stack trace:

Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
Lost task 5.3 in stage 1.0 (TID 283) on executor:
java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
[duplicate 6]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)


Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Sean Owen
gzip and zip are not splittable compression formats; bzip2 and LZO are.
Ideally, use a splittable compression format.
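
For illustration, a rough sketch of the difference (the paths here are made
up, and whether bzip2 actually splits depends on your Hadoop input format
version):

val gz = sc.textFile("s3n://bucket/dump.json.gz")
println(gz.partitions.length)  // 1 per .gz file -- a gzip stream can't be split

val bz = sc.textFile("s3n://bucket/dump.json.bz2")
println(bz.partitions.length)  // >1 for a large file -- bzip2 blocks can split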

Repartitioning is not a great solution since it means a shuffle, typically.

This is not necessarily related to how big your partitions are. The
question is: when does this happen? What operation?

On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
 On the advice of some recent discussions on this list, I thought I would try
 and consume gz files directly. I'm reading them, doing a preliminary map,
 then repartitioning, then doing normal Spark things.

 As I understand it, zip files aren't readable in partitions because of the
 format, so I thought that repartitioning would be the next best thing for
 parallelism. I have about 200 files, some about 1GB compressed and some over
 2GB uncompressed.

 I'm hitting the 2GB maximum partition size. It's been discussed on this list
 (topic: "2GB limit for partitions?", tickets SPARK-1476 and SPARK-1391).
 Stack trace at the end. This happened at 10 hours in (probably when it saw
 its first file). I can't just re-run it quickly!

 Does anyone have any advice? Might I solve this by re-partitioning as the
 first step after reading the file(s)? Or is it effectively impossible to
 read a gz file that expands to over 2GB? Does anyone have any experience
 with this?

 Thanks in advance

 Joe

 Stack trace:

 Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
 Lost task 5.3 in stage 1.0 (TID 283) on executor:
 java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
 [duplicate 6]
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in
 stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
 at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)




Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your detailed reply, Imran. I'm writing this in Clojure (using
Flambo, which wraps the Java API), but I don't think that's relevant. So
here's the pseudocode (sorry, I've not written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles")  // NB multiple files
val parsedFiles = rawData.map(parseFunction)    // can return nil on failure
val filtered = parsedFiles.filter(notNil)
val partitioned = filtered.repartition(100)     // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)

So, I think I'm already doing what you suggested. I would have assumed that
partition size would be («size of expanded file» / «number of partitions»).
In this case, 100 (which I picked out of the air).

I wonder whether the «size of expanded file» is actually the size of all
concatenated input files (probably about 800 GB)? In that case should I
multiply it by the number of files? Or perhaps I'm barking up the wrong
tree completely.

Joe




On 19 February 2015 at 14:44, Imran Rashid iras...@cloudera.com wrote:

 Hi Joe,

 The issue is not that you have input partitions that are bigger than 2GB
 -- it's just that they are getting cached.  You can see in the stack trace,
 the problem is when you try to read data out of the DiskStore:

 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

 Also, just because you see this:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 it doesn't *necessarily* mean that this is coming from your map.  It can
 be pretty confusing how your operations on RDDs get turned into stages; it
 could be a lot more than just your map.  And actually, it might not even be
 your map at all -- some of the other operations you invoke call map
 underneath the covers.  So it's hard to say what is going on here without
 seeing more code.  Anyway, maybe you've already considered all this (you
 did mention the lazy execution of the DAG), but I wanted to make sure.  It
 might help to use rdd.setName() and also to look at rdd.toDebugString.

 As far as what you can do about this -- it could be as simple as moving
 your rdd.persist() to after you have compressed and repartitioned your
 data.  E.g., I'm blindly guessing you have something like this:

 val rawData = sc.hadoopFile(...)
 rawData.persist(StorageLevel.DISK_ONLY)
 rawData.count()
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 ...

 change it to something like:

 val rawData = sc.hadoopFile(...)
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 repartitionedData.persist(StorageLevel.DISK_ONLY)
 repartitionedData.count()
 ...


 The point is, you avoid caching any data until you have ensured that the
 partitions are small.  You might have big partitions before that in
 rawData, but that is OK.

 Imran


 On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your reply, Sean.

 Looks like it's happening in a map:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 That's my initial 'parse' stage, done before repartitioning. It reduces
 the data size significantly so I thought it would be sensible to do before
 repartitioning, which involves moving lots of data around. That might be a
 stupid idea in hindsight!

 So the obvious thing would be to try repartitioning before the map as the
 first transformation. I would have done that if I could be sure it would
 succeed or fail quickly.

 I'm not entirely clear about the lazy execution of transformations in the
 DAG. It could be that the error is manifesting during the mapToPair, but
 caused by the earlier read-from-text-file stage.

 Thanks for the pointers to those compression formats. I'll give them a go
 (although it's not trivial to re-encode 200 GB of data on S3, so if I can
 get this working reasonably with gzip, I'd like to).

 Any advice about whether this error can be worked around with an early
 repartition?

 Cheers

 Joe


 On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote:

 gzip and zip are not splittable compression formats; bzip2 and LZO are.
 Ideally, use a splittable compression format.

 Repartitioning is not a great solution since it means a shuffle,
 typically.

 This is not necessarily related to how big your partitions are. The
 question is: when does this happen? What operation?

 On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
  On the advice of some recent discussions on this list, I thought I would
  try and consume gz files directly. I'm reading them, doing a preliminary
  map, then repartitioning, then doing normal Spark things.
 
  As I understand it, zip files aren't readable in partitions because of the
  format, 

Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
Hi Joe,

The issue is not that you have input partitions that are bigger than 2GB --
it's just that they are getting cached.  You can see in the stack trace, the
problem is when you try to read data out of the DiskStore:

org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

Also, just because you see this:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:-2)

it doesn't *necessarily* mean that this is coming from your map.  It can be
pretty confusing how your operations on RDDs get turned into stages; it
could be a lot more than just your map.  And actually, it might not even be
your map at all -- some of the other operations you invoke call map
underneath the covers.  So it's hard to say what is going on here without
seeing more code.  Anyway, maybe you've already considered all this (you
did mention the lazy execution of the DAG), but I wanted to make sure.  It
might help to use rdd.setName() and also to look at rdd.toDebugString.
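
For example, a minimal sketch (rawData and parseFunction are placeholder
names):

val parsed = rawData.map(parseFunction).setName("parsed")
println(parsed.toDebugString)  // prints the lineage of RDDs feeding this stage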

As far as what you can do about this -- it could be as simple as moving
your rdd.persist() to after you have compressed and repartitioned your
data.  E.g., I'm blindly guessing you have something like this:

val rawData = sc.hadoopFile(...)
rawData.persist(StorageLevel.DISK_ONLY)
rawData.count()
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
...

change it to something like:

val rawData = sc.hadoopFile(...)
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
repartitionedData.persist(StorageLevel.DISK_ONLY)
repartitionedData.count()
...


The point is, you avoid caching any data until you have ensured that the
partitions are small.  You might have big partitions before that in
rawData, but that is OK.

Imran


On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your reply, Sean.

 Looks like it's happening in a map:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 That's my initial 'parse' stage, done before repartitioning. It reduces
 the data size significantly so I thought it would be sensible to do before
 repartitioning, which involves moving lots of data around. That might be a
 stupid idea in hindsight!

 So the obvious thing would be to try repartitioning before the map as the
 first transformation. I would have done that if I could be sure it would
 succeed or fail quickly.

 I'm not entirely clear about the lazy execution of transformations in the
 DAG. It could be that the error is manifesting during the mapToPair, but
 caused by the earlier read-from-text-file stage.

 Thanks for the pointers to those compression formats. I'll give them a go
 (although it's not trivial to re-encode 200 GB of data on S3, so if I can
 get this working reasonably with gzip, I'd like to).

 Any advice about whether this error can be worked around with an early
 repartition?

 Cheers

 Joe


 On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote:

 gzip and zip are not splittable compression formats; bzip2 and LZO are.
 Ideally, use a splittable compression format.

 Repartitioning is not a great solution since it means a shuffle,
 typically.

 This is not necessarily related to how big your partitions are. The
 question is: when does this happen? What operation?

 On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
  On the advice of some recent discussions on this list, I thought I would
  try and consume gz files directly. I'm reading them, doing a preliminary
  map, then repartitioning, then doing normal Spark things.
 
  As I understand it, zip files aren't readable in partitions because of the
  format, so I thought that repartitioning would be the next best thing for
  parallelism. I have about 200 files, some about 1GB compressed and some
  over 2GB uncompressed.
 
  I'm hitting the 2GB maximum partition size. It's been discussed on this
  list (topic: "2GB limit for partitions?", tickets SPARK-1476 and SPARK-1391).
  Stack trace at the end. This happened at 10 hours in (probably when it saw
  its first file). I can't just re-run it quickly!
 
  Does anyone have any advice? Might I solve this by re-partitioning as the
  first step after reading the file(s)? Or is it effectively impossible to
  read a gz file that expands to over 2GB? Does anyone have any experience
  with this?
 
  Thanks in advance
 
  Joe
 
  Stack trace:
 
  Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
  Lost task 5.3 in stage 1.0 (TID 283) on executor:
  java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
  [duplicate 6]
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in
  stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0:
  java.lang.IllegalArgumentException: Size exceeds 

Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
Oh, I think you are just choosing a number that is too small for your
number of partitions.  All of the data in /dir/to/gzfiles is going to be
sucked into one RDD, with the data divided into partitions.  So if you're
parsing 200 files, each about 2 GB, and then repartitioning down to 100
partitions, you would expect 4 GB per partition.  Though you're filtering
the data down some, there may also be some bloat from your parsed
objects.  Also, if you're not using Kryo for serialization, I'd strongly
recommend that over the default serialization, and try to register all your
classes.

I think you can get some information about how much data is in your RDDs
from the UI -- but it might depend on which version of Spark you are
running, plus I think the info isn't saved for failed stages, so you might
just need to monitor it in the UI as it's happening (I am not 100% sure
about that ...)

So I'd suggest (a) using a lot more partitions (maybe 1k, given your data
size) and (b) turning on Kryo if you haven't already.
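
For example, a rough sketch of both changes (ParsedRecord is a placeholder
for whatever your parse function returns):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[ParsedRecord]))  // hypothetical class

val partitioned = filtered.repartition(1000)  // ~800 GB / 1000 = ~800 MB each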



On Thu, Feb 19, 2015 at 9:36 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your detailed reply, Imran. I'm writing this in Clojure (using
 Flambo, which wraps the Java API), but I don't think that's relevant. So
 here's the pseudocode (sorry, I've not written Scala for a long time):

 val rawData = sc.hadoopFile("/dir/to/gzfiles")  // NB multiple files
 val parsedFiles = rawData.map(parseFunction)    // can return nil on failure
 val filtered = parsedFiles.filter(notNil)
 val partitioned = filtered.repartition(100)     // guessed number
 val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

 val resultA = stuffA(persisted)
 val resultB = stuffB(persisted)
 val resultC = stuffC(persisted)

 So, I think I'm already doing what you suggested. I would have assumed
 that partition size would be («size of expanded file» / «number of
 partitions»). In this case, 100 (which I picked out of the air).

 I wonder whether the «size of expanded file» is actually the size of all
 concatenated input files (probably about 800 GB)? In that case should I
 multiply it by the number of files? Or perhaps I'm barking up the wrong
 tree completely.

 Joe




 On 19 February 2015 at 14:44, Imran Rashid iras...@cloudera.com wrote:

 Hi Joe,

 The issue is not that you have input partitions that are bigger than 2GB
 -- it's just that they are getting cached.  You can see in the stack trace,
 the problem is when you try to read data out of the DiskStore:

 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

 Also, just because you see this:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 it doesn't *necessarily* mean that this is coming from your map.  It can
 be pretty confusing how your operations on RDDs get turned into stages; it
 could be a lot more than just your map.  And actually, it might not even be
 your map at all -- some of the other operations you invoke call map
 underneath the covers.  So it's hard to say what is going on here without
 seeing more code.  Anyway, maybe you've already considered all this (you
 did mention the lazy execution of the DAG), but I wanted to make sure.  It
 might help to use rdd.setName() and also to look at rdd.toDebugString.

 As far as what you can do about this -- it could be as simple as moving
 your rdd.persist() to after you have compressed and repartitioned your
 data.  E.g., I'm blindly guessing you have something like this:

 val rawData = sc.hadoopFile(...)
 rawData.persist(StorageLevel.DISK_ONLY)
 rawData.count()
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 ...

 change it to something like:

 val rawData = sc.hadoopFile(...)
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 repartitionedData.persist(StorageLevel.DISK_ONLY)
 repartitionedData.count()
 ...


 The point is, you avoid caching any data until you have ensured that the
 partitions are small.  You might have big partitions before that in
 rawData, but that is OK.

 Imran


 On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your reply, Sean.

 Looks like it's happening in a map:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 That's my initial 'parse' stage, done before repartitioning. It reduces
 the data size significantly so I thought it would be sensible to do before
 repartitioning, which involves moving lots of data around. That might be a
 stupid idea in hindsight!

 So the obvious thing would be to try repartitioning before the map as the
 first transformation. I would have done that if I could be sure it would
 succeed or fail quickly.

 I'm not entirely clear about the lazy execution of transformations in the
 DAG. It could be that the error is manifesting during the mapToPair, but