Hi Stephan,
if I understood correctly, you are substituting the Text key with a more
efficient version (OptimizedText).
Just one question: you set the max length of the key to 10.. do you know
that a priori?
Is this implementation of the key much more efficient than just using
String?
Is there any comparison about that?

Best,
Flavio
On 2 Jul 2015 20:29, "Stephan Ewen" <se...@apache.org> wrote:

> Hello Dongwon Kim!
>
> Thank you for sharing these numbers with us.
>
> I have gone through your implementation and there are two things you could
> try:
>
> 1)
>
> I see that you sort Hadoop's Text data type with Flink. I think this may
> be less efficient than sorting a String, or a Flink-specific data type.
>
> For efficient byte operations on managed memory, Flink needs to understand
> the binary representation of the data type. Flink understands that for
> "String" and many other types, but not for "Text".
>
> There are two things you can do:
>   - First, try what happens if you map the Hadoop Text type to a Java
> String (only for the tera key); a short sketch follows this list.
>   - Second, you can try what happens if you wrap the Hadoop Text type in a
> Flink type that supports optimized binary sorting. I have pasted code for
> that at the bottom of this email.
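> 
> For the first option, a minimal sketch could look like this ("data" is an
> illustrative name for the input DataSet of Tuple2<Text, Text>; the
> surrounding job is assumed):
> 
> data.map(new MapFunction<Tuple2<Text, Text>, Tuple2<String, Text>>() {
>     @Override
>     public Tuple2<String, Text> map(Tuple2<Text, Text> value) {
>         // convert the Hadoop Text key to a Java String, which Flink
>         // understands and can sort on managed memory
>         return new Tuple2<String, Text>(value.f0.toString(), value.f1);
>     }
> });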
>
> 2)
>
> You can see if it helps performance if you enable object re-use in Flink.
> You can do this on the ExecutionEnvironment via
> "env.getConfig().enableObjectReuse()". Flink will then try to reuse the
> same objects repeatedly where they are mutable.
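> 
> A minimal sketch of that setup (just the two relevant calls; the rest of
> the job is assumed):
> 
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> // let Flink reuse mutable objects instead of allocating new ones
> env.getConfig().enableObjectReuse();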
>
>
> Can you try these options out and see how they affect Flink's runtime?
>
>
> Greetings,
> Stephan
>
> ---------------------------------------------------------
> Code for optimized sortable (Java):
>
> import java.io.IOException;
>
> import org.apache.flink.core.memory.DataInputView;
> import org.apache.flink.core.memory.DataOutputView;
> import org.apache.flink.core.memory.MemorySegment;
> import org.apache.flink.types.NormalizableKey;
> import org.apache.hadoop.io.Text;
>
> public final class OptimizedText implements NormalizableKey<OptimizedText> {
>
>     private final Text text;
>
>     public OptimizedText() {
>         this.text = new Text();
>     }
>
>     public OptimizedText(Text from) {
>         this.text = from;
>     }
>
>     public Text getText() {
>         return text;
>     }
>
>     @Override
>     public int getMaxNormalizedKeyLen() {
>         // TeraSort keys are exactly 10 bytes, so a 10-byte prefix
>         // determines the complete order
>         return 10;
>     }
>
>     @Override
>     public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
>         // copy at most 10 key bytes into the sort buffer; TeraSort keys
>         // are always 10 bytes, so no zero-padding of shorter keys is needed
>         memory.put(offset, text.getBytes(), 0,
>                 Math.min(text.getLength(), Math.min(10, len)));
>     }
>
>     @Override
>     public void write(DataOutputView out) throws IOException {
>         text.write(out);
>     }
>
>     @Override
>     public void read(DataInputView in) throws IOException {
>         text.readFields(in);
>     }
>
>     @Override
>     public int compareTo(OptimizedText o) {
>         return this.text.compareTo(o.text);
>     }
> }
>
> ---------------------------------------------------------
> Converting Text to OptimizedText (Java code)
>
> map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
>     @Override
>     public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
>         return new Tuple2<OptimizedText, Text>(new OptimizedText(value.f0), value.f1);
>     }
> })
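> 
> As a hypothetical follow-up (the name "converted" and the sort call are
> illustrative, not from the original job), sorting the converted data set
> on field 0 lets Flink compare the 10 normalized key bytes directly in its
> managed sort buffers:
> 
> // Order is org.apache.flink.api.common.operators.Order
> DataSet<Tuple2<OptimizedText, Text>> converted = ...; // result of the map above
> converted.sortPartition(0, Order.ASCENDING);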
>
>
>
>
> On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim <eastcirc...@postech.ac.kr>
> wrote:
>
>> Hello,
>>
>> I'd like to share my code for TeraSort on Flink and Spark, which uses
>> the same range partitioner as Hadoop TeraSort:
>> https://github.com/eastcirclek/terasort
>>
>> I also wrote a short report on it:
>>
>> http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
>> In the blog post, I make a simple performance comparison between
>> Flink, Spark, Tez, and MapReduce.
>>
>> I hope it will be helpful to you guys!
>> Thanks.
>>
>> Dongwon Kim
>> Postdoctoral Researcher @ Postech
>>
>
>
