This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4def99d54fc [SPARK-40912][CORE] Overhead of Exceptions in 
KryoDeserializationStream
4def99d54fc is described below

commit 4def99d54fcb55e80fb4f5f9558af1739b385e6c
Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
AuthorDate: Wed May 10 08:23:07 2023 -0500

    [SPARK-40912][CORE] Overhead of Exceptions in KryoDeserializationStream
    
    ### What changes were proposed in this pull request?
    This PR avoids exceptions in the implementation of KryoDeserializationStream.
    
    ### Why are the changes needed?
    Using exceptions to signal end of stream is slow, especially for small 
streams. It is also problematic because the exception caught in 
KryoDeserializationStream could also be caused by corrupt data, which would just 
be ignored in the current implementation.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, some methods on KryoDeserializationStream no longer 
raise EOFException.
    
    ### How was this patch tested?
    Existing tests.
    
    This PR only changes KryoDeserializationStream as a proof of concept. If 
this is the direction we want to go, we should probably change 
DeserializationStream instead so that the interface is consistent.
    
    Closes #38428 from eejbyfeldt/SPARK-40912.
    
    Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 core/benchmarks/KryoBenchmark-jdk11-results.txt    |  40 +++----
 core/benchmarks/KryoBenchmark-jdk17-results.txt    |  36 +++----
 core/benchmarks/KryoBenchmark-results.txt          |  40 +++----
 .../KryoIteratorBenchmark-jdk11-results.txt        |  28 +++++
 .../KryoIteratorBenchmark-jdk17-results.txt        |  28 +++++
 core/benchmarks/KryoIteratorBenchmark-results.txt  |  28 +++++
 .../KryoSerializerBenchmark-jdk11-results.txt      |   8 +-
 .../KryoSerializerBenchmark-jdk17-results.txt      |   6 +-
 .../benchmarks/KryoSerializerBenchmark-results.txt |   8 +-
 .../apache/spark/serializer/KryoSerializer.scala   |  48 ++++++++-
 .../util/collection/ExternalAppendOnlyMap.scala    |  46 +++-----
 .../spark/serializer/KryoIteratorBenchmark.scala   | 120 +++++++++++++++++++++
 .../spark/serializer/KryoSerializerSuite.scala     |  24 ++++-
 13 files changed, 360 insertions(+), 100 deletions(-)

diff --git a/core/benchmarks/KryoBenchmark-jdk11-results.txt 
b/core/benchmarks/KryoBenchmark-jdk11-results.txt
index 73e7f15ba22..01269b496e0 100644
--- a/core/benchmarks/KryoBenchmark-jdk11-results.txt
+++ b/core/benchmarks/KryoBenchmark-jdk11-results.txt
@@ -2,27 +2,27 @@
 Benchmark Kryo Unsafe vs safe Serialization
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1031-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1036-azure
+Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
 Benchmark Kryo Unsafe vs safe Serialization:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------
-basicTypes: Int with unsafe:true                       301            319      
    11          3.3         301.5       1.0X
-basicTypes: Long with unsafe:true                      337            351      
     9          3.0         337.2       0.9X
-basicTypes: Float with unsafe:true                     327            335      
     6          3.1         327.5       0.9X
-basicTypes: Double with unsafe:true                    321            336      
    10          3.1         321.0       0.9X
-Array: Int with unsafe:true                              4              5      
     1        245.2           4.1      73.9X
-Array: Long with unsafe:true                             7              8      
     1        147.6           6.8      44.5X
-Array: Float with unsafe:true                            4              5      
     1        250.4           4.0      75.5X
-Array: Double with unsafe:true                           7              8      
     1        144.1           6.9      43.4X
-Map of string->Double  with unsafe:true                 42             46      
     4         23.8          42.0       7.2X
-basicTypes: Int with unsafe:false                      347            357      
    10          2.9         347.4       0.9X
-basicTypes: Long with unsafe:false                     378            394      
    10          2.6         378.1       0.8X
-basicTypes: Float with unsafe:false                    346            359      
     9          2.9         345.6       0.9X
-basicTypes: Double with unsafe:false                   350            372      
    20          2.9         350.3       0.9X
-Array: Int with unsafe:false                            22             24      
     2         46.0          21.8      13.9X
-Array: Long with unsafe:false                           34             37      
     3         29.1          34.3       8.8X
