This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4def99d54fc [SPARK-40912][CORE] Overhead of Exceptions in KryoDeserializationStream 4def99d54fc is described below commit 4def99d54fcb55e80fb4f5f9558af1739b385e6c Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com> AuthorDate: Wed May 10 08:23:07 2023 -0500 [SPARK-40912][CORE] Overhead of Exceptions in KryoDeserializationStream ### What changes were proposed in this pull request? This PR avoids using exceptions to signal end of stream in the implementation of KryoDeserializationStream. ### Why are the changes needed? Using exceptions to detect the end of a stream is slow, especially for small streams. It is also problematic because the exception caught in KryoDeserializationStream could instead be caused by corrupt data, which the current implementation would silently ignore. ### Does this PR introduce _any_ user-facing change? Yes, some methods on KryoDeserializationStream no longer raise EOFException. ### How was this patch tested? Existing tests. This PR only changes KryoDeserializationStream as a proof of concept. If this is the direction we want to go, we should probably change DeserializationStream instead so that the interface is consistent. Closes #38428 from eejbyfeldt/SPARK-40912. 
Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- core/benchmarks/KryoBenchmark-jdk11-results.txt | 40 +++---- core/benchmarks/KryoBenchmark-jdk17-results.txt | 36 +++---- core/benchmarks/KryoBenchmark-results.txt | 40 +++---- .../KryoIteratorBenchmark-jdk11-results.txt | 28 +++++ .../KryoIteratorBenchmark-jdk17-results.txt | 28 +++++ core/benchmarks/KryoIteratorBenchmark-results.txt | 28 +++++ .../KryoSerializerBenchmark-jdk11-results.txt | 8 +- .../KryoSerializerBenchmark-jdk17-results.txt | 6 +- .../benchmarks/KryoSerializerBenchmark-results.txt | 8 +- .../apache/spark/serializer/KryoSerializer.scala | 48 ++++++++- .../util/collection/ExternalAppendOnlyMap.scala | 46 +++----- .../spark/serializer/KryoIteratorBenchmark.scala | 120 +++++++++++++++++++++ .../spark/serializer/KryoSerializerSuite.scala | 24 ++++- 13 files changed, 360 insertions(+), 100 deletions(-) diff --git a/core/benchmarks/KryoBenchmark-jdk11-results.txt b/core/benchmarks/KryoBenchmark-jdk11-results.txt index 73e7f15ba22..01269b496e0 100644 --- a/core/benchmarks/KryoBenchmark-jdk11-results.txt +++ b/core/benchmarks/KryoBenchmark-jdk11-results.txt @@ -2,27 +2,27 @@ Benchmark Kryo Unsafe vs safe Serialization ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1031-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1036-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz Benchmark Kryo Unsafe vs safe Serialization: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -basicTypes: Int with unsafe:true 301 319 11 3.3 301.5 1.0X -basicTypes: Long with unsafe:true 337 351 9 3.0 337.2 0.9X -basicTypes: Float with unsafe:true 327 335 6 3.1 327.5 0.9X -basicTypes: 
Double with unsafe:true 321 336 10 3.1 321.0 0.9X -Array: Int with unsafe:true 4 5 1 245.2 4.1 73.9X -Array: Long with unsafe:true 7 8 1 147.6 6.8 44.5X -Array: Float with unsafe:true 4 5 1 250.4 4.0 75.5X -Array: Double with unsafe:true 7 8 1 144.1 6.9 43.4X -Map of string->Double with unsafe:true 42 46 4 23.8 42.0 7.2X -basicTypes: Int with unsafe:false 347 357 10 2.9 347.4 0.9X -basicTypes: Long with unsafe:false 378 394 10 2.6 378.1 0.8X -basicTypes: Float with unsafe:false 346 359 9 2.9 345.6 0.9X -basicTypes: Double with unsafe:false 350 372 20 2.9 350.3 0.9X -Array: Int with unsafe:false 22 24 2 46.0 21.8 13.9X -Array: Long with unsafe:false 34 37 3 29.1 34.3 8.8X -Array: Float with unsafe:false 10 10 1 103.5 9.7 31.2X -Array: Double with unsafe:false 16 17 1 61.2 16.3 18.5X -Map of string->Double with unsafe:false 44 48 4 22.7 44.1 6.8X +basicTypes: Int with unsafe:true 269 272 2 3.7 269.0 1.0X +basicTypes: Long with unsafe:true 296 301 9 3.4 295.8 0.9X +basicTypes: Float with unsafe:true 300 301 1 3.3 299.6 0.9X +basicTypes: Double with unsafe:true 291 292 1 3.4 291.3 0.9X +Array: Int with unsafe:true 3 3 0 325.9 3.1 87.7X +Array: Long with unsafe:true 5 5 0 216.1 4.6 58.1X +Array: Float with unsafe:true 3 3 0 319.8 3.1 86.0X +Array: Double with unsafe:true 5 5 0 219.1 4.6 58.9X +Map of string->Double with unsafe:true 38 38 0 26.2 38.1 7.1X +basicTypes: Int with unsafe:false 314 319 9 3.2 313.8 0.9X +basicTypes: Long with unsafe:false 335 337 1 3.0 335.4 0.8X +basicTypes: Float with unsafe:false 309 310 1 3.2 309.2 0.9X +basicTypes: Double with unsafe:false 320 323 2 3.1 320.0 0.8X +Array: Int with unsafe:false 18 19 0 54.2 18.4 14.6X +Array: Long with unsafe:false 30 30 1 33.8 29.5 9.1X +Array: Float with unsafe:false 8 8 0 126.0 7.9 33.9X +Array: Double with unsafe:false 14 14 0 72.5 13.8 19.5X +Map of string->Double with unsafe:false 40 40 1 24.9 40.1 6.7X diff --git a/core/benchmarks/KryoBenchmark-jdk17-results.txt 
b/core/benchmarks/KryoBenchmark-jdk17-results.txt index e0629f7836c..c59f7e1a5e6 100644 --- a/core/benchmarks/KryoBenchmark-jdk17-results.txt +++ b/core/benchmarks/KryoBenchmark-jdk17-results.txt @@ -2,27 +2,27 @@ Benchmark Kryo Unsafe vs safe Serialization ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1031-azure +OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1036-azure Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz Benchmark Kryo Unsafe vs safe Serialization: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -basicTypes: Int with unsafe:true 263 265 2 3.8 262.5 1.0X -basicTypes: Long with unsafe:true 294 295 1 3.4 293.6 0.9X -basicTypes: Float with unsafe:true 280 282 1 3.6 279.7 0.9X -basicTypes: Double with unsafe:true 283 286 2 3.5 282.7 0.9X -Array: Int with unsafe:true 3 3 0 337.9 3.0 88.7X -Array: Long with unsafe:true 5 5 0 210.7 4.7 55.3X -Array: Float with unsafe:true 3 3 0 338.4 3.0 88.8X -Array: Double with unsafe:true 5 5 0 210.8 4.7 55.4X -Map of string->Double with unsafe:true 38 38 0 26.5 37.7 7.0X +basicTypes: Int with unsafe:true 268 270 2 3.7 267.8 1.0X +basicTypes: Long with unsafe:true 292 294 2 3.4 291.6 0.9X +basicTypes: Float with unsafe:true 290 294 3 3.5 289.7 0.9X +basicTypes: Double with unsafe:true 297 300 2 3.4 297.1 0.9X +Array: Int with unsafe:true 3 3 0 365.5 2.7 97.9X +Array: Long with unsafe:true 4 5 0 242.5 4.1 64.9X +Array: Float with unsafe:true 3 3 0 364.5 2.7 97.6X +Array: Double with unsafe:true 4 5 0 238.0 4.2 63.7X +Map of string->Double with unsafe:true 37 38 1 26.9 37.2 7.2X basicTypes: Int with unsafe:false 304 306 1 3.3 304.4 0.9X -basicTypes: Long with unsafe:false 330 333 3 3.0 329.5 0.8X -basicTypes: Float with unsafe:false 301 303 1 3.3 301.3 0.9X -basicTypes: Double 
with unsafe:false 309 312 2 3.2 308.7 0.9X -Array: Int with unsafe:false 21 21 0 48.3 20.7 12.7X -Array: Long with unsafe:false 31 32 1 31.9 31.4 8.4X -Array: Float with unsafe:false 8 8 0 120.7 8.3 31.7X -Array: Double with unsafe:false 14 14 1 71.4 14.0 18.7X -Map of string->Double with unsafe:false 40 40 1 25.0 40.0 6.6X +basicTypes: Long with unsafe:false 332 335 2 3.0 332.4 0.8X +basicTypes: Float with unsafe:false 301 305 3 3.3 301.0 0.9X +basicTypes: Double with unsafe:false 308 310 2 3.2 308.2 0.9X +Array: Int with unsafe:false 20 21 0 49.7 20.1 13.3X +Array: Long with unsafe:false 31 31 1 32.2 31.1 8.6X +Array: Float with unsafe:false 8 8 0 122.6 8.2 32.8X +Array: Double with unsafe:false 13 14 0 74.5 13.4 20.0X +Map of string->Double with unsafe:false 39 39 1 25.8 38.8 6.9X diff --git a/core/benchmarks/KryoBenchmark-results.txt b/core/benchmarks/KryoBenchmark-results.txt index e6555a281f4..ccc656431e6 100644 --- a/core/benchmarks/KryoBenchmark-results.txt +++ b/core/benchmarks/KryoBenchmark-results.txt @@ -2,27 +2,27 @@ Benchmark Kryo Unsafe vs safe Serialization ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1031-azure -Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz +OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1036-azure +Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz Benchmark Kryo Unsafe vs safe Serialization: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -basicTypes: Int with unsafe:true 248 252 5 4.0 248.1 1.0X -basicTypes: Long with unsafe:true 281 284 3 3.6 280.7 0.9X -basicTypes: Float with unsafe:true 266 268 2 3.8 265.9 0.9X -basicTypes: Double with unsafe:true 265 270 3 3.8 265.3 0.9X -Array: Int with unsafe:true 3 3 0 346.2 2.9 85.9X -Array: Long with unsafe:true 5 5 0 216.0 4.6 53.6X 
-Array: Float with unsafe:true 3 3 0 341.4 2.9 84.7X -Array: Double with unsafe:true 5 5 0 217.8 4.6 54.0X -Map of string->Double with unsafe:true 35 35 0 28.9 34.6 7.2X -basicTypes: Int with unsafe:false 283 285 1 3.5 283.4 0.9X -basicTypes: Long with unsafe:false 302 302 1 3.3 301.6 0.8X -basicTypes: Float with unsafe:false 276 278 3 3.6 275.6 0.9X -basicTypes: Double with unsafe:false 281 282 1 3.6 280.8 0.9X -Array: Int with unsafe:false 21 21 0 48.6 20.6 12.1X -Array: Long with unsafe:false 30 30 0 33.3 30.0 8.3X -Array: Float with unsafe:false 8 8 0 126.6 7.9 31.4X -Array: Double with unsafe:false 15 15 0 68.2 14.7 16.9X -Map of string->Double with unsafe:false 36 37 0 27.4 36.4 6.8X +basicTypes: Int with unsafe:true 239 258 15 4.2 239.4 1.0X +basicTypes: Long with unsafe:true 277 302 18 3.6 276.5 0.9X +basicTypes: Float with unsafe:true 296 304 8 3.4 295.6 0.8X +basicTypes: Double with unsafe:true 281 308 13 3.6 281.0 0.9X +Array: Int with unsafe:true 5 6 1 201.1 5.0 48.1X +Array: Long with unsafe:true 7 8 1 141.6 7.1 33.9X +Array: Float with unsafe:true 5 5 1 218.4 4.6 52.3X +Array: Double with unsafe:true 7 8 1 136.6 7.3 32.7X +Map of string->Double with unsafe:true 43 47 4 23.2 43.1 5.6X +basicTypes: Int with unsafe:false 295 316 13 3.4 294.7 0.8X +basicTypes: Long with unsafe:false 310 329 13 3.2 310.0 0.8X +basicTypes: Float with unsafe:false 283 290 7 3.5 283.0 0.8X +basicTypes: Double with unsafe:false 294 325 17 3.4 293.8 0.8X +Array: Int with unsafe:false 23 25 2 44.1 22.7 10.6X +Array: Long with unsafe:false 34 37 2 29.1 34.4 7.0X +Array: Float with unsafe:false 10 11 1 104.8 9.5 25.1X +Array: Double with unsafe:false 17 19 2 60.0 16.7 14.4X +Map of string->Double with unsafe:false 40 46 3 24.7 40.4 5.9X diff --git a/core/benchmarks/KryoIteratorBenchmark-jdk11-results.txt b/core/benchmarks/KryoIteratorBenchmark-jdk11-results.txt new file mode 100644 index 00000000000..d35d94f2d1d --- /dev/null +++ 
b/core/benchmarks/KryoIteratorBenchmark-jdk11-results.txt @@ -0,0 +1,28 @@ +================================================================================================ +Benchmark of kryo asIterator on deserialization stream +================================================================================================ + +OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1036-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz +Benchmark of kryo asIterator on deserialization stream: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------------------- +Colletion of int with 1 elements, useIterator: true 13 13 0 0.8 1262.3 1.0X +Colletion of int with 10 elements, useIterator: true 30 30 1 0.3 2974.4 0.4X +Colletion of int with 100 elements, useIterator: true 190 191 2 0.1 18981.4 0.1X +Colletion of string with 1 elements, useIterator: true 15 15 0 0.7 1497.9 0.8X +Colletion of string with 10 elements, useIterator: true 43 44 1 0.2 4338.8 0.3X +Colletion of string with 100 elements, useIterator: true 308 309 1 0.0 30800.1 0.0X +Colletion of Array[int] with 1 elements, useIterator: true 14 15 0 0.7 1411.7 0.9X +Colletion of Array[int] with 10 elements, useIterator: true 38 39 0 0.3 3798.4 0.3X +Colletion of Array[int] with 100 elements, useIterator: true 283 284 0 0.0 28301.9 0.0X +Colletion of int with 1 elements, useIterator: false 11 12 0 0.9 1146.7 1.1X +Colletion of int with 10 elements, useIterator: false 23 23 0 0.4 2256.0 0.6X +Colletion of int with 100 elements, useIterator: false 126 126 1 0.1 12550.7 0.1X +Colletion of string with 1 elements, useIterator: false 13 14 0 0.7 1341.3 0.9X +Colletion of string with 10 elements, useIterator: false 35 36 1 0.3 3536.6 0.4X +Colletion of string with 100 elements, useIterator: false 243 244 0 0.0 24332.6 0.1X +Colletion of Array[int] with 1 elements, useIterator: false 
13 13 0 0.8 1262.6 1.0X +Colletion of Array[int] with 10 elements, useIterator: false 32 32 0 0.3 3155.6 0.4X +Colletion of Array[int] with 100 elements, useIterator: false 219 220 0 0.0 21932.1 0.1X + + diff --git a/core/benchmarks/KryoIteratorBenchmark-jdk17-results.txt b/core/benchmarks/KryoIteratorBenchmark-jdk17-results.txt new file mode 100644 index 00000000000..bff217aca48 --- /dev/null +++ b/core/benchmarks/KryoIteratorBenchmark-jdk17-results.txt @@ -0,0 +1,28 @@ +================================================================================================ +Benchmark of kryo asIterator on deserialization stream +================================================================================================ + +OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1036-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +Benchmark of kryo asIterator on deserialization stream: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------------------- +Colletion of int with 1 elements, useIterator: true 14 15 0 0.7 1418.2 1.0X +Colletion of int with 10 elements, useIterator: true 26 26 1 0.4 2565.9 0.6X +Colletion of int with 100 elements, useIterator: true 137 138 1 0.1 13720.7 0.1X +Colletion of string with 1 elements, useIterator: true 16 16 0 0.6 1572.0 0.9X +Colletion of string with 10 elements, useIterator: true 38 38 0 0.3 3782.3 0.4X +Colletion of string with 100 elements, useIterator: true 252 253 0 0.0 25193.4 0.1X +Colletion of Array[int] with 1 elements, useIterator: true 15 16 0 0.6 1547.7 0.9X +Colletion of Array[int] with 10 elements, useIterator: true 34 35 1 0.3 3389.1 0.4X +Colletion of Array[int] with 100 elements, useIterator: true 227 229 1 0.0 22733.2 0.1X +Colletion of int with 1 elements, useIterator: false 14 14 0 0.7 1376.0 1.0X +Colletion of int with 10 elements, useIterator: false 24 25 0 0.4 
2426.0 0.6X +Colletion of int with 100 elements, useIterator: false 132 132 0 0.1 13204.7 0.1X +Colletion of string with 1 elements, useIterator: false 15 16 0 0.7 1517.3 0.9X +Colletion of string with 10 elements, useIterator: false 37 38 1 0.3 3700.8 0.4X +Colletion of string with 100 elements, useIterator: false 252 253 0 0.0 25234.5 0.1X +Colletion of Array[int] with 1 elements, useIterator: false 15 15 0 0.7 1466.1 1.0X +Colletion of Array[int] with 10 elements, useIterator: false 32 33 0 0.3 3237.5 0.4X +Colletion of Array[int] with 100 elements, useIterator: false 217 218 1 0.0 21706.8 0.1X + + diff --git a/core/benchmarks/KryoIteratorBenchmark-results.txt b/core/benchmarks/KryoIteratorBenchmark-results.txt new file mode 100644 index 00000000000..35e3524a0be --- /dev/null +++ b/core/benchmarks/KryoIteratorBenchmark-results.txt @@ -0,0 +1,28 @@ +================================================================================================ +Benchmark of kryo asIterator on deserialization stream +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1036-azure +Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +Benchmark of kryo asIterator on deserialization stream: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------------------- +Colletion of int with 1 elements, useIterator: true 22 24 2 0.4 2228.3 1.0X +Colletion of int with 10 elements, useIterator: true 30 33 3 0.3 2970.4 0.8X +Colletion of int with 100 elements, useIterator: true 142 147 3 0.1 14249.2 0.2X +Colletion of string with 1 elements, useIterator: true 23 26 2 0.4 2308.2 1.0X +Colletion of string with 10 elements, useIterator: true 40 43 3 0.3 3997.5 0.6X +Colletion of string with 100 elements, useIterator: true 256 263 7 0.0 25550.7 0.1X +Colletion of 
Array[int] with 1 elements, useIterator: true 22 24 1 0.5 2209.9 1.0X +Colletion of Array[int] with 10 elements, useIterator: true 38 40 1 0.3 3755.1 0.6X +Colletion of Array[int] with 100 elements, useIterator: true 240 251 9 0.0 23978.8 0.1X +Colletion of int with 1 elements, useIterator: false 21 23 1 0.5 2126.5 1.0X +Colletion of int with 10 elements, useIterator: false 27 29 2 0.4 2671.3 0.8X +Colletion of int with 100 elements, useIterator: false 114 118 3 0.1 11413.3 0.2X +Colletion of string with 1 elements, useIterator: false 22 24 2 0.4 2243.3 1.0X +Colletion of string with 10 elements, useIterator: false 38 41 2 0.3 3830.4 0.6X +Colletion of string with 100 elements, useIterator: false 242 258 11 0.0 24223.8 0.1X +Colletion of Array[int] with 1 elements, useIterator: false 22 23 1 0.5 2211.3 1.0X +Colletion of Array[int] with 10 elements, useIterator: false 36 38 2 0.3 3566.2 0.6X +Colletion of Array[int] with 100 elements, useIterator: false 221 225 5 0.0 22054.1 0.1X + + diff --git a/core/benchmarks/KryoSerializerBenchmark-jdk11-results.txt b/core/benchmarks/KryoSerializerBenchmark-jdk11-results.txt index 60694c6ef25..777dbce11ed 100644 --- a/core/benchmarks/KryoSerializerBenchmark-jdk11-results.txt +++ b/core/benchmarks/KryoSerializerBenchmark-jdk11-results.txt @@ -2,11 +2,11 @@ Benchmark KryoPool vs old"pool of 1" implementation ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1031-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1036-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz Benchmark KryoPool vs old"pool of 1" implementation: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------- -KryoPool:true 9790 12432 370 0.0 19579752.2 1.0X 
-KryoPool:false 14512 17607 653 0.0 29023660.8 0.7X +KryoPool:true 8338 10560 NaN 0.0 16675730.9 1.0X +KryoPool:false 12586 14541 329 0.0 25172974.1 0.7X diff --git a/core/benchmarks/KryoSerializerBenchmark-jdk17-results.txt b/core/benchmarks/KryoSerializerBenchmark-jdk17-results.txt index 8153484ab2f..8c995bc9f9a 100644 --- a/core/benchmarks/KryoSerializerBenchmark-jdk17-results.txt +++ b/core/benchmarks/KryoSerializerBenchmark-jdk17-results.txt @@ -2,11 +2,11 @@ Benchmark KryoPool vs old"pool of 1" implementation ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1031-azure +OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1036-azure Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz Benchmark KryoPool vs old"pool of 1" implementation: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------- -KryoPool:true 8306 11121 875 0.0 16611043.7 1.0X -KryoPool:false 11855 15605 NaN 0.0 23709932.2 0.7X +KryoPool:true 8230 11345 772 0.0 16459043.1 1.0X +KryoPool:false 12819 15714 NaN 0.0 25637323.3 0.6X diff --git a/core/benchmarks/KryoSerializerBenchmark-results.txt b/core/benchmarks/KryoSerializerBenchmark-results.txt index 0571e7db1c5..e60a95e7cf2 100644 --- a/core/benchmarks/KryoSerializerBenchmark-results.txt +++ b/core/benchmarks/KryoSerializerBenchmark-results.txt @@ -2,11 +2,11 @@ Benchmark KryoPool vs old"pool of 1" implementation ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1031-azure -Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz +OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1036-azure +Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz Benchmark KryoPool vs old"pool of 1" implementation: Best Time(ms) Avg Time(ms) 
Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------- -KryoPool:true 7751 9409 NaN 0.0 15501889.2 1.0X -KryoPool:false 11350 14163 177 0.0 22700046.8 0.7X +KryoPool:true 9955 12622 NaN 0.0 19910361.3 1.0X +KryoPool:false 15141 17852 NaN 0.0 30282157.2 0.7X diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 1736088b498..2bbc15c490c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -45,7 +45,7 @@ import org.apache.spark.internal.io.FileCommitProtocol._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.storage._ -import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils} +import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, NextIterator, SerializableConfiguration, SerializableJobConf, Utils} import org.apache.spark.util.collection.{BitSet, CompactBuffer} import org.apache.spark.util.io.ChunkedByteBuffer @@ -306,6 +306,16 @@ class KryoDeserializationStream( private[this] var kryo: Kryo = serInstance.borrowKryo() + private[this] def hasNext: Boolean = { + if (input == null) { + return false + } + + val eof = input.eof() + if (eof) close() + !eof + } + override def readObject[T: ClassTag](): T = { try { kryo.readClassAndObject(input).asInstanceOf[T] @@ -329,6 +339,42 @@ class KryoDeserializationStream( } } } + + final override def asIterator: Iterator[Any] = new NextIterator[Any] { + override protected def getNext(): Any = { + if (KryoDeserializationStream.this.hasNext) { + try { + return readObject[Any]() + } catch { + case eof: EOFException => + 
} + } + finished = true + null + } + + override protected def close(): Unit = { + KryoDeserializationStream.this.close() + } + } + + final override def asKeyValueIterator: Iterator[(Any, Any)] = new NextIterator[(Any, Any)] { + override protected def getNext(): (Any, Any) = { + if (KryoDeserializationStream.this.hasNext) { + try { + return (readKey[Any](), readValue[Any]()) + } catch { + case eof: EOFException => + } + } + finished = true + null + } + + override protected def close(): Unit = { + KryoDeserializationStream.this.close() + } + } } private[spark] class KryoSerializerInstance( diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index f24c44b2f84..93efaafa43b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -460,13 +460,13 @@ class ExternalAppendOnlyMap[K, V, C]( // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams private var deserializeStream: DeserializationStream = null - private var nextItem: (K, C) = null + private var batchIterator: Iterator[(K, C)] = null private var objectsRead = 0 /** * Construct a stream that reads only from the next batch. */ - private def nextBatchStream(): DeserializationStream = { + private def nextBatchIterator(): Iterator[(K, C)] = { // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether // we're still in a valid batch. 
if (batchIndex < batchOffsets.length - 1) { @@ -489,7 +489,8 @@ class ExternalAppendOnlyMap[K, V, C]( val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream) - ser.deserializeStream(wrappedStream) + deserializeStream = ser.deserializeStream(wrappedStream) + deserializeStream.asKeyValueIterator.asInstanceOf[Iterator[(K, C)]] } else { // No more batches left cleanup() @@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C]( * If no more pairs are left, return null. */ private def readNextItem(): (K, C) = { - try { - val k = deserializeStream.readKey().asInstanceOf[K] - val c = deserializeStream.readValue().asInstanceOf[C] - val item = (k, c) - objectsRead += 1 - if (objectsRead == serializerBatchSize) { - objectsRead = 0 - deserializeStream = nextBatchStream() - } - item - } catch { - case e: EOFException => - cleanup() - null + val item = batchIterator.next() + objectsRead += 1 + if (objectsRead == serializerBatchSize) { + objectsRead = 0 + batchIterator = nextBatchIterator() } + item } override def hasNext: Boolean = { - if (nextItem == null) { - if (deserializeStream == null) { - // In case of deserializeStream has not been initialized - deserializeStream = nextBatchStream() - if (deserializeStream == null) { - return false - } + if (batchIterator == null) { + // In case of batchIterator has not been initialized + batchIterator = nextBatchIterator() + if (batchIterator == null) { + return false } - nextItem = readNextItem() } - nextItem != null + batchIterator.hasNext } override def next(): (K, C) = { if (!hasNext) { throw new NoSuchElementException } - val item = nextItem - nextItem = null - item + readNextItem() } private def cleanup(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala new file mode 100644 index 
00000000000..5de1a12ffe0 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import scala.reflect.ClassTag +import scala.util.Random + +import org.apache.spark.SparkConf +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Kryo._ +import org.apache.spark.serializer.KryoTest._ + +/** + * Benchmark for kryo asIterator on a deserialization stream". To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> <spark core test jar> + * 2. build/sbt "core/Test/runMain <this class>" + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>" + * Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt". 
+ * }}} + */ +object KryoIteratorBenchmark extends BenchmarkBase { + val N = 10000 + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + val name = "Benchmark of kryo asIterator on deserialization stream" + runBenchmark(name) { + val benchmark = new Benchmark(name, N, 10, output = output) + Seq(true, false).map(useIterator => run(useIterator, benchmark)) + benchmark.run() + } + } + + private def run(useIterator: Boolean, benchmark: Benchmark): Unit = { + val ser = createSerializer() + + def roundTrip[T: ClassTag]( + elements: Array[T], + useIterator: Boolean, + ser: SerializerInstance): Int = { + val serialized: Array[Byte] = { + val baos = new ByteArrayOutputStream() + val serStream = ser.serializeStream(baos) + var i = 0 + while (i < elements.length) { + serStream.writeObject(elements(i)) + i += 1 + } + serStream.close() + baos.toByteArray + } + + val deserStream = ser.deserializeStream(new ByteArrayInputStream(serialized)) + if (useIterator) { + if (deserStream.asIterator.toArray.length == elements.length) 1 else 0 + } else { + val res = new Array[T](elements.length) + var i = 0 + while (i < elements.length) { + res(i) = deserStream.readValue() + i += 1 + } + deserStream.close() + if (res.length == elements.length) 1 else 0 + } + } + + def createCase[T: ClassTag](name: String, elementCount: Int, createElement: => T): Unit = { + val elements = Array.fill[T](elementCount)(createElement) + + benchmark.addCase( + s"Colletion of $name with $elementCount elements, useIterator: $useIterator") { _ => + var sum = 0L + var i = 0 + while (i < N) { + sum += roundTrip(elements, useIterator, ser) + i += 1 + } + sum + } + } + + createCase("int", 1, Random.nextInt) + createCase("int", 10, Random.nextInt) + createCase("int", 100, Random.nextInt) + createCase("string", 1, Random.nextString(5)) + createCase("string", 10, Random.nextString(5)) + createCase("string", 100, Random.nextString(5)) + createCase("Array[int]", 1, Array.fill(10)(Random.nextInt)) + 
createCase("Array[int]", 10, Array.fill(10)(Random.nextInt)) + createCase("Array[int]", 100, Array.fill(10)(Random.nextInt)) + } + + def createSerializer(): SerializerInstance = { + val conf = new SparkConf() + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName)) + + new KryoSerializer(conf).newInstance() + } +} diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 260a5b29235..1d4f61cfde9 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.serializer -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException} import java.nio.ByteBuffer import java.util.concurrent.Executors @@ -598,6 +598,28 @@ class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSpar assert(serInstance.deserialize[Any](serObj) === (obj)) assert(serInstance.deserialize[Any](byteBuffer) === (obj)) } + + test("SPARK-40912: Ignore unexpectedly truncated buffer") { + // This test checks that the improvement in SPARK-40912 does not break backwards compatabillity. + // But the behvaior of the asIterator iterface of silently ignoring trucated data should be + // revisited in a follow up ticket. 
+ val serInstance = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance] + val serialized: Array[Byte] = { + val baos = new ByteArrayOutputStream() + val serStream = serInstance.serializeStream(baos) + serStream.writeObject(KryoTest.CaseClass(0, "")) + serStream.close() + baos.toByteArray + } + // Make sure we disregard some data + assert(serialized.length > 2) + val trucated = serialized.take(2).toArray + val deserializationStream = serInstance.deserializeStream(new ByteArrayInputStream(trucated)) + intercept[EOFException]( + serInstance.deserializeStream(new ByteArrayInputStream(trucated)).readValue() + ) + assert(deserializationStream.asIterator.toSeq == Seq()) + } } class ClassLoaderTestingObject --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org