This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1efbf43160aa [SPARK-47310][SS] Add micro-benchmark for merge 
operations for multiple values in value portion of state store
1efbf43160aa is described below

commit 1efbf43160aa4e36710a4668f05fe61534f49648
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Sat Apr 6 06:10:18 2024 +0900

    [SPARK-47310][SS] Add micro-benchmark for merge operations for multiple 
values in value portion of state store
    
    ### What changes were proposed in this pull request?
    Add microbenchmark for merge operations for multiple values in value 
portion of state store
    
    ### Why are the changes needed?
    Micro-benchmark to understand the performance of merge operations with and 
without row tracking (trackTotalNumberOfRows)
    
    As shown in the results, merge without tracking is consistently about 3x faster (2.9x-3.1x)
    
    ```
    merging 10000 rows with 10 values per key (10000 rows to overwrite - rate 
100):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   
Relative
    
--------------------------------------------------------------------------------------------------------------------------------------------------------------
    RocksDB (trackTotalNumberOfRows: true)                                      
              519            533           7          0.0       51916.6       
1.0X
    RocksDB (trackTotalNumberOfRows: false)                                     
              171            177           3          0.1       17083.9       
3.0X
    ```
    
    GH Actions here:
    - https://github.com/anishshri-db/spark/actions/runs/8559698160
    - https://github.com/anishshri-db/spark/actions/runs/8559694994
    
    The difference is even larger when running locally (> 7x faster without tracking)
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Test only change
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45865 from anishshri-db/task/SPARK-47310.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 ...StoreBasicOperationsBenchmark-jdk21-results.txt | 107 +++++++++++------
 .../StateStoreBasicOperationsBenchmark-results.txt | 107 +++++++++++------
 .../StateStoreBasicOperationsBenchmark.scala       | 130 ++++++++++++++++++++-
 3 files changed, 265 insertions(+), 79 deletions(-)

diff --git 
a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt 
b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt
index d3b3aafc21e5..0317e6116375 100644
--- a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt
@@ -6,33 +6,66 @@ OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 
6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 putting 10000 rows (10000 rows to overwrite - rate 100):  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                            9         
    10           1          1.1         894.7       1.0X
-RocksDB (trackTotalNumberOfRows: true)                              41         
    42           2          0.2        4064.6       0.2X
-RocksDB (trackTotalNumberOfRows: false)                             15         
    15           1          0.7        1466.8       0.6X
+In-memory                                                            9         
    10           1          1.1         936.2       1.0X
+RocksDB (trackTotalNumberOfRows: true)                              41         
    42           1          0.2        4068.9       0.2X
+RocksDB (trackTotalNumberOfRows: false)                             15         
    16           1          0.7        1500.4       0.6X
 
 OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 putting 10000 rows (5000 rows to overwrite - rate 50):  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                          9           
  10           0          1.1         893.1       1.0X
-RocksDB (trackTotalNumberOfRows: true)                            40           
  40           1          0.3        3959.7       0.2X
-RocksDB (trackTotalNumberOfRows: false)                           15           
  16           1          0.7        1510.8       0.6X
+In-memory                                                          9           
  11           1          1.1         929.8       1.0X
+RocksDB (trackTotalNumberOfRows: true)                            40           
  41           1          0.3        3955.7       0.2X
+RocksDB (trackTotalNumberOfRows: false)                           15           
  16           1          0.7        1497.3       0.6X
 
 OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 putting 10000 rows (1000 rows to overwrite - rate 10):  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                          9           
   9           0          1.1         872.0       1.0X
-RocksDB (trackTotalNumberOfRows: true)                            39           
  40           1          0.3        3887.2       0.2X
-RocksDB (trackTotalNumberOfRows: false)                           15           
  16           0          0.7        1532.3       0.6X
+In-memory                                                          9           
  10           1          1.1         907.5       1.0X
+RocksDB (trackTotalNumberOfRows: true)                            39           
  40           1          0.3        3886.5       0.2X
+RocksDB (trackTotalNumberOfRows: false)                           15           
  16           1          0.7        1497.2       0.6X
 
 OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 putting 10000 rows (0 rows to overwrite - rate 0):  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                      9             