-Array: Float with unsafe:false                          10             10      
     1        103.5           9.7      31.2X
-Array: Double with unsafe:false                         16             17      
     1         61.2          16.3      18.5X
-Map of string->Double  with unsafe:false                44             48      
     4         22.7          44.1       6.8X
+basicTypes: Int with unsafe:true                       269            272      
     2          3.7         269.0       1.0X
+basicTypes: Long with unsafe:true                      296            301      
     9          3.4         295.8       0.9X
+basicTypes: Float with unsafe:true                     300            301      
     1          3.3         299.6       0.9X
+basicTypes: Double with unsafe:true                    291            292      
     1          3.4         291.3       0.9X
+Array: Int with unsafe:true                              3              3      
     0        325.9           3.1      87.7X
+Array: Long with unsafe:true                             5              5      
     0        216.1           4.6      58.1X
+Array: Float with unsafe:true                            3              3      
     0        319.8           3.1      86.0X
+Array: Double with unsafe:true                           5              5      
     0        219.1           4.6      58.9X
+Map of string->Double  with unsafe:true                 38             38      
     0         26.2          38.1       7.1X
+basicTypes: Int with unsafe:false                      314            319      
     9          3.2         313.8       0.9X
+basicTypes: Long with unsafe:false                     335            337      
     1          3.0         335.4       0.8X
+basicTypes: Float with unsafe:false                    309            310      
     1          3.2         309.2       0.9X
+basicTypes: Double with unsafe:false                   320            323      
     2          3.1         320.0       0.8X
+Array: Int with unsafe:false                            18             19      
     0         54.2          18.4      14.6X
+Array: Long with unsafe:false                           30             30      
     1         33.8          29.5       9.1X
+Array: Float with unsafe:false                           8              8      
     0        126.0           7.9      33.9X
+Array: Double with unsafe:false                         14             14      
     0         72.5          13.8      19.5X
+Map of string->Double  with unsafe:false                40             40      
     1         24.9          40.1       6.7X
 
 
diff --git a/core/benchmarks/KryoBenchmark-jdk17-results.txt 
b/core/benchmarks/KryoBenchmark-jdk17-results.txt
index e0629f7836c..c59f7e1a5e6 100644
--- a/core/benchmarks/KryoBenchmark-jdk17-results.txt
+++ b/core/benchmarks/KryoBenchmark-jdk17-results.txt
@@ -2,27 +2,27 @@
 Benchmark Kryo Unsafe vs safe Serialization
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1031-azure
+OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1036-azure
 Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 Benchmark Kryo Unsafe vs safe Serialization:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------
-basicTypes: Int with unsafe:true                       263            265      
     2          3.8         262.5       1.0X
-basicTypes: Long with unsafe:true                      294            295      
     1          3.4         293.6       0.9X
-basicTypes: Float with unsafe:true                     280            282      
     1          3.6         279.7       0.9X
-basicTypes: Double with unsafe:true                    283            286      
     2          3.5         282.7       0.9X
-Array: Int with unsafe:true                              3              3      
     0        337.9           3.0      88.7X
-Array: Long with unsafe:true                             5              5      
     0        210.7           4.7      55.3X
-Array: Float with unsafe:true                            3              3      
     0        338.4           3.0      88.8X
-Array: Double with unsafe:true                           5              5      
     0        210.8           4.7      55.4X
-Map of string->Double  with unsafe:true                 38             38      
     0         26.5          37.7       7.0X
+basicTypes: Int with unsafe:true                       268            270      
     2          3.7         267.8       1.0X
+basicTypes: Long with unsafe:true                      292            294      
     2          3.4         291.6       0.9X
+basicTypes: Float with unsafe:true                     290            294      
     3          3.5         289.7       0.9X
+basicTypes: Double with unsafe:true                    297            300      
     2          3.4         297.1       0.9X
+Array: Int with unsafe:true                              3              3      
     0        365.5           2.7      97.9X
+Array: Long with unsafe:true                             4              5      
     0        242.5           4.1      64.9X
+Array: Float with unsafe:true                            3              3      
     0        364.5           2.7      97.6X
+Array: Double with unsafe:true                           4              5      
     0        238.0           4.2      63.7X
