[
https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Binglin Chang updated MAPREDUCE-2841:
-------------------------------------
Status: Patch Available (was: Open)
I just submitted a demo patch to show the basic ideas and current progress.
This patch contains the map task optimization using NativeMapOutputCollector.
NativeMapOutputCollector uses JNI to pass Java k/v pairs and
the partition value to the native side. As JNI has non-negligible
overhead (about 1000ns per empty JNI call with 4-6 arguments),
a k/v cache is added to pass k/v pairs to the native side in batches.
I suspect using a DirectBuffer will be better.
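
To illustrate the batching idea, here is a minimal sketch of how the native side could unpack a whole batch handed over in one call. It is not the code in the patch: the record layout (partition, key length and value length as 32-bit host-order integers, followed by the key and value bytes) and the names KVRecord, encodeRecord and decodeBatch are assumptions made for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical record layout inside one batch buffer (not taken from the patch):
//   [partition: u32][keyLen: u32][valueLen: u32][key bytes][value bytes]
struct KVRecord {
  uint32_t partition;
  std::string key;
  std::string value;
};

// Read a 32-bit integer in host byte order; memcpy avoids alignment assumptions.
static inline uint32_t readU32(const uint8_t* p) {
  uint32_t v;
  std::memcpy(&v, p, sizeof(v));
  return v;
}

// Append one record to a batch buffer (stands in for the Java-side k/v cache).
void encodeRecord(std::vector<uint8_t>& buf, uint32_t partition,
                  const std::string& key, const std::string& value) {
  uint32_t header[3] = {partition, static_cast<uint32_t>(key.size()),
                        static_cast<uint32_t>(value.size())};
  const uint8_t* h = reinterpret_cast<const uint8_t*>(header);
  buf.insert(buf.end(), h, h + sizeof(header));
  buf.insert(buf.end(), key.begin(), key.end());
  buf.insert(buf.end(), value.begin(), value.end());
}

// Decode every complete record in the batch; stops at a truncated record.
std::vector<KVRecord> decodeBatch(const uint8_t* data, size_t len) {
  std::vector<KVRecord> out;
  size_t pos = 0;
  while (len - pos >= 12) {
    uint32_t partition = readU32(data + pos);
    uint32_t keyLen = readU32(data + pos + 4);
    uint32_t valLen = readU32(data + pos + 8);
    pos += 12;
    if (static_cast<uint64_t>(keyLen) + valLen > len - pos) break;  // truncated
    KVRecord r;
    r.partition = partition;
    r.key.assign(reinterpret_cast<const char*>(data + pos), keyLen);
    r.value.assign(reinterpret_cast<const char*>(data + pos + keyLen), valLen);
    pos += static_cast<size_t>(keyLen) + valLen;
    out.push_back(std::move(r));
  }
  return out;
}
```

With a layout like this, the per-call JNI cost is paid once per batch instead of once per k/v pair; a DirectBuffer would additionally let the native side read the bytes without a copy.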
On the native side, k/v pairs are put into partitioned buffers,
which is different from Java's single-buffer approach. By doing so,
sort can be much faster, because sorting one big array is much slower
than sorting many small arrays; small arrays also mean fewer cache
misses; and the partition number does not need to be compared
in the sort.
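
The partitioned-buffer approach can be sketched roughly as below. This is only an illustration under simplifying assumptions: the class name PartitionedCollector is hypothetical, and records are stored as std::string pairs rather than in raw memory buffers as a real collector would.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

using KV = std::pair<std::string, std::string>;

// Hypothetical collector: one bucket per partition instead of a single
// shared buffer that holds records of all partitions.
class PartitionedCollector {
 public:
  explicit PartitionedCollector(size_t numPartitions) : buckets_(numPartitions) {}

  void collect(size_t partition, std::string key, std::string value) {
    buckets_[partition].emplace_back(std::move(key), std::move(value));
  }

  // Sort each bucket on its own. The comparator looks only at the key,
  // because every record in a bucket already shares the same partition.
  void sortAll() {
    for (auto& bucket : buckets_) {
      std::sort(bucket.begin(), bucket.end(),
                [](const KV& a, const KV& b) { return a.first < b.first; });
    }
  }

  const std::vector<KV>& bucket(size_t partition) const {
    return buckets_[partition];
  }

 private:
  std::vector<std::vector<KV>> buckets_;
};
```

A single-buffer design has to compare (partition, key) for every pair of records; here the partition is implied by the bucket, and each std::sort call works on a smaller array.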
There are two lightweight io buffers, ReadBuffer & AppendBuffer,
which have better performance than the decorator-based
Java & Hadoop io streams. This greatly benefits IFile
serialization, since many methods can be inlined. Currently
compression is not supported, but it should not be
complex to add a block-based compressor like snappy or
quicklz.
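
As a rough idea of what a lightweight append buffer looks like, here is a minimal sketch. It only borrows the name AppendBuffer from the patch; the interface, the std::string sink and the pass-through rule for large writes are assumptions made for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical fixed-capacity append buffer. Small writes are plain memcpy
// calls into a local array; the sink is touched only when the array fills up.
class AppendBuffer {
 public:
  AppendBuffer(std::string* sink, size_t capacity)
      : sink_(sink), buf_(capacity), used_(0) {}

  inline void write(const void* data, size_t len) {
    if (len > buf_.size() - used_) {
      flush();
      if (len >= buf_.size()) {  // too big to buffer: pass straight through
        sink_->append(static_cast<const char*>(data), len);
        return;
      }
    }
    std::memcpy(buf_.data() + used_, data, len);
    used_ += len;
  }

  // Writes the value in host byte order.
  inline void writeU32(uint32_t v) { write(&v, sizeof(v)); }

  void flush() {
    sink_->append(buf_.data(), used_);
    used_ = 0;
  }

 private:
  std::string* sink_;
  std::vector<char> buf_;
  size_t used_;
};
```

Because there is no chain of wrapped stream objects and no virtual call per write, the compiler is free to inline the small write methods into the serialization loop.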
This optimization is ONLY targeted for x86-64, little-endian
and assumes unaligned 64-bit loads and stores are cheap,
like google-snappy.
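
One place where cheap unaligned 64-bit loads help is binary key comparison. The sketch below compares 8 bytes per step; it is an illustration and not the comparator in the patch. The names load64 and compareBytes are hypothetical, and the code assumes a little-endian host and a GCC/Clang compiler (for __builtin_bswap64).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Load 8 bytes from any address. On x86-64, compilers typically turn this
// memcpy into a single unaligned load.
static inline uint64_t load64(const void* p) {
  uint64_t v;
  std::memcpy(&v, p, sizeof(v));
  return v;
}

// Byte-wise (memcmp-style) comparison that consumes 8 bytes per step.
// Returns -1, 0 or 1. Assumes a little-endian host.
int compareBytes(const uint8_t* a, size_t aLen, const uint8_t* b, size_t bLen) {
  size_t n = aLen < bLen ? aLen : bLen;
  size_t i = 0;
  for (; i + 8 <= n; i += 8) {
    uint64_t x = load64(a + i);
    uint64_t y = load64(b + i);
    if (x != y) {
      // Swap to big-endian so integer order equals lexicographic byte order.
      x = __builtin_bswap64(x);
      y = __builtin_bswap64(y);
      return x < y ? -1 : 1;
    }
  }
  for (; i < n; ++i) {  // tail of fewer than 8 bytes
    if (a[i] != b[i]) return a[i] < b[i] ? -1 : 1;
  }
  if (aLen == bLen) return 0;
  return aLen < bLen ? -1 : 1;  // shorter key sorts first on a common prefix
}
```

On a platform where unaligned loads are expensive or byte order differs, this shortcut would need a different implementation, which is why the target is restricted.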
About the patch:
I put cpp code into src/c++/libnativetask, and use a Makefile
separate from build.xml and src/native, because many things are
not decided yet, so I don't want to mess up other components.
# cd src/c++/libnativetask
# make
# copy libnativetask.so to $HADOOP_HOME/lib/native/Linux-amd64-64/
# set mapred.native.map.output.collector to true in jobconf
Again, this patch is just a demo; it is far from stable & complete,
and has many known and unknown bugs.
Here are some test results:
1. running single task
Intel Core i5, jdk6u24, on a MacBook Pro
input:
size 250000000 bytes
100 bytes per line
[key 9bytes]\t[value 89bytes]\n
KeyValueTextInputFormat
IdentityMapper
partition number 100
io.sort.mb 400MB no mid-spill
Total time and analysis:
|| ||JAVA||Native||Speedup||
|Sort|5.27|0.8|6.5|
|IFile SER|2.8|0.9|3.1|
|Total|14.13|6.34|2.2|
2. Some results for partitioned sort & spill
Input is the same as test 1
||partition | 1 | 10 | 20 | 40 | 80 | 200 | 400 | 800 | 1600 | 2000 |
||sort time | 2.21 | 1.47 | 1.27 | 1.11 | 0.9 | 0.71 | 0.65 | 0.58 | 0.51 | 0.5 |
||spill time| 0.49 | 0.43 | 0.41 | 0.39 | 0.36 | 0.33 | 0.32 | 0.32 | 0.29 | 0.28 |
As illustrated, the partition number has a great impact on sort & spill
performance.
This is largely because of sort complexity and CPU cache effects.
If the partition number is P and the record count is N, we get:
T = P * (N/P) * log(N/P) = N * log(N/P), so the partition number matters.
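
The formula can be evaluated directly. The sketch below is just the model above written as code; the function names are hypothetical, the logarithm base is taken as 2, and constant factors and cache effects are ignored.

```cpp
#include <cassert>
#include <cmath>

// Comparison-count model from the text: T = N * log2(N / P).
double modelSortCost(double records, double partitions) {
  return records * std::log2(records / partitions);
}

// Model cost with P partitions relative to a single partition (P = 1).
double modelRelativeCost(double records, double partitions) {
  return modelSortCost(records, partitions) / modelSortCost(records, 1.0);
}
```

For the input of test 1 (250000000 bytes at 100 bytes per line, so N = 2500000) and P = 100, modelRelativeCost(2500000, 100) is about 0.69. The measured sort time drops by more than that at comparable partition counts (0.9 at 80 partitions versus 2.21 at 1), which fits the point that cache effects contribute on top of the complexity term.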
3. Terasort, 10G input, 40 maps, 40 reduces, on a 9-node cluster
io.sort.mb 500MB
Results on jobhistory:
|| || Total || AverageMap || AverageShuffle || AverageReduce ||
||java | 54s | 14s | 14s | 10s |
||native | 39s | 7s | 15s | 9s|
||java-lzo | 43s | 15s | 8s | 8s|
||native-lzo| ? | | | |
speedup: 1.38
Map output compression can reduce shuffle time significantly,
so with better compression speed & less shuffle time impact,
we can get a better speedup once map output compression is finished.
I don't have large clusters to do more standard tests.
Current progress:
* NativeMapOutputCollector jni [done]
* io buffers [done]
* crc32/crc32c [done]
* integration of jobconfs [done]
* ifile writer [done]
* write ifile index file [done]
* ifile index reader [done]
* ifile reader [done]
* single-pass merge [done]
* handle big key/value
In the current implementation, if the key/value length exceeds
the buffer length, it will crash. I will add big
k/v handling soon.
* support block compression snappy
* multi-pass merge
* local io counters
* map-side combine
* parallel spill & merge
Java has a spill thread to do collect/spill
concurrently. Currently NativeMapOutputCollector only
uses one thread, but parallel sort&spill&merge is possible,
since k/v pairs are partitioned now. This can further
speed up sort&spill&merge by simply adding threads.
* reduce task optimization
* support no sort
* support grouping
* support pipes API, streaming
> Task level native optimization
> ------------------------------
>
> Key: MAPREDUCE-2841
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Components: task
> Environment: x86-64 Linux
> Reporter: Binglin Chang
> Assignee: Binglin Chang
> Attachments: MAPREDUCE-2841.v1.patch
>
>
> I've recently been working on native optimization for MapTask based on JNI.
> The basic idea is to add a NativeMapOutputCollector to handle k/v pairs
> emitted by the mapper, so that sort, spill, and IFile serialization can all be done
> in native code. A preliminary test (on Xeon E5410, jdk6u24) showed promising
> results:
> 1. Sort is about 3x-10x as fast as Java (only binary string compare is
> supported)
> 2. IFile serialization speed is about 3x that of Java, about 500MB/s; if hardware
> CRC32C is used, things can get much faster (1G/s).
> 3. Merge code is not completed yet, so the test uses a large enough io.sort.mb to
> prevent mid-spill
> This leads to a total speedup of 2x~3x for the whole MapTask, if
> IdentityMapper (mapper does nothing) is used.
> There are limitations of course: currently only Text and BytesWritable are
> supported, and I have not thought through many things yet, such as how to
> support map side combine. I had some discussion with somebody familiar with
> Hive, and it seems that these limitations won't be much of a problem for Hive to
> benefit from those optimizations, at least. Advice or discussion about
> improving compatibility is most welcome :)
> Currently NativeMapOutputCollector has a static method called canEnable(),
> which checks whether the key/value type, comparator type, and combiner are all
> compatible; MapTask can then choose to enable NativeMapOutputCollector.
> This is only a preliminary test; more work needs to be done. I expect better
> final results, and I believe similar optimizations can be adopted for the reduce task
> and shuffle too.