Hello everyone,
I am going through the MapReduce source code. In MergeQueue.merge, I can
only see that the segments are combined and sorted by length into a list
for merging. However, I could not find the code that sorts the (key, value)
pairs inside the segments by key.
Here is the function:
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
.
.
.

    //if we have lesser number of segments remaining, then just return the
    //iterator, else do another single level merge
    if (numSegments <= factor) {
      // Reset totalBytesProcessed to track the progress of the final merge.
      // This is considered the progress of the reducePhase, the 3rd phase
      // of reduce task. Currently totalBytesProcessed is not used in sort
      // phase of reduce task(i.e. when intermediate merges happen).
      totalBytesProcessed = startBytes;

      //calculate the length of the remaining segments. Required for
      //calculating the merge progress
      long totalBytes = 0;
      for (int i = 0; i < segmentsToMerge.size(); i++) {
        totalBytes += segmentsToMerge.get(i).getLength();
      }
      if (totalBytes != 0) //being paranoid
        progPerByte = 1.0f / (float)totalBytes;

      if (totalBytes != 0)
        mergeProgress.set(totalBytesProcessed * progPerByte);
      else
        mergeProgress.set(1.0f); // Last pass and no segments left - we're done

      LOG.info("Down to the last merge-pass, with " + numSegments +
               " segments left of total size: " + totalBytes + " bytes");
      return this;
    } else {
      LOG.info("Merging " + segmentsToMerge.size() +
               " intermediate segments out of a total of " +
               (segments.size()+segmentsToMerge.size()));

      //we want to spread the creation of temp files on multiple disks if
      //available under the space constraints
      long approxOutputSize = 0;
      for (Segment<K, V> s : segmentsToMerge) {
        approxOutputSize += s.getLength() +
                            ChecksumFileSystem.getApproxChkSumLength(
                            s.getLength());
      }
      Path tmpFilename =
        new Path(tmpDir, "intermediate").suffix("." + passNo);

      Path outputFile = lDirAlloc.getLocalPathForWrite(
                                          tmpFilename.toString(),
                                          approxOutputSize, conf);

      Writer<K, V> writer =
        new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
                         writesCounter);
      *writeFile(this, writer, reporter, conf);*
      writer.close();

      //we finished one single level merge; now clean up the priority
      //queue
      this.close();

      // Add the newly create segment to the list of segments to be merged
      Segment<K, V> tempSegment =
        new Segment<K, V>(conf, fs, outputFile, codec, false);
      segments.add(tempSegment);
      numSegments = segments.size();
      Collections.sort(segments, segmentComparator);

      passNo++;
    }
    //we are worried about only the first pass merge factor. So reset the
    //factor to what it originally was
    factor = origFactor;
  } while(true);
}
As far as I can tell, if the number of segments is at most factor, the
merge queue itself is returned as the iterator (is this right?). Otherwise,
factor segments are merged in each pass, pass by pass. But how do the <K,V>
pairs from different segments end up sorted by key?
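
To make the question concrete, what I expected to find is something like
the k-way merge sketched below. This is only my guess at the technique, not
Hadoop's code. It assumes that each segment is already sorted by key and
that the merge queue is a priority queue ordered by the key at the head of
each segment (the "clean up the priority queue" comment in the function
hints at this). KWayMergeSketch, Cursor and merge are hypothetical names,
and plain strings stand in for the (key, value) records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

    /** Hypothetical stand-in for a segment: a cursor over keys that are already sorted. */
    static final class Cursor {
        private final Iterator<String> it;
        String current; // key currently at the head of this segment

        Cursor(List<String> sortedKeys) {
            this.it = sortedKeys.iterator();
        }

        /** Moves to the next key; returns false once the segment is exhausted. */
        boolean advance() {
            if (!it.hasNext()) {
                return false;
            }
            current = it.next();
            return true;
        }
    }

    /** Merges segments that are each sorted by key into one sorted list. */
    static List<String> merge(List<List<String>> segments) {
        // The queue orders cursors by their current head key, not by segment length.
        PriorityQueue<Cursor> queue =
            new PriorityQueue<>((a, b) -> a.current.compareTo(b.current));
        for (List<String> segment : segments) {
            Cursor c = new Cursor(segment);
            if (c.advance()) {
                queue.add(c); // only non-empty segments enter the queue
            }
        }

        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            Cursor smallest = queue.poll();   // segment holding the smallest head key
            out.add(smallest.current);
            if (smallest.advance()) {
                queue.add(smallest);          // re-insert so it is re-ordered by its new head
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> segments = Arrays.asList(
            Arrays.asList("a", "d", "g"),
            Arrays.asList("b", "e"),
            Arrays.asList("c", "f", "h", "i"));
        System.out.println(merge(segments)); // prints [a, b, c, d, e, f, g, h, i]
    }
}
```

If the real code works along these lines, no separate sort step over the
records would be needed: the ordering would come from repeatedly taking the
smallest head key, and sorting the segment list by length would only decide
which segments are merged in each pass.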
Elton