Hi Dongwon Kim,

This blog post describes Flink's memory management, serialization, and sort
algorithm, and also includes performance numbers from some microbenchmarks:

http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html

The difference between Text and OptimizedText is that OptimizedText is
sorted using a 10-byte binary prefix key. Hence, the sorting happens directly
on the binary data, and OptimizedText objects are not deserialized.
The lower CPU utilization can be explained by less deserialization and
garbage collection. Since the CPU is less of a bottleneck, network and disk
utilization increase.
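
To illustrate the idea of comparing keys on their raw bytes, here is a
minimal, self-contained sketch. It is not Flink's actual sorter: the class
and method names (PrefixCompareSketch, normalizedPrefix, comparePrefix,
sortByPrefix) are made up for this example, and zero-padding of short keys
is an assumption of the sketch. It only shows that fixed-width prefixes can
be ordered without creating or deserializing any key objects.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; this is NOT Flink's sort implementation.
final class PrefixCompareSketch {

    // Same width as getMaxNormalizedKeyLen() in OptimizedText.
    static final int PREFIX_LEN = 10;

    // Copy the first PREFIX_LEN bytes of a key into a fixed-width slot.
    // Arrays.copyOf truncates longer keys and zero-pads shorter ones
    // (the zero-padding is an assumption of this sketch).
    static byte[] normalizedPrefix(byte[] key) {
        return Arrays.copyOf(key, PREFIX_LEN);
    }

    // Compare two fixed-width prefixes byte by byte, treating each byte
    // as unsigned. No object is deserialized for the comparison.
    static int comparePrefix(byte[] a, byte[] b) {
        for (int i = 0; i < PREFIX_LEN; i++) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return 0;
    }

    // Sort keys by their binary prefixes only and return the sorted prefixes.
    static byte[][] sortByPrefix(byte[][] keys) {
        byte[][] prefixes = new byte[keys.length][];
        for (int i = 0; i < keys.length; i++) {
            prefixes[i] = normalizedPrefix(keys[i]);
        }
        Arrays.sort(prefixes, PrefixCompareSketch::comparePrefix);
        return prefixes;
    }

    public static void main(String[] args) {
        byte[][] keys = {
            "CCCCCCCCCC".getBytes(StandardCharsets.US_ASCII),
            "AAAAAAAAAB".getBytes(StandardCharsets.US_ASCII),
            "AAAAAAAAAA".getBytes(StandardCharsets.US_ASCII),
        };
        // Prints the three prefixes in ascending byte order.
        for (byte[] p : sortByPrefix(keys)) {
            System.out.println(new String(p, StandardCharsets.US_ASCII));
        }
    }
}
```

In TeraSort the key is exactly 10 bytes long, so in this sketch the prefix
covers the whole key and the byte-wise comparison alone decides the order.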

Let us know if you have further questions,
Fabian

2015-07-10 9:35 GMT+02:00 Dongwon Kim <eastcirc...@postech.ac.kr>:

> Hi Stephan,
>
> I just pushed changes to my github:
> https://github.com/eastcirclek/terasort.
> I've modified the TeraSort program so that (A) it can reuse objects
> and (B) it can exploit OptimizedText as you suggested.
>
> I've also conducted a few experiments and the results are as follows:
> ORIGINAL : 1714
> ORIGINAL+A : 1671
> ORIGINAL+B : 1467
> ORIGINAL+A+B : 1427
> Your advice works as shown above :-)
>
> Datasets are now defined as below:
> - val inputFile = env.readHadoopFile(teraInputFormat, classOf[Text],
> classOf[Text], inputPath)
> - val optimizedText = inputFile.map(tp => (new OptimizedText(tp._1),
> tp._2))
> - val sortedPartitioned = optimizedText.partitionCustom(partitioner,
> 0).sortPartition(0, Order.ASCENDING)
> - sortedPartitioned.map(tp => (tp._1.getText, tp._2)).output(hadoopOF)
> You can see the two map transformations before and after the function
> composition partitionCustom.sortPartition.
>
> Here is a question regarding the performance improvement.
> Please see the attached Ganglia image files.
> - ORIGINAL-[cpu, disks, network].png are for ORIGINAL.
> - BEST-[cpu, disks, network].png are for ORIGINAL+A+B.
> Compared to ORIGINAL, BEST shows better utilization of disks and
> network and shows lower CPU utilization.
> Is this because OptimizedText objects are serialized into Flink's memory
> layout?
> What happens when keys are represented as plain Text, not
> OptimizedText? Is there another memory area to hold such objects? Or
> are they serialized anyway, but in an inefficient way?
> If the latter, is the CPU utilization in ORIGINAL high because the CPUs work
> hard to serialize Text objects using the Java serialization mechanism with
> DataInput and DataOutput?
> If true, I can explain the high throughput of network and disks in
> ORIGINAL+A+B.
> I, however, observed similar performance when I applied the mapping not
> only to the 10-byte keys but also to the 90-byte values, which cannot be
> explained by the above conjecture.
> Could you clarify this? I would appreciate it very much ;-)
>
> I'm also wondering whether the two map transformations,
> (Text, Text) to (OptimizedText, Text) and  (OptimizedText, Text) to
> (Text, Text),
> can prevent Flink from performing a lot better.
> I don't have time to modify TeraInputFormat and TeraOutputFormat to
> read (String, String) pairs from HDFS and write (String, String) pairs
> to HDFS.
> Do you think one can get a better TeraSort result using a new
> implementation of FileInputFormat<String,String>?
>
> Regards,
>
> Dongwon Kim
>
> 2015-07-03 3:29 GMT+09:00 Stephan Ewen <se...@apache.org>:
> > Hello Dongwon Kim!
> >
> > Thank you for sharing these numbers with us.
> >
> > I have gone through your implementation and there are two things you
> > could try:
> >
> > 1)
> >
> > I see that you sort Hadoop's Text data type with Flink. I think this may
> > be less efficient than if you sort String, or a Flink specific data type.
> >
> > For efficient byte operations on managed memory, Flink needs to
> > understand the binary representation of the data type. Flink understands
> > that for "String" and many other types, but not for "Text".
> >
> > There are two things you can do:
> >   - First, try what happens if you map the Hadoop Text type to a Java
> >     String (only for the tera key).
> >   - Second, you can try what happens if you wrap the Hadoop Text type in
> >     a Flink type that supports optimized binary sorting. I have pasted
> >     code for that at the bottom of this email.
> >
> > 2)
> >
> > You can see if it helps performance if you enable object re-use in Flink.
> > You can do this on the ExecutionEnvironment via
> > "env.getConfig().enableObjectReuse()". Then Flink tries to use the same
> > objects repeatedly, in case they are mutable.
> >
> >
> > Can you try these options out and see how they affect Flink's runtime?
> >
> >
> > Greetings,
> > Stephan
> >
> > ---------------------------------------------------------
> > Code for optimized sortable (Java):
> >
> > public final class OptimizedText implements NormalizableKey<OptimizedText> {
> >
> >     private final Text text;
> >
> >     public OptimizedText() {
> >         this.text = new Text();
> >     }
> >
> >     public OptimizedText(Text from) {
> >         this.text = from;
> >     }
> >
> >     public Text getText() {
> >         return text;
> >     }
> >
> >     @Override
> >     public int getMaxNormalizedKeyLen() {
> >         return 10;
> >     }
> >
> >     @Override
> >     public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
> >         memory.put(offset, text.getBytes(), 0,
> >                 Math.min(text.getLength(), Math.min(10, len)));
> >     }
> >
> >     @Override
> >     public void write(DataOutputView out) throws IOException {
> >         text.write(out);
> >     }
> >
> >     @Override
> >     public void read(DataInputView in) throws IOException {
> >         text.readFields(in);
> >     }
> >
> >     @Override
> >     public int compareTo(OptimizedText o) {
> >         return this.text.compareTo(o.text);
> >     }
> > }
> >
> > ---------------------------------------------------------
> > Converting Text to OptimizedText (Java code)
> >
> > map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
> >     @Override
> >     public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
> >         return new Tuple2<OptimizedText, Text>(
> >                 new OptimizedText(value.f0), value.f1);
> >     }
> > })
> >
> >
> >
> >
> > On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim <eastcirc...@postech.ac.kr>
> > wrote:
> >>
> >> Hello,
> >>
> >> I'd like to share my code for TeraSort on Flink and Spark which uses
> >> the same range partitioner as Hadoop TeraSort:
> >> https://github.com/eastcirclek/terasort
> >>
> >> I also wrote a short report on it:
> >> http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
> >> In the blog post, I make a simple performance comparison between
> >> Flink, Spark, Tez, and MapReduce.
> >>
> >> I hope it will be helpful to you guys!
> >> Thanks.
> >>
> >> Dongwon Kim
> >> Postdoctoral Researcher @ Postech
> >
> >
>