+Map of string->Double  with unsafe:true                 37             38      
     1         26.9          37.2       7.2X
 basicTypes: Int with unsafe:false                      304            306      
     1          3.3         304.4       0.9X
-basicTypes: Long with unsafe:false                     330            333      
     3          3.0         329.5       0.8X
-basicTypes: Float with unsafe:false                    301            303      
     1          3.3         301.3       0.9X
-basicTypes: Double with unsafe:false                   309            312      
     2          3.2         308.7       0.9X
-Array: Int with unsafe:false                            21             21      
     0         48.3          20.7      12.7X
-Array: Long with unsafe:false                           31             32      
     1         31.9          31.4       8.4X
-Array: Float with unsafe:false                           8              8      
     0        120.7           8.3      31.7X
-Array: Double with unsafe:false                         14             14      
     1         71.4          14.0      18.7X
-Map of string->Double  with unsafe:false                40             40      
     1         25.0          40.0       6.6X
+basicTypes: Long with unsafe:false                     332            335      
     2          3.0         332.4       0.8X
+basicTypes: Float with unsafe:false                    301            305      
     3          3.3         301.0       0.9X
+basicTypes: Double with unsafe:false                   308            310      
     2          3.2         308.2       0.9X
+Array: Int with unsafe:false                            20             21      
     0         49.7          20.1      13.3X
+Array: Long with unsafe:false                           31             31      
     1         32.2          31.1       8.6X
+Array: Float with unsafe:false                           8              8      
     0        122.6           8.2      32.8X
+Array: Double with unsafe:false                         13             14      
     0         74.5          13.4      20.0X
+Map of string->Double  with unsafe:false                39             39      
     1         25.8          38.8       6.9X
 
 
diff --git a/core/benchmarks/KryoBenchmark-results.txt 
b/core/benchmarks/KryoBenchmark-results.txt
index e6555a281f4..ccc656431e6 100644
--- a/core/benchmarks/KryoBenchmark-results.txt
+++ b/core/benchmarks/KryoBenchmark-results.txt
@@ -2,27 +2,27 @@
 Benchmark Kryo Unsafe vs safe Serialization
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1031-azure
-Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
+OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1036-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 Benchmark Kryo Unsafe vs safe Serialization:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------
-basicTypes: Int with unsafe:true                       248            252      
     5          4.0         248.1       1.0X
-basicTypes: Long with unsafe:true                      281            284      
     3          3.6         280.7       0.9X
-basicTypes: Float with unsafe:true                     266            268      
     2          3.8         265.9       0.9X
-basicTypes: Double with unsafe:true                    265            270      
     3          3.8         265.3       0.9X
-Array: Int with unsafe:true                              3              3      
     0        346.2           2.9      85.9X
-Array: Long with unsafe:true                             5              5      
     0        216.0           4.6      53.6X
-Array: Float with unsafe:true                            3              3      
     0        341.4           2.9      84.7X
-Array: Double with unsafe:true                           5              5      
     0        217.8           4.6      54.0X
-Map of string->Double  with unsafe:true                 35             35      
     0         28.9          34.6       7.2X
-basicTypes: Int with unsafe:false                      283            285      
     1          3.5         283.4       0.9X
-basicTypes: Long with unsafe:false                     302            302      
     1          3.3         301.6       0.8X
-basicTypes: Float with unsafe:false                    276            278      
     3          3.6         275.6       0.9X
-basicTypes: Double with unsafe:false                   281            282      
     1          3.6         280.8       0.9X
-Array: Int with unsafe:false                            21             21      
     0         48.6          20.6      12.1X
-Array: Long with unsafe:false                           30             30      
     0         33.3          30.0       8.3X
-Array: Float with unsafe:false                           8              8      
     0        126.6           7.9      31.4X
-Array: Double with unsafe:false                         15             15      
     0         68.2          14.7      16.9X
-Map of string->Double  with unsafe:false                36             37      
     0         27.4          36.4       6.8X
+basicTypes: Int with unsafe:true                       239            258      
    15          4.2         239.4       1.0X
+basicTypes: Long with unsafe:true                      277            302      
    18          3.6         276.5       0.9X
+basicTypes: Float with unsafe:true                     296            304      
     8          3.4         295.6       0.8X
+basicTypes: Double with unsafe:true                    281            308      
    13          3.6         281.0       0.9X
