Re: HDFS or NFS as a cache?

2017-10-02 Thread Miguel Morales
See: https://github.com/rdblue/s3committer and
https://www.youtube.com/watch?v=8F2Jqw5_OnI&feature=youtu.be


On Mon, Oct 2, 2017 at 11:31 AM, Marcelo Vanzin  wrote:

> You don't need to collect data in the driver to save it. The code in
> the original question doesn't use "collect()", so it's actually doing
> a distributed write.
>
>
> On Mon, Oct 2, 2017 at 11:26 AM, JG Perrin  wrote:
> > Steve,
> >
> >
> >
> > If I refer to the collect() API, it says “Running collect requires moving
> > all the data into the application's driver process, and doing so on a
> very
> > large dataset can crash the driver process with OutOfMemoryError.” So why
> > would you need a distributed FS?
> >
> >
> >
> > jg
> >
> >
> >
> > From: Steve Loughran [mailto:ste...@hortonworks.com]
> > Sent: Saturday, September 30, 2017 6:10 AM
> > To: JG Perrin 
> > Cc: Alexander Czech ;
> user@spark.apache.org
> > Subject: Re: HDFS or NFS as a cache?
> >
> >
> >
> >
> >
> > On 29 Sep 2017, at 20:03, JG Perrin  wrote:
> >
> >
> >
> > You will collect in the driver (often the master) and it will save the
> data,
> > so for saving, you will not have to set up HDFS.
> >
> >
> >
> > no, it doesn't work quite like that.
> >
> >
> >
> > 1. workers generate their data and save somwhere
> >
> > 2. on "task commit" they move their data to some location where it will
> be
> > visible for "job commit" (rename, upload, whatever)
> >
> > 3. job commit —which is done in the driver,— takes all the committed task
> > data and makes it visible in the destination directory.
> >
> > 4. Then they create a _SUCCESS file to say "done!"
> >
> >
> >
> >
> >
> > This is done with Spark talking between workers and drivers to guarantee
> > that only one task working on a specific part of the data commits their
> > work, only
> >
> > committing the job once all tasks have finished
> >
> >
> >
> > The v1 mapreduce committer implements (2) by moving files under a job
> > attempt dir, and (3) by moving it from the job attempt dir to the
> > destination. one rename per task commit, another rename of every file on
> job
> > commit. In HFDS, Azure wasb and other stores with an O(1) atomic rename,
> > this isn't *too* expensve, though that final job commit rename still
> takes
> > time to list and move lots of files
> >
> >
> >
> > The v2 committer implements (2) by renaming to the destination directory
> and
> > (3) as a no-op. Rename in the tasks then, but not not that second,
> > serialized one at the end
> >
> >
> >
> > There's no copy of data from workers to driver, instead you need a shared
> > output filesystem so that the job committer can do its work alongside the
> > tasks.
> >
> >
> >
> > There are alternatives committer agorithms,
> >
> >
> >
> > 1. look at Ryan Blue's talk: https://www.youtube.com/watch?v=BgHrff5yAQo
> >
> > 2. IBM Stocator paper (https://arxiv.org/abs/1709.01812) and code
> > (https://github.com/SparkTC/stocator/)
> >
> > 3. Ongoing work in Hadoop itself for better committers. Goal: year end &
> > Hadoop 3.1 https://issues.apache.org/jira/browse/HADOOP-13786 . The
> oode is
> > all there, Parquet is a troublespot, and more testing is welcome from
> anyone
> > who wants to help.
> >
> > 4. Databricks have "something"; specifics aren't covered, but I assume
> its
> > dynamo DB based
> >
> >
> >
> >
> >
> > -Steve
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > From: Alexander Czech [mailto:alexander.cz...@googlemail.com]
> > Sent: Friday, September 29, 2017 8:15 AM
> > To: user@spark.apache.org
> > Subject: HDFS or NFS as a cache?
> >
> >
> >
> > I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
> > parquet files to S3. But the S3 performance for various reasons is bad
> when
> > I access s3 through the parquet write method:
> >
> > df.write.parquet('s3a://bucket/parquet')
> >
> > Now I want to setup a small cache for the parquet output. One output is
> > about 12-15 GB in size. Would it be enough to setup a NFS-directory on
> the
> > master, write the output to it and then move it to S3? Or should I setup
> a
> > HDFS on the Master? Or should I even opt for an additional cluster
> running a
> > HDFS solution on more than one node?
> >
> > thanks!
> >
> >
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: HDFS or NFS as a cache?

2017-10-02 Thread Marcelo Vanzin
You don't need to collect data in the driver to save it. The code in
the original question doesn't use "collect()", so it's actually doing
a distributed write.
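
A minimal PySpark sketch of that distinction, assuming a SparkSession named spark
and a placeholder s3a:// bucket; it is an illustration, not the original poster's code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("distributed-write-sketch").getOrCreate()
df = spark.range(1000)  # toy DataFrame standing in for the real data

# Distributed write: each executor writes its own partition files straight to
# the destination; no row passes through the driver.
df.write.mode("overwrite").parquet("s3a://bucket/parquet")

# collect() is a different operation: it pulls every row back into the driver,
# which is what risks OutOfMemoryError on a large dataset.
rows = df.collect()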


On Mon, Oct 2, 2017 at 11:26 AM, JG Perrin  wrote:
> Steve,
>
>
>
> If I refer to the collect() API, it says “Running collect requires moving
> all the data into the application's driver process, and doing so on a very
> large dataset can crash the driver process with OutOfMemoryError.” So why
> would you need a distributed FS?
>
>
>
> jg



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: HDFS or NFS as a cache?

2017-10-02 Thread JG Perrin
Steve,

If I refer to the collect() API, it says “Running collect requires moving all 
the data into the application's driver process, and doing so on a very large 
dataset can crash the driver process with OutOfMemoryError.” So why would you 
need a distributed FS?

jg



Re: HDFS or NFS as a cache?

2017-09-30 Thread Steve Loughran

On 29 Sep 2017, at 20:03, JG Perrin <jper...@lumeris.com> wrote:

You will collect in the driver (often the master) and it will save the data, so 
for saving, you will not have to set up HDFS.

No, it doesn't work quite like that.

1. Workers generate their data and save it somewhere.
2. On "task commit" they move their data to some location where it will be 
visible for "job commit" (rename, upload, whatever).
3. Job commit, which is done in the driver, takes all the committed task data 
and makes it visible in the destination directory.
4. Then a _SUCCESS file is created to say "done!"


This is done with Spark talking between the workers and the driver to guarantee 
that only one task attempt working on a specific part of the data commits its 
work, and that the job is only committed once all tasks have finished.

The v1 MapReduce committer implements (2) by moving files under a job attempt 
dir, and (3) by moving them from the job attempt dir to the destination: one 
rename per task commit, another rename of every file on job commit. In HDFS, 
Azure wasb and other stores with an O(1) atomic rename, this isn't *too* 
expensive, though that final job commit rename still takes time to list and 
move lots of files.
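
As a toy model of those two renames, using only local paths; this sketches the
shape of the algorithm rather than the committer's real code, and the directory
names are made up for illustration:

import os, shutil, tempfile

# Toy model of the v1 algorithm's two renames, on a local temp directory.
dest = tempfile.mkdtemp()
task_attempt = os.path.join(dest, "_temporary", "0", "_temporary", "attempt_0")
committed_task = os.path.join(dest, "_temporary", "0", "task_0")

os.makedirs(task_attempt)
with open(os.path.join(task_attempt, "part-00000"), "w") as f:
    f.write("data")                                       # the task's output file

os.rename(task_attempt, committed_task)                   # (2) task commit: one rename
os.rename(os.path.join(committed_task, "part-00000"),     # (3) job commit: move each
          os.path.join(dest, "part-00000"))               #     committed file into dest
open(os.path.join(dest, "_SUCCESS"), "w").close()          # (4) the success marker
shutil.rmtree(os.path.join(dest, "_temporary"))            # clean up the attempt dirs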

The v2 committer implements (2) by renaming to the destination directory and 
(3) as a no-op. The renames happen in the tasks, then, but there is no second, 
serialized rename at the end.
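
For reference, a hedged sketch of how that choice is usually made in Spark;
mapreduce.fileoutputcommitter.algorithm.version is a standard Hadoop property
(1 = v1, 2 = v2), and the application name is arbitrary:

from pyspark.sql import SparkSession

# Select the v2 FileOutputCommitter algorithm for this session; any Hadoop
# property can be passed through Spark by prefixing it with "spark.hadoop.".
spark = (
    SparkSession.builder
    .appName("committer-v2-sketch")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .getOrCreate()
)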

There's no copy of data from workers to the driver; instead you need a shared 
output filesystem so that the job committer can do its work alongside the tasks.

There are alternative committer algorithms:

1. Look at Ryan Blue's talk: https://www.youtube.com/watch?v=BgHrff5yAQo
2. The IBM Stocator paper (https://arxiv.org/abs/1709.01812) and code 
(https://github.com/SparkTC/stocator/)
3. Ongoing work in Hadoop itself for better committers. Goal: year end & Hadoop 
3.1, https://issues.apache.org/jira/browse/HADOOP-13786 . The code is all there, 
Parquet is a trouble spot, and more testing is welcome from anyone who wants to 
help.
4. Databricks have "something"; specifics aren't covered, but I assume it's 
DynamoDB-based.


-Steve









Re: HDFS or NFS as a cache?

2017-09-30 Thread Steve Loughran

On 29 Sep 2017, at 15:59, Alexander Czech <alexander.cz...@googlemail.com> wrote:

Yes, I have identified the rename as the problem; that is why I think the extra 
bandwidth of the larger instances might not help. Also there is a consistency 
issue with S3 because of how the rename works, so I probably lose data.

correct

Rename is mimicked with a COPY + DELETE; the copy happens inside S3, and your 
copy bandwidth appears to be 6-10 MB/s.
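
Back-of-the-envelope, using the 12-15 GB output size from the original question;
these are rough assumed figures, not measurements:

# Rough estimate of the commit-time copy alone, under the assumptions above.
output_gb = (12, 15)        # reported output size range
copy_mb_per_s = (6, 10)     # S3 copy bandwidth range quoted above

best = output_gb[0] * 1024 / copy_mb_per_s[1] / 60   # smallest output, fastest copy
worst = output_gb[1] * 1024 / copy_mb_per_s[0] / 60  # largest output, slowest copy
print(f"commit-time copy: roughly {best:.0f}-{worst:.0f} minutes")
# roughly 20-43 minutes spent just on the rename copies, which is why the
# commit phase hurts so much on S3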







RE: HDFS or NFS as a cache?

2017-09-29 Thread JG Perrin
You will collect in the driver (often the master) and it will save the data, so 
for saving, you will not have to set up HDFS.

From: Alexander Czech [mailto:alexander.cz...@googlemail.com]
Sent: Friday, September 29, 2017 8:15 AM
To: user@spark.apache.org
Subject: HDFS or NFS as a cache?

I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write parquet 
files to S3. But the S3 performance is bad, for various reasons, when I access 
S3 through the parquet write method:

df.write.parquet('s3a://bucket/parquet')

Now I want to set up a small cache for the parquet output. One output is about 
12-15 GB in size. Would it be enough to set up an NFS directory on the master, 
write the output to it and then move it to S3? Or should I set up HDFS on the 
master? Or should I even opt for an additional cluster running an HDFS solution 
on more than one node?
thanks!


Re: HDFS or NFS as a cache?

2017-09-29 Thread Alexander Czech
Yes, I have identified the rename as the problem; that is why I think the
extra bandwidth of the larger instances might not help. Also there is a
consistency issue with S3 because of how the rename works, so I probably
lose data.

On Fri, Sep 29, 2017 at 4:42 PM, Vadim Semenov 
wrote:

> How many files do you produce? I believe it spends a lot of time on renaming
> the files because of the output committer.
> Also, instead of 5x c3.2xlarge, try using 2x c3.8xlarge, because they have
> 10GbE networking and you can get good throughput to S3.


Re: HDFS or NFS as a cache?

2017-09-29 Thread Vadim Semenov
How many files do you produce? I believe it spends a lot of time on renaming
the files because of the output committer.
Also, instead of 5x c3.2xlarge, try using 2x c3.8xlarge, because they have
10GbE networking and you can get good throughput to S3.
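
A hedged sketch of the usual way to cap the file count before writing; coalesce
and repartition are standard DataFrame methods, and the target of 16 files is
only an illustrative example:

# df is the DataFrame being written, as in the original question.
# Fewer output files means fewer per-file renames at commit time.
# coalesce() avoids a full shuffle; repartition() costs a shuffle but gives
# evenly sized files.
df.coalesce(16).write.mode("overwrite").parquet("s3a://bucket/parquet")
# or, for evenly sized partitions:
# df.repartition(16).write.mode("overwrite").parquet("s3a://bucket/parquet")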

On Fri, Sep 29, 2017 at 9:15 AM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
> parquet files to S3. But the S3 performance is bad, for various reasons, when
> I access S3 through the parquet write method:
>
> df.write.parquet('s3a://bucket/parquet')
>
> Now I want to set up a small cache for the parquet output. One output is
> about 12-15 GB in size. Would it be enough to set up an NFS directory on the
> master, write the output to it and then move it to S3? Or should I set up
> HDFS on the master? Or should I even opt for an additional cluster running
> an HDFS solution on more than one node?
>
> thanks!
>


HDFS or NFS as a cache?

2017-09-29 Thread Alexander Czech
I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
parquet files to S3. But the S3 performance is bad, for various reasons, when
I access S3 through the parquet write method:

df.write.parquet('s3a://bucket/parquet')

Now I want to set up a small cache for the parquet output. One output is
about 12-15 GB in size. Would it be enough to set up an NFS directory on the
master, write the output to it and then move it to S3? Or should I set up
HDFS on the master? Or should I even opt for an additional cluster running
an HDFS solution on more than one node?

thanks!
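
One way to try the "stage locally, then move to S3" idea from this question,
sketched with placeholder paths; it assumes an HDFS instance reachable from the
cluster and the hadoop CLI (with S3A credentials configured) on the driver's PATH:

import subprocess

# df is the DataFrame from the question. Stage the parquet output on HDFS,
# where the commit renames are cheap, then push the finished files to S3 in
# one bulk copy.
staging_path = "hdfs:///tmp/parquet_staging"   # placeholder staging directory
final_path = "s3a://bucket/parquet"            # placeholder destination bucket

df.write.mode("overwrite").parquet(staging_path)

# hadoop distcp parallelises the upload across the cluster; check=True raises
# if the copy fails so the staged data is not silently dropped.
subprocess.run(["hadoop", "distcp", staging_path, final_path], check=True)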