10           1          1.1         874.5       1.0X
-RocksDB (trackTotalNumberOfRows: true)                        40             
41           1          0.3        3967.1       0.2X
-RocksDB (trackTotalNumberOfRows: false)                       15             
16           0          0.7        1526.2       0.6X
+In-memory                                                      9             
10           1          1.1         904.0       1.0X
+RocksDB (trackTotalNumberOfRows: true)                        39             
40           1          0.3        3859.8       0.2X
+RocksDB (trackTotalNumberOfRows: false)                       15             
16           0          0.7        1497.2       0.6X
+
+
+================================================================================================
+merge rows
+================================================================================================
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (10000 rows to overwrite - rate 
100):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   
Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true)                                         
           519            533           7          0.0       51916.6       1.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
           171            177           3          0.1       17083.9       3.0X
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (5000 rows to overwrite - rate 50):  
Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true)                                         
         506            521           7          0.0       50644.0       1.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
         170            176           3          0.1       17022.0       3.0X
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (1000 rows to overwrite - rate 10):  
Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true)                                         
         493            508           6          0.0       49319.3       1.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
         169            175           3          0.1       16897.6       2.9X
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (0 rows to overwrite - rate 0):  
Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true)                                         
     495            508           6          0.0       49462.5       1.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
     169            175           3          0.1       16896.6       2.9X
 
 
 
================================================================================================
@@ -43,33 +76,33 @@ OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 
6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 trying to delete 10000 rows from 10000 rows(10000 rows are non-existing - rate 
100):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   
Relative
 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
                  0              1           0         20.9          47.9       
1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
                 40             40           1          0.3        3956.8       
0.0X
-RocksDB (trackTotalNumberOfRows: false)                                        
                 15             16           1          0.6        1541.9       
0.0X
+In-memory                                                                      
                  0              1           0         26.3          38.0       
1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
                 39             41           1          0.3        3942.0       
0.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
                 15             16           1          0.7        1529.2       
0.0X
 
 OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 trying to delete 10000 rows from 10000 rows(5000 rows are non-existing - rate 
50):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   
Relative
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
                8             10           1          1.3         773.4       
1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
               40             41           1          0.2        4024.1       
0.2X
-RocksDB (trackTotalNumberOfRows: false)                                        
               15             16           1          0.7        1537.8       
0.5X
+In-memory                                                                      
                8              9           1          1.3         790.4       
1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
               40             41           1          0.2        4036.7       
0.2X
+RocksDB (trackTotalNumberOfRows: false)                                        
               15             16           0          0.7        1536.9       
0.5X
 
 OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 trying to delete 10000 rows from 10000 rows(1000 rows are non-existing - rate 
10):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   
Relative
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
                8             10           1          1.2         817.7       
1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
               41             42           1          0.2        4111.7       
0.2X
-RocksDB (trackTotalNumberOfRows: false)                                        
               15             16           0          0.6        1540.5       
0.5X
+In-memory                                                                      
                8             10           1          1.2         847.0       
1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
               41             42           1          0.2        4099.8       
0.2X
+RocksDB (trackTotalNumberOfRows: false)                                        
               16             16           0          0.6        1563.3       
0.5X
 
 OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 trying to delete 10000 rows from 10000 rows(0 rows are non-existing - rate 0): 
 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
            8             10           1          1.2         820.0       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
           41             42           1          0.2        4133.0       0.2X
-RocksDB (trackTotalNumberOfRows: false)                                        
           15             16           0          0.7        1526.2       0.5X
+In-memory                                                                      
            9             10           1          1.2         859.4       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
           41             42           1          0.2        4118.9       0.2X
+RocksDB (trackTotalNumberOfRows: false)                                        
           15             16           1          0.7        1507.8       0.6X
 
 
 
================================================================================================
@@ -80,32 +113,30 @@ OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 
6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 evicting 10000 rows (maxTimestampToEvictInMillis: 9999) from 10000 rows:  Best 
Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
      8              9           0          1.2         805.5       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
     39             40           1          0.3        3888.6       0.2X
-RocksDB (trackTotalNumberOfRows: false)                                        
     15             16           0          0.7        1538.2       0.5X
+In-memory                                                                      
      8              9           1          1.2         831.0       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
     40             40           1          0.3        3956.6       0.2X
+RocksDB (trackTotalNumberOfRows: false)                                        
     16             16           0          0.6        1571.3       0.5X
 
 OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 evicting 5000 rows (maxTimestampToEvictInMillis: 4999) from 10000 rows:  Best 
Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
     8              8           0          1.3         754.7       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
    21             22           0          0.5        2091.7       0.4X
