Hi Stephan, if I understood correctly, you are replacing the Text key with a more efficient version (OptimizedText). Just one question: you set the max length of the key to 10. Do you know that a priori? Is this implementation of the key much more efficient than just using String? Is there any comparison of the two?
Best,
Flavio

On 2 Jul 2015 20:29, "Stephan Ewen" <se...@apache.org> wrote:

> Hello Dongwon Kim!
>
> Thank you for sharing these numbers with us.
>
> I have gone through your implementation and there are two things you could
> try:
>
> 1)
>
> I see that you sort Hadoop's Text data type with Flink. I think this may
> be less efficient than if you sort String, or a Flink specific data type.
>
> For efficient byte operations on managed memory, Flink needs to understand
> the binary representation of the data type. Flink understands that for
> "String" and many other types, but not for "Text".
>
> There are two things you can do:
>   - First, try what happens if you map the Hadoop Text type to a Java
>     String (only for the tera key).
>   - Second, you can try what happens if you wrap the Hadoop Text type in a
>     Flink type that supports optimized binary sorting. I have pasted code for
>     that at the bottom of this email.
>
> 2)
>
> You can see if it helps performance if you enable object re-use in Flink.
> You can do this on the ExecutionEnvironment via
> "env.getConfig().enableObjectReuse()". Then Flink tries to use the same
> objects repeatedly, in case they are mutable.
>
>
> Can you try these options out and see how they affect Flink's runtime?
>
>
> Greetings,
> Stephan
>
> ---------------------------------------------------------
> Code for optimized sortable (Java):
>
> public final class OptimizedText implements NormalizableKey<OptimizedText> {
>
>     private final Text text;
>
>     public OptimizedText() {
>         this.text = new Text();
>     }
>
>     public OptimizedText(Text from) {
>         this.text = from;
>     }
>
>     public Text getText() {
>         return text;
>     }
>
>     @Override
>     public int getMaxNormalizedKeyLen() {
>         return 10;
>     }
>
>     @Override
>     public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
>         memory.put(offset, text.getBytes(), 0,
>             Math.min(text.getLength(), Math.min(10, len)));
>     }
>
>     @Override
>     public void write(DataOutputView out) throws IOException {
>         text.write(out);
>     }
>
>     @Override
>     public void read(DataInputView in) throws IOException {
>         text.readFields(in);
>     }
>
>     @Override
>     public int compareTo(OptimizedText o) {
>         return this.text.compareTo(o.text);
>     }
> }
>
> ---------------------------------------------------------
> Converting Text to OptimizedText (Java code)
>
> map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
>     @Override
>     public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
>         return new Tuple2<OptimizedText, Text>(
>             new OptimizedText(value.f0), value.f1);
>     }
> })
>
>
>
>
> On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim <eastcirc...@postech.ac.kr>
> wrote:
>
>> Hello,
>>
>> I'd like to share my code for TeraSort on Flink and Spark which uses
>> the same range partitioner as Hadoop TeraSort:
>> https://github.com/eastcirclek/terasort
>>
>> I also wrote a short report on it:
>>
>> http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
>> In the blog post, I make a simple performance comparison between
>> Flink, Spark, Tez, and MapReduce.
>>
>> I hope it will be helpful to you guys!
>> Thanks.
>>
>> Dongwon Kim
>> Postdoctoral Researcher @ Postech
>>
>
>
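
For readers following the thread, the normalized-key idea behind the quoted OptimizedText code can be sketched in plain Java without any Flink or Hadoop dependency. This is only an illustration under stated assumptions, not Flink's actual sorter: the class NormalizedKeySketch and all of its methods are hypothetical names, and the prefix length of 10 is taken from the quoted code (TeraSort records carry a 10-byte key, which is the likely reason for that constant). The sketch copies a fixed-length, zero-padded prefix of each key, compares prefixes as unsigned bytes, and falls back to a full key comparison only when the prefixes tie.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Standalone sketch of prefix-based key comparison; hypothetical, not Flink code. */
final class NormalizedKeySketch {

    /** Prefix length, mirroring the constant 10 in the quoted OptimizedText. */
    static final int PREFIX_LEN = 10;

    /** Copies up to PREFIX_LEN key bytes; shorter keys are padded with zeros. */
    static byte[] normalize(byte[] key) {
        // Arrays.copyOf truncates longer input and zero-fills shorter input.
        return Arrays.copyOf(key, PREFIX_LEN);
    }

    /** Compares two normalized prefixes byte by byte, treating bytes as unsigned. */
    static int comparePrefixes(byte[] a, byte[] b) {
        for (int i = 0; i < PREFIX_LEN; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return 0;
    }

    /** Full lexicographic comparison on unsigned bytes; shorter key sorts first on a tie. */
    static int compareFull(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /** Uses the cheap prefix comparison first and the full comparison only on a tie. */
    static int compare(byte[] a, byte[] b) {
        int c = comparePrefixes(normalize(a), normalize(b));
        return c != 0 ? c : compareFull(a, b);
    }

    public static void main(String[] args) {
        byte[] k1 = "AAAAAAAAAA".getBytes(StandardCharsets.US_ASCII);
        byte[] k2 = "AAAAAAAAAB".getBytes(StandardCharsets.US_ASCII);
        // For 10-byte keys the prefix alone decides the order.
        System.out.println("prefix comparison: " + comparePrefixes(normalize(k1), normalize(k2)));
        System.out.println("combined comparison: " + compare(k1, k2));
    }
}
```

When every key fits in the prefix, as with 10-byte TeraSort keys, the fallback is only reached for equal keys, so the sort can work almost entirely on raw bytes. The zero padding in normalize is a choice made for this sketch so that keys shorter than the prefix still compare deterministically.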