Question on distribution of classes and jobs

2009-04-03 Thread Foss User
Suppose I have written a WordCount.java job in this manner:

conf.setMapperClass(Map.class);
conf.setCombinerClass(Combine.class);
conf.setReducerClass(Reduce.class);

So, you can see that three classes are being used here.  I have
packaged these classes into a jar file called wc.jar and I run it like
this:

$ bin/hadoop jar wc.jar WordCountJob
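
For reference, the rest of the driver is the usual boilerplate, roughly as
below (Map, Combine, and Reduce are the classes above; the JobConf constructor
that takes WordCountJob.class is what identifies the jar to ship; paths and
key/value types are placeholders):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class WordCountJob {
  public static void main(String[] args) throws Exception {
    // The class passed here is used to locate the containing jar (wc.jar).
    JobConf conf = new JobConf(WordCountJob.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Combine.class);
    conf.setReducerClass(Reduce.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}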

1) I want to know: when the job runs on a 5-machine cluster, is the
whole JAR file distributed across the 5 machines, or are the individual
class files distributed separately?

2) Also, let us say the number of reducers is 2 while the number of
mappers is 5. What happens in this case? How are the class files or
jar files distributed?

3) Are they distributed via RPC or HTTP?


Re: How many people are using Hadoop Streaming ?

2009-04-03 Thread Tim Wintle
On Fri, 2009-04-03 at 09:42 -0700, Ricky Ho wrote:
>   1) I can pick the language that offers a different programming
> paradigm (e.g. I may choose a functional language, or logic programming
> if they suit the problem better).  In fact, I can even choose Erlang
> for the map() and Prolog for the reduce().  Mixing and matching gives
> me more room to optimize.

Agreed (as someone who has written mappers/reducers in Python, perl,
shell script and Scheme before).



Why can't SequenceFile run without the native GzipCodec?

2009-04-03 Thread Zheng Shao
I guess the performance will be bad, but we should still be able to read/write 
the file. Correct?

Why do we throw an Exception?
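
For what it's worth, the fallback I have in mind is just to use DefaultCodec
(pure-Java zlib) when the native libraries are not loaded, instead of letting
SequenceFile throw -- a rough, untested sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.ReflectionUtils;

public class SeqFileWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Use GzipCodec only when the native code is loaded; otherwise fall back.
    Class<? extends CompressionCodec> codecClass =
        NativeCodeLoader.isNativeCodeLoaded() ? GzipCodec.class : DefaultCodec.class;
    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
        new Path(args[0]), LongWritable.class, Text.class,
        SequenceFile.CompressionType.BLOCK, codec);
    writer.append(new LongWritable(1), new Text("example record"));
    writer.close();
  }
}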

Zheng



RE: Hadoop/HDFS for scientific simulation output data analysis

2009-04-03 Thread Tu, Tiankai
Thanks for the heads-up, Owen. Do you know how long it took to run the
application? And how many files were processed? I am particularly eager
to know the answer to the second question.

I found an article at http://developer.yahoo.net/blogs/hadoop/2008/09/,
where the total number of cores used was over 30,000. The number of
files in that benchmark run was 14,000. The reported average throughput
for read was 18MB/s on 500 nodes and 66MB/s on 4000 nodes. It was
explained in the article (underneath Table 1) that:

"The 4000-node cluster throughput was 7 times better than 500's
for writes and 3.6 times better for reads even though the bigger cluster
carried more (4 v/s 2 tasks) per node load than the smaller one."

Is 66MB/s the aggregated read throughput or the per-node throughput? If the
latter were the case, the aggregate bandwidth would have been 4000 x
66MB/s = 264 GB/s, and the speedup on 4000 nodes over 500 nodes should
have been (66/18) * (4000/500), i.e. about 29.


-Original Message-
From: Owen O'Malley [mailto:omal...@apache.org] 
Sent: Friday, April 03, 2009 5:20 PM
To: core-user@hadoop.apache.org
Subject: Re: Hadoop/HDFS for scientific simulation output data analysis

On Apr 3, 2009, at 1:41 PM, Tu, Tiankai wrote:

> By the way, what is the largest size---in terms of total bytes, number
> of files, and number of nodes---in your applications? Thanks.

The largest Hadoop application that has been documented is the Yahoo  
Webmap.

10,000 cores
500 TB shuffle
300 TB compressed final output

http://developer.yahoo.net/blogs/hadoop/2008/02/yahoo-worlds-largest-production-hadoop.html

-- Owen


Re: Hadoop/HDFS for scientific simulation output data analysis

2009-04-03 Thread Owen O'Malley

On Apr 3, 2009, at 1:41 PM, Tu, Tiankai wrote:


By the way, what is the largest size---in terms of total bytes, number
of files, and number of nodes---in your applications? Thanks.


The largest Hadoop application that has been documented is the Yahoo  
Webmap.


10,000 cores
500 TB shuffle
300 TB compressed final output

http://developer.yahoo.net/blogs/hadoop/2008/02/yahoo-worlds-largest-production-hadoop.html

-- Owen


RE: Hadoop/HDFS for scientific simulation output data analysis

2009-04-03 Thread Tu, Tiankai
Thanks for the update and suggestion, Matei. 

I can certainly construct an input text file containing all the files of
a dataset
(http://hadoop.apache.org/core/docs/r0.19.1/streaming.html#How+do+I+process+files%2C+one+per+map%3F),
then let the jobtracker dispatch the file names to the maps, and open the
files directly from within the map method. But the jobtracker merely treats
the file names as text input and does not make an effort to assign a file
(name) to the nodes that store the file. As a result, a node opening a file
is almost certain to request data from a different data node---which destroys
IO locality (the very strength of Hadoop) and results in worse performance.
(I had verified such behavior earlier using Hadoop streaming.)
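
(To make this concrete, here is a rough, untested sketch -- against the 0.19
mapred API; the class and method bodies are my own -- of the kind of
InputFormat that would restore locality: it emits one whole-file split per
listed path, tagged with the hosts that hold that file's block, and hands the
path to the map as its key.)

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Input: a text file listing one HDFS path per line. Output: one split per
// listed file, located on the nodes that store that file, so the JobTracker
// can schedule each map data-locally.
public class FileListInputFormat extends FileInputFormat<Text, NullWritable> {

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (Path listFile : getInputPaths(job)) {
      FileSystem fs = listFile.getFileSystem(job);
      BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(listFile)));
      String line;
      while ((line = in.readLine()) != null) {
        Path dataFile = new Path(line.trim());
        FileStatus status = fs.getFileStatus(dataFile);
        // With the dfs block size equal to the file size there is one block;
        // use its hosts as the preferred locations for this split.
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        String[] hosts = blocks.length > 0 ? blocks[0].getHosts() : new String[0];
        splits.add(new FileSplit(dataFile, 0, status.getLen(), hosts));
      }
      in.close();
    }
    return splits.toArray(new InputSplit[splits.size()]);
  }

  @Override
  public RecordReader<Text, NullWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    final FileSplit fileSplit = (FileSplit) split;
    // One record per split: the file path; the map method opens it directly.
    return new RecordReader<Text, NullWritable>() {
      private boolean done = false;
      public boolean next(Text key, NullWritable value) {
        if (done) return false;
        key.set(fileSplit.getPath().toString());
        done = true;
        return true;
      }
      public Text createKey() { return new Text(); }
      public NullWritable createValue() { return NullWritable.get(); }
      public long getPos() { return done ? 1 : 0; }
      public float getProgress() { return done ? 1.0f : 0.0f; }
      public void close() { }
    };
  }
}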

By the way, what is the largest size---in terms of total bytes, number
of files, and number of nodes---in your applications? Thanks.


-Original Message-
From: Matei Zaharia [mailto:ma...@cloudera.com] 
Sent: Friday, April 03, 2009 1:18 PM
To: core-user@hadoop.apache.org
Subject: Re: Hadoop/HDFS for scientific simulation output data analysis

Hadoop does checksums for each small chunk of the file (512 bytes by
default) and stores a list of checksums for each chunk with the file, rather
than storing just one checksum at the end. This makes it easier to read only
a part of a file and know that it's not corrupt without having to read and
checksum the whole file. It also lets you use smaller / simpler checksums
for each chunk, making them more efficient to compute than the larger
checksum that would be needed to provide the same level of safety for the
whole file.

The default buffer size is, confusingly, not 64 KB but 4 KB. It really is
bad for performance as you saw. But I'd recommend trying 64 or 128 KB before
jumping to 64 MB. 128K is also the setting Yahoo used in its 2000-node
performance tests (see http://wiki.apache.org/hadoop/FAQ).

The reason big buffers may impair cache locality is that CPU caches are
typically a few MB. If you set your checksum size and buffer size to 64 MB,
then whenever you read a block, the CPU first has to checksum 64 MB worth of
data and then start again at the beginning to read it and pass it through
your application. During the checksumming process, the first pages of data
fell out of CPU cache as you checksummed the later ones. Therefore, you have
to read them from memory again during the second scan. If you just had a 64
KB block, it would stay in cache from the first time you read it. The same
issue happens if instead of checksumming you were copying from one buffer to
another (which does happen at a few places in Hadoop, and they tend to use
io.file.buffer.size). So while I haven't tried measuring performance with 64
MB vs 128 KB, I wouldn't be surprised if it leads to bad behavior, because
it's much higher than what anyone runs in production.

Finally, if you just want to sequentially process a file on each node and
you only want one logical "input record" per map, it might be better not to
use an input format that reads the record into memory at all. Instead, you
can have the map directly open the file, and have your InputFormat just
locate the map on the right node. This avoids copying the whole file into
memory before streaming it through your mapper. If your algorithm does
require random access throughout the file, on the other hand, you do need to
read it all in. I think the WholeFileRecordReader in the FAQ is aimed at
files smaller than 256 MB / 1 GB.

On Fri, Apr 3, 2009 at 9:37 AM, Tu, Tiankai
wrote:

> Thanks for the comments, Matei.
>
> The machines I ran the experiments on have 16 GB of memory each. I don't
> see how a 64 MB buffer could be considered huge or bad for memory
> consumption. In fact, I set it to a much larger value after initial rounds
> of tests showed abysmal results with the default 64 KB buffer. Also, why
> is it necessary to compute a checksum for every 512 bytes when, to my mind,
> only an end-to-end (whole-file) checksum makes sense? I set it to a much
> larger value to avoid the overhead.
>
> I didn't quite understand what you meant by bad for cache locality. The
> jobs were IO bound in the first place; any cache effect was secondary---at
> least an order of magnitude smaller. Can you clarify which particular
> computation (maybe within Hadoop) was made slow by a large io buffer size?
>
> What bothered you was exactly what bothered me, and prompted me to ask why
> the job tracker reported many more bytes read by the map tasks. I can
> confirm that the experiments were set up correctly. In fact, the numbers
> of map tasks were reported correctly by the job tracker: 1600 for the 1GB
> file dataset, 6400 for the 256MB file dataset, and so forth.
>
> Tiankai
>
>
>
> -Original Message-
> From: Matei Zaharia [mailto:ma...@cloudera.com]
> Sent: Friday, April 03, 2009 11:21 AM
> To: core-user@hadoop.apache.org
> Subject: Re: Hadoop/HDFS for scientific simulation output data analysis
>
> Hi

Re: How many people are using Hadoop Streaming ?

2009-04-03 Thread Owen O'Malley


On Apr 3, 2009, at 10:35 AM, Ricky Ho wrote:

I assume that the key is still sorted, right?  That means I will get  
all the "key1, valueX" entries before getting any of the "key2  
valueY" entries, and key2 is always bigger than key1.


Yes.

-- Owen


RE: How many people are using Hadoop Streaming ?

2009-04-03 Thread Ricky Ho
Owen, thanks for your elaboration; the data point is very useful.

On your point ...

In java you get
  key1, (value1, value2, ...)
  key2, (value3, ...)
in streaming you get
  key1 value1
  key1 value2
  key2 value3
and your application needs to detect the key changes.
=

I assume that the key is still sorted, right?  That means I will get all the 
"key1, valueX" entries before getting any of the "key2 valueY" entries, and key2 
is always bigger than key1.

Is this correct ?

Rgds,
Ricky


-Original Message-
From: Owen O'Malley [mailto:omal...@apache.org] 
Sent: Friday, April 03, 2009 8:59 AM
To: core-user@hadoop.apache.org
Subject: Re: How many people are using Hadoop Streaming ?


On Apr 3, 2009, at 9:42 AM, Ricky Ho wrote:

> Has anyone benchmarked the performance difference of using Hadoop ?
>  1) Java vs C++
>  2) Java vs Streaming

Yes, a while ago. When I tested it using sort, Java and C++ were  
roughly equal and streaming was 10-20% slower. Most of the cost with  
streaming came from the stringification.

>  1) I can pick the language that offers a different programming
> paradigm (e.g. I may choose a functional language, or logic
> programming if they suit the problem better).  In fact, I can even
> choose Erlang for the map() and Prolog for the reduce().  Mixing and
> matching gives me more room to optimize.
>  2) I can pick the language that I am familiar with, or one that I
> like.
>  3) Easy to switch to another language in a fine-grained, incremental
> way if I choose to do so in the future.

Additionally, the interface to streaming is very stable. *smile* It  
also supports legacy applications well.

The downsides are that:
   1. The interface is very thin and has minimal functionality.
   2. Streaming combiners don't work very well. Many streaming
applications buffer in the map and run the combiner internally.
   3. Streaming doesn't group the values in the reducer. In Java or C++,
you get:
  key1, (value1, value2, ...)
  key2, (value3, ...)
   in streaming you get
  key1 value1
  key1 value2
  key2 value3
   and your application needs to detect the key changes.
   4. Binary data support has only recently been added to streaming.

> Am I missing something here ?  or is the majority of Hadoop  
> applications written in Hadoop Streaming ?

On Yahoo's research clusters, typically 1/3 of the applications are  
streaming, 1/3 pig, and 1/3 java.

-- Owen


Re: Hadoop/HDFS for scientific simulation output data analysis

2009-04-03 Thread Matei Zaharia
Hadoop does checksums for each small chunk of the file (512 bytes by
default) and stores a list of checksums for each chunk with the file, rather
than storing just one checksum at the end. This makes it easier to read only
a part of a file and know that it's not corrupt without having to read and
checksum the whole file. It also lets you use smaller / simpler checksums
for each chunk, making them more efficient to compute than the larger
checksum that would be needed to provide the same level of safety for the
whole file.

The default buffer size is, confusingly, not 64 KB but 4 KB. It really is
bad for performance as you saw. But I'd recommend trying 64 or 128 KB before
jumping to 64 MB. 128K is also the setting Yahoo used in its 2000-node
performance tests (see http://wiki.apache.org/hadoop/FAQ).
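(In hadoop-site.xml terms, that is io.file.buffer.size = 131072.)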

The reason big buffers may impair cache locality is that CPU caches are
typically a few MB. If you set your checksum size and buffer size to 64 MB,
then whenever you read a block, the CPU first has to checksum 64 MB worth of
data then start again at the beginning to read it and pass it through your
application. During the checksumming process, the first pages of data fell
out of CPU cache as you checksummed the later ones. Therefore, you have to
read them from memory again during the second scan. If you just had a 64 KB
block, it would stay in cache from the first time you read it. The same
issue happens if instead of checksumming you were copying from one buffer to
another (which does happen at a few places in Hadoop, and they tend to use
io.file.buffer.size). So while I haven't tried measuring performance with 64
MB vs 128 KB, I wouldn't be surprised if it leads to bad behavior, because
it's much higher than what anyone runs in production.

Finally, if you just want to sequentially process a file on each node and
you only want one logical "input record" per map, it might be better not to
use an input format that reads the record into memory at all. Instead, you
can have the map directly open the file, and have your InputFormat just
locate the map on the right node. This avoids copying the whole file into
memory before streaming it through your mapper. If your algorithm does
require random access throughout the file, on the other hand, you do need to
read it all in. I think the WholeFileRecordReader in the FAQ is aimed at files
smaller than 256 MB / 1 GB.

On Fri, Apr 3, 2009 at 9:37 AM, Tu, Tiankai
wrote:

> Thanks for the comments, Matei.
>
> The machines I ran the experiments on have 16 GB of memory each. I don't
> see how a 64 MB buffer could be considered huge or bad for memory
> consumption. In fact, I set it to a much larger value after initial rounds
> of tests showed abysmal results with the default 64 KB buffer. Also, why
> is it necessary to compute a checksum for every 512 bytes when, to my mind,
> only an end-to-end (whole-file) checksum makes sense? I set it to a much
> larger value to avoid the overhead.
>
> I didn't quite understand what you meant by bad for cache locality. The
> jobs were IO bound in the first place; any cache effect was secondary---at
> least an order of magnitude smaller. Can you clarify which particular
> computation (maybe within Hadoop) was made slow by a large io buffer size?
>
> What bothered you was exactly what bothered me, and prompted me to ask why
> the job tracker reported many more bytes read by the map tasks. I can
> confirm that the experiments were set up correctly. In fact, the numbers
> of map tasks were reported correctly by the job tracker: 1600 for the 1GB
> file dataset, 6400 for the 256MB file dataset, and so forth.
>
> Tiankai
>
>
>
> -Original Message-
> From: Matei Zaharia [mailto:ma...@cloudera.com]
> Sent: Friday, April 03, 2009 11:21 AM
> To: core-user@hadoop.apache.org
> Subject: Re: Hadoop/HDFS for scientific simulation output data analysis
>
> Hi Tiankai,
>
> The one strange thing I see in your configuration as described is IO buffer
> size and IO bytes per checksum set to 64 MB. This is much higher than the
> recommended defaults, which are about 64 KB for buffer size and 1 KB or 512
> bytes for checksum. (Actually I haven't seen anyone change checksum from its
> default of 512 bytes). Having huge buffers is bad for memory consumption and
> cache locality.
>
> The other thing that bothers me is that on your 64 MB data set, you have 28
> TB of HDFS bytes read. This is off from number of map tasks * bytes per map
> by an order of magnitude. Are you sure that you've generated the data set
> correctly and that it's the only input path given to your job? Does
> bin/hadoop dfs -dus  come out as 1.6 TB?
>
> Matei
>
> On Sat, Mar 28, 2009 at 4:10 PM, Tu, Tiankai
> wrote:
>
> > Hi,
> >
> > I have been exploring the feasibility of using Hadoop/HDFS to analyze
> > terabyte-scale scientific simulation output datasets. After a set of
> > initial experiments, I have a number of questions regarding (1) the
> > configuration set

Re: How many people are using Hadoop Streaming ?

2009-04-03 Thread Owen O'Malley


On Apr 3, 2009, at 9:42 AM, Ricky Ho wrote:


Has anyone benchmarked the performance difference of using Hadoop ?
 1) Java vs C++
 2) Java vs Streaming


Yes, a while ago. When I tested it using sort, Java and C++ were  
roughly equal and streaming was 10-20% slower. Most of the cost with  
streaming came from the stringification.


 1) I can pick the language that offers a different programming
paradigm (e.g. I may choose a functional language, or logic
programming if they suit the problem better).  In fact, I can even
choose Erlang for the map() and Prolog for the reduce().  Mixing and
matching gives me more room to optimize.
 2) I can pick the language that I am familiar with, or one that I
like.
 3) Easy to switch to another language in a fine-grained, incremental
way if I choose to do so in the future.


Additionally, the interface to streaming is very stable. *smile* It  
also supports legacy applications well.


The downsides are that:
  1. The interface is very thin and has minimal functionality.
  2. Streaming combiners don't work very well. Many streaming
applications buffer in the map and run the combiner internally.
  3. Streaming doesn't group the values in the reducer. In Java or C++,
you get:

 key1, (value1, value2, ...)
 key2, (value3, ...)
  in streaming you get
 key1 value1
 key1 value2
 key2 value3
  and your application needs to detect the key changes itself (a rough
sketch of the regrouping is below).
  4. Binary data support has only recently been added to streaming.
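
A rough, untested sketch of what that regrouping looks like, written here as
a Java main() reading the sorted, tab-separated key/value lines from stdin
(any language reading stdin works the same way; counting values per key is
just an example aggregation):

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class StreamingReducer {
  public static void main(String[] args) throws Exception {
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String currentKey = null;
    long count = 0;                    // example aggregation: values per key
    String line;
    while ((line = in.readLine()) != null) {
      // Streaming sends "key<TAB>value" lines, sorted by key.
      int tab = line.indexOf('\t');
      String key = tab >= 0 ? line.substring(0, tab) : line;
      if (currentKey != null && !key.equals(currentKey)) {
        // Key changed: emit the previous group and start a new one.
        System.out.println(currentKey + "\t" + count);
        count = 0;
      }
      currentKey = key;
      count++;
    }
    if (currentKey != null) {
      System.out.println(currentKey + "\t" + count);   // flush the last group
    }
  }
}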

Am I missing something here ?  or is the majority of Hadoop  
applications written in Hadoop Streaming ?


On Yahoo's research clusters, typically 1/3 of the applications are  
streaming, 1/3 pig, and 1/3 java.


-- Owen


How many people are using Hadoop Streaming ?

2009-04-03 Thread Ricky Ho
Has anyone benchmarked the performance difference of using Hadoop ?
  1) Java vs C++
  2) Java vs Streaming

From looking at the Hadoop architecture, since the TaskTracker will fork a 
separate process anyway to run the user-supplied map() and reduce() functions, 
I don't see the performance overhead of using Hadoop Streaming (of course the 
efficiency of the chosen script will be a factor, but I think this is 
orthogonal).  On the other hand, I see a lot of benefits of using Streaming, 
including ...

  1) I can pick the language that offers a different programming paradigm (e.g. 
I may choose a functional language, or logic programming if they suit the problem 
better).  In fact, I can even choose Erlang for the map() and Prolog for the 
reduce().  Mixing and matching gives me more room to optimize.
  2) I can pick the language that I am familiar with, or one that I like.
  3) Easy to switch to another language in a fine-grained, incremental way if I 
choose to do so in the future.

Even if I am a Java programmer, I can still write a main() method that reads from 
standard input and writes to standard output, and I don't see that I am losing 
much by doing that.  The benefit is that my code can easily be moved to another 
language in the future.

Am I missing something here ?  or is the majority of Hadoop applications 
written in Hadoop Streaming ?

Rgds,
Ricky


RE: Hadoop/HDFS for scientific simulation output data analysis

2009-04-03 Thread Tu, Tiankai
Thanks for the comments, Matei.

The machines I ran the experiments on have 16 GB of memory each. I don't see
how a 64 MB buffer could be considered huge or bad for memory consumption. In
fact, I set it to a much larger value after initial rounds of tests showed
abysmal results with the default 64 KB buffer. Also, why is it necessary to
compute a checksum for every 512 bytes when, to my mind, only an end-to-end
(whole-file) checksum makes sense? I set it to a much larger value to avoid
the overhead.

I didn't quite understand what you meant by bad for cache locality. The
jobs were IO bound in the first place; any cache effect was secondary---at
least an order of magnitude smaller. Can you clarify which particular
computation (maybe within Hadoop) was made slow by a large io buffer size?

What bothered you was exactly what bothered me, and prompted me to ask why
the job tracker reported many more bytes read by the map tasks. I can confirm
that the experiments were set up correctly. In fact, the numbers of map tasks
were reported correctly by the job tracker: 1600 for the 1GB file dataset,
6400 for the 256MB file dataset, and so forth.

Tiankai

 

-Original Message-
From: Matei Zaharia [mailto:ma...@cloudera.com] 
Sent: Friday, April 03, 2009 11:21 AM
To: core-user@hadoop.apache.org
Subject: Re: Hadoop/HDFS for scientific simulation output data analysis

Hi Tiankai,

The one strange thing I see in your configuration as described is IO buffer
size and IO bytes per checksum set to 64 MB. This is much higher than the
recommended defaults, which are about 64 KB for buffer size and 1 KB or 512
bytes for checksum. (Actually I haven't seen anyone change checksum from its
default of 512 bytes). Having huge buffers is bad for memory consumption and
cache locality.

The other thing that bothers me is that on your 64 MB data set, you have 28
TB of HDFS bytes read. This is off from number of map tasks * bytes per map
by an order of magnitude. Are you sure that you've generated the data set
correctly and that it's the only input path given to your job? Does
bin/hadoop dfs -dus  come out as 1.6 TB?

Matei

On Sat, Mar 28, 2009 at 4:10 PM, Tu, Tiankai
wrote:

> Hi,
>
> I have been exploring the feasibility of using Hadoop/HDFS to analyze
> terabyte-scale scientific simulation output datasets. After a set of
> initial experiments, I have a number of questions regarding (1) the
> configuration setting and (2) the IO read performance.
>
>

>

> --
> Unlike typical Hadoop applications, post-simulation analysis usually
> processes one file at a time. So I wrote a
> WholeFileInputFormat/WholeFileRecordReader that reads an entire file
> without parsing the content, as suggested by the Hadoop wiki FAQ.
>
> Specifying WholeFileInputFormat as the input file format
> (conf.setInputFormat(FrameInputFormat.class)), I constructed a simple
> MapReduce program with the sole purpose of measuring how fast Hadoop/HDFS
> can read data. Here is the gist of the test program:
>
> - The map method does nothing; it returns immediately when called
> - No reduce task (conf.setNumReduceTasks(0))
> - JVM reused (conf.setNumTasksToExecutePerJvm(-1))
>
> The detailed hardware/software configurations are listed below:
>
> Hardware:
> - 103 nodes, each with two 2.33GHz quad-core processors and 16 GB memory
> - 1 GigE connection out of each node, connecting to a 1 GigE switch in
> the rack (3 racks in total)
> - Each rack switch has 4 10-GigE connections to a backbone
> full-bandwidth 10-GigE switch (second-tier switch)
> - Software (md) RAID0 on 4 SATA disks, with a capacity of 500 GB per
> node
> - Raw RAID0 bulk data transfer rate around 200 MB/s  (dd a 4GB file
> after dropping linux vfs cache)
>
> Software:
> - 2.6.26-10smp kernel
> - Hadoop 0.19.1
> - Three nodes as namenode, secondary name node, and job tracker,
> respectively
> - Remaining 100 nodes as slaves, each running as both datanode and
> tasktracker
>
> Relevant hadoop-site.xml setting:
> - dfs.namenode.handler.count = 100
> - io.file.buffer.size = 67108864
> - io.bytes.per.checksum = 67108864
> - mapred.task.timeout = 120
> - mapred.map.max.attempts = 8
> - mapred.tasktracker.map.tasks.maximum = 8
> - dfs.replication = 3
> - topology.script.file.name set properly to a correct Python script
>
> Dataset characteristics:
>
> - Four datasets consisting of files of 1 GB, 256 MB, 64 MB, and 2 MB,
> respectively
> - Each dataset has 1.6 terabyte data (that is, 1600 1GB files, 6400
> 256MB files, etc.)
> - Datasets populated into HDFS via a parallel C MPI program (linked with
> libhdfs.so) running on the 100 slave nodes
> - dfs block size set to be the file size (otherwise, accessing a single
> file may require network data transfer)
>
> I launched the test MapReduce job one after an

RE: Amazon Elastic MapReduce

2009-04-03 Thread Ricky Ho
I disagree.  This is like arguing that everyone should learn everything, 
otherwise they won't know how to do anything.

A better situation is having the algorithm designer focus just on how to 
break down their algorithm into Map/Reduce form and test it out immediately, 
rather than requiring them to learn all the admin aspects of Hadoop, which 
becomes a hurdle to moving fast.

Rgds,
Ricky

-Original Message-
From: Steve Loughran [mailto:ste...@apache.org] 
Sent: Friday, April 03, 2009 2:19 AM
To: core-user@hadoop.apache.org
Subject: Re: Amazon Elastic MapReduce

Brian Bockelman wrote:
> 
> On Apr 2, 2009, at 3:13 AM, zhang jianfeng wrote:
> 
>> It seems like I should pay additional money, so why not configure a 
>> hadoop cluster in EC2 by myself? This can already be automated using a 
>> script.
>>
>>
> 
> Not everyone has a support team or an operations team or enough time to 
> learn how to do it themselves.  You're basically paying for the fact 
> that the only thing you need to know to use Hadoop is:
> 1) Be able to write the Java classes.
> 2) Press the "go" button on a webpage somewhere.
> 
> You could use Hadoop with little-to-zero systems knowledge (and without 
> institutional support), which would always make some researchers happy.
> 
> Brian

True, but this way nobody gets the opportunity to learn how to do it 
themselves, which can be a tactical error one comes to regret further 
down the line. By learning the pain of cluster management today, you get 
to keep it under control as your data grows.

I am curious what bug patches AWS will supply, for they have been very 
silent on their hadoop work to date.


Re: Hadoop/HDFS for scientific simulation output data analysis

2009-04-03 Thread Matei Zaharia
Hi Tiankai,

The one strange thing I see in your configuration as described is IO buffer
size and IO bytes per checksum set to 64 MB. This is much higher than the
recommended defaults, which are about 64 KB for buffer size and 1 KB or 512
bytes for checksum. (Actually I haven't seen anyone change checksum from its
default of 512 bytes). Having huge buffers is bad for memory consumption and
cache locality.

The other thing that bothers me is that on your 64 MB data set, you have 28
TB of HDFS bytes read. This is off from number of map tasks * bytes per map
by an order of magnitude. Are you sure that you've generated the data set
correctly and that it's the only input path given to your job? Does
bin/hadoop dfs -dus  come out as 1.6 TB?

Matei

On Sat, Mar 28, 2009 at 4:10 PM, Tu, Tiankai
wrote:

> Hi,
>
> I have been exploring the feasibility of using Hadoop/HDFS to analyze
> terabyte-scale scientific simulation output datasets. After a set of
> initial experiments, I have a number of questions regarding (1) the
> configuration setting and (2) the IO read performance.
>
> 
> 
> --
> Unlike typical Hadoop applications, post-simulation analysis usually
> processes one file at a time. So I wrote a
> WholeFileInputFormat/WholeFileRecordReader that reads an entire file
> without parsing the content, as suggested by the Hadoop wiki FAQ.
>
> Specifying WholeFileInputFormat as the input file format
> (conf.setInputFormat(FrameInputFormat.class)), I constructed a simple
> MapReduce program with the sole purpose of measuring how fast Hadoop/HDFS
> can read data. Here is the gist of the test program:
>
> - The map method does nothing; it returns immediately when called
> - No reduce task (conf.setNumReduceTasks(0))
> - JVM reused (conf.setNumTasksToExecutePerJvm(-1))
>
> The detailed hardware/software configurations are listed below:
>
> Hardware:
> - 103 nodes, each with two 2.33GHz quad-core processors and 16 GB memory
> - 1 GigE connection out of each node and connecting to a 1GigE switch in
> the rack (3 racks in total)
> - Each rack switch has 4 10-GigE connections to a backbone
> full-bandwidth 10-GigE switch (second-tier switch)
> - Software (md) RAID0 on 4 SATA disks, with a capacity of 500 GB per
> node
> - Raw RAID0 bulk data transfer rate around 200 MB/s  (dd a 4GB file
> after dropping linux vfs cache)
>
> Software:
> - 2.6.26-10smp kernel
> - Hadoop 0.19.1
> - Three nodes as namenode, secondary name node, and job tracker,
> respectively
> - Remaining 100 nodes as slaves, each running as both datanode and
> tasktracker
>
> Relevant hadoop-site.xml setting:
> - dfs.namenode.handler.count = 100
> - io.file.buffer.size = 67108864
> - io.bytes.per.checksum = 67108864
> - mapred.task.timeout = 120
> - mapred.map.max.attempts = 8
> - mapred.tasktracker.map.tasks.maximum = 8
> - dfs.replication = 3
> - topology.script.file.name set properly to a correct Python script
>
> Dataset characteristics:
>
> - Four datasets consisting of files of 1 GB, 256 MB, 64 MB, and 2 MB,
> respectively
> - Each dataset has 1.6 terabyte data (that is, 1600 1GB files, 6400
> 256MB files, etc.)
> - Datasets populated into HDFS via a parallel C MPI program (linked with
> libhdfs.so) running on the 100 slave nodes
> - dfs block size set to be the file size (otherwise, accessing a single
> file may require network data transfer)
>
> I launched the test MapReduce job one after another (so there was no
> interference) and collected the following performance results:
>
> Dataset name         Finished in         Failed/Killed attempts   HDFS bytes read (Map=Total)   Rack-local maps   Launched maps   Data-local maps
>
> 1GB file dataset     16mins 11sec        0/382                    2,578,054,119,424             98                1982            1873
> 256MB file dataset   50mins 9sec         0/397                    7,754,295,017,472             156               6797            6639
> 64MB file dataset    4hrs 18mins 21sec   394/251                  28,712,795,897,856            153               26245           26068
>
> The job for the 2MB file dataset failed to run due to the following
> error:
>
> 09/03/27 21:39:58 INFO mapred.FileInputFormat: Total input paths to
> process : 819200
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>at java.util.Arrays.copyOf(Arrays.java:2786)
>at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:71)
>at java.io.DataOutputStream.writeByte(DataOutputStream.java:136)
>at org.apache.hadoop.io.UTF8.writeChars(UTF8.java:274)
>
> After running into this error, the job tracker no longer accepted jobs.
> I stopped and restarted the job tracker with a larger heap size setup
> (8GB). But it still didn't accept new jobs.
>
> 
> 
> --

Re: Amazon Elastic MapReduce

2009-04-03 Thread Lukáš Vlček
I may be wrong, but I would welcome this. As far as I understand, the hot
topic in cloud computing these days is standardization ... and I would be
happy if Hadoop could be considered a standard for cloud computing
architecture. So the more Amazon pushes Hadoop, the more widely it could be
accepted by other players in this market (and the better for customers when
switching from one cloud provider to another). Just my 2 cents.
Regards,
Lukas

On Fri, Apr 3, 2009 at 4:36 PM, Stuart Sierra
wrote:

> On Thu, Apr 2, 2009 at 4:13 AM, zhang jianfeng  wrote:
> > It seems like I should pay additional money, so why not configure a
> > hadoop cluster in EC2 by myself? This can already be automated using a
> > script.
>
> Personally, I'm excited about this.  They're charging a tiny fraction
> above the standard EC2 rate.  I like that the cluster shuts down
> automatically when the job completes -- you don't have to sit around
> and watch it.  Yeah, you can automate that, but it's one more thing to
> think about.
>
> -Stuart
>



-- 
http://blog.lukas-vlcek.com/


Re: Amazon Elastic MapReduce

2009-04-03 Thread Stuart Sierra
On Thu, Apr 2, 2009 at 4:13 AM, zhang jianfeng  wrote:
> It seems like I should pay additional money, so why not configure a hadoop
> cluster in EC2 by myself? This can already be automated using a script.

Personally, I'm excited about this.  They're charging a tiny fraction
above the standard EC2 rate.  I like that the cluster shuts down
automatically when the job completes -- you don't have to sit around
and watch it.  Yeah, you can automate that, but it's one more thing to
think about.

-Stuart


Re: RPM spec file for 0.19.1

2009-04-03 Thread Ian Soboroff
Steve Loughran  writes:

> -RPM and deb packaging would be nice

Indeed.  The best thing would be to have the hadoop build system output
them, for some sensible subset of systems.

> -the jdk requirements are too harsh as it should run on openjdk's JRE
> or jrockit; no need for sun only. Too bad the only way to say that is
> leave off all jdk dependencies.

I haven't tried running Hadoop on anything but the Sun JDK, much less
built it from source (well, the rpmbuild did that so I guess I have).

> -I worry about how they patch the rc.d files. I can see why, but
> wonder what that does with the RPM ownership

Those are just fine: (from hadoop-init.tmpl)

#!/bin/bash
# 
# (c) Copyright 2009 Cloudera, Inc.
# 
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
...

Ian



Re: RPM spec file for 0.19.1

2009-04-03 Thread Ian Soboroff
Steve Loughran  writes:

> I think from your perspective it makes sense as it stops anyone getting
> itchy fingers and doing their own RPMs.

Um, what's wrong with that?

Ian




Re: RPM spec file for 0.19.1

2009-04-03 Thread Ian Soboroff

If you guys want to spin RPMs for the community, that's great.  My main
motivation was that I wanted the current version rather than 0.18.3.

There is of course (as Steve points out) a larger discussion about if
you want RPMs, what should be in them.  In particular, some might want
to include the configuration in the RPMs.  That's a good reason to post
SRPMs, because then it's not so hard to re-roll the RPMs with different
configurations.

(Personally I wouldn't manage configs with RPM, it's just a pain to
propagate changes.  Instead, we are looking at using Puppet for general
cluster configuration needs, and RPMs for the basic "binaries".)

Ian

Christophe Bisciglia  writes:

> Hey Ian, we are totally fine with this - the only reason we didn't
> contribute the SPEC file is that it is the output of our internal
> build system, and we don't have the bandwidth to properly maintain
> multiple RPMs.
>
> That said, we chatted about this a bit today, and were wondering if
> the community would like us to host RPMs for all releases in our
> "devel" repository. We can't stand behind these from a reliability
> angle the same way we can with our "blessed" RPMs, but it's a
> manageable amount of additional work to have our build system spit
> those out as well.
>
> If you'd like us to do this, please add a "me too" to this page:
> http://www.getsatisfaction.com/cloudera/topics/should_we_release_host_rpms_for_all_releases
>
> We could even skip the branding on the "devel" releases :-)
>
> Cheers,
> Christophe
>
> On Thu, Apr 2, 2009 at 12:46 PM, Ian Soboroff  wrote:
>>
>> I created a JIRA (https://issues.apache.org/jira/browse/HADOOP-5615)
>> with a spec file for building a 0.19.1 RPM.
>>
>> I like the idea of Cloudera's RPM file very much.  In particular, it has
>> nifty /etc/init.d scripts and RPM is nice for managing updates.
>> However, it's for an older, patched version of Hadoop.
>>
>> This spec file is actually just Cloudera's, with suitable edits.  The
>> spec file does not contain an explicit license... if Cloudera have
>> strong feelings about it, let me know and I'll pull the JIRA attachment.
>>
>> The JIRA includes instructions on how to roll the RPMs yourself.  I
>> would have attached the SRPM but they're too big for JIRA.  I can offer
>> noarch RPMs built with this spec file if someone wants to host them.
>>
>> Ian
>>
>>



best practice: mapred.local vs dfs drives

2009-04-03 Thread Craig Macdonald

Hello all,

Following recent hardware discussions, I thought I'd ask a related 
question. Our cluster nodes have 3 drives: 1x 160GB system/scratch and 
2x 500GB DFS drives.


The 160GB system drive is partitioned such that 100GB is for job 
mapred.local space. However, we find that for our application, free 
mapred.local space for map output is the limiting parameter on the 
number of reducers we can have (our application prefers fewer 
reducers).


How do people normally arrange dfs vs mapred.local space? Do you (a) 
share the DFS drives with the task tracker temporary files, or do you 
(b) keep them on separate partitions or drives?
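
(For concreteness, option (a) would mean pointing both settings at the same 
drives in hadoop-site.xml, along these lines -- the paths are only examples:

  dfs.data.dir     = /disk1/dfs,/disk2/dfs
  mapred.local.dir = /disk1/mapred,/disk2/mapred

whereas with (b) mapred.local.dir stays on its own drive or partition.)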


We originally went with (b) because it prevented a run-away job from 
eating all the DFS space on the machine; however, I'm beginning to 
realise the disadvantages.


Any comments?

Thanks

Craig



Re: Amazon Elastic MapReduce

2009-04-03 Thread Tim Wintle
On Fri, 2009-04-03 at 11:19 +0100, Steve Loughran wrote:
> True, but this way nobody gets the opportunity to learn how to do it 
> themselves, which can be a tactical error one comes to regret further 
> down the line. By learning the pain of cluster management today, you get 
> to keep it under control as your data grows.

Personally I don't want to have to learn (and especially not support in
production) the EC2 / S3 part, so it does sound appealing.

On a side note, I'd hope that at some point they give some control over
the priority of the overall job - on the level of "you can boot up these
machines whenever you want", or "boot up these machines now" - that
should let them manage the load on their hardware and reduce costs
(which I'd obviously expect them to pass on to the users of low-priority
jobs). I'm not sure how that would fit into the "give me 10 nodes"
method at the moment.

> 
> I am curious what bug patches AWS will supply, for they have been very 
> silent on their hadoop work to date.

I'm hoping it will involve the security of EC2 images, but I'm not expecting it.





Re: Amazon Elastic MapReduce

2009-04-03 Thread Steve Loughran

Brian Bockelman wrote:


On Apr 2, 2009, at 3:13 AM, zhang jianfeng wrote:

It seems like I should pay additional money, so why not configure a 
hadoop cluster in EC2 by myself? This can already be automated using a 
script.




Not everyone has a support team or an operations team or enough time to 
learn how to do it themselves.  You're basically paying for the fact 
that the only thing you need to know to use Hadoop is:

1) Be able to write the Java classes.
2) Press the "go" button on a webpage somewhere.

You could use Hadoop with little-to-zero systems knowledge (and without 
institutional support), which would always make some researchers happy.


Brian


True, but this way nobody gets the opportunity to learn how to do it 
themselves, which can be a tactical error one comes to regret further 
down the line. By learning the pain of cluster management today, you get 
to keep it under control as your data grows.


I am curious what bug patches AWS will supply, for they have been very 
silent on their hadoop work to date.


Re: Using HDFS to serve www requests

2009-04-03 Thread Steve Loughran

Snehal Nagmote wrote:

Can you please explain exactly what adding an NIO bridge means and how it
can be done? What would the advantages be in this case?


NIO: Java non-blocking IO. It's a standard API to talk to different 
filesystems; support has been discussed in JIRA. If the DFS APIs were 
accessible under an NIO front end, then applications written for the NIO 
APIs would work with the supported filesystems, with no need to code 
specifically for Hadoop's not-yet-stable APIs.
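
Today such an application has to code straight against Hadoop's own
FileSystem API; the read side of the wiki example linked below boils down to
something like this (a rough sketch, not the wiki's exact code):

import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadSketch {
  // Copy one HDFS file to the given stream (e.g. a servlet's response stream).
  public static void serve(String hdfsPath, OutputStream out) throws Exception {
    Configuration conf = new Configuration();  // picks up hadoop-site.xml on the classpath
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path(hdfsPath));
    try {
      IOUtils.copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), false);
    } finally {
      in.close();
    }
  }
}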







Steve Loughran wrote:

Edward Capriolo wrote:

It is a little more natural to connect to HDFS from apache tomcat.
This will allow you to skip the FUSE mounts and just use the HDFS-API.

I have modified this code to run inside tomcat.
http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample

I will not testify to how well this setup will perform under internet
traffic, but it does work.

If someone adds an NIO bridge to hadoop filesystems then it would be 
easier; leaving you only with the performance issues.







Re: RPM spec file for 0.19.1

2009-04-03 Thread Steve Loughran

Christophe Bisciglia wrote:

Hey Ian, we are totally fine with this - the only reason we didn't
contribute the SPEC file is that it is the output of our internal
build system, and we don't have the bandwidth to properly maintain
multiple RPMs.

That said, we chatted about this a bit today, and were wondering if
the community would like us to host RPMs for all releases in our
"devel" repository. We can't stand behind these from a reliability
angle the same way we can with our "blessed" RPMs, but it's a
manageable amount of additional work to have our build system spit
those out as well.



I think from your perspective it makes sense, as it stops anyone getting 
itchy fingers and doing their own RPMs. At the same time, I think we do 
need to make it possible/easy to do RPMs *and have them consistent*. If 
hadoop-core makes RPMs that don't work with your settings RPMs, you get 
to field the support calls.


-steve


Re: RPM spec file for 0.19.1

2009-04-03 Thread Steve Loughran

Ian Soboroff wrote:

I created a JIRA (https://issues.apache.org/jira/browse/HADOOP-5615)
with a spec file for building a 0.19.1 RPM.

I like the idea of Cloudera's RPM file very much.  In particular, it has
nifty /etc/init.d scripts and RPM is nice for managing updates.
However, it's for an older, patched version of Hadoop.

This spec file is actually just Cloudera's, with suitable edits.  The
spec file does not contain an explicit license... if Cloudera have
strong feelings about it, let me know and I'll pull the JIRA attachment.

The JIRA includes instructions on how to roll the RPMs yourself.  I
would have attached the SRPM but they're too big for JIRA.  I can offer
noarch RPMs built with this spec file if someone wants to host them.

Ian



-RPM and deb packaging would be nice

-the .spec file should be driven by ant properties to get dependencies 
from the ivy files
-the jdk requirements are too harsh as it should run on openjdk's JRE or 
jrockit; no need for sun only. Too bad the only way to say that is leave 
off all jdk dependencies.
-I worry about how they patch the rc.d files. I can see why, but wonder 
what that does with the RPM ownership


As someone whose software does get released as RPMs (and tar files 
containing everything needed to create your own), I can state from 
experience that RPMs are very hard to get right, and very hard to test. 
The hardest thing to get right (and to test) is live update of the RPMs 
while the app is running. I am happy for the cloudera team to have taken 
on this problem.