+Array: Int with unsafe:true                              5              6      
     1        201.1           5.0      48.1X
+Array: Long with unsafe:true                             7              8      
     1        141.6           7.1      33.9X
+Array: Float with unsafe:true                            5              5      
     1        218.4           4.6      52.3X
+Array: Double with unsafe:true                           7              8      
     1        136.6           7.3      32.7X
+Map of string->Double  with unsafe:true                 43             47      
     4         23.2          43.1       5.6X
+basicTypes: Int with unsafe:false                      295            316      
    13          3.4         294.7       0.8X
+basicTypes: Long with unsafe:false                     310            329      
    13          3.2         310.0       0.8X
+basicTypes: Float with unsafe:false                    283            290      
     7          3.5         283.0       0.8X
+basicTypes: Double with unsafe:false                   294            325      
    17          3.4         293.8       0.8X
+Array: Int with unsafe:false                            23             25      
     2         44.1          22.7      10.6X
+Array: Long with unsafe:false                           34             37      
     2         29.1          34.4       7.0X
+Array: Float with unsafe:false                          10             11      
     1        104.8           9.5      25.1X
+Array: Double with unsafe:false                         17             19      
     2         60.0          16.7      14.4X
+Map of string->Double  with unsafe:false                40             46      
     3         24.7          40.4       5.9X
 
 
diff --git a/core/benchmarks/KryoIteratorBenchmark-jdk11-results.txt 
b/core/benchmarks/KryoIteratorBenchmark-jdk11-results.txt
new file mode 100644
index 00000000000..d35d94f2d1d
--- /dev/null
+++ b/core/benchmarks/KryoIteratorBenchmark-jdk11-results.txt
@@ -0,0 +1,28 @@
+================================================================================================
+Benchmark of kryo asIterator on deserialization stream
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1036-azure
+Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
+Benchmark of kryo asIterator on deserialization stream:        Best Time(ms)   
Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------------------------
+Colletion of int with 1 elements, useIterator: true                       13   
          13           0          0.8        1262.3       1.0X
+Colletion of int with 10 elements, useIterator: true                      30   
          30           1          0.3        2974.4       0.4X
+Colletion of int with 100 elements, useIterator: true                    190   
         191           2          0.1       18981.4       0.1X
+Colletion of string with 1 elements, useIterator: true                    15   
          15           0          0.7        1497.9       0.8X
+Colletion of string with 10 elements, useIterator: true                   43   
          44           1          0.2        4338.8       0.3X
+Colletion of string with 100 elements, useIterator: true                 308   
         309           1          0.0       30800.1       0.0X
+Colletion of Array[int] with 1 elements, useIterator: true                14   
          15           0          0.7        1411.7       0.9X
+Colletion of Array[int] with 10 elements, useIterator: true               38   
          39           0          0.3        3798.4       0.3X
+Colletion of Array[int] with 100 elements, useIterator: true             283   
         284           0          0.0       28301.9       0.0X
+Colletion of int with 1 elements, useIterator: false                      11   
          12           0          0.9        1146.7       1.1X
+Colletion of int with 10 elements, useIterator: false                     23   
          23           0          0.4        2256.0       0.6X
+Colletion of int with 100 elements, useIterator: false                   126   
         126           1          0.1       12550.7       0.1X
+Colletion of string with 1 elements, useIterator: false                   13   
          14           0          0.7        1341.3       0.9X
+Colletion of string with 10 elements, useIterator: false                  35   
          36           1          0.3        3536.6       0.4X
+Colletion of string with 100 elements, useIterator: false                243   
         244           0          0.0       24332.6       0.1X
+Colletion of Array[int] with 1 elements, useIterator: false               13   
          13           0          0.8        1262.6       1.0X
+Colletion of Array[int] with 10 elements, useIterator: false              32   
          32           0          0.3        3155.6       0.4X
+Colletion of Array[int] with 100 elements, useIterator: false            219   
         220           0          0.0       21932.1       0.1X
+
+
diff --git a/core/benchmarks/KryoIteratorBenchmark-jdk17-results.txt 
b/core/benchmarks/KryoIteratorBenchmark-jdk17-results.txt
new file mode 100644
index 00000000000..bff217aca48
--- /dev/null
+++ b/core/benchmarks/KryoIteratorBenchmark-jdk17-results.txt
@@ -0,0 +1,28 @@
+================================================================================================
+Benchmark of kryo asIterator on deserialization stream
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1036-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Benchmark of kryo asIterator on deserialization stream:        Best Time(ms)   
Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------------------------
+Colletion of int with 1 elements, useIterator: true                       14   
          15           0          0.7        1418.2       1.0X
+Colletion of int with 10 elements, useIterator: true                      26   
          26           1          0.4        2565.9       0.6X
+Colletion of int with 100 elements, useIterator: true                    137   
         138           1          0.1       13720.7       0.1X
+Colletion of string with 1 elements, useIterator: true                    16   
          16           0          0.6        1572.0       0.9X
+Colletion of string with 10 elements, useIterator: true                   38   
          38           0          0.3        3782.3       0.4X
+Colletion of string with 100 elements, useIterator: true                 252   
         253           0          0.0       25193.4       0.1X
+Colletion of Array[int] with 1 elements, useIterator: true                15   
          16           0          0.6        1547.7       0.9X
+Colletion of Array[int] with 10 elements, useIterator: true               34   
          35           1          0.3        3389.1       0.4X
+Colletion of Array[int] with 100 elements, useIterator: true             227   
         229           1          0.0       22733.2       0.1X
+Colletion of int with 1 elements, useIterator: false                      14   
          14           0          0.7        1376.0       1.0X
+Colletion of int with 10 elements, useIterator: false                     24   
          25           0          0.4        2426.0       0.6X
+Colletion of int with 100 elements, useIterator: false                   132   
         132           0          0.1       13204.7       0.1X
+Colletion of string with 1 elements, useIterator: false                   15   
          16           0          0.7        1517.3       0.9X
+Colletion of string with 10 elements, useIterator: false                  37   
          38           1          0.3        3700.8       0.4X
+Colletion of string with 100 elements, useIterator: false                252   
         253           0          0.0       25234.5       0.1X
+Colletion of Array[int] with 1 elements, useIterator: false               15   
          15           0          0.7        1466.1       1.0X
+Colletion of Array[int] with 10 elements, useIterator: false              32   
          33           0          0.3        3237.5       0.4X
+Colletion of Array[int] with 100 elements, useIterator: false            217   
         218           1          0.0       21706.8       0.1X
+
+
diff --git a/core/benchmarks/KryoIteratorBenchmark-results.txt 
b/core/benchmarks/KryoIteratorBenchmark-results.txt
new file mode 100644
index 00000000000..35e3524a0be
--- /dev/null
+++ b/core/benchmarks/KryoIteratorBenchmark-results.txt
@@ -0,0 +1,28 @@
+================================================================================================
+Benchmark of kryo asIterator on deserialization stream
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1036-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Benchmark of kryo asIterator on deserialization stream:        Best Time(ms)   
Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------------------------
+Colletion of int with 1 elements, useIterator: true                       22   
          24           2          0.4        2228.3       1.0X
+Colletion of int with 10 elements, useIterator: true                      30   
          33           3          0.3        2970.4       0.8X
+Colletion of int with 100 elements, useIterator: true                    142   
         147           3          0.1       14249.2       0.2X
+Colletion of string with 1 elements, useIterator: true                    23   
          26           2          0.4        2308.2       1.0X
+Colletion of string with 10 elements, useIterator: true                   40   
          43           3          0.3        3997.5       0.6X
+Colletion of string with 100 elements, useIterator: true                 256   
         263           7          0.0       25550.7       0.1X
+Colletion of Array[int] with 1 elements, useIterator: true                22   
          24           1          0.5        2209.9       1.0X
+Colletion of Array[int] with 10 elements, useIterator: true               38   
          40           1          0.3        3755.1       0.6X
+Colletion of Array[int] with 100 elements, useIterator: true             240   
         251           9          0.0       23978.8       0.1X
+Colletion of int with 1 elements, useIterator: false                      21   
          23           1          0.5        2126.5       1.0X
+Colletion of int with 10 elements, useIterator: false                     27   
          29           2          0.4        2671.3       0.8X
+Colletion of int with 100 elements, useIterator: false                   114   
         118           3          0.1       11413.3       0.2X
+Colletion of string with 1 elements, useIterator: false                   22   
          24           2          0.4        2243.3       1.0X
