[jira] [Commented] (MAPREDUCE-5605) Memory-centric MapReduce aiming to solve the I/O bottleneck
[ https://issues.apache.org/jira/browse/MAPREDUCE-5605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13817207#comment-13817207 ] Lijie Xu commented on MAPREDUCE-5605: - Implementing a MapReduce-like framework effectively and efficiently is a real challenge. I would like to know how you deal with the following issues, all of which are relevant to an in-memory processing framework.

Multi-threading vs. multi-processing: Hadoop implements tasks as separate processes, while Apache Spark implements them as threads. Spark chose threads because it wants to share data between tasks/jobs, and sharing data across processes (JVMs) is not efficient. Based on the architecture of this proposal, it seems you want to aggregate the intermediate data of several mappers and reducers together in memory, so that more control can be exercised to optimize I/O. However, the concrete dataflow is not given, so I want to know whether data is shared between tasks/jobs and how large the shared data will be.

Fault tolerance: Compared with multi-threading, the multi-process policy has its advantages: it is easy to manage and easy to guarantee fault tolerance. Hadoop is disk-based and process-based, so the failure of a mapper/reducer can be handled simply by rerunning the lost task on an appropriate node. If the framework becomes memory-based, the safety of intermediate data (e.g., the outputs of a mapper) is not easy to guarantee. Furthermore, crashes of individual threads, or of the JVM itself, need attention.

Idempotence: This means that rerunning any task will not affect the final result. Putting the input/output/intermediate data of several tasks together requires special management to preserve this property.

Trade-off between memory usage and disk usage: Since memory is limited and data is huge, disk is still needed to store/swap some data. So when and how to swap overcrowded in-memory data onto disk is an important issue, and one directly related to performance.
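The last question, when to swap overcrowded in-memory data onto disk, can be made concrete with a minimal sketch. This is an illustrative toy, not code from the proposal: a buffer that spills all in-memory records to a file once a byte threshold is crossed.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Toy spilling buffer: keeps records in memory and writes them out
 *  to a spill file once the configured memory limit is exceeded. */
class SpillingBuffer {
    private final long memoryLimit;                      // bytes allowed in memory
    private final List<byte[]> inMemory = new ArrayList<>();
    private long inMemoryBytes = 0;
    private final Path spillFile;
    int spillCount = 0;                                  // how many spills occurred

    SpillingBuffer(long memoryLimit, Path spillFile) {
        this.memoryLimit = memoryLimit;
        this.spillFile = spillFile;
    }

    void add(byte[] record) throws IOException {
        inMemory.add(record);
        inMemoryBytes += record.length;
        if (inMemoryBytes > memoryLimit) {
            spill();
        }
    }

    private void spill() throws IOException {
        try (OutputStream out = Files.newOutputStream(
                spillFile, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (byte[] r : inMemory) out.write(r);
        }
        inMemory.clear();
        inMemoryBytes = 0;
        spillCount++;
    }
}
```

The interesting design decisions all hide inside this simple shape: which records to evict (all, coldest, largest), whether spilled data stays readable for fault recovery, and how the limit interacts with the other tasks sharing the JVM.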
> Memory-centric MapReduce aiming to solve the I/O bottleneck > --- > > Key: MAPREDUCE-5605 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-5605 > Project: Hadoop Map/Reduce > Issue Type: Improvement > Affects Versions: 1.0.1 > Environment: x86-64 Linux/Unix > 64-bit jdk7 preferred > Reporter: Ming Chen > Assignee: Ming Chen > Fix For: 1.0.1 > > Attachments: MAPREDUCE-5605-v1.patch, > hadoop-core-1.0.1-mammoth-0.9.0.jar > > > Memory is a very important resource for bridging the gap between CPUs and I/O > devices, so the idea is to maximize the usage of memory to solve the I/O > bottleneck. We developed a multi-threaded task execution engine, which runs in > a single JVM on a node. In the execution engine, we implemented a memory > scheduling algorithm to realize global memory management, on top of which we > further developed techniques such as sequential disk accessing and multi-cache, > and solved the problem of full garbage collection in the JVM. The benchmark > results show that it can achieve impressive improvements in typical cases. When > a system is relatively short of memory (e.g., HPC, small- and medium-size > enterprises), the improvement is even more impressive. -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Commented] (MAPREDUCE-5605) Memory-centric MapReduce aiming to solve the I/O bottleneck
[ https://issues.apache.org/jira/browse/MAPREDUCE-5605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13814504#comment-13814504 ] Lijie Xu commented on MAPREDUCE-5605: - Nice job, Ming. Is there a document detailing the design and implementation of this work? I'm wondering whether there are performance or safety problems in it.
[jira] [Commented] (MAPREDUCE-5494) Hash-based MapReduce
[ https://issues.apache.org/jira/browse/MAPREDUCE-5494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13761624#comment-13761624 ] Lijie Xu commented on MAPREDUCE-5494: - Cool. Hadoop's sort-based and disk-based MapReduce framework should be dramatically improved to achieve better job performance. As for this hash-based MapReduce, Apache Spark uses a similar hash-based GroupBy-Aggregate approach. Compared with Spark's implementation, this proposal may have some problems.

1) At the map side, the only problem is that Combine() cannot be done easily. For jobs with Combine(), Spark uses a hashmap to collect the pairs emitted by map() directly, without a buffer. Combine() can be performed as the pairs are put into the hashmap one by one, but this requires that the semantics of Combine() be similar to fold() in a functional language. After map(), the hashmap outputs all the pairs into different buffers (one for each reducer) according to each pair's partition id. Then the pairs are kept in the buffers or spilled onto disk for further shuffling. The drawback of Spark's approach is that the hashmap is memory-consuming, so it is a trade-off between performance and fault tolerance. For jobs without Combine(), Spark uses your proposed method.

2) At the reduce side, I first want to question the sort-based MapReduce in current Hadoop versions: why should we shuffle all the map outputs into a reducer before doing reduce()? Since all the pairs in remote map outputs are sorted, reduce() could run on the fly, just like Merge(). Maybe the reason is that it is not easy to iterate over pairs when map outputs are compressed. Secondly, returning to the proposed approach: "the hash shuffle will fetch its own partitions from maps as usually" is not very clear. Maybe you want to do the shuffle this way: first, copy remote map outputs and store the shuffled segments into memory as usual. When a threshold is reached, iterate over every record in every in-memory segment.
While iterating, choose a good hash function to partition each record and store it into the related disk file (a second-level partition). After all the records are stored on disk, reduce() begins to load each partition file into a hashmap. Once all the records of a partition are cached in the hashmap, Reduce() is performed on each <K, list(V)>. The second-level partition guarantees that the hashmap can be kept in memory. I think this approach is applicable. However, choosing the second-level partition function is tricky, and Combine() is discarded too. Another performance issue is that this memory-to-disk and disk-to-memory back-and-forth copying is not efficient. Spark uses a large hashmap to do "shuffle and combine()" simultaneously, as described for the mapper's Combine() above, but it requires enough memory to hold the hashmap; we can enlarge the reduce count to lower this requirement. Above all, if we only change sort-based to hash-based, job performance may not improve. A better way is to combine the hash-based and memory-based approaches together, as Spark does: use memory aggressively (e.g., a large hashmap instead of back-and-forth copying), and if a memory error occurs, fall back to running the job sort-based.

> Hash-based MapReduce > > > Key: MAPREDUCE-5494 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-5494 > Project: Hadoop Map/Reduce > Issue Type: New Feature > Components: mrv2 > Affects Versions: trunk > Reporter: Jerry Chen > Assignee: Jerry Chen > Original Estimate: 1,344h > Remaining Estimate: 1,344h > > To support parallel processing, the MapReduce computation model essentially > groups data by key, then applies the reduce function to each group. The > current implementation of the MapReduce framework uses sort-merge to > guarantee this computation model, but sort-merge is relatively expensive when > only grouping is needed, and grouping is exactly what hashing is capable of.
> > We propose to implement hash-based MapReduce, which utilizes hashing to > guarantee the computation model instead of sort-merge. This technique works as > follows: > > 1. At the map side, the hash map output collector will collect the map output > directly into the corresponding partition buffer (one for each reducer) without > sorting (first-level partition). Each partition buffer can be flushed to disk > when it is full or close to full. To handle disk I/O efficiently when there > are too many partitions (reducers), the map side can be optimized by using a > shared buffer for different partitions; a counting sort on partition number > can be performed when flushing the shared buffer. > 2. At the reduce side, the hash shuffle will fetch its own partitions from maps > as usual. While fetching, the records will be further partiti
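The Spark-style map-side combine described in the comment above (inserting pairs into a hashmap and folding each new value into the running aggregate for its key) can be sketched as follows. This is an illustrative toy, not code from either framework; the class and method names are invented:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Toy map-side collector that combines values on insertion, the way
 *  the comment describes Spark's hashmap-based Combine(). */
class CombiningCollector<K, V> {
    private final Map<K, V> aggregates = new HashMap<>();
    private final BinaryOperator<V> combine;   // must be fold-like (associative)

    CombiningCollector(BinaryOperator<V> combine) { this.combine = combine; }

    /** Each emitted (k, v) is folded into the running aggregate for k,
     *  so Combine() happens as records arrive, not in a later sort pass. */
    void emit(K key, V value) {
        aggregates.merge(key, value, combine);
    }

    Map<K, V> results() { return aggregates; }
}
```

For a word-count-style job the combine operator would be `Integer::sum`. The per-partition bucketing and spill-to-disk steps that follow map() are omitted here; the memory cost the comment warns about is exactly this hashmap, which must hold one entry per distinct key.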
[jira] [Created] (MAPREDUCE-5482) Release kvoffsets and kvindices before going to merge phase
Lijie Xu created MAPREDUCE-5482: --- Summary: Release kvoffsets and kvindices before going to merge phase Key: MAPREDUCE-5482 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5482 Project: Hadoop Map/Reduce Issue Type: Bug Components: task Affects Versions: 1.2.0 Environment: Any Environment Reporter: Lijie Xu

In the org.apache.hadoop.mapred.MapTask.MapOutputBuffer.flush() method, only the kvbuffer is released before the merge phase (i.e., mergeParts()). Though kvindices and kvoffsets are small in many cases, in principle they should also be released before mergeParts().

      try {
        spillThread.interrupt();
        spillThread.join();
      } catch (InterruptedException e) {
        throw (IOException)new IOException("Spill failed").initCause(e);
      }
      // release sort buffer before the merge
      kvbuffer = null;
      mergeParts();
      Path outputPath = mapOutputFile.getOutputFile();
      fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
    }

-- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
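The proposed change can be sketched with a toy stand-in for MapOutputBuffer (field and method names mirror the snippet above, but this is not the actual patch): null out all three sort structures, not just kvbuffer, so the heap space becomes garbage-collectable before the memory-hungry merge begins.

```java
/** Toy stand-in for MapOutputBuffer.flush() showing the proposed
 *  release order: drop every sort-buffer reference before merging. */
class SortBuffer {
    byte[] kvbuffer = new byte[1 << 20];   // serialized records
    int[] kvoffsets = new int[1 << 16];    // record offsets into kvbuffer
    int[] kvindices = new int[1 << 16];    // partition/key/value index triples
    boolean merged = false;

    void flush() {
        // Proposed: release ALL sort structures, not just kvbuffer,
        // so mergeParts() can reuse that heap space.
        kvbuffer = null;
        kvoffsets = null;
        kvindices = null;
        mergeParts();
    }

    private void mergeParts() { merged = true; }
}
```

Since each mapper JVM holds these arrays for its whole lifetime otherwise, the win is small per task but free to take.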
[jira] [Updated] (MAPREDUCE-5461) Let users be able to get latest Key in reduce()
[ https://issues.apache.org/jira/browse/MAPREDUCE-5461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Xu updated MAPREDUCE-5461: Description: Reducer generates <K, list(V)> for reduce(). In some cases, such as SecondarySort, although the current V and the next V share the same K, their actual corresponding Ks are different. For example, in SecondarySort, map() outputs:

Key Value
<1, 3> 3
<1, 1> 1
<2, 5> 5
<1, 8> 8

After partitioning by Key.getFirst(), then sorting and grouping by Key.getFirst(), the reducer gets:

Key Value
--Group 1--
<1, 1> 1
<1, 3> 3
<1, 8> 8
--Group 2--
<2, 5> 5

reduce() receives:

Key List
<1, 1> List<1, 3, 8>
<2, 5> List<5>

When invoking V.next(), we can get the next V (e.g., 3), but there is no API to get its corresponding Key (e.g., <1, 3>); we can only get the first Key (e.g., <1, 1>). If users were able to get the latest key, SecondarySort would not need to emit the value in map(), so network traffic would be reduced. Another example is Join: if we can get the latest Key, we do not need to put the table label in both key and value.

was: (the same description, except the last sentence read "we do need to put table label in both key and value")
> Let users be able to get latest Key in reduce() > --- > > Key: MAPREDUCE-5461 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-5461 > Project: Hadoop Map/Reduce > Issue Type: Improvement > Components: task > Affects Versions: 1.2.1 > Environment: Any environment > Reporter: Lijie Xu > Labels: features > Original Estimate: 12h > Remaining Estimate: 12h
[jira] [Created] (MAPREDUCE-5461) Let users be able to get latest Key in reduce()
Lijie Xu created MAPREDUCE-5461: --- Summary: Let users be able to get latest Key in reduce() Key: MAPREDUCE-5461 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5461 Project: Hadoop Map/Reduce Issue Type: Improvement Components: task Affects Versions: 1.2.1 Environment: Any environment Reporter: Lijie Xu
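The information loss described in this issue can be simulated outside Hadoop. The sketch below (illustrative only, not the MapReduce API) sorts composite keys by both components, groups by the first component, and keeps only the values per group, which is exactly why the per-record composite key is no longer reachable from inside reduce():

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simulates secondary-sort grouping: records are sorted by
 *  (first, second) and grouped by first; each group retains only
 *  its values, so the composite keys after the first are lost. */
class SecondarySortDemo {
    record CompositeKey(int first, int second) {}

    /** Returns group key (Key.getFirst()) -> values in secondary-sort order. */
    static Map<Integer, List<Integer>> group(List<CompositeKey> mapOutput) {
        mapOutput.sort(Comparator.comparingInt(CompositeKey::first)
                                 .thenComparingInt(CompositeKey::second));
        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        for (CompositeKey k : mapOutput) {
            // Only k.second() survives as a value; the full composite key
            // of each record is discarded once grouping is done.
            groups.computeIfAbsent(k.first(), g -> new ArrayList<>()).add(k.second());
        }
        return groups;
    }
}
```

Running this on the example from the issue, group 1 sees values [1, 3, 8] but only one representative key <1, 1>; a "latest key" API would surface <1, 3> and <1, 8> as the iterator advances.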
[jira] [Commented] (MAPREDUCE-4882) Error in estimating the length of the output file in Spill Phase
[ https://issues.apache.org/jira/browse/MAPREDUCE-4882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13563459#comment-13563459 ] Lijie Xu commented on MAPREDUCE-4882: - [~jerrychenhf] Thanks. I checked this patch and believe it is correct. In fact, I have run many jobs with this change and found nothing abnormal. If I find any further problems with it, I will report them.

> Error in estimating the length of the output file in Spill Phase > > > Key: MAPREDUCE-4882 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-4882 > Project: Hadoop Map/Reduce > Issue Type: Bug > Affects Versions: 0.20.2, 1.0.3 > Environment: Any Environment > Reporter: Lijie Xu > Assignee: Jerry Chen > Labels: patch > Attachments: MAPREDUCE-4882.patch > > Original Estimate: 1h > Remaining Estimate: 1h > > The sortAndSpill() method in MapTask.java has an error in estimating the > length of the output file. The "long size" should be "(bufvoid - bufstart) + > bufend", not "(bufvoid - bufend) + bufstart", when "bufend < bufstart". Here > is the original code in MapTask.java:
>
>     private void sortAndSpill() throws IOException, ClassNotFoundException,
>                                        InterruptedException {
>       // approximate the length of the output file to be the length of the
>       // buffer + header lengths for the partitions
>       long size = (bufend >= bufstart
>                    ? bufend - bufstart
>                    : (bufvoid - bufend) + bufstart) +
>                   partitions * APPROX_HEADER_LENGTH;
>       FSDataOutputStream out = null;
>
> I ran a test on "TeraSort". A snippet from a mapper's log follows:
>
>     MapTask: Spilling map output: record full = true
>     MapTask: bufstart = 157286200; bufend = 10485460; bufvoid = 199229440
>     MapTask: kvstart = 262142; kvend = 131069; length = 655360
>     MapTask: Finished spill 3
>
> In this case, the spill size should be (199229440 - 157286200) + 10485460 = > 52428700 bytes (52 MB), because the number of spilled records is 524287 and > each record costs 100 bytes.
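The wrap-around arithmetic can be checked in isolation. The standalone snippet below (invented names; only the formulas come from the issue) compares the original and corrected expressions for the case where the valid region of the circular buffer wraps around bufvoid:

```java
/** Circular-buffer spill-size estimate from MAPREDUCE-4882: when
 *  bufend < bufstart, the valid data wraps past bufvoid back to 0. */
class SpillSizeDemo {
    static long size(long bufstart, long bufend, long bufvoid, boolean fixed) {
        if (bufend >= bufstart) return bufend - bufstart;        // no wrap
        return fixed ? (bufvoid - bufstart) + bufend             // corrected
                     : (bufvoid - bufend) + bufstart;            // original (buggy)
    }
}
```

Plugging in the TeraSort log values (bufstart=157286200, bufend=10485460, bufvoid=199229440), the corrected formula yields 52428700 bytes, matching 524287 records at 100 bytes each, while the buggy one yields 346030180, far larger than the buffer itself.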
[jira] [Commented] (MAPREDUCE-4883) Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM
[ https://issues.apache.org/jira/browse/MAPREDUCE-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13563455#comment-13563455 ] Lijie Xu commented on MAPREDUCE-4883: - [~jerrychenhf] Good job! Yes, maxSingleShuffleLimit should not be changed; in practice, the maxSingleShuffleLimit boundary is rarely reached. I have another question here: I think "mapred.job.reduce.input.buffer.percent" is bizarre and hardly used. Is it possible to remove this parameter, given that it is almost always set to 0.0f?

> Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM > -- > > Key: MAPREDUCE-4883 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-4883 > Project: Hadoop Map/Reduce > Issue Type: Improvement > Affects Versions: 0.20.2, 1.0.3 > Environment: Especially for 64bit JVM > Reporter: Lijie Xu > Assignee: Jerry Chen > Labels: patch > Attachments: MAPREDUCE-4883.patch > > Original Estimate: 12h > Remaining Estimate: 12h > > In hadoop-0.20.2, hadoop-1.0.3 and other versions, the reducer's shuffle buffer > size cannot exceed 2048MB (i.e., Integer.MAX_VALUE). This is reasonable for a > 32bit JVM. But for a 64bit JVM, although the reducer's JVM size can be set to > more than 2048MB (e.g., mapred.child.java.opts=-Xmx4000m), the heap size usable > for the shuffle buffer is at most "2048MB * maxInMemCopyUse (default 0.7)", not > "4000MB * maxInMemCopyUse". So the marked piece of code in ReduceTask.java > needs modification for a 64bit JVM.
> ---
>     private final long maxSize;
>     private final long maxSingleShuffleLimit;
>
>     private long size = 0;
>
>     private Object dataAvailable = new Object();
>     private long fullSize = 0;
>     private int numPendingRequests = 0;
>     private int numRequiredMapOutputs = 0;
>     private int numClosed = 0;
>     private boolean closed = false;
>
>     public ShuffleRamManager(Configuration conf) throws IOException {
>       final float maxInMemCopyUse =
>         conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
>       if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
>         throw new IOException("mapred.job.shuffle.input.buffer.percent" +
>                               maxInMemCopyUse);
>       }
>       // Allow unit tests to fix Runtime memory
> -->   maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
> -->       (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
> -->       * maxInMemCopyUse);
>       maxSingleShuffleLimit = (long)(maxSize *
>                                      MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
>       LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
>                ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
>     }
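The capping behavior flagged by the arrows can be reproduced in isolation. The snippet below is a simplified stand-in (invented names, approximating the int-cast path in the quoted constructor, not the Hadoop class itself), contrasting it with a long-based computation for a 4000 MB heap:

```java
/** Shows why the int-based maxSize computation caps the shuffle buffer
 *  at Integer.MAX_VALUE even on a 64-bit JVM with a larger heap. */
class ShuffleLimitDemo {
    static long maxSizeInt(long heapBytes, float maxInMemCopyUse) {
        // Original path: the heap size is clamped to Integer.MAX_VALUE
        // by the int cast before the fraction is applied.
        return (long) ((int) Math.min(heapBytes, Integer.MAX_VALUE) * maxInMemCopyUse);
    }

    static long maxSizeLong(long heapBytes, float maxInMemCopyUse) {
        // Long-based variant: the full heap participates in the computation.
        return (long) (heapBytes * maxInMemCopyUse);
    }
}
```

With heapBytes = 4000 MB and the default 0.70 fraction, the int-based path stays below 1.5 GB while the long-based path uses roughly 2.8 GB, which is the gap the issue describes.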
[jira] [Commented] (MAPREDUCE-4883) Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM
[ https://issues.apache.org/jira/browse/MAPREDUCE-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13534529#comment-13534529 ] Lijie Xu commented on MAPREDUCE-4883: - Actually, when a segment (MapOutput) is copied from finished mappers, ReduceTask just creates a small byte[] of that segment's length to cache it. So no single large byte array (e.g., 2GB) is allocated directly; instead there are plenty of small byte[] arrays, whose total length is limited by maxSize (e.g., Integer.MAX_VALUE). I think it's possible to enlarge this limit. As Radim says, more byte[] arrays may burden the GC but reduce the number of in-memory merges in the Shuffle Phase. It is really a balance between space and efficiency.
[jira] [Commented] (MAPREDUCE-4883) Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM
[ https://issues.apache.org/jira/browse/MAPREDUCE-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13534505#comment-13534505 ] Lijie Xu commented on MAPREDUCE-4883: - [~tlipcon] Are there any other solutions? I think 2GB is a bit small for a "large" reducer. Can we use a 2D array to break this limit?
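The 2D-array idea suggested in the comment can be sketched as a chunked buffer addressed by a long offset, so total capacity is no longer bound by a single array's Integer.MAX_VALUE length. This is illustrative only, with the chunk size shrunk for the example; a real shuffle buffer would add append/read-range operations:

```java
/** Chunked byte buffer: a byte[][] addressed by long offsets, so the
 *  total capacity can exceed the 2GB limit of a single byte[]. */
class ChunkedBuffer {
    private final int chunkSize;
    private final byte[][] chunks;

    ChunkedBuffer(long capacity, int chunkSize) {
        this.chunkSize = chunkSize;
        // Round the chunk count up so at least `capacity` bytes fit.
        int n = (int) ((capacity + chunkSize - 1) / chunkSize);
        chunks = new byte[n][chunkSize];
    }

    void put(long offset, byte value) {
        chunks[(int) (offset / chunkSize)][(int) (offset % chunkSize)] = value;
    }

    byte get(long offset) {
        return chunks[(int) (offset / chunkSize)][(int) (offset % chunkSize)];
    }

    long capacity() { return (long) chunks.length * chunkSize; }
}
```

This trades a division per access for unbounded total size; as the earlier comment notes, many moderate-size arrays are also friendlier to the GC than one huge allocation.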
[jira] [Created] (MAPREDUCE-4883) Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM
Lijie Xu created MAPREDUCE-4883: --- Summary: Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM Key: MAPREDUCE-4883 URL: https://issues.apache.org/jira/browse/MAPREDUCE-4883 Project: Hadoop Map/Reduce Issue Type: Improvement Affects Versions: 1.0.3, 0.20.2 Environment: Especially for 64bit JVM Reporter: Lijie Xu
[jira] [Created] (MAPREDUCE-4882) Error in estimating the length of the output file in Spill Phase
Lijie Xu created MAPREDUCE-4882: --- Summary: Error in estimating the length of the output file in Spill Phase Key: MAPREDUCE-4882 URL: https://issues.apache.org/jira/browse/MAPREDUCE-4882 Project: Hadoop Map/Reduce Issue Type: Bug Affects Versions: 1.0.3, 0.20.2 Environment: Any Environment Reporter: Lijie Xu