-RocksDB (trackTotalNumberOfRows: false)                                        
     9              9           0          1.1         916.1       0.8X
+In-memory                                                                      
     8              8           1          1.3         787.6       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
    21             22           0          0.5        2112.6       0.4X
+RocksDB (trackTotalNumberOfRows: false)                                        
     9              9           0          1.1         932.9       0.8X
 
 OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 evicting 1000 rows (maxTimestampToEvictInMillis: 999) from 10000 rows:  Best 
Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-----------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
    7              8           1          1.4         692.8       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
    7              7           0          1.5         654.6       1.1X
-RocksDB (trackTotalNumberOfRows: false)                                        
    4              4           0          2.4         423.8       1.6X
+In-memory                                                                      
    7              8           0          1.4         715.7       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
    7              7           0          1.5         676.3       1.1X
+RocksDB (trackTotalNumberOfRows: false)                                        
    4              5           0          2.3         442.3       1.6X
 
 OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 evicting 0 rows (maxTimestampToEvictInMillis: -1) from 10000 rows:  Best 
Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
0              0           0         24.2          41.2       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
3              3           0          3.4         290.1       0.1X
-RocksDB (trackTotalNumberOfRows: false)                                        
3              3           0          3.4         290.6       0.1X
-
-
+In-memory                                                                      
0              0           0         23.8          41.9       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
3              3           0          3.2         309.5       0.1X
+RocksDB (trackTotalNumberOfRows: false)                                        
3              3           0          3.2         309.9       0.1X
diff --git a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt 
b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt
index 86d3d4400331..d2aa646d5ec1 100644
--- a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt
+++ b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt
@@ -6,33 +6,66 @@ OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 
6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 putting 10000 rows (10000 rows to overwrite - rate 100):  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                            9         
    10           1          1.1         907.3       1.0X
-RocksDB (trackTotalNumberOfRows: true)                              40         
    42           2          0.2        4048.4       0.2X
-RocksDB (trackTotalNumberOfRows: false)                             15         
    16           1          0.7        1508.4       0.6X
+In-memory                                                           10         
    12           1          1.0         960.1       1.0X
+RocksDB (trackTotalNumberOfRows: true)                              42         
    43           2          0.2        4173.9       0.2X
+RocksDB (trackTotalNumberOfRows: false)                             16         
    16           1          0.6        1551.6       0.6X
 
 OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 putting 10000 rows (5000 rows to overwrite - rate 50):  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                          9           
  10           1          1.1         901.9       1.0X
-RocksDB (trackTotalNumberOfRows: true)                            39           
  41           1          0.3        3922.0       0.2X
-RocksDB (trackTotalNumberOfRows: false)                           15           
  16           1          0.7        1513.5       0.6X
+In-memory                                                         10           
  12           1          1.0         970.1       1.0X
+RocksDB (trackTotalNumberOfRows: true)                            41           
  42           1          0.2        4095.8       0.2X
+RocksDB (trackTotalNumberOfRows: false)                           15           
  17           1          0.6        1544.6       0.6X
 
 OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 putting 10000 rows (1000 rows to overwrite - rate 10):  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                          9           
   9           0          1.1         880.2       1.0X
-RocksDB (trackTotalNumberOfRows: true)                            38           
  39           1          0.3        3829.7       0.2X
-RocksDB (trackTotalNumberOfRows: false)                           15           
  16           1          0.7        1506.0       0.6X
+In-memory                                                          9           
  11           1          1.1         933.3       1.0X
+RocksDB (trackTotalNumberOfRows: true)                            40           
  41           1          0.3        3966.2       0.2X
+RocksDB (trackTotalNumberOfRows: false)                           15           
  16           1          0.6        1540.2       0.6X
 
 OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 putting 10000 rows (0 rows to overwrite - rate 0):  Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
