TeraSort on Flink and Spark

2015-07-02 Thread Dongwon Kim
Hello,

I'd like to share my code for TeraSort on Flink and Spark, which uses
the same range partitioner as Hadoop TeraSort:
https://github.com/eastcirclek/terasort

I also wrote a short report on it:
http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
In the blog post, I make a simple performance comparison between
Flink, Spark, Tez, and MapReduce.

I hope it will be helpful to you guys!
Thanks.

Dongwon Kim
Postdoctoral Researcher @ Postech


Re: TeraSort on Flink and Spark

2015-07-02 Thread Stephan Ewen
Hello Dongwon Kim!

Thank you for sharing these numbers with us.

I have gone through your implementation and there are two things you could
try:

1)

I see that you sort Hadoop's Text data type with Flink. I think this may be
less efficient than sorting String, or a Flink-specific data type.

For efficient byte operations on managed memory, Flink needs to understand
the binary representation of the data type. Flink understands that for
"String" and many other types, but not for "Text".

There are two things you can do:
  - First, try what happens if you map the Hadoop Text type to a Java
String (only for the tera key); see the small sketch right after this list.
  - Second, you can try what happens if you wrap the Hadoop Text type in a
Flink type that supports optimized binary sorting. I have pasted code for
that at the bottom of this email.
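
For the first option, a minimal sketch of the key conversion could look
like the following (assuming Tuple2<Text, Text> records as in your
TeraSort code; "input" is only a placeholder name):

// Sketch: convert only the tera key, since Flink has built-in binary
// sort support for String but not for Hadoop's Text.
DataSet<Tuple2<String, Text>> stringKeys = input
    .map(new MapFunction<Tuple2<Text, Text>, Tuple2<String, Text>>() {
        @Override
        public Tuple2<String, Text> map(Tuple2<Text, Text> value) {
            return new Tuple2<String, Text>(value.f0.toString(), value.f1);
        }
    });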

2)

You can check whether it helps performance to enable object reuse in Flink.
You can do this on the ExecutionEnvironment via
"env.getConfig().enableObjectReuse()". Flink then tries to reuse the same
objects repeatedly, in case they are mutable.
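
For reference, enabling it is a one-liner (a minimal sketch):

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Let Flink hand the same mutable record objects to the UDFs repeatedly
// instead of allocating new ones for every record.
env.getConfig().enableObjectReuse();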


Can you try these options out and see how they affect Flink's runtime?


Greetings,
Stephan

-
Code for optimized sortable (Java):

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;
import org.apache.hadoop.io.Text;

public final class OptimizedText implements NormalizableKey<OptimizedText> {

    private final Text text;

    public OptimizedText() {
        this.text = new Text();
    }

    public OptimizedText(Text from) {
        this.text = from;
    }

    public Text getText() {
        return text;
    }

    @Override
    public int getMaxNormalizedKeyLen() {
        // TeraSort keys are exactly 10 bytes, so a 10-byte normalized
        // key prefix decides the complete order.
        return 10;
    }

    @Override
    public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
        // Copy (up to) the first 10 key bytes into the sort buffer.
        memory.put(offset, text.getBytes(), 0, Math.min(text.getLength(), Math.min(10, len)));
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        text.write(out);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        text.readFields(in);
    }

    @Override
    public int compareTo(OptimizedText o) {
        return this.text.compareTo(o.text);
    }
}

-
Converting Text to OptimizedText (Java code)

map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
    @Override
    public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
        return new Tuple2<OptimizedText, Text>(new OptimizedText(value.f0), value.f1);
    }
})
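
And the corresponding reverse conversion back to Text before the result is
written with the Hadoop output format (same pattern, sketched):

map(new MapFunction<Tuple2<OptimizedText, Text>, Tuple2<Text, Text>>() {
    @Override
    public Tuple2<Text, Text> map(Tuple2<OptimizedText, Text> value) {
        return new Tuple2<Text, Text>(value.f0.getText(), value.f1);
    }
})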




Re: TeraSort on Flink and Spark

2015-07-02 Thread Flavio Pompermaier
Hi Stephan,
if I understood correctly, you are substituting the Text key with a more
efficient version (OptimizedText).
Just one question: you set the max length of the key to 10. Do you know
that a priori?
Is this implementation of the key much more efficient than just using
String?
Is there any comparison about that?

Best,
Flavio


Re: TeraSort on Flink and Spark

2015-07-03 Thread Stephan Ewen
Flavio,

In general, String works well in Flink because, for sorting, it behaves
much like this OptimizedText.

If you want to access the String contents, then using String is good. Text
may have slight advantages if you never access the actual contents but
just partition and sort it (as in TeraSort).

The key length is limited to 10 because, in TeraSort, the keys are defined
to be 10 characters long ;-)
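
To illustrate with plain Java (the two sample keys below are made up):
because every key is exactly 10 bytes, an unsigned byte-wise comparison of
the 10-byte normalized key prefix already decides the complete order, so
the sort never has to deserialize the records to compare them.

byte[] a = "AsfAGHM5om".getBytes(StandardCharsets.US_ASCII);
byte[] b = "~sHd0jDv6X".getBytes(StandardCharsets.US_ASCII);
int cmp = 0;
for (int i = 0; i < 10 && cmp == 0; i++) {
    cmp = (a[i] & 0xFF) - (b[i] & 0xFF);  // unsigned byte comparison
}
// cmp < 0 here, so "AsfAGHM5om" orders before "~sHd0jDv6X"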

Greetings,
Stephan


Re: TeraSort on Flink and Spark

2015-07-12 Thread Dongwon Kim
…represented in just Text, not OptimizedText? Is there another memory
area to hold such objects, or are they serialized anyway, but in an
inefficient way?
If the latter, is the CPU utilization in ORIGINAL high because the CPUs
work hard to serialize Text objects using the Java serialization mechanism
with DataInput and DataOutput?
If true, that would explain the high throughput of the network and disks
in ORIGINAL+A+B.
However, I observed similar performance when I apply the mapping not only
to the 10-byte keys but also to the 90-byte values, which cannot be
explained by the above conjecture.
Could you clarify this? I would really appreciate it ;-)

I'm also wondering whether the two map transformations,
(Text, Text) to (OptimizedText, Text) and (OptimizedText, Text) to
(Text, Text),
can prevent Flink from performing a lot better.
I don't have time to modify TeraInputFormat and TeraOutputFormat to
read (String, String) pairs from HDFS and write (String, String) pairs
to HDFS.
Do you see that one could get a better TeraSort result with a new
implementation of FileInputFormat?

Regards,

Dongwon Kim