Hi Kim and Stephan,

Kim's report shows Flink 0.9.0 sorting 3360 GB in 1427 seconds, where
3360 = 80 * 42 (80 GB per node on 42 nodes).
That puts Flink 0.9.0's throughput at 2.35 GB/sec.

Kim was testing on 42 nodes, while the best Spark performance result I
found, from Databricks, used 190 nodes.
The scaling factor is therefore 42:190. If we ran this test on 190 nodes
and throughput scaled linearly, Flink's throughput should be about
10.65 GB/sec.
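
The arithmetic behind these numbers, for reference (the 190-node figure
assumes throughput scales linearly with node count):

2.35 GB/sec  = (80 GB * 42 nodes) / 1427 sec
10.65 GB/sec = 2.35 GB/sec * (190 nodes / 42 nodes)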


Here is the summary table for your reference.
Please let me know if you have any questions about this table.
Thanks.

[image: summary table (inline attachment)]


For comparison, Spark's 1 PB result from the Databricks report below
(1000 TB sorted in 234 minutes on 190 nodes) works out to:

72.93 GB/sec = (1000 TB * 1024) / (234 min * 60)


The performance test report from Databricks:
https://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html

Best regards
Hawin

On Fri, Jul 10, 2015 at 1:33 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Dongwon Kim!
>
> Thank you for trying out these changes.
>
> The OptimizedText can be sorted more efficiently, because it generates a
> binary key prefix. That way, the sorting needs to serialize/deserialize
> less and saves on CPU.
>
> In parts of the program, the CPU is then less of a bottleneck and the
> disks and the network can unfold their bandwidth better.
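>
> To illustrate the idea (a minimal sketch, not Flink's actual internal
> code; the method name is made up for illustration): with a fixed-length
> binary key prefix, the sort can order records by comparing serialized
> bytes directly, without deserializing them:
>
> // illustrative only: compare two fixed-length binary key prefixes as
> // unsigned bytes, the way a normalized-key sort compares records
> static int comparePrefix(byte[] a, int offA, byte[] b, int offB, int len) {
>     for (int i = 0; i < len; i++) {
>         int x = a[offA + i] & 0xFF;  // treat bytes as unsigned
>         int y = b[offB + i] & 0xFF;
>         if (x != y) {
>             return x - y;
>         }
>     }
>     return 0;  // equal prefixes; only then is a full compare needed
> }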
>
> Greetings,
> Stephan
>
>
>
> On Fri, Jul 10, 2015 at 9:35 AM, Dongwon Kim <eastcirc...@postech.ac.kr>
> wrote:
>
>> Hi Stephan,
>>
>> I just pushed changes to my GitHub:
>> https://github.com/eastcirclek/terasort.
>> I've modified the TeraSort program so that (A) it can reuse objects
>> and (B) it can exploit OptimizedText as you suggested.
>>
>> I've also conducted a few experiments; the results (runtimes in seconds)
>> are as follows:
>> ORIGINAL : 1714
>> ORIGINAL+A : 1671
>> ORIGINAL+B : 1467
>> ORIGINAL+A+B : 1427
>> Your advice works as shown above :-)
>>
>> Datasets are now defined as below:
>> - val inputFile = env.readHadoopFile(teraInputFormat, classOf[Text], classOf[Text], inputPath)
>> - val optimizedText = inputFile.map(tp => (new OptimizedText(tp._1), tp._2))
>> - val sortedPartitioned = optimizedText.partitionCustom(partitioner, 0).sortPartition(0, Order.ASCENDING)
>> - sortedPartitioned.map(tp => (tp._1.getText, tp._2)).output(hadoopOF)
>> Note the two map transformations, one before and one after the composed
>> partitionCustom(...).sortPartition(...) step.
>>
>> Here is a question regarding the performance improvement.
>> Please see the attached Ganglia image files:
>> - ORIGINAL-[cpu, disks, network].png are for ORIGINAL.
>> - BEST-[cpu, disks, network].png are for ORIGINAL+A+B.
>> Compared to ORIGINAL, BEST shows better utilization of disks and
>> network, and lower CPU utilization.
>> Is this because OptimizedText objects are serialized into Flink's memory
>> layout?
>> What happens when keys are represented as plain Text instead of
>> OptimizedText? Is there another memory area to hold such objects, or
>> are they serialized anyway but in an inefficient way?
>> If the latter, is the CPU utilization in ORIGINAL high because the CPUs
>> work hard to serialize Text objects using the Java serialization
>> mechanism with DataInput and DataOutput?
>> If true, that would explain the high throughput of network and disks in
>> ORIGINAL+A+B.
>> However, I observed similar performance when I applied the mapping not
>> only to the 10-byte keys but also to the 90-byte values, which the
>> above conjecture cannot explain.
>> Could you make this clear? If so, I would really appreciate it ;-)
>>
>> I'm also wondering whether the two map transformations,
>> (Text, Text) to (OptimizedText, Text) and (OptimizedText, Text) to
>> (Text, Text),
>> prevent Flink from performing a lot better.
>> I don't have time to modify TeraInputFormat and TeraOutputFormat to
>> read (String, String) pairs from HDFS and write (String, String) pairs
>> to HDFS.
>> Do you think one could get a better TeraSort result with a new
>> implementation of FileInputFormat<String, String>?
>>
>> Regards,
>>
>> Dongwon Kim
>>
>> 2015-07-03 3:29 GMT+09:00 Stephan Ewen <se...@apache.org>:
>> > Hello Dongwon Kim!
>> >
>> > Thank you for sharing these numbers with us.
>> >
>> > I have gone through your implementation and there are two things you
>> > could try:
>> >
>> > 1)
>> >
>> > I see that you sort Hadoop's Text data type with Flink. I think this
>> > may be less efficient than if you sort String, or a Flink-specific
>> > data type.
>> >
>> > For efficient byte operations on managed memory, Flink needs to
>> > understand the binary representation of the data type. Flink
>> > understands that for "String" and many other types, but not for "Text".
>> >
>> > There are two things you can do:
>> >   - First, try what happens if you map the Hadoop Text type to a Java
>> > String (only for the tera key).
>> >   - Second, you can try what happens if you wrap the Hadoop Text type
>> > in a Flink type that supports optimized binary sorting. I have pasted
>> > code for that at the bottom of this email.
>> >
>> > 2)
>> >
>> > You can see if it helps performance if you enable object re-use in
>> > Flink. You can do this on the ExecutionEnvironment via
>> > "env.getConfig().enableObjectReuse()". Then Flink tries to use the same
>> > objects repeatedly, in case they are mutable.
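>> >
>> > For concreteness, a minimal sketch (Java) of where that call goes when
>> > setting up the job; the rest of the program stays unchanged:
>> >
>> > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> > // reuse mutable objects across records instead of allocating new ones
>> > env.getConfig().enableObjectReuse();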
>> >
>> >
>> > Can you try these options out and see how they affect Flink's runtime?
>> >
>> >
>> > Greetings,
>> > Stephan
>> >
>> > ---------------------------------------------------------
>> > Code for optimized sortable (Java):
>> >
>> > import java.io.IOException;
>> >
>> > import org.apache.flink.core.memory.DataInputView;
>> > import org.apache.flink.core.memory.DataOutputView;
>> > import org.apache.flink.core.memory.MemorySegment;
>> > import org.apache.flink.types.NormalizableKey;
>> > import org.apache.hadoop.io.Text;
>> >
>> > public final class OptimizedText implements NormalizableKey<OptimizedText> {
>> >
>> >     private final Text text;
>> >
>> >     public OptimizedText() {
>> >         this.text = new Text();
>> >     }
>> >
>> >     public OptimizedText(Text from) {
>> >         this.text = from;
>> >     }
>> >
>> >     public Text getText() {
>> >         return text;
>> >     }
>> >
>> >     @Override
>> >     public int getMaxNormalizedKeyLen() {
>> >         // TeraSort keys are exactly 10 bytes, so the prefix covers the whole key
>> >         return 10;
>> >     }
>> >
>> >     @Override
>> >     public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
>> >         // copy up to 10 key bytes into the sort buffer
>> >         memory.put(offset, text.getBytes(), 0,
>> >                 Math.min(text.getLength(), Math.min(10, len)));
>> >     }
>> >
>> >     @Override
>> >     public void write(DataOutputView out) throws IOException {
>> >         text.write(out);
>> >     }
>> >
>> >     @Override
>> >     public void read(DataInputView in) throws IOException {
>> >         text.readFields(in);
>> >     }
>> >
>> >     @Override
>> >     public int compareTo(OptimizedText o) {
>> >         return this.text.compareTo(o.text);
>> >     }
>> > }
>> >
>> > ---------------------------------------------------------
>> > Converting Text to OptimizedText (Java code)
>> >
>> > map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
>> >     @Override
>> >     public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
>> >         return new Tuple2<OptimizedText, Text>(new OptimizedText(value.f0), value.f1);
>> >     }
>> > })
>> >
>> >
>> >
>> >
>> > On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim <eastcirc...@postech.ac.kr>
>> > wrote:
>> >>
>> >> Hello,
>> >>
>> >> I'd like to share my code for TeraSort on Flink and Spark which uses
>> >> the same range partitioner as Hadoop TeraSort:
>> >> https://github.com/eastcirclek/terasort
>> >>
>> >> I also wrote a short report on it:
>> >> http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
>> >> In the blog post, I make a simple performance comparison between
>> >> Flink, Spark, Tez, and MapReduce.
>> >>
>> >> I hope it will be helpful to you guys!
>> >> Thanks.
>> >>
>> >> Dongwon Kim
>> >> Postdoctoral Researcher @ Postech
>> >
>> >
>>
>
>
