Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1436936&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Jan 22 14:10:42 2013 @@ -0,0 +1,797 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.task.reduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ChecksumFileSystem; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.IFile; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapred.Merger; +import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.Task; +import org.apache.hadoop.mapred.IFile.Reader; +import org.apache.hadoop.mapred.IFile.Writer; +import org.apache.hadoop.mapred.Merger.Segment; +import org.apache.hadoop.mapred.Task.CombineOutputCollector; +import org.apache.hadoop.mapred.Task.CombineValuesIterator; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.ReflectionUtils; + +import 
com.google.common.annotations.VisibleForTesting; + +@SuppressWarnings(value={"unchecked"}) +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public class MergeManagerImpl<K, V> implements MergeManager<K, V> { + + private static final Log LOG = LogFactory.getLog(MergeManagerImpl.class); + + /* Maximum percentage of the in-memory limit that a single shuffle can + * consume*/ + private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT + = 0.25f; + + private final TaskAttemptID reduceId; + + private final JobConf jobConf; + private final FileSystem localFS; + private final FileSystem rfs; + private final LocalDirAllocator localDirAllocator; + + protected MapOutputFile mapOutputFile; + + Set<InMemoryMapOutput<K, V>> inMemoryMergedMapOutputs = + new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>()); + private IntermediateMemoryToMemoryMerger memToMemMerger; + + Set<InMemoryMapOutput<K, V>> inMemoryMapOutputs = + new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>()); + private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger; + + Set<Path> onDiskMapOutputs = new TreeSet<Path>(); + private final OnDiskMerger onDiskMerger; + + private final long memoryLimit; + private long usedMemory; + private long commitMemory; + private final long maxSingleShuffleLimit; + + private final int memToMemMergeOutputsThreshold; + private final long mergeThreshold; + + private final int ioSortFactor; + + private final Reporter reporter; + private final ExceptionReporter exceptionReporter; + + /** + * Combiner class to run during in-memory merge, if defined. + */ + private final Class<? extends Reducer> combinerClass; + + /** + * Resettable collector used for combine. + */ + private final CombineOutputCollector<K,V> combineCollector; + + private final Counters.Counter spilledRecordsCounter; + + private final Counters.Counter reduceCombineInputCounter; + + private final Counters.Counter mergedMapOutputsCounter; + + private final CompressionCodec codec; + + private final Progress mergePhase; + + public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, + FileSystem localFS, + LocalDirAllocator localDirAllocator, + Reporter reporter, + CompressionCodec codec, + Class<? 
extends Reducer> combinerClass, + CombineOutputCollector<K,V> combineCollector, + Counters.Counter spilledRecordsCounter, + Counters.Counter reduceCombineInputCounter, + Counters.Counter mergedMapOutputsCounter, + ExceptionReporter exceptionReporter, + Progress mergePhase, MapOutputFile mapOutputFile) { + this.reduceId = reduceId; + this.jobConf = jobConf; + this.localDirAllocator = localDirAllocator; + this.exceptionReporter = exceptionReporter; + + this.reporter = reporter; + this.codec = codec; + this.combinerClass = combinerClass; + this.combineCollector = combineCollector; + this.reduceCombineInputCounter = reduceCombineInputCounter; + this.spilledRecordsCounter = spilledRecordsCounter; + this.mergedMapOutputsCounter = mergedMapOutputsCounter; + this.mapOutputFile = mapOutputFile; + this.mapOutputFile.setConf(jobConf); + + this.localFS = localFS; + this.rfs = ((LocalFileSystem)localFS).getRaw(); + + final float maxInMemCopyUse = + jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f); + if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) { + throw new IllegalArgumentException("Invalid value for " + + MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " + + maxInMemCopyUse); + } + + // Allow unit tests to fix Runtime memory + this.memoryLimit = + (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, + Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) + * maxInMemCopyUse); + + this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100); + + final float singleShuffleMemoryLimitPercent = + jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, + DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT); + if (singleShuffleMemoryLimitPercent <= 0.0f + || singleShuffleMemoryLimitPercent > 1.0f) { + throw new IllegalArgumentException("Invalid value for " + + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": " + + singleShuffleMemoryLimitPercent); + } + + usedMemory = 0L; + commitMemory = 0L; + this.maxSingleShuffleLimit = + (long)(memoryLimit * singleShuffleMemoryLimitPercent); + this.memToMemMergeOutputsThreshold = + jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); + this.mergeThreshold = (long)(this.memoryLimit * + jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, + 0.90f)); + LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " + + "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " + + "mergeThreshold=" + mergeThreshold + ", " + + "ioSortFactor=" + ioSortFactor + ", " + + "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold); + + if (this.maxSingleShuffleLimit >= this.mergeThreshold) { + throw new RuntimeException("Invlaid configuration: " + + "maxSingleShuffleLimit should be less than mergeThreshold" + + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit + + "mergeThreshold: " + this.mergeThreshold); + } + + boolean allowMemToMemMerge = + jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false); + if (allowMemToMemMerge) { + this.memToMemMerger = + new IntermediateMemoryToMemoryMerger(this, + memToMemMergeOutputsThreshold); + this.memToMemMerger.start(); + } else { + this.memToMemMerger = null; + } + + this.inMemoryMerger = createInMemoryMerger(); + this.inMemoryMerger.start(); + + this.onDiskMerger = new OnDiskMerger(this); + this.onDiskMerger.start(); + + this.mergePhase = mergePhase; + } + + protected MergeThread<InMemoryMapOutput<K,V>, K,V> createInMemoryMerger() { + return new InMemoryMerger(this); + } + + TaskAttemptID getReduceId() { + return reduceId; + } + + @VisibleForTesting + ExceptionReporter 
getExceptionReporter() { + return exceptionReporter; + } + + @Override + public void waitForResource() throws InterruptedException { + inMemoryMerger.waitForMerge(); + } + + private boolean canShuffleToMemory(long requestedSize) { + return (requestedSize < maxSingleShuffleLimit); + } + + @Override + public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, + long requestedSize, + int fetcher + ) throws IOException { + if (!canShuffleToMemory(requestedSize)) { + LOG.info(mapId + ": Shuffling to disk since " + requestedSize + + " is greater than maxSingleShuffleLimit (" + + maxSingleShuffleLimit + ")"); + return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize, + jobConf, mapOutputFile, fetcher, true); + } + + // Stall shuffle if we are above the memory limit + + // It is possible that all threads could just be stalling and not make + // progress at all. This could happen when: + // + // requested size is causing the used memory to go above limit && + // requested size < singleShuffleLimit && + // current used size < mergeThreshold (merge will not get triggered) + // + // To avoid this from happening, we allow exactly one thread to go past + // the memory limit. We check (usedMemory > memoryLimit) and not + // (usedMemory + requestedSize > memoryLimit). When this thread is done + // fetching, this will automatically trigger a merge thereby unlocking + // all the stalled threads + + if (usedMemory > memoryLimit) { + LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory + + ") is greater than memoryLimit (" + memoryLimit + ")." + + " CommitMemory is (" + commitMemory + ")"); + return null; + } + + // Allow the in-memory shuffle to progress + LOG.debug(mapId + ": Proceeding with shuffle since usedMemory (" + + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")." + + "CommitMemory is (" + commitMemory + ")"); + return unconditionalReserve(mapId, requestedSize, true); + } + + /** + * Unconditional Reserve is used by the Memory-to-Memory thread + * @return + */ + private synchronized InMemoryMapOutput<K, V> unconditionalReserve( + TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) { + usedMemory += requestedSize; + return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize, + codec, primaryMapOutput); + } + + synchronized void unreserve(long size) { + usedMemory -= size; + } + + public synchronized void closeInMemoryFile(InMemoryMapOutput<K,V> mapOutput) { + inMemoryMapOutputs.add(mapOutput); + LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() + + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size() + + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory); + + commitMemory+= mapOutput.getSize(); + + // Can hang if mergeThreshold is really low. + if (commitMemory >= mergeThreshold) { + LOG.info("Starting inMemoryMerger's merge since commitMemory=" + + commitMemory + " > mergeThreshold=" + mergeThreshold + + ". Current usedMemory=" + usedMemory); + inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); + inMemoryMergedMapOutputs.clear(); + inMemoryMerger.startMerge(inMemoryMapOutputs); + commitMemory = 0L; // Reset commitMemory. 
+ } + + if (memToMemMerger != null) { + if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { + memToMemMerger.startMerge(inMemoryMapOutputs); + } + } + } + + + public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K,V> mapOutput) { + inMemoryMergedMapOutputs.add(mapOutput); + LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + + ", inMemoryMergedMapOutputs.size() -> " + + inMemoryMergedMapOutputs.size()); + } + + public synchronized void closeOnDiskFile(Path file) { + onDiskMapOutputs.add(file); + + if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { + onDiskMerger.startMerge(onDiskMapOutputs); + } + } + + @Override + public RawKeyValueIterator close() throws Throwable { + // Wait for on-going merges to complete + if (memToMemMerger != null) { + memToMemMerger.close(); + } + inMemoryMerger.close(); + onDiskMerger.close(); + + List<InMemoryMapOutput<K, V>> memory = + new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs); + memory.addAll(inMemoryMapOutputs); + List<Path> disk = new ArrayList<Path>(onDiskMapOutputs); + return finalMerge(jobConf, rfs, memory, disk); + } + + private class IntermediateMemoryToMemoryMerger + extends MergeThread<InMemoryMapOutput<K, V>, K, V> { + + public IntermediateMemoryToMemoryMerger(MergeManagerImpl<K, V> manager, + int mergeFactor) { + super(manager, mergeFactor, exceptionReporter); + setName("InMemoryMerger - Thread to do in-memory merge of in-memory " + + "shuffled map-outputs"); + setDaemon(true); + } + + @Override + public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException { + if (inputs == null || inputs.size() == 0) { + return; + } + + TaskAttemptID dummyMapId = inputs.get(0).getMapId(); + List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>(); + long mergeOutputSize = + createInMemorySegments(inputs, inMemorySegments, 0); + int noInMemorySegments = inMemorySegments.size(); + + InMemoryMapOutput<K, V> mergedMapOutputs = + unconditionalReserve(dummyMapId, mergeOutputSize, false); + + Writer<K, V> writer = + new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream()); + + LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments + + " segments of total-size: " + mergeOutputSize); + + RawKeyValueIterator rIter = + Merger.merge(jobConf, rfs, + (Class<K>)jobConf.getMapOutputKeyClass(), + (Class<V>)jobConf.getMapOutputValueClass(), + inMemorySegments, inMemorySegments.size(), + new Path(reduceId.toString()), + (RawComparator<K>)jobConf.getOutputKeyComparator(), + reporter, null, null, null); + Merger.writeFile(rIter, writer, reporter, jobConf); + writer.close(); + + LOG.info(reduceId + + " Memory-to-Memory merge of the " + noInMemorySegments + + " files in-memory complete."); + + // Note the output of the merge + closeInMemoryMergedFile(mergedMapOutputs); + } + } + + private class InMemoryMerger extends MergeThread<InMemoryMapOutput<K,V>, K,V> { + + public InMemoryMerger(MergeManagerImpl<K, V> manager) { + super(manager, Integer.MAX_VALUE, exceptionReporter); + setName + ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs"); + setDaemon(true); + } + + @Override + public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException { + if (inputs == null || inputs.size() == 0) { + return; + } + + //name this output file same as the name of the first file that is + //there in the current list of inmem files (this is guaranteed to + //be absent on the disk currently. So we don't overwrite a prev. + //created spill). 
Also we need to create the output file now since + //it is not guaranteed that this file will be present after merge + //is called (we delete empty files as soon as we see them + //in the merge method) + + //figure out the mapId + TaskAttemptID mapId = inputs.get(0).getMapId(); + TaskID mapTaskId = mapId.getTaskID(); + + List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>(); + long mergeOutputSize = + createInMemorySegments(inputs, inMemorySegments,0); + int noInMemorySegments = inMemorySegments.size(); + + Path outputPath = + mapOutputFile.getInputFileForWrite(mapTaskId, + mergeOutputSize).suffix( + Task.MERGED_OUTPUT_PREFIX); + + Writer<K,V> writer = + new Writer<K,V>(jobConf, rfs, outputPath, + (Class<K>) jobConf.getMapOutputKeyClass(), + (Class<V>) jobConf.getMapOutputValueClass(), + codec, null); + + RawKeyValueIterator rIter = null; + try { + LOG.info("Initiating in-memory merge with " + noInMemorySegments + + " segments..."); + + rIter = Merger.merge(jobConf, rfs, + (Class<K>)jobConf.getMapOutputKeyClass(), + (Class<V>)jobConf.getMapOutputValueClass(), + inMemorySegments, inMemorySegments.size(), + new Path(reduceId.toString()), + (RawComparator<K>)jobConf.getOutputKeyComparator(), + reporter, spilledRecordsCounter, null, null); + + if (null == combinerClass) { + Merger.writeFile(rIter, writer, reporter, jobConf); + } else { + combineCollector.setWriter(writer); + combineAndSpill(rIter, reduceCombineInputCounter); + } + writer.close(); + + LOG.info(reduceId + + " Merge of the " + noInMemorySegments + + " files in-memory complete." + + " Local file is " + outputPath + " of size " + + localFS.getFileStatus(outputPath).getLen()); + } catch (IOException e) { + //make sure that we delete the ondisk file that we created + //earlier when we invoked cloneFileAttributes + localFS.delete(outputPath, true); + throw e; + } + + // Note the output of the merge + closeOnDiskFile(outputPath); + } + + } + + private class OnDiskMerger extends MergeThread<Path,K,V> { + + public OnDiskMerger(MergeManagerImpl<K, V> manager) { + super(manager, Integer.MAX_VALUE, exceptionReporter); + setName("OnDiskMerger - Thread to merge on-disk map-outputs"); + setDaemon(true); + } + + @Override + public void merge(List<Path> inputs) throws IOException { + // sanity check + if (inputs == null || inputs.isEmpty()) { + LOG.info("No ondisk files to merge..."); + return; + } + + long approxOutputSize = 0; + int bytesPerSum = + jobConf.getInt("io.bytes.per.checksum", 512); + + LOG.info("OnDiskMerger: We have " + inputs.size() + + " map outputs on disk. Triggering merge..."); + + // 1. Prepare the list of files to be merged. + for (Path file : inputs) { + approxOutputSize += localFS.getFileStatus(file).getLen(); + } + + // add the checksum length + approxOutputSize += + ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum); + + // 2. 
Start the on-disk merge process + Path outputPath = + localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), + approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX); + Writer<K,V> writer = + new Writer<K,V>(jobConf, rfs, outputPath, + (Class<K>) jobConf.getMapOutputKeyClass(), + (Class<V>) jobConf.getMapOutputValueClass(), + codec, null); + RawKeyValueIterator iter = null; + Path tmpDir = new Path(reduceId.toString()); + try { + iter = Merger.merge(jobConf, rfs, + (Class<K>) jobConf.getMapOutputKeyClass(), + (Class<V>) jobConf.getMapOutputValueClass(), + codec, inputs.toArray(new Path[inputs.size()]), + true, ioSortFactor, tmpDir, + (RawComparator<K>) jobConf.getOutputKeyComparator(), + reporter, spilledRecordsCounter, null, + mergedMapOutputsCounter, null); + + Merger.writeFile(iter, writer, reporter, jobConf); + writer.close(); + } catch (IOException e) { + localFS.delete(outputPath, true); + throw e; + } + + closeOnDiskFile(outputPath); + + LOG.info(reduceId + + " Finished merging " + inputs.size() + + " map output files on disk of total-size " + + approxOutputSize + "." + + " Local output file is " + outputPath + " of size " + + localFS.getFileStatus(outputPath).getLen()); + } + } + + private void combineAndSpill( + RawKeyValueIterator kvIter, + Counters.Counter inCounter) throws IOException { + JobConf job = jobConf; + Reducer combiner = ReflectionUtils.newInstance(combinerClass, job); + Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass(); + Class<V> valClass = (Class<V>) job.getMapOutputValueClass(); + RawComparator<K> comparator = + (RawComparator<K>)job.getOutputKeyComparator(); + try { + CombineValuesIterator values = new CombineValuesIterator( + kvIter, comparator, keyClass, valClass, job, Reporter.NULL, + inCounter); + while (values.more()) { + combiner.reduce(values.getKey(), values, combineCollector, + Reporter.NULL); + values.nextKey(); + } + } finally { + combiner.close(); + } + } + + private long createInMemorySegments(List<InMemoryMapOutput<K,V>> inMemoryMapOutputs, + List<Segment<K, V>> inMemorySegments, + long leaveBytes + ) throws IOException { + long totalSize = 0L; + // We could use fullSize could come from the RamManager, but files can be + // closed but not yet present in inMemoryMapOutputs + long fullSize = 0L; + for (InMemoryMapOutput<K,V> mo : inMemoryMapOutputs) { + fullSize += mo.getMemory().length; + } + while(fullSize > leaveBytes) { + InMemoryMapOutput<K,V> mo = inMemoryMapOutputs.remove(0); + byte[] data = mo.getMemory(); + long size = data.length; + totalSize += size; + fullSize -= size; + Reader<K,V> reader = new InMemoryReader<K,V>(MergeManagerImpl.this, + mo.getMapId(), + data, 0, (int)size); + inMemorySegments.add(new Segment<K,V>(reader, true, + (mo.isPrimaryMapOutput() ? 
+ mergedMapOutputsCounter : null))); + } + return totalSize; + } + + class RawKVIteratorReader extends IFile.Reader<K,V> { + + private final RawKeyValueIterator kvIter; + + public RawKVIteratorReader(RawKeyValueIterator kvIter, long size) + throws IOException { + super(null, null, size, null, spilledRecordsCounter); + this.kvIter = kvIter; + } + public boolean nextRawKey(DataInputBuffer key) throws IOException { + if (kvIter.next()) { + final DataInputBuffer kb = kvIter.getKey(); + final int kp = kb.getPosition(); + final int klen = kb.getLength() - kp; + key.reset(kb.getData(), kp, klen); + bytesRead += klen; + return true; + } + return false; + } + public void nextRawValue(DataInputBuffer value) throws IOException { + final DataInputBuffer vb = kvIter.getValue(); + final int vp = vb.getPosition(); + final int vlen = vb.getLength() - vp; + value.reset(vb.getData(), vp, vlen); + bytesRead += vlen; + } + public long getPosition() throws IOException { + return bytesRead; + } + + public void close() throws IOException { + kvIter.close(); + } + } + + private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, + List<InMemoryMapOutput<K,V>> inMemoryMapOutputs, + List<Path> onDiskMapOutputs + ) throws IOException { + LOG.info("finalMerge called with " + + inMemoryMapOutputs.size() + " in-memory map-outputs and " + + onDiskMapOutputs.size() + " on-disk map-outputs"); + + final float maxRedPer = + job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f); + if (maxRedPer > 1.0 || maxRedPer < 0.0) { + throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + + maxRedPer); + } + int maxInMemReduce = (int)Math.min( + Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE); + + + // merge config params + Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass(); + Class<V> valueClass = (Class<V>)job.getMapOutputValueClass(); + boolean keepInputs = job.getKeepFailedTaskFiles(); + final Path tmpDir = new Path(reduceId.toString()); + final RawComparator<K> comparator = + (RawComparator<K>)job.getOutputKeyComparator(); + + // segments required to vacate memory + List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>(); + long inMemToDiskBytes = 0; + boolean mergePhaseFinished = false; + if (inMemoryMapOutputs.size() > 0) { + TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID(); + inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, + memDiskSegments, + maxInMemReduce); + final int numMemDiskSegments = memDiskSegments.size(); + if (numMemDiskSegments > 0 && + ioSortFactor > onDiskMapOutputs.size()) { + + // If we reach here, it implies that we have less than io.sort.factor + // disk segments and this will be incremented by 1 (result of the + // memory segments merge). 
Since this total would still be + // <= io.sort.factor, we will not do any more intermediate merges, + // the merge of all these disk segments would be directly fed to the + // reduce method + + mergePhaseFinished = true; + // must spill to disk, but can't retain in-mem for intermediate merge + final Path outputPath = + mapOutputFile.getInputFileForWrite(mapId, + inMemToDiskBytes).suffix( + Task.MERGED_OUTPUT_PREFIX); + final RawKeyValueIterator rIter = Merger.merge(job, fs, + keyClass, valueClass, memDiskSegments, numMemDiskSegments, + tmpDir, comparator, reporter, spilledRecordsCounter, null, + mergePhase); + final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath, + keyClass, valueClass, codec, null); + try { + Merger.writeFile(rIter, writer, reporter, job); + // add to list of final disk outputs. + onDiskMapOutputs.add(outputPath); + } catch (IOException e) { + if (null != outputPath) { + try { + fs.delete(outputPath, true); + } catch (IOException ie) { + // NOTHING + } + } + throw e; + } finally { + if (null != writer) { + writer.close(); + } + } + LOG.info("Merged " + numMemDiskSegments + " segments, " + + inMemToDiskBytes + " bytes to disk to satisfy " + + "reduce memory limit"); + inMemToDiskBytes = 0; + memDiskSegments.clear(); + } else if (inMemToDiskBytes != 0) { + LOG.info("Keeping " + numMemDiskSegments + " segments, " + + inMemToDiskBytes + " bytes in memory for " + + "intermediate, on-disk merge"); + } + } + + // segments on disk + List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>(); + long onDiskBytes = inMemToDiskBytes; + Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]); + for (Path file : onDisk) { + onDiskBytes += fs.getFileStatus(file).getLen(); + LOG.debug("Disk file: " + file + " Length is " + + fs.getFileStatus(file).getLen()); + diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs, + (file.toString().endsWith( + Task.MERGED_OUTPUT_PREFIX) ? + null : mergedMapOutputsCounter) + )); + } + LOG.info("Merging " + onDisk.length + " files, " + + onDiskBytes + " bytes from disk"); + Collections.sort(diskSegments, new Comparator<Segment<K,V>>() { + public int compare(Segment<K, V> o1, Segment<K, V> o2) { + if (o1.getLength() == o2.getLength()) { + return 0; + } + return o1.getLength() < o2.getLength() ? -1 : 1; + } + }); + + // build final list of segments from merged backed by disk + in-mem + List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>(); + long inMemBytes = createInMemorySegments(inMemoryMapOutputs, + finalSegments, 0); + LOG.info("Merging " + finalSegments.size() + " segments, " + + inMemBytes + " bytes from memory into reduce"); + if (0 != onDiskBytes) { + final int numInMemSegments = memDiskSegments.size(); + diskSegments.addAll(0, memDiskSegments); + memDiskSegments.clear(); + // Pass mergePhase only if there is a going to be intermediate + // merges. See comment where mergePhaseFinished is being set + Progress thisPhase = (mergePhaseFinished) ? 
null : mergePhase; + RawKeyValueIterator diskMerge = Merger.merge( + job, fs, keyClass, valueClass, diskSegments, + ioSortFactor, numInMemSegments, tmpDir, comparator, + reporter, false, spilledRecordsCounter, null, thisPhase); + diskSegments.clear(); + if (0 == finalSegments.size()) { + return diskMerge; + } + finalSegments.add(new Segment<K,V>( + new RawKVIteratorReader(diskMerge, onDiskBytes), true)); + } + return Merger.merge(job, fs, keyClass, valueClass, + finalSegments, finalSegments.size(), tmpDir, + comparator, reporter, spilledRecordsCounter, null, + null); + + } +}
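
For readers following the new class above: the sketch below is not part of this commit; it condenses the memory-budget arithmetic from MergeManagerImpl's constructor and the decision made in reserve() into a plain Java class with no Hadoop dependencies. The class and method names (ShuffleMemoryBudgetSketch, decide) are illustrative only, and the percentages are the defaults the constructor reads for SHUFFLE_INPUT_BUFFER_PERCENT, SHUFFLE_MEMORY_LIMIT_PERCENT, and SHUFFLE_MERGE_PERCENT.

/**
 * Standalone sketch (not part of this commit) of the threshold arithmetic in
 * MergeManagerImpl's constructor and the decision taken in reserve().
 * Default factors mirror the code above:
 *   shuffle input buffer percent  = 0.90  (memoryLimit)
 *   shuffle memory limit percent  = 0.25  (maxSingleShuffleLimit)
 *   shuffle merge percent         = 0.90  (mergeThreshold)
 */
public class ShuffleMemoryBudgetSketch {

  enum Reservation { IN_MEMORY, ON_DISK, STALL }

  public static void main(String[] args) {
    // Stand-in for REDUCE_MEMORY_TOTAL_BYTES defaulting to the JVM heap.
    long maxHeap = Runtime.getRuntime().maxMemory();
    long memoryLimit = (long) (Math.min(maxHeap, Integer.MAX_VALUE) * 0.90f);
    long maxSingleShuffleLimit = (long) (memoryLimit * 0.25f);
    long mergeThreshold = (long) (memoryLimit * 0.90f);
    // The constructor above rejects configurations where
    // maxSingleShuffleLimit >= mergeThreshold.

    System.out.println("memoryLimit=" + memoryLimit
        + " maxSingleShuffleLimit=" + maxSingleShuffleLimit
        + " mergeThreshold=" + mergeThreshold);

    // Typical outcomes with an empty budget and default factors:
    System.out.println(decide(maxSingleShuffleLimit + 1, 0L,
        memoryLimit, maxSingleShuffleLimit));                 // ON_DISK
    System.out.println(decide(1024L, memoryLimit + 1,
        memoryLimit, maxSingleShuffleLimit));                 // STALL
    System.out.println(decide(1024L, 0L,
        memoryLimit, maxSingleShuffleLimit));                 // IN_MEMORY
  }

  static Reservation decide(long requestedSize, long usedMemory,
                            long memoryLimit, long maxSingleShuffleLimit) {
    if (requestedSize >= maxSingleShuffleLimit) {
      return Reservation.ON_DISK;   // OnDiskMapOutput in the real code
    }
    if (usedMemory > memoryLimit) {
      return Reservation.STALL;     // reserve() returns null; fetcher retries
    }
    return Reservation.IN_MEMORY;   // InMemoryMapOutput via unconditionalReserve()
  }
}

As the comment in reserve() explains, the check is against usedMemory alone rather than usedMemory + requestedSize, so exactly one fetcher is allowed past the limit; when it finishes, the resulting merge frees memory and unblocks any stalled fetchers.
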
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=1436936&r1=1436935&r2=1436936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Tue Jan 22 14:10:42 2013 @@ -34,12 +34,12 @@ abstract class MergeThread<T,K,V> extend private AtomicInteger numPending = new AtomicInteger(0); private LinkedList<List<T>> pendingToBeMerged; - protected final MergeManager<K,V> manager; + protected final MergeManagerImpl<K,V> manager; private final ExceptionReporter reporter; private boolean closed = false; private final int mergeFactor; - public MergeThread(MergeManager<K,V> manager, int mergeFactor, + public MergeThread(MergeManagerImpl<K,V> manager, int mergeFactor, ExceptionReporter reporter) { this.pendingToBeMerged = new LinkedList<List<T>>(); this.manager = manager; Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1436936&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Tue Jan 22 14:10:42 2013 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.task.reduce; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.io.IOUtils; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.MapOutputFile; + +import org.apache.hadoop.mapreduce.TaskAttemptID; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +class OnDiskMapOutput<K, V> extends MapOutput<K, V> { + private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class); + private final FileSystem localFS; + private final Path tmpOutputPath; + private final Path outputPath; + private final MergeManagerImpl<K, V> merger; + private final OutputStream disk; + + public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId, + MergeManagerImpl<K, V> merger, long size, + JobConf conf, + MapOutputFile mapOutputFile, + int fetcher, boolean primaryMapOutput) + throws IOException { + super(mapId, size, primaryMapOutput); + this.merger = merger; + this.localFS = FileSystem.getLocal(conf); + outputPath = + mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size); + tmpOutputPath = outputPath.suffix(String.valueOf(fetcher)); + + disk = localFS.create(tmpOutputPath); + + } + + @Override + public void shuffle(MapHost host, InputStream input, + long compressedLength, long decompressedLength, + ShuffleClientMetrics metrics, + Reporter reporter) throws IOException { + // Copy data to local-disk + long bytesLeft = compressedLength; + try { + final int BYTES_TO_READ = 64 * 1024; + byte[] buf = new byte[BYTES_TO_READ]; + while (bytesLeft > 0) { + int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); + if (n < 0) { + throw new IOException("read past end of stream reading " + + getMapId()); + } + disk.write(buf, 0, n); + bytesLeft -= n; + metrics.inputBytes(n); + reporter.progress(); + } + + LOG.info("Read " + (compressedLength - bytesLeft) + + " bytes from map-output for " + getMapId()); + + disk.close(); + } catch (IOException ioe) { + // Close the streams + IOUtils.cleanup(LOG, input, disk); + + // Re-throw + throw ioe; + } + + // Sanity check + if (bytesLeft != 0) { + throw new IOException("Incomplete map output received for " + + getMapId() + " from " + + host.getHostName() + " (" + + bytesLeft + " bytes missing of " + + compressedLength + ")"); + } + } + + @Override + public void commit() throws IOException { + localFS.rename(tmpOutputPath, outputPath); + merger.closeOnDiskFile(outputPath); + } + + @Override + public void abort() { + try { + localFS.delete(tmpOutputPath, false); + } catch (IOException ie) { + LOG.info("failure to clean up " + tmpOutputPath, ie); + } + } + + @Override + public String getDescription() { + return "DISK"; + } +} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1436936&r1=1436935&r2=1436936&view=diff 
============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Tue Jan 22 14:10:42 2013 @@ -21,17 +21,10 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapred.RawKeyValueIterator; -import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Task; -import org.apache.hadoop.mapred.Task.CombineOutputCollector; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapred.ShuffleConsumerPlugin; @@ -77,17 +70,21 @@ public class Shuffle<K, V> implements Sh this.taskStatus = context.getStatus(); this.reduceTask = context.getReduceTask(); - scheduler = - new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase, - context.getShuffledMapsCounter(), - context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); - merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(), - context.getLocalDirAllocator(), reporter, context.getCodec(), - context.getCombinerClass(), context.getCombineCollector(), - context.getSpilledRecordsCounter(), - context.getReduceCombineInputCounter(), - context.getMergedMapOutputsCounter(), - this, context.getMergePhase(), context.getMapOutputFile()); + scheduler = new ShuffleScheduler<K,V>(jobConf, taskStatus, this, + copyPhase, context.getShuffledMapsCounter(), + context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); + merger = createMergeManager(context); + } + + protected MergeManager<K, V> createMergeManager( + ShuffleConsumerPlugin.Context context) { + return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(), + context.getLocalDirAllocator(), reporter, context.getCodec(), + context.getCombinerClass(), context.getCombineCollector(), + context.getSpilledRecordsCounter(), + context.getReduceCombineInputCounter(), + context.getMergedMapOutputsCounter(), this, context.getMergePhase(), + context.getMapOutputFile()); } @Override Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1436936&r1=1436935&r2=1436936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original) +++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Tue Jan 22 14:10:42 2013 @@ -53,7 +53,7 @@ public class TestFetcher { private HttpURLConnection connection; public FakeFetcher(JobConf job, TaskAttemptID reduceId, - ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter, + ShuffleScheduler<K,V> scheduler, MergeManagerImpl<K,V> merger, Reporter reporter, ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, SecretKey jobTokenSecret, HttpURLConnection connection) { super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter, @@ -77,7 +77,7 @@ public class TestFetcher { JobConf job = new JobConf(); TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1"); ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class); - MergeManager<Text, Text> mm = mock(MergeManager.class); + MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class); Reporter r = mock(Reporter.class); ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); ExceptionReporter except = mock(ExceptionReporter.class); @@ -132,7 +132,7 @@ public class TestFetcher { JobConf job = new JobConf(); TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1"); ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class); - MergeManager<Text, Text> mm = mock(MergeManager.class); + MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class); Reporter r = mock(Reporter.class); ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); ExceptionReporter except = mock(ExceptionReporter.class); @@ -167,10 +167,9 @@ public class TestFetcher { header.write(new DataOutputStream(bout)); ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); when(connection.getInputStream()).thenReturn(in); - //Defaults to WAIT, which is what we want to test - MapOutput<Text,Text> mapOut = new MapOutput<Text, Text>(map1ID); + //Defaults to null, which is what we want to test when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) - .thenReturn(mapOut); + .thenReturn(null); underTest.copyFromHost(host); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java?rev=1436936&r1=1436935&r2=1436936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Tue Jan 22 14:10:42 2013 @@ -32,13 +32,13 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type; import org.junit.Assert; import org.junit.Test; public class TestMergeManager { @Test(timeout=10000) + @SuppressWarnings("unchecked") public void testMemoryMerge() throws Exception { final int 
TOTAL_MEM_BYTES = 10000; final int OUTPUT_SIZE = 7950; @@ -55,45 +55,47 @@ public class TestMergeManager { // reserve enough map output to cause a merge when it is committed MapOutput<Text, Text> out1 = mgr.reserve(null, OUTPUT_SIZE, 0); - Assert.assertEquals("Should be a memory merge", - Type.MEMORY, out1.getType()); - fillOutput(out1); + Assert.assertTrue("Should be a memory merge", + (out1 instanceof InMemoryMapOutput)); + InMemoryMapOutput<Text, Text> mout1 = (InMemoryMapOutput<Text, Text>)out1; + fillOutput(mout1); MapOutput<Text, Text> out2 = mgr.reserve(null, OUTPUT_SIZE, 0); - Assert.assertEquals("Should be a memory merge", - Type.MEMORY, out2.getType()); - fillOutput(out2); + Assert.assertTrue("Should be a memory merge", + (out2 instanceof InMemoryMapOutput)); + InMemoryMapOutput<Text, Text> mout2 = (InMemoryMapOutput<Text, Text>)out2; + fillOutput(mout2); // next reservation should be a WAIT MapOutput<Text, Text> out3 = mgr.reserve(null, OUTPUT_SIZE, 0); - Assert.assertEquals("Should be told to wait", - Type.WAIT, out3.getType()); + Assert.assertEquals("Should be told to wait", null, out3); // trigger the first merge and wait for merge thread to start merging // and free enough output to reserve more - out1.commit(); - out2.commit(); + mout1.commit(); + mout2.commit(); mergeStart.await(); Assert.assertEquals(1, mgr.getNumMerges()); // reserve enough map output to cause another merge when committed out1 = mgr.reserve(null, OUTPUT_SIZE, 0); - Assert.assertEquals("Should be a memory merge", - Type.MEMORY, out1.getType()); - fillOutput(out1); + Assert.assertTrue("Should be a memory merge", + (out1 instanceof InMemoryMapOutput)); + mout1 = (InMemoryMapOutput<Text, Text>)out1; + fillOutput(mout1); out2 = mgr.reserve(null, OUTPUT_SIZE, 0); - Assert.assertEquals("Should be a memory merge", - Type.MEMORY, out2.getType()); - fillOutput(out2); + Assert.assertTrue("Should be a memory merge", + (out2 instanceof InMemoryMapOutput)); + mout2 = (InMemoryMapOutput<Text, Text>)out2; + fillOutput(mout2); - // next reservation should be a WAIT + // next reservation should be null out3 = mgr.reserve(null, OUTPUT_SIZE, 0); - Assert.assertEquals("Should be told to wait", - Type.WAIT, out3.getType()); + Assert.assertEquals("Should be told to wait", null, out3); // commit output *before* merge thread completes - out1.commit(); - out2.commit(); + mout1.commit(); + mout2.commit(); // allow the first merge to complete mergeComplete.await(); @@ -110,7 +112,7 @@ public class TestMergeManager { 0, reporter.getNumExceptions()); } - private void fillOutput(MapOutput<Text, Text> output) throws IOException { + private void fillOutput(InMemoryMapOutput<Text, Text> output) throws IOException { BoundedByteArrayOutputStream stream = output.getArrayStream(); int count = stream.getLimit(); for (int i=0; i < count; ++i) { @@ -118,7 +120,7 @@ public class TestMergeManager { } } - private static class StubbedMergeManager extends MergeManager<Text, Text> { + private static class StubbedMergeManager extends MergeManagerImpl<Text, Text> { private TestMergeThread mergeThread; public StubbedMergeManager(JobConf conf, ExceptionReporter reporter, @@ -129,7 +131,7 @@ public class TestMergeManager { } @Override - protected MergeThread<MapOutput<Text, Text>, Text, Text> createInMemoryMerger() { + protected MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> createInMemoryMerger() { mergeThread = new TestMergeThread(this, getExceptionReporter()); return mergeThread; } @@ -140,12 +142,12 @@ public class TestMergeManager { } private 
static class TestMergeThread - extends MergeThread<MapOutput<Text,Text>, Text, Text> { + extends MergeThread<InMemoryMapOutput<Text,Text>, Text, Text> { private AtomicInteger numMerges; private CyclicBarrier mergeStart; private CyclicBarrier mergeComplete; - public TestMergeThread(MergeManager<Text, Text> mergeManager, + public TestMergeThread(MergeManagerImpl<Text, Text> mergeManager, ExceptionReporter reporter) { super(mergeManager, Integer.MAX_VALUE, reporter); numMerges = new AtomicInteger(0); @@ -162,11 +164,11 @@ public class TestMergeManager { } @Override - public void merge(List<MapOutput<Text, Text>> inputs) + public void merge(List<InMemoryMapOutput<Text, Text>> inputs) throws IOException { synchronized (this) { numMerges.incrementAndGet(); - for (MapOutput<Text, Text> input : inputs) { + for (InMemoryMapOutput<Text, Text> input : inputs) { manager.unreserve(input.getSize()); } }
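
Taken together, the Shuffle change above turns merger construction into a protected createMergeManager(Context) hook that returns the MergeManager interface, with MergeManagerImpl as the stock implementation. Below is a sketch, not part of this commit, of how a subclass might use that hook. InstrumentedShuffle and the org.example.shuffle package are hypothetical, and the sketch assumes the MergeManager interface declares exactly the three methods MergeManagerImpl overrides above (waitForResource, reserve, close) and that the hadoop-mapreduce-client-core classes from this revision are on the classpath.

package org.example.shuffle;

import java.io.IOException;

import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MergeManager;
import org.apache.hadoop.mapreduce.task.reduce.Shuffle;

/**
 * Illustrative only: a Shuffle subclass that observes reservations by
 * wrapping the stock MergeManagerImpl built by the superclass.
 */
public class InstrumentedShuffle<K, V> extends Shuffle<K, V> {

  @Override
  protected MergeManager<K, V> createMergeManager(
      ShuffleConsumerPlugin.Context context) {
    final MergeManager<K, V> delegate = super.createMergeManager(context);
    return new MergeManager<K, V>() {
      @Override
      public void waitForResource() throws InterruptedException {
        delegate.waitForResource();
      }

      @Override
      public MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize,
          int fetcher) throws IOException {
        MapOutput<K, V> out = delegate.reserve(mapId, requestedSize, fetcher);
        // A null return means the fetcher has been told to wait and retry.
        System.out.println("reserve(" + mapId + ", " + requestedSize + ") -> "
            + (out == null ? "WAIT" : out.getDescription()));
        return out;
      }

      @Override
      public RawKeyValueIterator close() throws Throwable {
        return delegate.close();
      }
    };
  }
}

The StubbedMergeManager in TestMergeManager above takes the complementary route, subclassing MergeManagerImpl itself and overriding createInMemoryMerger() to intercept merges rather than reservations.
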