[jira] [Commented] (MAPREDUCE-5018) Support raw binary data with Hadoop streaming
[ https://issues.apache.org/jira/browse/MAPREDUCE-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13664486#comment-13664486 ]

Jay Hacker commented on MAPREDUCE-5018:
---

You're welcome! It might be easier to just split your inputs yourself before putting them in HDFS (see {{split(1)}}), but perhaps your files are already in HDFS.

JustBytes shouldn't modify or interpret your data at all; it reads an entire file in binary, gives those exact bytes to your mapper, and writes out the exact bytes your mapper gives. It does not know or care about newlines. I would encourage you to run {{md5sum}} on your data outside HDFS and via {{mapstream}} to verify that it is not changing your data at all, and let me know if it is.

> Support raw binary data with Hadoop streaming
> -
>
> Key: MAPREDUCE-5018
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-5018
> Project: Hadoop Map/Reduce
> Issue Type: New Feature
> Components: contrib/streaming
> Reporter: Jay Hacker
> Priority: Minor
> Attachments: justbytes.jar, MAPREDUCE-5018.patch, mapstream
>
>
> People often have a need to run older programs over many files, and turn to
> Hadoop streaming as a reliable, performant batch system. There are good
> reasons for this:
> 1. Hadoop is convenient: they may already be using it for mapreduce jobs, and
> it is easy to spin up a cluster in the cloud.
> 2. It is reliable: HDFS replicates data and the scheduler retries failed jobs.
> 3. It is reasonably performant: it moves the code to the data, maintaining
> locality, and scales with the number of nodes.
> Historically Hadoop is of course oriented toward processing key/value pairs,
> and so needs to interpret the data passing through it. Unfortunately, this
> makes it difficult to use Hadoop streaming with programs that don't deal in
> key/value pairs, or with binary data in general. For example, something as
> simple as running md5sum to verify the integrity of files will not give the
> correct result, due to Hadoop's interpretation of the data.
> There have been several attempts at binary serialization schemes for Hadoop
> streaming, such as TypedBytes (HADOOP-1722); however, these are still aimed
> at efficiently encoding key/value pairs, and not passing data through
> unmodified. Even the "RawBytes" serialization scheme adds length fields to
> the data, rendering it not-so-raw.
> I often have a need to run a Unix filter on files stored in HDFS; currently,
> the only way I can do this on the raw data is to copy the data out and run
> the filter on one machine, which is inconvenient, slow, and unreliable. It
> would be very convenient to run the filter as a map-only job, allowing me to
> build on existing (well-tested!) building blocks in the Unix tradition
> instead of reimplementing them as mapreduce programs.
> However, most existing tools don't know about file splits, and so want to
> process whole files; and of course many expect raw binary input and output.
> The solution is to run a map-only job with an InputFormat and OutputFormat
> that just pass raw bytes and don't split. It turns out to be a little more
> complicated with streaming; I have attached a patch with the simplest
> solution I could come up with. I call the format "JustBytes" (as "RawBytes"
> was already taken), and it should be usable with most recent versions of
> Hadoop.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
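The md5sum comparison suggested in the comment works because a digest depends only on the byte sequence, not on how the bytes are chunked on the way through. A minimal sketch of that property in plain Java, with no Hadoop classes involved; the class name `Md5PassThroughCheck`, the 64 KiB chunk size, and the sample data are assumptions made for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for illustration; not part of the JustBytes patch.
final class Md5PassThroughCheck {

    // Hash a stream by reading it in chunks of at most chunkSize bytes.
    static String md5Hex(InputStream in, int chunkSize)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] buf = new byte[chunkSize];
        int n;
        while ((n = in.read(buf)) != -1) {
            md.update(buf, 0, n);
        }
        return toHex(md.digest());
    }

    // Render bytes as lowercase hex, the format md5sum prints.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        // Arbitrary binary sample data, including zero bytes and newlines.
        byte[] data = new byte[200_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) (i * 31);
        }
        String whole = md5Hex(new ByteArrayInputStream(data), data.length);
        String chunked = md5Hex(new ByteArrayInputStream(data), 64 * 1024);
        System.out.println(whole.equals(chunked));
    }
}
```

If the digest computed on the cluster differs from the local one, the bytes were altered somewhere in between, which is exactly what the check is meant to catch.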
[jira] [Commented] (MAPREDUCE-5018) Support raw binary data with Hadoop streaming
[ https://issues.apache.org/jira/browse/MAPREDUCE-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13654863#comment-13654863 ]

Jay Hacker commented on MAPREDUCE-5018:
---

[~pratem], you're right, there are cases where it's not efficient. Consider this though: if you have 100 TB of files in HDFS that you want to md5sum (or what have you), would you rather do an inefficient distributed md5sum on the cluster, or copy 100 TB out to a single machine and wait for a single md5sum? Can you even fit that on one machine? You still gain reliability: there are multiple copies of each file, and failed jobs get restarted. It's also just convenient.

Here's the trick to make it efficient: use many files, and set the block size of individual files big enough to fit the whole file:

{{hadoop fs -D dfs.block.size=1073741824 -put ...}}

Then all reads are local, and you get all the performance Hadoop can give you.
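The block-size trick in the comment above comes down to simple arithmetic: a file can be read from a single block only if the block size is at least the file size. A rough sketch in plain Java; the helper name `BlockCount` is hypothetical, and the 64 MiB figure is assumed here as a typical default block size for Hadoop releases of that period:

```java
// Hypothetical helper for illustration; not part of Hadoop.
final class BlockCount {

    // Number of blocks needed to hold fileSize bytes (ceiling division).
    static long blocksFor(long fileSize, long blockSize) {
        if (fileSize < 0 || blockSize <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative / positive");
        }
        return (fileSize + blockSize - 1) / blockSize;
    }

    public static void main(String[] args) {
        long fileSize = 900L << 20;          // a 900 MiB file
        long assumedDefault = 64L << 20;     // assumed 64 MiB default block size
        long oneGiB = 1073741824L;           // the value passed via -D dfs.block.size

        // With the assumed default, the file spans many blocks that may
        // live on different nodes; with a 1 GiB block size it fits in one.
        System.out.println(blocksFor(fileSize, assumedDefault));
        System.out.println(blocksFor(fileSize, oneGiB));
    }
}
```

Since input files are never split, a file spanning several blocks generally forces the single mapper to fetch some of them over the network, which is the non-local read cost the larger block size avoids.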
[jira] [Updated] (MAPREDUCE-5018) Support raw binary data with Hadoop streaming
[ https://issues.apache.org/jira/browse/MAPREDUCE-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Hacker updated MAPREDUCE-5018:
--
    Attachment: mapstream
                justbytes.jar

I've attached a jar file with source and compiled binaries for people who want to try it out without recompiling Hadoop. You can use the attached 'mapstream' shell script to run it easily.

For those interested in performance, the TL;DR is about 10X slower than native. That's running 'cat' as the mapper on one file that fits in one block, compared to cat on a local ext4 filesystem on the same machine. If your files span multiple blocks, the non-local reads will be even slower. That also doesn't include job overhead. However, most mappers will be more CPU intensive, and the relative overhead of I/O diminishes; YMMV.
[jira] [Updated] (MAPREDUCE-5018) Support raw binary data with Hadoop streaming
[ https://issues.apache.org/jira/browse/MAPREDUCE-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Hacker updated MAPREDUCE-5018:
--
    Attachment: MAPREDUCE-5018.patch

justbytes patch submitted for code review.
[jira] [Updated] (MAPREDUCE-5018) Support raw binary data with Hadoop streaming
[ https://issues.apache.org/jira/browse/MAPREDUCE-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Hacker updated MAPREDUCE-5018:
--
    Target Version/s: trunk
    Release Note: Add "-io justbytes" I/O format to allow raw binary streaming.
    Status: Patch Available (was: Open)

This patch adds a 'JustBytesWritable' and supporting InputFormat, OutputFormat, InputWriter, and OutputReader to support passing raw, unmodified, unaugmented bytes through Hadoop streaming. The purpose is to be able to run arbitrary Unix filters on entire binary files stored in HDFS as map-only jobs, taking advantage of locality and reliability offered by Hadoop.

The code is very straightforward; most methods are only one line. A few design notes:

1. Data is stored in a JustBytesWritable, which is the simplest possible Writable wrapper around a byte[]. It literally just reads until the buffer is full or EOF and remembers the number of bytes.
2. Data is read by JustBytesInputFormat in 64K chunks by default and stored in a JustBytesWritable key; the value is a NullWritable, but no value is ever read or written. The key is used instead of the value to allow the possibility of using it in a reduce.
3. Input files are never split, as most programs are not able to handle splits.
4. Input files are not decompressed. The purpose is to get raw data to a program, people may want to operate on compressed data (e.g., md5sum on archives), and most tools do not expect automatic decompression, so this is the "least surprising" option. It's also trivial to throw a "zcat" in front of your filter.
5. Output is even simpler than input, and just writes the bytes of a JustBytesWritable key to the output stream. Output is never compressed, for similar reasons as above.
6. The code uses the old mapred API, as that is what streaming uses.

Streaming inserts an InputWriter between the InputFormat and the map executable, and an OutputReader between the map executable and the OutputFormat; the JustBytes versions simply pass the key bytes on through. I've augmented IdentifierResolver to recognize "-io justbytes" on the command line and set the input/output classes appropriately.

I've included a shell script called "mapstream" to run streaming with all required command line parameters; it makes running a binary map-only job as easy as:

    mapstream indir command outdir

which runs "command" on every file in indir and writes the results to outdir.

I welcome feedback, especially if there is an even simpler way to do this. I'm not hung up on the JustBytes name; I'd be happy to switch to a better one. If people like the general approach, I will add unit tests and resubmit. Also please let me know if I should break this into separate patches for common and mapreduce.
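The first design note (read until the buffer is full or EOF, remember the byte count, write exactly those bytes back out) can be pictured with a short sketch. This is not the code from the patch: it is plain Java with no Hadoop dependency, and the class name `ByteChunk` and its methods are hypothetical stand-ins.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical illustration of a "just bytes" container; not the patch's class.
final class ByteChunk {
    private final byte[] buf;
    private int length;   // number of valid bytes currently held in buf

    ByteChunk(int capacity) {
        buf = new byte[capacity];
    }

    // Read until the buffer is full or EOF; returns the number of bytes held.
    // A return value of 0 means the stream was already at EOF.
    int fill(InputStream in) throws IOException {
        length = 0;
        while (length < buf.length) {
            int n = in.read(buf, length, buf.length - length);
            if (n == -1) {
                break;
            }
            length += n;
        }
        return length;
    }

    // Write exactly the bytes that were read: no lengths, no separators.
    void writeTo(OutputStream out) throws IOException {
        out.write(buf, 0, length);
    }

    // Pass a whole stream through unmodified, one chunk at a time.
    static void copy(InputStream in, OutputStream out, int chunkSize) throws IOException {
        ByteChunk chunk = new ByteChunk(chunkSize);
        while (chunk.fill(in) > 0) {
            chunk.writeTo(out);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[150_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        copy(new ByteArrayInputStream(data), out, 64 * 1024);  // 64K chunks, as in note 2
        System.out.println(Arrays.equals(data, out.toByteArray()));
    }
}
```

Because nothing is added between chunks, the concatenated output is byte-for-byte identical to the input regardless of the chunk size chosen.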
[jira] [Created] (MAPREDUCE-5018) Support raw binary data with Hadoop streaming
Jay Hacker created MAPREDUCE-5018:
-

Summary: Support raw binary data with Hadoop streaming
Key: MAPREDUCE-5018
URL: https://issues.apache.org/jira/browse/MAPREDUCE-5018
Project: Hadoop Map/Reduce
Issue Type: New Feature
Components: contrib/streaming
Reporter: Jay Hacker
Priority: Minor

People often have a need to run older programs over many files, and turn to Hadoop streaming as a reliable, performant batch system. There are good reasons for this:

1. Hadoop is convenient: they may already be using it for mapreduce jobs, and it is easy to spin up a cluster in the cloud.
2. It is reliable: HDFS replicates data and the scheduler retries failed jobs.
3. It is reasonably performant: it moves the code to the data, maintaining locality, and scales with the number of nodes.

Historically Hadoop is of course oriented toward processing key/value pairs, and so needs to interpret the data passing through it. Unfortunately, this makes it difficult to use Hadoop streaming with programs that don't deal in key/value pairs, or with binary data in general. For example, something as simple as running md5sum to verify the integrity of files will not give the correct result, due to Hadoop's interpretation of the data.

There have been several attempts at binary serialization schemes for Hadoop streaming, such as TypedBytes (HADOOP-1722); however, these are still aimed at efficiently encoding key/value pairs, and not passing data through unmodified. Even the "RawBytes" serialization scheme adds length fields to the data, rendering it not-so-raw.

I often have a need to run a Unix filter on files stored in HDFS; currently, the only way I can do this on the raw data is to copy the data out and run the filter on one machine, which is inconvenient, slow, and unreliable. It would be very convenient to run the filter as a map-only job, allowing me to build on existing (well-tested!) building blocks in the Unix tradition instead of reimplementing them as mapreduce programs.

However, most existing tools don't know about file splits, and so want to process whole files; and of course many expect raw binary input and output. The solution is to run a map-only job with an InputFormat and OutputFormat that just pass raw bytes and don't split. It turns out to be a little more complicated with streaming; I have attached a patch with the simplest solution I could come up with. I call the format "JustBytes" (as "RawBytes" was already taken), and it should be usable with most recent versions of Hadoop.
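To illustrate why any interpretation of the data breaks tools like md5sum, here is a generic sketch of a line-oriented round trip applied to binary input. It is not Hadoop's actual streaming code path; the class `LineRoundTrip` is hypothetical and only shows the kind of change that treating bytes as text records can introduce.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration; not Hadoop's streaming implementation.
final class LineRoundTrip {

    // Read the input as text lines and write each one back followed by '\n'.
    // ISO-8859-1 maps every byte to one char, so only line handling can alter the data.
    static byte[] throughLines(byte[] input) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(input), StandardCharsets.ISO_8859_1));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        String line;
        while ((line = reader.readLine()) != null) {
            out.write(line.getBytes(StandardCharsets.ISO_8859_1));
            out.write('\n');
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Binary data that happens to contain CR LF and has no trailing newline.
        byte[] binary = {0x00, 0x0D, 0x0A, 0x01, 0x0A, 0x02};
        byte[] result = throughLines(binary);
        System.out.println(Arrays.equals(binary, result));
    }
}
```

In this sketch the CR LF pair collapses to a single LF and a newline is appended at the end, so any checksum of the output differs from that of the input even though no "real" processing took place.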
[jira] Created: (MAPREDUCE-2308) Sort buffer size (io.sort.mb) is limited to < 2 GB
Sort buffer size (io.sort.mb) is limited to < 2 GB
--

Key: MAPREDUCE-2308
URL: https://issues.apache.org/jira/browse/MAPREDUCE-2308
Project: Hadoop Map/Reduce
Issue Type: Bug
Affects Versions: 0.21.0, 0.20.2, 0.20.1
Environment: Cloudera CDH3b3 (0.20.2+)
Reporter: Jay Hacker
Priority: Minor

I have MapReduce jobs that use a large amount of per-task memory, because the algorithm I'm using converges faster if more data is together on a node. I have my JVM heap size set at 3200 MB, and if I use the popular rule of thumb that io.sort.mb should be ~70% of that, I get 2240 MB. I rounded this down to 2048 MB, but map tasks crash with:

{noformat}
java.io.IOException: Invalid "io.sort.mb": 2048
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:790)
        ...
{noformat}

MapTask.MapOutputBuffer implements its buffer with a byte[] of size io.sort.mb (in bytes), and sanity-checks the size before allocating the array. The problem is that Java arrays can't have more than 2^31 - 1 elements (even with a 64-bit JVM), and this is a limitation of the Java language specification itself. As memory and data sizes grow, this would seem to be a crippling limitation of Java.

It would be nice if this ceiling were documented, and an error issued sooner, e.g. in jobtracker startup upon reading the config. Going forward, we may need to implement some array of arrays hack for large buffers. :(
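The ceiling follows from `int` arithmetic: a byte[] length must be an `int`, and converting 2048 MB to bytes needs 2^31, one more than the largest `int`. The sketch below shows that overflow and one possible shape of the "array of arrays" workaround mentioned above. It is an illustration only; `ChunkedBuffer` and its methods are hypothetical and are not Hadoop code.

```java
// Hypothetical illustration of a long-indexed buffer built from several byte[] chunks.
final class ChunkedBuffer {
    private final byte[][] chunks;
    private final int chunkSize;
    private final long capacity;

    ChunkedBuffer(long capacity, int chunkSize) {
        if (capacity < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("capacity must be >= 0 and chunkSize > 0");
        }
        this.capacity = capacity;
        this.chunkSize = chunkSize;
        int count = (int) ((capacity + chunkSize - 1) / chunkSize);
        chunks = new byte[count][];
        long remaining = capacity;
        for (int i = 0; i < count; i++) {
            // The last chunk may be shorter than chunkSize.
            chunks[i] = new byte[(int) Math.min(chunkSize, remaining)];
            remaining -= chunks[i].length;
        }
    }

    long capacity() {
        return capacity;
    }

    byte get(long index) {
        return chunks[(int) (index / chunkSize)][(int) (index % chunkSize)];
    }

    void put(long index, byte value) {
        chunks[(int) (index / chunkSize)][(int) (index % chunkSize)] = value;
    }

    // Megabytes to bytes in int arithmetic, the type a byte[] length requires.
    static int mbToBytesAsInt(int mb) {
        return mb << 20;
    }

    public static void main(String[] args) {
        System.out.println(mbToBytesAsInt(2047)); // largest whole-MB size that still fits
        System.out.println(mbToBytesAsInt(2048)); // wraps around to a negative value
    }
}
```

A design like this trades one bounds-checked array access for a division, a modulo, and two array accesses per byte, so it would need measuring before being used on a hot path such as a sort buffer.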