[ 
https://issues.apache.org/jira/browse/FLINK-37435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17935021#comment-17935021
 ] 

Kurt Ostfeld edited comment on FLINK-37435 at 3/13/25 12:33 AM:
----------------------------------------------------------------

I created a new benchmark in the flink-benchmarks project with two files:
https://gist.github.com/kurtostfeld/1a6a6cf1a73d85f238fe0522be6f2d43
https://gist.github.com/kurtostfeld/a7e7bdc36a26bfb793c9d01b1a8520d4

I'm not checking this in. You can copy these two source files into the source 
tree and run the benchmark via:

```
mvn package
java -jar target/benchmarks.jar -rf csv "org.apache.flink.benchmark.full.KryoBenchmark"
```

Results (on my laptop, using the Temurin OpenJDK 17 distribution):

Benchmark                        Mode  Cnt     Score      Error   Units
KryoBenchmark.readKryoBaseline  thrpt   25   534.628 ±    6.197  ops/ms
KryoBenchmark.readKryoVersionB  thrpt   25   542.362 ±    7.574  ops/ms
KryoBenchmark.readKryoVersionC  thrpt   25   537.827 ±    8.429  ops/ms
KryoBenchmark.readKryoVersionD  thrpt   25   816.206 ±   11.167  ops/ms
KryoBenchmark.readKryoVersionE  thrpt   25  1255.128 ±   49.761  ops/ms
KryoBenchmark.readKryoVersionF  thrpt   25  2251.305 ±   99.973  ops/ms
KryoBenchmark.readKryoVersionG  thrpt   25  4069.846 ±  820.285  ops/ms

To explain the results, going from the slowest benchmark (the baseline, which 
mirrors PojoSerializationBenchmark.readKryo) to the fastest:

- KryoBenchmark.readKryoBaseline (534.628 ops/ms). This simply mirrors the 
official PojoSerializationBenchmark.readKryo benchmark.
- KryoBenchmark.readKryoVersionB (542.362 ops/ms). This is a version of the 
baseline benchmark expanded for clarity, with nearly identical results.
- KryoBenchmark.readKryoVersionC (537.827 ops/ms). This version removes 
unnecessary layers of InputStream wrappers. This provides no performance 
improvement.
- KryoBenchmark.readKryoVersionD (816.206 ops/ms). This version switches from 
NoFetchingInput to OldNoFetchInput which is a near copy/paste of 
NoFetchingInput from before the Kryo upgrade.
- KryoBenchmark.readKryoVersionE (1255.128 ops/ms). This version switches from 
OldNoFetchInput to Input.
- KryoBenchmark.readKryoVersionF (2251.305 ops/ms). This switches from the 
heavily customized Kryo created by Flink KryoSerializer to a much simpler Kryo 
configuration.
- KryoBenchmark.readKryoVersionG (4069.846 ops/ms). This does Input -> byte[] 
where the previous benchmarks do Input -> ByteArrayInputStream -> byte[].

To summarize, that's a ~8x performance difference from the way 
PojoSerializationBenchmark.readKryo works to a more optimized version caused by 
three changes:

1. NoFetchingInput -> OldNoFetchingInput -> Input.
2. Simple Kryo config vs complex Kryo config done by Flink KryoSerializer
3. Input -> byte[] instead of Input -> ByteArrayInputStream -> byte[]
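
As a rough illustration (not part of the gists above), the contribution of each 
step can be computed directly from the reported scores. The class and method 
names here are made up for this sketch, and the error margins are ignored:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of flink-benchmarks: computes throughput
// ratios from the JMH scores reported in this comment.
class KryoSpeedups {

    /** Ratio of one throughput score to a reference score (both in ops/ms). */
    public static double speedup(double score, double reference) {
        return score / reference;
    }

    /** Scores in ops/ms, copied from the results table, slowest to fastest. */
    public static Map<String, Double> scores() {
        Map<String, Double> m = new LinkedHashMap<>();
        m.put("readKryoBaseline", 534.628);
        m.put("readKryoVersionB", 542.362);
        m.put("readKryoVersionC", 537.827);
        m.put("readKryoVersionD", 816.206);
        m.put("readKryoVersionE", 1255.128);
        m.put("readKryoVersionF", 2251.305);
        m.put("readKryoVersionG", 4069.846);
        return m;
    }

    public static void main(String[] args) {
        Map<String, Double> scores = scores();
        double baseline = scores.get("readKryoBaseline");
        double previous = baseline;
        for (Map.Entry<String, Double> e : scores.entrySet()) {
            double score = e.getValue();
            // Print the ratio against the baseline and against the prior variant.
            System.out.printf("%-18s %9.3f ops/ms  %.2fx vs baseline  %.2fx vs previous%n",
                    e.getKey(), score, speedup(score, baseline), speedup(score, previous));
            previous = score;
        }
    }
}
```

With these scores, readKryoVersionG over the baseline comes out to about 7.6x, 
which is the "~8x" quoted above.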


It looks like the OldNoFetchingInput -> NoFetchingInput changes made during the 
Kryo upgrade may have caused the performance drop.

The other changes can make this benchmark much faster, but can't easily be 
dropped in without bigger architectural changes.


was (Author: JIRAUSER300008):
I created this new benchmark in the flink-benchmarks project:
https://gist.github.com/kurtostfeld/1a6a6cf1a73d85f238fe0522be6f2d43

I'm not checking this in. This is just a single file you can drop into the 
source tree and run via:

```
mvn package
java -jar target/benchmarks.jar -rf csv "org.apache.flink.benchmark.full.KryoBenchmark"
```

Results (on my laptop, using the Temurin OpenJDK 17 distribution):

Benchmark                                     Mode  Cnt     Score     Error   Units
KryoBenchmark.readFlinkKryo5POJOBenchmark    thrpt   25   537.134 ±  10.109  ops/ms
KryoBenchmark.readFlinkKryo5vBPOJOBenchmark  thrpt   25   536.995 ±  10.380  ops/ms
KryoBenchmark.readFlinkKryo5vCPOJOBenchmark  thrpt   25   544.460 ±   6.775  ops/ms
KryoBenchmark.readFlinkKryo5vDPOJOBenchmark  thrpt   25  1340.848 ±  22.822  ops/ms
KryoBenchmark.readStreamKryo5POJOBenchmark   thrpt   25  2277.788 ±  88.142  ops/ms
KryoBenchmark.readDirectKryo5POJOBenchmark   thrpt   25  5649.602 ± 685.249  ops/ms

To explain the results, going from the slowest benchmark (which mirrors 
PojoSerializationBenchmark.readKryo) to the fastest:

537.134 ops/ms. KryoBenchmark.readFlinkKryo5POJOBenchmark
This is nearly identical to the official PojoSerializationBenchmark.readKryo 
benchmark.

536.995 ops/ms. KryoBenchmark.readFlinkKryo5vBPOJOBenchmark
This is a version of the first benchmark expanded for clarity, with nearly 
identical results.

544.460 ops/ms. KryoBenchmark.readFlinkKryo5vCPOJOBenchmark
This version removes unnecessary layers of InputStream wrappers. This provides 
a negligible performance improvement.

1340.848 ops/ms. KryoBenchmark.readFlinkKryo5vDPOJOBenchmark
Switch from NoFetchingInput to Input. This gives a major performance 
improvement, but we can't do that in KryoSerializer as KryoSerializer needs 
"peek" functionality that NoFetchingInput provides.

2277.788 ops/ms. KryoBenchmark.readStreamKryo5POJOBenchmark
The only difference between this and the previous benchmark is that this uses a 
much simpler Kryo serializer configured for this benchmark and does not have 
the full suite of Flink Kryo serializer options registered. I'm quite surprised 
that this delivers such a large performance improvement.

5649.602 ops/ms. KryoBenchmark.readDirectKryo5POJOBenchmark
This is the fastest benchmark. This is like the last benchmark but this does 
Input -> byte[] where the previous benchmark does Input -> ByteArrayInputStream 
-> byte[].

To summarize, that's a ~10x performance difference from the way 
PojoSerializationBenchmark.readKryo works to a more optimized version caused by 
three changes:
1. NoFetchingInput -> Input.
2. Simple Kryo config vs complex Kryo config done by Flink KryoSerializer
3. Input -> byte[] instead of Input -> ByteArrayInputStream -> byte[]

None of these changes can easily be dropped into Flink without bigger 
architectural changes.

Why is the Kryo upgrade reducing performance? That's the original concern of 
this issue and I'm still not sure.

> Kryo related perf regression since March 5th
> --------------------------------------------
>
>                 Key: FLINK-37435
>                 URL: https://issues.apache.org/jira/browse/FLINK-37435
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System, Benchmarks
>    Affects Versions: 2.0.0
>            Reporter: Zakelly Lan
>            Priority: Major
>         Attachments: image-2025-03-07-12-29-54-443.png, 
> profile-results-after.zip, profile-results-before.zip
>
>
> Seems to be an obvious regression across all Java versions.
> http://flink-speed.xyz/timeline/?exe=6%2C12%2C13&base=&ben=readKryo&env=3&revs=200&equid=off&quarts=on&extr=on
> http://flink-speed.xyz/timeline/?exe=6%2C12%2C13&base=&ben=serializerKryo&env=3&revs=200&equid=off&quarts=on&extr=on



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
