[ 
https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binglin Chang updated MAPREDUCE-2841:
-------------------------------------

    Status: Patch Available  (was: Open)


I just submitted a demo patch to show the basic ideas and current progress. 
This patch contains the map task optimization using NativeMapOutputCollector.

NativeMapOutputCollector uses JNI to pass Java k/v pairs and their 
partition values to the native side. Because JNI has non-negligible 
overhead (about 1000ns per empty JNI call with 4-6 arguments), 
a k/v cache is added so that k/v pairs are passed to the native side 
in batches. I suspect using a DirectBuffer would be better.
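
To illustrate the batching idea, here is a minimal C++ sketch of how the 
native side might walk one batch that arrived in a single JNI call. The 
record layout and the names KVRecord, appendRecord and decodeBatch are 
assumptions made for illustration; they are not the format or API used 
in the patch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical record layout inside one batch (not the patch's format):
//   [partition: u32][keyLen: u32][valueLen: u32][key bytes][value bytes]
struct KVRecord {
  uint32_t partition;
  std::string key;
  std::string value;
};

// Reads a 32-bit integer in host byte order (little-endian on x86-64).
static inline uint32_t readU32(const char* p) {
  uint32_t v;
  std::memcpy(&v, p, sizeof(v));
  return v;
}

// Appends one record to a batch buffer; stands in for the Java-side cache.
void appendRecord(std::vector<char>& batch, uint32_t partition,
                  const std::string& key, const std::string& value) {
  uint32_t header[3] = {partition, static_cast<uint32_t>(key.size()),
                        static_cast<uint32_t>(value.size())};
  const char* h = reinterpret_cast<const char*>(header);
  batch.insert(batch.end(), h, h + sizeof(header));
  batch.insert(batch.end(), key.begin(), key.end());
  batch.insert(batch.end(), value.begin(), value.end());
}

// Decodes every record in the batch. One call handles many k/v pairs,
// so the fixed per-call overhead is amortized over the whole batch.
std::vector<KVRecord> decodeBatch(const char* data, size_t length) {
  std::vector<KVRecord> out;
  size_t pos = 0;
  while (pos + 12 <= length) {
    uint32_t partition = readU32(data + pos);
    uint32_t keyLen = readU32(data + pos + 4);
    uint32_t valueLen = readU32(data + pos + 8);
    pos += 12;
    assert(pos + keyLen + valueLen <= length);  // a truncated batch is a bug
    out.push_back(KVRecord{partition, std::string(data + pos, keyLen),
                           std::string(data + pos + keyLen, valueLen)});
    pos += keyLen + valueLen;
  }
  return out;
}
```

With about 1000ns per call, a batch of a few hundred records brings the 
per-record JNI cost down to a few nanoseconds, at the price of one extra copy.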

On the native side, k/v pairs are put into partitioned buffers,
which differs from Java's single-buffer approach. This makes 
sorting much faster, because sorting one big array is much slower
than sorting many small arrays; small arrays also mean fewer cache 
misses; and the partition number does not need to be compared 
during the sort. 
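
A minimal sketch of the partitioned-buffer idea, assuming one in-memory 
bucket per partition. PartitionedBuffer, collect and sortAll are 
hypothetical names, and std::string storage is used only to keep the 
example short; the patch's actual memory layout is not shown here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, std::string> KV;  // (key, value)

// Hypothetical container: one bucket of records per partition.
struct PartitionedBuffer {
  std::vector<std::vector<KV> > buckets;

  explicit PartitionedBuffer(size_t numPartitions) : buckets(numPartitions) {}

  // Records are routed to their bucket at collect time...
  void collect(uint32_t partition, const std::string& key,
               const std::string& value) {
    assert(partition < buckets.size());
    buckets[partition].push_back(KV(key, value));
  }

  // ...so each sort runs over a small array and compares keys only;
  // the partition number never appears in the comparator.
  void sortAll() {
    for (size_t i = 0; i < buckets.size(); ++i) {
      std::sort(buckets[i].begin(), buckets[i].end(),
                [](const KV& a, const KV& b) { return a.first < b.first; });
    }
  }
};
```

A single-buffer design would instead sort all records at once with a 
comparator that checks the partition first and the key second.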

There are two lightweight io buffers, ReadBuffer & AppendBuffer, 
which perform better than the decorator-based 
Java & Hadoop io streams. This greatly benefits IFile 
serialization, since many methods can be inlined. Compression 
is not supported yet, but it should not be 
complex to add a block-based compressor like snappy or 
quicklz.
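
The sketch below shows what flat, inlinable buffers of this kind could 
look like. Only the class names ReadBuffer and AppendBuffer come from the 
text above; the members, the method names and the std::string sink are 
assumptions for illustration and do not reproduce the patch's classes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical append-only buffer. The std::string sink stands in for a
// file or socket.
class AppendBuffer {
 public:
  AppendBuffer(std::string* sink, size_t capacity)
      : sink_(sink), buf_(capacity), used_(0) {
    assert(capacity > 0);
  }
  AppendBuffer(const AppendBuffer&) = delete;
  AppendBuffer& operator=(const AppendBuffer&) = delete;
  ~AppendBuffer() { flush(); }

  // Fast path is a bounds check plus memcpy, with no virtual dispatch.
  inline void write(const void* data, size_t len) {
    if (len > buf_.size() - used_) {
      flush();
      if (len > buf_.size()) {  // oversized writes bypass the buffer
        sink_->append(static_cast<const char*>(data), len);
        return;
      }
    }
    std::memcpy(buf_.data() + used_, data, len);
    used_ += len;
  }

  inline void writeU32(uint32_t v) { write(&v, sizeof(v)); }

  void flush() {
    sink_->append(buf_.data(), used_);
    used_ = 0;
  }

 private:
  std::string* sink_;
  std::vector<char> buf_;
  size_t used_;
};

// Hypothetical reader over a contiguous memory region.
class ReadBuffer {
 public:
  ReadBuffer(const char* data, size_t len) : cur_(data), end_(data + len) {}

  // Returns false, without consuming anything, if fewer than len bytes remain.
  inline bool read(void* out, size_t len) {
    if (static_cast<size_t>(end_ - cur_) < len) return false;
    std::memcpy(out, cur_, len);
    cur_ += len;
    return true;
  }

  inline bool readU32(uint32_t* v) { return read(v, sizeof(*v)); }

 private:
  const char* cur_;
  const char* end_;
};
```

Because nothing here is virtual and each call is a few instructions, a 
compiler can inline the whole write path into the serialization loop, 
which is harder to achieve with stacked stream decorators.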

This optimization ONLY targets x86-64 (little-endian)
and assumes unaligned 64-bit loads and stores are cheap, 
as google-snappy does.
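
As an example of what these assumptions allow, the following sketch 
compares two byte strings 8 bytes at a time. It is an illustration, not 
code from the patch: load64 and compareBytes are hypothetical names, 
__builtin_bswap64 is a GCC/Clang builtin, and the result is only correct 
on a little-endian host.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Loads 8 bytes from any address. On x86-64, compilers typically turn
// this memcpy into a single unaligned load.
static inline uint64_t load64(const void* p) {
  uint64_t v;
  std::memcpy(&v, p, sizeof(v));
  return v;
}

// Compares len bytes in memcmp order, 8 bytes per step.
// Assumes a little-endian host. Returns <0, 0 or >0 like memcmp.
inline int compareBytes(const char* a, const char* b, size_t len) {
  while (len >= 8) {
    uint64_t x = load64(a);
    uint64_t y = load64(b);
    if (x != y) {
      // Swap bytes so the first byte in memory becomes the most
      // significant; an unsigned compare then matches byte order.
      x = __builtin_bswap64(x);
      y = __builtin_bswap64(y);
      return x < y ? -1 : 1;
    }
    a += 8;
    b += 8;
    len -= 8;
  }
  return len == 0 ? 0 : std::memcmp(a, b, len);
}
```

On a big-endian machine the byte swap would have to be dropped, and on 
architectures with expensive unaligned access the 8-byte loads could be 
slower than a plain byte loop.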

About the patch:
I put the cpp code into src/c++/libnativetask, and use a Makefile
separate from build.xml and src/native, because many things are 
not decided yet and I don't want to mess up other components. 
# cd src/c++/libnativetask
# make
# copy libnativetask.so to $HADOOP_HOME/lib/native/Linux-amd64-64/
# set mapred.native.map.output.collector to true in jobconf

Again, this patch is just a demo; it is far from stable & complete, 
and has many known and unknown bugs.


Here are some test results:
1. running single task
   intel corei5 jdk6u24 on macbook pro
   input: 
     size  250000000 bytes 
     100 bytes per line
     [key 9bytes]\t[value 89bytes]\n
   KeyValueTextInputFormat
   IdentityMapper
   partition number 100
   io.sort.mb 400MB no mid-spill
   Total time and analysis:
   || ||JAVA||Native||Speedup||
   |Sort|5.27|0.8|6.5|
   |IFile SER|2.8|0.9|3.1|
   |Total|14.13|6.34|2.2|

2. Some results on partitioned sort & spill
   Input is the same as test 1
   ||partition |   1  |  10  |  20  |  40  |  80  | 200  | 400  | 800  | 1600 | 2000 |
   ||sort time | 2.21 | 1.47 | 1.27 | 1.11 | 0.9  | 0.71 | 0.65 | 0.58 | 0.51 | 0.5  |
   ||spill time| 0.49 | 0.43 | 0.41 | 0.39 | 0.36 | 0.33 | 0.32 | 0.32 | 0.29 | 0.28 |
   As the table shows, the partition number has a great impact on sort & spill 
   performance. This is largely due to sort complexity and CPU cache effects.
   If the partition number is P and the record count is N, we get:
   T = P * (N/P) * log(N/P) = N * log(N/P), so the partition number matters.
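
   As a worked example of the formula, the sketch below evaluates the model 
   for the test input, which holds 250000000 / 100 = 2500000 records. The 
   function names are hypothetical, and the choice of a base-2 logarithm is 
   an assumption (the base cancels in the ratio anyway).

```cpp
#include <cassert>
#include <cmath>

// Comparison-count model from the text: T(N, P) = N * log2(N / P).
// It deliberately ignores cache effects.
inline double sortCost(double records, double partitions) {
  assert(partitions >= 1 && records >= partitions);
  return records * std::log2(records / partitions);
}

// Predicted sort cost with P partitions relative to a single partition.
// Requires records > 1 so that the single-partition cost is non-zero.
inline double relativeCost(double records, double partitions) {
  return sortCost(records, partitions) / sortCost(records, 1);
}
```

   For N = 2500000 and P = 80 the model gives a relative cost of about 0.70, 
   while the measured ratio in the table is 0.9 / 2.21, about 0.41. The 
   measured gain is larger than the comparison count alone predicts, which 
   fits the point that CPU cache effects also contribute.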

3. Terasort, 10G input, 40 maps, 40 reduces on a 9-node cluster
   io.sort.mb 500MB
   Results from jobhistory:
   ||           || Total || AverageMap || AverageShuffle || AverageReduce ||
   ||java       | 54s | 14s | 14s | 10s |
   ||native     | 39s | 7s  | 15s | 9s  |
   ||java-lzo   | 43s | 15s | 8s  | 8s  |
   ||native-lzo | ?   |     |     |     |
   speedup: 1.38
   Map output compression can reduce shuffle time significantly,
   so with better compression speed and less impact from shuffle time,
   we can get a better speedup once map output compression is finished. 
   I don't have a large cluster to run more standard tests.
   

Current progress:
* NativeMapOutputCollector jni [done]
* io buffers [done]
* crc32/crc32c [done]
* integration of jobconfs [done]
* ifile writer [done]
* write ifile index file [done]
* ifile index reader [done]
* ifile reader [done]
* single-pass merge [done]
* handle big key/value
  In the current implementation, if the key/value length exceeds
  the buffer length, it will crash. I will add big 
  k/v handling soon.
* support block compression snappy
* multi-pass merge
* local io counters
* map-side combine
* parallel spill & merge
  Java has a spill thread so that collect and spill run
  concurrently. Currently NativeMapOutputCollector only
  uses one thread, but parallel sort&spill&merge is possible,
  since k/v pairs are now partitioned. This can further 
  speed up sort&spill&merge by simply adding threads.
* reduce task optimization
* support no sort
* support grouping 
* support pipes API, streaming


> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
>
>
> I'm currently working on native optimization for MapTask based on JNI. 
> The basic idea is to add a NativeMapOutputCollector to handle k/v pairs 
> emitted by the mapper, so that sort, spill, and IFile serialization can all 
> be done in native code. A preliminary test (on Xeon E5410, jdk6u24) showed 
> promising results:
> 1. Sort is about 3x-10x as fast as Java (only binary string compare is 
> supported)
> 2. IFile serialization speed is about 3x that of Java, about 500MB/s; if 
> hardware CRC32C is used, things can get much faster (1G/s).
> 3. Merge code is not completed yet, so the test uses enough io.sort.mb to 
> prevent mid-spill
> This leads to a total speedup of 2x~3x for the whole MapTask, if 
> IdentityMapper (mapper does nothing) is used.
> There are limitations of course: currently only Text and BytesWritable are 
> supported, and I have not thought through many things yet, such as how to 
> support map-side combine. I had some discussion with somebody familiar with 
> Hive, and it seems that these limitations won't be much of a problem for Hive 
> to benefit from those optimizations, at least. Advice or discussion about 
> improving compatibility is most welcome :) 
> Currently NativeMapOutputCollector has a static method called canEnable(), 
> which checks whether the key/value type, comparator type, and combiner are 
> all compatible; if so, MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, and more work needs to be done. I expect 
> better final results, and I believe similar optimizations can be applied to 
> the reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