+Colletion of string with 10 elements, useIterator: false                  38   
          41           2          0.3        3830.4       0.6X
+Colletion of string with 100 elements, useIterator: false                242   
         258          11          0.0       24223.8       0.1X
+Colletion of Array[int] with 1 elements, useIterator: false               22   
          23           1          0.5        2211.3       1.0X
+Colletion of Array[int] with 10 elements, useIterator: false              36   
          38           2          0.3        3566.2       0.6X
+Colletion of Array[int] with 100 elements, useIterator: false            221   
         225           5          0.0       22054.1       0.1X
+
+
diff --git a/core/benchmarks/KryoSerializerBenchmark-jdk11-results.txt 
b/core/benchmarks/KryoSerializerBenchmark-jdk11-results.txt
index 60694c6ef25..777dbce11ed 100644
--- a/core/benchmarks/KryoSerializerBenchmark-jdk11-results.txt
+++ b/core/benchmarks/KryoSerializerBenchmark-jdk11-results.txt
@@ -2,11 +2,11 @@
 Benchmark KryoPool vs old"pool of 1" implementation
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1031-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1036-azure
+Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
 Benchmark KryoPool vs old"pool of 1" implementation:  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-----------------------------------------------------------------------------------------------------------------------------------
-KryoPool:true                                                 9790          
12432         370          0.0    19579752.2       1.0X
-KryoPool:false                                               14512          
17607         653          0.0    29023660.8       0.7X
+KryoPool:true                                                 8338          
10560         NaN          0.0    16675730.9       1.0X
+KryoPool:false                                               12586          
14541         329          0.0    25172974.1       0.7X
 
 
diff --git a/core/benchmarks/KryoSerializerBenchmark-jdk17-results.txt 
b/core/benchmarks/KryoSerializerBenchmark-jdk17-results.txt
index 8153484ab2f..8c995bc9f9a 100644
--- a/core/benchmarks/KryoSerializerBenchmark-jdk17-results.txt
+++ b/core/benchmarks/KryoSerializerBenchmark-jdk17-results.txt
@@ -2,11 +2,11 @@
 Benchmark KryoPool vs old"pool of 1" implementation
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1031-azure
+OpenJDK 64-Bit Server VM 17.0.6+10 on Linux 5.15.0-1036-azure
 Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 Benchmark KryoPool vs old"pool of 1" implementation:  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-----------------------------------------------------------------------------------------------------------------------------------
-KryoPool:true                                                 8306          
11121         875          0.0    16611043.7       1.0X
-KryoPool:false                                               11855          
15605         NaN          0.0    23709932.2       0.7X
+KryoPool:true                                                 8230          
11345         772          0.0    16459043.1       1.0X
+KryoPool:false                                               12819          
15714         NaN          0.0    25637323.3       0.6X
 
 
diff --git a/core/benchmarks/KryoSerializerBenchmark-results.txt 
b/core/benchmarks/KryoSerializerBenchmark-results.txt
index 0571e7db1c5..e60a95e7cf2 100644
--- a/core/benchmarks/KryoSerializerBenchmark-results.txt
+++ b/core/benchmarks/KryoSerializerBenchmark-results.txt
@@ -2,11 +2,11 @@
 Benchmark KryoPool vs old"pool of 1" implementation
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1031-azure
-Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
+OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1036-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 Benchmark KryoPool vs old"pool of 1" implementation:  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-----------------------------------------------------------------------------------------------------------------------------------
-KryoPool:true                                                 7751           
9409         NaN          0.0    15501889.2       1.0X
-KryoPool:false                                               11350          
14163         177          0.0    22700046.8       0.7X
+KryoPool:true                                                 9955          
12622         NaN          0.0    19910361.3       1.0X
+KryoPool:false                                               15141          
17852         NaN          0.0    30282157.2       0.7X
 
 
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 1736088b498..2bbc15c490c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -45,7 +45,7 @@ import org.apache.spark.internal.io.FileCommitProtocol._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus}
 import org.apache.spark.storage._
-import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, 
SerializableConfiguration, SerializableJobConf, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, 
NextIterator, SerializableConfiguration, SerializableJobConf, Utils}
 import org.apache.spark.util.collection.{BitSet, CompactBuffer}
 import org.apache.spark.util.io.ChunkedByteBuffer
 