---------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                      9              
9           0          1.1         878.6       1.0X
-RocksDB (trackTotalNumberOfRows: true)                        38             
39           1          0.3        3803.4       0.2X
-RocksDB (trackTotalNumberOfRows: false)                       15             
16           1          0.7        1500.3       0.6X
+In-memory                                                      9             
11           1          1.1         936.1       1.0X
+RocksDB (trackTotalNumberOfRows: true)                        39             
41           1          0.3        3942.4       0.2X
+RocksDB (trackTotalNumberOfRows: false)                       15             
16           0          0.7        1530.1       0.6X
+
+
+================================================================================================
+merge rows
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (10000 rows to overwrite - rate 
100):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   
Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true)                                         
           525            538           6          0.0       52516.4       1.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
           170            177           4          0.1       16960.4       3.1X
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (5000 rows to overwrite - rate 50):  
Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true)                                         
         514            528           6          0.0       51351.9       1.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
         168            174           4          0.1       16794.0       3.1X
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (1000 rows to overwrite - rate 10):  
Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true)                                         
         500            513           6          0.0       49955.1       1.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
         169            174           2          0.1       16867.1       3.0X
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (0 rows to overwrite - rate 0):  
Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true)                                         
     492            508           8          0.0       49225.8       1.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
     168            173           3          0.1       16757.2       2.9X
 
 
 
================================================================================================
@@ -43,33 +76,33 @@ OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 trying to delete 10000 rows from 10000 rows(10000 rows are non-existing - rate 
100):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   
Relative
 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
                  0              1           0         26.5          37.8       
1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
                 38             39           1          0.3        3779.3       
0.0X
-RocksDB (trackTotalNumberOfRows: false)                                        
                 15             15           1          0.7        1462.9       
0.0X
+In-memory                                                                      
                  0              1           0         26.1          38.3       
1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
                 38             40           1          0.3        3835.6       
0.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
                 15             16           1          0.7        1455.7       
0.0X
 
 OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 trying to delete 10000 rows from 10000 rows(5000 rows are non-existing - rate 
50):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   
Relative
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
                7              8           0          1.3         742.4       
1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
               39             40           1          0.3        3913.8       
0.2X
-RocksDB (trackTotalNumberOfRows: false)                                        
               15             15           0          0.7        1461.1       
0.5X
+In-memory                                                                      
                8              9           1          1.3         793.9       
1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
               40             41           1          0.2        4018.1       
0.2X
+RocksDB (trackTotalNumberOfRows: false)                                        
               15             16           0          0.7        1505.6       
0.5X
 
 OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 trying to delete 10000 rows from 10000 rows(1000 rows are non-existing - rate 
10):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   
Relative
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
                8              8           0          1.3         794.8       
1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
               40             41           1          0.2        4012.8       
0.2X
-RocksDB (trackTotalNumberOfRows: false)                                        
               15             15           0          0.7        1461.5       
0.5X
+In-memory                                                                      
                8             10           1          1.2         837.2       
1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
               41             42           1          0.2        4073.9       
0.2X
+RocksDB (trackTotalNumberOfRows: false)                                        
               15             16           1          0.7        1470.6       
0.6X
 
 OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 trying to delete 10000 rows from 10000 rows(0 rows are non-existing - rate 0): 
 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
            8              9           0          1.2         809.7       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
           40             42           1          0.2        4043.4       0.2X
-RocksDB (trackTotalNumberOfRows: false)                                        
           14             15           1          0.7        1445.2       0.6X
+In-memory                                                                      
            8              9           0          1.2         843.6       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
           41             42           1          0.2        4088.7       0.2X
+RocksDB (trackTotalNumberOfRows: false)                                        
           15             15           0          0.7        1466.1       0.6X
 
 
 
================================================================================================
@@ -80,32 +113,30 @@ OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 evicting 10000 rows (maxTimestampToEvictInMillis: 9999) from 10000 rows:  Best 
Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
      8              9           0          1.2         806.2       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
     40             41           1          0.3        3980.0       0.2X
-RocksDB (trackTotalNumberOfRows: false)                                        
     16             16           0          0.6        1599.4       0.5X
+In-memory                                                                      
      8              9           0          1.2         833.5       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
     40             41           0          0.3        3976.5       0.2X
+RocksDB (trackTotalNumberOfRows: false)                                        
     16             16           0          0.6        1588.1       0.5X
 
 OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 evicting 5000 rows (maxTimestampToEvictInMillis: 4999) from 10000 rows:  Best 
Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
     8              8           0          1.3         752.2       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
    22             22           0          0.5        2170.8       0.3X
-RocksDB (trackTotalNumberOfRows: false)                                        
    10             10           0          1.0         967.7       0.8X
+In-memory                                                                      
     8              8           0          1.3         784.3       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
    22             22           0          0.5        2155.1       0.4X
+RocksDB (trackTotalNumberOfRows: false)                                        
    10             10           0          1.0         986.9       0.8X
 
 OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 evicting 1000 rows (maxTimestampToEvictInMillis: 999) from 10000 rows:  Best 
Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-----------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
    7              7           0          1.4         694.9       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
    7              7           0          1.4         700.1       1.0X
-RocksDB (trackTotalNumberOfRows: false)                                        
    5              5           0          2.2         465.0       1.5X
+In-memory                                                                      
    7              8           0          1.4         722.3       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
    7              7           0          1.4         718.8       1.0X
+RocksDB (trackTotalNumberOfRows: false)                                        
    5              5           0          2.0         488.7       1.5X
 
 OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
 AMD EPYC 7763 64-Core Processor
 evicting 0 rows (maxTimestampToEvictInMillis: -1) from 10000 rows:  Best 
Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
-------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                                      
1              1           0         19.7          50.7       1.0X
-RocksDB (trackTotalNumberOfRows: true)                                         
3              3           0          3.0         332.1       0.2X
-RocksDB (trackTotalNumberOfRows: false)                                        
3              3           0          3.0         331.7       0.2X
-
-
+In-memory                                                                      
0              1           0         21.3          46.9       1.0X
+RocksDB (trackTotalNumberOfRows: true)                                         
4              4           0          2.8         358.9       0.1X
+RocksDB (trackTotalNumberOfRows: false)                                        
4              4           0          2.8         358.7       0.1X
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
index a5c393ac0567..36035e35ee25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
@@ -38,7 +38,8 @@ import org.apache.spark.util.Utils
  *   2. build/sbt "sql/Test/runMain <this class>"
  *   3. generate result:
  *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain <this class>"
- *      Results will be written to "benchmarks/StateStoreBasicOperationsBenchmark-results.txt".
+ *      Results will be written to:
+ *      "sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt".
  * }}}
  */
 object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
@@ -52,6 +53,7 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
 
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runPutBenchmark()
+    runMergeBenchmark()
     runDeleteBenchmark()
     runEvictBenchmark()
   }
