MAPREDUCE-6067. native-task: fix some counter issues (Binglin Chang)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/00322161 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/00322161 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/00322161 Branch: refs/heads/trunk Commit: 00322161b5d4d54770c2f0823e036537edecf5bf Parents: 1081d9c Author: Binglin Chang <bch...@apache.org> Authored: Fri Sep 5 14:20:39 2014 +0800 Committer: Binglin Chang <bch...@apache.org> Committed: Fri Sep 5 14:20:39 2014 +0800 ---------------------------------------------------------------------- .../CHANGES.MAPREDUCE-2841.txt | 1 + .../NativeMapOutputCollectorDelegator.java | 4 + .../mapred/nativetask/StatusReportChecker.java | 5 -- .../src/handler/MCollectorOutputHandler.cc | 3 +- .../src/main/native/src/lib/IFile.cc | 6 +- .../src/main/native/src/lib/IFile.h | 3 +- .../main/native/src/lib/MapOutputCollector.cc | 85 ++++++++++++++------ .../main/native/src/lib/MapOutputCollector.h | 8 +- .../src/main/native/src/lib/Merge.cc | 24 ------ .../src/main/native/src/lib/PartitionBucket.h | 2 - .../src/main/native/src/lib/TaskCounters.cc | 10 +-- .../src/main/native/src/lib/TaskCounters.h | 8 -- .../nativetask/combinertest/CombinerTest.java | 8 +- .../combinertest/LargeKVCombinerTest.java | 6 +- .../nativetask/compresstest/CompressTest.java | 3 + .../hadoop/mapred/nativetask/kvtest/KVTest.java | 74 +++++++---------- .../mapred/nativetask/kvtest/LargeKVTest.java | 82 +++++++++---------- .../nativetask/nonsorttest/NonSortTest.java | 6 +- .../nativetask/testutil/ResultVerifier.java | 24 +++++- 19 files changed, 179 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt 
b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt index 269a2f6..279b960 100644 --- a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt +++ b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt @@ -20,3 +20,4 @@ MAPREDUCE-6054. native-task: Speed up tests (todd) MAPREDUCE-6058. native-task: KVTest and LargeKVTest should check mr job is sucessful (Binglin Chang) MAPREDUCE-6056. native-task: move system test working dir to target dir and cleanup test config xml files (Manu Zhang via bchang) MAPREDUCE-6055. native-task: findbugs, interface annotations, and other misc cleanup (todd) +MAPREDUCE-6067. native-task: fix some counter issues (Binglin Chang) http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java index 224b95b..828d7df 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer; import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; +import 
org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.util.QuickSort; /** @@ -46,6 +47,7 @@ public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollect private JobConf job; private NativeCollectorOnlyHandler<K, V> handler; + private Context context; private StatusReportChecker updater; @Override @@ -58,6 +60,7 @@ public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollect handler.close(); if (null != updater) { updater.stop(); + NativeRuntime.reportStatus(context.getReporter()); } } @@ -69,6 +72,7 @@ public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollect @SuppressWarnings("unchecked") @Override public void init(Context context) throws IOException, ClassNotFoundException { + this.context = context; this.job = context.getJobConf(); Platforms.init(job); http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java index f152074..1e76d39 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java @@ -76,12 +76,7 @@ class StatusReportChecker implements Runnable { reporter.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES); 
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS); reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); - reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS); - reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS); - reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS); reporter.getCounter(TaskCounter.SPILLED_RECORDS); - reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES); - reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); } public synchronized void start() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc index 7967034..4df5e64 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc @@ -18,6 +18,7 @@ #include "commons.h" #include "util/StringUtil.h" +#include "lib/TaskCounters.h" #include "MCollectorOutputHandler.h" #include "NativeObjectFactory.h" #include "MapOutputCollector.h" @@ -94,4 +95,4 @@ KVBuffer * MCollectorOutputHandler::allocateKVBuffer(uint32_t partitionId, uint3 return dest; } -} //namespace +} // namespace NativeTask http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc index d11d823..4a5edda 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc @@ -98,7 +98,7 @@ IFileWriter * IFileWriter::create(const std::string & filepath, const MapOutputS IFileWriter::IFileWriter(OutputStream * stream, ChecksumType checksumType, KeyValueType ktype, KeyValueType vtype, const string & codec, Counter * counter, bool deleteTargetStream) : _stream(stream), _dest(NULL), _checksumType(checksumType), _kType(ktype), _vType(vtype), - _codec(codec), _recordCounter(counter), _deleteTargetStream(deleteTargetStream) { + _codec(codec), _recordCounter(counter), _recordCount(0), _deleteTargetStream(deleteTargetStream) { _dest = new ChecksumOutputStream(_stream, _checksumType); _appendBuffer.init(128 * 1024, _dest, _codec); } @@ -184,6 +184,7 @@ void IFileWriter::write(const char * key, uint32_t keyLen, const char * value, u if (NULL != _recordCounter) { _recordCounter->increase(); } + _recordCount++; switch (_vType) { case TextType: @@ -214,7 +215,7 @@ SingleSpillInfo * IFileWriter::getSpillInfo() { _codec); } -void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset) { +void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset, uint64_t & recordCount) { if (_spillFileSegments.size() > 0) { offset = _spillFileSegments[_spillFileSegments.size() - 1].uncompressedEndOffset; realOffset = _spillFileSegments[_spillFileSegments.size() - 1].realEndOffset; @@ -222,6 +223,7 @@ void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset) { offset = 0; realOffset = 0; } + recordCount = _recordCount; 
} } // namespace NativeTask http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h index 76d6fbc..d86e98a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h @@ -129,6 +129,7 @@ protected: AppendBuffer _appendBuffer; vector<IFileSegment> _spillFileSegments; Counter * _recordCounter; + uint64_t _recordCount; bool _deleteTargetStream; @@ -153,7 +154,7 @@ public: SingleSpillInfo * getSpillInfo(); - void getStatistics(uint64_t & offset, uint64_t & realOffset); + void getStatistics(uint64_t & offset, uint64_t & realOffset, uint64_t & recordCount); virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) { write((const char*)key, keyLen, (const char*)value, valueLen); http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc index d2b116d..d151f1f 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc @@ -76,9 +76,11 @@ void CombineRunnerWrapper::combine(CombineContext type, KVIterator * kvIterator, ///////////////////////////////////////////////////////////////// MapOutputCollector::MapOutputCollector(uint32_t numberPartitions, SpillOutputService * spillService) - : _config(NULL), _numPartitions(numberPartitions), _buckets(NULL), _keyComparator(NULL), - _combineRunner(NULL), _spilledRecords(NULL), _spillOutput(spillService), - _defaultBlockSize(0), _pool(NULL) { + : _config(NULL), _numPartitions(numberPartitions), _buckets(NULL), + _keyComparator(NULL), _combineRunner(NULL), + _mapOutputRecords(NULL), _mapOutputBytes(NULL), + _mapOutputMaterializedBytes(NULL), _spilledRecords(NULL), + _spillOutput(spillService), _defaultBlockSize(0), _pool(NULL) { _pool = new MemoryPool(); } @@ -108,7 +110,7 @@ MapOutputCollector::~MapOutputCollector() { } void MapOutputCollector::init(uint32_t defaultBlockSize, uint32_t memoryCapacity, - ComparatorPtr keyComparator, Counter * spilledRecords, ICombineRunner * combiner) { + ComparatorPtr keyComparator, ICombineRunner * combiner) { this->_combineRunner = combiner; @@ -128,7 +130,15 @@ void MapOutputCollector::init(uint32_t defaultBlockSize, uint32_t memoryCapacity _buckets[partitionId] = pb; } - _spilledRecords = spilledRecords; + _mapOutputRecords = NativeObjectFactory::GetCounter( + TaskCounters::TASK_COUNTER_GROUP, TaskCounters::MAP_OUTPUT_RECORDS); + _mapOutputBytes = NativeObjectFactory::GetCounter( + TaskCounters::TASK_COUNTER_GROUP, TaskCounters::MAP_OUTPUT_BYTES); + _mapOutputMaterializedBytes = NativeObjectFactory::GetCounter( + TaskCounters::TASK_COUNTER_GROUP, + TaskCounters::MAP_OUTPUT_MATERIALIZED_BYTES); + _spilledRecords = 
NativeObjectFactory::GetCounter( + TaskCounters::TASK_COUNTER_GROUP, TaskCounters::SPILLED_RECORDS); _collectTimer.reset(); } @@ -155,9 +165,6 @@ void MapOutputCollector::configure(Config * config) { ComparatorPtr comparator = getComparator(config, _spec); - Counter * spilledRecord = NativeObjectFactory::GetCounter(TaskCounters::TASK_COUNTER_GROUP, - TaskCounters::SPILLED_RECORDS); - ICombineRunner * combiner = NULL; if (NULL != config->get(NATIVE_COMBINER) // config name for old api and new api @@ -166,7 +173,7 @@ void MapOutputCollector::configure(Config * config) { combiner = new CombineRunnerWrapper(config, _spillOutput); } - init(defaultBlockSize, capacity, comparator, spilledRecord, combiner); + init(defaultBlockSize, capacity, comparator, combiner); } KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t kvlength) { @@ -182,7 +189,7 @@ KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t k if (NULL == spillpath || spillpath->length() == 0) { THROW_EXCEPTION(IOException, "Illegal(empty) spill files path"); } else { - middleSpill(*spillpath, ""); + middleSpill(*spillpath, "", false); delete spillpath; } @@ -193,6 +200,8 @@ KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t k THROW_EXCEPTION(OutOfMemoryException, "key/value pair larger than io.sort.mb"); } } + _mapOutputRecords->increase(); + _mapOutputBytes->increase(kvlength - KVBuffer::headerLength()); return dest; } @@ -272,10 +281,9 @@ void MapOutputCollector::sortPartitions(SortOrder orderType, SortAlgorithm sortT } void MapOutputCollector::middleSpill(const std::string & spillOutput, - const std::string & indexFilePath) { + const std::string & indexFilePath, bool final) { uint64_t collecttime = _collectTimer.now() - _collectTimer.last(); - const uint64_t M = 1000000; //million if (spillOutput.empty()) { THROW_EXCEPTION(IOException, "MapOutputCollector: Spill file path empty"); @@ -293,10 +301,24 @@ void 
MapOutputCollector::middleSpill(const std::string & spillOutput, info->path = spillOutput; uint64_t spillTime = timer.now() - timer.last() - metrics.sortTime; - LOG( - "[MapOutputCollector::mid_spill] Sort and spill: {spilled file path: %s, id: %d, collect: %"PRIu64" ms, sort: %"PRIu64" ms, spill: %"PRIu64" ms, records: %"PRIu64", uncompressed total bytes: %"PRIu64", compressed total bytes: %"PRIu64"}", - info->path.c_str(), _spillInfos.getSpillCount(), collecttime / M, metrics.sortTime / M, spillTime / M, - metrics.recordCount, info->getEndPosition(), info->getRealEndPosition()); + const uint64_t M = 1000000; //million + LOG("%s-spill: { id: %d, collect: %"PRIu64" ms, " + "in-memory sort: %"PRIu64" ms, in-memory records: %"PRIu64", " + "merge&spill: %"PRIu64" ms, uncompressed size: %"PRIu64", " + "real size: %"PRIu64" path: %s }", + final ? "Final" : "Mid", + _spillInfos.getSpillCount(), + collecttime / M, + metrics.sortTime / M, + metrics.recordCount, + spillTime / M, + info->getEndPosition(), + info->getRealEndPosition(), + spillOutput.c_str()); + + if (final) { + _mapOutputMaterializedBytes->increase(info->getRealEndPosition()); + } if (indexFilePath.length() > 0) { info->writeSpillInfo(indexFilePath); @@ -320,11 +342,8 @@ void MapOutputCollector::middleSpill(const std::string & spillOutput, void MapOutputCollector::finalSpill(const std::string & filepath, const std::string & idx_file_path) { - const uint64_t M = 1000000; //million - LOG("[MapOutputCollector::final_merge_and_spill] Spilling file path: %s", filepath.c_str()); - if (_spillInfos.getSpillCount() == 0) { - middleSpill(filepath, idx_file_path); + middleSpill(filepath, idx_file_path, true); return; } @@ -339,16 +358,32 @@ void MapOutputCollector::finalSpill(const std::string & filepath, SortMetrics metrics; sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, NULL, metrics); - LOG("[MapOutputCollector::mid_spill] Sort final in memory kvs: {sort: %"PRIu64" ms, records: %"PRIu64"}", - metrics.sortTime / 
M, metrics.recordCount); merger->addMergeEntry(new MemoryMergeEntry(_buckets, _numPartitions)); Timer timer; merger->merge(); - LOG( - "[MapOutputCollector::final_merge_and_spill] Merge and Spill:{spilled file id: %d, merge and spill time: %"PRIu64" ms}", - _spillInfos.getSpillCount(), (timer.now() - timer.last()) / M); + + uint64_t outputSize; + uint64_t realOutputSize; + uint64_t recordCount; + writer->getStatistics(outputSize, realOutputSize, recordCount); + + const uint64_t M = 1000000; //million + LOG("Final-merge-spill: { id: %d, in-memory sort: %"PRIu64" ms, " + "in-memory records: %"PRIu64", merge&spill: %"PRIu64" ms, " + "records: %"PRIu64", uncompressed size: %"PRIu64", " + "real size: %"PRIu64" path: %s }", + _spillInfos.getSpillCount(), + metrics.sortTime / M, + metrics.recordCount, + (timer.now() - timer.last()) / M, + recordCount, + outputSize, + realOutputSize, + filepath.c_str()); + + _mapOutputMaterializedBytes->increase(realOutputSize); delete merger; http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h index b7150ac..474b8b1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h @@ -85,7 +85,11 @@ private: ICombineRunner * _combineRunner; + Counter * _mapOutputRecords; + Counter * _mapOutputBytes; + Counter * _mapOutputMaterializedBytes; Counter * 
_spilledRecords; + SpillOutputService * _spillOutput; uint32_t _defaultBlockSize; @@ -118,7 +122,7 @@ public: private: void init(uint32_t maxBlockSize, uint32_t memory_capacity, ComparatorPtr keyComparator, - Counter * spilledRecord, ICombineRunner * combiner); + ICombineRunner * combiner); void reset(); @@ -149,7 +153,7 @@ private: * normal spill use options in _config * @param filepaths: spill file path */ - void middleSpill(const std::string & spillOutput, const std::string & indexFilePath); + void middleSpill(const std::string & spillOutput, const std::string & indexFilePath, bool final); /** * final merge and/or spill use options in _config, and http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc index 5434980..126b82d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc @@ -131,7 +131,6 @@ bool Merger::next(Buffer & key, Buffer & value) { } void Merger::merge() { - Timer timer; uint64_t total_record = 0; _heap.reserve(_entries.size()); MergeEntryPtr * base = &(_heap[0]); @@ -153,29 +152,6 @@ void Merger::merge() { } endPartition(); } - - uint64_t interval = (timer.now() - timer.last()); - uint64_t M = 1000000; //1 million - - uint64_t output_size; - uint64_t real_output_size; - _writer->getStatistics(output_size, real_output_size); - - if (total_record != 0) { - LOG("[Merge] Merged segment#: %lu, record#: %"PRIu64", avg record 
size: %"PRIu64", uncompressed total bytes: %"PRIu64", compressed total bytes: %"PRIu64", time: %"PRIu64" ms", - _entries.size(), - total_record, - output_size / (total_record), - output_size, - real_output_size, - interval / M); - } else { - LOG("[Merge] Merged segments#, %lu, uncompressed total bytes: %"PRIu64", compressed total bytes: %"PRIu64", time: %"PRIu64" ms", - _entries.size(), - output_size, - real_output_size, - interval / M); - } } } // namespace NativeTask http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h index cf5e33b..3be52ff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h @@ -117,8 +117,6 @@ public: memBlock = new MemoryBlock(buff, allocated); _memBlocks.push_back(memBlock); return memBlock->allocateKVBuffer(kvLength); - } else { - LOG("MemoryPool is full, fail to allocate new MemBlock, block size: %d, kv length: %d", expect, kvLength); } } return NULL; http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc index 7aa7db8..39a7c43 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc @@ -22,22 +22,14 @@ namespace NativeTask { #define DEFINE_COUNTER(name) const char * TaskCounters::name = #name; -const char * TaskCounters::TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter"; +const char * TaskCounters::TASK_COUNTER_GROUP = "org.apache.hadoop.mapreduce.TaskCounter"; DEFINE_COUNTER(MAP_INPUT_RECORDS) DEFINE_COUNTER(MAP_OUTPUT_RECORDS) -DEFINE_COUNTER(MAP_SKIPPED_RECORDS) -DEFINE_COUNTER(MAP_INPUT_BYTES) DEFINE_COUNTER(MAP_OUTPUT_BYTES) DEFINE_COUNTER(MAP_OUTPUT_MATERIALIZED_BYTES) DEFINE_COUNTER(COMBINE_INPUT_RECORDS) DEFINE_COUNTER(COMBINE_OUTPUT_RECORDS) -DEFINE_COUNTER(REDUCE_INPUT_GROUPS) -DEFINE_COUNTER(REDUCE_SHUFFLE_BYTES) -DEFINE_COUNTER(REDUCE_INPUT_RECORDS) -DEFINE_COUNTER(REDUCE_OUTPUT_RECORDS) -DEFINE_COUNTER(REDUCE_SKIPPED_GROUPS) -DEFINE_COUNTER(REDUCE_SKIPPED_RECORDS) DEFINE_COUNTER(SPILLED_RECORDS) const char * TaskCounters::FILESYSTEM_COUNTER_GROUP = "FileSystemCounters"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h index 6afc207..23cedf9 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h @@ -27,18 +27,10 @@ public: static const char * MAP_INPUT_RECORDS; static const char * MAP_OUTPUT_RECORDS; - static const char * MAP_SKIPPED_RECORDS; - static const char * MAP_INPUT_BYTES; static const char * MAP_OUTPUT_BYTES; static const char * MAP_OUTPUT_MATERIALIZED_BYTES; static const char * COMBINE_INPUT_RECORDS; static const char * COMBINE_OUTPUT_RECORDS; - static const char * REDUCE_INPUT_GROUPS; - static const char * REDUCE_SHUFFLE_BYTES; - static const char * REDUCE_INPUT_RECORDS; - static const char * REDUCE_OUTPUT_RECORDS; - static const char * REDUCE_SKIPPED_GROUPS; - static const char * REDUCE_SKIPPED_RECORDS; static const char * SPILLED_RECORDS; static const char * FILESYSTEM_COUNTER_GROUP; http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java index d7f05be..61b4356 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java @@ -63,15 +63,9 
@@ public class CombinerTest { final Job normaljob = getJob("normalwordcount", commonConf, inputpath, hadoopoutputpath); assertTrue(nativejob.waitForCompletion(true)); - - Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); - assertTrue(normaljob.waitForCompletion(true)); - Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); - assertEquals(true, ResultVerifier.verify(nativeoutputpath, hadoopoutputpath)); - assertEquals("Native Reduce reduce group counter should equal orignal reduce group counter", - nativeReduceGroups.getValue(), normalReduceGroups.getValue()); + ResultVerifier.verifyCounters(normaljob, nativejob, true); } @Before http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java index 4ba4550..d054793 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; import org.apache.hadoop.mapreduce.Counter; import 
org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskCounter; import org.junit.AfterClass; import org.apache.hadoop.util.NativeCodeLoader; import org.junit.Assume; @@ -87,10 +88,8 @@ public class LargeKVCombinerTest { final Job nativejob = CombinerTest.getJob("nativewordcount", nativeConf, inputPath, nativeOutputPath); assertTrue(nativejob.waitForCompletion(true)); - Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); assertTrue(normaljob.waitForCompletion(true)); - Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS); final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath); @@ -98,8 +97,7 @@ public class LargeKVCombinerTest { + ", max size: " + max + ", normal out: " + hadoopOutputPath + ", native Out: " + nativeOutputPath; assertEquals(reason, true, compareRet); -// assertEquals("Native Reduce reduce group counter should equal orignal reduce group counter", -// nativeReduceGroups.getValue(), normalReduceGroups.getValue()); + ResultVerifier.verifyCounters(normaljob, nativejob, true); } fs.close(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java index 3a03d29..b8f9dfc 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java @@ -69,6 +69,7 @@ public class CompressTest { final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath); assertEquals("file compare result: if they are the same ,then return true", true, compareRet); + ResultVerifier.verifyCounters(hadoopjob, job); } @Test @@ -91,6 +92,7 @@ public class CompressTest { final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath); assertEquals("file compare result: if they are the same ,then return true", true, compareRet); + ResultVerifier.verifyCounters(hadoopjob, job); } @Test @@ -112,6 +114,7 @@ public class CompressTest { assertTrue(hadoopJob.waitForCompletion(true)); final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath); assertEquals("file compare result: if they are the same ,then return true", true, compareRet); + ResultVerifier.verifyCounters(hadoopJob, nativeJob); } @Before http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java index 784c14f..6b658ac 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java @@ -114,21 +114,34 @@ public class KVTest { @Test public void testKVCompability() throws Exception { - final String nativeoutput = this.runNativeTest( - "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass); - final String normaloutput = this.runNormalTest( - "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass); - final boolean compareRet = ResultVerifier.verify(normaloutput, nativeoutput); - final String input = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/" - + keyclass.getName() + "/" + valueclass.getName(); - if(compareRet){ - final FileSystem fs = FileSystem.get(hadoopkvtestconf); - fs.delete(new Path(nativeoutput), true); - fs.delete(new Path(normaloutput), true); - fs.delete(new Path(input), true); - fs.close(); - } - assertEquals("file compare result: if they are the same ,then return true", true, compareRet); + final FileSystem fs = FileSystem.get(nativekvtestconf); + final String jobName = "Test:" + keyclass.getSimpleName() + "--" + + valueclass.getSimpleName(); + final String inputPath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/" + + keyclass.getName() + "/" + valueclass.getName(); + final String nativeOutputPath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR + + "/" + keyclass.getName() + "/" + valueclass.getName(); + // if output file exists ,then delete it + fs.delete(new Path(nativeOutputPath), true); + nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true"); + final KVJob nativeJob = new KVJob(jobName, nativekvtestconf, keyclass, + valueclass, inputPath, nativeOutputPath); + assertTrue("job should complete successfully", 
nativeJob.runJob()); + + final String normalOutputPath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR + + "/" + keyclass.getName() + "/" + valueclass.getName(); + // if output file exists ,then delete it + fs.delete(new Path(normalOutputPath), true); + hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false"); + final KVJob normalJob = new KVJob(jobName, hadoopkvtestconf, keyclass, + valueclass, inputPath, normalOutputPath); + assertTrue("job should complete successfully", normalJob.runJob()); + + final boolean compareRet = ResultVerifier.verify(normalOutputPath, + nativeOutputPath); + assertEquals("job output not the same", true, compareRet); + ResultVerifier.verifyCounters(normalJob.job, nativeJob.job); + fs.close(); } @AfterClass @@ -137,35 +150,4 @@ public class KVTest { fs.delete(new Path(TestConstants.NATIVETASK_KVTEST_DIR), true); fs.close(); } - - private String runNativeTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws Exception { - final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/" - + keyclass.getName() + "/" + valueclass.getName(); - final String outputpath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR + "/" - + keyclass.getName() + "/" + valueclass.getName(); - // if output file exists ,then delete it - final FileSystem fs = FileSystem.get(nativekvtestconf); - fs.delete(new Path(outputpath)); - fs.close(); - nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true"); - final KVJob keyJob = new KVJob(jobname, nativekvtestconf, keyclass, valueclass, inputpath, outputpath); - assertTrue("job should complete successfully", keyJob.runJob()); - return outputpath; - } - - private String runNormalTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws Exception { - final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/" - + keyclass.getName() + "/" + valueclass.getName(); - final String outputpath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR + "/" - 
+ keyclass.getName() + "/" + valueclass.getName(); - // if output file exists ,then delete it - final FileSystem fs = FileSystem.get(hadoopkvtestconf); - fs.delete(new Path(outputpath)); - fs.close(); - hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false"); - final KVJob keyJob = new KVJob(jobname, hadoopkvtestconf, keyclass, valueclass, inputpath, outputpath); - assertTrue("job should complete successfully", keyJob.runJob()); - return outputpath; - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java index 0202549..5f1619e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java @@ -78,11 +78,13 @@ public class LargeKVTest { if (!keyClass.equals(Text.class) && !valueClass.equals(Text.class)) { return; } - final int deafult_KVSize_Maximum = 1 << 22; // 4M - final int KVSize_Maximu = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST, - deafult_KVSize_Maximum); + final int deafultKVSizeMaximum = 1 << 22; // 4M + final int kvSizeMaximum = normalConf.getInt( + TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST, + deafultKVSizeMaximum); + final FileSystem fs = FileSystem.get(normalConf); - for (int i 
= 65536; i <= KVSize_Maximu; i *= 4) { + for (int i = 65536; i <= kvSizeMaximum; i *= 4) { int min = i / 4; int max = i; nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min)); @@ -90,48 +92,40 @@ public class LargeKVTest { normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min)); normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max)); - LOG.info("===KV Size Test: min size: " + min + ", max size: " + max + ", keyClass: " - + keyClass.getName() + ", valueClass: " + valueClass.getName()); - - final String nativeOutPut = runNativeLargeKVTest("Test Large Value Size:" + String.valueOf(i), keyClass, - valueClass, nativeConf); - final String normalOutPut = this.runNormalLargeKVTest("Test Large Key Size:" + String.valueOf(i), keyClass, - valueClass, normalConf); - final boolean compareRet = ResultVerifier.verify(normalOutPut, nativeOutPut); - final String reason = "keytype: " + keyClass.getName() + ", valuetype: " + valueClass.getName() - + ", failed with " + (keyClass.equals(Text.class) ? 
"key" : "value") + ", min size: " + min - + ", max size: " + max + ", normal out: " + normalOutPut + ", native Out: " + nativeOutPut; - assertEquals(reason, true, compareRet); - } - } + LOG.info("===KV Size Test: min size: " + min + ", max size: " + max + + ", keyClass: " + keyClass.getName() + ", valueClass: " + + valueClass.getName()); - private String runNativeLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf) - throws Exception { - final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/LargeKV/" - + keyclass.getName() + "/" + valueclass.getName(); - final String outputpath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR + "/LargeKV/" - + keyclass.getName() + "/" + valueclass.getName(); - // if output file exists ,then delete it - final FileSystem fs = FileSystem.get(conf); - fs.delete(new Path(outputpath), true); - fs.close(); - final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath); - assertTrue("job should complete successfully", keyJob.runJob()); - return outputpath; - } + final String inputPath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + + "/LargeKV/" + keyClass.getName() + "/" + valueClass.getName(); - private String runNormalLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf) - throws Exception { - final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/LargeKV/" - + keyclass.getName() + "/" + valueclass.getName(); - final String outputpath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR + "/LargeKV/" - + keyclass.getName() + "/" + valueclass.getName(); - // if output file exists ,then delete it - final FileSystem fs = FileSystem.get(conf); - fs.delete(new Path(outputpath), true); + final String nativeOutputPath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR + + "/LargeKV/" + keyClass.getName() + "/" + valueClass.getName(); + // if output file exists ,then delete it + fs.delete(new Path(nativeOutputPath), 
true); + final KVJob nativeJob = new KVJob("Test Large Value Size:" + + String.valueOf(i), nativeConf, keyClass, valueClass, inputPath, + nativeOutputPath); + assertTrue("job should complete successfully", nativeJob.runJob()); + + final String normalOutputPath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR + + "/LargeKV/" + keyClass.getName() + "/" + valueClass.getName(); + // if output file exists ,then delete it + fs.delete(new Path(normalOutputPath), true); + final KVJob normalJob = new KVJob("Test Large Key Size:" + String.valueOf(i), + normalConf, keyClass, valueClass, inputPath, normalOutputPath); + assertTrue("job should complete successfully", normalJob.runJob()); + + final boolean compareRet = ResultVerifier.verify(normalOutputPath, + nativeOutputPath); + final String reason = "keytype: " + keyClass.getName() + ", valuetype: " + + valueClass.getName() + ", failed with " + + (keyClass.equals(Text.class) ? "key" : "value") + ", min size: " + min + + ", max size: " + max + ", normal out: " + normalOutputPath + + ", native Out: " + nativeOutputPath; + assertEquals(reason, true, compareRet); + ResultVerifier.verifyCounters(normalJob.job, nativeJob.job); + } fs.close(); - final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath); - assertTrue("job should complete successfully", keyJob.runJob()); - return outputpath; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java index a8d2d92..2258726 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapred.nativetask.nonsorttest; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -53,19 +54,20 @@ public class NonSortTest { final Job nativeNonSort = getJob(nativeConf, "NativeNonSort", TestConstants.NATIVETASK_NONSORT_TEST_INPUTDIR, TestConstants.NATIVETASK_NONSORT_TEST_NATIVE_OUTPUT); - nativeNonSort.waitForCompletion(true); + assertTrue(nativeNonSort.waitForCompletion(true)); Configuration normalConf = ScenarioConfiguration.getNormalConfiguration(); normalConf.addResource(TestConstants.NONSORT_TEST_CONF); final Job hadoopWithSort = getJob(normalConf, "NormalJob", TestConstants.NATIVETASK_NONSORT_TEST_INPUTDIR, TestConstants.NATIVETASK_NONSORT_TEST_NORMAL_OUTPUT); - hadoopWithSort.waitForCompletion(true); + assertTrue(hadoopWithSort.waitForCompletion(true)); final boolean compareRet = ResultVerifier.verify( TestConstants.NATIVETASK_NONSORT_TEST_NATIVE_OUTPUT, TestConstants.NATIVETASK_NONSORT_TEST_NORMAL_OUTPUT); assertEquals("file compare result: if they are the same ,then return true", true, compareRet); + ResultVerifier.verifyCounters(hadoopWithSort, nativeNonSort); } @Before http://git-wip-us.apache.org/repos/asf/hadoop/blob/00322161/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java 
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
index b665971..8a7916b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred.nativetask.testutil;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.util.zip.CRC32;
 
@@ -25,6 +27,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCounter;
 
 public class ResultVerifier {
   /**
@@ -136,6 +141,23 @@ public class ResultVerifier {
     return true;
   }
 
-  public static void main(String[] args) {
+  public static void verifyCounters(Job normalJob, Job nativeJob, boolean hasCombiner) throws IOException {
+    Counters normalCounters = normalJob.getCounters();
+    Counters nativeCounters = nativeJob.getCounters();
+    assertEquals("Counter MAP_OUTPUT_RECORDS should be equal",
+        normalCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue(),
+        nativeCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue());
+    assertEquals("Counter REDUCE_INPUT_GROUPS should be equal",
+        normalCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue(),
+        nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue());
+    if (!hasCombiner) {
+      assertEquals("Counter REDUCE_INPUT_RECORDS should be equal",
+          normalCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue(),
+          nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue());
+    }
+  }
+
+  public static void verifyCounters(Job normalJob, Job nativeJob) throws IOException {
+    verifyCounters(normalJob, nativeJob, false);
   }
 }