Hello, I have developed a custom compression codec for Hadoop, and Hadoop is configured to use my codec when compressing data. For testing purposes, I use the following two commands:
Compression test command:

    hadoop jar /opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p0.3/lib/hadoop//../hadoop-mapreduce/hadoop-streaming.jar -Dmapreduce.output.fileoutputformat.compress=true -input /originalFiles/ -output /compressedFiles/ -mapper cat -reducer cat

Decompression test command:

    hadoop jar /opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p0.3/lib/hadoop//../hadoop-mapreduce/hadoop-streaming.jar -Dmapreduce.output.fileoutputformat.compress=false -input /compressedFiles/ -output /decompressedFiles/ -mapper cat -reducer cat

As you can see, the two commands are almost identical: only the compression option and the input/output directories change. The first command compresses the input data and then 'cat's it (the Linux command, you know) to the output files. The second one decompresses the input data (which is supposed to be compressed) and then 'cat's it to the output files. As I understand it, Hadoop is supposed to auto-detect compressed input data and decompress it using the right codec. Both the compression and decompression tests work well when Hadoop is set to use a built-in codec, such as BZip2 or Snappy.
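For completeness, here is how I wire the codec in (com.example.MyCodec is a placeholder for my actual codec class; the properties are the standard ones for registering a codec and selecting it for job output, shown here as -D overrides rather than core-site.xml entries):

    hadoop jar /opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p0.3/lib/hadoop//../hadoop-mapreduce/hadoop-streaming.jar \
      -Dio.compression.codecs=org.apache.hadoop.io.compress.DefaultCodec,com.example.MyCodec \
      -Dmapreduce.output.fileoutputformat.compress=true \
      -Dmapreduce.output.fileoutputformat.compress.codec=com.example.MyCodec \
      -input /originalFiles/ -output /compressedFiles/ \
      -mapper cat -reducer cat

Registering the codec in io.compression.codecs is what lets Hadoop auto-detect compressed input by the file extension my codec reports in getDefaultExtension().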
However, when using my custom compression codec, only the compression works: the decompression hangs and eventually fails with Java heap space errors:

    packageJobJar: [] [/opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p0.3/lib/hadoop-mapreduce/hadoop-streaming-2.3.0-cdh5.1.2.jar] /tmp/streamjob6475393520304432687.jar tmpDir=null
    14/09/09 15:33:21 INFO client.RMProxy: Connecting to ResourceManager at bluga2/10.1.96.222:8032
    14/09/09 15:33:22 INFO client.RMProxy: Connecting to ResourceManager at bluga2/10.1.96.222:8032
    14/09/09 15:33:23 INFO mapred.FileInputFormat: Total input paths to process : 1
    14/09/09 15:33:23 INFO mapreduce.JobSubmitter: number of splits:1
    14/09/09 15:33:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1410264242020_0016
    14/09/09 15:33:24 INFO impl.YarnClientImpl: Submitted application application_1410264242020_0016
    14/09/09 15:33:24 INFO mapreduce.Job: The url to track the job: http://bluga2:8088/proxy/application_1410264242020_0016/
    14/09/09 15:33:24 INFO mapreduce.Job: Running job: job_1410264242020_0016
    14/09/09 15:33:30 INFO mapreduce.Job: Job job_1410264242020_0016 running in uber mode : false
    14/09/09 15:33:30 INFO mapreduce.Job: map 0% reduce 0%
    14/09/09 15:35:12 INFO mapreduce.Job: map 100% reduce 0%
    14/09/09 15:35:13 INFO mapreduce.Job: Task Id : attempt_1410264242020_0016_m_000000_0, Status : FAILED
    Error: Java heap space
    14/09/09 15:35:14 INFO mapreduce.Job: map 0% reduce 0%
    14/09/09 15:35:41 INFO mapreduce.Job: Task Id : attempt_1410264242020_0016_m_000000_1, Status : FAILED
    Error: Java heap space
    14/09/09 15:36:02 INFO mapreduce.Job: Task Id : attempt_1410264242020_0016_m_000000_2, Status : FAILED
    Error: Java heap space
    14/09/09 15:36:49 INFO mapreduce.Job: map 100% reduce 0%
    14/09/09 15:36:50 INFO mapreduce.Job: map 100% reduce 100%
    14/09/09 15:36:56 INFO mapreduce.Job: Job job_1410264242020_0016 failed with state FAILED due to: Task failed task_1410264242020_0016_m_000000
    Job failed as tasks failed.
    failedMaps:1 failedReduces:0
    14/09/09 15:36:58 INFO mapreduce.Job: Counters: 9
        Job Counters
            Failed map tasks=4
            Launched map tasks=4
            Other local map tasks=3
            Data-local map tasks=1
            Total time spent by all maps in occupied slots (ms)=190606
            Total time spent by all reduces in occupied slots (ms)=0
            Total time spent by all map tasks (ms)=190606
            Total vcore-seconds taken by all map tasks=190606
            Total megabyte-seconds taken by all map tasks=195180544
    14/09/09 15:36:58 ERROR streaming.StreamJob: Job not Successful!
    Streaming Command Failed!

I already tried increasing the maximum map heap size (the mapreduce.map.java.opts.max.heap YARN property) from 1 GiB to 2 GiB, but the decompression still fails. By the way, I am compressing and decompressing a small ~2 MB file, and I use the latest Cloudera version. I built a quick Java test environment to reproduce the Hadoop codec calls (instantiating the codec, creating a new compression stream from it ...). I noticed that the decompression enters an infinite loop: only the first block of compressed data is decompressed, over and over again. That would explain the Java heap space error above. What am I doing wrong / what did I forget? How can my codec decompress data without trouble? Thank you for helping! Kévin Poupon
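From what I can tell, that "first block forever" symptom matches the state-machine contract that Hadoop's DecompressorStream expects from a Decompressor: needsInput() must become true again once the current block is consumed, and finished() must eventually return true, or the stream loops and accumulates output until the heap is exhausted. Here is a self-contained toy sketch of that contract (ToyDecompressor and its identity "decompression" are made up for illustration; only the setInput/needsInput/finished/decompress shape mirrors org.apache.hadoop.io.compress.Decompressor):

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class ToyDecompressor {
    private byte[] input;   // current compressed block handed in by the stream
    private int pos;        // read position within that block
    private boolean eof;    // set once the caller signals end of input

    // The stream calls this to hand over the next compressed block.
    public void setInput(byte[] b, int off, int len) {
        input = Arrays.copyOfRange(b, off, off + len);
        pos = 0;
    }

    // Must become true again after the current block is consumed,
    // otherwise the stream never feeds the next block.
    public boolean needsInput() {
        return input == null || pos >= input.length;
    }

    // Must eventually return true, otherwise the caller loops forever
    // re-reading the first block -- the symptom described above.
    public boolean finished() {
        return eof && needsInput();
    }

    public void setEof() { eof = true; }

    // "Decompression" here is the identity transform; a real codec
    // would inflate the block. The crucial part is advancing pos.
    public int decompress(byte[] out, int off, int len) {
        if (needsInput()) return 0;
        int n = Math.min(len, input.length - pos);
        System.arraycopy(input, pos, out, off, n);
        pos += n;           // forgetting to advance reproduces the infinite loop
        return n;
    }

    public static void main(String[] args) {
        ToyDecompressor d = new ToyDecompressor();
        byte[] data = "hello block".getBytes();
        d.setInput(data, 0, data.length);
        d.setEof();
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        byte[] buf = new byte[4];
        while (!d.finished()) {          // terminates because finished() flips
            int n = d.decompress(buf, 0, buf.length);
            result.write(buf, 0, n);
        }
        System.out.println(result.toString());  // prints "hello block"
    }
}
```

In a real codec the things to double-check would be that decompress() advances an internal read position, that needsInput() reports true once the block is drained, and that finished() turns true at end of stream.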