Yingjie Cao created FLINK-11859:
-----------------------------------

             Summary: Improve SpanningRecordSerializer performance by 
serializing record length to serialization buffer directly
                 Key: FLINK-11859
                 URL: https://issues.apache.org/jira/browse/FLINK-11859
             Project: Flink
          Issue Type: Improvement
            Reporter: Yingjie Cao
            Assignee: Yingjie Cao


In the current implementation of SpanningRecordSerializer, the length of a 
record is serialized to an intermediate length buffer and then copied to the 
target buffer. The length field can instead be serialized directly to the 
data buffer (serializationBuffer), which avoids the separate copy of the 
length buffer. Although the total number of bytes copied remains unchanged, 
this saves one copy operation per record, and copying a few bytes has high 
overhead relative to its size. The flink-benchmarks results show an 
improvement, most visibly in the network throughput benchmarks; the test 
results are as follows.

Result with the optimization:
|Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
|KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2228.049605|77.631804|ops/ms| | |
|KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3968.361739|193.501755|ops/ms| | |
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3030.016702|29.272713|ops/ms| |MEMORY|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2754.77678|26.215395|ops/ms| |FS|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3001.957606|29.288019|ops/ms| |FS_ASYNC|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|123.698984|3.339233|ops/ms| |ROCKS|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|126.252137|1.137735|ops/ms| |ROCKS_INC|
|SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|323.658098|5.855697|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|183.34423|3.710787|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|404.380233|5.131744|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|527.193369|10.176726|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|550.073024|11.724412|ops/ms| | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|564.690627|13.766809|ops/ms| | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|49918.11806|2324.234776|ops/ms|100,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10443.63491|315.835962|ops/ms|100,100ms,SSL| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|21387.47608|2779.832704|ops/ms|1000,1ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|26585.85453|860.243347|ops/ms|1000,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8252.563405|947.129028|ops/ms|1000,100ms,SSL| |
|SumLongsBenchmark.benchmarkCount|thrpt|1|30|8806.021402|263.995836|ops/ms| | |
|WindowBenchmarks.globalWindow|thrpt|1|30|4573.620126|112.099391|ops/ms| | |
|WindowBenchmarks.sessionWindow|thrpt|1|30|585.246412|7.026569|ops/ms| | |
|WindowBenchmarks.slidingWindow|thrpt|1|30|449.302134|4.123669|ops/ms| | |
|WindowBenchmarks.tumblingWindow|thrpt|1|30|2979.806858|33.818909|ops/ms| | |
|StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.842865|0.13796|ms/op| | |

Result without the optimization:

 
|Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
|KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2060.241715|59.898485|ops/ms| | |
|KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3645.306819|223.821719|ops/ms| | |
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2992.698822|36.978115|ops/ms| |MEMORY|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2756.10949|27.798937|ops/ms| |FS|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2965.969876|44.159793|ops/ms| |FS_ASYNC|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|125.506942|1.245978|ops/ms| |ROCKS|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|127.258737|1.190588|ops/ms| |ROCKS_INC|
|SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|316.497954|8.309241|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|189.065149|6.302073|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|391.51305|7.750728|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|513.611151|10.640899|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|534.184947|14.370082|ops/ms| | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|483.388618|19.506723|ops/ms| | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|42777.70615|4981.87539|ops/ms|100,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10201.48525|286.248845|ops/ms|100,100ms,SSL| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|20788.34364|1146.470652|ops/ms|1000,1ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|24412.00941|981.98882|ops/ms|1000,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8284.336114|177.482373|ops/ms|1000,100ms,SSL| |
|SumLongsBenchmark.benchmarkCount|thrpt|1|30|7846.800667|127.321584|ops/ms| | |
|WindowBenchmarks.globalWindow|thrpt|1|30|4837.270101|94.519852|ops/ms| | |
|WindowBenchmarks.sessionWindow|thrpt|1|30|591.304589|5.324132|ops/ms| | |
|WindowBenchmarks.slidingWindow|thrpt|1|30|446.605784|2.53677|ops/ms| | |
|WindowBenchmarks.tumblingWindow|thrpt|1|30|2878.885056|64.035709|ops/ms| | |
|StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.705601|0.164959|ms/op| | |
 
The optimization is especially useful for small records.
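
The difference between the two approaches can be illustrated with a minimal, 
self-contained sketch. This is not the actual SpanningRecordSerializer code: 
the class name LengthPrefixSketch, both method names, and the 4-byte 
big-endian length prefix are assumptions made for illustration only. The 
first method writes the length into its own small buffer and copies it 
separately; the second reserves space at the front of the serialization 
buffer and backfills the length, so only one copy into the target is needed.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration; not Flink's real serializer.
public class LengthPrefixSketch {

    // Assumed size of the length prefix (one int).
    static final int LENGTH_BYTES = 4;

    // Approach described as "current": the length lives in a separate
    // intermediate buffer, so two copies go into the target buffer.
    static byte[] serializeWithLengthBuffer(byte[] payload) {
        ByteBuffer lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
        lengthBuffer.putInt(payload.length);
        lengthBuffer.flip();

        ByteBuffer dataBuffer = ByteBuffer.wrap(payload);

        ByteBuffer target = ByteBuffer.allocate(LENGTH_BYTES + payload.length);
        target.put(lengthBuffer); // copy 1: only 4 bytes
        target.put(dataBuffer);   // copy 2: the record itself
        return target.array();
    }

    // Proposed approach: reserve room for the length at the start of the
    // serialization buffer, write the record, then backfill the length.
    static byte[] serializeWithInlineLength(byte[] payload) {
        ByteBuffer serializationBuffer =
                ByteBuffer.allocate(LENGTH_BYTES + payload.length);
        serializationBuffer.position(LENGTH_BYTES); // skip the length slot
        serializationBuffer.put(payload);           // stands in for record.write(...)
        int recordLength = serializationBuffer.position() - LENGTH_BYTES;
        serializationBuffer.putInt(0, recordLength); // absolute write, position unchanged
        serializationBuffer.flip();

        ByteBuffer target = ByteBuffer.allocate(serializationBuffer.remaining());
        target.put(serializationBuffer); // single copy: length and record together
        return target.array();
    }

    public static void main(String[] args) {
        byte[] record = {1, 2, 3};
        byte[] a = serializeWithLengthBuffer(record);
        byte[] b = serializeWithInlineLength(record);
        // Both approaches produce the same bytes in the target buffer.
        System.out.println(Arrays.equals(a, b));
    }
}
```

Both methods produce identical output bytes; the second simply avoids the 
extra small copy, which matches the observation that the gain is largest 
when records are small.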
 



