[jira] [Commented] (MAPREDUCE-5605) Memory-centric MapReduce aiming to solve the I/O bottleneck

2013-11-08 Thread Lijie Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-5605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13817207#comment-13817207
 ] 

Lijie Xu commented on MAPREDUCE-5605:
--------------------------------------

How to implement a MapReduce-like framework effectively and efficiently is a 
hard problem. I just want to know how you deal with the following issues, 
which any in-memory processing framework has to face.

Multi-threads vs. multi-processes: Hadoop runs each task in its own process, 
while Apache Spark runs tasks as threads. Spark chooses multi-threading 
because it wants to share data between tasks/jobs, and sharing data between 
processes (JVMs) is not efficient. Based on the architecture of this 
proposal, it seems that you want to gather the intermediate data of several 
mappers and reducers together in memory so that more control can be 
exercised to optimize I/O. However, the concrete dataflow is not given, so I 
want to know whether there is data sharing between tasks/jobs and how large 
the shared data will be.
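
For illustration only (a toy sketch, not code from this proposal): tasks 
running as threads in one JVM can read a shared in-memory structure 
directly, while tasks in separate JVMs would each need their own 
deserialized copy.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedDataDemo {
  // Cached once per JVM; every task thread reads it with no extra copy.
  // With one process per task, each task would deserialize its own copy.
  static final Map<String, List<Integer>> SHARED =
      new ConcurrentHashMap<String, List<Integer>>();

  public static void main(String[] args) throws InterruptedException {
    SHARED.put("lookup", Arrays.asList(1, 2, 3));
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int t = 0; t < 4; t++) {
      final int id = t;
      pool.submit(new Runnable() {
        public void run() {
          System.out.println("task " + id + " sees " + SHARED.get("lookup"));
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}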

Fault-tolerance: Compared with multi-threading, the multi-process policy has 
its advantages: it is easy to manage and easy to make fault-tolerant. Hadoop 
is disk-based and process-based, so the failure of a mapper/reducer can 
easily be handled by rerunning the lost task on an appropriate node. If the 
framework becomes memory-based, the safety of the intermediate data (e.g., 
the outputs of a mapper) is not easy to guarantee. Furthermore, crashes of 
individual threads, or of the JVM itself, deserve attention.

Idempotence: This term means that rerunning any task will not affect the 
final result. Putting the input/output/intermediate data of several tasks 
together requires special management to keep this property.
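
For illustration (a sketch of the usual trick, not code from this proposal): 
Hadoop keeps task re-execution idempotent by writing each attempt's output 
to a private temporary location and publishing it with an atomic rename at 
commit time; a memory-centric engine would need an equivalent discipline.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class IdempotentCommit {
  // Each attempt writes to its own temp file; the commit is an atomic
  // rename. Rerunning the task either commits the same bytes or finds the
  // output already published, so the final result never changes.
  static void runAttempt(Path outDir, String taskId, int attempt)
      throws IOException {
    Files.createDirectories(outDir);
    Path tmp = outDir.resolve("_tmp-" + taskId + "-attempt" + attempt);
    Files.write(tmp, "intermediate records...".getBytes());
    Path committed = outDir.resolve(taskId);
    if (Files.exists(committed)) {
      Files.delete(tmp);               // a previous attempt already committed
      return;
    }
    Files.move(tmp, committed, StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    Path out = Paths.get("job-output");
    runAttempt(out, "task_0001", 0);
    runAttempt(out, "task_0001", 1);   // rerun: final output is unchanged
  }
}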

Trade-off between memory usage and disk usage: Since memory is limited and 
data is huge, we still need disk to store/swap some data. So when and how to 
swap the overflowing in-memory data onto disk is an important issue that 
directly affects performance.
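
For illustration (a toy spill policy, not the proposal's algorithm): the 
essence of the trade-off is a soft memory limit; records stay in memory 
until the limit is crossed, then the batch is written sequentially to disk 
and the memory is freed.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

public class SpillBuffer {
  private final long softLimitBytes;      // when to spill: the key policy knob
  private final List<byte[]> inMemory = new ArrayList<byte[]>();
  private long used = 0;
  private int spillCount = 0;

  SpillBuffer(long softLimitBytes) { this.softLimitBytes = softLimitBytes; }

  void add(byte[] record) throws IOException {
    inMemory.add(record);
    used += record.length;
    if (used >= softLimitBytes) {
      spill();                            // overcrowded: move the batch to disk
    }
  }

  private void spill() throws IOException {
    OutputStream out = new BufferedOutputStream(
        new FileOutputStream(new File("spill-" + (spillCount++) + ".bin")));
    try {
      for (byte[] r : inMemory) {
        out.write(r);                     // sequential disk access
      }
    } finally {
      out.close();
    }
    inMemory.clear();                     // free memory for new records
    used = 0;
  }
}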

> Memory-centric MapReduce aiming to solve the I/O bottleneck
> ------------------------------------------------------------
>
> Key: MAPREDUCE-5605
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-5605
> Project: Hadoop Map/Reduce
>  Issue Type: Improvement
>Affects Versions: 1.0.1
> Environment: x86-64 Linux/Unix
> 64-bit jdk7 preferred
>Reporter: Ming Chen
>Assignee: Ming Chen
> Fix For: 1.0.1
>
> Attachments: MAPREDUCE-5605-v1.patch, 
> hadoop-core-1.0.1-mammoth-0.9.0.jar
>
>
> Memory is a very important resource for bridging the gap between CPUs and 
> I/O devices, so the idea is to maximize the usage of memory to solve the 
> I/O bottleneck. We developed a multi-threaded task execution engine, which 
> runs in a single JVM on a node. In the execution engine, we implemented a 
> memory scheduling algorithm to realize global memory management, on top of 
> which we further developed techniques such as sequential disk access and 
> multi-cache, and solved the problem of full garbage collection in the JVM. 
> The benchmark results show that it achieves impressive improvements in 
> typical cases. When a system is relatively short of memory (e.g., HPC, 
> small- and medium-size enterprises), the improvement is even more 
> impressive.



--
This message was sent by Atlassian JIRA
(v6.1#6144)


[jira] [Commented] (MAPREDUCE-5605) Memory-centric MapReduce aiming to solve the I/O bottleneck

2013-11-05 Thread Lijie Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-5605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13814504#comment-13814504
 ] 

Lijie Xu commented on MAPREDUCE-5605:
--------------------------------------

Nice job, Ming. Is there a document detailing the design and implementation 
of this work? I'm wondering whether there are any performance or safety 
problems in it.

> Memory-centric MapReduce aiming to solve the I/O bottleneck
> ------------------------------------------------------------
>
> Key: MAPREDUCE-5605
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-5605
> Project: Hadoop Map/Reduce
>  Issue Type: Improvement
>Affects Versions: 1.0.1
> Environment: x86-64 Linux/Unix
> 64-bit jdk7 preferred
>Reporter: Ming Chen
>Assignee: Ming Chen
> Fix For: 1.0.1
>
> Attachments: MAPREDUCE-5605-v1.patch, 
> hadoop-core-1.0.1-mammoth-0.9.0.jar
>
>
> Memory is a very important resource for bridging the gap between CPUs and 
> I/O devices, so the idea is to maximize the usage of memory to solve the 
> I/O bottleneck. We developed a multi-threaded task execution engine, which 
> runs in a single JVM on a node. In the execution engine, we implemented a 
> memory scheduling algorithm to realize global memory management, on top of 
> which we further developed techniques such as sequential disk access and 
> multi-cache, and solved the problem of full garbage collection in the JVM. 
> The benchmark results show that it achieves impressive improvements in 
> typical cases. When a system is relatively short of memory (e.g., HPC, 
> small- and medium-size enterprises), the improvement is even more 
> impressive.



--
This message was sent by Atlassian JIRA
(v6.1#6144)


[jira] [Commented] (MAPREDUCE-5494) Hash-based MapReduce

2013-09-08 Thread Lijie Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-5494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13761624#comment-13761624
 ] 

Lijie Xu commented on MAPREDUCE-5494:
--------------------------------------

Cool. Hadoop's sort-based and disk-based MapReduce framework should be 
dramatically improved to achieve better job performance.

As for this hash-based MapReduce, Apache Spark uses a similar hash-based 
GroupBy-Aggregate approach. Compared with Spark's implementation, this proposal 
may have some problems. 

1) At the map side, the only problem is that Combine() cannot be done 
easily. For jobs with Combine(), Spark uses a hashmap to collect the <K, V> 
pairs emitted by map() directly, without a buffer. Combine() can be done 
while the <K, V> pairs are put into the hashmap one by one, but this 
requires that the semantics of Combine() be similar to fold() in functional 
languages. After map(), the hashmap outputs all the <K, V> pairs into 
different buffers (one for each reducer) according to each pair's partition 
id. Then the <K, V> pairs are kept in the buffers or spilled onto disk for 
further shuffling. The drawback of Spark's approach is that the hashmap is 
memory-consuming, so it is a trade-off between performance and 
fault-tolerance. For jobs without Combine(), Spark uses your proposed 
method. (A toy sketch of this hash-based Combine() appears after point 2.)

2) At the reduce side, firstly, I want to question the sort-based MapReduce 
in current Hadoop versions. Why should we shuffle all the map outputs to a 
reducer before doing reduce()? Since all the <K, V> pairs in the remote map 
outputs are sorted, reduce() could run on the fly in the same way as 
Merge(). Maybe the reason is that it is not easy to iterate over <K, V> 
pairs while the map outputs are compressed.
   Secondly, returning to the proposed approach: "the hash shuffle will 
fetch its own partitions from maps as usually" is not very clear. Maybe you 
want to do the shuffle in this way: first, copy the remote map outputs and 
store the shuffled segments in memory as usual. When a threshold is reached, 
you iterate over every record in every in-memory segment. During this 
iteration, you choose a good hash function to partition each record and 
store it into the corresponding disk file (second-level partition). After 
all the records are stored on disk, reduce() loads each partition file into 
a hashmap; once all the records of a partition are cached in the hashmap, 
reduce() is performed on each <K, list(V)>. The second-level partition 
guarantees that each hashmap can be kept in memory (see the second sketch 
after this list).
   I think this approach is applicable. However, how to choose the secondary 
partition function is tricky, and Combine() is discarded too. Another 
performance issue is that this memory-to-disk and disk-to-memory 
back-and-forth copying is not efficient. Spark uses a large hashmap to do 
"shuffle and combine()" simultaneously, as described above for the map-side 
Combine(), but it requires enough memory to hold the hashmap. We can 
increase the number of reducers to lower this requirement.
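
To make point 1 concrete, here is a toy sketch (word count as the example; 
this is not Spark's actual code) of a hash-based map-side Combine(): each 
emitted <K, V> pair is folded into a hashmap on insertion.

import java.util.HashMap;
import java.util.Map;

public class HashCombine {
  // Combine() happens as pairs enter the hashmap, fold()-style; there is no
  // sort and no separate combiner pass over a buffer.
  private final Map<String, Long> combined = new HashMap<String, Long>();

  void emit(String key, long value) {        // called for each map() output
    Long old = combined.get(key);
    combined.put(key, old == null ? value : old + value);  // the fold step
  }

  public static void main(String[] args) {
    HashCombine c = new HashCombine();
    for (String w : "a b a c a b".split(" ")) {
      c.emit(w, 1L);
    }
    System.out.println(c.combined);          // a=3, b=2, c=1 (in some order)
  }
}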
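
And a toy sketch of the two-level reduce-side partitioning guessed at in 
point 2 (in-memory lists stand in for the on-disk partition files, and the 
4-way fan-out is an arbitrary choice):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SecondaryPartition {
  static final int SUB_PARTITIONS = 4;       // second-level fan-out (assumed)

  public static void main(String[] args) {
    // stand-in for shuffled <K, V> records fetched from the mappers
    List<String[]> shuffled = Arrays.asList(
        new String[]{"a", "1"}, new String[]{"b", "2"},
        new String[]{"a", "3"}, new String[]{"c", "4"});

    // second-level partition: hash the records into small sub-partitions so
    // that each sub-partition's hashmap is guaranteed to fit in memory
    List<List<String[]>> files = new ArrayList<List<String[]>>();
    for (int i = 0; i < SUB_PARTITIONS; i++) {
      files.add(new ArrayList<String[]>());
    }
    for (String[] rec : shuffled) {
      files.get(Math.abs(rec[0].hashCode()) % SUB_PARTITIONS).add(rec);
    }

    // load one sub-partition at a time into a hashmap and run reduce()
    for (List<String[]> file : files) {
      Map<String, List<String>> groups = new HashMap<String, List<String>>();
      for (String[] rec : file) {
        if (!groups.containsKey(rec[0])) {
          groups.put(rec[0], new ArrayList<String>());
        }
        groups.get(rec[0]).add(rec[1]);
      }
      for (Map.Entry<String, List<String>> e : groups.entrySet()) {
        System.out.println("reduce(" + e.getKey() + ", " + e.getValue() + ")");
      }
    }
  }
}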

Above all, if we only change sort-based to hash-based, the job's performance 
may not improve. A better way is to combine the hash-based and memory-based 
approaches, as Spark does: use memory aggressively (e.g., use a large 
hashmap instead of back-and-forth copying), and if memory runs short, fall 
back to the sort-based running style.





> Hash-based MapReduce
> --------------------
>
> Key: MAPREDUCE-5494
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-5494
> Project: Hadoop Map/Reduce
>  Issue Type: New Feature
>  Components: mrv2
>Affects Versions: trunk
>Reporter: Jerry Chen
>Assignee: Jerry Chen
>   Original Estimate: 1,344h
>  Remaining Estimate: 1,344h
>
> To support parallel processing, the MapReduce computation model essentially 
> groups data by key and then applies the reduce function to each group. The 
> current implementation of the MapReduce framework uses sort-merge to 
> guarantee this computation model, but sort-merge is relatively expensive 
> when only grouping is needed, and grouping is exactly what hashing can do.
>  
> We propose to implement hash-based MapReduce, which uses hashing instead of 
> sort-merge to guarantee the computation model. This technique works as 
> follows:
>  
> 1. At the map side, the hash map output collector will collect the map 
> output directly into the corresponding partition buffer (one per reducer) 
> without sorting (first-level partition). Each partition buffer can be 
> flushed to disk when it is full or close to full. To handle disk I/O 
> efficiently when there are too many partitions (reducers), the map side can 
> be optimized by using a shared buffer for different partitions; a counting 
> sort on the partition number can be performed when flushing the shared 
> buffer. 
> 2. At reduce side, the hash shuffle will fetch its own partitions from maps 
> as usually. While fetching, the records will be further partiti

[jira] [Created] (MAPREDUCE-5482) Release kvoffsets and kvindices before going to merge phase

2013-08-26 Thread Lijie Xu (JIRA)
Lijie Xu created MAPREDUCE-5482:
--------------------------------

 Summary: Release kvoffsets and kvindices before going to merge 
phase
 Key: MAPREDUCE-5482
 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5482
 Project: Hadoop Map/Reduce
  Issue Type: Bug
  Components: task
Affects Versions: 1.2.0
 Environment: Any Environment
Reporter: Lijie Xu


In the org.apache.hadoop.mapred.MapTask.MapOutputBuffer.flush() method, we 
only release the kvbuffer before the merge phase (i.e., mergeParts()). 
Though kvindices and kvoffsets are small in many cases, in principle we 
should release them before mergeParts() as well.
   
try {
  spillThread.interrupt();
  spillThread.join();
} catch (InterruptedException e) {
  throw (IOException)new IOException("Spill failed").initCause(e);
}
// release sort buffer before the merge
kvbuffer = null;
mergeParts();
Path outputPath = mapOutputFile.getOutputFile();
fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}
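
A minimal sketch of the proposed change (kvoffsets and kvindices are the 
accounting int[] arrays in MapOutputBuffer; whether they can safely be 
nulled at this point is exactly what a patch would need to verify):

// release the sort buffer and the accounting arrays before the merge
kvbuffer = null;
kvoffsets = null;   // proposed: also release the offset array
kvindices = null;   // proposed: also release the index array
mergeParts();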

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (MAPREDUCE-5461) Let users be able to get latest Key in reduce()

2013-08-15 Thread Lijie Xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/MAPREDUCE-5461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Xu updated MAPREDUCE-5461:
--------------------------------


Description: 
Reducer generates <K, list(V)> for reduce(). In some cases such as 
SecondarySort, although the current V and the next V share the same K, their 
actual corresponding Ks are different. For example, in SecondarySort, map() 
outputs:

Key     Value
<1, 3>  3
<1, 1>  1
<2, 5>  5
<1, 8>  8

After partitioning by Key.getFirst(), then sorting and grouping by 
Key.getFirst(), the reducer gets:

Key     Value
--Group 1--
<1, 1>  1
<1, 3>  3
<1, 8>  8
--Group 2--
<2, 5>  5

reduce() receives:

Key      List
<1, 1>   List<1, 3, 8>
<2, 5>   List<5>

When invoking V.next(), we can get the next V (e.g., 3), but we have no API 
to get its corresponding Key (e.g., <1, 3>). We can only get the first Key 
(e.g., <1, 1>).

If we let users get the latest key, SecondarySort does not need to emit the 
value in map(), so the network traffic is reduced.

Another example is Join. If we can get the latest Key, we do not need to put 
the table label in both key and value.



  was:
Reducer generates <K, list(V)> for reduce(). In some cases such as 
SecondarySort, although the current V and the next V share the same K, their 
actual corresponding Ks are different. For example, in SecondarySort, map() 
outputs:

Key     Value
<1, 3>  3
<1, 1>  1
<2, 5>  5
<1, 8>  8

After partitioning by Key.getFirst(), then sorting and grouping by 
Key.getFirst(), the reducer gets:

Key     Value
--Group 1--
<1, 1>  1
<1, 3>  3
<1, 8>  8
--Group 2--
<2, 5>  5

reduce() receives:

Key      List
<1, 1>   List<1, 3, 8>
<2, 5>   List<5>

When invoking V.next(), we can get the next V (e.g., 3), but we have no API 
to get its corresponding Key (e.g., <1, 3>). We can only get the first Key 
(e.g., <1, 1>).

If we let users get the latest key, SecondarySort does not need to emit the 
value in map(), so the network traffic is reduced.

Another example is Join. If we can get the latest Key, we do need to put the 
table label in both key and value.




> Let users be able to get latest Key in reduce()
> -----------------------------------------------
>
> Key: MAPREDUCE-5461
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-5461
> Project: Hadoop Map/Reduce
>  Issue Type: Improvement
>  Components: task
>Affects Versions: 1.2.1
> Environment: Any environment
>Reporter: Lijie Xu
>  Labels: features
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> Reducer generates <K, list(V)> for reduce(). In some cases such as 
> SecondarySort, although the current V and the next V share the same K, 
> their actual corresponding Ks are different. For example, in SecondarySort, 
> map() outputs:
>
> Key     Value
> <1, 3>  3
> <1, 1>  1
> <2, 5>  5
> <1, 8>  8
>
> After partitioning by Key.getFirst(), then sorting and grouping by 
> Key.getFirst(), the reducer gets:
>
> Key     Value
> --Group 1--
> <1, 1>  1
> <1, 3>  3
> <1, 8>  8
> --Group 2--
> <2, 5>  5
>
> reduce() receives:
>
> Key      List
> <1, 1>   List<1, 3, 8>
> <2, 5>   List<5>
>
> When invoking V.next(), we can get the next V (e.g., 3), but we have no API 
> to get its corresponding Key (e.g., <1, 3>). We can only get the first Key 
> (e.g., <1, 1>).
>
> If we let users get the latest key, SecondarySort does not need to emit the 
> value in map(), so the network traffic is reduced.
>
> Another example is Join. If we can get the latest Key, we do not need to 
> put the table label in both key and value.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (MAPREDUCE-5461) Let users be able to get latest Key in reduce()

2013-08-15 Thread Lijie Xu (JIRA)
Lijie Xu created MAPREDUCE-5461:
--------------------------------

 Summary: Let users be able to get latest Key in reduce()
 Key: MAPREDUCE-5461
 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5461
 Project: Hadoop Map/Reduce
  Issue Type: Improvement
  Components: task
Affects Versions: 1.2.1
 Environment: Any environment
Reporter: Lijie Xu


Reducer generates <K, list(V)> for reduce(). In some cases such as 
SecondarySort, although the current V and the next V share the same K, their 
actual corresponding Ks are different. For example, in SecondarySort, map() 
outputs:

Key     Value
<1, 3>  3
<1, 1>  1
<2, 5>  5
<1, 8>  8

After partitioning by Key.getFirst(), then sorting and grouping by 
Key.getFirst(), the reducer gets:

Key     Value
--Group 1--
<1, 1>  1
<1, 3>  3
<1, 8>  8
--Group 2--
<2, 5>  5

reduce() receives:

Key      List
<1, 1>   List<1, 3, 8>
<2, 5>   List<5>

When invoking V.next(), we can get the next V (e.g., 3), but we have no API 
to get its corresponding Key (e.g., <1, 3>). We can only get the first Key 
(e.g., <1, 1>).

If we let users get the latest key, SecondarySort does not need to emit the 
value in map(), so the network traffic is reduced.

Another example is Join. If we can get the latest Key, we do need to put the 
table label in both key and value.
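
For illustration (a hypothetical API sketch; KeyTrackingIterator and IntPair 
are made-up names, and nothing like this exists in Hadoop 1.x): the 
requested feature might look like an extra accessor on the value iterator.

// hypothetical: the value iterator also exposes the composite key that
// corresponds to the value it just returned
interface KeyTrackingIterator<K, V> extends java.util.Iterator<V> {
  K getCurrentKey();   // e.g., returns <1, 3> after next() has returned 3
}

// usage inside a SecondarySort reduce(): the value no longer needs to
// duplicate the secondary field, since it can be read from the current key
//   while (values.hasNext()) {
//     IntPair value = values.next();
//     IntPair fullKey = values.getCurrentKey();   // <1, 3>, not <1, 1>
//   }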



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (MAPREDUCE-4882) Error in estimating the length of the output file in Spill Phase

2013-01-26 Thread Lijie Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-4882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13563459#comment-13563459
 ] 

Lijie Xu commented on MAPREDUCE-4882:
--------------------------------------

[~jerrychenhf]
Thanks. I checked this patch and think it is correct. In fact, I have run 
many jobs with this change and found nothing abnormal. If I find any further 
problems with it, I will report them.

> Error in estimating the length of the output file in Spill Phase
> -----------------------------------------------------------------
>
> Key: MAPREDUCE-4882
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-4882
> Project: Hadoop Map/Reduce
>  Issue Type: Bug
>Affects Versions: 0.20.2, 1.0.3
> Environment: Any Environment
>Reporter: Lijie Xu
>Assignee: Jerry Chen
>  Labels: patch
> Attachments: MAPREDUCE-4882.patch
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The sortAndSpill() method in MapTask.java has an error in estimating the 
> length of the output file. 
> The "long size" should be "(bufvoid - bufstart) + bufend" not "(bufvoid - 
> bufend) + bufstart" when "bufend < bufstart".
> Here is the original code in MapTask.java.
>  private void sortAndSpill() throws IOException, ClassNotFoundException,
>                                     InterruptedException {
>    // approximate the length of the output file to be the length of the
>    // buffer + header lengths for the partitions
>    long size = (bufend >= bufstart
>        ? bufend - bufstart
>        : (bufvoid - bufend) + bufstart) +
>        partitions * APPROX_HEADER_LENGTH;
>    FSDataOutputStream out = null;
> --
> I ran a test with "TeraSort". A snippet from a mapper's log follows:
> MapTask: Spilling map output: record full = true
> MapTask: bufstart = 157286200; bufend = 10485460; bufvoid = 199229440
> MapTask: kvstart = 262142; kvend = 131069; length = 655360
> MapTask: Finished spill 3
> In this case, the spill bytes should be (199229440 - 157286200) + 10485460 
> = 52428700 (52 MB), because the number of spilled records is 524287 and 
> each record costs 100 bytes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (MAPREDUCE-4883) Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM

2013-01-26 Thread Lijie Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13563455#comment-13563455
 ] 

Lijie Xu commented on MAPREDUCE-4883:
--------------------------------------

[~jerrychenhf] Good job! Yes, maxSingleShuffleLimit should not be changed; 
in practice, the maxSingleShuffleLimit boundary is rarely reached. I have 
another question here: I think "mapred.job.reduce.input.buffer.percent" is 
bizarre and hardly used. Is it possible to remove this parameter, given that 
it is almost always left at 0.0f?

> Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM
> -----------------------------------------------------------------------
>
> Key: MAPREDUCE-4883
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-4883
> Project: Hadoop Map/Reduce
>  Issue Type: Improvement
>Affects Versions: 0.20.2, 1.0.3
> Environment: Especially for 64bit JVM
>Reporter: Lijie Xu
>Assignee: Jerry Chen
>  Labels: patch
> Attachments: MAPREDUCE-4883.patch
>
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> In hadoop-0.20.2, hadoop-1.0.3, and other versions, the reducer's shuffle 
> buffer size cannot exceed 2048MB (i.e., Integer.MAX_VALUE). This is 
> reasonable for a 32-bit JVM.
> But for a 64-bit JVM, although the reducer's JVM size can be set to more 
> than 2048MB (e.g., mapred.child.java.opts=-Xmx4000m), the heap size usable 
> for the shuffle buffer is at most "2048MB * maxInMemCopyUse (default 0.7)", 
> not "4000MB * maxInMemCopyUse". 
> So the indicated piece of code in ReduceTask.java needs modification for a 
> 64-bit JVM.
> ---
>   private final long maxSize;
>   private final long maxSingleShuffleLimit;
>  
>   private long size = 0;
>  
>   private Object dataAvailable = new Object();
>   private long fullSize = 0;
>   private int numPendingRequests = 0;
>   private int numRequiredMapOutputs = 0;
>   private int numClosed = 0;
>   private boolean closed = false;
>  
>   public ShuffleRamManager(Configuration conf) throws IOException {
> final float maxInMemCopyUse =
>   conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
> if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
>   throw new IOException("mapred.job.shuffle.input.buffer.percent" +
> maxInMemCopyUse);
> }
> // Allow unit tests to fix Runtime memory
> -->   maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
> -->(int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
> -->  * maxInMemCopyUse);
> maxSingleShuffleLimit = (long)(maxSize * 
> MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
> LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
>  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
>   }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (MAPREDUCE-4883) Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM

2012-12-17 Thread Lijie Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13534529#comment-13534529
 ] 

Lijie Xu commented on MAPREDUCE-4883:
--------------------------------------

Actually, when a segment (MapOutput) is copied from finished mappers, the 
ReduceTask just creates a small byte[segment length] array to cache it. So 
no single large byte array (e.g., 2GB) is allocated directly; rather, there 
are plenty of small byte[] arrays, whose total length is limited by maxSize 
(e.g., Integer.MAX_VALUE). I think it's possible to enlarge this limit. As 
Radim says, more byte[] arrays may burden the GC, but they reduce the number 
of in-memory merges in the Shuffle Phase. It is really a balance between 
space and efficiency.

> Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM
> -----------------------------------------------------------------------
>
> Key: MAPREDUCE-4883
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-4883
> Project: Hadoop Map/Reduce
>  Issue Type: Improvement
>Affects Versions: 0.20.2, 1.0.3
> Environment: Especially for 64bit JVM
>Reporter: Lijie Xu
>  Labels: patch
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> In hadoop-0.20.2, hadoop-1.0.3, and other versions, the reducer's shuffle 
> buffer size cannot exceed 2048MB (i.e., Integer.MAX_VALUE). This is 
> reasonable for a 32-bit JVM.
> But for a 64-bit JVM, although the reducer's JVM size can be set to more 
> than 2048MB (e.g., mapred.child.java.opts=-Xmx4000m), the heap size usable 
> for the shuffle buffer is at most "2048MB * maxInMemCopyUse (default 0.7)", 
> not "4000MB * maxInMemCopyUse". 
> So the indicated piece of code in ReduceTask.java needs modification for a 
> 64-bit JVM.
> ---
>   private final long maxSize;
>   private final long maxSingleShuffleLimit;
>  
>   private long size = 0;
>  
>   private Object dataAvailable = new Object();
>   private long fullSize = 0;
>   private int numPendingRequests = 0;
>   private int numRequiredMapOutputs = 0;
>   private int numClosed = 0;
>   private boolean closed = false;
>  
>   public ShuffleRamManager(Configuration conf) throws IOException {
> final float maxInMemCopyUse =
>   conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
> if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
>   throw new IOException("mapred.job.shuffle.input.buffer.percent" +
> maxInMemCopyUse);
> }
> // Allow unit tests to fix Runtime memory
> -->   maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
> -->(int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
> -->  * maxInMemCopyUse);
> maxSingleShuffleLimit = (long)(maxSize * 
> MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
> LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
>  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
>   }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (MAPREDUCE-4883) Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM

2012-12-17 Thread Lijie Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13534505#comment-13534505
 ] 

Lijie Xu commented on MAPREDUCE-4883:
--------------------------------------

[~tlipcon] Are there any other solutions? I think 2GB is a bit small for 
"large" reducers. Can we use a 2D array to break this limit?

> Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM
> -----------------------------------------------------------------------
>
> Key: MAPREDUCE-4883
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-4883
> Project: Hadoop Map/Reduce
>  Issue Type: Improvement
>Affects Versions: 0.20.2, 1.0.3
> Environment: Especially for 64bit JVM
>Reporter: Lijie Xu
>  Labels: patch
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> In hadoop-0.20.2, hadoop-1.0.3, and other versions, the reducer's shuffle 
> buffer size cannot exceed 2048MB (i.e., Integer.MAX_VALUE). This is 
> reasonable for a 32-bit JVM.
> But for a 64-bit JVM, although the reducer's JVM size can be set to more 
> than 2048MB (e.g., mapred.child.java.opts=-Xmx4000m), the heap size usable 
> for the shuffle buffer is at most "2048MB * maxInMemCopyUse (default 0.7)", 
> not "4000MB * maxInMemCopyUse". 
> So the indicated piece of code in ReduceTask.java needs modification for a 
> 64-bit JVM.
> ---
>   private final long maxSize;
>   private final long maxSingleShuffleLimit;
>  
>   private long size = 0;
>  
>   private Object dataAvailable = new Object();
>   private long fullSize = 0;
>   private int numPendingRequests = 0;
>   private int numRequiredMapOutputs = 0;
>   private int numClosed = 0;
>   private boolean closed = false;
>  
>   public ShuffleRamManager(Configuration conf) throws IOException {
> final float maxInMemCopyUse =
>   conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
> if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
>   throw new IOException("mapred.job.shuffle.input.buffer.percent" +
> maxInMemCopyUse);
> }
> // Allow unit tests to fix Runtime memory
> -->   maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
> -->(int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
> -->  * maxInMemCopyUse);
> maxSingleShuffleLimit = (long)(maxSize * 
> MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
> LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
>  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
>   }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (MAPREDUCE-4883) Reducer's Maximum Shuffle Buffer Size should be enlarged for 64bit JVM

2012-12-16 Thread Lijie Xu (JIRA)
Lijie Xu created MAPREDUCE-4883:
--------------------------------

 Summary: Reducer's Maximum Shuffle Buffer Size should be enlarged 
for 64bit JVM
 Key: MAPREDUCE-4883
 URL: https://issues.apache.org/jira/browse/MAPREDUCE-4883
 Project: Hadoop Map/Reduce
  Issue Type: Improvement
Affects Versions: 1.0.3, 0.20.2
 Environment: Especially for 64bit JVM
Reporter: Lijie Xu


In hadoop-0.20.2, hadoop-1.0.3, and other versions, the reducer's shuffle 
buffer size cannot exceed 2048MB (i.e., Integer.MAX_VALUE). This is 
reasonable for a 32-bit JVM.
But for a 64-bit JVM, although the reducer's JVM size can be set to more 
than 2048MB (e.g., mapred.child.java.opts=-Xmx4000m), the heap size usable 
for the shuffle buffer is at most "2048MB * maxInMemCopyUse (default 0.7)", 
not "4000MB * maxInMemCopyUse". 

So the indicated piece of code in ReduceTask.java needs modification for a 
64-bit JVM.
---
  private final long maxSize;
  private final long maxSingleShuffleLimit;
 
  private long size = 0;
 
  private Object dataAvailable = new Object();
  private long fullSize = 0;
  private int numPendingRequests = 0;
  private int numRequiredMapOutputs = 0;
  private int numClosed = 0;
  private boolean closed = false;
 
  public ShuffleRamManager(Configuration conf) throws IOException {
final float maxInMemCopyUse =
  conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
  throw new IOException("mapred.job.shuffle.input.buffer.percent" +
maxInMemCopyUse);
}
// Allow unit tests to fix Runtime memory
-->   maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
-->(int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-->  * maxInMemCopyUse);
maxSingleShuffleLimit = (long)(maxSize * 
MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
 ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
  }
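
For illustration (one possible shape of the fix; this is not the attached 
patch): compute maxSize with long arithmetic so that heaps larger than 2GB 
are actually usable for the shuffle buffer:

// sketch: avoid the int cast so a 64-bit heap is not clamped to 2GB
maxSize = (long)(conf.getLong("mapred.job.reduce.total.mem.bytes",
    Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
maxSingleShuffleLimit = (long)(maxSize *
    MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);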


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (MAPREDUCE-4882) Error in estimating the length of the output file in Spill Phase

2012-12-16 Thread Lijie Xu (JIRA)
Lijie Xu created MAPREDUCE-4882:
--------------------------------

 Summary: Error in estimating the length of the output file in 
Spill Phase
 Key: MAPREDUCE-4882
 URL: https://issues.apache.org/jira/browse/MAPREDUCE-4882
 Project: Hadoop Map/Reduce
  Issue Type: Bug
Affects Versions: 1.0.3, 0.20.2
 Environment: Any Environment
Reporter: Lijie Xu


The sortAndSpill() method in MapTask.java has an error in estimating the length 
of the output file. 
The "long size" should be "(bufvoid - bufstart) + bufend" not "(bufvoid - 
bufend) + bufstart" when "bufend < bufstart".

Here is the original code in MapTask.java.
 private void sortAndSpill() throws IOException, ClassNotFoundException,
                                    InterruptedException {
   // approximate the length of the output file to be the length of the
   // buffer + header lengths for the partitions
   long size = (bufend >= bufstart
       ? bufend - bufstart
       : (bufvoid - bufend) + bufstart) +
       partitions * APPROX_HEADER_LENGTH;
   FSDataOutputStream out = null;
--
I ran a test with "TeraSort". A snippet from a mapper's log follows:

MapTask: Spilling map output: record full = true
MapTask: bufstart = 157286200; bufend = 10485460; bufvoid = 199229440
MapTask: kvstart = 262142; kvend = 131069; length = 655360
MapTask: Finished spill 3

In this case, the spill bytes should be (199229440 - 157286200) + 10485460 = 
52428700 (52 MB), because the number of spilled records is 524287 and each 
record costs 100 bytes.
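
For clarity, the corrected estimate would therefore be:

  long size = (bufend >= bufstart
      ? bufend - bufstart
      : (bufvoid - bufstart) + bufend) +   // fixed branch
      partitions * APPROX_HEADER_LENGTH;
  // with the log above: (199229440 - 157286200) + 10485460 = 52428700 bytes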

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira