Author: todd
Date: Thu Jul 24 06:17:33 2014
New Revision: 1613004

URL: http://svn.apache.org/r1613004
Log:
MAPREDUCE-5996. native-task: Rename system tests into standard directory layout. Contributed by Todd Lipcon.
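For context, the rename moves the native-task system tests out of the ad-hoc src/test/java/system/ source root and into the standard Maven test layout, where the directory tree mirrors the Java package names. Roughly (paths abbreviated):

    before:  .../src/test/java/system/...
    after:   .../src/test/java/org/apache/hadoop/mapred/nativetask/{combinertest,compresstest,kvtest,nonsorttest,testutil}/...

This is the layout that Maven and most IDEs expect for test sources, so the tests can be picked up as ordinary unit tests.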
Added:
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/EnforceNativeOutputCollectorDelegator.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/MockValueClass.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ScenarioConfiguration.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestConstants.java
Removed:
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/
Modified:
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt

Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt?rev=1613004&r1=1613003&r2=1613004&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt Thu Jul 24 06:17:33 2014
@@ -3,3 +3,4 @@ Changes for Hadoop Native Map Output Col
 MAPREDUCE-5985. native-task: Fix build on macosx. Contributed by Binglin Chang
 MAPREDUCE-5994. Simplify ByteUtils and fix failing test. (todd)
+MAPREDUCE-5996. native-task: Rename system tests into standard directory layout (todd)

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.combinertest; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Task; +import org.apache.hadoop.mapred.nativetask.combinertest.WordCount.IntSumReducer; +import org.apache.hadoop.mapred.nativetask.combinertest.WordCount.TokenizerMapper; +import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile; +import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier; +import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; +import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.Before; +import org.junit.Test; + +public class CombinerTest { + private FileSystem fs; + private String inputpath; + private String nativeoutputpath; + private String hadoopoutputpath; + + @Test + public void testWordCountCombiner() { + try { + + final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration(); + nativeConf.addResource(TestConstants.COMBINER_CONF_PATH); + final Job nativejob = getJob("nativewordcount", nativeConf, inputpath, nativeoutputpath); + + final Configuration commonConf = ScenarioConfiguration.getNormalConfiguration(); + commonConf.addResource(TestConstants.COMBINER_CONF_PATH); + + final Job normaljob = getJob("normalwordcount", commonConf, inputpath, hadoopoutputpath); + + nativejob.waitForCompletion(true); + + Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); + + normaljob.waitForCompletion(true); + Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); + + assertEquals(true, ResultVerifier.verify(nativeoutputpath, hadoopoutputpath)); + assertEquals("Native reduce group counter should equal normal reduce group counter", + nativeReduceGroups.getValue(), normalReduceGroups.getValue()); + + } catch (final Exception e) { + e.printStackTrace(); + assertEquals("run exception", true, false); + } + } + + @Before + public void startUp() throws Exception { + final ScenarioConfiguration conf = new ScenarioConfiguration(); + conf.addcombinerConf(); + + this.fs = FileSystem.get(conf); + + this.inputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY, + TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/wordcount"; + + if (!fs.exists(new Path(inputpath))) { + new TestInputFile( + conf.getInt(TestConstants.NATIVETASK_COMBINER_WORDCOUNT_FILESIZE, 1000000), + 
Text.class.getName(), + Text.class.getName(), conf).createSequenceTestFile(inputpath, 1, (byte)('a')); + } + + this.nativeoutputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH, + TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/nativewordcount"; + this.hadoopoutputpath = conf.get(TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH, + TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/normalwordcount"; + } + + protected static Job getJob(String jobname, Configuration inputConf, String inputpath, String outputpath) + throws Exception { + final Configuration conf = new Configuration(inputConf); + conf.set("fileoutputpath", outputpath); + final FileSystem fs = FileSystem.get(conf); + if (fs.exists(new Path(outputpath))) { + fs.delete(new Path(outputpath)); + } + fs.close(); + final Job job = new Job(conf, jobname); + job.setJarByClass(WordCount.class); + job.setMapperClass(TokenizerMapper.class); + job.setCombinerClass(IntSumReducer.class); + job.setReducerClass(IntSumReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + job.setInputFormatClass(SequenceFileInputFormat.class); + FileInputFormat.addInputPath(job, new Path(inputpath)); + FileOutputFormat.setOutputPath(job, new Path(outputpath)); + return job; + } +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.combinertest; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Task; +import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile; +import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier; +import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; +import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Test; + +public class LargeKVCombinerTest { + + @Test + public void testLargeValueCombiner() { + final Configuration normalConf = ScenarioConfiguration.getNormalConfiguration(); + final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration(); + normalConf.addResource(TestConstants.COMBINER_CONF_PATH); + nativeConf.addResource(TestConstants.COMBINER_CONF_PATH); + final int default_KVSize_Maximum = 1 << 22; // 4M + final int KVSize_Maximum = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST, + default_KVSize_Maximum); + final String inputPath = normalConf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY, + TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/largeKV"; + final String nativeOutputPath = normalConf.get(TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH, + TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/nativeLargeKV"; + final String hadoopOutputPath = normalConf.get(TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH, + TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/normalLargeKV"; + try { + final FileSystem fs = FileSystem.get(normalConf); + for (int i = 65536; i <= KVSize_Maximum; i *= 4) { + + int max = i; + int min = Math.max(i / 4, max - 10); + + System.out.println("===KV Size Test: min size: " + min + ", max size: " + max); + + normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min)); + normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max)); + nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min)); + nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max)); + fs.delete(new Path(inputPath), true); + new TestInputFile(normalConf.getInt(TestConstants.NATIVETASK_COMBINER_WORDCOUNT_FILESIZE, + 1000000), IntWritable.class.getName(), + Text.class.getName(), normalConf).createSequenceTestFile(inputPath, 1); + + final Job normaljob = CombinerTest.getJob("normalwordcount", normalConf, inputPath, hadoopOutputPath); + final Job nativejob = CombinerTest.getJob("nativewordcount", nativeConf, inputPath, nativeOutputPath); + + nativejob.waitForCompletion(true); + Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); + + normaljob.waitForCompletion(true); + Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); + + final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath); + + final String reason = "LargeKVCombinerTest failed with min size: " + min + + ", max size: " + max + ", normal out: " + hadoopOutputPath + ", native out: " + nativeOutputPath; + + assertEquals(reason, true, compareRet); +// assertEquals("Native reduce group counter should equal normal reduce group counter", +// 
nativeReduceGroups.getValue(), normalReduceGroups.getValue()); + } + fs.close(); + } catch (final Exception e) { + e.printStackTrace(); + assertEquals("run exception", true, false); + } + } + +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.combinertest; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.Task; +import org.apache.hadoop.mapred.TextOutputFormat; +import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile; +import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier; +import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; +import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; +import org.apache.hadoop.mapreduce.Counter; +import org.junit.Before; +import org.junit.Test; + +public class OldAPICombinerTest { + private FileSystem fs; + private String inputpath; + + @Test + public void testWordCountCombinerWithOldAPI() throws Exception { + final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration(); + nativeConf.addResource(TestConstants.COMBINER_CONF_PATH); + final String nativeoutput = nativeConf.get(TestConstants.OLDAPI_NATIVETASK_TEST_COMBINER_OUTPUTPATH); + final JobConf nativeJob = getOldAPIJobconf(nativeConf, "nativeCombinerWithOldAPI", inputpath, nativeoutput); + RunningJob nativeRunning = JobClient.runJob(nativeJob); + + Counter nativeReduceGroups = nativeRunning.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); + + final Configuration normalConf = ScenarioConfiguration.getNormalConfiguration(); + normalConf.addResource(TestConstants.COMBINER_CONF_PATH); + final String normaloutput = normalConf.get(TestConstants.OLDAPI_NORMAL_TEST_COMBINER_OUTPUTPATH); + final JobConf normalJob = getOldAPIJobconf(normalConf, "normalCombinerWithOldAPI", inputpath, normaloutput); + + RunningJob normalRunning = JobClient.runJob(normalJob); + Counter normalReduceGroups = normalRunning.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); + + final boolean compareRet = ResultVerifier.verify(nativeoutput, normaloutput); + assertEquals("file compare result: if they are the same, then return true", true, compareRet); + + assertEquals("The input reduce record count must be the same", nativeReduceGroups.getValue(), normalReduceGroups.getValue()); + } + + @Before + public void startUp() throws Exception { + final ScenarioConfiguration conf = new ScenarioConfiguration(); + conf.addcombinerConf(); + this.fs = FileSystem.get(conf); + this.inputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY, + TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/wordcount"; + + if (!fs.exists(new Path(inputpath))) { + new TestInputFile(conf.getInt(TestConstants.NATIVETASK_COMBINER_WORDCOUNT_FILESIZE, 1000000), Text.class.getName(), + Text.class.getName(), conf).createSequenceTestFile(inputpath, 1, (byte)('a')); + } + } + + private static JobConf getOldAPIJobconf(Configuration configuration, String name, String input, String output) + throws Exception { + final JobConf jobConf = new JobConf(configuration); + final FileSystem fs = FileSystem.get(configuration); + if (fs.exists(new Path(output))) { + fs.delete(new Path(output), true); + } + fs.close(); + jobConf.setJobName(name); + 
jobConf.setOutputKeyClass(Text.class); + jobConf.setOutputValueClass(IntWritable.class); + jobConf.setMapperClass(WordCountWithOldAPI.TokenizerMapperWithOldAPI.class); + jobConf.setCombinerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class); + jobConf.setReducerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class); + + jobConf.setInputFormat(SequenceFileInputFormat.class); + jobConf.setOutputFormat(TextOutputFormat.class); + + FileInputFormat.setInputPaths(jobConf, new Path(input)); + FileOutputFormat.setOutputPath(jobConf, new Path(output)); + return jobConf; + } +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.combinertest; + +import java.io.IOException; +import java.util.StringTokenizer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; + +public class WordCount { + + private static Log LOG = LogFactory.getLog(WordCount.class); + + public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { + + private final static IntWritable one = new IntWritable(1); + private final Text word = new Text(); + + @Override + public void map(Object key, Text value, Context context) throws IOException, InterruptedException { + final StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + context.write(word, one); + } + } + } + + public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + private final IntWritable result = new IntWritable(); + + @Override + public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, + InterruptedException { + int sum = 0; + for (final IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.write(key, result); + } + } + + public static void main(String[] args) throws Exception { + final Configuration conf = new Configuration(); + final String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); + if (otherArgs.length != 2) { + System.err.println("Usage: wordcount <in> <out>"); + System.exit(2); + } + final Job job = new Job(conf, conf.get(MRJobConfig.JOB_NAME, "word count")); + job.setJarByClass(WordCount.class); + job.setMapperClass(TokenizerMapper.class); + job.setCombinerClass(IntSumReducer.class); + job.setReducerClass(IntSumReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + FileInputFormat.addInputPath(job, new Path(otherArgs[0])); + FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); + System.exit(job.waitForCompletion(true) ? 
0 : 1); + } +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.combinertest; + +import java.io.IOException; +import java.util.Iterator; +import java.util.StringTokenizer; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +public class WordCountWithOldAPI { + + public static class TokenizerMapperWithOldAPI extends MapReduceBase implements + Mapper<Object, Text, Text, IntWritable> { + private final static IntWritable one = new IntWritable(1); + private final Text word = new Text(); + + @Override + public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) + throws IOException { + final StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + output.collect(word, one); + } + } + } + + public static class IntSumReducerWithOldAPI extends MapReduceBase implements + Reducer<Text, IntWritable, Text, IntWritable> { + private final IntWritable result = new IntWritable(); + + @Override + public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, + Reporter reporter) throws IOException { + int sum = 0; + while (values.hasNext()) { + sum += values.next().get(); + } + result.set(sum); + output.collect(key, result); + } + } +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.compresstest; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; +import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +public class CompressMapper { + public static final String inputFile = "./compress/input.txt"; + public static final String outputFileDir = "./compress/output/"; + + public static class TextCompressMapper extends Mapper<Text, Text, Text, Text> { + + @Override + protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { + context.write(key, value); + } + } + + public static Job getCompressJob(String jobname, Configuration conf) { + Job job = null; + try { + job = new Job(conf, jobname + "-CompressMapperJob"); + job.setJarByClass(CompressMapper.class); + job.setMapperClass(TextCompressMapper.class); + job.setOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + final Path outputpath = new Path(outputFileDir + jobname); + // if the output file exists, delete it + final FileSystem hdfs = FileSystem.get(new ScenarioConfiguration()); + if (hdfs.exists(outputpath)) { + hdfs.delete(outputpath); + } + hdfs.close(); + job.setInputFormatClass(SequenceFileInputFormat.class); + FileInputFormat.addInputPath(job, new Path(inputFile)); + FileOutputFormat.setOutputPath(job, outputpath); + } catch (final Exception e) { + e.printStackTrace(); + } + return job; + } +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.compresstest; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile; +import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier; +import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; +import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Before; +import org.junit.Test; + +public class CompressTest { + + @Test + public void testSnappyCompress() throws Exception { + final Configuration conf = ScenarioConfiguration.getNativeConfiguration(); + conf.addResource(TestConstants.SNAPPY_COMPRESS_CONF_PATH); + final Job job = CompressMapper.getCompressJob("nativesnappy", conf); + job.waitForCompletion(true); + + final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration(); + hadoopconf.addResource(TestConstants.SNAPPY_COMPRESS_CONF_PATH); + final Job hadoopjob = CompressMapper.getCompressJob("hadoopsnappy", hadoopconf); + hadoopjob.waitForCompletion(true); + + final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativesnappy", + CompressMapper.outputFileDir + "hadoopsnappy"); + assertEquals("file compare result: if they are the same, then return true", true, compareRet); + } + + @Test + public void testGzipCompress() throws Exception { + final Configuration conf = ScenarioConfiguration.getNativeConfiguration(); + conf.addResource(TestConstants.GZIP_COMPRESS_CONF_PATH); + final Job job = CompressMapper.getCompressJob("nativegzip", conf); + job.waitForCompletion(true); + + final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration(); + hadoopconf.addResource(TestConstants.GZIP_COMPRESS_CONF_PATH); + final Job hadoopjob = CompressMapper.getCompressJob("hadoopgzip", hadoopconf); + hadoopjob.waitForCompletion(true); + + final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativegzip", + CompressMapper.outputFileDir + "hadoopgzip"); + assertEquals("file compare result: if they are the same, then return true", true, compareRet); + } + + @Test + public void testBzip2Compress() throws Exception { + final Configuration nativeconf = ScenarioConfiguration.getNativeConfiguration(); + nativeconf.addResource(TestConstants.BZIP2_COMPRESS_CONF_PATH); + final Job nativejob = CompressMapper.getCompressJob("nativebzip2", nativeconf); + nativejob.waitForCompletion(true); + + final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration(); + hadoopconf.addResource(TestConstants.BZIP2_COMPRESS_CONF_PATH); + final Job hadoopjob = CompressMapper.getCompressJob("hadoopbzip2", hadoopconf); + hadoopjob.waitForCompletion(true); + + final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativebzip2", + CompressMapper.outputFileDir + "hadoopbzip2"); + assertEquals("file compare result: if they are the same, then return 
true", true, compareRet); + } + + @Test + public void testLz4Compress() throws Exception { + final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration(); + nativeConf.addResource(TestConstants.LZ4_COMPRESS_CONF_PATH); + final Job nativeJob = CompressMapper.getCompressJob("nativelz4", nativeConf); + nativeJob.waitForCompletion(true); + + final Configuration hadoopConf = ScenarioConfiguration.getNormalConfiguration(); + hadoopConf.addResource(TestConstants.LZ4_COMPRESS_CONF_PATH); + final Job hadoopJob = CompressMapper.getCompressJob("hadooplz4", hadoopConf); + hadoopJob.waitForCompletion(true); + final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativelz4", + CompressMapper.outputFileDir + "hadooplz4"); + assertEquals("file compare result: if they are the same ,then return true", true, compareRet); + } + + @Test + public void testDefaultCompress() throws Exception { + final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration(); + nativeConf.addResource(TestConstants.DEFAULT_COMPRESS_CONF_PATH); + final Job nativeJob = CompressMapper.getCompressJob("nativedefault", nativeConf); + nativeJob.waitForCompletion(true); + + final Configuration hadoopConf = ScenarioConfiguration.getNormalConfiguration(); + hadoopConf.addResource(TestConstants.DEFAULT_COMPRESS_CONF_PATH); + final Job hadoopJob = CompressMapper.getCompressJob("hadoopdefault", hadoopConf); + hadoopJob.waitForCompletion(true); + final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativedefault", + CompressMapper.outputFileDir + "hadoopdefault"); + assertEquals("file compare result: if they are the same ,then return true", true, compareRet); + } + + @Before + public void startUp() throws Exception { + final ScenarioConfiguration conf = new ScenarioConfiguration(); + final FileSystem fs = FileSystem.get(conf); + final Path path = new Path(CompressMapper.inputFile); + fs.delete(path); + if (!fs.exists(path)) { + new TestInputFile(ScenarioConfiguration.getNormalConfiguration().getInt( + TestConstants.NATIVETASK_COMPRESS_FILESIZE, 100000), + Text.class.getName(), Text.class.getName(), conf) + .createSequenceTestFile(CompressMapper.inputFile); + + } + fs.close(); + } +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.kvtest; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Reducer; + +public class HashSumReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, IntWritable> { + + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(os); + + @Override + public void reduce(KTYPE key, Iterable<VTYPE> values, Context context) throws IOException, InterruptedException { + int hashSum = 0; + for (final VTYPE val : values) { + if (val instanceof Writable) { + os.reset(); + ((Writable) val).write(dos); + final int hash = Arrays.hashCode(os.toByteArray()); + hashSum += hash; + } + } + + context.write(key, new IntWritable(hashSum)); + } +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.kvtest; + +import java.io.IOException; +import java.util.zip.CRC32; + +import com.google.common.primitives.Longs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory; +import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +public class KVJob { + public static final String INPUTPATH = "nativetask.kvtest.inputfile.path"; + public static final String OUTPUTPATH = "nativetask.kvtest.outputfile.path"; + Job job = null; + + public static class ValueMapper<KTYPE, VTYPE> extends Mapper<KTYPE, VTYPE, KTYPE, VTYPE> { + @Override + public void map(KTYPE key, VTYPE value, Context context) throws IOException, InterruptedException { + context.write(key, value); + } + } + + public static class KVMReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, VTYPE> { + public void reduce(KTYPE key, VTYPE value, Context context) throws IOException, InterruptedException { + context.write(key, value); + } + } + + public static class KVReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, VTYPE> { + + @Override + public void reduce(KTYPE key, Iterable<VTYPE> values, Context context) throws IOException, InterruptedException { + long resultlong = 0; // 8-byte sum, to match BytesFactory.fromBytes + final CRC32 crc32 = new CRC32(); + VTYPE lastValue = null; + for (final VTYPE val : values) { + crc32.reset(); + crc32.update(BytesFactory.toBytes(val)); + resultlong += crc32.getValue(); + lastValue = val; + } + // take the value class from the last element seen; calling getClass() on a null reference would always NPE + context.write(key, (VTYPE) BytesFactory.newObject(Longs.toByteArray(resultlong), lastValue.getClass().getName())); + } + } + + public KVJob(String jobname, Configuration conf, Class<?> keyclass, Class<?> valueclass, String inputpath, + String outputpath) throws Exception { + job = new Job(conf, jobname); + job.setJarByClass(KVJob.class); + job.setMapperClass(KVJob.ValueMapper.class); + job.setOutputKeyClass(keyclass); + job.setMapOutputValueClass(valueclass); + + if ("true".equals(conf.get(TestConstants.NATIVETASK_KVTEST_CREATEFILE))) { + final FileSystem fs = FileSystem.get(conf); + fs.delete(new Path(inputpath), true); + fs.close(); + final TestInputFile testfile = new TestInputFile(Integer.valueOf(conf.get( + TestConstants.FILESIZE_KEY, "1000")), + keyclass.getName(), valueclass.getName(), conf); + testfile.createSequenceTestFile(inputpath); + + } + job.setInputFormatClass(SequenceFileInputFormat.class); + FileInputFormat.addInputPath(job, new Path(inputpath)); + FileOutputFormat.setOutputPath(job, new Path(outputpath)); + } + + public void runJob() throws Exception { + + job.waitForCompletion(true); + } +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java?rev=1613004&view=auto ============================================================================== --- 
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.kvtest; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier; +import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; +import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class KVTest { + private static Class<?>[] keyclasses = null; + private static Class<?>[] valueclasses = null; + private static String[] keyclassNames = null; + private static String[] valueclassNames = null; + + private static Configuration nativekvtestconf = ScenarioConfiguration.getNativeConfiguration(); + private static Configuration hadoopkvtestconf = ScenarioConfiguration.getNormalConfiguration(); + static { + nativekvtestconf.addResource(TestConstants.KVTEST_CONF_PATH); + hadoopkvtestconf.addResource(TestConstants.KVTEST_CONF_PATH); + } + + @Parameters(name = "key:{0}\nvalue:{1}") + public static Iterable<Class<?>[]> data() { + final String valueclassesStr = nativekvtestconf + .get(TestConstants.NATIVETASK_KVTEST_VALUECLASSES); + System.out.println(valueclassesStr); + valueclassNames = valueclassesStr.replaceAll("\\s", "").split(";"); // strip whitespace, then split on ";" + final ArrayList<Class<?>> tmpvalueclasses = new ArrayList<Class<?>>(); + for (int i = 0; i < valueclassNames.length; i++) { + try { + if (valueclassNames[i].equals("")) { + continue; + } + tmpvalueclasses.add(Class.forName(valueclassNames[i])); + } catch (final ClassNotFoundException e) { + e.printStackTrace(); + } + } + valueclasses = tmpvalueclasses.toArray(new Class[tmpvalueclasses.size()]); + final String keyclassesStr = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_KEYCLASSES); + System.out.println(keyclassesStr); + keyclassNames = keyclassesStr.replaceAll("\\s", "").split(";"); // strip whitespace, then split on ";" + final ArrayList<Class<?>> tmpkeyclasses = new ArrayList<Class<?>>(); + for (int 
i = 0; i < keyclassNames.length; i++) { + try { + if (keyclassNames[i].equals("")) { + continue; + } + tmpkeyclasses.add(Class.forName(keyclassNames[i])); + } catch (final ClassNotFoundException e) { + e.printStackTrace(); + } + } + keyclasses = tmpkeyclasses.toArray(new Class[tmpkeyclasses.size()]); + final Class<?>[][] kvgroup = new Class<?>[keyclassNames.length * valueclassNames.length][2]; + for (int i = 0; i < keyclassNames.length; i++) { + final int tmpindex = i * valueclassNames.length; + for (int j = 0; j < valueclassNames.length; j++) { + kvgroup[tmpindex + j][0] = keyclasses[i]; + kvgroup[tmpindex + j][1] = valueclasses[j]; + } + } + return Arrays.asList(kvgroup); + } + + private final Class<?> keyclass; + private final Class<?> valueclass; + + public KVTest(Class<?> keyclass, Class<?> valueclass) { + this.keyclass = keyclass; + this.valueclass = valueclass; + + } + + @Test + public void testKVCompatibility() { + + try { + final String nativeoutput = this.runNativeTest( + "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass); + final String normaloutput = this.runNormalTest( + "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass); + final boolean compareRet = ResultVerifier.verify(normaloutput, nativeoutput); + final String input = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/" + + keyclass.getName() + + "/" + valueclass.getName(); + if (compareRet) { + final FileSystem fs = FileSystem.get(hadoopkvtestconf); + fs.delete(new Path(nativeoutput), true); + fs.delete(new Path(normaloutput), true); + fs.delete(new Path(input), true); + fs.close(); + } + assertEquals("file compare result: if they are the same, then return true", true, compareRet); + } catch (final IOException e) { + assertEquals("test run exception:", null, e); + } catch (final Exception e) { + assertEquals("test run exception:", null, e); + } + } + + @Before + public void startUp() { + + } + + private String runNativeTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws IOException { + final String inputpath = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/" + + keyclass.getName() + + "/" + valueclass.getName(); + final String outputpath = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_OUTPUTDIR) + "/" + + keyclass.getName() + "/" + valueclass.getName(); + // if the output file exists, then delete it + final FileSystem fs = FileSystem.get(nativekvtestconf); + fs.delete(new Path(outputpath)); + fs.close(); + nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true"); + try { + final KVJob keyJob = new KVJob(jobname, nativekvtestconf, keyclass, valueclass, inputpath, outputpath); + keyJob.runJob(); + } catch (final Exception e) { + return "native testcase run time error."; + } + return outputpath; + } + + private String runNormalTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws IOException { + final String inputpath = hadoopkvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/" + + keyclass.getName() + + "/" + valueclass.getName(); + final String outputpath = hadoopkvtestconf + .get(TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR) + + "/" + + keyclass.getName() + "/" + valueclass.getName(); + // if the output file exists, then delete it + final FileSystem fs = FileSystem.get(hadoopkvtestconf); + fs.delete(new Path(outputpath)); + fs.close(); + hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false"); + try { + final KVJob keyJob = new 
KVJob(jobname, hadoopkvtestconf, keyclass, valueclass, inputpath, outputpath); + keyJob.runJob(); + } catch (final Exception e) { + return "normal testcase run time error."; + } + return outputpath; + } + +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.junit.Test;
+
+public class LargeKVTest {
+
+  @Test
+  public void testKeySize() {
+    runKVSizeTests(Text.class, IntWritable.class);
+  }
+
+  @Test
+  public void testValueSize() {
+    runKVSizeTests(IntWritable.class, Text.class);
+  }
+
+  private static Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+  private static Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+  static {
+    nativeConf.addResource(TestConstants.KVTEST_CONF_PATH);
+    nativeConf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
+    normalConf.addResource(TestConstants.KVTEST_CONF_PATH);
+    normalConf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
+  }
+
+  public void runKVSizeTests(Class<?> keyClass, Class<?> valueClass) {
+    if (!keyClass.equals(Text.class) && !valueClass.equals(Text.class)) {
+      return;
+    }
+    final int defaultKVSizeMaximum = 1 << 22; // 4M
+    final int kvSizeMaximum = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
+        defaultKVSizeMaximum);
+    try {
+      for (int i = 65536; i <= kvSizeMaximum; i *= 4) {
+        int min = i / 4;
+        int max = i;
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+
+        System.out.println("===KV Size Test: min size: " + min + ", max size: " + max + ", keyClass: "
+            + keyClass.getName() + ", valueClass: " + valueClass.getName());
+
+        final String nativeOutput = runNativeLargeKVTest("Test Large KV Size:" + String.valueOf(i), keyClass,
+            valueClass, nativeConf);
+        final String normalOutput = this.runNormalLargeKVTest("Test Large KV Size:" + String.valueOf(i), keyClass,
+            valueClass, normalConf);
+        final boolean compareRet = ResultVerifier.verify(normalOutput, nativeOutput);
+        final String reason = "keytype: " + keyClass.getName() + ", valuetype: " + valueClass.getName()
+            + ", failed with " + (keyClass.equals(Text.class) ? "key" : "value") + ", min size: " + min
+            + ", max size: " + max + ", normal out: " + normalOutput + ", native out: " + nativeOutput;
+        assertEquals(reason, true, compareRet);
+      }
+    } catch (final Exception e) {
+      assertEquals("test run exception:", null, e);
+    }
+  }
+
+  private String runNativeLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
+      throws Exception {
+    final String inputpath = conf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/LargeKV/" + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = conf.get(TestConstants.NATIVETASK_KVTEST_OUTPUTDIR) + "/LargeKV/" + keyclass.getName()
+        + "/" + valueclass.getName();
+    // delete the output directory if it already exists
+    final FileSystem fs = FileSystem.get(conf);
+    fs.delete(new Path(outputpath), true);
+    fs.close();
+    try {
+      final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "native testcase run time error.";
+    }
+    return outputpath;
+  }
+
+  private String runNormalLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
+      throws IOException {
+    final String inputpath = conf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/LargeKV/" + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = conf.get(TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR) + "/LargeKV/"
+        + keyclass.getName() + "/" + valueclass.getName();
+    // delete the output directory if it already exists
+    final FileSystem fs = FileSystem.get(conf);
+    fs.delete(new Path(outputpath), true);
+    fs.close();
+    try {
+      final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "normal testcase run time error.";
+    }
+    return outputpath;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.kvtest; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.VIntWritable; +import org.apache.hadoop.io.VLongWritable; +import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory; +import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; +import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; + + +public class TestInputFile { + + public static class KVSizeScope { + private static final int DefaultMinNum = 1; + private static final int DefaultMaxNum = 64; + + public int minBytesNum; + public int maxBytesNum; + + public KVSizeScope() { + this.minBytesNum = DefaultMinNum; + this.maxBytesNum = DefaultMaxNum; + } + + public KVSizeScope(int min, int max) { + this.minBytesNum = min; + this.maxBytesNum = max; + } + } + + private static HashMap<String, KVSizeScope> map = new HashMap<String, KVSizeScope>(); + + private byte[] databuf = null; + private final String keyClsName, valueClsName; + private int filesize = 0; + private int keyMaxBytesNum, keyMinBytesNum; + private int valueMaxBytesNum, valueMinBytesNum; + private SequenceFile.Writer writer = null; + Random r = new Random(); + public static final int DATABUFSIZE = 1 << 22; // 4M + + private enum State { + KEY, VALUE + }; + + static { + map.put(BooleanWritable.class.getName(), new KVSizeScope(1, 1)); + map.put(DoubleWritable.class.getName(), new KVSizeScope(8, 8)); + map.put(FloatWritable.class.getName(), new KVSizeScope(4, 4)); + map.put(VLongWritable.class.getName(), new KVSizeScope(8, 8)); + map.put(ByteWritable.class.getName(), new KVSizeScope(1, 1)); + map.put(LongWritable.class.getName(), new KVSizeScope(8, 8)); + map.put(VIntWritable.class.getName(), new KVSizeScope(4, 4)); + map.put(IntWritable.class.getName(), new KVSizeScope(4, 4)); + } + + public TestInputFile(int filesize, String keytype, String valuetype, Configuration conf) throws Exception { + this.filesize = filesize; + this.databuf = new byte[DATABUFSIZE]; + this.keyClsName = keytype; + this.valueClsName = valuetype; + final int defaultMinBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MIN, 1); + final int defaultMaxBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX, 64); + + if (map.get(keytype) != null) { + keyMinBytesNum = map.get(keytype).minBytesNum; + keyMaxBytesNum = map.get(keytype).maxBytesNum; + } else { + keyMinBytesNum = defaultMinBytes; + keyMaxBytesNum = defaultMaxBytes; + } + + if (map.get(valuetype) != null) { + valueMinBytesNum = map.get(valuetype).minBytesNum; + valueMaxBytesNum = map.get(valuetype).maxBytesNum; + } else { + 
valueMinBytesNum = defaultMinBytes;
+      valueMaxBytesNum = defaultMaxBytes;
+    }
+  }
+
+  public void createSequenceTestFile(String filepath) throws Exception {
+    final int FULL_BYTE_SPACE = 256;
+    createSequenceTestFile(filepath, FULL_BYTE_SPACE);
+  }
+
+  public void createSequenceTestFile(String filepath, int base) throws Exception {
+    createSequenceTestFile(filepath, base, (byte) 0);
+  }
+
+  public void createSequenceTestFile(String filepath, int base, byte start) throws Exception {
+    System.out.println("create file " + filepath);
+    System.out.println(keyClsName + " " + valueClsName);
+    Class<?> tmpkeycls, tmpvaluecls;
+    try {
+      tmpkeycls = Class.forName(keyClsName);
+    } catch (final ClassNotFoundException e) {
+      throw new Exception("key class not found: ", e);
+    }
+    try {
+      tmpvaluecls = Class.forName(valueClsName);
+    } catch (final ClassNotFoundException e) {
+      throw new Exception("value class not found: ", e);
+    }
+    try {
+      final Path outputfilepath = new Path(filepath);
+      final ScenarioConfiguration conf = new ScenarioConfiguration();
+      final FileSystem hdfs = outputfilepath.getFileSystem(conf);
+      writer = new SequenceFile.Writer(hdfs, conf, outputfilepath, tmpkeycls, tmpvaluecls);
+    } catch (final Exception e) {
+      // fail fast instead of hitting an NPE on the first append
+      throw new Exception("failed to create sequence file writer", e);
+    }
+
+    int tmpfilesize = this.filesize;
+    while (tmpfilesize > DATABUFSIZE) {
+      nextRandomBytes(databuf, base, start);
+      final int size = flushBuf(DATABUFSIZE);
+      tmpfilesize -= size;
+    }
+    nextRandomBytes(databuf, base, start);
+    flushBuf(tmpfilesize);
+    IOUtils.closeStream(writer);
+  }
+
+  private void nextRandomBytes(byte[] buf, int base) {
+    nextRandomBytes(buf, base, (byte) 0);
+  }
+
+  private void nextRandomBytes(byte[] buf, int base, byte start) {
+    r.nextBytes(buf);
+    for (int i = 0; i < buf.length; i++) {
+      buf[i] = (byte) ((buf[i] & 0xFF) % base + start);
+    }
+  }
+
+  private int flushBuf(int buflen) throws Exception {
+    int keybytesnum = 0;
+    int valuebytesnum = 0;
+    int offset = 0;
+
+    while (offset < buflen) {
+      final int remains = buflen - offset;
+      keybytesnum = keyMaxBytesNum;
+      if (keyMaxBytesNum != keyMinBytesNum) {
+        keybytesnum = keyMinBytesNum + r.nextInt(keyMaxBytesNum - keyMinBytesNum);
+      }
+
+      valuebytesnum = valueMaxBytesNum;
+      if (valueMaxBytesNum != valueMinBytesNum) {
+        valuebytesnum = valueMinBytesNum + r.nextInt(valueMaxBytesNum - valueMinBytesNum);
+      }
+
+      if (keybytesnum + valuebytesnum > remains) {
+        break;
+      }
+
+      final byte[] key = new byte[keybytesnum];
+      final byte[] value = new byte[valuebytesnum];
+
+      System.arraycopy(databuf, offset, key, 0, keybytesnum);
+      offset += keybytesnum;
+
+      System.arraycopy(databuf, offset, value, 0, valuebytesnum);
+      offset += valuebytesnum;
+
+      try {
+        writer.append(BytesFactory.newObject(key, this.keyClsName), BytesFactory.newObject(value, this.valueClsName));
+      } catch (final IOException e) {
+        e.printStackTrace();
+        throw new Exception("sequence file create failed", e);
+      }
+    }
+    return offset;
+  }
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java?rev=1613004&view=auto
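For reference, a minimal sketch (illustrative only, not part of this commit) of driving the TestInputFile generator above directly, as the kvtest and nonsorttest suites do; the output path is hypothetical, and the calls throw Exception as declared above:

    // Generate ~1 MB of random Text/Text pairs as a SequenceFile (sketch; path is hypothetical).
    ScenarioConfiguration conf = new ScenarioConfiguration();
    TestInputFile input = new TestInputFile(1 << 20, Text.class.getName(),
        Text.class.getName(), conf);
    input.createSequenceTestFile("/tmp/kvtest-input");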
============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.nonsorttest; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile; +import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier; +import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; +import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.junit.Before; +import org.junit.Test; + +public class NonSortTest { + + @Test + public void nonSortTest() throws Exception { + Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration(); + nativeConf.addResource(TestConstants.NONSORT_TEST_CONF); + nativeConf.set(TestConstants.NATIVETASK_MAP_OUTPUT_SORT, "false"); + String inputpath = nativeConf.get(TestConstants.NONSORT_TEST_INPUTDIR); + String outputpath = nativeConf.get(TestConstants.NONSORT_TEST_NATIVE_OUTPUT); + final Job nativeNonSort = getJob(nativeConf, "NativeNonSort", inputpath, outputpath); + nativeNonSort.waitForCompletion(true); + + Configuration normalConf = ScenarioConfiguration.getNormalConfiguration(); + normalConf.addResource(TestConstants.NONSORT_TEST_CONF); + inputpath = normalConf.get(TestConstants.NONSORT_TEST_INPUTDIR); + outputpath = normalConf.get(TestConstants.NONSORT_TEST_NORMAL_OUTPUT); + final Job hadoopWithSort = getJob(normalConf, "NormalJob", inputpath, outputpath); + hadoopWithSort.waitForCompletion(true); + + final boolean compareRet = ResultVerifier.verify(nativeConf.get(TestConstants.NONSORT_TEST_NATIVE_OUTPUT), + normalConf.get(TestConstants.NONSORT_TEST_NORMAL_OUTPUT)); + assertEquals("file compare result: if they 
are the same, then return true", true, compareRet);
+  }
+
+  @Before
+  public void startUp() throws Exception {
+    final ScenarioConfiguration configuration = new ScenarioConfiguration();
+    configuration.addNonSortTestConf();
+    final FileSystem fs = FileSystem.get(configuration);
+    final Path path = new Path(configuration.get(TestConstants.NONSORT_TEST_INPUTDIR));
+    if (!fs.exists(path)) {
+      new TestInputFile(configuration.getInt("nativetask.nonsorttest.filesize", 10000000), Text.class.getName(),
+          Text.class.getName(), configuration).createSequenceTestFile(path.toString());
+    }
+    fs.close();
+  }
+
+  private Job getJob(Configuration conf, String jobName, String inputpath, String outputpath) throws IOException {
+    final FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputpath))) {
+      fs.delete(new Path(outputpath), true);
+    }
+    fs.close();
+    final Job job = Job.getInstance(conf, jobName);
+    job.setJarByClass(NonSortTestMR.class);
+    job.setMapperClass(NonSortTestMR.Map.class);
+    job.setReducerClass(NonSortTestMR.KeyHashSumReduce.class);
+    job.setOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(inputpath));
+    FileOutputFormat.setOutputPath(job, new Path(outputpath));
+    return job;
+  }
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.mapred.nativetask.nonsorttest; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.StringTokenizer; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; + +public class NonSortTestMR { + + public static class Map extends Mapper<Object, Text, Text, IntWritable> { + private final static IntWritable one = new IntWritable(1); + private final Text word = new Text(); + + @Override + public void map(Object key, Text value, Context context) throws IOException, InterruptedException { + final String line = value.toString(); + final StringTokenizer tokenizer = new StringTokenizer(line); + while (tokenizer.hasMoreTokens()) { + word.set(tokenizer.nextToken()); + context.write(word, one); + } + } + } + + public static class KeyHashSumReduce extends Reducer<Text, IntWritable, Text, LongWritable> { + long sum = 0; + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(os); + + @Override + public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, + InterruptedException { + for (final IntWritable val : values) { + os.reset(); + key.write(dos); + final int hash = Arrays.hashCode(os.toByteArray()); + sum += hash; + } + } + + @Override + public void cleanup(Context context) throws IOException, InterruptedException { + context.write(new Text("NonSortTest"), new LongWritable(sum)); + } + } + +} Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java?rev=1613004&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java Thu Jul 24 06:17:33 2014 @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.mapred.nativetask.testutil;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
+
+
+public class BytesFactory {
+  public static Random r = new Random();
+
+  public static Object newObject(byte[] seed, String className) {
+    // Arrays.hashCode gives a content-based (deterministic) seed; byte[].hashCode() is identity-based.
+    r.setSeed(Arrays.hashCode(seed));
+    if (className.equals(IntWritable.class.getName())) {
+      return new IntWritable(Ints.fromByteArray(seed));
+    } else if (className.equals(FloatWritable.class.getName())) {
+      return new FloatWritable(r.nextFloat());
+    } else if (className.equals(DoubleWritable.class.getName())) {
+      return new DoubleWritable(r.nextDouble());
+    } else if (className.equals(LongWritable.class.getName())) {
+      return new LongWritable(Longs.fromByteArray(seed));
+    } else if (className.equals(VIntWritable.class.getName())) {
+      return new VIntWritable(Ints.fromByteArray(seed));
+    } else if (className.equals(VLongWritable.class.getName())) {
+      return new VLongWritable(Longs.fromByteArray(seed));
+    } else if (className.equals(BooleanWritable.class.getName())) {
+      return new BooleanWritable(seed[0] % 2 == 1);
+    } else if (className.equals(Text.class.getName())) {
+      return new Text(BytesUtil.toStringBinary(seed));
+    } else if (className.equals(ByteWritable.class.getName())) {
+      return new ByteWritable(seed.length > 0 ?
seed[0] : 0); + } else if (className.equals(BytesWritable.class.getName())) { + return new BytesWritable(seed); + } else if (className.equals(UTF8.class.getName())) { + return new UTF8(BytesUtil.toStringBinary(seed)); + } else if (className.equals(MockValueClass.class.getName())) { + return new MockValueClass(seed); + } else { + return null; + } + } + + + public static <VTYPE> byte[] fromBytes(byte[] bytes) throws Exception { + throw new Exception("Not supported"); + } + + public static <VTYPE> byte[] toBytes(VTYPE obj) { + final String className = obj.getClass().getName(); + if (className.equals(IntWritable.class.getName())) { + return Ints.toByteArray(((IntWritable) obj).get()); + } else if (className.equals(FloatWritable.class.getName())) { + return BytesUtil.toBytes(((FloatWritable) obj).get()); + } else if (className.equals(DoubleWritable.class.getName())) { + return BytesUtil.toBytes(((DoubleWritable) obj).get()); + } else if (className.equals(LongWritable.class.getName())) { + return Longs.toByteArray(((LongWritable) obj).get()); + } else if (className.equals(VIntWritable.class.getName())) { + return Ints.toByteArray(((VIntWritable) obj).get()); + } else if (className.equals(VLongWritable.class.getName())) { + return Longs.toByteArray(((VLongWritable) obj).get()); + } else if (className.equals(BooleanWritable.class.getName())) { + return BytesUtil.toBytes(((BooleanWritable) obj).get()); + } else if (className.equals(Text.class.getName())) { + return ((Text)obj).copyBytes(); + } else if (className.equals(ByteWritable.class.getName())) { + return Ints.toByteArray((int) ((ByteWritable) obj).get()); + } else if (className.equals(BytesWritable.class.getName())) { + // TODO: copyBytes instead? + return ((BytesWritable) obj).getBytes(); + } else { + return new byte[0]; + } + } +}
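For reference, a minimal sketch (illustrative only, not part of this commit) of the byte-level round trip BytesFactory provides for fixed-width Writables, which is what TestInputFile above relies on to keep the generated SequenceFiles comparable between the native and normal runs:

    // Encode an IntWritable and rebuild it from the same bytes (sketch).
    IntWritable original = new IntWritable(42);
    byte[] raw = BytesFactory.toBytes(original);  // four big-endian bytes via Ints.toByteArray
    IntWritable rebuilt = (IntWritable) BytesFactory.newObject(raw, IntWritable.class.getName());
    assert original.get() == rebuilt.get();  // holds for the fixed-width types mapped above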