RE: Folder not created using Hadoop Mapreduce code
Maybe just a silly guess: did you close your Writer?

Yong

Date: Thu, 14 Nov 2013 12:47:13 +0530 Subject: Re: Folder not created using Hadoop Mapreduce code From: unmeshab...@gmail.com To: user@hadoop.apache.org

@rab ra: Yes, using FileSystem's mkdirs() we can create folders, and we can also create one using Path in = new Path("foldername");

On Thu, Nov 14, 2013 at 12:45 PM, unmesha sreeveni wrote:
I used to create folders like this previously, and it worked. I don't know why it is not happening now. And it works with Eclipse, i.e. I get the "inputfile" file within the "in" folder.

On Thu, Nov 14, 2013 at 12:42 PM, unmesha sreeveni wrote:
I tried hadoop fs -lsr / and did not find it there either.

On Thu, Nov 14, 2013 at 12:28 PM, Rahul Bhattacharjee wrote:
It might be creating it within the user's home directory in HDFS. Try creating something starting with a forward slash.

Thanks, Rahul

On Wed, Nov 13, 2013 at 10:40 PM, Amr Shahin wrote:
Do you get an exception, or does it just fail silently?

On Thu, Nov 14, 2013 at 10:27 AM, unmesha sreeveni wrote:
I am trying to create a file within an "in" folder, but when I ran this on the cluster I noticed that the "in" folder is not in HDFS. Why is that? Anything wrong? My driver code is:

    Path in = new Path("in");
    Path input = new Path("in/inputfile");
    BufferedWriter createinput = new BufferedWriter(new OutputStreamWriter(fs.create(input)));

According to this code, an "in" folder and a file "inputfile" should be created in the working directory of the cluster, right?

--
Thanks & Regards
Unmesha Sreeveni U.B
Junior Developer
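On the "did you close your Writer?" guess above: a pure-JDK sketch (local files rather than HDFS, and the file names are made up) showing why an unclosed BufferedWriter can leave an apparently empty or missing file — the buffered bytes are only pushed out on flush()/close():

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.file.Files;

public class WriterCloseDemo {
    public static void main(String[] args) throws IOException {
        // Local stand-in for fs.create(input) from the driver code above.
        File dir = Files.createTempDirectory("in").toFile();
        File f = new File(dir, "inputfile");

        BufferedWriter w = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(f)));
        w.write("some input");
        // The data is still sitting in the writer's buffer here, so the
        // file on disk is empty. On HDFS the effect is the same: nothing
        // is visible until the stream is closed.
        System.out.println("before close: " + f.length() + " bytes");
        w.close();
        System.out.println("after close: " + f.length() + " bytes");
    }
}
```

With HDFS the same rule applies to the stream returned by fs.create(): wrap it however you like, but close the outermost writer before the job or driver exits.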
RE: Why the reducer's input group count is higher than my GroupComparator implementation
OK. I finally found the reason for this problem, after reading the Hadoop ReduceTask source code. Here is the mistake I made, and something new I learned about secondary sorting.

In my original implementation, I partitioned the data by YYYYMMDD, but I didn't include this information in the sort order. That is, if you want to group and partition the data based on (field1, field2), and sort the data on field3, you have to sort the data based on (field1, field2) first, and only then sort the rest in whatever order you want. My mistake is that I didn't sort on field2, even though I do group and partition on field2.

The new thing I learned after reading the source code is that NOT just one group of data can be sent to one reducer, but multiple groups. I think this makes sense, as you could have a lot of input groups but only a small number of reducers to handle them. But the trick is that the grouping comparator and the sort order together decide whether a new reducer input group is created. For example, if my data arrive as:

    type1, id1, YYYYMMDD1
    type1, id2, YYYYMMDD2
    type1, id3, YYYYMMDD1
    type1, id4, YYYYMMDD2

these are considered 4 input groups by the reducer instead of 2 (even though the data only contain the 2 unique values (type1, YYYYMMDD1) and (type1, YYYYMMDD2)). Since YYYYMMDD is not part of the sort order, the data can arrive in the above order, the group comparator returns a non-zero result for each comparison, and 4 input groups are generated.

Lesson learned; being able to read the source code is a huge benefit.

Yong

From: java8...@hotmail.com To: user@hadoop.apache.org Subject: RE: Why the reducer's input group count is higher than my GroupComparator implementation Date: Tue, 29 Oct 2013 09:46:34 -0400

Yes. The Partitioner uses the same hashCode() on the String generated from the (type + YYYY/MM/DD).
I added a log in the GroupComparator and observed that only 11 unique values are being compared in the GroupComparator, but I don't know why the reducer's input group count is much higher than 11.

Date: Tue, 29 Oct 2013 08:57:32 +0100 Subject: Re: Why the reducer's input group count is higher than my GroupComparator implementation From: drdwi...@gmail.com To: user@hadoop.apache.org

Did you override the partitioner as well?

2013/10/29 java8964 java8964

Hi, I have a strange question related to my secondary sort implementation in an MR job. Currently I need to support secondary sort in one of my MR jobs. I implemented my custom WritableComparable like the following:

    public class MyPartitionKey implements WritableComparable {
        String type;
        long id1;
        String id2;
        String id3;
        String id4;
        long timestamp1;
        long timestamp2;
    }

Then I implemented the following methods for this class:

    public int compareTo();  // sort the data based on all attributes listed above, with the last 2 timestamps sorted descending
    public int hashCode();   // generate the hash code using all attributes above
    public boolean equals(); // use all the attributes for the equality check
    public void write(DataOutput out);    // serialize all the attributes listed above
    public void readFields(DataInput in); // deserialize all the attributes listed above

For partitioning and grouping of my keys, I want the following logic: based on the type, the data can be partitioned either by year or by day of timestamp1. For the sort order, I want the data sorted by (type, id1, id2, id3, id4), then reverse-sorted by (timestamp1, timestamp2). I implemented my KeyComparator using the sort order logic above, and my Partitioner and GroupComparator based on the logic above.
Here is the pseudo code of the Partitioner and GroupComparator:

    public class MyPartitioner implements Partitioner {
        @Override
        public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
            int hashCode = key.getActivityType().name().hashCode();
            StringBuilder sb = new StringBuilder();
            for (String subPartitionValue : key.getPartitionValue()) {
                sb.append(subPartitionValue);
            }
            return Math.abs(hashCode * 127 + sb.toString().hashCode()) % numPartitions;
        }

        @Override
        public void configure(JobConf job) {
        }
    }
    // The key's getPartitionValue() method returns an array of strings: either {YYYY} or {YYYY, MM, DD} of timestamp1.

For the GroupComparator:

    public static class MyGroupComparator extends WritableComparator {
        protected MyGroupComparator() {
            super(MyPartitionKey.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;
            MyPartitionKey key2 = (MyPartitionKey) w2;
            int cmp = key1.type.compareTo(key2.type);
            // different type, send to different group
            if (cmp != 0)
                return cmp;
            // for the same type, should have the same part
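The effect Yong diagnosed above (the framework only compares each key with the previous one in sort order, so the grouping comparator and the sort order must agree) can be reproduced without Hadoop. This is a pure-Java sketch, not Hadoop code — the Key record and comparators are made up to mirror the (type, id, day) shape of the thread's data:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class GroupingDemo {
    // Simplified key: (type, id, day); 'day' stands in for the YYYYMMDD part.
    record Key(String type, String id, String day) {}

    // Count reducer input groups the way ReduceTask does: walk the keys in
    // sort order, opening a new group whenever the grouping comparator says
    // the current key differs from the previous one.
    static int countGroups(List<Key> sorted, Comparator<Key> group) {
        int groups = 0;
        Key prev = null;
        for (Key k : sorted) {
            if (prev == null || group.compare(prev, k) != 0) groups++;
            prev = k;
        }
        return groups;
    }

    public static void main(String[] args) {
        // Arrival order when the sort comparator ignores 'day':
        List<Key> byIdOnly = List.of(
                new Key("type1", "id1", "day1"),
                new Key("type1", "id2", "day2"),
                new Key("type1", "id3", "day1"),
                new Key("type1", "id4", "day2"));
        Comparator<Key> groupByTypeDay =
                Comparator.comparing(Key::type).thenComparing(Key::day);

        System.out.println("day not in sort order: "
                + countGroups(byIdOnly, groupByTypeDay) + " groups");

        // Same records, sorted on (type, day) first, as the fix requires:
        List<Key> byTypeDay = new ArrayList<>(byIdOnly);
        byTypeDay.sort(groupByTypeDay.thenComparing(Key::id));
        System.out.println("day in sort order: "
                + countGroups(byTypeDay, groupByTypeDay) + " groups");
    }
}
```

The first pass yields 4 groups (adjacent keys always differ on day), the second yields the expected 2 — the same 51792-vs-11 mismatch described in the thread, in miniature.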
RE: Why the reducer's input group count is higher than my GroupComparator implementation
Yes. The Partitioner uses the same hashCode() on the String generated from the (type + YYYY/MM/DD). I added a log in the GroupComparator and observed that only 11 unique values are being compared in the GroupComparator, but I don't know why the reducer's input group count is much higher than 11.

Date: Tue, 29 Oct 2013 08:57:32 +0100 Subject: Re: Why the reducer's input group count is higher than my GroupComparator implementation From: drdwi...@gmail.com To: user@hadoop.apache.org

Did you override the partitioner as well?

2013/10/29 java8964 java8964

Hi, I have a strange question related to my secondary sort implementation in an MR job. Currently I need to support secondary sort in one of my MR jobs. I implemented my custom WritableComparable like the following:

    public class MyPartitionKey implements WritableComparable {
        String type;
        long id1;
        String id2;
        String id3;
        String id4;
        long timestamp1;
        long timestamp2;
    }

Then I implemented the following methods for this class:

    public int compareTo();  // sort the data based on all attributes listed above, with the last 2 timestamps sorted descending
    public int hashCode();   // generate the hash code using all attributes above
    public boolean equals(); // use all the attributes for the equality check
    public void write(DataOutput out);    // serialize all the attributes listed above
    public void readFields(DataInput in); // deserialize all the attributes listed above

For partitioning and grouping of my keys, I want the following logic: based on the type, the data can be partitioned either by year or by day of timestamp1. For the sort order, I want the data sorted by (type, id1, id2, id3, id4), then reverse-sorted by (timestamp1, timestamp2). I implemented my KeyComparator using the sort order logic above, and my Partitioner and GroupComparator based on the logic above.
Here is the pseudo code of the Partitioner and GroupComparator:

    public class MyPartitioner implements Partitioner {
        @Override
        public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
            int hashCode = key.getActivityType().name().hashCode();
            StringBuilder sb = new StringBuilder();
            for (String subPartitionValue : key.getPartitionValue()) {
                sb.append(subPartitionValue);
            }
            return Math.abs(hashCode * 127 + sb.toString().hashCode()) % numPartitions;
        }

        @Override
        public void configure(JobConf job) {
        }
    }
    // The key's getPartitionValue() method returns an array of strings: either {YYYY} or {YYYY, MM, DD} of timestamp1.

For the GroupComparator:

    public static class MyGroupComparator extends WritableComparator {
        protected MyGroupComparator() {
            super(MyPartitionKey.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;
            MyPartitionKey key2 = (MyPartitionKey) w2;
            int cmp = key1.type.compareTo(key2.type);
            // different type, send to different group
            if (cmp != 0)
                return cmp;
            // for the same type, should have the same partition value array length
            String[] partitionValue1 = key1.getPartitionValue();
            String[] partitionValue2 = key2.getPartitionValue();
            assert partitionValue1.length == partitionValue2.length;
            StringBuilder sb1 = new StringBuilder();
            StringBuilder sb2 = new StringBuilder();
            for (String subValue : partitionValue1) {
                sb1.append(subValue);
            }
            for (String subValue : partitionValue2) {
                sb2.append(subValue);
            }
            return sb1.toString().compareTo(sb2.toString());
        }
    }

Now, here is the strange problem I don't understand. I tested my MR job. I know that in the test data I have 7 types of data, 3 of them partitioned yearly and 4 of them partitioned daily. In the test data, for the 4 types partitioned daily, there are 2 days of data for each type. So I expected the input group count of the reducer to be 11, which is 4 x 2 + 3 = 11.
In fact, if I don't use this custom MyPartitionKey and just use Text as the key type, with "type + YYYY" for the yearly datasets and "type + YYYYMMDD" for the daily datasets, there are 11 input groups for the reducer. But I have to support secondary sort. To my surprise, at runtime the MR job generates 51792 input groups for the reducer. This doesn't make sense. If I change the MyGroupComparator compare() method to only compare the type, like the following:

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        MyPartitionKey key1 = (MyPartitionKey) w1;
        MyPartitionKey key2 = (MyPartitionKey) w2;
        return key1.type.compareTo(key2.type);
    }

the MR job generates 7 input groups for the reducer, which is what I expect. But when I start
Why the reducer's input group count is higher than my GroupComparator implementation
Hi, I have a strange question related to my secondary sort implementation in an MR job. Currently I need to support secondary sort in one of my MR jobs. I implemented my custom WritableComparable like the following:

    public class MyPartitionKey implements WritableComparable {
        String type;
        long id1;
        String id2;
        String id3;
        String id4;
        long timestamp1;
        long timestamp2;
    }

Then I implemented the following methods for this class:

    public int compareTo();  // sort the data based on all attributes listed above, with the last 2 timestamps sorted descending
    public int hashCode();   // generate the hash code using all attributes above
    public boolean equals(); // use all the attributes for the equality check
    public void write(DataOutput out);    // serialize all the attributes listed above
    public void readFields(DataInput in); // deserialize all the attributes listed above

For partitioning and grouping of my keys, I want the following logic: based on the type, the data can be partitioned either by year or by day of timestamp1. For the sort order, I want the data sorted by (type, id1, id2, id3, id4), then reverse-sorted by (timestamp1, timestamp2). I implemented my KeyComparator using the sort order logic above, and my Partitioner and GroupComparator based on the logic above. Here is the pseudo code of the Partitioner and GroupComparator:

    public class MyPartitioner implements Partitioner {
        @Override
        public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
            int hashCode = key.getActivityType().name().hashCode();
            StringBuilder sb = new StringBuilder();
            for (String subPartitionValue : key.getPartitionValue()) {
                sb.append(subPartitionValue);
            }
            return Math.abs(hashCode * 127 + sb.toString().hashCode()) % numPartitions;
        }

        @Override
        public void configure(JobConf job) {
        }
    }
    // The key's getPartitionValue() method returns an array of strings: either {YYYY} or {YYYY, MM, DD} of timestamp1.
For the GroupComparator:

    public static class MyGroupComparator extends WritableComparator {
        protected MyGroupComparator() {
            super(MyPartitionKey.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;
            MyPartitionKey key2 = (MyPartitionKey) w2;
            int cmp = key1.type.compareTo(key2.type);
            // different type, send to different group
            if (cmp != 0)
                return cmp;
            // for the same type, should have the same partition value array length
            String[] partitionValue1 = key1.getPartitionValue();
            String[] partitionValue2 = key2.getPartitionValue();
            assert partitionValue1.length == partitionValue2.length;
            StringBuilder sb1 = new StringBuilder();
            StringBuilder sb2 = new StringBuilder();
            for (String subValue : partitionValue1) {
                sb1.append(subValue);
            }
            for (String subValue : partitionValue2) {
                sb2.append(subValue);
            }
            return sb1.toString().compareTo(sb2.toString());
        }
    }

Now, here is the strange problem I don't understand. I tested my MR job. I know that in the test data I have 7 types of data, 3 of them partitioned yearly and 4 of them partitioned daily. In the test data, for the 4 types partitioned daily, there are 2 days of data for each type. So I expected the input group count of the reducer to be 11, which is 4 x 2 + 3 = 11. In fact, if I don't use this custom MyPartitionKey and just use Text as the key type, with "type + YYYY" for the yearly datasets and "type + YYYYMMDD" for the daily datasets, there are 11 input groups for the reducer. But I have to support secondary sort. To my surprise, at runtime the MR job generates 51792 input groups for the reducer. This doesn't make sense. If I change the MyGroupComparator compare() method to only compare the type, like the following:

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        MyPartitionKey key1 = (MyPartitionKey) w1;
        MyPartitionKey key2 = (MyPartitionKey) w2;
        return key1.type.compareTo(key2.type);
    }

the MR job generates 7 input groups for the reducer, which is what I expect.
But when I start to add the comparison of the YYYY or MM or DD parts parsed out of timestamp1, the input group count becomes very large. What I think is that maybe id1, id2, id3, and id4 make the input group count large, because in the test data there are a lot of unique combinations of (id1, id2, id3, id4). But they are NOT part of my GroupComparator implementation. Why, in this case, is the input group count for the reducer so high? And in this case the MR job won't do what I want, as the same group of data is NOT being sent to the same reducer. Here is a summary of my questions: 1) My understanding is that the GroupComparator is the only class to
RE: Mapreduce outputs to a different cluster?
Just use "hdfs://machine.domain:8080/tmp/myfolder"

Yong

Date: Thu, 24 Oct 2013 20:25:35 -0700 From: myx...@yahoo.com Subject: Re: Mapreduce outputs to a different cluster? To: user@hadoop.apache.org

Thanks Shahab & Yong. If cluster B (into which I want to dump the output) has URL "hdfs://machine.domain:8080" and data folder "/tmp/myfolder", what should I specify as the output path for the MR job? Thanks

On Thursday, October 24, 2013 5:31 PM, java8964 java8964 wrote:
Just specify the output location using the URI to the other cluster. As long as the network is accessible, you should be fine. Yong

Date: Thu, 24 Oct 2013 15:28:27 -0700 From: myxjtu@yahoo.com Subject: Mapreduce outputs to a different cluster? To: user@hadoop.apache.org

The scenario is: I run a MapReduce job on cluster A (all source data is in cluster A), but I want the output of the job to go to cluster B. Is it possible? If yes, please let me know how to do it. Here are some notes about my MapReduce job:
1. The data source is an HBase table.
2. It only has a mapper, no reducer.
Thanks, Senqiang
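Why a fully-qualified path is enough: Hadoop's Path/FileSystem machinery resolves an output URI the same way java.net.URI does — the scheme and authority select the filesystem (here, cluster B's NameNode) and only the path part names the folder, so in the driver something like FileOutputFormat.setOutputPath(job, new Path("hdfs://machine.domain:8080/tmp/myfolder")) writes to cluster B even though the job runs on cluster A. A pure-JDK sketch of that decomposition (the host and port are just the ones from the question, not a recommendation):

```java
import java.net.URI;

public class OutputUriDemo {
    public static void main(String[] args) {
        URI out = URI.create("hdfs://machine.domain:8080/tmp/myfolder");
        // The authority picks the target cluster's NameNode ...
        System.out.println("scheme    = " + out.getScheme());
        System.out.println("authority = " + out.getAuthority());
        // ... and only the path names the output folder on that cluster.
        System.out.println("path      = " + out.getPath());
    }
}
```

A relative or scheme-less output path, by contrast, is resolved against cluster A's default filesystem (fs.default.name), which is why it stays on cluster A.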
RE: Mapreduce outputs to a different cluster?
Just specify the output location using the URI to the other cluster. As long as the network is accessible, you should be fine.

Yong

Date: Thu, 24 Oct 2013 15:28:27 -0700 From: myx...@yahoo.com Subject: Mapreduce outputs to a different cluster? To: user@hadoop.apache.org

The scenario is: I run a MapReduce job on cluster A (all source data is in cluster A), but I want the output of the job to go to cluster B. Is it possible? If yes, please let me know how to do it. Here are some notes about my MapReduce job:
1. The data source is an HBase table.
2. It only has a mapper, no reducer.
Thanks, Senqiang
RE: enable snappy on hadoop 1.1.1
Thanks for your guys' help. It looks like libhadoop.so didn't have snappy linked into it, even though it claims to support it in 2.1. I already created a ticket for this. I manually compiled the hadoop native library myself and use that for now.

Thanks, Yong

From: brahmareddy.batt...@huawei.com To: user@hadoop.apache.org Subject: RE: enable snappy on hadoop 1.1.1 Date: Mon, 7 Oct 2013 04:16:47 +0000

The problem might be that the snappy packages were not built in (while building hadoop you need to include snappy). libhadoop.so should contain the snappy symbols. Please check the following output to verify whether snappy is included or not:

    nm ${HADOOP_HOME}/lib/native/Linux-amd64-64/libhadoop.so | grep snappy

From: bharath vissapragada [bharathvissapragada1...@gmail.com] Sent: Sunday, October 06, 2013 2:25 AM To: User Subject: Re: enable snappy on hadoop 1.1.1

What's the output of ldd on that lib? Does it link properly? You should compile natives for your platform, as the packaged ones may not link properly.

On Sat, Oct 5, 2013 at 2:37 AM, java8964 java8964 wrote:
I kind of read the hadoop 1.1.1 source code for this; it is very strange to me now.
From the error, it looks like the runtime JVM cannot find the native method org/apache/hadoop/io/compress/snappy/SnappyCompressor.compressBytesDirect()I — that is my guess from the error message. But from the log, it looks like all the native libraries, including native-hadoop and native snappy, are loaded, as shown in the failed task log:

2013-10-04 16:33:21,635 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2013-10-04 16:33:22,006 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
2013-10-04 16:33:22,020 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@28252825
2013-10-04 16:33:22,111 INFO org.apache.hadoop.mapred.MapTask: numReduceTasks: 1
2013-10-04 16:33:22,116 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 256
2013-10-04 16:33:22,168 INFO org.apache.hadoop.mapred.MapTask: data buffer = 204010960/255013696
2013-10-04 16:33:22,168 INFO org.apache.hadoop.mapred.MapTask: record buffer = 671088/838860
2013-10-04 16:33:22,342 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library is available
2013-10-04 16:33:22,342 INFO org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library loaded
2013-10-04 16:33:44,054 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
2013-10-04 16:33:44,872 WARN org.apache.hadoop.io.compress.snappy.SnappyCompressor: java.lang.UnsatisfiedLinkError: org/apache/hadoop/io/compress/snappy/SnappyCompressor.initIDs()V
2013-10-04 16:33:44,872 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new compressor
2013-10-04 16:33:44,928 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2013-10-04 16:33:44,951 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2013-10-04 16:33:44,951 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName yzhang for UID 1000 from the native implementation
2013-10-04 16:33:44,952 FATAL org.apache.hadoop.mapred.Child: Error running child : java.lang.UnsatisfiedLinkError: org/apache/hadoop/io/compress/snappy/SnappyCompressor.compressBytesDirect()I
    at org.apache.hadoop.io.compress.snappy.SnappyCompressor.compress(SnappyCompressor.java:229)
    at org.apache.hadoop.io.compress.BlockCompressorStream.compress(BlockCompressorStream.java:141)
    at org.apache.hadoop.io.compress.BlockCompressorStream.finish(BlockCompressorStream.java:135)
    at org.apache.hadoop.mapred.IFile$Writer.close(IFile.java:135)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1450)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1297)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(AccessController.java:310)
    at javax.security.auth.Subject.doAs(Subject.java:573)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

So is there any way I can check whether $HADOOP_HOME/lib/native/Linux-amd64-64/libhadoop.so contains the native method expected above? This hadoop 1.1.0 was not compiled by me, but comes from IBM BigInsights 2.1, which we are evaluating. I will create a ticket for them, but isn't this kind of strange, as everything appears loaded in the log, yet it complains later about a native method? Any reas
RE: enable snappy on hadoop 1.1.1
I kind of read the hadoop 1.1.1 source code for this; it is very strange to me now. From the error, it looks like the runtime JVM cannot find the native method org/apache/hadoop/io/compress/snappy/SnappyCompressor.compressBytesDirect()I — that is my guess from the error message. But from the log, it looks like all the native libraries, including native-hadoop and native snappy, are loaded, as shown in the failed task log:

2013-10-04 16:33:21,635 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2013-10-04 16:33:22,006 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
2013-10-04 16:33:22,020 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@28252825
2013-10-04 16:33:22,111 INFO org.apache.hadoop.mapred.MapTask: numReduceTasks: 1
2013-10-04 16:33:22,116 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 256
2013-10-04 16:33:22,168 INFO org.apache.hadoop.mapred.MapTask: data buffer = 204010960/255013696
2013-10-04 16:33:22,168 INFO org.apache.hadoop.mapred.MapTask: record buffer = 671088/838860
2013-10-04 16:33:22,342 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library is available
2013-10-04 16:33:22,342 INFO org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library loaded
2013-10-04 16:33:44,054 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
2013-10-04 16:33:44,872 WARN org.apache.hadoop.io.compress.snappy.SnappyCompressor: java.lang.UnsatisfiedLinkError: org/apache/hadoop/io/compress/snappy/SnappyCompressor.initIDs()V
2013-10-04 16:33:44,872 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new compressor
2013-10-04 16:33:44,928 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2013-10-04 16:33:44,951 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2013-10-04 16:33:44,951 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName yzhang for UID 1000 from the native implementation
2013-10-04 16:33:44,952 FATAL org.apache.hadoop.mapred.Child: Error running child : java.lang.UnsatisfiedLinkError: org/apache/hadoop/io/compress/snappy/SnappyCompressor.compressBytesDirect()I
    at org.apache.hadoop.io.compress.snappy.SnappyCompressor.compress(SnappyCompressor.java:229)
    at org.apache.hadoop.io.compress.BlockCompressorStream.compress(BlockCompressorStream.java:141)
    at org.apache.hadoop.io.compress.BlockCompressorStream.finish(BlockCompressorStream.java:135)
    at org.apache.hadoop.mapred.IFile$Writer.close(IFile.java:135)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1450)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1297)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(AccessController.java:310)
    at javax.security.auth.Subject.doAs(Subject.java:573)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

So is there any way I can check whether $HADOOP_HOME/lib/native/Linux-amd64-64/libhadoop.so contains the native method expected above? This hadoop 1.1.0 was not compiled by me, but comes from IBM BigInsights 2.1, which we are evaluating. I will create a ticket for them, but isn't this kind of strange, as everything appears loaded in the log, yet it complains later about a native method? What could cause this?

Yong

From: java8...@hotmail.com To: user@hadoop.apache.org Subject: enable snappy on hadoop 1.1.1 Date: Fri, 4 Oct 2013 15:44:34 -0400

Hi, I am using hadoop 1.1.1. I want to test snappy compression with hadoop, but I have some problems making it work in my Linux environment.
I am using openSUSE 12.3 x86_64. First, I tried to enable snappy in hadoop 1.1.1 with:

    conf.setBoolean("mapred.compress.map.output", true);
    conf.set("mapred.output.compression.type", "RECORD");
    conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");

I got the following error in my test MR job:

    Exception in thread "main" java.lang.RuntimeException: native snappy library not available

So I downloaded snappy 1.1.0 from https://code.google.com/p/snappy/, compiled and installed it successfully under /opt/snappy-1.1.0, and then linked /opt/snappy-1.1.0/lib64/libsnappy.so to /user/lib64/libsnappy.so. Now, after I restarted hadoop and tried my test MR job again, it didn't give me the original error, but a new one like this: Error: java.lang.UnsatisfiedLinkEr
enable snappy on hadoop 1.1.1
Hi, I am using hadoop 1.1.1. I want to test snappy compression with hadoop, but I have some problems making it work in my Linux environment. I am using openSUSE 12.3 x86_64. First, I tried to enable snappy in hadoop 1.1.1 with:

    conf.setBoolean("mapred.compress.map.output", true);
    conf.set("mapred.output.compression.type", "RECORD");
    conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");

I got the following error in my test MR job:

    Exception in thread "main" java.lang.RuntimeException: native snappy library not available

So I downloaded snappy 1.1.0 from https://code.google.com/p/snappy/, compiled and installed it successfully under /opt/snappy-1.1.0, and then linked /opt/snappy-1.1.0/lib64/libsnappy.so to /user/lib64/libsnappy.so. Now, after I restarted hadoop and tried my test MR job again, it didn't give me the original error, but a new one:

    Error: java.lang.UnsatisfiedLinkError: org/apache/hadoop/io/compress/snappy/SnappyCompressor.compressBytesDirect()I
        at org.apache.hadoop.io.compress.snappy.SnappyCompressor.compress(SnappyCompressor.java:229)
        at org.apache.hadoop.io.compress.BlockCompressorStream.compress(BlockCompressorStream.java:141)
        at org.apache.hadoop.io.compress.BlockCompressorStream.finish(BlockCompressorStream.java:135)
        at org.apache.hadoop.mapred.IFile$Writer.close(IFile.java:135)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1450)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:852)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1343)

I wrote a test program to verify that hadoop can load the library: System.loadLibrary("snappy") works fine in my test program. I don't know why, at runtime, SnappyCompressor.compressBytesDirect() gives back that kind of error.
From the source code, it looks like a native C implementation, from here: https://code.google.com/p/hadoop-snappy/source/browse/trunk/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java Any idea why this is happening in my environment? Thanks Yong
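The loadLibrary test mentioned above is worth spelling out, because it distinguishes the two failure modes in this thread: it only proves that libsnappy.so itself is on java.library.path, not that libhadoop.so was built with the SnappyCompressor JNI methods (the compressBytesDirect()I symbol). A standalone sketch of that check (class name is made up):

```java
public class SnappyLoadCheck {
    public static void main(String[] args) {
        try {
            // Searches java.library.path (e.g. /usr/lib64) for libsnappy.so.
            System.loadLibrary("snappy");
            System.out.println("libsnappy loaded from java.library.path");
        } catch (UnsatisfiedLinkError e) {
            // Missing or mislinked libsnappy.so ends up here.
            System.out.println("libsnappy not found: " + e.getMessage());
        }
    }
}
```

If this succeeds but the job still dies with UnsatisfiedLinkError on compressBytesDirect()I, the problem is in libhadoop.so (built without snappy), not in the snappy install — which is exactly what the thread concluded.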
Will different files in HDFS trigger different mapper
Hi, I have a question about how mappers are generated for input files from HDFS. I understand the split and block concepts in HDFS, but my original understanding was that one mapper will only process data from one file in HDFS, no matter how small that file is. Is that correct? The reason I ask is that in some ETL jobs, I have seen logic that understands the data set based on the file name convention. So in the mapper, before processing the first KV pair, we can add logic to the map() method to get the file name of the current input and do some initialization there. After that, we don't need to worry that data could come from another file later, as one map task will only handle data from one file, even when the file is very small. So small files not only cause trouble for NN memory, they also waste map tasks, as each map task could consume too little data. But today, when I ran the following hive query (hadoop 1.0.4 and hive 0.9.1):

    select partition_column, count(*) from test_table group by partition_column

it only generated 2 mappers in the MR job. This is an external hive table, and the input bytes for this MR job are only 338M, but there are more than 100 data files in HDFS for this table, even though a lot of them are very small. This is a one-node cluster, but it is configured in full (pseudo-distributed) cluster mode, not local mode. Shouldn't the MR job generated here trigger at least 100 mappers? Is this because in hive my original assumption no longer holds? Thanks Yong
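A likely explanation for the 2-mapper count: Hive does not hand each small file to its own map task. By default it uses CombineHiveInputFormat, which packs many small files into a few combined splits (so the one-file-per-mapper assumption, and any file-name-based logic in map(), no longer holds). A sketch of the settings involved — property names are from Hive 0.9-era / Hadoop 1.x configuration and should be verified against your version:

```sql
-- Hive's default, which combines many small files into few splits:
-- set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

-- Switch back to one split (and so one mapper) per file/block, for comparison:
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

-- With the combining input format, split sizes are bounded by settings like:
set mapred.max.split.size=268435456;
```

With HiveInputFormat, the same query over 100+ files should launch on the order of 100 mappers; the combined default trades the file-per-mapper guarantee for far less task overhead.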
RE: File formats in Hadoop: Sequence files vs AVRO vs RC vs ORC
I am also thinking about this for my current project, so here I share some of my thoughts, though maybe some of them are not correct.

1) In my previous projects years ago, we stored a lot of data as plain text, as at that time people thought big data could store all the data, no need to worry about space issues — until we ran out of space in the cluster very fast :-). So lesson number 1: don't store your data as plain text files.
2) To compress text files, we need a container for the file, like 'Seq file', 'Avro' or 'protobuf'. I haven't used RC/ORC before, so it will be interesting to learn more about them later.
3) I did a benchmark before for the data sets we are using; there is also a webpage about such benchmark results, you can google it. I believe the performance of these formats is close. The real questions are:
a) Language support
b) Flexibility of the serialization format
c) How easily it can be used in tools like 'pig/hive' etc.
d) How well it is supported in hadoop.

From my experience, sequence file is not well supported outside of the Java language, and it is just a key/value storage; if your data has a nested structure, like your XML/JSON data, you still need a serialization format like google protobuf or Avro to handle it. Storing XML/JSON directly in HDFS is really not a good idea, as any InputFormat that supports splits for them requires a strict format of the data, and compression won't work very nicely on these kinds of data. We originally used google protobuf a lot, as twitter released elephant-bird as open source to support it in hadoop, which was a big plus for it at that time. But recently we have also started to consider Avro seriously, as it is better supported directly in hadoop. I also like its design of offering both schema-less (generic) and schema-object options; it gives us some flexibility in designing MR jobs.
Thanks Yong > From: wolfgang.wyre...@hotmail.com > To: user@hadoop.apache.org > Subject: File formats in Hadoop: Sequence files vs AVRO vs RC vs ORC > Date: Mon, 30 Sep 2013 09:40:44 +0200 > > Hello, > > the file format topic is still confusing me and I would appreciate if you > could share your thoughts and experience with me. > > From reading different books/articles/websites I understand that > - Sequence files (used frequently but not only for binary data), > - AVRO, > - RC (was developed to work best with Hive - columnar storage) and > - ORC (a successor of RC to give Hive another performance boost - Stinger > initiative) > are all container file formats to solve the "small files problem" and all > support compression and splitting. > Additionally, each file format was developed with specific features/benefits > in mind. > > Imagine I have the following text source data > - 1 TB of XML documents (some millions of small files) > - 1 TB of JSON documents (some hundred thousands of medium sized files) > - 1 TB of Apache log files (some thousands of bigger files) > > How should I store this data in HDFS to process it using Java MapReduce and > Pig and Hive? > I want to use the best tool for my specific problem - with "best" > performance of course - i.e. maybe one problem on the apache log data can be > best solved using Java MapReduce, another one using Hive or Pig. > > Should I simply put the data into HDFS as the data comes from - i.e. as > plain text files? > Or should I convert all my data to a container file format like sequence > files, AVRO, RC or ORC? > > Based on this example, I believe > - the XML documents will need to be converted to a container file format > to overcome the "small files problem". > - the JSON documents could/should not be affected by the "small files > problem" > - the Apache log files should definitely not be affected by the "small files > problem", so they could be stored as plain text files.
> > So, some source data needs to be converted to a container file format, > others not necessarily. > But what is really advisable? > > Is it advisable to store all data (XML, JSON, Apache logs) in one specific > container file format in the cluster - let's say you decide to use sequence > files? > Having only one file format in HDFS is of course a benefit in terms of > managing the files and writing Java MapReduce/Pig/Hive code against it. > Sequence files in this case are certainly not a bad idea, but Hive queries > could probably benefit more from, let's say, RC/ORC. > > Therefore, is it better to use a mix of plain text files and/or one or more > container file formats simultaneously? > > I know that there will be no crystal-clear answer here as it always > "depends", but what approach should be taken here, or what is usually used > in the community out there? > > I welcome any feedback and experiences. > > Thanks >
RE: All datanodes are bad IOException when trying to implement multithreading serialization
I don't know exactly what you are trying to do, but it seems like memory is your bottleneck and you think you have enough CPU resources, so you want to use multi-threading to utilize the CPU? You can start multiple threads in your mapper if you think your mapper logic is very CPU-intensive and you want to make it faster with multi-threading. But reading the next split in the current mapper doesn't sound like a good idea. Why do you want to do that? What happens if that split has been allocated to another mapper task? If you have more CPU resources than memory resources in the cluster, it just means your cluster's resources are not well balanced. If you cannot fix that at the physical level, leave it as is. If you think it makes sense to use multi-threading in the mapper logic, go ahead and use it, but only consume the current split. If you think the split is too small for the current mapper, change the block size of the files feeding this kind of mapper. In HDFS, the block size is set at the file level; you can set it yourself. Yong From: zhangyunming1...@gmail.com Date: Sun, 29 Sep 2013 21:12:40 -0500 Subject: Re: All datanodes are bad IOException when trying to implement multithreading serialization To: user@hadoop.apache.org Thanks Sonal, Felix, I have researched combined file formats before. The problem I am trying to solve here is that I want to reduce the number of mappers running concurrently on a single node. Normally, on a machine with 8 GB of RAM and 8 cores, I need to run 8 JVMs (mappers) to exploit the 8-core CPU resources. However, this limits the heap size of each JVM (mapper) to 1 GB. I want to be able to use 2-4 JVMs (mappers) concurrently and still use the 8 cores (this will allow me to set the heap size of each JVM to 2-4 GB). The additional heap memory is important for my application. This means that I use multithreading within a mapper to use more than 1 core per JVM.
The modifications I made were an attempt to have a single mapper read key/value pairs from the same input split concurrently. As a result, I could process the input split using two or three threads working on different portions of the input split. Sorry if I had not made this clear in the previous email. I have written my own implementation of Mapper's run method to accomplish this, but I also need to have LineRecordReader reading from the input split concurrently. That's why I modified the LineRecordReader in the way I attached. java.io.IOException: Unexpected checksum mismatch while writing blk_-8687605593694081588_1014 from /192.168.102.99:40057 at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.verifyChunks(BlockReceiver.java:221) at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:447) at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:532) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:398) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:107) at java.lang.Thread.run(Thread.java:619) I suspect this might be related to threading for HDFS. Maybe I can't read a file in HDFS in a multithreaded fashion (one thread from the beginning and another one from the middle of the file, for example)? Any suggestions? Thanks a lot! Yunming On Sun, Sep 29, 2013 at 8:58 PM, Felix Chern wrote: The number of mappers usually is the same as the number of files you fed to it. To reduce the number you can use CombineFileInputFormat. I recently wrote an article about it. You can take a look if this fits your needs. http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/ Felix On Sep 29, 2013, at 6:45 PM, yunming zhang wrote: I am actually trying to reduce the number of mappers because my application takes up a lot of memory (in the order of 1-2 GB ram per mapper).
I want to be able to use a few mappers but still maintain good CPU utilization through multithreading within a single mapper. MultithreadedMapper doesn't work because it duplicates in-memory data structures. Thanks Yunming On Sun, Sep 29, 2013 at 6:59 PM, Sonal Goyal wrote: Wouldn't you rather just change your split size so that you can have more mappers work on your input? What else are you doing in the mappers? Sent from my iPad On Sep 30, 2013, at 2:22 AM, yunming zhang wrote: Hi, I was playing with Hadoop code trying to have a single Mapper support reading an input split using multiple threads. I am getting an All datanodes are bad IOException, and I am not sure what the issue is. The reason for this work is that I suspect my computation was slow because it takes too long to create the Text() objects from the input split using a single thread. I tried to modify the LineRecordReader (since I am mostly using TextInputFormat) to provide additional methods to retrieve lines from the input spl
RE: Extending DFSInputStream class
Just curious, any reason you don't want to use the DFSDataInputStream? Yong Date: Thu, 26 Sep 2013 16:46:00 +0200 Subject: Extending DFSInputStream class From: tmp5...@gmail.com To: user@hadoop.apache.org Hi I would like to wrap DFSInputStream by extension. However, it seems that the DFSInputStream constructor is package private. Is there any way to achieve my goal? Also, just out of curiosity, why have you made this class inaccessible to developers, or am I missing something? regards tmp
Hadoop sequence file's benefits
Hi, I have a question about sequence files. I wonder why, and under what circumstances, I should use them. Say I have a CSV file; I can store it directly in HDFS. But if I do know that the first 2 fields are some kind of key, and most MR jobs will query on that key, will it make sense to store the data as a sequence file in this case? And what benefits can that bring? The main benefit I want is to reduce the I/O for MR jobs, but I am not sure whether a sequence file can give me that. If the data is stored as key/value pairs in the sequence file, and since the mapper/reducer will mostly use only the key part to compare/sort, what difference does it make compared to storing a flat file and just using the first 2 fields as the key? In the mapper of the sequence file, the whole content of the file will be scanned anyway. If only the key part will be compared, do we save I/O by NOT deserializing the value part, if some optimization is done here? It sounds like we could avoid deserializing the value part when it is unnecessary. Is that the benefit? If not, why would I use a key/value format instead of just (Text, Text)? Assume that my data doesn't contain any binary data. Thanks
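The "skip deserializing the value" idea in the question can be illustrated with a toy record that keeps its value as raw bytes until asked. This is a sketch of the concept only, not how SequenceFile is actually implemented:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Toy key/value record: the key is decoded eagerly, the value stays as
// raw bytes until someone actually asks for it, so key-only operations
// (compare/sort) never pay the value-decoding cost.
public class LazyRecord {
    final String key;
    private final byte[] rawValue;
    static int valueDecodes = 0;   // counts how often values get decoded

    LazyRecord(String key, String value) {
        this.key = key;
        this.rawValue = value.getBytes(StandardCharsets.UTF_8);
    }

    String value() {               // decode only on demand
        valueDecodes++;
        return new String(rawValue, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<LazyRecord> recs = new ArrayList<>(Arrays.asList(
            new LazyRecord("cust2|2013", "row-b"),
            new LazyRecord("cust1|2013", "row-a")));
        // Sorting touches only keys, so no value is ever decoded here.
        recs.sort(Comparator.comparing(r -> r.key));
        System.out.println(recs.get(0).key + " decodes=" + valueDecodes);
    }
}
```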
RE: MAP_INPUT_RECORDS counter in the reducer
Or you could do the calculation in the reducer's close() method, though I am not sure you can get the mapper's count in the reducer. But even if you can't, here is what you can do:
1) Save the JobConf reference in your Mapper's configure() method.
2) In the close() method of the mapper, store the MAP_INPUT_RECORDS counter value in the configuration object as your own property.
3) Retrieve that property in the reducer's close() method; then you have both numbers at that time.
Yong Date: Tue, 17 Sep 2013 09:49:06 -0400 Subject: Re: MAP_INPUT_RECORDS counter in the reducer From: shahab.yu...@gmail.com To: user@hadoop.apache.org In the normal configuration, the issue here is that reducers can start before all the maps have finished, so it is not possible to get the number (or to make sense of it even if you are able to). Having said that, you can specifically make sure that reducers don't start until all your maps have completed. It will of course slow down your job. I don't know whether it will work with this option or not, but you can try (until the experts have some advice.) Regards, Shahab On Tue, Sep 17, 2013 at 6:09 AM, Yaron Gonen wrote: Hi, Is there a way for the reducer to get the total number of input records to the map phase? For example, I want the reducer to normalize a sum by dividing it by the number of records. I tried getting the value of that counter by using the line context.getCounter(Task.Counter.MAP_INPUT_RECORDS).getValue(); in the reducer code, but I got 0. Thanks! Yaron
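Shahab's suggestion to hold reducers back until all maps finish corresponds to the reduce slow-start setting; on the 1.x line the property is named `mapred.reduce.slowstart.completed.maps` (later renamed `mapreduce.job.reduce.slowstart.completedmaps`). A sketch for mapred-site.xml or a per-job configuration:

```xml
<!-- Start reducers only after 100% of map tasks have completed.
     Note: this delays the shuffle, so the job may run longer overall. -->
<property>
  <name>mapred.reduce.slowstart.completed.maps</name>
  <value>1.0</value>
</property>
```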
Looking for some advice
Hi, I currently have a project to process data using MR. I have some thoughts about it, and am looking for advice if anyone has any feedback. In this project, I have a lot of event data related to email tracking coming into HDFS. The events are email-tracking data, like email_sent, email_open, email_bnc, link_click etc. Our online system can give me the raw data in the following 2 kinds of format:
1) Delta data listing the events by type, by timestamp. For example:
email_sent, t1, ...
email_read, t2, ...
email_sent, t3, ...
If I choose this format, I can put the different types into different data sets and just store them in HDFS, partitioned by time. This is the easiest for ETL, but not very useful for consuming the data, as most business analysis wants to link all the events as a chain, like email_sent, email_read, email_click, email_bounce etc. So I want to join the data in the ETL and store it in HDFS in the way it will most likely be used by the end user. But linking the email events is very expensive, as it is hard to find out which email_sent a given email_read event belongs to, and most importantly, to get the original email_sent timestamp. Fortunately, our online system (stored in a big Cassandra cluster) can give me data in another format:
2) Delta data including the whole email chain for the delta period. For example:
email_sent, t1 ...
email_sent, email_read, t2 ...
email_sent, t3 ...
email_sent, email_read, link_click, t4 ...
But here is the trade-off: even though it is a delta, it doesn't contain ONLY the delta. For example, the 2nd line above is an email_read event, and it gives me the linked email_sent nicely, but the original email_sent event was most likely already given to me in a previous delta. So I have to merge the email_read into the original email_sent, which already exists in HDFS, and HDFS only supports append.
To make this whole thing work, I have to replace the original record in HDFS. We have about 200-300M events generated per day, so it is a challenge to get this right. Here are my initial thoughts; I am looking for any feedback and advice:
1) Ideally, the data will be stored in HDFS, partitioned by the original email_sent timestamp, hourly.
2) Within each hour, it may be a good idea to store the data as a map file, using (email_address + timestamp) as the key, so an index can be built on top of that to make later lookups fast.
3) When a delta comes in (designed for daily as a first step), choose the 2nd format above as the source raw data, and use a first MR job to group the data hourly based on the email_sent. Here is one question I am not sure how best to address: in each reducer, I will have all the data carrying changes for the emails originally sent in that hour, and I want to join it back to that hour's data to do the replacing. But I don't think I can just do this in the reducer directly, right? Ideally, at this point I want to do a merge join, maybe based on the timestamp. I have to save the reducer output to some temp location in HDFS and do the join in another MR job, right? Without the 1st MR job, I won't know how many hourly partitions will be touched by the new incoming delta.
4) The challenging part is that the delta data will also contain a lot of new email_sent events, so the above join really has to be a full outer join.
5) The ratio of new_email_sent vs old_update is about 9:1, at least based on my current research. Would it really make sense to get the data in both formats 1) and 2) above? From 1) I can get all the new email_sent events and discard the rest; then I need to go to 2) to identify the part that needs to be merged. But in this case, I have to consume 2 big data dumps, which sounds like a bad idea.
Bottom line, I would like to know:
1) What file format should I consider for storing my data? Does a map file make sense?
2) Is there anything I need to pay attention to in designing these MR jobs? How do I make the whole thing efficient?
3) Even in the map file, what format should I use to serialize the value object? Should I use Google Protobuf, Apache Avro, or something else, and why? Thanks
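For point 2) of the previous message (keying the map file by email_address + timestamp), note that text keys compare lexicographically, so a zero-padded fixed-width timestamp keeps string order identical to time order. A small sketch; the separator and field width are arbitrary choices:

```java
public class EmailKey {
    // Compose a sortable key: a zero-padded, fixed-width epoch-seconds
    // field keeps lexicographic order equal to chronological order within
    // one email address. Without padding, "10" would sort before "5".
    static String compose(String email, long epochSeconds) {
        return String.format("%s|%013d", email, epochSeconds);
    }

    public static void main(String[] args) {
        String earlier = compose("a@example.com", 1384400000L);
        String later   = compose("a@example.com", 1384400100L);
        // Lexicographic comparison agrees with chronological order.
        System.out.println(earlier.compareTo(later) < 0);
    }
}
```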
RE: help!!!,what is happened with my project?
Did you do a Hadoop version upgrade before this error happened? Yong Date: Wed, 11 Sep 2013 16:57:54 +0800 From: heya...@jiandan100.cn To: user@hadoop.apache.org CC: user-unsubscr...@hadoop.apache.org Subject: help!!!,what is happened with my project? Hi: Today when I ran a task, I got some warnings. What happened? 2013-09-11 16:45:17,486 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library 2013-09-11 16:45:18,680 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0 2013-09-11 16:45:18,708 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2ddc7394 2013-09-11 16:45:18,998 INFO org.apache.hadoop.mapred.MapTask: Processing split: hdfs://192.168.1.240:9000/user/hadoop/input/20130815-log.log:0+118673837 2013-09-11 16:45:19,027 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 40 2013-09-11 16:45:19,113 INFO org.apache.hadoop.mapred.MapTask: data buffer = 31876710/39845888 2013-09-11 16:45:19,113 INFO org.apache.hadoop.mapred.MapTask: record buffer = 104857/131072 2013-09-11 16:45:19,136 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library not loaded 2013-09-11 16:45:23,374 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true 2013-09-11 16:45:23,374 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 3082708; bufvoid = 39845888 2013-09-11 16:45:23,374 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 104857; length = 131072 2013-09-11 16:45:23,661 INFO org.apache.hadoop.mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader@36781b93 java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:323) at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:78) at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.close(DFSClient.java:2326) at
java.io.FilterInputStream.close(FilterInputStream.java:181) at org.apache.hadoop.util.LineReader.close(LineReader.java:145) at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:187) at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.close(MapTask.java:496) at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1776) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:778) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364) at org.apache.hadoop.mapred.Child$4.run(Child.java:255) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190) at org.apache.hadoop.mapred.Child.main(Child.java:249) 2013-09-11 16:45:23,662 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
RE: distcp failed "Copy failed: ENOENT: No such file or directory"
The error doesn't mean the file doesn't exist in HDFS; it refers to the local disk. If you read the error stack trace: at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:581) - it indicates the error happened on the local file system. If you are trying to copy data from an existing cluster to a newly set-up cluster, then most likely some misconfiguration in your new cluster is causing this. When you use the distcp command, Hadoop generates an MR job that copies the data in mapper tasks (no reducers in this case). You should check the JT to find out which task node gave this error, and check the log to identify the root cause; especially check the local.dir or log.dir settings in the new cluster. Yong Date: Fri, 6 Sep 2013 15:36:48 +0800 Subject: distcp failed "Copy failed: ENOENT: No such file or directory" From: justlo...@gmail.com To: user@hadoop.apache.org # sudo -u hdfs hadoop distcp hdfs://192.168.10.22:9000/alex hdfs://192.168.10.220/benchmarks 13/09/06 15:34:11 INFO tools.DistCp: srcPaths=[hdfs://192.168.10.22:9000/alex] 13/09/06 15:34:11 INFO tools.DistCp: destPath=hdfs://192.168.10.220/benchmarks 13/09/06 15:34:12 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id 13/09/06 15:34:12 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 13/09/06 15:34:12 WARN conf.Configuration: slave.host.name is deprecated.
Instead, use dfs.datanode.hostname With failures, global counters are inaccurate; consider running with -i Copy failed: ENOENT: No such file or directory at org.apache.hadoop.io.nativeio.NativeIO.chmod(Native Method) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:581) at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:427) at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:579) at org.apache.hadoop.tools.DistCp.setup(DistCp.java:1038) at org.apache.hadoop.tools.DistCp.copy(DistCp.java:666) at org.apache.hadoop.tools.DistCp.run(DistCp.java:881) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84) at org.apache.hadoop.tools.DistCp.main(DistCp.java:908) the src and dest dir is all exist # sudo -u hdfs hadoop fs -ls /alex Found 3 items drwxr-xr-x - root supergroup 0 2013-08-16 10:14 /alex/mydump drwxr-xr-x - root supergroup 0 2013-08-28 10:48 /alex/tdump drwxr-xr-x - root supergroup 0 2013-08-28 10:54 /alex/tdump2 # sudo -u hdfs hadoop fs -ls /benchmarks Found 1 items drwxr-xr-x - hdfs hdfs 0 2013-08-23 17:02 /benchmarks/TestDFSIO
RE: secondary sort - number of reducers
Well, the reducer stage normally takes much longer than the map stage, because the copy/shuffle/sort all happen at this time, and they are the hard part. But before we simply say that is a fact of life, you need to dig into your MR job more to find out if you can make it faster. You are the person most familiar with your data, and you wrote the code to group/partition it and send it to the reducers. Even though you set up 255 reducers, the question is: does each of them get its fair share? You need to read the counter information of each reducer and find out how many reduce groups each reducer gets, how many input bytes it gets, etc. As a simple example, if you send 200G of data grouped by DATE, and all the data belongs to 2 days, with one day containing 90% of the data, then giving the job 255 reducers won't help: only 2 reducers will consume data, and one of them will consume 90% of it and take a very long time to finish, which WILL delay the whole MR job, while the rest of the reducers finish within seconds. In this case, maybe you need to rethink what your key should be, and make sure each reducer gets a fair share of the data volume. After the above fix (in fact, this normally fixes 90% of reducer performance problems, especially since you have 255 reducer tasks available, so each one will on average only get 1G of data - good for your huge cluster that only needs to process 256G :-), if you want to make it even faster, then check your code. Do you have to use String.compareTo()? Is it slow? Google "hadoop rawcomparator" to see if you can do something here. After that, if you still think the reducer stage is slow, check your cluster. Does the reducer spend most of its time in the copy stage, the sort, or in your reducer class? Find out where the time is spent, then identify the solution.
Yong Date: Fri, 30 Aug 2013 11:02:05 -0400 Subject: Re: secondary sort - number of reducers From: adeelmahm...@gmail.com To: user@hadoop.apache.org my secondary sort on multiple keys seems to work fine with smaller data sets but with bigger data sets (like 256 gig and 800M+ records) the mapper phase gets done pretty quick (about 15 mins) but then the reducer phase seems to take forever. I am using 255 reducers. basic idea is that my composite key has both group and sort keys in it which i parse in the appropriate comparator classes to perform grouping and sorting .. my thinking is that the mappers are where most of the work is done
1. mapper itself (create composite key and value)
2. records sorting
3. partitioner
if all this gets done in 15 mins then the reducer has the simple task of
1. grouping comparator
2. reducer itself (simply output records)
and should take less time than the mappers .. instead it essentially gets stuck in the reduce phase .. im gonna paste my code here to see if anything stands out as a fundamental design issue
//PARTITIONER
public int getPartition(Text key, HCatRecord record, int numReduceTasks) {
    //extract the group key from composite key
    String groupKey = key.toString().split("\\|")[0];
    return (groupKey.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
//GROUP COMPARATOR
public int compare(WritableComparable a, WritableComparable b) {
    //compare two Text objects by their extracted group keys
    String thisGroupKey = ((Text) a).toString().split("\\|")[0];
    String otherGroupKey = ((Text) b).toString().split("\\|")[0];
    return thisGroupKey.compareTo(otherGroupKey);
}
SORT COMPARATOR is similar to the group comparator, runs in the map phase and gets done quick
//REDUCER
public void reduce(Text key, Iterable records, Context context) throws IOException, InterruptedException {
    log.info("in reducer for key " + key.toString());
    Iterator recordsIter = records.iterator();
    //we are only interested in the first record after sorting and grouping
    if (recordsIter.hasNext()) {
        HCatRecord rec = recordsIter.next();
        context.write(nw, rec);
        log.info("returned record >> " + rec.toString());
    }
}
On Fri, Aug 30, 2013 at 9:24 AM, Adeel Qureshi wrote: yup it was negative and by doing this now it seems to be working fine On Fri, Aug 30, 2013 at 3:09 AM, Shekhar Sharma wrote: Is the hash code of that key negative? Do something like this: return (groupKey.hashCode() & Integer.MAX_VALUE) % numParts; Regards, Som Shekhar Sharma +91-8197243810 On Fri, Aug 30, 2013 at 6:25 AM, Adeel Qureshi wrote: > okay so when i specify the number of reducers e.g. in my example i m using 4 > (for a much smaller data set) it works if I use a single column in my > composite key .. but if I add multiple columns in the composite key > separated by a delimi .. it then throws the illegal partition error (keys > before the pipe ar
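The skew concern in Yong's reply (only a couple of DATE keys absorbing all the data) can be checked offline with plain Java, using the same partition arithmetic as Hadoop's HashPartitioner; the sample keys and record counts below are made up:

```java
public class PartitionSkew {
    // Same arithmetic as Hadoop's HashPartitioner.getPartition().
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Count how many of numReducers partitions receive at least one record
    // when 90% of 1000 sample records share a single DATE key.
    static int busyPartitions(int numReducers) {
        int[] counts = new int[numReducers];
        for (int i = 0; i < 1000; i++) {
            String key = (i < 900) ? "2013-08-29" : "2013-08-30";
            counts[partition(key, numReducers)]++;
        }
        int busy = 0;
        for (int c : counts) if (c > 0) busy++;
        return busy;
    }

    public static void main(String[] args) {
        // With only 2 distinct keys, at most 2 of the 255 reducers ever see
        // data, and one of them gets 90% of it - regardless of reducer count.
        System.out.println("partitions with data: " + busyPartitions(255));
    }
}
```

Running the job's real partitioner over a sample of real keys this way shows the reducer-load distribution before burning cluster time.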
RE: secondary sort - number of reducers
The method getPartition() needs to return a non-negative number; simply using the hashCode() method is not enough. See the Hadoop HashPartitioner implementation: return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; When I first read this code, I always wondered: why not use Math.abs? Is (& Integer.MAX_VALUE) faster? Yong Date: Thu, 29 Aug 2013 20:55:46 -0400 Subject: Re: secondary sort - number of reducers From: adeelmahm...@gmail.com To: user@hadoop.apache.org okay so when i specify the number of reducers e.g. in my example i m using 4 (for a much smaller data set) it works if I use a single column in my composite key .. but if I add multiple columns in the composite key separated by a delimi .. it then throws the illegal partition error (keys before the pipe are group keys and after the pipe are the sort keys, and my partitioner only uses the group keys) java.io.IOException: Illegal partition for Atlanta:GA|Atlanta:GA:1:Adeel (-1) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691) at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) at com.att.hadoop.hivesort.HSMapper.map(HSMapper.java:39) at com.att.hadoop.hivesort.HSMapper.map(HSMapper.java:1) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) at org.apache.hadoop.mapred.Child$4.run(Child.java:255) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1136) at org.apache.hadoop.mapred.Child.main(Child.java:249)
public int getPartition(Text key, HCatRecord record, int numParts) {
    //extract the group key from composite key
    String groupKey = key.toString().split("\\|")[0];
    return groupKey.hashCode() % numParts;
} On Thu, Aug 29, 2013 at 8:31 PM, Shekhar Sharma wrote: No... the partitioner decides which keys should go to which reducer, and you need to decide the number of reducers... The number of reducers depends on factors like the number of key/value pairs, the use case, etc. Regards, Som Shekhar Sharma +91-8197243810 On Fri, Aug 30, 2013 at 5:54 AM, Adeel Qureshi wrote: > so it cant figure out an appropriate number of reducers as it does for > mappers .. in my case hadoop is using 2100+ mappers and then only 1 reducer > .. since im overriding the partitioner class shouldnt that decide how > many reducers there should be based on how many different partition values > being returned by the custom partitioner > > > On Thu, Aug 29, 2013 at 7:38 PM, Ian Wrigley wrote: >> >> If you don't specify the number of Reducers, Hadoop will use the default >> -- which, unless you've changed it, is 1. >> >> Regards >> >> Ian. >> >> On Aug 29, 2013, at 4:23 PM, Adeel Qureshi wrote: >> >> I have implemented secondary sort in my MR job and for some reason if i >> dont specify the number of reducers it uses 1 which doesnt seems right >> because im working with 800M+ records and one reducer slows things down >> significantly. Is this some kind of limitation with the secondary sort that >> it has to use a single reducer .. that kind of would defeat the purpose of >> having a scalable solution such as secondary sort. I would appreciate any >> help. >> >> Thanks >> Adeel >> >> >> >> --- >> Ian Wrigley >> Sr. Curriculum Manager >> Cloudera, Inc >> Cell: (323) 819 4075 >> >
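As a side note on the Math.abs question above: the mask is not mainly about speed. Math.abs(Integer.MIN_VALUE) overflows and stays negative, while clearing the sign bit with & Integer.MAX_VALUE always yields a non-negative result. A self-contained demonstration:

```java
public class PartitionSign {
    // Clearing the sign bit guarantees a result in [0, numParts),
    // even for a hashCode() of Integer.MIN_VALUE.
    static int partition(int hash, int numParts) {
        return (hash & Integer.MAX_VALUE) % numParts;
    }

    public static void main(String[] args) {
        int h = Integer.MIN_VALUE;            // a legal hashCode() result
        System.out.println(Math.abs(h));      // still -2147483648: abs overflows
        System.out.println(-7 % 4);           // -3: Java's % keeps the sign -> illegal partition
        System.out.println(partition(h, 4));  // 0: always non-negative
        System.out.println(partition(-7, 4)); // 1: (-7 & MAX_VALUE) = 2147483641, % 4 = 1
    }
}
```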
RE: copy files from hdfs to local fs
What's wrong with using the old Unix pipe? hadoop fs -cat /user/input/foo.txt | head -100 > local_file Date: Thu, 29 Aug 2013 13:50:37 -0700 Subject: Re: copy files from hdfs to local fs From: chengi.liu...@gmail.com To: user@hadoop.apache.org tail will work as well.. ??? but i want to extract just (say) n lines out of this file? On Thu, Aug 29, 2013 at 1:43 PM, Kim Chew wrote: hadoop fs -copyToLocal or hadoop fs -get copies the whole file and won't be able to copy just part of the file; what is interesting is that there is a "tail" command but no "head". Kim On Thu, Aug 29, 2013 at 1:35 PM, Chengi Liu wrote: Ok, A very stupid question... I have a large file in /user/input/foo.txt I want to copy the first 100 lines from this location to the local filesystem... And the data is very sensitive so i am a bit hesitant to experiment. What is the right way to copy sample data from hdfs to the local fs.
RE: Jar issue
I am not sure the original suggestion will work for your case. My understanding is that you want to use some API that only exists in slf4j version 1.6.4, but this library already exists with a different version in your Hadoop environment, which is quite possible. Changing the Maven build of the application may not work because:
1) If some API of 1.6.4 is used in the application code, then the library must be shipped with your code to the Hadoop cluster.
2) What you are looking for is maybe the parameter "mapreduce.user.classpath.first"; you can google it. It allows the user's class files to be loaded before Hadoop's in mapper/reducer tasks.
3) Keep in mind that even if your code is fine now, the library jar you submit MAY not be backward compatible, which could cause problems in the mapper/reducer tasks for the Hadoop code, as the slf4j 1.6.4 classes are now loaded into the JVM. But this case is very rare, as a lot of the time you will submit a later version of the jar than what is bundled with Hadoop, and most of them are backward compatible.
4) Maybe Hadoop could use a different classloader to load the user's jar files, like old J2EE containers did, to solve this kind of problem in the future.
Yong Date: Tue, 27 Aug 2013 09:05:51 -0700 Subject: Re: Jar issue From: jamalsha...@gmail.com To: user@hadoop.apache.org I am right now using the libjars option. How do i do what you suggested using that route? On Tue, Aug 27, 2013 at 8:51 AM, Shahab Yunus wrote: One idea is, you can use the exclusion property of maven (provided you are using that to build your application) while including hadoop dependencies and exclude the slf4j that is coming within hadoop, and then include your own slf4j as a separate dependency. Something like this:
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase</artifactId>
  <version>0.92.1-cdh4.1.2</version>
  <exclusions>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
Regards, Shahab
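Point 2) above would look something like the following job submission. This is an unverified sketch: the jar names are placeholders, and the property was introduced as `mapreduce.user.classpath.first` on the 1.x line (renamed `mapreduce.job.user.classpath.first` on YARN):

```
# Ship the newer slf4j jars and ask Hadoop to put user jars ahead of its
# own on the task classpath (jar names here are placeholders).
hadoop jar myjob.jar com.example.Driver \
  -D mapreduce.user.classpath.first=true \
  -libjars slf4j-api-1.6.4.jar,slf4j-log4j12-1.6.4.jar \
  input output
```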
RE: Partitioner vs GroupComparator
As Harsh said, sometimes you want to do a secondary sort, but in MR, data can only be sorted by key, not by value. A lot of the time, you want the reducer output sorted by a field, but only within a group, kind of like a windowed sort in relational SQL. For example, if you have data about all the employees, you may want the MR job to sort the employees by salary, but within each department. So what do you choose as the key to emit from the Mapper? Department_id? If so, it is hard to make the result sorted by salary. Using "Department_id + salary", we cannot put all the data from one department into one reducer group. In this case, you separate the way keys are composed from the way they are grouped. You still use "Department_id + salary" as the key, but override the GroupComparator to group ONLY by Department_id, while in the meantime sorting the data on both Department_id and salary. The final goal is to make sure that all the data for the same department arrives at the same reducer, and when it arrives, it is already sorted by salary too, by utilizing MR's built-in sort/shuffle ability. Yong Date: Fri, 23 Aug 2013 13:06:01 -0400 Subject: Re: Partitioner vs GroupComparator From: shahab.yu...@gmail.com To: user@hadoop.apache.org @Jan, why not just not send the 'hidden' part of the key as a value? Why not then pass the value as null or with some other value part? So on the reducer side there is no duplication, and you can extract the 'hidden' part of the key yourself (which should be possible as you will be encapsulating it in some class/object model)...? Regards, Shahab On Fri, Aug 23, 2013 at 12:22 PM, Jan Lukavský wrote: Hi all, when speaking about this, has anyone ever measured how much more data needs to be transferred over the network when using the GroupingComparator the way Harsh suggests? What do I mean: when you use the GroupingComparator, it hides from you the real key that you have emitted from the Mapper.
You just see the first key in the reduce group, and any data that was carried in the key needs to be duplicated in the value in order to be accessible on the reduce end. Let's say you have a key consisting of two parts (base, extension); you partition by the 'base' part and use a GroupingComparator to group keys with the same base part. Then you have no choice but to emit from the Mapper something like (key: (base, extension), value: extension), which means the 'extension' part is duplicated in the data that has to be transferred over the network. This overhead can be diminished by using compression between the map and reduce sides, but I believe that in some cases it can be significant. It would be nice if the API allowed access to the 'real' key for each value, not only the first key of the reduce group. The only way to get rid of this overhead now is by not using the GroupingComparator and instead storing some internal state in the Reducer class that is persisted across multiple calls to the reduce() method, which in my opinion makes using the GroupingComparator this way a less 'preferred' way of doing secondary sort. Does anyone have any experience with this overhead? Jan On 08/23/2013 06:05 PM, Harsh J wrote: The partitioner runs on the map-end. It assigns a partition ID (reducer ID) to each key. The grouping comparator runs on the reduce-end. It helps reducers, which read off a merge-sorted single file, to understand how to break the sequential file into reduce calls of <key, values>. Typically one never overrides the GroupingComparator, and it is usually the same as the SortComparator. But if you wish to do things such as Secondary Sort, then overriding this comes in useful - because you may want to sort over two parts of a key object, but only group by one part, etc. On Fri, Aug 23, 2013 at 8:49 PM, Eugene Morozov wrote: Hello, I have two different types of keys emerging from Map and processed by Reduce. These keys have some part in common.
And I'd like to have similar keys go to one reducer. For that purpose I used a Partitioner and partition everything that comes in by this common part. It seems to be fine, but MRUnit doesn't seem to know anything about Partitioners. So here is where GroupComparator comes into play. It seems that MRUnit is well aware of this guy, but it surprises me: it looks like Partitioner and GroupComparator are actually doing exactly the same thing - they both somehow group keys to have them land in one reducer. Could you shed some light on it, please. --
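Yong's department/salary example above can be simulated in plain Java, with no Hadoop dependencies (class, field, and method names here are mine, for illustration only): sort on the full composite key, the way the SortComparator would, then start a new "reduce group" only when the grouping part changes, the way the GroupingComparator would.

```java
import java.util.*;

public class SecondarySortSketch {
    // Composite key: sort on (deptId, salary) but group reduce calls by deptId only.
    record EmpKey(String deptId, int salary) {}

    static Map<String, List<Integer>> reduceGroups(List<EmpKey> keys) {
        // SortComparator: order by the FULL composite key, so values arrive
        // at the reducer already sorted by salary within each department.
        keys.sort(Comparator.comparing(EmpKey::deptId)
                            .thenComparingInt(EmpKey::salary));
        // GroupingComparator: a new reduce call starts only when deptId changes.
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (EmpKey k : keys) {
            groups.computeIfAbsent(k.deptId(), d -> new ArrayList<>()).add(k.salary());
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(reduceGroups(new ArrayList<>(List.of(
            new EmpKey("sales", 90), new EmpKey("eng", 70),
            new EmpKey("sales", 50), new EmpKey("eng", 80)))));
        // prints {eng=[70, 80], sales=[50, 90]}
    }
}
```

Each department's salaries come out sorted "for free", which is the whole point of pushing the sort into the shuffle instead of buffering values in the reducer.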
RE: running map tasks in remote node
It is possible to do what you are trying to do, but it only makes sense if your MR job is very CPU intensive and you want to use the CPU resources in your cluster, instead of the IO. You may want to do some research about HDFS's role in Hadoop. First, it provides central storage for all the files that will be processed by MR jobs. If you don't want to use HDFS, you need to identify a shared storage to be shared among all the nodes in your cluster. HDFS is NOT required, but a shared storage is required in the cluster. To simplify your question, let's just use NFS to replace HDFS. That is possible for a POC, to help you understand how to set it up. Assume you have a cluster with 3 nodes (one NN, two DNs; the JT running on the NN, and TTs running on the DNs). Instead of using HDFS, you can use NFS this way: 1) Mount /share_data on both of your 2 data nodes. They need to have the same mount, so /share_data on each data node points to the same NFS location. It doesn't matter where you host this NFS share; just make sure each data node mounts it as the same /share_data. 2) Create a folder under /share_data and put all your data into that folder. 3) When you kick off your MR job, give a full URL for the input path, like 'file:///share_data/myfolder', and also a full URL for the output path, like 'file:///share_data/output'. This way, each mapper understands that it will in fact read the data from the local file system, instead of HDFS. That's why you want each task node to have the same mount path, so that 'file:///share_data/myfolder' works on every task node. Check this and make sure that /share_data/myfolder points to the same path on each of your task nodes. 4) You want each mapper to process one file, so instead of using the default 'TextInputFormat', use a 'WholeFileInputFormat'. This makes sure that each file under '/share_data/myfolder' won't be split, and each whole file is sent to a single mapper.
5) In the above setup, I don't think you need to start the NameNode or DataNode processes any more; you just use the JobTracker and TaskTracker. 6) Obviously, when your data is big, the NFS share will be your bottleneck. Maybe you can replace it with shared network storage, but the above setup gives you a starting point. 7) Keep in mind that with a setup like the above you lose data replication, data locality, etc., which is why I said it ONLY makes sense if your MR job is CPU intensive. You simply want to use the Mapper/Reducer tasks to process your data, instead of any scalability of IO. Make sense? Yong Date: Fri, 23 Aug 2013 15:43:38 +0530 Subject: Re: running map tasks in remote node From: rab...@gmail.com To: user@hadoop.apache.org Thanks for the reply. I am basically exploring possible ways to work with the hadoop framework for one of my use cases. I have my limitations in using hdfs, but agree with the fact that using map reduce in conjunction with hdfs makes sense. I successfully tested wholeFileInputFormat after some googling. Now, coming to my use case. I would like to keep some files on my master node and do some processing on the cloud nodes. The policy does not allow us to configure and use the cloud nodes as HDFS. However, I would like to spawn a map process on those nodes. Hence, I set the input path to the local file system, for example, $HOME/inputs. I have a file listing filenames (10 lines) in this input directory. I use NLineInputFormat and spawn 10 map processes. Each map process gets a line. The map process will then do a file transfer and process it. However, I get an error in the map saying FileNotFoundException $HOME/inputs. I am sure this directory is present on my master but not on the slave nodes. When I copy this input directory to the slave nodes, it works fine. I am not able to figure out how to fix this, or the reason for the error. I do not understand why it complains that the input directory is not present.
As far as I know, slave nodes get a map, and the map method contains the contents of the input file. This should be fine for the map logic to work. with regards, rabmdu On Thu, Aug 22, 2013 at 4:40 PM, java8964 java8964 wrote: If you don't plan to use HDFS, what kind of shared file system are you going to use between the cluster nodes? NFS? For what you want to do, even though it doesn't make too much sense, you need to solve the first problem: the shared file system. Second, if you want to process the files file by file, instead of block by block as in HDFS, then you need to use a WholeFileInputFormat (google how to write one). Then you don't need a file listing all the files to be processed; just put them into one folder in the shared file system, and send this folder to your MR job. In this way, as long as each node can access it through some file system URL, each file will be processed in each mapper.
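The WholeFileInputFormat mentioned in this thread is never shown. Its core idea, one file = one unsplit record, can be sketched without any Hadoop dependencies (class and method names below are mine, for illustration; a real implementation would override isSplitable() to return false and have its RecordReader emit the whole file's bytes as a single value):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

public class WholeFileSketch {
    // The essence of a WholeFileInputFormat: each regular file in the input
    // folder becomes exactly one (filename -> bytes) record, never split.
    static Map<String, byte[]> readFolderAsRecords(Path folder) throws IOException {
        Map<String, byte[]> records = new TreeMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(folder)) {
            for (Path f : files) {
                if (Files.isRegularFile(f)) {
                    records.put(f.getFileName().toString(), Files.readAllBytes(f));
                }
            }
        }
        return records;
    }
}
```

In the MR setting, each of these records would be handed to its own mapper, which is exactly the "one file per mapper" behavior the thread is after.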
RE: running map tasks in remote node
If you don't plan to use HDFS, what kind of shared file system are you going to use between the cluster nodes? NFS? For what you want to do, even though it doesn't make too much sense, you need to solve the first problem: the shared file system. Second, if you want to process the files file by file, instead of block by block as in HDFS, then you need to use a WholeFileInputFormat (google how to write one). Then you don't need a file listing all the files to be processed; just put them into one folder in the shared file system, and send this folder to your MR job. In this way, as long as each node can access it through some file system URL, each file will be processed in each mapper. Yong Date: Wed, 21 Aug 2013 17:39:10 +0530 Subject: running map tasks in remote node From: rab...@gmail.com To: user@hadoop.apache.org Hello, Here is the newbie question of the day. For one of my use cases, I want to use hadoop map reduce without HDFS. Here, I will have a text file containing a list of file names to process. Assume that I have 10 lines (10 files to process) in the input text file, and I wish to generate 10 map tasks and execute them in parallel on 10 nodes. I started with the basic tutorial on hadoop, set up a single node hadoop cluster, and successfully tested the wordcount code. Now, I took two machines A (master) and B (slave). I did the below configuration on these machines to set up a two node cluster. hdfs-site.xml: dfs.replication = 1, dfs.name.dir = /tmp/hadoop-bala/dfs/name, dfs.data.dir = /tmp/hadoop-bala/dfs/data, mapred.job.tracker = A:9001 mapred-site.xml: mapred.job.tracker = A:9001, mapreduce.tasktracker.map.tasks.maximum = 1 core-site.xml: fs.default.name = hdfs://A:9000 On A and B, I have a file named 'slaves' with an entry 'B' in it, and another file called 'masters' with an entry 'A' in it. I have kept my input file at A. I see the map method process the input file line by line, but it is all processed on A. Ideally, I would expect that processing to take place on B.
Can anyone highlight where I am going wrong? regards, rab
java.io.IOException: Task process exit with nonzero status of -1
Hi, This is a 4 node hadoop cluster running on CentOS 6.3 with Oracle JDK (64bit) 1.6.0_43. Each node has 32G memory, with a maximum of 8 mapper tasks and 4 reducer tasks configured. The hadoop version is 1.0.4. This is set up on DataStax DSE 3.0.2, which uses Cassandra CFS as the underlying DFS, instead of HDFS with a NameNode. I understand this kind of setup is not really well tested with hadoop MR, but the above MR errors should not be related to it, at least from my guess. I am running a simple MR job that partitions data by DATE, for 700G of data in 600 files. The MR logic is very straightforward, but in our above staging environment I saw a lot of reducers fail with the above error. I want to know the reason and fix it. 1) There is no log related to this error in the reducer task attempt log in the user log directory. The only log related to this is in system.log, which is generated by the cassandra processor: INFO [JVM Runner jvm_201308141528_0003_r_625176200 spawned.] 2013-08-15 07:28:59,326 JvmManager.java (line 510) JVM : jvm_201308141528_0003_r_625176200 exited with exit code -1. Number of tasks it ran: 0 2) I believe this error is related to system resources, but I just cannot google anything pointing to the root cause. From the log, I believe the JVM terminated/crashed for the reducer task, but I don't know the reason.
3) I checked the limits of the user the process is running under; here is the info, and I didn't spot any obvious problems.
-bash-4.1$ ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 256589
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 40
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 10240
cpu time (seconds, -t) unlimited
max user processes (-u) 32768
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
4) Since this is a new cluster, not much of the hadoop configuration has been changed from the default values. I did run the reducers with '-mx2048m', to set the JVM heap size to 2G, as the first time the reducers failed with an OOM error. I googled around, and it looks like people recommend setting "mapred.child.ulimit" to 3x the heap size, which would be around 6G in this case. I can give that a try, but on the nodes virtual memory is set to unlimited for the user it runs under, so I am not sure this will really fix it. 5) Another possibility I found on google is that the child process returns -1 when it fails to write to the user logs, as Linux EXT3 has a limitation on how many files/directories can be created under one folder (32k?). But my system is using EXT4, and there have not been too many MR jobs run so far. 6) I am really not sure what the root cause of this is, as exit code -1 could mean a lot of things. But I wonder if anyone here can give me more hints, or any help with debugging this issue in my environment? Is there any hadoop or JVM setting I can use to dump more info/logs about why the JVM terminated at runtime with exit code -1? Thanks Yong
RE: Encryption in HDFS
I am also interested in your research. Can you share some insight on the following questions? 1) When you use a CompressionCodec, can the encrypted file be split? From my understanding, there is no encryption scheme that lets the file be decrypted individually, block by block, right? For example, if I have a 1G file encrypted using AES, how do you (or can you) decrypt the file block by block, instead of using just one mapper to decrypt the whole file? 2) In your CompressionCodec implementation, do you use the DecompressorStream or the BlockDecompressorStream? If BlockDecompressorStream, can you share some examples? Right now I have some problems using BlockDecompressorStream to do exactly the same thing as you did. 3) Do you have any plan to share your code, especially if you did use BlockDecompressorStream and made the encrypted file decryptable block by block in a hadoop MapReduce job? Thanks Yong From: render...@gmail.com Date: Tue, 26 Feb 2013 14:10:08 +0900 Subject: Encryption in HDFS To: user@hadoop.apache.org Hello, I'm a university student. I implemented AES and Triple DES as a CompressionCodec using the Java Cryptography Architecture (JCA). The encryption is performed by a client node using the Hadoop API. Map tasks read blocks from HDFS, and these blocks are decrypted by each map task. I tested my implementation against generic HDFS. My cluster consists of 3 nodes (1 master node, 3 worker nodes), and each machine has a quad core processor (i7-2600) and 4GB memory. The test input is a 1TB text file which consists of 32 text files (each text file is 32GB). I expected that the encryption would take much more time than generic HDFS, but the performance does not differ significantly. The decryption step takes about 5-7% more than generic HDFS. The encryption step takes about 20-30% more than generic HDFS, because it is implemented single threaded and executed by 1 client node, so the encryption could be made faster. May there be any error in my test?
I know there are several implementations for encrypting files in HDFS. Are these implementations enough to secure HDFS? best regards, seonpark * Sorry for my bad english
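On Yong's question 1 (decrypting block by block): one standard answer, not given in this thread, is AES in CTR mode. Block i of the keystream depends only on IV + i, so a reader can start decrypting at any 16-byte boundary by advancing the counter, which is what makes an encrypted file seekable and hence splittable. A JDK-only sketch (key and IV are throwaway demo values; a real design would use a random IV and a proper key store):

```java
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CtrSeekSketch {
    // Advance a 16-byte big-endian CTR counter by `blocks` (wrap-around ignored
    // for this demo).
    static byte[] advance(byte[] iv, long blocks) {
        BigInteger c = new BigInteger(1, iv).add(BigInteger.valueOf(blocks));
        byte[] raw = c.toByteArray();
        byte[] out = new byte[16];
        int copy = Math.min(raw.length, 16);           // toByteArray may differ in length
        System.arraycopy(raw, raw.length - copy, out, 16 - copy, copy);
        return out;
    }

    static byte[] crypt(int mode, byte[] key, byte[] iv, byte[] data) throws Exception {
        Cipher c = Cipher.getInstance("AES/CTR/NoPadding");
        c.init(mode, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
        return c.doFinal(data);
    }

    public static void main(String[] args) throws Exception {
        byte[] key = "0123456789abcdef".getBytes(StandardCharsets.US_ASCII);
        byte[] iv = new byte[16];                      // demo only; use a random IV
        byte[] plain = "sixteen byte blk".repeat(4).getBytes(StandardCharsets.US_ASCII);
        byte[] cipher = crypt(Cipher.ENCRYPT_MODE, key, iv, plain);

        // Decrypt starting at byte offset 32 (block 2) without touching earlier blocks:
        byte[] tail = crypt(Cipher.DECRYPT_MODE, key, advance(iv, 2),
                            Arrays.copyOfRange(cipher, 32, cipher.length));
        System.out.println(new String(tail, StandardCharsets.US_ASCII));
    }
}
```

A split-aware RecordReader could use exactly this trick: compute the counter for its split's starting block, then decrypt only its own byte range.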
RE: Question related to Decompressor interface
Can someone share some idea of what the Hadoop source code of class org.apache.hadoop.io.compress.BlockDecompressorStream, method rawReadInt(), is trying to do here? There is a comment in the code that this method shouldn't return a negative number, but my test file contains the following bytes from the input stream: 248, 19, 20, 116, corresponding to b1, b2, b3, b4. After the 4 bytes are read from the input stream, the return result will be a negative number, as (b1 << 24) = -134217728, (b2 << 16) = 1245184, (b3 << 8) = 5120, (b4 << 0) = 116. I am not sure what the logic of this method is trying to do; can anyone share some idea about it? Thanks

  private int rawReadInt() throws IOException {
    int b1 = in.read();
    int b2 = in.read();
    int b3 = in.read();
    int b4 = in.read();
    if ((b1 | b2 | b3 | b4) < 0)
      throw new EOFException();
    return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
  }

From: java8...@hotmail.com To: user@hadoop.apache.org Subject: Question related to Decompressor interface Date: Sat, 9 Feb 2013 15:49:31 -0500 HI, Currently I am researching options for encrypting the data in MapReduce, as we plan to use the Amazon EMR or EC2 services for our data. I am thinking that the compression codec is a good place to integrate the encryption logic, and I found out there are some people having the same idea as mine. I googled around and found this code: https://github.com/geisbruch/HadoopCryptoCompressor/ It doesn't seem maintained any more, but it gave me a starting point. I downloaded the source code and tried to do some tests with it. It doesn't work out of the box; there are some bugs I had to fix to make it work. I believe it contains 'AES' as an example algorithm. But right now I am facing a problem when I try to use it in my test MapReduce program.
Here is the stack trace I got:

2013-02-08 23:16:47,038 INFO org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor: buf length = 512, and offset = 0, length = -132967308
java.lang.IndexOutOfBoundsException
  at java.nio.ByteBuffer.wrap(ByteBuffer.java:352)
  at org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor.setInput(CryptoBasicDecompressor.java:100)
  at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:97)
  at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:83)
  at java.io.InputStream.read(InputStream.java:82)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:114)
  at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:458)
  at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
  at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
  at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
  at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
  at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:396)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
  at org.apache.hadoop.mapred.Child.main(Child.java:262)

I know the error is thrown out of this custom CryptoBasicDecompressor class, but I really have questions about the interface it implements: Decompressor. There is limited documentation about this interface, for example, when and how the method setInput() will be invoked.
If I want to write my own Decompressor, what do the methods of this interface mean? In the above case, I enabled some debug information; you can see that the byte[] array passed to the setInput method only has a length of 512, but the 3rd parameter (length) passed in is a negative number: -132967308. That caused the IndexOutOfBoundsException. If I check this method in the GzipDecompressor class in hadoop, its code would also throw IndexOutOfBoundsException in this case, so this is a RuntimeException case. Why did it happen in my test case? Here is my test case: I have a simple log text file of about 700k. I encrypted it with the above code using 'AES'. I can encrypt and decrypt it to get my original content back. The file name is foo.log.crypto; this file extension is registered to invoke the CryptoBasicDecompressor in my test hadoop, which is the CDH4.1.2 release (hadoop 2.0). Everything works as I expected. The CryptoBasicDecompressor is invoked when the input file is foo.log.crypto, as you can see in the above stack trace. But I don't know why the 3rd parameter (length) in setInput() is a negative number at runtime.
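To answer the rawReadInt() question directly: the method is simply reassembling a big-endian signed 32-bit integer, the same thing DataInputStream.readInt() does. It goes negative here because the first byte, 248 = 0xF8, sets the sign bit, meaning the 4-byte block-length header written into the .crypto file is bogus, not that the method is buggy. A quick check of the arithmetic quoted above:

```java
public class RawReadIntDemo {
    // Reassemble a big-endian signed int from four unsigned byte values,
    // exactly as BlockDecompressorStream.rawReadInt() does.
    static int toInt(int b1, int b2, int b3, int b4) {
        return (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0);
    }

    public static void main(String[] args) {
        // The four bytes read from the broken file's "length" field:
        System.out.println(toInt(248, 19, 20, 116)); // prints -132967308
    }
}
```

This matches the `length = -132967308` in the stack trace, which points at the compressor side (the code that wrote the block-length header) as the real culprit.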
RE: Loader for small files
Hi, Davie: I am not sure I understand this suggestion. Why would a smaller block size help this performance issue? From what the original question says, it looks like the performance problem is due to there being a lot of small files, with each file running in its own mapper. Hadoop needs to start a lot of mappers (creating a mapper also takes time and resources), but each mapper only takes a small amount of data (maybe hundreds of K or several M of data, much less than the block size), so most of the time is wasted creating the task instances for the mappers, while each mapper finishes very quickly. That is the cause of the performance problem, right? Or do I understand the problem wrong? If my understanding is right, reducing the block size won't help in this case, right? To fix it, we need to merge multiple files into one mapper, so each mapper has enough data to process. Unless my understanding is totally wrong, I don't see how reducing the block size will help in this case. Thanks Yong > Subject: Re: Loader for small files > From: davidlabarb...@localresponse.com > Date: Mon, 11 Feb 2013 15:38:54 -0500 > CC: user@hadoop.apache.org > To: u...@pig.apache.org > > What process creates the data in HDFS? You should be able to set the block size there and avoid the copy. > > I would test the dfs.block.size on the copy and see if you get the mapper split you want before worrying about optimizing. > > David > > On Feb 11, 2013, at 2:10 PM, Something Something wrote: > > David: Your suggestion would add an additional step of copying data from one place to another. Not bad, but not ideal. Is there no way to avoid copying of data? > > BTW, we have tried changing the following options to no avail :( > > set pig.splitCombination false; > > & a few other 'dfs' options given below: > > mapreduce.min.split.size > > mapreduce.max.split.size > > Thanks.
> > On Mon, Feb 11, 2013 at 10:29 AM, David LaBarbera <davidlabarb...@localresponse.com> wrote:
> >> You could store your data in smaller block sizes. Do something like
> >> hadoop fs HADOOP_OPTS="-Ddfs.block.size=1048576 -Dfs.local.block.size=1048576" -cp /org-input /small-block-input
> >> You might only need one of those parameters. You can verify the block size with
> >> hadoop fsck /small-block-input
> >> In your pig script, you'll probably need to set pig.maxCombinedSplitSize to something around the block size
> >> David
> >> On Feb 11, 2013, at 1:24 PM, Something Something wrote:
> >>> Sorry.. Moving 'hbase' mailing list to BCC 'cause this is not related to HBase. Adding 'hadoop' user group.
> >>> On Mon, Feb 11, 2013 at 10:22 AM, Something Something <mailinglist...@gmail.com> wrote:
> Hello,
> We are running into performance issues with Pig/Hadoop because our input files are small. Everything goes to only 1 Mapper. To get around this, we are trying to use our own Loader like this:
> 1) Extend PigStorage:
>
> public class SmallFileStorage extends PigStorage {
>
>     public SmallFileStorage(String delimiter) {
>         super(delimiter);
>     }
>
>     @Override
>     public InputFormat getInputFormat() {
>         return new NLineInputFormat();
>     }
> }
>
> 2) Add a command line argument to the Pig command as follows:
> -Dmapreduce.input.lineinputformat.linespermap=50
> 3) Use SmallFileStorage in the Pig script as follows:
> USING com.xxx.yyy.SmallFileStorage ('\t')
> But this doesn't seem to work. We still see that everything is going to one mapper. Before we spend any more time on this, I am wondering if this is a good approach – OR – if there's a better approach? Please let me know. Thanks.
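The fix Yong describes, merging multiple small files into one mapper (essentially what CombineFileInputFormat does in the newer Hadoop APIs), boils down to packing files into splits up to a target size so each mapper gets enough work. A toy sketch of that packing step (file sizes and the greedy strategy are mine, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

public class CombineSplitsSketch {
    // Greedily pack file sizes into splits no larger than maxSplitBytes,
    // so one mapper handles many small files instead of one file each.
    static List<List<Long>> combine(List<Long> fileSizes, long maxSplitBytes) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long used = 0;
        for (long size : fileSizes) {
            if (!current.isEmpty() && used + size > maxSplitBytes) {
                splits.add(current);          // close the full split
                current = new ArrayList<>();
                used = 0;
            }
            current.add(size);
            used += size;
        }
        if (!current.isEmpty()) splits.add(current);
        return splits;
    }
}
```

With, say, forty 1 MB files and a 64 MB target, this yields a single split, i.e. one mapper instead of forty, which is exactly the startup-overhead win the thread is after.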
RE: number input files to mapreduce job
I don't think you can get the list of all input files in the mapper, but what you can get is the current file's information. From the context object reference, you can get the InputSplit, which should give you all the information you want about the current input file. http://hadoop.apache.org/docs/r2.0.2-alpha/api/org/apache/hadoop/mapred/FileSplit.html Date: Tue, 12 Feb 2013 12:35:16 +0530 Subject: number input files to mapreduce job From: vikascjadha...@gmail.com To: user@hadoop.apache.org Hi all, How do I get the number of input files, and their paths, for a particular mapreduce job in a java MapReduce program? -- Thanx and Regards Vikas Jadhav
RE: Confused about splitting
Hi, Chris: Here is my understanding of file splits and data blocks. HDFS will store your file in multiple data blocks; each block will be 64M or 128M, depending on your setting. Of course, a file can contain multiple records, so record boundaries won't line up with block boundaries (in fact, most of them don't). It is the responsibility of the RecordReader to figure that out. The RecordReader will be given the bytes of the file split (or block) it should handle, and most likely the end of this block won't be the end of a record. So when the RecordReader reaches the end of its block, it will ALSO continue into the first part of the next block, to build up the complete last record. Based on this contract, the RecordReader instance that handles the next block will ignore the first partial bytes, as they are just part of a previous record, and go straight to the starting point of the next record. The above logic all assumes that the file is splittable. I did a project where the log files could contain embedded newline characters, so the TextInputFormat/LineRecordReader coming with Hadoop wouldn't work, and I had to write my own InputFormat/RecordReader to handle the above logic. Making the file/InputFormat/RecordReader splittable is important for performance, as the data can then be processed concurrently, block by block. But some file formats, especially compression formats like GZIP, do not support splitting. In that case, each file can ONLY be handled by one mapper. If you want to store your data in gzip format, you may want to control your file size, keeping it close to the block size. For data stored in google protocol buffers, you probably have to write your own InputFormat/RecordReader to make it splittable. You can consider the LZO format, as it is compressed and also supports splitting.
You can look at elephant-bird, a framework from twitter that supports the google protocol buffer and lzo data formats; it will make your life easier. Thanks Yong Date: Sun, 10 Feb 2013 10:36:24 -0500 Subject: Confused about splitting From: cpigg...@gmail.com To: user@hadoop.apache.org I'm a little confused about splitting and readers. The data in my application is stored in files of google protocol buffers. There are multiple protocol buffers per file. There have been a number of simple ways to put multiple protobufs in a single file, usually involving writing some kind of length field first. We did something a little more complicated by defining a frame similar to HDLC: frames are enveloped by a flag, escapes are provided so the flag can't occur within the frame, and there is a 32-bit CRC-like checksum just before the closing flag. The protobufs are all of a type named RitRecord, and we have our own reader that's something like this: public interface RitRecordReader { RitRecord getNext(); } The data collection application stores these things in ordinary flat files (the whole thing is run through a GzipOutputFilter first, so the files are compressed). I'm having trouble understanding how best to apply this to HDFS for map function consumption. Our data collector writes 1 megabyte files, but I can combine them for map/reduce performance. To avoid TOO much wasted space, I was thinking about 16, 32, or 64 MB HDFS blocks (tbd). What I don't get is this: suppose we have a long file that spans multiple HDFS blocks. I think I end up with problems similar to this guy: http://blog.rguha.net/?p=293 where one of my RitRecord objects is half in one HDFS block and half in another HDFS block. If the mapper is assigning tasks to nodes along HDFS blocks, then I'm going to end up with a problem. It's not yet clear to me how to solve this.
I could make the problem LESS likely with bigger blocks (like the default 128MB), but even then the problem doesn't completely go away (for me, a >128MB file is unlikely but not impossible). --Chris
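The record-straddling-a-block-boundary problem Chris links to is resolved by the RecordReader contract Yong describes: a reader skips a partial leading record (the previous split's reader finishes it) and reads past its own end to complete the record it started. A toy, Hadoop-free simulation of that contract, using '\n' as the record boundary (all names here are mine):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitReaderSketch {
    // Read the records "owned" by split [start, end): skip a partial first
    // record unless we start at offset 0, and run past `end` to finish the
    // last record we started. Together, adjacent splits cover every record
    // exactly once.
    static List<String> readSplit(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {                              // skip partial first record
            while (pos < data.length && data[pos - 1] != '\n') pos++;
        }
        List<String> records = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        while (pos < data.length && (pos < end || cur.length() > 0)) {
            char ch = (char) data[pos++];
            if (ch == '\n') { records.add(cur.toString()); cur.setLength(0); }
            else cur.append(ch);
        }
        if (cur.length() > 0) records.add(cur.toString());
        return records;
    }
}
```

Splitting "aaaa\nbbbb\ncccc\n" at byte 7 (mid-"bbbb") gives the first split ["aaaa", "bbbb"] and the second ["cccc"]: no record is lost or duplicated, which is exactly what LineRecordReader does across HDFS block boundaries. A frame-delimited format like Chris's HDLC-style one could apply the same idea by scanning for the next flag byte instead of '\n'.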
RE: Question related to Decompressor interface
Hi, Dave: Thanks for your reply. I am not sure how the EncryptedWritable would work; can you share more ideas about it? For example, say I have a text file as my source raw file, and now I need to store it in HDFS. If I use any encryption to encrypt the whole file, then there is no good InputFormat or RecordReader to process it, unless the whole file is decrypted first at runtime and then processed using TextInputFormat, right? What you suggest is: when I encrypt the file, store it as a SequenceFile, using anything I want as the key, encrypt each line (record) and store it as the value, and put the (key, value) pairs into the sequence file, is that right? Then at runtime, each value can be decrypted from the sequence file by the EncryptedWritable class, ready for the next step. Is my understanding correct? In this case, of course, I don't need to worry about splits any more, as each record is encrypted/decrypted separately. I think it is a valid option, but the problem is that the data has to be encrypted by this EncryptedWritable class. What I was thinking of is allowing the data source to encrypt its data any way it wants, as long as it is supported by the Java security package, and then only providing the private key to the runtime to decrypt it. Yong From: davidpark...@yahoo.com To: user@hadoop.apache.org Subject: RE: Question related to Decompressor interface Date: Sun, 10 Feb 2013 09:36:40 +0700 I can't answer your question about the Decompressor interface, but I have a query for you. Why not just create an EncryptedWritable object? Encrypt/decrypt the bytes in the read/write methods; that should be darn near trivial. Then stick with good 'ol SequenceFile, which, as you note, is splittable. Otherwise you'd have to deal with making the output splittable, and given encrypted data, the only solution that I see is basically rolling your own SequenceFile with encrypted innards.
Come to think of it, a simple, standardized EncryptedWritable object out of the box with Hadoop would be great. Or perhaps better yet, an EncryptedWritableWrapper so we can convert any existing Writable into an encrypted form. Dave From: java8964 java8964 [mailto:java8...@hotmail.com] Sent: Sunday, February 10, 2013 3:50 AM To: user@hadoop.apache.org Subject: Question related to Decompressor interface HI, Currently I am researching about options of encrypting the data in the MapReduce, as we plan to use the Amazon EMR or EC2 services for our data. I am thinking that the compression codec is good place to integrate with the encryption logic, and I found out there are some people having the same idea as mine. I google around and found out this code: https://github.com/geisbruch/HadoopCryptoCompressor/ It doesn't seem maintained any more, but it gave me a starting point. I download the source code, and try to do some tests with it. It doesn't work out of box. There are some bugs I have to fix to make it work. I believe it contains 'AES' as an example algorithm. But right now, I faced a problem when I tried to use it in my testing MapReduer program. 
Here is the stack trace I got:

2013-02-08 23:16:47,038 INFO org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor: buf length = 512, and offset = 0, length = -132967308
java.lang.IndexOutOfBoundsException
  at java.nio.ByteBuffer.wrap(ByteBuffer.java:352)
  at org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor.setInput(CryptoBasicDecompressor.java:100)
  at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:97)
  at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:83)
  at java.io.InputStream.read(InputStream.java:82)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:114)
  at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:458)
  at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
  at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
  at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
  at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
  at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:396)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
  at org.apache.hadoop.mapred.Child.main(Child.java:262)

I know the error is thrown out of this custom CryptoBasicDecompressor class, but I really have questions about the interface it implements: Decompressor. T
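Dave's EncryptedWritable suggestion can be sketched with JDK crypto only (class and method names here are hypothetical; a real Writable would do this inside write(DataOutput)/readFields(DataInput)): encrypt each record's bytes on write and decrypt them on read, so the enclosing SequenceFile stays splittable because the container format itself is untouched.

```java
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.security.SecureRandom;
import java.util.Arrays;

public class EncryptedRecordSketch {
    private static final SecureRandom RNG = new SecureRandom();

    // What a hypothetical EncryptedWritable.write() would store as the value:
    // a fresh random IV followed by the AES/GCM ciphertext of the record.
    static byte[] seal(byte[] key, byte[] record) throws Exception {
        byte[] iv = new byte[12];
        RNG.nextBytes(iv);
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
               new GCMParameterSpec(128, iv));
        byte[] ct = c.doFinal(record);
        byte[] out = new byte[iv.length + ct.length];
        System.arraycopy(iv, 0, out, 0, iv.length);
        System.arraycopy(ct, 0, out, iv.length, ct.length);
        return out;
    }

    // What readFields() would do: recover the IV, then decrypt the rest.
    static byte[] open(byte[] key, byte[] sealed) throws Exception {
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
               new GCMParameterSpec(128, Arrays.copyOf(sealed, 12)));
        return c.doFinal(sealed, 12, sealed.length - 12);
    }
}
```

Because each record carries its own IV, records are independent, which is what lets the sequence file be split anywhere between records, the property Yong was after.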
RE: What to do/check/debug/root cause analysis when jobtracker hang
Our cluster on cdh3u4 has the same problem. I think it is caused by some bugs in the JobTracker, and I believe Cloudera knows about this issue. After upgrading to cdh3u5 we haven't hit it again, but I am not sure whether the fix is confirmed in CDH3u5. Yong

> Date: Mon, 4 Feb 2013 15:21:18 -0800
> Subject: What to do/check/debug/root cause analysis when jobtracker hang
> From: silvianhad...@gmail.com
> To: user@hadoop.apache.org
>
> Lately, the jobtracker in one of our production clusters falls into a hang state.
> The 5/10/15 min load average is around 1;
> with the top command, the jobtracker has 100% CPU all the time.
>
> So I went ahead and tried top -H -p jobtracker_pid, and always see one
> thread that has 100% CPU all the time.
>
> Unless we restart the jobtracker, the hang state never goes away.
>
> I found OOM in the jobtracker log file during the hang state.
>
> How can I find out what is really going on in the one and only
> thread that has 100% CPU?
>
> How can I prove that we ran out of memory because of the amount of jobs
> _OR_ that there is a memory leak on the application side?
>
> I tried jstack to dump, and http://jobtracker:50030/stacks
>
> I just don't know what I should look for in the output of those commands.
>
> The cluster is cdh3u4, on CentOS 6.2, with transparent_hugepage disabled.
>
> Hopefully this makes sense,
> -P
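On the "what is that one 100%-CPU thread doing" question: from the outside, the thread id shown by top -H matches the nid=0x... field of a jstack dump once converted to hexadecimal, which pinpoints the hot stack. From inside the JVM, the standard java.lang.management API can rank threads by consumed CPU time; a minimal sketch (class and method names below are made up for illustration):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

class HotThreadFinder {
    // Return the name of the live thread that has consumed the most CPU time.
    static String hottestThread() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        if (!mx.isThreadCpuTimeSupported() || !mx.isThreadCpuTimeEnabled()) {
            // Measurement not available on this JVM; fall back to the caller.
            return Thread.currentThread().getName();
        }
        long bestCpu = -1;
        String bestName = null;
        for (long id : mx.getAllThreadIds()) {
            long cpu = mx.getThreadCpuTime(id);   // nanoseconds, -1 if the thread died
            ThreadInfo info = mx.getThreadInfo(id);
            if (cpu > bestCpu && info != null) {
                bestCpu = cpu;
                bestName = info.getThreadName();
            }
        }
        return bestName;
    }
}
```

Run against a live JobTracker (e.g. via a JMX connection instead of the local ManagementFactory), this names the thread to look up in the jstack output.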
RE: Profiling the Mapper using hprof on Hadoop 0.20.205
What range did you give for mapred.task.profile.maps? And are you sure your mapper invokes the methods you expect in the traces? Yong

Date: Wed, 6 Feb 2013 23:50:08 +0200 Subject: Profiling the Mapper using hprof on Hadoop 0.20.205 From: yaron.go...@gmail.com To: user@hadoop.apache.org

Hi, I wish to profile my mapper, so I've set the properties mapred.task.profile and mapred.task.profile.maps in mapred-site.xml. At the end of the job I get a profile.out file; however, I think it's not the profile of the mapper: none of the methods I use in the mapper appear in any of the traces. Am I doing something wrong? I'm using a small cluster with 4 machines. Thanks, Yaron
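For reference, the hprof knobs being discussed are set like this in mapred-site.xml on 0.20.x; the 0-2 range (profile only the first three map tasks) and the hprof parameter string below are illustrative values to adjust per job, not defaults being asserted:

```xml
<property>
  <name>mapred.task.profile</name>
  <value>true</value>
</property>
<property>
  <name>mapred.task.profile.maps</name>
  <value>0-2</value>
</property>
<property>
  <name>mapred.task.profile.params</name>
  <value>-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s</value>
</property>
```

With cpu=samples, hprof only records methods it catches on-CPU during sampling, so a mapper that spends most of its time in framework I/O can produce traces with none of the user's methods in them.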
RE: Cumulative value using mapreduce
CR Cumulative Amount    DR Cumulative Amount
1000                    0
1000                    500
1000                    2000
3000                    2000

Hope the problem is clear now. Please provide your suggestions on the approach to the solution. Regards, Sarath.

On Friday 05 October 2012 02:51 AM, Bertrand Dechoux wrote: I indeed didn't catch the cumulative sum part. Then I guess it calls for what is often called a secondary sort, if you want to compute different cumulative sums during the same job. It can be more or less easy to implement depending on which API/library/tool you are using. Ted's comments on performance are spot on. Regards Bertrand

On Thu, Oct 4, 2012 at 9:02 PM, java8964 java8964 wrote: I did the cumulative sum in a HIVE UDF, as one of the projects for my employer.

1) You need to decide the grouping elements for your cumulative sum. For example, an account, a department, etc. In the mapper, combine this information as your emit key.
2) If you don't have any grouping requirement, and just want a cumulative sum over all your data, then send all the data to one common key, so it all goes to the same reducer.
3) When you calculate the cumulative sum, does the output need a sorting order? If so, you need to do a secondary sort, so the data arrives in the reducer in the order you want.
4) In the reducer, just do the sum and emit a value per original record (not per key).

I suggest you do this in a HIVE UDF, as it is much easier if you can build a HIVE schema on top of your data. Yong

From: tdunn...@maprtech.com Date: Thu, 4 Oct 2012 18:52:09 +0100 Subject: Re: Cumulative value using mapreduce To: user@hadoop.apache.org

Bertrand is almost right. The only difference is that the original poster asked about a cumulative sum. This can be done in the reducer exactly as Bertrand described, except for two points that make it different from word count: a) you can't use a combiner; b) the output of the program is as large as the input, so it will have different performance characteristics than aggregation programs like wordcount.
Bertrand's key recommendation to go read a book is the most important advice.

On Thu, Oct 4, 2012 at 5:20 PM, Bertrand Dechoux wrote: Hi, It sounds like 1) group information by account, 2) compute the sum per account. If that's not the case, you should be a bit more precise about your context. This computation looks like a small variant of wordcount. If you do not know how to do it, you should read books about Hadoop MapReduce and/or online tutorials. Yahoo's is old but still a
RE: Cumulative value using mapreduce
I did the cumulative sum in a HIVE UDF, as one of the projects for my employer.

1) You need to decide the grouping elements for your cumulative sum. For example, an account, a department, etc. In the mapper, combine this information as your emit key.
2) If you don't have any grouping requirement, and just want a cumulative sum over all your data, then send all the data to one common key, so it all goes to the same reducer.
3) When you calculate the cumulative sum, does the output need a sorting order? If so, you need to do a secondary sort, so the data arrives in the reducer in the order you want.
4) In the reducer, just do the sum and emit a value per original record (not per key).

I suggest you do this in a HIVE UDF, as it is much easier if you can build a HIVE schema on top of your data. Yong

From: tdunn...@maprtech.com Date: Thu, 4 Oct 2012 18:52:09 +0100 Subject: Re: Cumulative value using mapreduce To: user@hadoop.apache.org

Bertrand is almost right. The only difference is that the original poster asked about a cumulative sum. This can be done in the reducer exactly as Bertrand described, except for two points that make it different from word count: a) you can't use a combiner; b) the output of the program is as large as the input, so it will have different performance characteristics than aggregation programs like wordcount. Bertrand's key recommendation to go read a book is the most important advice.

On Thu, Oct 4, 2012 at 5:20 PM, Bertrand Dechoux wrote: Hi, It sounds like 1) group information by account, 2) compute the sum per account. If that's not the case, you should be a bit more precise about your context. This computation looks like a small variant of wordcount. If you do not know how to do it, you should read books about Hadoop MapReduce and/or online tutorials.
Yahoo's is old but still a nice read to begin with: http://developer.yahoo.com/hadoop/tutorial/ Regards, Bertrand

On Thu, Oct 4, 2012 at 3:58 PM, Sarath wrote: Hi, I have a file which has some financial transaction data. Each transaction has an amount and a credit/debit indicator. I want to write a mapreduce program which computes the cumulative credit & debit amounts at each record and appends these values to the record before writing it to the output file. Is this possible? How can I achieve this? Where should I put the logic of computing the cumulative values? Regards, Sarath. -- Bertrand Dechoux
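Yong's recipe, reduced to the arithmetic a reducer would perform for one key group: the sketch below is plain Java with no Hadoop dependency; the Txn record layout and all names are invented for illustration, and the input list is assumed to already be in the order a secondary sort would deliver to the reducer.

```java
import java.util.ArrayList;
import java.util.List;

// One transaction: an amount plus a credit/debit indicator.
class Txn {
    final long amount;
    final boolean credit;   // true = CR, false = DR
    Txn(long amount, boolean credit) { this.amount = amount; this.credit = credit; }
}

class CumulativeSums {
    // What the reducer body would do with the values of one key:
    // emit each record with the running CR and DR totals appended.
    static List<long[]> withRunningTotals(List<Txn> txns) {
        List<long[]> out = new ArrayList<>();
        long cr = 0, dr = 0;
        for (Txn t : txns) {
            if (t.credit) cr += t.amount; else dr += t.amount;
            out.add(new long[] { t.amount, cr, dr }); // amount, CR cum, DR cum
        }
        return out;
    }
}
```

This is also why Ted's two caveats hold: a combiner would collapse records whose individual running totals are the output, and every input record produces an output record, so the job never shrinks its data.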
Why Hadoop does not provide a round-robin partitioner
Hi, During my development of ETLs on the Hadoop platform, there is one question I want to ask: why doesn't Hadoop provide a round-robin partitioner? From my experience, it is a very powerful option for the case of a small, limited set of distinct key values, and it balances the ETL resources. Here is what I want to say:

1) Sometimes you will have an ETL with a small number of keys, for example data partitioned by date, or by hour, etc. So in every ETL load I will have a very limited count of unique key values (maybe 10 if I load 10 days of data, or 24 if I load one day's data and use the hour as the key).
2) The HashPartitioner is good, given that it will randomly spread the partition numbers, if you have a large number of distinct keys.
3) A lot of the time I have enough spare reducers, but because the hashCode() method happens to map several keys to one partition, all the data for those keys goes to the same reducer process, which is not very efficient, as some spare reducers just happen to get nothing to do.
4) Of course I can implement my own partitioner to control this, but I wonder whether it would really be too hard to implement a round-robin partitioner for the general case, which would distribute the distinct keys equally across the available reducers. Of course, as the distinct key count grows, the performance of this partitioner degrades badly. But if we know the count of distinct keys is small enough, using this kind of partitioner would be a good option, right?

Thanks Yong
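A minimal sketch of the idea (the class is hypothetical and kept Hadoop-free; a real one would extend org.apache.hadoop.mapreduce.Partitioner<K,V>). It also exposes the likely catch: the key-to-partition map is state inside one partitioner instance, and since every map task creates its own instance, the assignment is only globally consistent if all mappers encounter the keys in the same order or derive it deterministically from the key, which may be exactly why Hadoop doesn't ship one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical round-robin partitioner: the first distinct key seen goes
// to partition 0, the second to partition 1, and so on, wrapping around.
// Caveat: this state is per map task, so two mappers seeing keys in
// different orders would route the same key to different reducers.
class RoundRobinPartitioner<K> {
    private final Map<K, Integer> assigned = new HashMap<>();
    private int next = 0;

    int getPartition(K key, int numPartitions) {
        Integer p = assigned.get(key);
        if (p == null) {
            p = next % numPartitions;   // hand out partitions in rotation
            next++;
            assigned.put(key, p);
        }
        return p;
    }
}
```

A safer variant for the date/hour case in the post is to rank the small, known key set up front (e.g. hour 0..23 modulo the reducer count), which is deterministic across all mappers.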