MAPREDUCE-6067. native-task: fix some counter issues (Binglin Chang)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/00322161
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/00322161
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/00322161

Branch: refs/heads/HDFS-6584
Commit: 00322161b5d4d54770c2f0823e036537edecf5bf
Parents: 1081d9c
Author: Binglin Chang <bch...@apache.org>
Authored: Fri Sep 5 14:20:39 2014 +0800
Committer: Binglin Chang <bch...@apache.org>
Committed: Fri Sep 5 14:20:39 2014 +0800

----------------------------------------------------------------------
 .../CHANGES.MAPREDUCE-2841.txt                  |  1 +
 .../NativeMapOutputCollectorDelegator.java      |  4 +
 .../mapred/nativetask/StatusReportChecker.java  |  5 --
 .../src/handler/MCollectorOutputHandler.cc      |  3 +-
 .../src/main/native/src/lib/IFile.cc            |  6 +-
 .../src/main/native/src/lib/IFile.h             |  3 +-
 .../main/native/src/lib/MapOutputCollector.cc   | 85 ++++++++++++++------
 .../main/native/src/lib/MapOutputCollector.h    |  8 +-
 .../src/main/native/src/lib/Merge.cc            | 24 ------
 .../src/main/native/src/lib/PartitionBucket.h   |  2 -
 .../src/main/native/src/lib/TaskCounters.cc     | 10 +--
 .../src/main/native/src/lib/TaskCounters.h      |  8 --
 .../nativetask/combinertest/CombinerTest.java   |  8 +-
 .../combinertest/LargeKVCombinerTest.java       |  6 +-
 .../nativetask/compresstest/CompressTest.java   |  3 +
 .../hadoop/mapred/nativetask/kvtest/KVTest.java | 74 +++++++----------
 .../mapred/nativetask/kvtest/LargeKVTest.java   | 82 +++++++++----------
 .../nativetask/nonsorttest/NonSortTest.java     |  6 +-
 .../nativetask/testutil/ResultVerifier.java     | 24 +++++-
 19 files changed, 179 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt 
b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
index 269a2f6..279b960 100644
--- a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
+++ b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
@@ -20,3 +20,4 @@ MAPREDUCE-6054. native-task: Speed up tests (todd)
 MAPREDUCE-6058. native-task: KVTest and LargeKVTest should check mr job is 
sucessful (Binglin Chang)
 MAPREDUCE-6056. native-task: move system test working dir to target dir and 
cleanup test config xml files (Manu Zhang via bchang)
 MAPREDUCE-6055. native-task: findbugs, interface annotations, and other misc 
cleanup (todd)
+MAPREDUCE-6067. native-task: fix some counter issues (Binglin Chang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
index 224b95b..828d7df 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
@@ -34,6 +34,7 @@ import 
org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
 import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.util.QuickSort;
 
 /**
@@ -46,6 +47,7 @@ public class NativeMapOutputCollectorDelegator<K, V> 
implements MapOutputCollect
   private JobConf job;
   private NativeCollectorOnlyHandler<K, V> handler;
 
+  private Context context;
   private StatusReportChecker updater;
 
   @Override
@@ -58,6 +60,7 @@ public class NativeMapOutputCollectorDelegator<K, V> 
implements MapOutputCollect
     handler.close();
     if (null != updater) {
       updater.stop();
+      NativeRuntime.reportStatus(context.getReporter());
     }
   }
 
@@ -69,6 +72,7 @@ public class NativeMapOutputCollectorDelegator<K, V> 
implements MapOutputCollect
   @SuppressWarnings("unchecked")
   @Override
   public void init(Context context) throws IOException, ClassNotFoundException 
{
+    this.context = context;
     this.job = context.getJobConf();
 
     Platforms.init(job);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
index f152074..1e76d39 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
@@ -76,12 +76,7 @@ class StatusReportChecker implements Runnable {
     reporter.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
     reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
     reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
-    reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
-    reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
-    reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
     reporter.getCounter(TaskCounter.SPILLED_RECORDS);
-    reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
-    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
   }
 
   public synchronized void start() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc
index 7967034..4df5e64 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc
@@ -18,6 +18,7 @@
 
 #include "commons.h"
 #include "util/StringUtil.h"
+#include "lib/TaskCounters.h"
 #include "MCollectorOutputHandler.h"
 #include "NativeObjectFactory.h"
 #include "MapOutputCollector.h"
@@ -94,4 +95,4 @@ KVBuffer * MCollectorOutputHandler::allocateKVBuffer(uint32_t 
partitionId, uint3
   return dest;
 }
 
-}      //namespace
+} // namespace NativeTask

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc
index d11d823..4a5edda 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc
@@ -98,7 +98,7 @@ IFileWriter * IFileWriter::create(const std::string & 
filepath, const MapOutputS
 IFileWriter::IFileWriter(OutputStream * stream, ChecksumType checksumType, 
KeyValueType ktype,
     KeyValueType vtype, const string & codec, Counter * counter, bool 
deleteTargetStream)
     : _stream(stream), _dest(NULL), _checksumType(checksumType), 
_kType(ktype), _vType(vtype),
-        _codec(codec), _recordCounter(counter), 
_deleteTargetStream(deleteTargetStream) {
+        _codec(codec), _recordCounter(counter), _recordCount(0), 
_deleteTargetStream(deleteTargetStream) {
   _dest = new ChecksumOutputStream(_stream, _checksumType);
   _appendBuffer.init(128 * 1024, _dest, _codec);
 }
@@ -184,6 +184,7 @@ void IFileWriter::write(const char * key, uint32_t keyLen, 
const char * value, u
   if (NULL != _recordCounter) {
     _recordCounter->increase();
   }
+  _recordCount++;
 
   switch (_vType) {
   case TextType:
@@ -214,7 +215,7 @@ SingleSpillInfo * IFileWriter::getSpillInfo() {
       _codec);
 }
 
-void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset) {
+void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset, 
uint64_t & recordCount) {
   if (_spillFileSegments.size() > 0) {
     offset = _spillFileSegments[_spillFileSegments.size() - 
1].uncompressedEndOffset;
     realOffset = _spillFileSegments[_spillFileSegments.size() - 
1].realEndOffset;
@@ -222,6 +223,7 @@ void IFileWriter::getStatistics(uint64_t & offset, uint64_t 
& realOffset) {
     offset = 0;
     realOffset = 0;
   }
+  recordCount = _recordCount;
 }
 
 } // namespace NativeTask

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h
index 76d6fbc..d86e98a 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h
@@ -129,6 +129,7 @@ protected:
   AppendBuffer _appendBuffer;
   vector<IFileSegment> _spillFileSegments;
   Counter * _recordCounter;
+  uint64_t _recordCount;
 
   bool _deleteTargetStream;
 
@@ -153,7 +154,7 @@ public:
 
   SingleSpillInfo * getSpillInfo();
 
-  void getStatistics(uint64_t & offset, uint64_t & realOffset);
+  void getStatistics(uint64_t & offset, uint64_t & realOffset, uint64_t & 
recordCount);
 
   virtual void collect(const void * key, uint32_t keyLen, const void * value, 
uint32_t valueLen) {
     write((const char*)key, keyLen, (const char*)value, valueLen);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc
index d2b116d..d151f1f 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc
@@ -76,9 +76,11 @@ void CombineRunnerWrapper::combine(CombineContext type, 
KVIterator * kvIterator,
 /////////////////////////////////////////////////////////////////
 
 MapOutputCollector::MapOutputCollector(uint32_t numberPartitions, 
SpillOutputService * spillService)
-    : _config(NULL), _numPartitions(numberPartitions), _buckets(NULL), 
_keyComparator(NULL),
-        _combineRunner(NULL), _spilledRecords(NULL), 
_spillOutput(spillService),
-        _defaultBlockSize(0),  _pool(NULL) {
+    : _config(NULL), _numPartitions(numberPartitions), _buckets(NULL),
+      _keyComparator(NULL), _combineRunner(NULL),
+      _mapOutputRecords(NULL), _mapOutputBytes(NULL),
+      _mapOutputMaterializedBytes(NULL), _spilledRecords(NULL),
+      _spillOutput(spillService), _defaultBlockSize(0), _pool(NULL) {
   _pool = new MemoryPool();
 }
 
@@ -108,7 +110,7 @@ MapOutputCollector::~MapOutputCollector() {
 }
 
 void MapOutputCollector::init(uint32_t defaultBlockSize, uint32_t 
memoryCapacity,
-    ComparatorPtr keyComparator, Counter * spilledRecords, ICombineRunner * 
combiner) {
+    ComparatorPtr keyComparator, ICombineRunner * combiner) {
 
   this->_combineRunner = combiner;
 
@@ -128,7 +130,15 @@ void MapOutputCollector::init(uint32_t defaultBlockSize, 
uint32_t memoryCapacity
     _buckets[partitionId] = pb;
   }
 
-  _spilledRecords = spilledRecords;
+  _mapOutputRecords = NativeObjectFactory::GetCounter(
+      TaskCounters::TASK_COUNTER_GROUP, TaskCounters::MAP_OUTPUT_RECORDS);
+  _mapOutputBytes = NativeObjectFactory::GetCounter(
+      TaskCounters::TASK_COUNTER_GROUP, TaskCounters::MAP_OUTPUT_BYTES);
+  _mapOutputMaterializedBytes = NativeObjectFactory::GetCounter(
+      TaskCounters::TASK_COUNTER_GROUP,
+      TaskCounters::MAP_OUTPUT_MATERIALIZED_BYTES);
+  _spilledRecords = NativeObjectFactory::GetCounter(
+      TaskCounters::TASK_COUNTER_GROUP, TaskCounters::SPILLED_RECORDS);
 
   _collectTimer.reset();
 }
@@ -155,9 +165,6 @@ void MapOutputCollector::configure(Config * config) {
 
   ComparatorPtr comparator = getComparator(config, _spec);
 
-  Counter * spilledRecord = 
NativeObjectFactory::GetCounter(TaskCounters::TASK_COUNTER_GROUP,
-      TaskCounters::SPILLED_RECORDS);
-
   ICombineRunner * combiner = NULL;
   if (NULL != config->get(NATIVE_COMBINER)
       // config name for old api and new api
@@ -166,7 +173,7 @@ void MapOutputCollector::configure(Config * config) {
     combiner = new CombineRunnerWrapper(config, _spillOutput);
   }
 
-  init(defaultBlockSize, capacity, comparator, spilledRecord, combiner);
+  init(defaultBlockSize, capacity, comparator, combiner);
 }
 
 KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t 
kvlength) {
@@ -182,7 +189,7 @@ KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t 
partitionId, uint32_t k
     if (NULL == spillpath || spillpath->length() == 0) {
       THROW_EXCEPTION(IOException, "Illegal(empty) spill files path");
     } else {
-      middleSpill(*spillpath, "");
+      middleSpill(*spillpath, "", false);
       delete spillpath;
     }
 
@@ -193,6 +200,8 @@ KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t 
partitionId, uint32_t k
       THROW_EXCEPTION(OutOfMemoryException, "key/value pair larger than 
io.sort.mb");
     }
   }
+  _mapOutputRecords->increase();
+  _mapOutputBytes->increase(kvlength - KVBuffer::headerLength());
   return dest;
 }
 
@@ -272,10 +281,9 @@ void MapOutputCollector::sortPartitions(SortOrder 
orderType, SortAlgorithm sortT
 }
 
 void MapOutputCollector::middleSpill(const std::string & spillOutput,
-    const std::string & indexFilePath) {
+    const std::string & indexFilePath, bool final) {
 
   uint64_t collecttime = _collectTimer.now() - _collectTimer.last();
-  const uint64_t M = 1000000; //million
 
   if (spillOutput.empty()) {
     THROW_EXCEPTION(IOException, "MapOutputCollector: Spill file path empty");
@@ -293,10 +301,24 @@ void MapOutputCollector::middleSpill(const std::string & 
spillOutput,
     info->path = spillOutput;
     uint64_t spillTime = timer.now() - timer.last() - metrics.sortTime;
 
-    LOG(
-        "[MapOutputCollector::mid_spill] Sort and spill: {spilled file path: 
%s, id: %d, collect: %"PRIu64" ms, sort: %"PRIu64" ms, spill: %"PRIu64" ms, 
records: %"PRIu64", uncompressed total bytes: %"PRIu64", compressed total 
bytes: %"PRIu64"}",
-        info->path.c_str(), _spillInfos.getSpillCount(), collecttime / M, 
metrics.sortTime  / M, spillTime  / M,
-        metrics.recordCount, info->getEndPosition(), 
info->getRealEndPosition());
+    const uint64_t M = 1000000; //million
+    LOG("%s-spill: { id: %d, collect: %"PRIu64" ms, "
+        "in-memory sort: %"PRIu64" ms, in-memory records: %"PRIu64", "
+        "merge&spill: %"PRIu64" ms, uncompressed size: %"PRIu64", "
+        "real size: %"PRIu64" path: %s }",
+        final ? "Final" : "Mid",
+        _spillInfos.getSpillCount(),
+        collecttime / M,
+        metrics.sortTime  / M,
+        metrics.recordCount,
+        spillTime  / M,
+        info->getEndPosition(),
+        info->getRealEndPosition(),
+        spillOutput.c_str());
+
+    if (final) {
+      _mapOutputMaterializedBytes->increase(info->getRealEndPosition());
+    }
 
     if (indexFilePath.length() > 0) {
       info->writeSpillInfo(indexFilePath);
@@ -320,11 +342,8 @@ void MapOutputCollector::middleSpill(const std::string & 
spillOutput,
 void MapOutputCollector::finalSpill(const std::string & filepath,
     const std::string & idx_file_path) {
 
-  const uint64_t M = 1000000; //million
-  LOG("[MapOutputCollector::final_merge_and_spill] Spilling file path: %s", 
filepath.c_str());
-
   if (_spillInfos.getSpillCount() == 0) {
-    middleSpill(filepath, idx_file_path);
+    middleSpill(filepath, idx_file_path, true);
     return;
   }
 
@@ -339,16 +358,32 @@ void MapOutputCollector::finalSpill(const std::string & 
filepath,
 
   SortMetrics metrics;
   sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, NULL, metrics);
-  LOG("[MapOutputCollector::mid_spill] Sort final in memory kvs: {sort: 
%"PRIu64" ms, records: %"PRIu64"}",
-      metrics.sortTime / M, metrics.recordCount);
 
   merger->addMergeEntry(new MemoryMergeEntry(_buckets, _numPartitions));
 
   Timer timer;
   merger->merge();
-  LOG(
-      "[MapOutputCollector::final_merge_and_spill]  Merge and Spill:{spilled 
file id: %d, merge and spill time: %"PRIu64" ms}",
-      _spillInfos.getSpillCount(), (timer.now() - timer.last()) / M);
+
+  uint64_t outputSize;
+  uint64_t realOutputSize;
+  uint64_t recordCount;
+  writer->getStatistics(outputSize, realOutputSize, recordCount);
+
+  const uint64_t M = 1000000; //million
+  LOG("Final-merge-spill: { id: %d, in-memory sort: %"PRIu64" ms, "
+      "in-memory records: %"PRIu64", merge&spill: %"PRIu64" ms, "
+      "records: %"PRIu64", uncompressed size: %"PRIu64", "
+      "real size: %"PRIu64" path: %s }",
+      _spillInfos.getSpillCount(),
+      metrics.sortTime / M,
+      metrics.recordCount,
+      (timer.now() - timer.last()) / M,
+      recordCount,
+      outputSize,
+      realOutputSize,
+      filepath.c_str());
+
+  _mapOutputMaterializedBytes->increase(realOutputSize);
 
   delete merger;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h
index b7150ac..474b8b1 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h
@@ -85,7 +85,11 @@ private:
 
   ICombineRunner * _combineRunner;
 
+  Counter * _mapOutputRecords;
+  Counter * _mapOutputBytes;
+  Counter * _mapOutputMaterializedBytes;
   Counter * _spilledRecords;
+
   SpillOutputService * _spillOutput;
 
   uint32_t _defaultBlockSize;
@@ -118,7 +122,7 @@ public:
 
 private:
   void init(uint32_t maxBlockSize, uint32_t memory_capacity, ComparatorPtr 
keyComparator,
-      Counter * spilledRecord, ICombineRunner * combiner);
+      ICombineRunner * combiner);
 
   void reset();
 
@@ -149,7 +153,7 @@ private:
    * normal spill use options in _config
    * @param filepaths: spill file path
    */
-  void middleSpill(const std::string & spillOutput, const std::string & 
indexFilePath);
+  void middleSpill(const std::string & spillOutput, const std::string & 
indexFilePath, bool final);
 
   /**
    * final merge and/or spill use options in _config, and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc
index 5434980..126b82d 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc
@@ -131,7 +131,6 @@ bool Merger::next(Buffer & key, Buffer & value) {
 }
 
 void Merger::merge() {
-  Timer timer;
   uint64_t total_record = 0;
   _heap.reserve(_entries.size());
   MergeEntryPtr * base = &(_heap[0]);
@@ -153,29 +152,6 @@ void Merger::merge() {
     }
     endPartition();
   }
-
-  uint64_t interval = (timer.now() - timer.last());
-  uint64_t M = 1000000; //1 million
-
-  uint64_t output_size;
-  uint64_t real_output_size;
-  _writer->getStatistics(output_size, real_output_size);
-
-  if (total_record != 0) {
-    LOG("[Merge] Merged segment#: %lu, record#: %"PRIu64", avg record size: 
%"PRIu64", uncompressed total bytes: %"PRIu64", compressed total bytes: 
%"PRIu64", time: %"PRIu64" ms",
-        _entries.size(),
-        total_record,
-        output_size / (total_record),
-        output_size,
-        real_output_size,
-        interval / M);
-  } else {
-    LOG("[Merge] Merged segments#, %lu, uncompressed total bytes: %"PRIu64", 
compressed total bytes: %"PRIu64", time: %"PRIu64" ms",
-        _entries.size(),
-        output_size,
-        real_output_size,
-        interval / M);
-  }
 }
 
 } // namespace NativeTask

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h
index cf5e33b..3be52ff 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h
@@ -117,8 +117,6 @@ public:
         memBlock = new MemoryBlock(buff, allocated);
         _memBlocks.push_back(memBlock);
         return memBlock->allocateKVBuffer(kvLength);
-      } else {
-        LOG("MemoryPool is full, fail to allocate new MemBlock, block size: 
%d, kv length: %d", expect, kvLength);
       }
     }
     return NULL;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
index 7aa7db8..39a7c43 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
@@ -22,22 +22,14 @@ namespace NativeTask {
 
 #define DEFINE_COUNTER(name) const char * TaskCounters::name = #name;
 
-const char * TaskCounters::TASK_COUNTER_GROUP = 
"org.apache.hadoop.mapred.Task$Counter";
+const char * TaskCounters::TASK_COUNTER_GROUP = 
"org.apache.hadoop.mapreduce.TaskCounter";
 
 DEFINE_COUNTER(MAP_INPUT_RECORDS)
 DEFINE_COUNTER(MAP_OUTPUT_RECORDS)
-DEFINE_COUNTER(MAP_SKIPPED_RECORDS)
-DEFINE_COUNTER(MAP_INPUT_BYTES)
 DEFINE_COUNTER(MAP_OUTPUT_BYTES)
 DEFINE_COUNTER(MAP_OUTPUT_MATERIALIZED_BYTES)
 DEFINE_COUNTER(COMBINE_INPUT_RECORDS)
 DEFINE_COUNTER(COMBINE_OUTPUT_RECORDS)
-DEFINE_COUNTER(REDUCE_INPUT_GROUPS)
-DEFINE_COUNTER(REDUCE_SHUFFLE_BYTES)
-DEFINE_COUNTER(REDUCE_INPUT_RECORDS)
-DEFINE_COUNTER(REDUCE_OUTPUT_RECORDS)
-DEFINE_COUNTER(REDUCE_SKIPPED_GROUPS)
-DEFINE_COUNTER(REDUCE_SKIPPED_RECORDS)
 DEFINE_COUNTER(SPILLED_RECORDS)
 
 const char * TaskCounters::FILESYSTEM_COUNTER_GROUP = "FileSystemCounters";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
index 6afc207..23cedf9 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
@@ -27,18 +27,10 @@ public:
 
   static const char * MAP_INPUT_RECORDS;
   static const char * MAP_OUTPUT_RECORDS;
-  static const char * MAP_SKIPPED_RECORDS;
-  static const char * MAP_INPUT_BYTES;
   static const char * MAP_OUTPUT_BYTES;
   static const char * MAP_OUTPUT_MATERIALIZED_BYTES;
   static const char * COMBINE_INPUT_RECORDS;
   static const char * COMBINE_OUTPUT_RECORDS;
-  static const char * REDUCE_INPUT_GROUPS;
-  static const char * REDUCE_SHUFFLE_BYTES;
-  static const char * REDUCE_INPUT_RECORDS;
-  static const char * REDUCE_OUTPUT_RECORDS;
-  static const char * REDUCE_SKIPPED_GROUPS;
-  static const char * REDUCE_SKIPPED_RECORDS;
   static const char * SPILLED_RECORDS;
 
   static const char * FILESYSTEM_COUNTER_GROUP;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
index d7f05be..61b4356 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
@@ -63,15 +63,9 @@ public class CombinerTest {
     final Job normaljob = getJob("normalwordcount", commonConf, inputpath, 
hadoopoutputpath);
 
     assertTrue(nativejob.waitForCompletion(true));
-
-    Counter nativeReduceGroups = 
nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
-
     assertTrue(normaljob.waitForCompletion(true));
-    Counter normalReduceGroups = 
normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
-
     assertEquals(true, ResultVerifier.verify(nativeoutputpath, 
hadoopoutputpath));
-    assertEquals("Native Reduce reduce group counter should equal orignal 
reduce group counter",
-      nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+    ResultVerifier.verifyCounters(normaljob, nativejob, true);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
index 4ba4550..d054793 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
@@ -35,6 +35,7 @@ import 
org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
 import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.junit.AfterClass;
 import org.apache.hadoop.util.NativeCodeLoader;
 import org.junit.Assume;
@@ -87,10 +88,8 @@ public class LargeKVCombinerTest {
       final Job nativejob = CombinerTest.getJob("nativewordcount", nativeConf, 
inputPath, nativeOutputPath);
 
       assertTrue(nativejob.waitForCompletion(true));
-        Counter nativeReduceGroups = 
nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
 
       assertTrue(normaljob.waitForCompletion(true));
-        Counter normalReduceGroups = 
normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
 
       final boolean compareRet = ResultVerifier.verify(nativeOutputPath, 
hadoopOutputPath);
 
@@ -98,8 +97,7 @@ public class LargeKVCombinerTest {
         + ", max size: " + max + ", normal out: " + hadoopOutputPath + ", 
native Out: " + nativeOutputPath;
 
       assertEquals(reason, true, compareRet);
-//        assertEquals("Native Reduce reduce group counter should equal 
orignal reduce group counter",
-//            nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+      ResultVerifier.verifyCounters(normaljob, nativejob, true);
     }
     fs.close();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
index 3a03d29..b8f9dfc 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
@@ -69,6 +69,7 @@ public class CompressTest {
 
     final boolean compareRet = ResultVerifier.verify(nativeOutputPath, 
hadoopOutputPath);
     assertEquals("file compare result: if they are the same ,then return 
true", true, compareRet);
+    ResultVerifier.verifyCounters(hadoopjob, job);
   }
 
   @Test
@@ -91,6 +92,7 @@ public class CompressTest {
 
     final boolean compareRet = ResultVerifier.verify(nativeOutputPath, 
hadoopOutputPath);
     assertEquals("file compare result: if they are the same ,then return 
true", true, compareRet);
+    ResultVerifier.verifyCounters(hadoopjob, job);
   }
 
   @Test
@@ -112,6 +114,7 @@ public class CompressTest {
     assertTrue(hadoopJob.waitForCompletion(true));
     final boolean compareRet = ResultVerifier.verify(nativeOutputPath, 
hadoopOutputPath);
     assertEquals("file compare result: if they are the same ,then return 
true", true, compareRet);
+    ResultVerifier.verifyCounters(hadoopJob, nativeJob);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
index 784c14f..6b658ac 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
@@ -114,21 +114,34 @@ public class KVTest {
 
   @Test
   public void testKVCompability() throws Exception {
-    final String nativeoutput = this.runNativeTest(
-      "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), 
keyclass, valueclass);
-    final String normaloutput = this.runNormalTest(
-      "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), 
keyclass, valueclass);
-    final boolean compareRet = ResultVerifier.verify(normaloutput, 
nativeoutput);
-    final String input = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/"
-      + keyclass.getName() + "/" + valueclass.getName();
-    if(compareRet){
-      final FileSystem fs = FileSystem.get(hadoopkvtestconf);
-      fs.delete(new Path(nativeoutput), true);
-      fs.delete(new Path(normaloutput), true);
-      fs.delete(new Path(input), true);
-      fs.close();
-    }
-    assertEquals("file compare result: if they are the same ,then return 
true", true, compareRet);
+    final FileSystem fs = FileSystem.get(nativekvtestconf);
+    final String jobName = "Test:" + keyclass.getSimpleName() + "--"
+        + valueclass.getSimpleName();
+    final String inputPath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/"
+        + keyclass.getName() + "/" + valueclass.getName();
+    final String nativeOutputPath = 
TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR
+        + "/" + keyclass.getName() + "/" + valueclass.getName();
+    // if output file exists ,then delete it
+    fs.delete(new Path(nativeOutputPath), true);
+    nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
+    final KVJob nativeJob = new KVJob(jobName, nativekvtestconf, keyclass,
+        valueclass, inputPath, nativeOutputPath);
+    assertTrue("job should complete successfully", nativeJob.runJob());
+
+    final String normalOutputPath = 
TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR
+        + "/" + keyclass.getName() + "/" + valueclass.getName();
+    // if output file exists ,then delete it
+    fs.delete(new Path(normalOutputPath), true);
+    hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
+    final KVJob normalJob = new KVJob(jobName, hadoopkvtestconf, keyclass,
+        valueclass, inputPath, normalOutputPath);
+    assertTrue("job should complete successfully", normalJob.runJob());
+
+    final boolean compareRet = ResultVerifier.verify(normalOutputPath,
+        nativeOutputPath);
+    assertEquals("job output not the same", true, compareRet);
+    ResultVerifier.verifyCounters(normalJob.job, nativeJob.job);
+    fs.close();
   }
 
   @AfterClass
@@ -137,35 +150,4 @@ public class KVTest {
     fs.delete(new Path(TestConstants.NATIVETASK_KVTEST_DIR), true);
     fs.close();
   }
-
-  private String runNativeTest(String jobname, Class<?> keyclass, Class<?> 
valueclass) throws Exception {
-    final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/"
-      + keyclass.getName() + "/" + valueclass.getName();
-    final String outputpath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR 
+ "/"
-      + keyclass.getName() + "/" + valueclass.getName();
-    // if output file exists ,then delete it
-    final FileSystem fs = FileSystem.get(nativekvtestconf);
-    fs.delete(new Path(outputpath));
-    fs.close();
-    nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
-    final KVJob keyJob = new KVJob(jobname, nativekvtestconf, keyclass, 
valueclass, inputpath, outputpath);
-    assertTrue("job should complete successfully", keyJob.runJob());
-    return outputpath;
-  }
-
-  private String runNormalTest(String jobname, Class<?> keyclass, Class<?> 
valueclass) throws Exception {
-    final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/"
-        + keyclass.getName() + "/" + valueclass.getName();
-    final String outputpath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR 
+ "/"
-        + keyclass.getName() + "/" + valueclass.getName();
-    // if output file exists ,then delete it
-    final FileSystem fs = FileSystem.get(hadoopkvtestconf);
-    fs.delete(new Path(outputpath));
-    fs.close();
-    hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
-    final KVJob keyJob = new KVJob(jobname, hadoopkvtestconf, keyclass, 
valueclass, inputpath, outputpath);
-    assertTrue("job should complete successfully", keyJob.runJob());
-    return outputpath;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
index 0202549..5f1619e 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
@@ -78,11 +78,13 @@ public class LargeKVTest {
     if (!keyClass.equals(Text.class) && !valueClass.equals(Text.class)) {
       return;
     }
-    final int deafult_KVSize_Maximum = 1 << 22; // 4M
-    final int KVSize_Maximu = 
normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
-        deafult_KVSize_Maximum);
+    final int deafultKVSizeMaximum = 1 << 22; // 4M
+    final int kvSizeMaximum = normalConf.getInt(
+        TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
+        deafultKVSizeMaximum);
+    final FileSystem fs = FileSystem.get(normalConf);
 
-    for (int i = 65536; i <= KVSize_Maximu; i *= 4) {
+    for (int i = 65536; i <= kvSizeMaximum; i *= 4) {
       int min = i / 4;
       int max = i;
       nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
@@ -90,48 +92,40 @@ public class LargeKVTest {
       normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
       normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
 
-      LOG.info("===KV Size Test: min size: " + min + ", max size: " + max + ", 
keyClass: "
-        + keyClass.getName() + ", valueClass: " + valueClass.getName());
-
-      final String nativeOutPut = runNativeLargeKVTest("Test Large Value 
Size:" + String.valueOf(i), keyClass,
-        valueClass, nativeConf);
-      final String normalOutPut = this.runNormalLargeKVTest("Test Large Key 
Size:" + String.valueOf(i), keyClass,
-        valueClass, normalConf);
-      final boolean compareRet = ResultVerifier.verify(normalOutPut, 
nativeOutPut);
-      final String reason = "keytype: " + keyClass.getName() + ", valuetype: " 
+ valueClass.getName()
-        + ", failed with " + (keyClass.equals(Text.class) ? "key" : "value") + 
", min size: " + min
-        + ", max size: " + max + ", normal out: " + normalOutPut + ", native 
Out: " + nativeOutPut;
-      assertEquals(reason, true, compareRet);
-    }
-  }
+      LOG.info("===KV Size Test: min size: " + min + ", max size: " + max
+          + ", keyClass: " + keyClass.getName() + ", valueClass: "
+          + valueClass.getName());
 
-  private String runNativeLargeKVTest(String jobname, Class<?> keyclass, 
Class<?> valueclass, Configuration conf)
-      throws Exception {
-    final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + 
"/LargeKV/"
-      + keyclass.getName() + "/" + valueclass.getName();
-    final String outputpath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR 
+ "/LargeKV/"
-      + keyclass.getName() + "/" + valueclass.getName();
-    // if output file exists ,then delete it
-    final FileSystem fs = FileSystem.get(conf);
-    fs.delete(new Path(outputpath), true);
-    fs.close();
-    final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, 
inputpath, outputpath);
-    assertTrue("job should complete successfully", keyJob.runJob());
-    return outputpath;
-  }
+      final String inputPath = TestConstants.NATIVETASK_KVTEST_INPUTDIR
+          + "/LargeKV/" + keyClass.getName() + "/" + valueClass.getName();
 
-  private String runNormalLargeKVTest(String jobname, Class<?> keyclass, 
Class<?> valueclass, Configuration conf)
-      throws Exception {
-    final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + 
"/LargeKV/"
-      + keyclass.getName() + "/" + valueclass.getName();
-    final String outputpath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR 
+ "/LargeKV/"
-      + keyclass.getName() + "/" + valueclass.getName();
-    // if output file exists ,then delete it
-    final FileSystem fs = FileSystem.get(conf);
-    fs.delete(new Path(outputpath), true);
+      final String nativeOutputPath = 
TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR
+          + "/LargeKV/" + keyClass.getName() + "/" + valueClass.getName();
+      // if output file exists ,then delete it
+      fs.delete(new Path(nativeOutputPath), true);
+      final KVJob nativeJob = new KVJob("Test Large Value Size:"
+          + String.valueOf(i), nativeConf, keyClass, valueClass, inputPath,
+          nativeOutputPath);
+      assertTrue("job should complete successfully", nativeJob.runJob());
+
+      final String normalOutputPath = 
TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR
+          + "/LargeKV/" + keyClass.getName() + "/" + valueClass.getName();
+      // if output file exists ,then delete it
+      fs.delete(new Path(normalOutputPath), true);
+      final KVJob normalJob = new KVJob("Test Large Key Size:" + 
String.valueOf(i),
+          normalConf, keyClass, valueClass, inputPath, normalOutputPath);
+      assertTrue("job should complete successfully", normalJob.runJob());
+
+      final boolean compareRet = ResultVerifier.verify(normalOutputPath,
+          nativeOutputPath);
+      final String reason = "keytype: " + keyClass.getName() + ", valuetype: "
+          + valueClass.getName() + ", failed with "
+          + (keyClass.equals(Text.class) ? "key" : "value") + ", min size: " + 
min
+          + ", max size: " + max + ", normal out: " + normalOutputPath
+          + ", native Out: " + nativeOutputPath;
+      assertEquals(reason, true, compareRet);
+      ResultVerifier.verifyCounters(normalJob.job, nativeJob.job);
+    }
     fs.close();
-    final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, 
inputpath, outputpath);
-    assertTrue("job should complete successfully", keyJob.runJob());
-    return outputpath;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
index a8d2d92..2258726 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred.nativetask.nonsorttest;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
@@ -53,19 +54,20 @@ public class NonSortTest {
     final Job nativeNonSort = getJob(nativeConf, "NativeNonSort",
       TestConstants.NATIVETASK_NONSORT_TEST_INPUTDIR,
       TestConstants.NATIVETASK_NONSORT_TEST_NATIVE_OUTPUT);
-    nativeNonSort.waitForCompletion(true);
+    assertTrue(nativeNonSort.waitForCompletion(true));
 
     Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
     normalConf.addResource(TestConstants.NONSORT_TEST_CONF);
     final Job hadoopWithSort = getJob(normalConf, "NormalJob",
       TestConstants.NATIVETASK_NONSORT_TEST_INPUTDIR,
       TestConstants.NATIVETASK_NONSORT_TEST_NORMAL_OUTPUT);
-    hadoopWithSort.waitForCompletion(true);
+    assertTrue(hadoopWithSort.waitForCompletion(true));
 
     final boolean compareRet = ResultVerifier.verify(
       TestConstants.NATIVETASK_NONSORT_TEST_NATIVE_OUTPUT,
       TestConstants.NATIVETASK_NONSORT_TEST_NORMAL_OUTPUT);
     assertEquals("file compare result: if they are the same ,then return 
true", true, compareRet);
+    ResultVerifier.verifyCounters(hadoopWithSort, nativeNonSort);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
index b665971..8a7916b 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred.nativetask.testutil;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.util.zip.CRC32;
 
@@ -25,6 +27,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCounter;
 
 public class ResultVerifier {
   /**
@@ -136,6 +141,23 @@ public class ResultVerifier {
     return true;
   }
 
-  public static void main(String[] args) {
+  public static void verifyCounters(Job normalJob, Job nativeJob, boolean 
hasCombiner) throws IOException {
+    Counters normalCounters = normalJob.getCounters();
+    Counters nativeCounters = nativeJob.getCounters();
+    assertEquals("Counter MAP_OUTPUT_RECORDS should be equal",
+        normalCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue(),
+        nativeCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue());
+    assertEquals("Counter REDUCE_INPUT_GROUPS should be equal",
+        normalCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue(),
+        
nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue());
+    if (!hasCombiner) {
+      assertEquals("Counter REDUCE_INPUT_RECORDS should be equal",
+          
normalCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue(),
+          
nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue());
+    }
+  }
+
+  public static void verifyCounters(Job normalJob, Job nativeJob) throws 
IOException {
+    verifyCounters(normalJob, nativeJob, false);
   }
 }

Reply via email to