@@ -133,6 +135,81 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  private def runMergeBenchmark(): Unit = {
+    def registerMergeBenchmarkCase(
+        benchmark: Benchmark,
+        testName: String,
+        provider: StateStoreProvider,
+        version: Long,
+        rows: Seq[(UnsafeRow, Seq[UnsafeRow])]): Unit = {
+      benchmark.addTimerCase(testName) { timer =>
+        val store = provider.getStore(version)
+
+        timer.startTiming()
+        mergeRows(store, rows)
+        timer.stopTiming()
+
+        store.abort()
+      }
+    }
+
+    runBenchmark("merge rows") {
+      val numOfRows = Seq(10000)
+      val numValuesPerKey = 10
+      val overwriteRates = Seq(100, 50, 10, 0)
+
+      numOfRows.foreach { numOfRow =>
+        val testData = constructRandomizedTestDataWithMultipleValues(numOfRow,
+          (1 to numOfRow).map(_ * 1000L).toList, numValuesPerKey, 0)
+
+        // note that merge is only supported for RocksDB state store provider
+        val rocksDBProvider = newRocksDBStateProvider(trackTotalNumberOfRows = true,
+          useColumnFamilies = true,
+          useMultipleValuesPerKey = true)
+        val rocksDBWithNoTrackProvider = newRocksDBStateProvider(trackTotalNumberOfRows = false,
+          useColumnFamilies = true,
+          useMultipleValuesPerKey = true)
+
+        val committedRocksDBVersion = loadInitialDataWithMultipleValues(rocksDBProvider, testData)
+        val committedRocksDBWithNoTrackVersion = loadInitialDataWithMultipleValues(
+          rocksDBWithNoTrackProvider, testData)
+
+        overwriteRates.foreach { overwriteRate =>
+          val numOfRowsToOverwrite = numOfRow * overwriteRate / 100
+
+          val numOfNewRows = numOfRow - numOfRowsToOverwrite
+          val newRows = if (numOfNewRows > 0) {
+            constructRandomizedTestDataWithMultipleValues(numOfNewRows,
+              (1 to numOfNewRows).map(_ * 1000L).toList, numValuesPerKey, 0)
+          } else {
+            Seq.empty[(UnsafeRow, Seq[UnsafeRow])]
+          }
+          val existingRows = if (numOfRowsToOverwrite > 0) {
+            Random.shuffle(testData).take(numOfRowsToOverwrite)
+          } else {
+            Seq.empty[(UnsafeRow, Seq[UnsafeRow])]
+          }
+          val rowsToPut = Random.shuffle(newRows ++ existingRows)
+
+          val benchmark = new Benchmark(s"merging $numOfRow rows " +
+            s"with $numValuesPerKey values per key " +
+            s"($numOfRowsToOverwrite rows to overwrite - rate $overwriteRate)",
+            numOfRow, minNumIters = 1000, output = output)
+
+          registerMergeBenchmarkCase(benchmark, "RocksDB (trackTotalNumberOfRows: true)",
+            rocksDBProvider, committedRocksDBVersion, rowsToPut)
+          registerMergeBenchmarkCase(benchmark, "RocksDB (trackTotalNumberOfRows: false)",
+            rocksDBWithNoTrackProvider, committedRocksDBWithNoTrackVersion, rowsToPut)
+
+          benchmark.run()
+        }
+
+        rocksDBProvider.close()
+        rocksDBWithNoTrackProvider.close()
+      }
+    }
+  }
+
   private def runDeleteBenchmark(): Unit = {
     def registerDeleteBenchmarkCase(
         benchmark: Benchmark,
@@ -288,6 +365,14 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
     store.commit()
   }
 
+  private def loadInitialDataWithMultipleValues(
+      provider: StateStoreProvider,
+      data: Seq[(UnsafeRow, Seq[UnsafeRow])]): Long = {
+    val store = provider.getStore(0)
+    mergeRows(store, data)
+    store.commit()
+  }
+
   private def updateRows(
       store: StateStore,
       rows: Seq[(UnsafeRow, UnsafeRow)]): Unit = {
@@ -296,6 +381,16 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  private def mergeRows(
+      store: StateStore,
+      rows: Seq[(UnsafeRow, Seq[UnsafeRow])]): Unit = {
+    rows.foreach { case (key, values) =>
+      values.foreach { value =>
+        store.merge(key, value)
+      }
+    }
+  }
+
   private def deleteRows(
       store: StateStore,
       rows: Seq[UnsafeRow]): Unit = {
@@ -341,6 +436,32 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  private def constructRandomizedTestDataWithMultipleValues(
+      numRows: Int,
+      timestamps: List[Long],
+      numValues: Int,
+      minIdx: Int = 0): Seq[(UnsafeRow, Seq[UnsafeRow])] = {
+    assert(numRows >= timestamps.length)
+    assert(numRows % timestamps.length == 0)
+
+    (1 to numRows).map { idx =>
+      val keyRow = new GenericInternalRow(2)
+      keyRow.setInt(0, Random.nextInt(Int.MaxValue))
+      keyRow.setLong(1, timestamps((minIdx + idx) % timestamps.length)) // microseconds
+
+      val valRows: Seq[UnsafeRow] = (1 to numValues).map { valueIdx =>
+        val valueRow = new GenericInternalRow(1)
+        valueRow.setInt(0, valueIdx + Random.nextInt(Int.MaxValue))
+        val valueUnsafeRow = valueProjection(valueRow).copy()
+        valueUnsafeRow
+      }.toSeq
+
+      val keyUnsafeRow = keyProjection(keyRow).copy()
+
+      (keyUnsafeRow, valRows)
+    }
+  }
+
   private def newHDFSBackedStateStoreProvider(): StateStoreProvider = {
     val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
     val provider = new HDFSBackedStateStoreProvider()
@@ -352,7 +473,9 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
   }
 
   private def newRocksDBStateProvider(
-      trackTotalNumberOfRows: Boolean = true): StateStoreProvider = {
+      trackTotalNumberOfRows: Boolean = true,
+      useColumnFamilies: Boolean = false,
+      useMultipleValuesPerKey: Boolean = false): StateStoreProvider = {
     val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
     val provider = new RocksDBStateStoreProvider()
     val sqlConf = new SQLConf()
@@ -362,7 +485,8 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
 
     provider.init(
       storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
-      useColumnFamilies = false, storeConf, new Configuration)
+      useColumnFamilies = useColumnFamilies, storeConf, new Configuration,
+      useMultipleValuesPerKey = useMultipleValuesPerKey)
     provider
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to