Hi Kim and Stephan,

Kim's report shows Flink 0.9.0 sorting 3360 GB in 1427 seconds, where
3360 GB = 80 GB per node * 42 nodes.
Based on Kim's report, the throughput of Flink 0.9.0 is
3360 GB / 1427 s = 2.35 GB/sec.
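The rates in this message (the 42-node figure above, plus the 190-node extrapolation and the Databricks figure below) can be reproduced with a short Java sketch. The class and method names are hypothetical, and the 190-node number rests on the assumption that throughput scales linearly with node count, so it is an estimate rather than a measurement.

```java
// Sketch: reproduces the throughput arithmetic quoted in this thread.
// Class and method names are hypothetical; 1 TB = 1024 GB, as in the thread.
public class SortThroughput {

    // Throughput in GB/sec for a sort of `gigabytes` GB that took `seconds` seconds.
    static double throughput(double gigabytes, double seconds) {
        return gigabytes / seconds;
    }

    // ASSUMPTION: throughput grows in proportion to the number of nodes.
    static double scaleLinearly(double rate, int fromNodes, int toNodes) {
        return rate * toNodes / fromNodes;
    }

    public static void main(String[] args) {
        double flinkGb = 80.0 * 42;                                   // 80 GB per node * 42 nodes = 3360 GB
        double flinkRate = throughput(flinkGb, 1427);                 // ORIGINAL+A+B run: 1427 s
        double flinkAt190 = scaleLinearly(flinkRate, 42, 190);        // extrapolated, not measured
        double databricksRate = throughput(1000.0 * 1024, 234 * 60);  // 1000 TB in 234 min

        System.out.printf("Flink 0.9.0, 42 nodes:           %.2f GB/sec%n", flinkRate);
        System.out.printf("Flink 0.9.0, 190 nodes (linear): %.2f GB/sec%n", flinkAt190);
        System.out.printf("Spark (Databricks), 190 nodes:   %.2f GB/sec%n", databricksRate);
    }
}
```

Running it prints 2.35, 10.65 and 72.93 GB/sec, matching the figures in this message.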
Kim was using 42 nodes for testing purposes. The best Spark result I found was
Databricks' run on 190 nodes, so the scaling factor is 42:190. If we used 190
nodes for this test and throughput scaled linearly with the node count, Flink's
throughput would be about 2.35 GB/sec * 190 / 42 = 10.65 GB/sec.

Here is the summary table for your reference. Please let me know if you have
any questions about it. Thanks.

[image: Inline image 2]

Databricks: 72.93 GB/sec = (1000 TB * 1024) / (234 min * 60)

The performance test report from Databricks:
https://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html

Best regards
Hawin

On Fri, Jul 10, 2015 at 1:33 AM, Stephan Ewen <se...@apache.org> wrote:
> Hi Dongwon Kim!
>
> Thank you for trying out these changes.
>
> The OptimizedText can be sorted more efficiently, because it generates a
> binary key prefix. That way, the sorting needs to serialize/deserialize
> less and saves on CPU.
>
> In parts of the program, the CPU is then less of a bottleneck, and the
> disks and the network can make better use of their bandwidth.
>
> Greetings,
> Stephan
>
> On Fri, Jul 10, 2015 at 9:35 AM, Dongwon Kim <eastcirc...@postech.ac.kr> wrote:
>> Hi Stephan,
>>
>> I just pushed changes to my GitHub:
>> https://github.com/eastcirclek/terasort.
>> I've modified the TeraSort program so that (A) it can reuse objects
>> and (B) it can exploit OptimizedText as you suggested.
>>
>> I've also conducted a few experiments, and the results (elapsed time in
>> seconds) are as follows:
>> ORIGINAL     : 1714
>> ORIGINAL+A   : 1671
>> ORIGINAL+B   : 1467
>> ORIGINAL+A+B : 1427
>> Your advice works, as shown above :-)
>>
>> Datasets are now defined as below:
>> - val inputFile = env.readHadoopFile(teraInputFormat, classOf[Text], classOf[Text], inputPath)
>> - val optimizedText = inputFile.map(tp => (new OptimizedText(tp._1), tp._2))
>> - val sortedPartitioned = optimizedText.partitionCustom(partitioner, 0).sortPartition(0, Order.ASCENDING)
>> - sortedPartitioned.map(tp => (tp._1.getText, tp._2)).output(hadoopOF)
>> You can see the two map transformations before and after the
>> composition partitionCustom.sortPartition.
>>
>> Here is a question regarding the performance improvement.
>> Please see the attached Ganglia image files.
>> - ORIGINAL-[cpu, disks, network].png are for ORIGINAL.
>> - BEST-[cpu, disks, network].png are for ORIGINAL+A+B.
>> Compared to ORIGINAL, BEST shows better utilization of disks and
>> network and lower CPU utilization.
>> Is this because OptimizedText objects are serialized into Flink's memory
>> layout?
>> What happens when keys are represented as plain Text, not
>> OptimizedText? Is there another memory area to hold such objects? Or
>> are they serialized anyway, but in an inefficient way?
>> If the latter, is the CPU utilization in ORIGINAL high because the CPUs
>> work hard to serialize Text objects using the Java serialization
>> mechanism with DataInput and DataOutput?
>> If so, that would explain the higher network and disk throughput in
>> ORIGINAL+A+B.
>> However, I observed similar performance when I applied the mapping not
>> only to the 10-byte keys but also to the 90-byte values, which the
>> above conjecture cannot explain.
>> Could you clarify this?
>> I would appreciate it ;-)
>>
>> I'm also wondering whether the two map transformations,
>> (Text, Text) to (OptimizedText, Text) and (OptimizedText, Text) to
>> (Text, Text),
>> are keeping Flink from performing much better.
>> I don't have time to modify TeraInputFormat and TeraOutputFormat to
>> read (String, String) pairs from HDFS and write (String, String) pairs
>> to HDFS.
>> Do you think one could get a better TeraSort result with a new
>> implementation of FileInputFormat<String,String>?
>>
>> Regards,
>>
>> Dongwon Kim
>>
>> 2015-07-03 3:29 GMT+09:00 Stephan Ewen <se...@apache.org>:
>> > Hello Dongwon Kim!
>> >
>> > Thank you for sharing these numbers with us.
>> >
>> > I have gone through your implementation, and there are two things you
>> > could try:
>> >
>> > 1)
>> >
>> > I see that you sort Hadoop's Text data type with Flink. I think this
>> > may be less efficient than sorting String or a Flink-specific data type.
>> >
>> > For efficient byte operations on managed memory, Flink needs to
>> > understand the binary representation of the data type. Flink understands
>> > that for "String" and many other types, but not for "Text".
>> >
>> > There are two things you can do:
>> > - First, try what happens if you map the Hadoop Text type to a Java
>> >   String (only for the tera key).
>> > - Second, try what happens if you wrap the Hadoop Text type in a
>> >   Flink type that supports optimized binary sorting. I have pasted code
>> >   for that at the bottom of this email.
>> >
>> > 2)
>> >
>> > You can check whether enabling object re-use in Flink helps performance.
>> > You can do this on the ExecutionEnvironment via
>> > "env.getConfig().enableObjectReuse()". Flink then tries to reuse the same
>> > objects repeatedly, in case they are mutable.
>> >
>> > Can you try these options out and see how they affect Flink's runtime?
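>> >
>> > Greetings,
>> > Stephan
>> >
>> > ---------------------------------------------------------
>> > Code for optimized sortable (Java):
>> >
>> > import java.io.IOException;
>> >
>> > import org.apache.flink.core.memory.DataInputView;
>> > import org.apache.flink.core.memory.DataOutputView;
>> > import org.apache.flink.core.memory.MemorySegment;
>> > import org.apache.flink.types.NormalizableKey;
>> > import org.apache.hadoop.io.Text;
>> >
>> > // Wraps Hadoop's Text so that Flink can sort it on a binary key prefix.
>> > public final class OptimizedText implements NormalizableKey<OptimizedText> {
>> >
>> >     private final Text text;
>> >
>> >     public OptimizedText() {
>> >         this.text = new Text();
>> >     }
>> >
>> >     public OptimizedText(Text from) {
>> >         this.text = from;
>> >     }
>> >
>> >     public Text getText() {
>> >         return text;
>> >     }
>> >
>> >     // TeraSort keys are 10 bytes long, so a 10-byte normalized key covers the whole key.
>> >     @Override
>> >     public int getMaxNormalizedKeyLen() {
>> >         return 10;
>> >     }
>> >
>> >     // Copies the leading key bytes into Flink's sort buffer and zero-pads the
>> >     // remainder, so stale buffer bytes never leak into the comparison.
>> >     @Override
>> >     public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
>> >         int n = Math.min(text.getLength(), Math.min(10, len));
>> >         memory.put(offset, text.getBytes(), 0, n);
>> >         for (int i = n; i < len; i++) {
>> >             memory.put(offset + i, (byte) 0);
>> >         }
>> >     }
>> >
>> >     @Override
>> >     public void write(DataOutputView out) throws IOException {
>> >         text.write(out);
>> >     }
>> >
>> >     @Override
>> >     public void read(DataInputView in) throws IOException {
>> >         text.readFields(in);
>> >     }
>> >
>> >     @Override
>> >     public int compareTo(OptimizedText o) {
>> >         return this.text.compareTo(o.text);
>> >     }
>> >
>> >     // Keep equals/hashCode consistent with compareTo.
>> >     @Override
>> >     public boolean equals(Object o) {
>> >         return o instanceof OptimizedText && text.equals(((OptimizedText) o).text);
>> >     }
>> >
>> >     @Override
>> >     public int hashCode() {
>> >         return text.hashCode();
>> >     }
>> > }
>> >
>> > ---------------------------------------------------------
>> > Converting Text to OptimizedText (Java code)
>> >
>> > // Applied to a DataSet<Tuple2<Text, Text>>; needs imports for
>> > // org.apache.flink.api.common.functions.MapFunction and
>> > // org.apache.flink.api.java.tuple.Tuple2.
>> > map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
>> >     @Override
>> >     public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
>> >         // Wrap only the key; the value stays a plain Text.
>> >         return new Tuple2<OptimizedText, Text>(new OptimizedText(value.f0), value.f1);
>> >     }
>> > })
>> >
>> > On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim <eastcirc...@postech.ac.kr> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I'd like to share my code for TeraSort on Flink and Spark, which uses
>> >> the same range partitioner as Hadoop TeraSort:
>> >> https://github.com/eastcirclek/terasort
>> >>
>> >> I also wrote a short report on it:
>> >> http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
>> >> In the blog post, I make a simple performance comparison between
>> >> Flink, Spark, Tez, and MapReduce.
>> >>
>> >> I hope it will be helpful to you guys!
>> >> Thanks.
>> >>
>> >> Dongwon Kim
>> >> Postdoctoral Researcher @ Postech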
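Stephan's explanation at the top of the thread (a binary key prefix lets the sorter compare raw bytes and serialize/deserialize less) can be illustrated without Flink. The following is a minimal, self-contained Java sketch, not Flink's actual sorter: the class NormalizedKeySortSketch, the 8-byte prefix length, and the record layout are hypothetical. Each record carries its first 8 key bytes packed big-endian into a long (zero-padded), so an unsigned comparison of two longs agrees with unsigned lexicographic byte order. Most pairs are decided by that single comparison; only records whose prefixes tie fall back to a full key comparison, which stands in for the costlier compare-the-objects path.

```java
import java.util.Arrays;
import java.util.Comparator;

// Sketch (not Flink's sorter): sort records by an 8-byte normalized key prefix,
// falling back to a full key comparison only when two prefixes are equal.
public class NormalizedKeySortSketch {

    static final int PREFIX_BYTES = 8;

    // Hypothetical record layout: a key plus an opaque payload.
    static final class Rec {
        final byte[] key;
        final byte[] value;
        final long prefix; // first 8 key bytes, big-endian, zero-padded

        Rec(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
            this.prefix = normalizedPrefix(key);
        }
    }

    // Packs the leading key bytes big-endian so that unsigned comparison of the
    // longs matches unsigned lexicographic comparison of those bytes. Zero padding
    // never puts a key after a longer key it is a prefix of; exact prefix ties
    // are resolved by compareFull.
    static long normalizedPrefix(byte[] key) {
        long p = 0;
        for (int i = 0; i < PREFIX_BYTES; i++) {
            int b = i < key.length ? (key[i] & 0xFF) : 0;
            p = (p << 8) | b;
        }
        return p;
    }

    // Full comparison: unsigned lexicographic order, shorter key first on a common prefix.
    static int compareFull(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // One long comparison decides most pairs; ties fall back to the full key.
    static final Comparator<Rec> PREFIX_THEN_FULL = (x, y) -> {
        int c = Long.compareUnsigned(x.prefix, y.prefix);
        return c != 0 ? c : compareFull(x.key, y.key);
    };

    // Helper: sorts bare keys (with empty payloads) and returns them in order.
    static byte[][] sortKeys(byte[][] keys) {
        Rec[] recs = new Rec[keys.length];
        for (int i = 0; i < keys.length; i++) {
            recs[i] = new Rec(keys[i], new byte[0]);
        }
        Arrays.sort(recs, PREFIX_THEN_FULL);
        byte[][] out = new byte[keys.length][];
        for (int i = 0; i < recs.length; i++) {
            out[i] = recs[i].key;
        }
        return out;
    }

    public static void main(String[] args) {
        byte[][] keys = {
            {(byte) 0xFF, 0},               // high unsigned first byte: sorts last
            {1, 2, 3, 4, 5, 6, 7, 8, 9, 9},
            {1, 2, 3, 4, 5, 6, 7, 8, 9, 1}, // same 8-byte prefix as the key above
            {0, 7}
        };
        for (byte[] k : sortKeys(keys)) {
            System.out.println(Arrays.toString(k));
        }
    }
}
```

With TeraSort's randomly generated 10-byte keys, ties on the first 8 bytes should be rare, so the full comparison is seldom reached; the OptimizedText class in this thread goes further and uses a 10-byte normalized key that covers the whole TeraSort key.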