Data locality question
Would anyone be willing to give some input on this StackOverflow question? http://stackoverflow.com/questions/31789176/is-it-possible-to-restrict-a-mapreduce-job-from-accessing-remote-data Thanks in advance!
Re: Data locality
Hi folks, I have a similar question. Is there an easy way to tell (from a user perspective) whether short-circuit reads are enabled? Thanks, Demai On Mon, Mar 2, 2015 at 11:46 AM, Fei Hu hufe...@gmail.com wrote: Hi All, I developed a scheduler for data locality. Now I want to test the performance of the scheduler, so I need to monitor how much data is read remotely. Is there any tool for monitoring the volume of data moved around the cluster? Thanks, Fei
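One client-side way to check, sketched below: short-circuit reads are governed by the client property dfs.client.read.shortcircuit, and HDFS clients from Hadoop 2.x onward expose per-stream read statistics that include short-circuit bytes. The namenode URI and file path are placeholders; treat this as a rough probe rather than a definitive check.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

    public class ShortCircuitCheck {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Reflects only what this client's configuration (hdfs-site.xml on the classpath) says.
        System.out.println("dfs.client.read.shortcircuit = "
            + conf.getBoolean("dfs.client.read.shortcircuit", false));

        // Read a file and inspect the per-stream statistics (HdfsDataInputStream#getReadStatistics()).
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
        try (FSDataInputStream in = fs.open(new Path(args[0]))) {
          byte[] buf = new byte[1 << 20];
          while (in.read(buf) > 0) { /* drain the file */ }
          if (in instanceof HdfsDataInputStream) {
            HdfsDataInputStream hin = (HdfsDataInputStream) in;
            System.out.println("total bytes read:         " + hin.getReadStatistics().getTotalBytesRead());
            System.out.println("local bytes read:         " + hin.getReadStatistics().getTotalLocalBytesRead());
            System.out.println("short-circuit bytes read: " + hin.getReadStatistics().getTotalShortCircuitBytesRead());
          }
        }
      }
    }

If the short-circuit counter stays at zero for a file you know is stored locally, short-circuit reads are almost certainly not enabled (or the domain socket path is not configured).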
Data locality
Hi All, I developed a scheduler for data locality. Now I want to test the performance of the scheduler, so I need to monitor how much data is read remotely. Is there any tool for monitoring the volume of data moved around the cluster? Thanks, Fei
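If the remote reads happen inside MapReduce jobs, one low-effort option is to look at the built-in job counters after each run: DATA_LOCAL_MAPS, RACK_LOCAL_MAPS and OTHER_LOCAL_MAPS show how many map tasks ran on a node (or rack) holding their split, and the filesystem counters give the volume read from HDFS. A small sketch; the group/name strings for the filesystem counters can vary slightly across Hadoop versions.

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobCounter;

    public class LocalityReport {
      // Call after job.waitForCompletion(true); "job" is the finished MR job.
      public static void print(Job job) throws Exception {
        Counters c = job.getCounters();
        long dataLocal = c.findCounter(JobCounter.DATA_LOCAL_MAPS).getValue();
        long rackLocal = c.findCounter(JobCounter.RACK_LOCAL_MAPS).getValue();
        long other     = c.findCounter(JobCounter.OTHER_LOCAL_MAPS).getValue();
        // Group/name below is the Hadoop 2 convention; adjust if your version differs.
        long hdfsBytes = c.findCounter("org.apache.hadoop.mapreduce.FileSystemCounter",
                                       "HDFS_BYTES_READ").getValue();
        System.out.printf("data-local maps: %d, rack-local maps: %d, other maps: %d, HDFS bytes read: %d%n",
            dataLocal, rackLocal, other, hdfsBytes);
      }
    }

This does not report remote bytes directly, but comparing the non-local map counts (and per-task HDFS_BYTES_READ) across runs of your scheduler versus the default one gives a usable signal.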
Re: Data Locality Importance
Like you said, it depends both on the kind of network you have and the type of your workload. Given your point about S3, I'd guess your input files/blocks are not large enough that moving code to data trumps moving data itself to the code. When that balance tilts a lot, especially when moving large input data files/blocks, data locality will help improve performance significantly. That, or when the read throughput from a remote disk is much lower than reading from a local disk. HTH +Vinod On Mar 21, 2014, at 7:06 PM, Mike Sam mikesam...@gmail.com wrote: How important is Data Locality to Hadoop? I mean, if we prefer to separate the HDFS cluster from the MR cluster, we will lose data locality, but my question is how bad is this, assuming we provide a reasonable network connection between the two clusters? EMR kills data locality when using S3 as storage, but we do not see a significant job time difference running the same job from the HDFS cluster of the same setup. So, I am wondering: how important is Data Locality to Hadoop in practice? Thanks, Mike
Re: Data Locality Importance
Hi Mike Data locality rests on an assumption: that storage access (disk, SSD, etc.) is faster than transferring the data over the network. Vinod has already explained the benefits. But locality in the map stage may not always be a good thing. If a fat node stores a large file, it is possible that the current MR framework assigns a lot of map tasks from a single job to that node and then congests its network during the shuffle. I am not sure how EMR is implemented at the physical layer. If the nodes are all virtual machines, it is possible that your separate HDFS cluster and MR cluster still benefit from local data access. Chen -Original Message- From: Vinod Kumar Vavilapalli [mailto:vino...@hortonworks.com] On Behalf Of Vinod Kumar Vavilapalli Sent: Sunday, March 23, 2014 12:20 AM To: common-user@hadoop.apache.org Subject: Re: Data Locality Importance Like you said, it depends both on the kind of network you have and the type of your workload. Given your point about S3, I'd guess your input files/blocks are not large enough that moving code to data trumps moving data itself to the code. When that balance tilts a lot, especially when moving large input data files/blocks, data locality will help improve performance significantly. That, or when the read throughput from a remote disk is much lower than reading from a local disk. HTH +Vinod On Mar 21, 2014, at 7:06 PM, Mike Sam mikesam...@gmail.com wrote: How important is Data Locality to Hadoop? I mean, if we prefer to separate the HDFS cluster from the MR cluster, we will lose data locality, but my question is how bad is this, assuming we provide a reasonable network connection between the two clusters? EMR kills data locality when using S3 as storage, but we do not see a significant job time difference running the same job from the HDFS cluster of the same setup. So, I am wondering: how important is Data Locality to Hadoop in practice? Thanks, Mike
Re: Data Locality and WebHDFS
I may have expressed myself wrong. You don't need to do any test to see how locality works with files of multiple blocks. If you are accessing a file of more than one block over webhdfs, you only have assured locality for the first block of the file. Thanks. On Sun, Mar 16, 2014 at 9:18 PM, RJ Nowling rnowl...@gmail.com wrote: Thank you, Mingjiang and Alejandro. This is interesting. Since we will use the data locality information for scheduling, we could hack this to get the data locality information, at least for the first block. As Alejandro says, we'd have to test what happens for other data blocks -- e.g., what if, knowing the block sizes, we request the second or third block? Interesting food for thought! I see some experiments in my future! Thanks! On Sun, Mar 16, 2014 at 10:14 PM, Alejandro Abdelnur t...@cloudera.comwrote: well, this is for the first block of the file, the rest of the file (blocks being local or not) are streamed out by the same datanode. for small files (one block) you'll get locality, for large files only the first block, and by chance if other blocks are local to that datanode. Alejandro (phone typing) On Mar 16, 2014, at 18:53, Mingjiang Shi m...@gopivotal.com wrote: According to this page: http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/ *Data Locality*: The file read and file write calls are redirected to the corresponding datanodes. It uses the full bandwidth of the Hadoop cluster for streaming data. *A HDFS Built-in Component*: WebHDFS is a first class built-in component of HDFS. It runs inside Namenodes and Datanodes, therefore, it can use all HDFS functionalities. It is a part of HDFS - there are no additional servers to install So it looks like the data locality is built-into webhdfs, client will be redirected to the data node automatically. On Mon, Mar 17, 2014 at 6:07 AM, RJ Nowling rnowl...@gmail.com wrote: Hi all, I'm writing up a Google Summer of Code proposal to add HDFS support to Disco, an Erlang MapReduce framework. We're interested in using WebHDFS. I have two questions: 1) Does WebHDFS allow querying data locality information? 2) If the data locality information is known, can data on specific data nodes be accessed via Web HDFS? Or do all Web HDFS requests have to go through a single server? Thanks, RJ -- em rnowl...@gmail.com c 954.496.2314 -- Cheers -MJ -- em rnowl...@gmail.com c 954.496.2314 -- Alejandro
Re: Data Locality and WebHDFS
Hi Alejandro, The WebHDFS API allows specifying an offset and length for the request. If I specify an offset that starts in the second block of a file (thus skipping the first block altogether), will the namenode still direct me to a datanode with the first block, or will it direct me to a datanode with the second block? I.e., am I assured data locality only on the first block of the file (as you're saying) or on the first block I am accessing? If it is as you say, then I may want to reach out to the WebHDFS developers and see if they would be interested in the additional functionality. Thank you, RJ On Mon, Mar 17, 2014 at 2:40 AM, Alejandro Abdelnur t...@cloudera.com wrote: I may have expressed myself wrong. You don't need to do any test to see how locality works with files of multiple blocks. If you are accessing a file of more than one block over webhdfs, you only have assured locality for the first block of the file. Thanks. On Sun, Mar 16, 2014 at 9:18 PM, RJ Nowling rnowl...@gmail.com wrote: Thank you, Mingjiang and Alejandro. This is interesting. Since we will use the data locality information for scheduling, we could hack this to get the data locality information, at least for the first block. As Alejandro says, we'd have to test what happens for other data blocks -- e.g., what if, knowing the block sizes, we request the second or third block? Interesting food for thought! I see some experiments in my future! Thanks! On Sun, Mar 16, 2014 at 10:14 PM, Alejandro Abdelnur t...@cloudera.com wrote: well, this is for the first block of the file, the rest of the file (blocks being local or not) are streamed out by the same datanode. for small files (one block) you'll get locality, for large files only the first block, and by chance if other blocks are local to that datanode. Alejandro (phone typing) On Mar 16, 2014, at 18:53, Mingjiang Shi m...@gopivotal.com wrote: According to this page: http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/ *Data Locality*: The file read and file write calls are redirected to the corresponding datanodes. It uses the full bandwidth of the Hadoop cluster for streaming data. *A HDFS Built-in Component*: WebHDFS is a first class built-in component of HDFS. It runs inside Namenodes and Datanodes, therefore, it can use all HDFS functionalities. It is a part of HDFS - there are no additional servers to install So it looks like the data locality is built-into webhdfs, client will be redirected to the data node automatically. On Mon, Mar 17, 2014 at 6:07 AM, RJ Nowling rnowl...@gmail.com wrote: Hi all, I'm writing up a Google Summer of Code proposal to add HDFS support to Disco, an Erlang MapReduce framework. We're interested in using WebHDFS. I have two questions: 1) Does WebHDFS allow querying data locality information? 2) If the data locality information is known, can data on specific data nodes be accessed via Web HDFS? Or do all Web HDFS requests have to go through a single server? Thanks, RJ -- em rnowl...@gmail.com c 954.496.2314 -- Cheers -MJ -- em rnowl...@gmail.com c 954.496.2314 -- Alejandro -- em rnowl...@gmail.com c 954.496.2314
Re: Data Locality and WebHDFS
dont recall how skips are handled in webhdfs, but i would assume that you'll get to the first block As usual, and the skip is handled by the DN serving the file (as webhdfs doesnot know at open that you'll skip) Alejandro (phone typing) On Mar 17, 2014, at 9:47, RJ Nowling rnowl...@gmail.com wrote: Hi Alejandro, The WebHDFS API allows specifying an offset and length for the request. If I specify an offset that start in the second block for a file (thus skipping the first block all together), will the namenode still direct me to a datanode with the first block or will it direct me to a namenode with the second block? I.e., am I assured data locality only on the first block of the file (as you're saying) or on the first block I am accessing? If it is as you say, then I may want to reach out the WebHDFS developers and see if they would be interested in the additional functionality. Thank you, RJ On Mon, Mar 17, 2014 at 2:40 AM, Alejandro Abdelnur t...@cloudera.com wrote: I may have expressed myself wrong. You don't need to do any test to see how locality works with files of multiple blocks. If you are accessing a file of more than one block over webhdfs, you only have assured locality for the first block of the file. Thanks. On Sun, Mar 16, 2014 at 9:18 PM, RJ Nowling rnowl...@gmail.com wrote: Thank you, Mingjiang and Alejandro. This is interesting. Since we will use the data locality information for scheduling, we could hack this to get the data locality information, at least for the first block. As Alejandro says, we'd have to test what happens for other data blocks -- e.g., what if, knowing the block sizes, we request the second or third block? Interesting food for thought! I see some experiments in my future! Thanks! On Sun, Mar 16, 2014 at 10:14 PM, Alejandro Abdelnur t...@cloudera.com wrote: well, this is for the first block of the file, the rest of the file (blocks being local or not) are streamed out by the same datanode. for small files (one block) you'll get locality, for large files only the first block, and by chance if other blocks are local to that datanode. Alejandro (phone typing) On Mar 16, 2014, at 18:53, Mingjiang Shi m...@gopivotal.com wrote: According to this page: http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/ Data Locality: The file read and file write calls are redirected to the corresponding datanodes. It uses the full bandwidth of the Hadoop cluster for streaming data. A HDFS Built-in Component: WebHDFS is a first class built-in component of HDFS. It runs inside Namenodes and Datanodes, therefore, it can use all HDFS functionalities. It is a part of HDFS – there are no additional servers to install So it looks like the data locality is built-into webhdfs, client will be redirected to the data node automatically. On Mon, Mar 17, 2014 at 6:07 AM, RJ Nowling rnowl...@gmail.com wrote: Hi all, I'm writing up a Google Summer of Code proposal to add HDFS support to Disco, an Erlang MapReduce framework. We're interested in using WebHDFS. I have two questions: 1) Does WebHDFS allow querying data locality information? 2) If the data locality information is known, can data on specific data nodes be accessed via Web HDFS? Or do all Web HDFS requests have to go through a single server? Thanks, RJ -- em rnowl...@gmail.com c 954.496.2314 -- Cheers -MJ -- em rnowl...@gmail.com c 954.496.2314 -- Alejandro -- em rnowl...@gmail.com c 954.496.2314
Re: Data Locality and WebHDFS
The file offset is considered in WebHDFS redirection. It redirects to a datanode with the first block the client going to read, not the first block of the file. Hope it helps. Tsz-Wo On Monday, March 17, 2014 10:09 AM, Alejandro Abdelnur t...@cloudera.com wrote: actually, i am wrong, the webhdfs rest call has an offset. Alejandro (phone typing) On Mar 17, 2014, at 10:07, Alejandro Abdelnur t...@cloudera.com wrote: dont recall how skips are handled in webhdfs, but i would assume that you'll get to the first block As usual, and the skip is handled by the DN serving the file (as webhdfs doesnot know at open that you'll skip) Alejandro (phone typing) On Mar 17, 2014, at 9:47, RJ Nowling rnowl...@gmail.com wrote: Hi Alejandro, The WebHDFS API allows specifying an offset and length for the request. If I specify an offset that start in the second block for a file (thus skipping the first block all together), will the namenode still direct me to a datanode with the first block or will it direct me to a namenode with the second block? I.e., am I assured data locality only on the first block of the file (as you're saying) or on the first block I am accessing? If it is as you say, then I may want to reach out the WebHDFS developers and see if they would be interested in the additional functionality. Thank you, RJ On Mon, Mar 17, 2014 at 2:40 AM, Alejandro Abdelnur t...@cloudera.com wrote: I may have expressed myself wrong. You don't need to do any test to see how locality works with files of multiple blocks. If you are accessing a file of more than one block over webhdfs, you only have assured locality for the first block of the file. Thanks. On Sun, Mar 16, 2014 at 9:18 PM, RJ Nowling rnowl...@gmail.com wrote: Thank you, Mingjiang and Alejandro. This is interesting. Since we will use the data locality information for scheduling, we could hack this to get the data locality information, at least for the first block. As Alejandro says, we'd have to test what happens for other data blocks -- e.g., what if, knowing the block sizes, we request the second or third block? Interesting food for thought! I see some experiments in my future! Thanks! On Sun, Mar 16, 2014 at 10:14 PM, Alejandro Abdelnur t...@cloudera.com wrote: well, this is for the first block of the file, the rest of the file (blocks being local or not) are streamed out by the same datanode. for small files (one block) you'll get locality, for large files only the first block, and by chance if other blocks are local to that datanode. Alejandro (phone typing) On Mar 16, 2014, at 18:53, Mingjiang Shi m...@gopivotal.com wrote: According to this page: http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/ Data Locality: The file read and file write calls are redirected to the corresponding datanodes. It uses the full bandwidth of the Hadoop cluster for streaming data. A HDFS Built-in Component: WebHDFS is a first class built-in component of HDFS. It runs inside Namenodes and Datanodes, therefore, it can use all HDFS functionalities. It is a part of HDFS – there are no additional servers to install So it looks like the data locality is built-into webhdfs, client will be redirected to the data node automatically. On Mon, Mar 17, 2014 at 6:07 AM, RJ Nowling rnowl...@gmail.com wrote: Hi all, I'm writing up a Google Summer of Code proposal to add HDFS support to Disco, an Erlang MapReduce framework. We're interested in using WebHDFS. I have two questions: 1) Does WebHDFS allow querying data locality information? 
2) If the data locality information is known, can data on specific data nodes be accessed via Web HDFS? Or do all Web HDFS requests have to go through a single server? Thanks, RJ -- em rnowl...@gmail.com c 954.496.2314 -- Cheers -MJ -- em rnowl...@gmail.com c 954.496.2314 -- Alejandro -- em rnowl...@gmail.com c 954.496.2314
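A quick way to see what Tsz-Wo describes is to issue the OPEN call yourself with an offset and with redirects disabled, then look at which datanode the namenode puts in the Location header. A rough sketch; the namenode address, file path, offset, and the 50070 HTTP port (9870 on Hadoop 3) are assumptions.

    import java.net.HttpURLConnection;
    import java.net.URL;

    public class WebHdfsRedirectProbe {
      public static void main(String[] args) throws Exception {
        String nn = "http://namenode:50070";        // placeholder namenode web address
        String file = "/user/rj/big.dat";           // placeholder multi-block file
        long offset = 256L * 1024 * 1024;           // lands past the first block for a 128 MB block size

        URL url = new URL(nn + "/webhdfs/v1" + file + "?op=OPEN&offset=" + offset);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);     // we only want to see the redirect target
        conn.connect();
        // Expect 307; the Location header names the datanode chosen for the block containing 'offset'.
        System.out.println(conn.getResponseCode() + " -> " + conn.getHeaderField("Location"));
        conn.disconnect();
      }
    }

Running this with offsets falling in different blocks should show the Location host changing with the block being read.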
Re: Data Locality and WebHDFS
Thank you, Tsz. That helps! On Mon, Mar 17, 2014 at 2:30 PM, Tsz Wo Sze szets...@yahoo.com wrote: The file offset is considered in WebHDFS redirection. It redirects to a datanode with the first block the client going to read, not the first block of the file. Hope it helps. Tsz-Wo On Monday, March 17, 2014 10:09 AM, Alejandro Abdelnur t...@cloudera.com wrote: actually, i am wrong, the webhdfs rest call has an offset. Alejandro (phone typing) On Mar 17, 2014, at 10:07, Alejandro Abdelnur t...@cloudera.com wrote: dont recall how skips are handled in webhdfs, but i would assume that you'll get to the first block As usual, and the skip is handled by the DN serving the file (as webhdfs doesnot know at open that you'll skip) Alejandro (phone typing) On Mar 17, 2014, at 9:47, RJ Nowling rnowl...@gmail.com wrote: Hi Alejandro, The WebHDFS API allows specifying an offset and length for the request. If I specify an offset that start in the second block for a file (thus skipping the first block all together), will the namenode still direct me to a datanode with the first block or will it direct me to a namenode with the second block? I.e., am I assured data locality only on the first block of the file (as you're saying) or on the first block I am accessing? If it is as you say, then I may want to reach out the WebHDFS developers and see if they would be interested in the additional functionality. Thank you, RJ On Mon, Mar 17, 2014 at 2:40 AM, Alejandro Abdelnur t...@cloudera.comwrote: I may have expressed myself wrong. You don't need to do any test to see how locality works with files of multiple blocks. If you are accessing a file of more than one block over webhdfs, you only have assured locality for the first block of the file. Thanks. On Sun, Mar 16, 2014 at 9:18 PM, RJ Nowling rnowl...@gmail.com wrote: Thank you, Mingjiang and Alejandro. This is interesting. Since we will use the data locality information for scheduling, we could hack this to get the data locality information, at least for the first block. As Alejandro says, we'd have to test what happens for other data blocks -- e.g., what if, knowing the block sizes, we request the second or third block? Interesting food for thought! I see some experiments in my future! Thanks! On Sun, Mar 16, 2014 at 10:14 PM, Alejandro Abdelnur t...@cloudera.comwrote: well, this is for the first block of the file, the rest of the file (blocks being local or not) are streamed out by the same datanode. for small files (one block) you'll get locality, for large files only the first block, and by chance if other blocks are local to that datanode. Alejandro (phone typing) On Mar 16, 2014, at 18:53, Mingjiang Shi m...@gopivotal.com wrote: According to this page: http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/ *Data Locality*: The file read and file write calls are redirected to the corresponding datanodes. It uses the full bandwidth of the Hadoop cluster for streaming data. *A HDFS Built-in Component*: WebHDFS is a first class built-in component of HDFS. It runs inside Namenodes and Datanodes, therefore, it can use all HDFS functionalities. It is a part of HDFS - there are no additional servers to install So it looks like the data locality is built-into webhdfs, client will be redirected to the data node automatically. On Mon, Mar 17, 2014 at 6:07 AM, RJ Nowling rnowl...@gmail.com wrote: Hi all, I'm writing up a Google Summer of Code proposal to add HDFS support to Disco, an Erlang MapReduce framework. We're interested in using WebHDFS. 
I have two questions: 1) Does WebHDFS allow querying data locality information? 2) If the data locality information is known, can data on specific data nodes be accessed via Web HDFS? Or do all Web HDFS requests have to go through a single server? Thanks, RJ -- em rnowl...@gmail.com c 954.496.2314 -- Cheers -MJ -- em rnowl...@gmail.com c 954.496.2314 -- Alejandro -- em rnowl...@gmail.com c 954.496.2314 -- em rnowl...@gmail.com c 954.496.2314
Data Locality and WebHDFS
Hi all, I'm writing up a Google Summer of Code proposal to add HDFS support to Disco, an Erlang MapReduce framework. We're interested in using WebHDFS. I have two questions: 1) Does WebHDFS allow querying data locality information? 2) If the data locality information is known, can data on specific data nodes be accessed via Web HDFS? Or do all Web HDFS requests have to go through a single server? Thanks, RJ -- em rnowl...@gmail.com c 954.496.2314
Re: Data Locality and WebHDFS
According to this page: http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/ *Data Locality*: The file read and file write calls are redirected to the corresponding datanodes. It uses the full bandwidth of the Hadoop cluster for streaming data. *A HDFS Built-in Component*: WebHDFS is a first class built-in component of HDFS. It runs inside Namenodes and Datanodes, therefore, it can use all HDFS functionalities. It is a part of HDFS - there are no additional servers to install So it looks like the data locality is built-into webhdfs, client will be redirected to the data node automatically. On Mon, Mar 17, 2014 at 6:07 AM, RJ Nowling rnowl...@gmail.com wrote: Hi all, I'm writing up a Google Summer of Code proposal to add HDFS support to Disco, an Erlang MapReduce framework. We're interested in using WebHDFS. I have two questions: 1) Does WebHDFS allow querying data locality information? 2) If the data locality information is known, can data on specific data nodes be accessed via Web HDFS? Or do all Web HDFS requests have to go through a single server? Thanks, RJ -- em rnowl...@gmail.com c 954.496.2314 -- Cheers -MJ
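As for RJ's first question (querying the locality information itself), the block-to-host mapping is exposed through the Hadoop FileSystem Java API rather than through WebHDFS proper (newer releases added a GETFILEBLOCKLOCATIONS WebHDFS operation, but I am not sure which versions carry it). A minimal sketch with the Java client; the namenode URI is a placeholder.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BlockLocations {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), new Configuration());
        FileStatus st = fs.getFileStatus(new Path(args[0]));
        for (BlockLocation bl : fs.getFileBlockLocations(st, 0, st.getLen())) {
          System.out.println("offset " + bl.getOffset() + " len " + bl.getLength()
              + " hosts " + String.join(",", bl.getHosts()));
        }
      }
    }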
Re: Data Locality and WebHDFS
well, this is for the first block of the file, the rest of the file (blocks being local or not) are streamed out by the same datanode. for small files (one block) you'll get locality, for large files only the first block, and by chance if other blocks are local to that datanode. Alejandro (phone typing) On Mar 16, 2014, at 18:53, Mingjiang Shi m...@gopivotal.com wrote: According to this page: http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/ Data Locality: The file read and file write calls are redirected to the corresponding datanodes. It uses the full bandwidth of the Hadoop cluster for streaming data. A HDFS Built-in Component: WebHDFS is a first class built-in component of HDFS. It runs inside Namenodes and Datanodes, therefore, it can use all HDFS functionalities. It is a part of HDFS – there are no additional servers to install So it looks like the data locality is built-into webhdfs, client will be redirected to the data node automatically. On Mon, Mar 17, 2014 at 6:07 AM, RJ Nowling rnowl...@gmail.com wrote: Hi all, I'm writing up a Google Summer of Code proposal to add HDFS support to Disco, an Erlang MapReduce framework. We're interested in using WebHDFS. I have two questions: 1) Does WebHDFS allow querying data locality information? 2) If the data locality information is known, can data on specific data nodes be accessed via Web HDFS? Or do all Web HDFS requests have to go through a single server? Thanks, RJ -- em rnowl...@gmail.com c 954.496.2314 -- Cheers -MJ
Re: Data Locality and WebHDFS
Thank you, Mingjiang and Alejandro. This is interesting. Since we will use the data locality information for scheduling, we could hack this to get the data locality information, at least for the first block. As Alejandro says, we'd have to test what happens for other data blocks -- e.g., what if, knowing the block sizes, we request the second or third block? Interesting food for thought! I see some experiments in my future! Thanks! On Sun, Mar 16, 2014 at 10:14 PM, Alejandro Abdelnur t...@cloudera.comwrote: well, this is for the first block of the file, the rest of the file (blocks being local or not) are streamed out by the same datanode. for small files (one block) you'll get locality, for large files only the first block, and by chance if other blocks are local to that datanode. Alejandro (phone typing) On Mar 16, 2014, at 18:53, Mingjiang Shi m...@gopivotal.com wrote: According to this page: http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/ *Data Locality*: The file read and file write calls are redirected to the corresponding datanodes. It uses the full bandwidth of the Hadoop cluster for streaming data. *A HDFS Built-in Component*: WebHDFS is a first class built-in component of HDFS. It runs inside Namenodes and Datanodes, therefore, it can use all HDFS functionalities. It is a part of HDFS - there are no additional servers to install So it looks like the data locality is built-into webhdfs, client will be redirected to the data node automatically. On Mon, Mar 17, 2014 at 6:07 AM, RJ Nowling rnowl...@gmail.com wrote: Hi all, I'm writing up a Google Summer of Code proposal to add HDFS support to Disco, an Erlang MapReduce framework. We're interested in using WebHDFS. I have two questions: 1) Does WebHDFS allow querying data locality information? 2) If the data locality information is known, can data on specific data nodes be accessed via Web HDFS? Or do all Web HDFS requests have to go through a single server? Thanks, RJ -- em rnowl...@gmail.com c 954.496.2314 -- Cheers -MJ -- em rnowl...@gmail.com c 954.496.2314
Re: question about preserving data locality in MapReduce with Yarn
Splits are a MapReduce concept. Check out FileInputFormat for an example of how to get block locations. You can then pass these locations into an AMRMClient.ContainerRequest. -Sandy On Mon, Oct 28, 2013 at 8:10 PM, ricky l rickylee0...@gmail.com wrote: Hi Sandy, thank you very much for the information. It is good to know that MapReduce AM considers the block location information. BTW, I am not very familiar with the concept of splits. Is it specific to MR jobs? If possible, code location would be very helpful for reference as I am trying to implement an application master that needs to consider HDFS data-locality. thx. r. On Mon, Oct 28, 2013 at 10:21 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Ricky, The input splits contain the locations of the blocks they cover. The AM gets the information from the input splits and submits requests for those locations. Each container request spans all the replicas that the block is located on. Are you interested in something more specific? -Sandy On Mon, Oct 28, 2013 at 7:09 PM, ricky lee rickylee0...@gmail.com wrote: Well, I thought an application master can somewhat ask where the data exist to a namenode isn't it true? If it does not know where the data reside, does a MapReduce application master specify the resource name as * which means data locality might not be preserved at all? thx, r
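A rough sketch of what that could look like in a custom AM: take the replica hosts from getFileBlockLocations and hand them to AMRMClient.ContainerRequest as node hints. The one-container-per-block layout and the memory/vcore sizing are assumptions for illustration, not a complete AM.

    import java.util.Arrays;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.AMRMClient;

    public class LocalityAwareRequests {
      // Ask for one container per block, hinting YARN with the block's replica hosts.
      public static void requestContainers(AMRMClient<AMRMClient.ContainerRequest> amrm,
                                           FileSystem fs, Path input) throws Exception {
        FileStatus st = fs.getFileStatus(input);
        for (BlockLocation bl : fs.getFileBlockLocations(st, 0, st.getLen())) {
          Resource capability = Resource.newInstance(1024, 1);   // placeholder sizing
          Priority priority = Priority.newInstance(0);
          // nodes = replica hosts for this block; racks left null so YARN derives them.
          AMRMClient.ContainerRequest req = new AMRMClient.ContainerRequest(
              capability, bl.getHosts(), /* racks */ null, priority);
          System.out.println("requesting container on " + Arrays.toString(bl.getHosts()));
          amrm.addContainerRequest(req);
        }
      }
    }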
Re: question about preserving data locality in MapReduce with Yarn
The code is slightly hard to follow since it's split between the client and the ApplicationMaster. The client invokes InputFormat.getSplits to compute the splits and their locations and writes them to a file in HDFS. The ApplicationMaster then reads the file and creates resource-requests based on the locations for each input split (3 replicas). See TaskAttemptImpl.dataLocalHosts and TaskAttemptImpl.dataLocalRacks - follow those variables around in the code-base. hth, Arun On Oct 28, 2013, at 11:10 PM, ricky l rickylee0...@gmail.com wrote: Hi Sandy, thank you very much for the information. It is good to know that MapReduce AM considers the block location information. BTW, I am not very familiar with the concept of splits. Is it specific to MR jobs? If possible, code location would be very helpful for reference as I am trying to implement an application master that needs to consider HDFS data-locality. thx. r. On Mon, Oct 28, 2013 at 10:21 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Ricky, The input splits contain the locations of the blocks they cover. The AM gets the information from the input splits and submits requests for those location. Each container request spans all the replicas that the block is located on. Are you interested in something more specific? -Sandy On Mon, Oct 28, 2013 at 7:09 PM, ricky lee rickylee0...@gmail.com wrote: Well, I thought an application master can somewhat ask where the data exist to a namenode isn't it true? If it does not know where the data reside, does a MapReduce application master specify the resource name as * which means data locality might not be preserved at all? thx, r -- Arun C. Murthy Hortonworks Inc. http://hortonworks.com/
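If you just want to see the locations the AM ends up working from, you can call getSplits yourself and print each split's hosts. A small sketch assuming a plain text input; for file-based formats the splits are FileSplits whose getLocations() carries the replica hosts.

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class PrintSplitLocations {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        List<InputSplit> splits = new TextInputFormat().getSplits(job);
        for (InputSplit split : splits) {
          // These host names are what the MR AM turns into node/rack resource requests.
          System.out.println(split + " -> " + String.join(",", split.getLocations()));
        }
      }
    }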
Re: question about preserving data locality in MapReduce with Yarn
How do you know where the data exists when you begin? Sent from a remote device. Please excuse any typos... Mike Segel On Oct 28, 2013, at 8:57 PM, ricky lee rickylee0...@gmail.com wrote: Hi, I have a question about maintaining data locality in a MapReduce job launched through Yarn. Based on the Yarn tutorial, it seems like an application master can specify resource name, memory, and cpu when requesting containers. By carefully choosing resource names, I think the data locality can be achieved. I am curious how the current MapReduce application master is doing this. Does it check all needed blocks for a job and choose subset of nodes with the most needed blocks? If someone can point me source code snippets that make this decision, it would be very much appreciated. thx. -r
Re: question about preserving data locality in MapReduce with Yarn
Well, I thought an application master can ask the namenode where the data exists, isn't that true? If it does not know where the data resides, does a MapReduce application master specify the resource name as *, which means data locality might not be preserved at all? thx, r
Re: question about preserving data locality in MapReduce with Yarn
Hi Ricky, The input splits contain the locations of the blocks they cover. The AM gets the information from the input splits and submits requests for those locations. Each container request spans all the replicas that the block is located on. Are you interested in something more specific? -Sandy On Mon, Oct 28, 2013 at 7:09 PM, ricky lee rickylee0...@gmail.com wrote: Well, I thought an application master can somewhat ask where the data exist to a namenode isn't it true? If it does not know where the data reside, does a MapReduce application master specify the resource name as * which means data locality might not be preserved at all? thx, r
Re: question about preserving data locality in MapReduce with Yarn
Hi Sandy, thank you very much for the information. It is good to know that MapReduce AM considers the block location information. BTW, I am not very familiar with the concept of splits. Is it specific to MR jobs? If possible, code location would be very helpful for reference as I am trying to implement an application master that needs to consider HDFS data-locality. thx. r. On Mon, Oct 28, 2013 at 10:21 PM, Sandy Ryza sandy.r...@cloudera.comwrote: Hi Ricky, The input splits contain the locations of the blocks they cover. The AM gets the information from the input splits and submits requests for those location. Each container request spans all the replicas that the block is located on. Are you interested in something more specific? -Sandy On Mon, Oct 28, 2013 at 7:09 PM, ricky lee rickylee0...@gmail.com wrote: Well, I thought an application master can somewhat ask where the data exist to a namenode isn't it true? If it does not know where the data reside, does a MapReduce application master specify the resource name as * which means data locality might not be preserved at all? thx, r
RE: Map Tasks do not obey data locality principle........
No, it does not. I have kept the granularity at file level rather than a block. I do not think that should affect the mapping of tasks ? Regards, Nikhil -Original Message- From: Harsh J [mailto:ha...@cloudera.com] Sent: Thursday, May 16, 2013 2:31 AM To: user@hadoop.apache.org Subject: Re: Map Tasks do not obey data locality principle Also, does your custom FS report block locations in the exact same format as how HDFS does? On Tue, May 14, 2013 at 4:25 PM, Agarwal, Nikhil nikhil.agar...@netapp.com wrote: Hi, I have a 3-node cluster, with JobTracker running on one machine and TaskTrackers on other two (say, slave1 and slave2). Instead of using HDFS, I have written my own FileSystem implementation. Since, unlike HDFS I am unable to provide a shared filesystem view to JobTrackers and TaskTracker thus, I mounted the root container of slave2 on a directory in slave1 (nfs mount). By doing this I am able to submit MR job to JobTracker, with input path as my_scheme://slave1_IP:Port/dir1, etc. MR runs successfully but what happens is that data locality is not ensured i.e. if files A,B,C are kept on slave1 and D,E,F on slave2 then according to data locality, map tasks should be submitted such that map task of A,B,C are submitted to TaskTracker running on slave1 and D,E,F on slave2. Instead of this, it randomly schedules the map task to any of the tasktrackers. If map task of file A is submitted to TaskTracker running on slave2 then it implies that file A is being fetched over the network by slave2. How do I avoid this from happening? Thanks, Nikhil -- Harsh J
Re: Map Tasks do not obey data locality principle........
The scheduling is done based on block locations filled in by the input splits. If there's no hints being provided by your FS, then the result you're seeing is correct. Note that if you don't use a block concept, you ought to consider a whole file as one block and return a location based on that. Essentially, your http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#getFileBlockLocations(org.apache.hadoop.fs.FileStatus,%20long,%20long) form of API calls has to return valid values for scheduling to work. On Thu, May 16, 2013 at 11:38 AM, Agarwal, Nikhil nikhil.agar...@netapp.com wrote: No, it does not. I have kept the granularity at file level rather than a block. I do not think that should affect the mapping of tasks ? Regards, Nikhil -Original Message- From: Harsh J [mailto:ha...@cloudera.com] Sent: Thursday, May 16, 2013 2:31 AM To: user@hadoop.apache.org Subject: Re: Map Tasks do not obey data locality principle Also, does your custom FS report block locations in the exact same format as how HDFS does? On Tue, May 14, 2013 at 4:25 PM, Agarwal, Nikhil nikhil.agar...@netapp.com wrote: Hi, I have a 3-node cluster, with JobTracker running on one machine and TaskTrackers on other two (say, slave1 and slave2). Instead of using HDFS, I have written my own FileSystem implementation. Since, unlike HDFS I am unable to provide a shared filesystem view to JobTrackers and TaskTracker thus, I mounted the root container of slave2 on a directory in slave1 (nfs mount). By doing this I am able to submit MR job to JobTracker, with input path as my_scheme://slave1_IP:Port/dir1, etc. MR runs successfully but what happens is that data locality is not ensured i.e. if files A,B,C are kept on slave1 and D,E,F on slave2 then according to data locality, map tasks should be submitted such that map task of A,B,C are submitted to TaskTracker running on slave1 and D,E,F on slave2. Instead of this, it randomly schedules the map task to any of the tasktrackers. If map task of file A is submitted to TaskTracker running on slave2 then it implies that file A is being fetched over the network by slave2. How do I avoid this from happening? Thanks, Nikhil -- Harsh J -- Harsh J
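A minimal sketch of what that override could look like for a file-granularity FileSystem. The lookupOwningHost helper and the datanode-style port are made up here; they stand in for however your FS knows which node stores a given file.

    import java.io.IOException;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;

    // Inside your custom FileSystem implementation:
    @Override
    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
        throws IOException {
      if (file == null) {
        return null;
      }
      // The whole file is treated as a single "block" stored on one node,
      // so any requested range maps to that node.
      String host = lookupOwningHost(file.getPath());       // hypothetical helper
      String[] names = new String[] { host + ":50010" };    // "host:port" style, as HDFS reports
      String[] hosts = new String[] { host };
      return new BlockLocation[] { new BlockLocation(names, hosts, 0, file.getLen()) };
    }

With this in place, FileInputFormat picks up the host from the split, and the JobTracker can schedule the map task on that node, assuming the TaskTracker registers under the same hostname.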
RE: Map Tasks do not obey data locality principle........
Agreed. Thanks for replying. As hints what I have given is the ip address of the node where the file is residing but still it does not follow data locality. One clarification - If map task for file A is being submitted to a TaskTracker running on different node then does it necessarily mean that entire file A was transferred to the other node? Regards, Nikhil -Original Message- From: Harsh J [mailto:ha...@cloudera.com] Sent: Thursday, May 16, 2013 11:47 AM To: user@hadoop.apache.org Subject: Re: Map Tasks do not obey data locality principle The scheduling is done based on block locations filled in by the input splits. If there's no hints being provided by your FS, then the result you're seeing is correct. Note that if you don't use a block concept, you ought to consider a whole file as one block and return a location based on that. Essentially, your http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#getFileBlockLocations(org.apache.hadoop.fs.FileStatus,%20long,%20long) form of API calls has to return valid values for scheduling to work. On Thu, May 16, 2013 at 11:38 AM, Agarwal, Nikhil nikhil.agar...@netapp.com wrote: No, it does not. I have kept the granularity at file level rather than a block. I do not think that should affect the mapping of tasks ? Regards, Nikhil -Original Message- From: Harsh J [mailto:ha...@cloudera.com] Sent: Thursday, May 16, 2013 2:31 AM To: user@hadoop.apache.org Subject: Re: Map Tasks do not obey data locality principle Also, does your custom FS report block locations in the exact same format as how HDFS does? On Tue, May 14, 2013 at 4:25 PM, Agarwal, Nikhil nikhil.agar...@netapp.com wrote: Hi, I have a 3-node cluster, with JobTracker running on one machine and TaskTrackers on other two (say, slave1 and slave2). Instead of using HDFS, I have written my own FileSystem implementation. Since, unlike HDFS I am unable to provide a shared filesystem view to JobTrackers and TaskTracker thus, I mounted the root container of slave2 on a directory in slave1 (nfs mount). By doing this I am able to submit MR job to JobTracker, with input path as my_scheme://slave1_IP:Port/dir1, etc. MR runs successfully but what happens is that data locality is not ensured i.e. if files A,B,C are kept on slave1 and D,E,F on slave2 then according to data locality, map tasks should be submitted such that map task of A,B,C are submitted to TaskTracker running on slave1 and D,E,F on slave2. Instead of this, it randomly schedules the map task to any of the tasktrackers. If map task of file A is submitted to TaskTracker running on slave2 then it implies that file A is being fetched over the network by slave2. How do I avoid this from happening? Thanks, Nikhil -- Harsh J -- Harsh J
Re: Map Tasks do not obey data locality principle........
Hi Nikhil, For (1) - Its hard to specifically tell what you may be doing wrong or differently than expected cause I don't have the source to look it up, but do you at least see the JT log a line saying task X has a split on node Y? Is that line accurate to your inputsplit vs. data location? For (2) - I think the answer is kinda obvious so perhaps the question isn't clear/specific? Map pulls data to operate on it, and if data is remote (i.e. not on its local filesystem in direct or indirect form) then the bytes will be pulled over some form of a network. On Thu, May 16, 2013 at 11:51 AM, Agarwal, Nikhil nikhil.agar...@netapp.com wrote: Agreed. Thanks for replying. As hints what I have given is the ip address of the node where the file is residing but still it does not follow data locality. One clarification - If map task for file A is being submitted to a TaskTracker running on different node then does it necessarily mean that entire file A was transferred to the other node? Regards, Nikhil -Original Message- From: Harsh J [mailto:ha...@cloudera.com] Sent: Thursday, May 16, 2013 11:47 AM To: user@hadoop.apache.org Subject: Re: Map Tasks do not obey data locality principle The scheduling is done based on block locations filled in by the input splits. If there's no hints being provided by your FS, then the result you're seeing is correct. Note that if you don't use a block concept, you ought to consider a whole file as one block and return a location based on that. Essentially, your http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#getFileBlockLocations(org.apache.hadoop.fs.FileStatus,%20long,%20long) form of API calls has to return valid values for scheduling to work. On Thu, May 16, 2013 at 11:38 AM, Agarwal, Nikhil nikhil.agar...@netapp.com wrote: No, it does not. I have kept the granularity at file level rather than a block. I do not think that should affect the mapping of tasks ? Regards, Nikhil -Original Message- From: Harsh J [mailto:ha...@cloudera.com] Sent: Thursday, May 16, 2013 2:31 AM To: user@hadoop.apache.org Subject: Re: Map Tasks do not obey data locality principle Also, does your custom FS report block locations in the exact same format as how HDFS does? On Tue, May 14, 2013 at 4:25 PM, Agarwal, Nikhil nikhil.agar...@netapp.com wrote: Hi, I have a 3-node cluster, with JobTracker running on one machine and TaskTrackers on other two (say, slave1 and slave2). Instead of using HDFS, I have written my own FileSystem implementation. Since, unlike HDFS I am unable to provide a shared filesystem view to JobTrackers and TaskTracker thus, I mounted the root container of slave2 on a directory in slave1 (nfs mount). By doing this I am able to submit MR job to JobTracker, with input path as my_scheme://slave1_IP:Port/dir1, etc. MR runs successfully but what happens is that data locality is not ensured i.e. if files A,B,C are kept on slave1 and D,E,F on slave2 then according to data locality, map tasks should be submitted such that map task of A,B,C are submitted to TaskTracker running on slave1 and D,E,F on slave2. Instead of this, it randomly schedules the map task to any of the tasktrackers. If map task of file A is submitted to TaskTracker running on slave2 then it implies that file A is being fetched over the network by slave2. How do I avoid this from happening? Thanks, Nikhil -- Harsh J -- Harsh J -- Harsh J
Map Tasks do not obey data locality principle........
Hi, I have a 3-node cluster, with the JobTracker running on one machine and TaskTrackers on the other two (say, slave1 and slave2). Instead of using HDFS, I have written my own FileSystem implementation. Since, unlike HDFS, I am unable to provide a shared filesystem view to the JobTracker and TaskTrackers, I mounted the root container of slave2 on a directory in slave1 (NFS mount). By doing this I am able to submit an MR job to the JobTracker, with input paths such as my_scheme://slave1_IP:Port/dir1, etc. MR runs successfully, but data locality is not ensured, i.e. if files A, B, C are kept on slave1 and D, E, F on slave2, then according to data locality the map tasks for A, B, C should be submitted to the TaskTracker running on slave1 and those for D, E, F to slave2. Instead, it randomly schedules the map tasks to any of the TaskTrackers. If the map task for file A is submitted to the TaskTracker running on slave2, that implies file A is being fetched over the network by slave2. How do I avoid this? Thanks, Nikhil
Re: Data locality of map-side join
Thanks for all your answers so far! There's still one question open which I can't seem to find an answer for in the source code or documentation. When I specify the two source directories of my two datasets to be joined through CompositeInputFormat and say dataset A comes first and B second, will Hadoop MR try to run the map task on a datanode that stores a (replication of a) split of A, on a datanode that stores the corresponding split of B, or at least on any datanode that stores either the split of A or B? Since Hadoop MR takes the location of splits into account I believe there must be some strategy how it handles the case when there are two splits per map task, but it is not clear to me how exactly it behaves in this case. Am 25.10.2012 09:17 schrieb Bertrand Dechoux decho...@gmail.com: One underlying issue is that you would like your tool to be able to detect which dataset is the largest and how large is it because with this information different strategies can be chosen. This implies that somehow your tool needs to create/keep/update statistics about your datasets. And that's clearly something which is relevant for an external tool (like hive or pig) but it might not make sense to build that into the core mapred/mapreduce. That would increase coupling for something which is not necessarily relevant for the core of the platform. I know about about Hive. And you could be interested in reading more about it. https://cwiki.apache.org/Hive/statsdev.html Statistics such as the number of rows of a table or partition and the histograms of a particular interesting column are important in many ways. One of the key use cases of statistics is query optimization. Statistics serve as the input to the cost functions of the optimizer so that it can compare different plans and choose among them. Statistics may sometimes meet the purpose of the users' queries. Users can quickly get the answers for some of their queries by only querying stored statistics rather than firing long-running execution plans. Some examples are getting the quantile of the users' age distribution, the top 10 apps that are used by people, and the number of distinct sessions. I don't know if Pig has something similar. Regards Bertrand On Thu, Oct 25, 2012 at 7:49 AM, Harsh J ha...@cloudera.com wrote: Hi Sigurd, From what I've generally noticed, the client-end frameworks (Hive, Pig, etc.) have gotten much more cleverness and efficiency packed in their join parts than the MR join package which probably exists to serve as an example or utility today more than anything else (but works well for what it does). Per the code in the join package, there are no such estimates made today. There is zero use of DistributedCache - the only decisions are made based on the expression (i.e. to select which form of joining record reader to use). Enhancements to this may be accepted though, so feel free to file some JIRAs if you have something to suggest/contribute. Hopefully one day we could have a unified library between client-end tools for common use-cases such as joins, etc. over MR, but there isn't such a thing right now (AFAIK). On Tue, Oct 23, 2012 at 2:52 PM, Sigurd Spieckermann sigurd.spieckerm...@gmail.com wrote: Interesting to know that Hive and Pig are doing something in this direction. I'm dealing with the Hadoop join-package which doesn't use DistributedCache though but it rather pulls the other partition over the network before launching the map task. 
This is under the assumption that both partitions are too big to load into DC or it's just undesirable to use DC. Is there a similar mechanism implemented in the join-package that considers the size of the two partitions to be joined trying to execute the map task on the datanode that holds the bigger partition? 2012/10/23 Bejoy KS bejoy.had...@gmail.com Hi Sigurd Mapside joins are efficiently implemented in Hive and Pig. I'm talking in terms of how mapside joins are implemented in hive. In map side join, the smaller data set is first loaded into DistributedCache. The larger dataset is streamed as usual and the smaller dataset in memory. For every record in larger data set the look up is made in memory on the smaller set and there by joins are done. In later versions of hive the hive framework itself intelligently determines the smaller data set. In older versions you can specify the smaller data set using some hints in query. Regards Bejoy KS Sent from handheld, please excuse typos. -Original Message- From: Sigurd Spieckermann sigurd.spieckerm...@gmail.com Date: Mon, 22 Oct 2012 22:29:15 To: user@hadoop.apache.org Reply-To: user@hadoop.apache.org Subject: Data locality of map-side join Hi guys, I've been trying to figure out whether a map-side join using the join-package does anything clever regarding data locality with respect to at least one
Re: Data locality of map-side join
Hi Sigurd, From what I've generally noticed, the client-end frameworks (Hive, Pig, etc.) have gotten much more cleverness and efficiency packed in their join parts than the MR join package which probably exists to serve as an example or utility today more than anything else (but works well for what it does). Per the code in the join package, there are no such estimates made today. There is zero use of DistributedCache - the only decisions are made based on the expression (i.e. to select which form of joining record reader to use). Enhancements to this may be accepted though, so feel free to file some JIRAs if you have something to suggest/contribute. Hopefully one day we could have a unified library between client-end tools for common use-cases such as joins, etc. over MR, but there isn't such a thing right now (AFAIK). On Tue, Oct 23, 2012 at 2:52 PM, Sigurd Spieckermann sigurd.spieckerm...@gmail.com wrote: Interesting to know that Hive and Pig are doing something in this direction. I'm dealing with the Hadoop join-package which doesn't use DistributedCache though but it rather pulls the other partition over the network before launching the map task. This is under the assumption that both partitions are too big to load into DC or it's just undesirable to use DC. Is there a similar mechanism implemented in the join-package that considers the size of the two partitions to be joined trying to execute the map task on the datanode that holds the bigger partition? 2012/10/23 Bejoy KS bejoy.had...@gmail.com Hi Sigurd Mapside joins are efficiently implemented in Hive and Pig. I'm talking in terms of how mapside joins are implemented in hive. In map side join, the smaller data set is first loaded into DistributedCache. The larger dataset is streamed as usual and the smaller dataset in memory. For every record in larger data set the look up is made in memory on the smaller set and there by joins are done. In later versions of hive the hive framework itself intelligently determines the smaller data set. In older versions you can specify the smaller data set using some hints in query. Regards Bejoy KS Sent from handheld, please excuse typos. -Original Message- From: Sigurd Spieckermann sigurd.spieckerm...@gmail.com Date: Mon, 22 Oct 2012 22:29:15 To: user@hadoop.apache.org Reply-To: user@hadoop.apache.org Subject: Data locality of map-side join Hi guys, I've been trying to figure out whether a map-side join using the join-package does anything clever regarding data locality with respect to at least one of the partitions to join. To be more specific, if I want to join two datasets and some partition of dataset A is larger than the corresponding partition of dataset B, does Hadoop account for this and try to ensure that the map task is executed on the datanode storing the bigger partition thus reducing data transfer (if the other partition does not happen to be located on that same datanode)? I couldn't conclude the one or the other behavior from the source code and I couldn't find any documentation about this detail. Thanks for clarifying! Sigurd -- Harsh J
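For reference, a bare-bones sketch of wiring up the join package with the new API. Both inputs must already be sorted and identically partitioned on the join key; the paths and the choice of KeyValueTextInputFormat are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
    import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MapSideJoinJob {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path datasetA = new Path(args[0]);
        Path datasetB = new Path(args[1]);
        // "inner" join expression over the two pre-sorted, equally partitioned inputs.
        conf.set(CompositeInputFormat.JOIN_EXPR,
            CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, datasetA, datasetB));

        Job job = Job.getInstance(conf, "map-side join");
        job.setInputFormatClass(CompositeInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TupleWritable.class);
        job.setNumReduceTasks(0);               // the join happens entirely map-side
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }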
Re: Data locality of map-side join
Hi Sigurd Map-side joins are efficiently implemented in Hive and Pig. I'm talking in terms of how map-side joins are implemented in Hive. In a map-side join, the smaller data set is first loaded into the DistributedCache. The larger dataset is streamed as usual while the smaller dataset is held in memory; for every record in the larger data set, a lookup is made in memory against the smaller set, and the join is done that way. In later versions of Hive the framework itself intelligently determines the smaller data set. In older versions you can specify the smaller data set using hints in the query. Regards Bejoy KS Sent from handheld, please excuse typos. -Original Message- From: Sigurd Spieckermann sigurd.spieckerm...@gmail.com Date: Mon, 22 Oct 2012 22:29:15 To: user@hadoop.apache.org Reply-To: user@hadoop.apache.org Subject: Data locality of map-side join Hi guys, I've been trying to figure out whether a map-side join using the join-package does anything clever regarding data locality with respect to at least one of the partitions to join. To be more specific, if I want to join two datasets and some partition of dataset A is larger than the corresponding partition of dataset B, does Hadoop account for this and try to ensure that the map task is executed on the datanode storing the bigger partition thus reducing data transfer (if the other partition does not happen to be located on that same datanode)? I couldn't conclude the one or the other behavior from the source code and I couldn't find any documentation about this detail. Thanks for clarifying! Sigurd
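A hand-rolled illustration of the pattern Bejoy describes (not Hive's actual code): the small file is shipped to every node via the distributed cache, loaded into a HashMap in setup(), and the big dataset streams through map(). The small.txt name and tab-separated record layout are assumptions.

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Driver side (assumed): job.addCacheFile(new URI("/data/small.txt#small.txt"));
    public class MapJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
      private final Map<String, String> smallTable = new HashMap<>();

      @Override
      protected void setup(Context context) throws IOException {
        // The cache file is symlinked as "small.txt" in the task's working directory.
        try (BufferedReader r = new BufferedReader(new FileReader("small.txt"))) {
          String line;
          while ((line = r.readLine()) != null) {
            String[] parts = line.split("\t", 2);       // key<TAB>value
            if (parts.length == 2) {
              smallTable.put(parts[0], parts[1]);
            }
          }
        }
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String[] parts = value.toString().split("\t", 2);
        if (parts.length < 2) {
          return;
        }
        String match = smallTable.get(parts[0]);        // in-memory lookup on the small side
        if (match != null) {
          context.write(new Text(parts[0]), new Text(parts[1] + "\t" + match));
        }
      }
    }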
Re: Data locality of map-side join
Interesting to know that Hive and Pig are doing something in this direction. I'm dealing with the Hadoop join-package which doesn't use DistributedCache though but it rather pulls the other partition over the network before launching the map task. This is under the assumption that both partitions are too big to load into DC or it's just undesirable to use DC. Is there a similar mechanism implemented in the join-package that considers the size of the two partitions to be joined trying to execute the map task on the datanode that holds the bigger partition? 2012/10/23 Bejoy KS bejoy.had...@gmail.com Hi Sigurd Mapside joins are efficiently implemented in Hive and Pig. I'm talking in terms of how mapside joins are implemented in hive. In map side join, the smaller data set is first loaded into DistributedCache. The larger dataset is streamed as usual and the smaller dataset in memory. For every record in larger data set the look up is made in memory on the smaller set and there by joins are done. In later versions of hive the hive framework itself intelligently determines the smaller data set. In older versions you can specify the smaller data set using some hints in query. Regards Bejoy KS Sent from handheld, please excuse typos. -Original Message- From: Sigurd Spieckermann sigurd.spieckerm...@gmail.com Date: Mon, 22 Oct 2012 22:29:15 To: user@hadoop.apache.org Reply-To: user@hadoop.apache.org Subject: Data locality of map-side join Hi guys, I've been trying to figure out whether a map-side join using the join-package does anything clever regarding data locality with respect to at least one of the partitions to join. To be more specific, if I want to join two datasets and some partition of dataset A is larger than the corresponding partition of dataset B, does Hadoop account for this and try to ensure that the map task is executed on the datanode storing the bigger partition thus reducing data transfer (if the other partition does not happen to be located on that same datanode)? I couldn't conclude the one or the other behavior from the source code and I couldn't find any documentation about this detail. Thanks for clarifying! Sigurd
Data locality of map-side join
Hi guys, I've been trying to figure out whether a map-side join using the join-package does anything clever regarding data locality with respect to at least one of the partitions to join. To be more specific, if I want to join two datasets and some partition of dataset A is larger than the corresponding partition of dataset B, does Hadoop account for this and try to ensure that the map task is executed on the datanode storing the bigger partition thus reducing data transfer (if the other partition does not happen to be located on that same datanode)? I couldn't conclude the one or the other behavior from the source code and I couldn't find any documentation about this detail. Thanks for clarifying! Sigurd
Re: HBase and MapReduce data locality
Inline. Just a set of "you're right" answers :-). It's documented here: http://hbase.apache.org/book.html#regions.arch.locality On Wed, Aug 29, 2012 at 8:06 AM, Robert Dyer rd...@iastate.edu wrote: Ok but does that imply that only 1 of your compute nodes is promised to have all of the data for any given row? The blocks will replicate, but they don't necessarily all replicate to the same nodes right? Right. So if I have say 2 column families (cf1, cf2) and there are 2 physical files on the HDFS for those (per region) then those files are created on one datanode (dn1) which will have all blocks local to that node. Yes. Nit: datanodes don't see files, only blocks. But the logic remains the same. Once it replicates those blocks 2 more times by default, isn't it possible the blocks for cf1 will go to dn2, dn3 while the blocks for cf2 go to dn4, dn5? Yes, it's possible (and even likely).
HBase and MapReduce data locality
I have been reading up on HBase and my understanding is that the physical files on the HDFS are split first by region and then by column families. Thus each column family has its own physical file (on a per-region basis). If I run a MapReduce task that uses the HBase as input, wouldn't this imply that if the task reads from more than 1 column family the data for that row might not be (entirely) local to the task? Is there a way to tell the HDFS to keep blocks of each region's column families together?
Re: Extension points available for data locality
On Wed, Aug 22, 2012 at 7:46 AM, Harsh J ha...@cloudera.com wrote: Hi, On Tue, Aug 21, 2012 at 6:14 PM, Tharindu Mathew mcclou...@gmail.com wrote: Harsh, What you said was the initial impression I got, but I thought I need to do something more with the name node. Thanks for clearing that out. My guess is that this probably works by using getLocations and mapping this location ip (or host) with the ip (or host) of the task tracker? Is this correct? Yes this is correct, the TT's location (hostname/IP) is what it would map to. Thanks Harsh. Exactly what I needed to hear. -- Harsh J -- Regards, Tharindu blog: http://mackiemathew.com/
Re: Extension points available for data locality
Dino, Feng, Thanks for the options, but I guess I need to do it myself. Harsh, What you said was the initial impression I got, but I thought I need to do something more with the name node. Thanks for clearing that out. My guess is that this probably works by using getLocations and mapping this location ip (or host) with the ip (or host) of the task tracker? Is this correct? On Tue, Aug 21, 2012 at 3:14 PM, feng lu amuseme...@gmail.com wrote: Hi Tharindu May you can try the Gora,The Apache Gora open source framework provides an in-memory data model and persistence for big data. Gora supports persisting to column stores, key value stores, document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce support. Now it support MySQL in gora-sql model. http://gora.apache.org/ On Tue, Aug 21, 2012 at 5:39 PM, Harsh J ha...@cloudera.com wrote: Tharindu, (Am assuming you've done enough research to know that there's benefit in what you're attempting to do.) Locality of tasks are determined by the job's InputFormat class. Specifically, the locality information returned by the InputSplit objects via InputFormat#getSplits(…) API is what the MR scheduler looks at when trying to launch data local tasks. You can tweak your InputFormat (the one that uses this DB as input?) to return relevant locations based on your DB Cluster, in order to achieve this. On Tue, Aug 21, 2012 at 2:36 PM, Tharindu Mathew mcclou...@gmail.com wrote: Hi, I'm doing some research that involves pulling data stored in a mysql cluster directly for a map reduce job, without storing the data in HDFS. I'd like to run hadoop task tracker nodes directly on the mysql cluster nodes. The purpose of this being, starting mappers directly in the node closest to the data if possible (data locality). I notice that with HDFS, since the name node knows exactly where each data block is, it uses this to achieve data locality. Is there a way to achieve my requirement possibly by extending the name node or otherwise? Thanks in advance. -- Regards, Tharindu blog: http://mackiemathew.com/ -- Harsh J -- Don't Grow Old, Grow Up... :-) -- Regards, Tharindu blog: http://mackiemathew.com/
Re: is HDFS RAID data locality efficient?
Nice explanation guys .. thanks Syed Abdul kather send from Samsung S3 On Aug 9, 2012 12:02 AM, Ajit Ratnaparkhi wrote: Agreed with Steve. That is the most important use of HDFS RAID, where you consume less disk space with the same reliability and availability guarantee, at the cost of processing performance. Most of the data in HDFS is cold data; without HDFS RAID you end up maintaining 3 replicas of data which is hardly going to be processed again, but you can't remove/move this data to a separate archive because, if it is required, processing should happen as soon as possible. -Ajit On Wed, Aug 8, 2012 at 11:01 PM, Steve Loughran wrote: On 8 August 2012 09:46, Sourygna Luangsay wrote: Hi folks! One of the scenarios I can think of in order to take advantage of HDFS RAID without suffering this penalty is: - Using normal HDFS with default replication=3 for my “fresh data” - Using HDFS RAID for my historical data (that is barely used by M/R) exactly: less space used on cold data, with the penalty that access performance can be worse. As the majority of data on a hadoop cluster is usually cold, it's a space and power efficient story for the archive data -- Steve Loughran Hortonworks Inc - THANKS AND REGARDS, SYED ABDUL KATHER
Re: is HDFS RAID data locality efficient?
Just something to think about... There's a company here in Chicago called Cleversafe. I believe they recently made an announcement concerning Hadoop? The interesting thing about RAID is that you're adding to the disk latency, and depending on which RAID level you use you could kill performance on a rebuild of a disk. In terms of uptime of Apache-based Hadoop, RAID allows you to actually hot swap the disks, and unless you lose both drives (assuming RAID 1, mirroring), your DN doesn't know and doesn't have to go down. So there is some value there, however at the expense of storage and storage costs. You can reduce the replication factor to 2. I don't know that I would go to anything lower because you still can lose the server... In terms of data locality... maybe you lose a bit, however... because you're raiding your storage, you now have less data per node. So you end up with more nodes, right? Just some food for thought. On Aug 8, 2012, at 11:46 AM, Sourygna Luangsay sluang...@pragsis.com wrote: Hi folks! I have just read about the HDFS RAID feature that was added to Hadoop 0.21 or 0.22, and I am quite curious to know if people use it, what kind of use they have and what they think about Map/Reduce data locality. The first big actor of this technology is Facebook, which claims to save many PB with it (see http://www.slideshare.net/ydn/hdfs-raid-facebook slides 4 and 5). I understand the following advantages with HDFS RAID: - You can save space - The system tolerates more missing blocks Nonetheless, one of the drawbacks I see is M/R data locality. As far as I understand, the advantage of having 3 replicas of each block is not only security if one server fails or a block is corrupted, but also the possibility of having as many as 3 tasktrackers executing the map task with “local data”. If you consider the 4th slide of the Facebook presentation, such an infrastructure decreases this possibility to only 1 tasktracker. That means that if this tasktracker is very busy executing other tasks, you have the following choices: - Waiting for this tasktracker to finish executing (part of) the current tasks (freeing map slots for instance) - Executing the map task for this block on another tasktracker, transferring the information of the block through the network In both cases, you'll get an M/R penalty (please tell me if I am wrong). Has somebody considered such a penalty or has some benchmarks to share with us? One of the scenarios I can think of in order to take advantage of HDFS RAID without suffering this penalty is: - Using normal HDFS with default replication=3 for my “fresh data” - Using HDFS RAID for my historical data (that is barely used by M/R) And you, what are you using HDFS RAID for? Regards, Sourygna Luangsay
Bulk Import Data Locality
Hello, As far as I understand, the Bulk Import functionality does not take the Data Locality question into account. The MR job will create as many reducer tasks as there are regions to write into, but it will not advise on which nodes to run these tasks. In that case a Reducer task which writes the HFiles of some region may not be physically located on the same node as the RS that serves that region. The way HDFS writes data, there will (likely) be one full replica of the blocks of this Region's HFiles written on the node where the Reducer task ran, and the other replicas (if replication > 1) will be distributed randomly over the cluster. Thus the RS, while serving data of that region, will (most likely) not be looking at local data (data will be transferred from other datanodes). I.e. data locality will be broken. Is this correct? If yes, I guess that if we could tell the MR framework where (on which nodes) to launch certain Reducer tasks, this would help us. I believe this is not possible with MR1, please correct me if I'm wrong. Perhaps this is possible with MR2? I assume there's no way to provide a hint to the NameNode about where to place the blocks of a new file either, right? Thank you, -- Alex Baranau -- Sematext :: http://blog.sematext.com/ :: Hadoop - HBase - ElasticSearch - Solr
Fwd: Bulk Import Data Locality
Thank you a lot for the replies. To me it is clear when data locality gets broken though (and it is not only the failure of the RS, there are other cases). I was hoping more for suggestions around this particular use-case: assuming that nodes/RSs are stable, how to make sure to achieve data locality when doing a bulk import (writing HFiles directly from the MR job). Running major compaction helps here (as new files are created instead of old ones *on the DataNode local to the RS where the region is being compacted), but I'd really prefer not to do it. This is a quite resource-intensive and thus expensive process... I was also hoping the guys from the HDFS/MapReduce teams would comment on my latter Qs. I heard that there is some work in the HBase community to allow asking HDFS to replicate the blocks of a file together (so that there are full replicas on other nodes, which helps as Lars noted) too. I also heard from an HDFS guy that there are ideas around better replication logic. Little offtopic: Also, is it correct to say that if I set a smaller data block size data locality gets worse, and if the data block size gets bigger data locality gets better? *Theoretically*, if your region data is stored in one HFile (say one flush occurred or major compaction caused that, given that there's one CF) and this HFile is smaller than the configured block size on HDFS, then we can say that the 3 (or whatever the replication is) replicas of this file (and hence of this region) are full replicas, which makes it easier to preserve data locality if the RS goes down (or when anything else causes re-assigning the region). But since the Region size is usually much bigger (usually 10-20 times bigger at least), this fact doesn't buy you much. Alex Baranau -- Sematext :: http://blog.sematext.com/ :: Hadoop - HBase - ElasticSearch - Solr On Wed, Jul 18, 2012 at 9:43 PM, Ben Kim benkimkim...@gmail.com wrote: I added some Q&As that went back and forth with Lars. Hope this is somewhat related to your data locality questions. On Jun 15, 2012, at 6:56 AM, Ben Kim wrote: Hi, I've been posting questions in the mailing-list quite often lately, and here goes another one about data locality I read the excellent blog post about data locality that Lars George wrote at http://www.larsgeorge.com/2010/05/hbase-file-locality-in-hdfs.html I understand data locality in hbase as locating a region in a region-server where most of its data blocks reside. The opposite is happening, i.e. the region server process triggers for all data it writes to be located on the same physical machine. So that way fast data access is guaranteed when running a MR because each map/reduce task is run for each region in the tasktracker where the region co-locates. Correct. But what if the data blocks of the region are evenly spread over multiple region-servers? This will not happen, unless the original server fails. Then the region is moved to another that now needs to do a lot of remote reads over the network. This is why there is work being done to allow for custom placement policies in HDFS. That way you can store the entire region and all copies as complete units on three data nodes. In case of a failure you can then move the region to one of the two copies. This is not available yet though, but it is being worked on (so I heard). Does a MR task have to remotely access the data blocks from other regionservers? For the above failure case, it would be the region server accessing the remote data, yes. How good is hbase at locating datablocks where a region resides? That is again the wrong way around. 
HBase has no clue as to where blocks reside, nor does it know that the file system in fact uses separate blocks. HBase stores files, HDFS does the block magic underneath the hood, and transparent to HBase. Also is it correct to say that if i set smaller data block size data locality gets worse, and if data block size gets bigger data locality gets better. This is not applicable here, I am assuming this stems from the above confusion about which system is handling the blocks, HBase or HDFS. See above. HTH, Lars On Thu, Jul 19, 2012 at 6:39 AM, Cristofer Weber cristofer.we...@neogrid.com wrote: Hi Alex, I ran one of our bulk import jobs with partial payload, without proceeding with major compaction, and you are right: Some hdfs blocks are in a different datanode. -Mensagem original- De: Alex Baranau [mailto:alex.barano...@gmail.com] Enviada em: quarta-feira, 18 de julho de 2012 12:46 Para: hbase-u...@hadoop.apache.org; mapreduce-user@hadoop.apache.org; hdfs-u...@hadoop.apache.org Assunto: Bulk Import Data Locality Hello, As far as I understand Bulk Import functionality will not take into account the Data Locality question. MR job will create number of reducer
reducers and data locality
Hello folks, I have a lot of input splits (10k-50k - 128 mb blocks) which contain text files. I need to process those line by line, then copy the results into roughly equal-sized shards. So I generate a random key (from a range of [0:numberOfShards]) which is used to route the map output to different reducers, and the sizes are more or less equal. I know that this is not really efficient and I was wondering if I could somehow control how keys are routed. For example, could I generate the random keys with hostname prefixes and control which keys are sent to each reducer? What do you think? Kind regards Mete
Re: reducers and data locality
Hi Mete A custom Partitioner class can control the flow of keys to the desired reducer. It gives you more control over which key goes to which reducer. Regards Bejoy KS Sent from handheld, please excuse typos. -Original Message- From: mete efk...@gmail.com Date: Fri, 27 Apr 2012 09:19:21 To: common-user@hadoop.apache.org Reply-To: common-user@hadoop.apache.org Subject: reducers and data locality Hello folks, I have a lot of input splits (10k-50k - 128 mb blocks) which contains text files. I need to process those line by line, then copy the result into roughly equal size of shards. So i generate a random key (from a range of [0:numberOfShards]) which is used to route the map output to different reducers and the size is more less equal. I know that this is not really efficient and i was wondering if i could somehow control how keys are routed. For example could i generate the randomKeys with hostname prefixes and control which keys are sent to each reducer? What do you think? Kind regards Mete
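A minimal sketch of such a Partitioner, assuming the map output keys carry their target shard as a numeric prefix (the "shardId|rest-of-key" convention here is purely illustrative); it would be registered in the driver with job.setPartitionerClass(ShardPartitioner.class). Note that a Partitioner only controls which reducer a key goes to, not which node that reducer runs on; reduce tasks get no data-locality guarantee.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes each key to the reducer named by its shard prefix, e.g. a key of the
// form "7|rest-of-key" goes to reducer 7 (modulo the number of reducers).
public class ShardPartitioner extends Partitioner<Text, Text> {
  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
    String k = key.toString();
    int sep = k.indexOf('|');
    if (sep < 0) {
      // Fallback for keys without a shard prefix.
      return (k.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
    int shard = Integer.parseInt(k.substring(0, sep));
    return shard % numPartitions;
  }
}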
Hadoop PIPES job using C++ and binary data results in data locality problem.
Hi everyone. I am running C++ code using the PIPES wrapper and I am looking for some tutorials, examples or any kind of help with regards to using binary data. My problem is that I am working with large chunks of binary data and converting it to text is not an option. My first question is thus: can I pass large chunks (128 MB) of binary data through the PIPES interface? I have not been able to find documentation on this. The way I do things now is that I bypass the Hadoop process by opening and reading the data directly from the C++ code using the HDFS C API. However, that means that I lose data locality, and it causes too much network overhead to be viable at large scale. If passing binary data directly is not possible with PIPES, I need to somehow write my own RecordReader that maintains data locality but does not actually emit the data (I just need to make sure the C++ mapper reads the same data from a local source when it is spawned). The RecordReader actually does not need to read the data at all. Generating a config string that tells the C++ mapper code what to read would be just fine. The second question is thus: how do I write my own RecordReader in C++ or Java? I would also like information on how Hadoop maintains data locality between RecordReaders and the spawned map tasks. Any information is most welcome. Regards GorGo
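One way to realize the "config string" idea described above is a custom InputFormat whose splits are computed exactly like FileInputFormat's (so the block-location hints, and hence locality, are preserved), but whose RecordReader emits a single path:offset:length record per split instead of the bytes themselves. The sketch below is an illustration only, written against the newer org.apache.hadoop.mapreduce API with made-up class names; a Pipes job would need the equivalent written against the old mapred API.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Splits are computed as for any file input (so locality hints come from the
// block locations), but each map task receives only one record describing
// where its split lives; the framework never reads the bytes themselves.
public class SplitPointerInputFormat extends FileInputFormat<Text, NullWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return true; // one split per block, as usual
  }

  @Override
  public RecordReader<Text, NullWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    return new RecordReader<Text, NullWritable>() {
      private FileSplit fileSplit;
      private boolean emitted = false;

      @Override
      public void initialize(InputSplit s, TaskAttemptContext ctx) {
        fileSplit = (FileSplit) s;
      }

      @Override
      public boolean nextKeyValue() {
        if (emitted) {
          return false;
        }
        emitted = true;
        return true;
      }

      @Override
      public Text getCurrentKey() {
        // "path:offset:length" -- the mapper (or the native code behind it)
        // opens and reads this range itself, ideally from the local replica.
        return new Text(fileSplit.getPath() + ":" + fileSplit.getStart()
            + ":" + fileSplit.getLength());
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        return emitted ? 1.0f : 0.0f;
      }

      @Override
      public void close() {
      }
    };
  }
}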
Re: Hadoop PIPES job using C++ and binary data results in data locality problem.
I think what you want to try and do is to use JNI rather than pipes or streaming. PIPES has known issues and it is my understanding that its use is now discouraged. The ideal way to do this is to use JNI to send your data to the C code. Be aware that moving large amounts of data through JNI has some of its own challenges, but most of these can be solved by using a Direct ByteBuffer. --Bobby Evans On 1/10/12 10:31 AM, GorGo gylf...@ru.is wrote: Hi everyone. I am running C++ code using the PIPES wrapper and I am looking for some tutorials, examples or any kind of help with regards to using binary data. My problems is that I am working with large chunks of binary data and converting it to text not an option. My first question is thus, can I pass large chunks (128 MB) of binary data through the PIPES interface? I have not been able to find documentation on this. The way I do things now is that I bypass the Hadoop process by opening and reading the data directly from the C++ code using the HDFS C API. However, that means that I lose the data locality and causes too much network overhead to be viable at large scale. If passing binary data directly is not possible with PIPES, I need somehow to write my own RecordReader that maintains the data locality but still does not actually emit the data, (I just need to make sure the c++ mapper reads the same data from a local source when it is spawned). The recordreader actually does not need to read the data at all. Generating a config string that tells the C++ mapper code what to read would be just fine. The second question is thus, how to write my own RecordReader in the C++ or JAVA? I also would like information on how Hadoop maintains the data locality between RecordReaders and the spawned map tasks. Any information is most welcome. Regards GorGo
Data locality for a custom input format
Hi hadoop devs, I'm implementing a custom input format and want to understand how to make use of data locality. AFAIU, only the file input format makes use of data locality, since the job tracker picks data locality based on the block location defined in the file input split. So, the job tracker code is partly responsible for this. So providing data locality for a custom input format would mean either extending the file input format or modifying the job tracker code (if that even makes sense). Is my understanding correct? -- Regards, Tharindu blog: http://mackiemathew.com/
Re: Data locality for a custom input format
Tharindu, InputSplit#getLocations() i.e., http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/mapreduce/InputSplit.html#getLocations() is used to decide the locality of a task. You need your custom InputFormat to prepare the right array of these objects. The # of objects == the # of map tasks, and the locations array gets used by the scheduler for local assignment. For FileSplit preparation, this is as easy as passing in the block locations obtained from the NameNode. For other types of splits, you need to fill them in yourself. On Sat, Nov 12, 2011 at 7:12 PM, Tharindu Mathew mcclou...@gmail.com wrote: Hi hadoop devs, I'm implementing a custom input format and want to understand how to make use of data locality. AFAIU, only file input format makes use of data locality since the job tracker picks data locality based on the block location defined in the file input split. So, the job tracker code is partly responsible for this. So providing data locality for a custom input format would be to either either extend file input format or modify job tracker code (if that makes sense even). Is my understanding correct? -- Regards, Tharindu blog: http://mackiemathew.com/ -- Harsh J
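For a non-file source, the split simply has to report the hostnames that hold its data. A bare-bones illustration of what Harsh describes (the "shard" wording and fields are hypothetical; the only part the scheduler cares about is the array returned by getLocations()):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

// A split over some external source (here, a hypothetical shard of a database).
// The hosts returned by getLocations() are what the scheduler compares against
// TaskTracker hostnames when it tries to place the task close to the data.
public class ShardSplit extends InputSplit implements Writable {

  private String shardId;
  private long estimatedBytes;
  private String[] hosts;

  public ShardSplit() {
    // required for deserialization
  }

  public ShardSplit(String shardId, long estimatedBytes, String[] hosts) {
    this.shardId = shardId;
    this.estimatedBytes = estimatedBytes;
    this.hosts = hosts;
  }

  @Override
  public long getLength() {
    return estimatedBytes;
  }

  @Override
  public String[] getLocations() {
    return hosts; // e.g. the machines hosting this shard
  }

  @Override
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, shardId);
    out.writeLong(estimatedBytes);
    // Locations are only consulted at scheduling time, not inside the task,
    // so they are not serialized here.
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    shardId = Text.readString(in);
    estimatedBytes = in.readLong();
    hosts = new String[0];
  }
}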
Re: Data locality for a custom input format
There's this article on InfoQ that deals with this issue... ;-) http://www.infoq.com/articles/HadoopInputFormat Sent from a remote device. Please excuse any typos... Mike Segel On Nov 12, 2011, at 7:51 AM, Harsh J ha...@cloudera.com wrote: Tharindu, InputSplit#getLocations() i.e., http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/mapreduce/InputSplit.html#getLocations() is used to decide locality of a task. You need your custom InputFormat to prepare the right array of these objects. The # of objects == # of map tasks, and the locations array gets used by the scheduler for local assignment. For a FileSplit preparation, this is as easy as passing the block locations obtained from the NameNode. For the rest type of splits, you need to fill them up yourself. On Sat, Nov 12, 2011 at 7:12 PM, Tharindu Mathew mcclou...@gmail.com wrote: Hi hadoop devs, I'm implementing a custom input format and want to understand how to make use of data locality. AFAIU, only file input format makes use of data locality since the job tracker picks data locality based on the block location defined in the file input split. So, the job tracker code is partly responsible for this. So providing data locality for a custom input format would be to either either extend file input format or modify job tracker code (if that makes sense even). Is my understanding correct? -- Regards, Tharindu blog: http://mackiemathew.com/ -- Harsh J
Re: data locality
Thanks! 2011/10/26 Steve Loughran ste...@apache.org: On 26/10/11 05:22, Eugene Kirpichov wrote: But I guess it isn't always possible to achieve optimal scheduling, right? What's done then; any account for network topology perhaps? I'd recommend this paper if you are curious, it explains the Fair Scheduler http://www.cs.berkeley.edu/~matei/papers/2010/eurosys_delay_scheduling.pdf You can plug in different schedulers with different policies if you want -- Eugene Kirpichov Principal Engineer, Mirantis Inc. http://www.mirantis.com/ Editor, http://fprog.ru/
Re: data locality
On 25 October 2011 17:36, ivan.nov...@emc.com wrote: So I guess the job tracker is the one reading the HDFS meta-data and then optimizing the scheduling of map jobs based on that? Currently no, it's the JobClient that does it. Although you use the term scheduling in a way which may confuse some other participants in the thread, since your original question wasn't about what Hadoop people usually call the scheduler. S. On 10/25/11 3:13 PM, Shevek she...@karmasphere.com wrote: We pray to $deity that the mapreduce block size is about the same as (or smaller than) the hdfs block size. We also pray that file format synchronization points are frequent when compared to block boundaries. The JobClient finds the location of each block of each file. It splits the job into FileSplit(s), with one per block. Each FileSplit is processed by a task. The Split contains the locations in which the task should best be run. The last block may be very short. It is then subsumed into the preceding block. Some data is transferred between nodes when the synchronization point for the file format is not at a block boundary. (It basically never is, but we hope it's close, or the purpose of MR locality is defeated.) Specifically to your questions: Most of the data should be read from the local hdfs node under the above assumptions. The communication layer between mapreduce and hdfs is not special. S. On 25 October 2011 11:49, ivan.nov...@emc.com wrote: Hello, I am trying to understand how data locality works in hadoop. If you run a map reduce job do the mappers only read data from the host on which they are running? Is there a communication protocol between the map reduce layer and HDFS layer so that the mapper gets optimized to read data locally? Any pointers on which layer of the stack handles this? Cheers, Ivan
data locality
Hello, I am trying to understand how data locality works in hadoop. If you run a map reduce job do the mappers only read data from the host on which they are running? Is there a communication protocol between the map reduce layer and HDFS layer so that the mapper gets optimized to read data locally? Any pointers on which layer of the stack handles this? Cheers, Ivan
Re: data locality
We pray to $deity that the mapreduce block size is about the same as (or smaller than) the hdfs block size. We also pray that file format synchronization points are frequent when compared to block boundaries. The JobClient finds the location of each block of each file. It splits the job into FileSplit(s), with one per block. Each FileSplit is processed by a task. The Split contains the locations in which the task should best be run. The last block may be very short. It is then subsumed into the preceding block. Some data is transferred between nodes when the synchronization point for the file format is not at a block boundary. (It basically never is, but we hope it's close, or the purpose of MR locality is defeated.) Specifically to your questions: Most of the data should be read from the local hdfs node under the above assumptions. The communication layer between mapreduce and hdfs is not special. S. On 25 October 2011 11:49, ivan.nov...@emc.com wrote: Hello, I am trying to understand how data locality works in hadoop. If you run a map reduce job do the mappers only read data from the host on which they are running? Is there a communication protocol between the map reduce layer and HDFS layer so that the mapper gets optimized to read data locally? Any pointers on which layer of the stack handles this? Cheers, Ivan
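The locations attached to each FileSplit come from the NameNode's block metadata. Roughly, the split computation boils down to something like the following sketch (the input path is made up; FileInputFormat does this internally and also merges or trims ranges against the configured split size):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Ask the NameNode which hosts hold each block of a file; split computation
// attaches these hosts to the corresponding split so the scheduler can try
// to run the map task on one of them.
public class BlockLocationsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Path input = new Path("/data/input/part-00000"); // illustrative path
    FileStatus status = fs.getFileStatus(input);
    BlockLocation[] blocks =
        fs.getFileBlockLocations(status, 0, status.getLen());

    for (BlockLocation block : blocks) {
      // These hostnames end up in FileSplit#getLocations() for the split
      // covering this byte range.
      System.out.printf("offset=%d length=%d hosts=%s%n",
          block.getOffset(), block.getLength(),
          String.join(",", block.getHosts()));
    }
  }
}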
Re: data locality
So I guess the job tracker is the one reading the HDFS meta-data and then optimizing the scheduling of map jobs based on that? On 10/25/11 3:13 PM, Shevek she...@karmasphere.com wrote: We pray to $deity that the mapreduce block size is about the same as (or smaller than) the hdfs block size. We also pray that file format synchronization points are frequent when compared to block boundaries. The JobClient finds the location of each block of each file. It splits the job into FileSplit(s), with one per block. Each FileSplit is processed by a task. The Split contains the locations in which the task should best be run. The last block may be very short. It is then subsumed into the preceding block. Some data is transferred between nodes when the synchronization point for the file format is not at a block boundary. (It basically never is, but we hope it's close, or the purpose of MR locality is defeated.) Specifically to your questions: Most of the data should be read from the local hdfs node under the above assumptions. The communication layer between mapreduce and hdfs is not special. S. On 25 October 2011 11:49, ivan.nov...@emc.com wrote: Hello, I am trying to understand how data locality works in hadoop. If you run a map reduce job do the mappers only read data from the host on which they are running? Is there a communication protocol between the map reduce layer and HDFS layer so that the mapper gets optimized to read data locally? Any pointers on which layer of the stack handles this? Cheers, Ivan
Re: data locality
But I guess it isn't always possible to achieve optimal scheduling, right? What's done then; is network topology taken into account, perhaps? On 26.10.2011, at 4:42, Mapred Learn mapred.le...@gmail.com wrote: Yes that's right ! Sent from my iPhone On Oct 25, 2011, at 5:36 PM, ivan.nov...@emc.com wrote: So I guess the job tracker is the one reading the HDFS meta-data and then optimizing the scheduling of map jobs based on that? On 10/25/11 3:13 PM, Shevek she...@karmasphere.com wrote: We pray to $deity that the mapreduce block size is about the same as (or smaller than) the hdfs block size. We also pray that file format synchronization points are frequent when compared to block boundaries. The JobClient finds the location of each block of each file. It splits the job into FileSplit(s), with one per block. Each FileSplit is processed by a task. The Split contains the locations in which the task should best be run. The last block may be very short. It is then subsumed into the preceding block. Some data is transferred between nodes when the synchronization point for the file format is not at a block boundary. (It basically never is, but we hope it's close, or the purpose of MR locality is defeated.) Specifically to your questions: Most of the data should be read from the local hdfs node under the above assumptions. The communication layer between mapreduce and hdfs is not special. S. On 25 October 2011 11:49, ivan.nov...@emc.com wrote: Hello, I am trying to understand how data locality works in hadoop. If you run a map reduce job do the mappers only read data from the host on which they are running? Is there a communication protocol between the map reduce layer and HDFS layer so that the mapper gets optimized to read data locally? Any pointers on which layer of the stack handles this? Cheers, Ivan
Re: Lack of data locality in Hadoop-0.20.2
Hi Matei, Using the fair scheduler of the cloudera distribution seems to have (mostly) solved the problem. Thanks a lot for the suggestion. -Virajith On Tue, Jul 12, 2011 at 7:23 PM, Matei Zaharia ma...@eecs.berkeley.eduwrote: Hi Virajith, The default FIFO scheduler just isn't optimized for locality for small jobs. You should be able to get substantially more locality even with 1 replica if you use the fair scheduler, although the version of the scheduler in 0.20 doesn't contain the locality optimization. Try the Cloudera distribution to get a 0.20-compatible Hadoop that does contain it. I also think your value of 10% inferred on completion time might be a little off, because you have quite a few more data blocks than nodes so it should be easy to make the first few waves of tasks data-local. Try a version of Hadoop that correctly measures this counter. Matei On Jul 12, 2011, at 1:27 PM, Virajith Jalaparti wrote: I agree that the scheduler has lesser leeway when the replication factor is 1. However, I would still expect the number of data-local tasks to be more than 10% even when the replication factor is 1. Presumably, the scheduler would have greater number of opportunities to schedule data-local tasks as compared to just 10%. (Please note that I am inferring that a map was non-local based on the observed completion time. I don't know why but the logs of my jobs don't show the DATA_LOCAL_MAPS counter information.) I will try using higher replication factors and see how much improvement I can get. Thanks, Virajith On Tue, Jul 12, 2011 at 6:15 PM, Arun C Murthy a...@hortonworks.comwrote: As Aaron mentioned the scheduler has very little leeway when you have a single replica. OTOH, schedulers equate rack-locality to node-locality - this makes sense sense for a large-scale system since intra-rack b/w is good enough for most installs of Hadoop. Arun On Jul 12, 2011, at 7:36 AM, Virajith Jalaparti wrote: I am using a replication factor of 1 since I dont to incur the overhead of replication and I am not much worried about reliability. I am just using the default Hadoop scheduler (FIFO, I think!). In case of a single rack, rack-locality doesn't really have any meaning. Obviously everything will run in the same rack. I am concerned about data-local maps. I assumed that Hadoop would do a much better job at ensuring data-local maps but it doesnt seem to be the case here. -Virajith On Tue, Jul 12, 2011 at 3:30 PM, Arun C Murthy a...@hortonworks.comwrote: Why are you running with replication factor of 1? Also, it depends on the scheduler you are using. The CapacityScheduler in 0.20.203 (not 0.20.2) has much better locality for jobs, similarly with FairScheduler. IAC, running on a single rack with replication of 1 implies rack-locality for all tasks which, in most cases, is good enough. Arun On Jul 12, 2011, at 5:45 AM, Virajith Jalaparti wrote: Hi, I was trying to run the Sort example in Hadoop-0.20.2 over 200GB of input data using a 20 node cluster of nodes. HDFS is configured to use 128MB block size (so 1600maps are created) and a replication factor of 1 is being used. All the 20 nodes are also hdfs datanodes. I was using a bandwidth value of 50Mbps between each of the nodes (this was configured using linux tc). I see that around 90% of the map tasks are reading data over the network i.e. most of the map tasks are not being scheduled at the nodes where the data to be processed by them is located. My understanding was that Hadoop tries to schedule as many data-local maps as possible. 
But in this situation, this does not seem to happen. Any reason why this is happening? and is there a way to actually configure hadoop to ensure the maximum possible node locality? Any help regarding this is very much appreciated. Thanks, Virajith
Re: Lack of data locality in Hadoop-0.20.2
How do I find the number of data-local map tasks that are launched? I checked the log files but didnt see any information about this. All the map tasks are rack local since I am running the job just using a single rack. From the completion time per map (comparing it to the case where I have 1Gbps of bandwidth between the nodes i.e. the case where network bandwidth is not a bottle neck), I saw that more than 90% of the maps are actually reading data over the network. I understand that there might be some maps that are actually launched as non-data local task but I am surprised that around 90% of the maps are actually running as non-data local tasks. I have not measured how much bandwidth was being used but I think the whole 50Mbps is being used. Thanks, Virajith On Tue, Jul 12, 2011 at 1:55 PM, Harsh J ha...@cloudera.com wrote: How much of bandwidth did you see being utilized? What was the count of number of tasks launched as data-local map tasks versus rack local ones? A little bit of edge record data is always read over network but that is highly insignificant compared to the amount of data read locally (a whole block size, if available). On Tue, Jul 12, 2011 at 6:15 PM, Virajith Jalaparti virajit...@gmail.com wrote: Hi, I was trying to run the Sort example in Hadoop-0.20.2 over 200GB of input data using a 20 node cluster of nodes. HDFS is configured to use 128MB block size (so 1600maps are created) and a replication factor of 1 is being used. All the 20 nodes are also hdfs datanodes. I was using a bandwidth value of 50Mbps between each of the nodes (this was configured using linux tc). I see that around 90% of the map tasks are reading data over the network i.e. most of the map tasks are not being scheduled at the nodes where the data to be processed by them is located. My understanding was that Hadoop tries to schedule as many data-local maps as possible. But in this situation, this does not seem to happen. Any reason why this is happening? and is there a way to actually configure hadoop to ensure the maximum possible node locality? Any help regarding this is very much appreciated. Thanks, Virajith -- Harsh J
Re: Lack of data locality in Hadoop-0.20.2
Each node is configured to run 8 map tasks. I am using 2.4 GHz 64-bit Quad Core Xeon machines. -Virajith On Tue, Jul 12, 2011 at 2:05 PM, Sudharsan Sampath sudha...@gmail.com wrote: what's the map task capacity of each node ? On Tue, Jul 12, 2011 at 6:15 PM, Virajith Jalaparti virajit...@gmail.com wrote: Hi, I was trying to run the Sort example in Hadoop-0.20.2 over 200GB of input data using a 20 node cluster of nodes. HDFS is configured to use 128MB block size (so 1600maps are created) and a replication factor of 1 is being used. All the 20 nodes are also hdfs datanodes. I was using a bandwidth value of 50Mbps between each of the nodes (this was configured using linux tc). I see that around 90% of the map tasks are reading data over the network i.e. most of the map tasks are not being scheduled at the nodes where the data to be processed by them is located. My understanding was that Hadoop tries to schedule as many data-local maps as possible. But in this situation, this does not seem to happen. Any reason why this is happening? and is there a way to actually configure hadoop to ensure the maximum possible node locality? Any help regarding this is very much appreciated. Thanks, Virajith
Re: Lack of data locality in Hadoop-0.20.2
Virajith, You can see the number of data local vs. non.'s counters in the job itself. On Tue, Jul 12, 2011 at 6:36 PM, Virajith Jalaparti virajit...@gmail.com wrote: How do I find the number of data-local map tasks that are launched? I checked the log files but didnt see any information about this. All the map tasks are rack local since I am running the job just using a single rack. From the completion time per map (comparing it to the case where I have 1Gbps of bandwidth between the nodes i.e. the case where network bandwidth is not a bottle neck), I saw that more than 90% of the maps are actually reading data over the network. I understand that there might be some maps that are actually launched as non-data local task but I am surprised that around 90% of the maps are actually running as non-data local tasks. I have not measured how much bandwidth was being used but I think the whole 50Mbps is being used. Thanks, Virajith On Tue, Jul 12, 2011 at 1:55 PM, Harsh J ha...@cloudera.com wrote: How much of bandwidth did you see being utilized? What was the count of number of tasks launched as data-local map tasks versus rack local ones? A little bit of edge record data is always read over network but that is highly insignificant compared to the amount of data read locally (a whole block size, if available). On Tue, Jul 12, 2011 at 6:15 PM, Virajith Jalaparti virajit...@gmail.com wrote: Hi, I was trying to run the Sort example in Hadoop-0.20.2 over 200GB of input data using a 20 node cluster of nodes. HDFS is configured to use 128MB block size (so 1600maps are created) and a replication factor of 1 is being used. All the 20 nodes are also hdfs datanodes. I was using a bandwidth value of 50Mbps between each of the nodes (this was configured using linux tc). I see that around 90% of the map tasks are reading data over the network i.e. most of the map tasks are not being scheduled at the nodes where the data to be processed by them is located. My understanding was that Hadoop tries to schedule as many data-local maps as possible. But in this situation, this does not seem to happen. Any reason why this is happening? and is there a way to actually configure hadoop to ensure the maximum possible node locality? Any help regarding this is very much appreciated. Thanks, Virajith -- Harsh J -- Harsh J
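For reference, the counters Harsh mentions are the job's locality counters. On 0.20.x they show up in the JobTracker web UI and job history under the JobInProgress counter group; on later releases they can also be read programmatically, roughly as sketched below (org.apache.hadoop.mapreduce.JobCounter is the newer-API enum, so this exact snippet would not compile against 0.20.2):

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;

// After a job finishes, its framework counters include how many map tasks
// ran data-local, rack-local, or neither.
public class LocalityCounters {
  public static void print(Job job) throws Exception {
    Counters counters = job.getCounters();
    long dataLocal = counters.findCounter(JobCounter.DATA_LOCAL_MAPS).getValue();
    long rackLocal = counters.findCounter(JobCounter.RACK_LOCAL_MAPS).getValue();
    long launched = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
    System.out.printf("data-local=%d rack-local=%d launched=%d%n",
        dataLocal, rackLocal, launched);
  }
}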
Re: Lack of data locality in Hadoop-0.20.2
Why are you running with replication factor of 1? Also, it depends on the scheduler you are using. The CapacityScheduler in 0.20.203 (not 0.20.2) has much better locality for jobs, similarly with FairScheduler. IAC, running on a single rack with replication of 1 implies rack-locality for all tasks which, in most cases, is good enough. Arun On Jul 12, 2011, at 5:45 AM, Virajith Jalaparti wrote: Hi, I was trying to run the Sort example in Hadoop-0.20.2 over 200GB of input data using a 20 node cluster of nodes. HDFS is configured to use 128MB block size (so 1600maps are created) and a replication factor of 1 is being used. All the 20 nodes are also hdfs datanodes. I was using a bandwidth value of 50Mbps between each of the nodes (this was configured using linux tc). I see that around 90% of the map tasks are reading data over the network i.e. most of the map tasks are not being scheduled at the nodes where the data to be processed by them is located. My understanding was that Hadoop tries to schedule as many data-local maps as possible. But in this situation, this does not seem to happen. Any reason why this is happening? and is there a way to actually configure hadoop to ensure the maximum possible node locality? Any help regarding this is very much appreciated. Thanks, Virajith
Re: Lack of data locality in Hadoop-0.20.2
I am attaching the config files I was using for these runs with this email. I am not sure if something in them is causing this non-data locality of Hadoop. Thanks, Virajith On Tue, Jul 12, 2011 at 3:36 PM, Virajith Jalaparti virajit...@gmail.com wrote: I am using a replication factor of 1 since I dont to incur the overhead of replication and I am not much worried about reliability. I am just using the default Hadoop scheduler (FIFO, I think!). In case of a single rack, rack-locality doesn't really have any meaning. Obviously everything will run in the same rack. I am concerned about data-local maps. I assumed that Hadoop would do a much better job at ensuring data-local maps but it doesnt seem to be the case here. -Virajith On Tue, Jul 12, 2011 at 3:30 PM, Arun C Murthy a...@hortonworks.com wrote: Why are you running with replication factor of 1? Also, it depends on the scheduler you are using. The CapacityScheduler in 0.20.203 (not 0.20.2) has much better locality for jobs, similarly with FairScheduler. IAC, running on a single rack with replication of 1 implies rack-locality for all tasks which, in most cases, is good enough. Arun On Jul 12, 2011, at 5:45 AM, Virajith Jalaparti wrote: Hi, I was trying to run the Sort example in Hadoop-0.20.2 over 200GB of input data using a 20 node cluster of nodes. HDFS is configured to use 128MB block size (so 1600maps are created) and a replication factor of 1 is being used. All the 20 nodes are also hdfs datanodes. I was using a bandwidth value of 50Mbps between each of the nodes (this was configured using linux tc). I see that around 90% of the map tasks are reading data over the network i.e. most of the map tasks are not being scheduled at the nodes where the data to be processed by them is located. My understanding was that Hadoop tries to schedule as many data-local maps as possible. But in this situation, this does not seem to happen. Any reason why this is happening? and is there a way to actually configure hadoop to ensure the maximum possible node locality? Any help regarding this is very much appreciated. Thanks, Virajith
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>fs.default.name</name><value>hdfs://10.1.1.2:9000</value></property>
  <property><name>hadoop.tmp.dir</name><value>/hadoop/mapred/,/mnt/local/mapred/</value></property>
</configuration>

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>dfs.secondary.http.address</name><value>10.1.1.2:50090</value></property>
  <property><name>dfs.datanode.address</name><value>10.1.1.3:50010</value></property>
  <property><name>dfs.datanode.http.address</name><value>10.1.1.3:50075</value></property>
  <property><name>dfs.datanode.ipc.address</name><value>10.1.1.3:50020</value></property>
  <property><name>dfs.http.address</name><value>10.1.1.2:50070</value></property>
  <property><name>dfs.data.dir</name><value>/mnt/local/hdfs/data</value></property>
  <property><name>dfs.name.dir</name><value>/mnt/local/hdfs/name</value></property>
  <property><name>dfs.replication</name><value>1</value></property>
  <property><name>dfs.block.size</name><value>134217728</value></property>
</configuration>

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>mapred.job.tracker.http.address</name><value>10.1.1.2:50030</value></property>
  <property><name>mapred.task.tracker.http.address</name><value>10.1.1.3:50060</value></property>
  <property><name>slave.host.name</name><value>10.1.1.3</value></property>
  <property><name>mapred.job.tracker</name><value>10.1.1.2:55000</value></property>
  <property><name>mapred.system.dir</name><value>/hadoop/mapred,/mnt/local/mapred/system</value></property>
  <property><name>mapred.local.dir</name><value>/hadoop/mapred,/mnt/local/mapred/local</value></property>
  <property><name>mapred.temp.dir</name><value>/hadoop/mapred,/mnt/local/mapred/temp</value></property>
  <property><name>mapred.tasktracker.map.tasks.maximum</name><value>1000</value></property>
  <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>1000</value></property>
  <property><name>mapred.reduce.slowstart.completed.maps</name><value>1</value></property>
  <property><name>mapred.queue.names</name><value>default</value></property>
  <property><name>mapred.acls.enabled</name><value>false</value></property>
  <property><name>mapred.reduce.parallel.copies</name><value>20</value></property>
  <property><name>mapred.map.child.java.opts</name><value>-Xmx512M</value></property>
  <property><name>mapred.reduce.child.java.opts</name><value>-Xmx512M</value></property>
  <property><name>mapred.tasktracker.map.tasks.maximum</name><value>8</value></property>
  <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>3</value></property>
</configuration>
RE: Lack of data locality in Hadoop-0.20.2
Well, if you think about it, you'll have more/better locality if more nodes have the same blocks. It gives the scheduler more leeway to find a node that has a block that hasn't been processed yet. Have you tried it with replication of 2 or 3 and seen what that does? --Aaron From: Virajith Jalaparti [mailto:virajit...@gmail.com] Sent: Tuesday, July 12, 2011 7:37 AM To: mapreduce-user@hadoop.apache.org Subject: Re: Lack of data locality in Hadoop-0.20.2 I am using a replication factor of 1 since I dont to incur the overhead of replication and I am not much worried about reliability. I am just using the default Hadoop scheduler (FIFO, I think!). In case of a single rack, rack-locality doesn't really have any meaning. Obviously everything will run in the same rack. I am concerned about data-local maps. I assumed that Hadoop would do a much better job at ensuring data-local maps but it doesnt seem to be the case here. -Virajith On Tue, Jul 12, 2011 at 3:30 PM, Arun C Murthy a...@hortonworks.com wrote: Why are you running with replication factor of 1? Also, it depends on the scheduler you are using. The CapacityScheduler in 0.20.203 (not 0.20.2) has much better locality for jobs, similarly with FairScheduler. IAC, running on a single rack with replication of 1 implies rack-locality for all tasks which, in most cases, is good enough. Arun On Jul 12, 2011, at 5:45 AM, Virajith Jalaparti wrote: Hi, I was trying to run the Sort example in Hadoop-0.20.2 over 200GB of input data using a 20 node cluster of nodes. HDFS is configured to use 128MB block size (so 1600maps are created) and a replication factor of 1 is being used. All the 20 nodes are also hdfs datanodes. I was using a bandwidth value of 50Mbps between each of the nodes (this was configured using linux tc). I see that around 90% of the map tasks are reading data over the network i.e. most of the map tasks are not being scheduled at the nodes where the data to be processed by them is located. My understanding was that Hadoop tries to schedule as many data-local maps as possible. But in this situation, this does not seem to happen. Any reason why this is happening? and is there a way to actually configure hadoop to ensure the maximum possible node locality? Any help regarding this is very much appreciated. Thanks, Virajith
Re: Lack of data locality in Hadoop-0.20.2
As Aaron mentioned the scheduler has very little leeway when you have a single replica. OTOH, schedulers equate rack-locality to node-locality - this makes sense for a large-scale system since intra-rack b/w is good enough for most installs of Hadoop. Arun On Jul 12, 2011, at 7:36 AM, Virajith Jalaparti wrote: I am using a replication factor of 1 since I dont to incur the overhead of replication and I am not much worried about reliability. I am just using the default Hadoop scheduler (FIFO, I think!). In case of a single rack, rack-locality doesn't really have any meaning. Obviously everything will run in the same rack. I am concerned about data-local maps. I assumed that Hadoop would do a much better job at ensuring data-local maps but it doesnt seem to be the case here. -Virajith On Tue, Jul 12, 2011 at 3:30 PM, Arun C Murthy a...@hortonworks.com wrote: Why are you running with replication factor of 1? Also, it depends on the scheduler you are using. The CapacityScheduler in 0.20.203 (not 0.20.2) has much better locality for jobs, similarly with FairScheduler. IAC, running on a single rack with replication of 1 implies rack-locality for all tasks which, in most cases, is good enough. Arun On Jul 12, 2011, at 5:45 AM, Virajith Jalaparti wrote: Hi, I was trying to run the Sort example in Hadoop-0.20.2 over 200GB of input data using a 20 node cluster of nodes. HDFS is configured to use 128MB block size (so 1600maps are created) and a replication factor of 1 is being used. All the 20 nodes are also hdfs datanodes. I was using a bandwidth value of 50Mbps between each of the nodes (this was configured using linux tc). I see that around 90% of the map tasks are reading data over the network i.e. most of the map tasks are not being scheduled at the nodes where the data to be processed by them is located. My understanding was that Hadoop tries to schedule as many data-local maps as possible. But in this situation, this does not seem to happen. Any reason why this is happening? and is there a way to actually configure hadoop to ensure the maximum possible node locality? Any help regarding this is very much appreciated. Thanks, Virajith
Re: Lack of data locality in Hadoop-0.20.2
I agree that the scheduler has less leeway when the replication factor is 1. However, I would still expect the number of data-local tasks to be more than 10% even when the replication factor is 1. Presumably, the scheduler would have a greater number of opportunities to schedule data-local tasks than just 10%. (Please note that I am inferring that a map was non-local based on the observed completion time. I don't know why, but the logs of my jobs don't show the DATA_LOCAL_MAPS counter information.) I will try using higher replication factors and see how much improvement I can get. Thanks, Virajith
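If the counters are missing from the logs, they can usually be read back from the completed job itself. A rough sketch against the newer MapReduce API (the Cluster and JobCounter classes below appeared after 0.20.2, so this is illustrative rather than something guaranteed to compile against that exact release):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;

public class LocalityReport {
  public static void main(String[] args) throws Exception {
    Cluster cluster = new Cluster(new Configuration());
    Job job = cluster.getJob(JobID.forName(args[0]));  // e.g. job_201107121530_0001
    if (job == null) {
      System.err.println("job not found (it may have been retired)");
      return;
    }
    long dataLocal = job.getCounters().findCounter(JobCounter.DATA_LOCAL_MAPS).getValue();
    long rackLocal = job.getCounters().findCounter(JobCounter.RACK_LOCAL_MAPS).getValue();
    long launched  = job.getCounters().findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
    System.out.println("data-local maps: " + dataLocal
        + ", rack-local maps: " + rackLocal
        + ", total launched maps: " + launched);
  }
}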
Re: Lack of data locality in Hadoop-0.20.2
On Jul 12, 2011, at 10:27 AM, Virajith Jalaparti wrote: I agree that the scheduler has less leeway when the replication factor is 1. However, I would still expect the number of data-local tasks to be more than 10% even when the replication factor is 1. How did you load your data? Did you load it from outside the grid or from one of the datanodes? If you loaded from one of the datanodes, you'll basically have no real locality, especially with a rep factor of 1.
Re: Lack of data locality in Hadoop-0.20.2
Could the non-data-local nature of the maps be due to the amount of HDFS data read by each map being greater than the HDFS block size? In the job I was running, the HDFS block size dfs.block.size was 134217728, the HDFS_BYTES_READ by the maps was 134678218, and FILE_BYTES_READ was 134698338. So HDFS_BYTES_READ is greater than dfs.block.size. Does this imply that most of the map tasks will be non-local? Further, would Hadoop ensure that the map task is scheduled on the node which has the larger chunk of the data that is to be read by the task? Thanks, Virajith
RE: Lack of data locality in Hadoop-0.20.2
The number of bytes read can exceed the block size somewhat because a block rarely starts/ends on a record (e.g. line) boundary, so the reader usually needs to read a bit before and/or after the actual block boundary in order to correctly read in all of the records it is supposed to. If you look, it's not having to read all that much extra data: for the numbers you quoted, 134678218 - 134217728 is about 460 KB, roughly 0.3% of a 128 MB block. --Aaron
Improve data locality for MR job processing tar.gz files
Hi, I have a job that processes raw data inside tarballs. As job input I have a text file listing the full HDFS paths of the files that need to be processed, e.g.: ... /user/eric/file451.tar.gz /user/eric/file452.tar.gz /user/eric/file453.tar.gz ... Each mapper gets one line of input at a time, moves the tarball to local storage, unpacks it and processes all files inside. This works very well. However, chances are high that a mapper gets to process a file that is not stored locally on that node, so it needs to be transferred. My question: is there any way to get better locality in a job as described above? Best regards, Eric
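There is no reply to this one in the archive, but one common way to attack it (a sketch only, with a hypothetical class name, written against the newer org.apache.hadoop.mapreduce API) is to drop the text file of paths and give the job the tarballs themselves as input, with an InputFormat whose splits carry each tarball's block locations. The scheduler can then try to place each map task on a node that actually holds the file, while the mapper still just receives a path to localize and unpack:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical: one whole tarball per split; split hosts = datanodes holding its blocks.
public class WholeTarballInputFormat extends FileInputFormat<NullWritable, Text> {

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (FileStatus file : listStatus(job)) {  // files under the job's input directory
      BlockLocation[] blocks = file.getPath().getFileSystem(job.getConfiguration())
          .getFileBlockLocations(file, 0, file.getLen());
      // Use the hosts of the first block as the preferred locations for the whole file.
      String[] hosts = (blocks != null && blocks.length > 0) ? blocks[0].getHosts()
                                                             : new String[0];
      splits.add(new FileSplit(file.getPath(), 0, file.getLen(), hosts));
    }
    return splits;
  }

  @Override
  public RecordReader<NullWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) {
    // Emits a single record per split: the tarball's HDFS path, so the existing mapper
    // logic (copy to local disk, unpack, process) stays unchanged.
    return new RecordReader<NullWritable, Text>() {
      private FileSplit fileSplit;
      private boolean done = false;
      public void initialize(InputSplit s, TaskAttemptContext c) { fileSplit = (FileSplit) s; }
      public boolean nextKeyValue() { if (done) { return false; } done = true; return true; }
      public NullWritable getCurrentKey() { return NullWritable.get(); }
      public Text getCurrentValue() { return new Text(fileSplit.getPath().toString()); }
      public float getProgress() { return done ? 1.0f : 0.0f; }
      public void close() { }
    };
  }
}

The sketch only reports the hosts of the first block; for tarballs spanning several blocks a fancier choice (for example the host covering the most bytes) would be better, but it is enough to show the idea.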
Streaming data locality
I've seen this asked before, but haven't seen a response yet. If the input to a streaming job is not actual data splits but simple HDFS file names which are then read by the mappers, then how can data locality be achieved? Likewise, is there any easier way to make those files accessible other than using the -cacheFile flag? That requires building a very, very long hadoop command (100s of files potentially). I'm worried about overstepping some command-line length limit...plus it would be nice to do this programmatically, say with the DistributedCache.addCacheFile() command, but that requires writing your own driver, which I don't see how to do with streaming. Thoughts? Thanks. Keith Wiley kwi...@keithwiley.com www.keithwiley.com I used to be with it, but then they changed what it was. Now, what I'm with isn't it, and what's it seems weird and scary to me. -- Abe (Grandpa) Simpson
Re: Streaming data locality
On 02/03/2011 12:16 PM, Keith Wiley wrote: If the input to a streaming job is not actual data splits but simple HDFS file names which are then read by the mappers, then how can data locality be achieved? Submit the job in a Java app instead of via streaming? Have a big loop where you repeatedly call job.addInputPath. (Or, if you're going to have a large number of input files, use CombineFileInputFormat for efficiency.) HTH, DR
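For completeness, the loop DR describes might look roughly like this in a driver of your own; the paths, class names, and the helper that picks the files are made up for the sketch, and on later releases the call is usually spelled FileInputFormat.addInputPath:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ImageSubsetDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "image-subset");      // Job.getInstance(conf, ...) on newer releases
    job.setJarByClass(ImageSubsetDriver.class);
    // job.setMapperClass(...), setReducerClass(...), output types etc. omitted from the sketch.
    // Add each selected file as a real input path; the scheduler then knows its block
    // locations, instead of seeing only a text file that lists the names.
    for (String file : selectInputFiles()) {      // hypothetical selection step
      FileInputFormat.addInputPath(job, new Path(file));
    }
    FileOutputFormat.setOutputPath(job, new Path("/user/keith/job-output"));  // hypothetical
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

  private static List<String> selectInputFiles() {
    // Stand-in for however the application decides which files belong to this run.
    return Arrays.asList("/images/img-000451.seq", "/images/img-000452.seq");
  }
}

Note that CombineFileInputFormat is abstract, so DR's second suggestion needs a small concrete subclass supplying a record reader.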
Re: Streaming data locality
Hello, On Thu, Feb 3, 2011 at 10:46 PM, Keith Wiley kwi...@keithwiley.com wrote: I've seen this asked before, but haven't seen a response yet. If the input to a streaming job is not actual data splits but simple HDFS file names which are then read by the mappers, then how can data locality be achieved? Also, if you're only looking to not split the files, you can pass in a custom FileInputFormat with isSplitable returning false. You'll lose some completeness in locality because blocks may not all be present on the chosen node, yes -- but I believe that adding a hundred files to the DistributedCache is not the solution, as the DistributedCache data is sent to ALL the nodes AFAIK. -- Harsh J www.harshj.com
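The non-splitting input format Harsh mentions is only a couple of lines. The class name here is made up, and it is written against the old mapred API, which is what streaming of this era expects as far as I know; for a streaming job it would be shipped in a jar (e.g. via -libjars) and selected with -inputformat:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextInputFormat;

// Each input file becomes a single split, so one mapper reads the whole file.
public class NonSplittableTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return false;
  }
}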
Re: Streaming data locality
On Feb 3, 2011, at 9:16 AM, Keith Wiley wrote: I've seen this asked before, but haven't seen a response yet. If the input to a streaming job is not actual data splits but simple HDFS file names which are then read by the mappers, then how can data locality be achieved? If I understand your question, the method of processing doesn't matter. The JobTracker places tasks based on input locality. So if you are providing the names of the files you want as input via -input, then the JT will use the locations of those blocks. (Remember: streaming.jar is basically a big wrapper around the Java methods and the parameters you pass to it are essentially the same as you'd provide to a real Java app.) Or are you saying your -input is a list of other files to read? In that case, there is no locality. But again, streaming or otherwise makes no real difference. Likewise, is there any easier way to make those files accessible other than using the -cacheFile flag? That requires building a very, very long hadoop command (100s of files potentially). I'm worried about overstepping some command-line length limit...plus it would be nice to do this programmatically, say with the DistributedCache.addCacheFile() command, but that requires writing your own driver, which I don't see how to do with streaming. Thoughts? I think you need to give a more concrete example of what you are doing. -cache is used for sending files with your job and should have no bearing on what your input is to your job. Something tells me that you've cooked something up that is overly complex. :D
Re: Streaming data locality
On Feb 3, 2011, at 9:46 AM, Harsh J wrote: Also, if you're only looking to not split the files, you can pass in a custom FileInputFormat with isSplitable returning false. The files won't be split, they're only 6MB each. I'm looking to get the files to my streaming job somehow, and the method I've chosen is to send mere fileNAMES via the streaming API and have the streaming program open the file from HDFS through a symbolic link in the distributed cache (the link originating from -cacheFile, presumably). You'll lose some completeness in locality because blocks may not all be present on the chosen node, yes -- but I believe that adding a hundred files to the DistributedCache is not the solution, as the DistributedCache data is sent to ALL the nodes AFAIK. My understanding is that the -cacheFile option and the DistributedCache.addCacheFile() method don't copy the entire file to the distributed cache, but rather make tiny symbolic links to the actual HDFS file. Correct? If you don't think I should add 100s of files to the distributed cache (or even 100s of links), then how else can I make the files available to my streaming program? Put another way, do you know of another method by which to permit the streaming programs to read files from HDFS? Thanks. Keith Wiley kwi...@keithwiley.com keithwiley.com music.keithwiley.com And what if we picked the wrong religion? Every week, we're just making God madder and madder! -- Homer Simpson
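For reference, a hedged sketch of the old-API DistributedCache calls being discussed. As far as I understand it, the cached file is localized (copied) onto the local disk of each node that runs the job's tasks, and the optional symlink in the task's working directory points at that local copy rather than back at HDFS:

import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

public class CacheSetup {
  public static void configure(JobConf conf) throws Exception {
    // The "#img451" fragment names the symlink created in each task's working directory.
    DistributedCache.addCacheFile(new URI("/user/keith/images/img451.seq#img451"), conf);
    DistributedCache.createSymlink(conf);  // ask the framework to create the symlinks
  }
}

That localization step is also why Harsh cautions against pushing hundreds of files through the cache: every node that runs the job's tasks ends up pulling all of them, whether its tasks need them or not.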
Re: Streaming data locality
On Feb 3, 2011, at 6:25 PM, Allen Wittenauer wrote: If I understand your question, the method of processing doesn't matter. The JobTracker places tasks based on input locality. So if you are providing the names of the files you want as input via -input, then the JT will use the locations of those blocks. Let's see here. My streaming job has a single -input flag which points to a text file containing HDFS paths. Each line contains one TAB. Are you saying that if the key (or the value) on either side of that TAB is an HDFS file path, then that record will be assigned to a task in a data-local manner? Which is it that determines this locality, the key or the value? (Must be the key, right?) (Remember: streaming.jar is basically a big wrapper around the Java methods and the parameters you pass to it are essentially the same as you'd provide to a real Java app.) Or are you saying your -input is a list of other files to read? In that case, there is no locality. But again, streaming or otherwise makes no real difference. Yes, basically. The input is a list of HDFS file paths to be read and processed on an individual basis. Likewise, is there any easier way to make those files accessible other than using the -cacheFile flag? That requires building a very, very long hadoop command (100s of files potentially). I'm worried about overstepping some command-line length limit...plus it would be nice to do this programmatically, say with the DistributedCache.addCacheFile() command, but that requires writing your own driver, which I don't see how to do with streaming. Thoughts? I think you need to give a more concrete example of what you are doing. -cache is used for sending files with your job and should have no bearing on what your input is to your job. Something tells me that you've cooked something up that is overly complex. :D Good point, I'll write a better description of this later. Thanks for the advice. Keith Wiley kwi...@keithwiley.com keithwiley.com music.keithwiley.com I used to be with it, but then they changed what it was. Now, what I'm with isn't it, and what's it seems weird and scary to me. -- Abe (Grandpa) Simpson
Re: Streaming data locality
On Feb 3, 2011, at 6:25 PM, Allen Wittenauer wrote: I think you need to give a more concrete example of what you are doing. -cache is used for sending files with your job and should have no bearing on what your input is to your job. Something tells me that you've cooked something up that is overly complex. :D Fair enough. Here's my setup. Our data consists of a few million binary (image) files, only about 6MB each uncompressed, 2.5MB gzipped. A typical job does not need to touch every image; rather, only a tiny subset of the images is required. In this sense our job is unlike a canonical Hadoop job which combs an entire database. As such, our current implementation (not streaming, but normal Hadoop) first uses a SQL query against a non-Hadoop database to find the names of the relevant images for a given job, then sets up input splits for those individual files and assigns them as input to the Hadoop job. Side note: since the images are so small and we use so many at a time, they are packed into sequence files. Building the input splits is a very precise piece of code since it must splice out the necessary image files from their locations in the sequence files as eventual Hadoop input splits. The mappers receive byte arrays as input from the record reader (in the form of a WritableComparable class that contains the byte array). Our computational heavy lifting resides in a very large and complex set of native libraries (C++) which our Java mappers call using JNI, passing the byte arrays (the image files slurped into memory) through JNI to a small native .so file which then coordinates the processing with the underlying processing library. The native routines return byte arrays through JNI that represent the converted images. The Java mappers pass the byte arrays on to the reducer stage for final processing. In our current design, the reducer's computational complexity is simple enough to be implemented and performed entirely in Java, sans JNI or native code. This system works fine at an organizational level, but unfortunately we have been plagued by extremely insidious segfaults and memory-allocation exceptions deep in the native library, bugs which absolutely never occur in a non-Hadoop or non-Java environment when the library is accessed solo from a native C++ program (or from Python scripts). I have determined that the problem is not an incompatibility between the native library and Hadoop, since a non-Hadoop Java program, using the same JNI interface to access our native routines, exhibits the same errors. This is some fundamental incompatibility between Java/JVM/JNI and our native library. Like I said, the library is very complicated. It involves extremely fancy template programming and I suspect JNI is simply not properly coordinating memory between the two environments. I've valgrinded it, I've looked at the core dumps, I haven't found an underlying cause yet, much less fixed the problem. Which brings me to streaming. Serendipitously, our system seems to work considerably better if I don't use Java mappers and JNI, but rather use streaming to run a small native program that then coordinates the calls into the native library (further evidence that the problem lies somewhere in JNI's functionality). I am not sure how to arrange our input data for streaming, however. It is binary of course, which has been a long-standing issue (I believe) with Hadoop streams, since the assumption is that records are newline-terminated text rather than arbitrary 8-bit data.
Furthermore, I'm not really sure how I would indicate which files are the input for a given job, since I don't know how to write a driver program for streaming; I only know how to use the streaming jar directly from the command line (so there is no driver of my choosing, I can't control the driver at all, it's the streaming driver, and all I can do is give it my mapper and reducer native programs to run). You see my problem? With streaming I have no driver, just the mappers and reducers, but with a Java Hadoop program I have all three, and I can do useful preprocessing in the driver, like setting up the input paths to the subset of images comprising the input dataset for a given job. This is quite perplexing for me. How do I use streams effectively if I can't write my own driver? Anyway, what I've come up with is to create a text file that lists the input files relevant to a job. That one text file is the streaming input, each line of which is an input record. Each line of the file is a properly formatted streaming key/val pair (meaning a TAB separates the key and the value). Interestingly, I don't need the key/value pairs per se, I only need the input filenames. So basically I don't use the key very much and store the input file names in the value (meaning right after the TAB on each line of the input file). My native program obviously receives a set of these lines from
Data locality in FTPFileSystem and RawLocalFilesystem
Hi, In the case of RawLocalFileSystem or FTPFileSystem being used as the input of a map-red job, how does the JobTracker apply the data locality logic, i.e. how many map tasks to start and on which machines? I want to understand this keeping in mind two scenarios. Scenario 1: RawLocalFileSystem - all the data nodes have a local directory called /fooLocalBar, each having 10 files (each 200MB in size) to be processed. Scenario 2: FTPFileSystem - a common external machine has a directory called /fooRemoteBar which has 10 files (each 200MB) to be processed. ./zahoor
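One way to see exactly what locality information the JobTracker would get for a given filesystem is to ask it for block locations directly. A small diagnostic sketch (the paths are placeholders):

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlockLocations {
  public static void main(String[] args) throws Exception {
    Path p = new Path(args[0]);  // e.g. file:///fooLocalBar/data-01 or an ftp:// URI
    FileSystem fs = p.getFileSystem(new Configuration());
    FileStatus stat = fs.getFileStatus(p);
    for (BlockLocation loc : fs.getFileBlockLocations(stat, 0, stat.getLen())) {
      System.out.println("offset " + loc.getOffset() + ", length " + loc.getLength()
          + ", hosts " + Arrays.toString(loc.getHosts()));
    }
  }
}

As far as I know, filesystems that don't track real block placement (the local and FTP filesystems among them) fall back to reporting a single location on "localhost", so the scheduler gets no useful placement hint; the number of map tasks is then driven purely by the split-size settings, and each map reads its data over whatever channel the filesystem provides.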
HDFS data locality
Hi, When the HMaster assigns regions to region servers, does it try to ensure that these files will be located on the same host in HDFS? It does not, does it? So most likely HBase RegionServers are very chatty over the network, reading and writing from/to the HDFS daemons on other nodes. Is there a way to improve this? To make a RegionServer mostly talk to the local DataNode only?
Re: HDFS data locality
Hi, The master currently doesn't do anything specifically to ensure locality. Maybe a future version will do so? During the continuous operation of a regionserver, it will do compactions, which essentially rewrite all the data. The compactions create data locality. Generally on my prod cluster, regions get into a stable assignment and stay there for months on end. If you are not taking your cluster up and down multiple times a day, the locality becomes very strong. -ryan
Re: HDFS data locality
The master doesn't assign as a function of locality; we rely on the way HDFS works. Also, it's almost impossible to assign regions based on locality, as all the files could be on different nodes and moving them around for the sake of locality would mean moving around possibly hundreds of GB... So when you write a file to HDFS, you first write to the local DataNode, then it's streamed to the other DNs. If you have a pretty normal production cluster that stays up 24/7, the regions won't move around, so the new files created in the regions are on the same node. Also, every 24 hours the major compaction thread rewrites all store files into one (if needed) for each family, and again you get locality. J-D
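If you want to re-establish locality after a lot of region movement without waiting for the daily cycle, a major compaction can also be requested explicitly, either from the HBase shell (major_compact 'mytable') or through the client API. A sketch against the client API of roughly this era; the table name is made up and the exact constructors and signatures vary between HBase versions:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class RequestMajorCompaction {
  public static void main(String[] args) throws Exception {
    HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
    // Asynchronous: the regionservers rewrite their store files in the background,
    // and the rewritten files get a local replica on each regionserver's datanode.
    admin.majorCompact("mytable");
  }
}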
Data locality with CompositeInputFormat
When specifying multiple input directories for the CompositeInputFormat, is there any deterministic selection of where the tasks are put (data locality)? Is there any preference for running rack-local or node-local to the splits of the first/last input directory? Thanks, -Christian
data locality in HDFS
hi. I want to run a distributed cluster, where I have say 20 machines/slaves in 3 separate data centers that belong to the same cluster. Ideally I would like the other machines in the data center to be able to upload files (Apache log files in this case) onto the local slaves and then have map/red tasks do their magic without having to move data until the reduce phase, where the amount of data will be smaller. Does Hadoop have this functionality? How do people handle multi-datacenter logging with Hadoop in this case? Do you just copy the data into a central location? regards Ian
Re: data locality in HDFS
HDFS uses the network topology to distribute and replicate data. An admin has to configure a script that describes the network topology to HDFS; this is specified using the parameter topology.script.file.name in the configuration file. This has been tested when nodes are on different subnets in the same data center. This code might not be generic enough (and is not yet tested) to support multiple data centers. One can extend this topology by providing one's own implementation and specifying the new jar using the config parameter topology.node.switch.mapping.impl. You will find more details at http://hadoop.apache.org/core/docs/current/cluster_setup.html#Hadoop+Rack+Awareness thanks, dhruba
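For the second knob dhruba mentions, a custom mapping class implements org.apache.hadoop.net.DNSToSwitchMapping. A sketch under the assumption that hostnames encode the data center and rack; the naming scheme and class name are invented, and newer Hadoop releases also add a reloadCachedMappings() method to the interface:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.net.DNSToSwitchMapping;

public class DataCenterAwareMapping implements DNSToSwitchMapping {
  // Maps e.g. "dc1-rack2-node07.example.com" to the network path "/dc1/rack2".
  public List<String> resolve(List<String> names) {
    List<String> paths = new ArrayList<String>(names.size());
    for (String name : names) {
      String[] parts = name.split("-");
      if (parts.length >= 3) {
        paths.add("/" + parts[0] + "/" + parts[1]);
      } else {
        paths.add("/default-rack");  // fallback for hosts that don't match the scheme
      }
    }
    return paths;
  }
}

The class then goes on the NameNode/JobTracker classpath and is named in topology.node.switch.mapping.impl; whether replica placement over WAN links between data centers behaves acceptably is a separate question, as dhruba notes.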