@@ -306,6 +306,16 @@ class KryoDeserializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  private[this] def hasNext: Boolean = {
+    if (input == null) {
+      return false
+    }
+
+    val eof = input.eof()
+    if (eof) close()
+    !eof
+  }
+
   override def readObject[T: ClassTag](): T = {
     try {
       kryo.readClassAndObject(input).asInstanceOf[T]
@@ -329,6 +339,42 @@ class KryoDeserializationStream(
       }
     }
   }
+
+  final override def asIterator: Iterator[Any] = new NextIterator[Any] {
+    override protected def getNext(): Any = {
+      if (KryoDeserializationStream.this.hasNext) {
+        try {
+          return readObject[Any]()
+        } catch {
+          case eof: EOFException =>
+        }
+      }
+      finished = true
+      null
+    }
+
+    override protected def close(): Unit = {
+      KryoDeserializationStream.this.close()
+    }
+  }
+
+  final override def asKeyValueIterator: Iterator[(Any, Any)] = new 
NextIterator[(Any, Any)] {
+    override protected def getNext(): (Any, Any) = {
+      if (KryoDeserializationStream.this.hasNext) {
+        try {
+          return (readKey[Any](), readValue[Any]())
+        } catch {
+          case eof: EOFException =>
+        }
+      }
+      finished = true
+      null
+    }
+
+    override protected def close(): Unit = {
+      KryoDeserializationStream.this.close()
+    }
+  }
 }
 
 private[spark] class KryoSerializerInstance(
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f24c44b2f84..93efaafa43b 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -460,13 +460,13 @@ class ExternalAppendOnlyMap[K, V, C](
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher 
level streams
     private var deserializeStream: DeserializationStream = null
-    private var nextItem: (K, C) = null
+    private var batchIterator: Iterator[(K, C)] = null
     private var objectsRead = 0
 
     /**
      * Construct a stream that reads only from the next batch.
      */
-    private def nextBatchStream(): DeserializationStream = {
+    private def nextBatchIterator(): Iterator[(K, C)] = {
       // Note that batchOffsets.length = numBatches + 1 since we did a scan 
above; check whether
       // we're still in a valid batch.
       if (batchIndex < batchOffsets.length - 1) {
@@ -489,7 +489,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
         val bufferedStream = new 
BufferedInputStream(ByteStreams.limit(fileStream, end - start))
         val wrappedStream = serializerManager.wrapStream(blockId, 
bufferedStream)
-        ser.deserializeStream(wrappedStream)
+        deserializeStream = ser.deserializeStream(wrappedStream)
+        deserializeStream.asKeyValueIterator.asInstanceOf[Iterator[(K, C)]]
       } else {
         // No more batches left
         cleanup()
@@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      try {
-        val k = deserializeStream.readKey().asInstanceOf[K]
-        val c = deserializeStream.readValue().asInstanceOf[C]
-        val item = (k, c)
-        objectsRead += 1
-        if (objectsRead == serializerBatchSize) {
-          objectsRead = 0
-          deserializeStream = nextBatchStream()
-        }
-        item
-      } catch {
-        case e: EOFException =>
-          cleanup()
-          null
+      val item = batchIterator.next()
+      objectsRead += 1
+      if (objectsRead == serializerBatchSize) {
+        objectsRead = 0
+        batchIterator = nextBatchIterator()
       }
+      item
     }
 
     override def hasNext: Boolean = {
-      if (nextItem == null) {
-        if (deserializeStream == null) {
-          // In case of deserializeStream has not been initialized
-          deserializeStream = nextBatchStream()
-          if (deserializeStream == null) {
-            return false
-          }
+      if (batchIterator == null) {
+        // In case batchIterator has not been initialized yet
+        batchIterator = nextBatchIterator()
+        if (batchIterator == null) {
+          return false
         }
-        nextItem = readNextItem()
       }
-      nextItem != null
+      batchIterator.hasNext
     }
 
     override def next(): (K, C) = {
       if (!hasNext) {
         throw new NoSuchElementException
       }
-      val item = nextItem
-      nextItem = null
-      item
+      readNextItem()
     }
 
     private def cleanup(): Unit = {
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala 
b/core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala
new file mode 100644
index 00000000000..5de1a12ffe0
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.serializer.KryoTest._
+
+/**
+ * Benchmark for Kryo asIterator on a deserialization stream. To run this 
benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this 
class>"
+ *      Results will be written to 
"benchmarks/KryoIteratorBenchmark-results.txt".
+ * }}}
+ */
+object KryoIteratorBenchmark extends BenchmarkBase {
+  val N = 10000
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark of kryo asIterator on deserialization stream"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).map(useIterator => run(useIterator, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(useIterator: Boolean, benchmark: Benchmark): Unit = {
+    val ser = createSerializer()
+
+    def roundTrip[T: ClassTag](
+        elements: Array[T],
+        useIterator: Boolean,
+        ser: SerializerInstance): Int = {
+      val serialized: Array[Byte] = {
+        val baos = new ByteArrayOutputStream()
+        val serStream = ser.serializeStream(baos)
+        var i = 0
+        while (i < elements.length) {
+          serStream.writeObject(elements(i))
+          i += 1
+        }
+        serStream.close()
+        baos.toByteArray
+      }
+
+      val deserStream = ser.deserializeStream(new 
ByteArrayInputStream(serialized))
+      if (useIterator) {
+        if (deserStream.asIterator.toArray.length == elements.length) 1 else 0
+      } else {
+        val res = new Array[T](elements.length)
+        var i = 0
+        while (i < elements.length) {
+          res(i) = deserStream.readValue()
+          i += 1
+        }
+        deserStream.close()
+        if (res.length == elements.length) 1 else 0
+      }
+    }
+
+    def createCase[T: ClassTag](name: String, elementCount: Int, 
createElement: => T): Unit = {
+      val elements = Array.fill[T](elementCount)(createElement)
+
+      benchmark.addCase(
+        s"Colletion of $name with $elementCount elements, useIterator: 
$useIterator") { _ =>
+        var sum = 0L
+        var i = 0
+        while (i < N) {
+          sum += roundTrip(elements, useIterator, ser)
+          i += 1
+        }
+        sum
+      }
+    }
+
+    createCase("int", 1, Random.nextInt)
+    createCase("int", 10, Random.nextInt)
+    createCase("int", 100, Random.nextInt)
+    createCase("string", 1, Random.nextString(5))
+    createCase("string", 10, Random.nextString(5))
+    createCase("string", 100, Random.nextString(5))
+    createCase("Array[int]", 1, Array.fill(10)(Random.nextInt))
+    createCase("Array[int]", 10, Array.fill(10)(Random.nextInt))
+    createCase("Array[int]", 100, Array.fill(10)(Random.nextInt))
+  }
+
+  def createSerializer(): SerializerInstance = {
+    val conf = new SparkConf()
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
+
+    new KryoSerializer(conf).newInstance()
+  }
+}
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 260a5b29235..1d4f61cfde9 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.serializer
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException}
 import java.nio.ByteBuffer
 import java.util.concurrent.Executors
 
@@ -598,6 +598,28 @@ class KryoSerializerAutoResetDisabledSuite extends 
SparkFunSuite with SharedSpar
     assert(serInstance.deserialize[Any](serObj) === (obj))
     assert(serInstance.deserialize[Any](byteBuffer) === (obj))
   }
+
+  test("SPARK-40912: Ignore unexpectedly truncated buffer") {
+    // This test checks that the improvement in SPARK-40912 does not break 
backwards compatibility.
+    // However, the asIterator interface's behavior of silently ignoring 
truncated data should be
+    // revisited in a follow-up ticket.
+    val serInstance = new 
KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
+    val serialized: Array[Byte] = {
+      val baos = new ByteArrayOutputStream()
+      val serStream = serInstance.serializeStream(baos)
+      serStream.writeObject(KryoTest.CaseClass(0, ""))
+      serStream.close()
+      baos.toByteArray
+    }
+    // Make sure truncating to 2 bytes actually drops some serialized data
+    assert(serialized.length > 2)
+    val trucated = serialized.take(2).toArray
+    val deserializationStream = serInstance.deserializeStream(new 
ByteArrayInputStream(trucated))
+    intercept[EOFException](
+      serInstance.deserializeStream(new 
ByteArrayInputStream(trucated)).readValue()
+    )
+    assert(deserializationStream.asIterator.toSeq == Seq())
+  }
 }
 
 class ClassLoaderTestingObject


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to