This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 1efbf43160aa [SPARK-47310][SS] Add micro-benchmark for merge operations for multiple values in value portion of state store 1efbf43160aa is described below commit 1efbf43160aa4e36710a4668f05fe61534f49648 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Sat Apr 6 06:10:18 2024 +0900 [SPARK-47310][SS] Add micro-benchmark for merge operations for multiple values in value portion of state store ### What changes were proposed in this pull request? Add a micro-benchmark for merge operations on multiple values in the value portion of the state store. ### Why are the changes needed? The micro-benchmark helps us understand the performance of merge operations with and without row tracking. As shown in the results, merge without tracking is consistently about 3x faster: ``` merging 10000 rows with 10 values per key (10000 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------------------------------------- RocksDB (trackTotalNumberOfRows: true) 519 533 7 0.0 51916.6 1.0X RocksDB (trackTotalNumberOfRows: false) 171 177 3 0.1 17083.9 3.0X ``` GitHub Actions runs: - https://github.com/anishshri-db/spark/actions/runs/8559698160 - https://github.com/anishshri-db/spark/actions/runs/8559694994 The difference is even larger when running locally (> 7x faster without tracking). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test-only change. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45865 from anishshri-db/task/SPARK-47310. 
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- ...StoreBasicOperationsBenchmark-jdk21-results.txt | 107 +++++++++++------ .../StateStoreBasicOperationsBenchmark-results.txt | 107 +++++++++++------ .../StateStoreBasicOperationsBenchmark.scala | 130 ++++++++++++++++++++- 3 files changed, 265 insertions(+), 79 deletions(-) diff --git a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt index d3b3aafc21e5..0317e6116375 100644 --- a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt +++ b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt @@ -6,33 +6,66 @@ OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor putting 10000 rows (10000 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -In-memory 9 10 1 1.1 894.7 1.0X -RocksDB (trackTotalNumberOfRows: true) 41 42 2 0.2 4064.6 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 15 1 0.7 1466.8 0.6X +In-memory 9 10 1 1.1 936.2 1.0X +RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4068.9 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1500.4 0.6X OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor putting 10000 rows (5000 rows to overwrite - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------- -In-memory 9 10 0 1.1 893.1 1.0X -RocksDB (trackTotalNumberOfRows: true) 40 40 1 0.3 3959.7 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1510.8 0.6X +In-memory 9 11 1 
1.1 929.8 1.0X +RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.3 3955.7 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1497.3 0.6X OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor putting 10000 rows (1000 rows to overwrite - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------- -In-memory 9 9 0 1.1 872.0 1.0X -RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3887.2 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1532.3 0.6X +In-memory 9 10 1 1.1 907.5 1.0X +RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3886.5 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1497.2 0.6X OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor putting 10000 rows (0 rows to overwrite - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -In-memory 9 10 1 1.1 874.5 1.0X -RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.3 3967.1 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1526.2 0.6X +In-memory 9 10 1 1.1 904.0 1.0X +RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3859.8 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1497.2 0.6X + + +================================================================================================ +merge rows +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure +AMD EPYC 7763 64-Core Processor +merging 10000 rows with 10 values per key (10000 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
+-------------------------------------------------------------------------------------------------------------------------------------------------------------- +RocksDB (trackTotalNumberOfRows: true) 519 533 7 0.0 51916.6 1.0X +RocksDB (trackTotalNumberOfRows: false) 171 177 3 0.1 17083.9 3.0X + +OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure +AMD EPYC 7763 64-Core Processor +merging 10000 rows with 10 values per key (5000 rows to overwrite - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------------------------------------------ +RocksDB (trackTotalNumberOfRows: true) 506 521 7 0.0 50644.0 1.0X +RocksDB (trackTotalNumberOfRows: false) 170 176 3 0.1 17022.0 3.0X + +OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure +AMD EPYC 7763 64-Core Processor +merging 10000 rows with 10 values per key (1000 rows to overwrite - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------------------------------------------ +RocksDB (trackTotalNumberOfRows: true) 493 508 6 0.0 49319.3 1.0X +RocksDB (trackTotalNumberOfRows: false) 169 175 3 0.1 16897.6 2.9X + +OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure +AMD EPYC 7763 64-Core Processor +merging 10000 rows with 10 values per key (0 rows to overwrite - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------------------------------- +RocksDB (trackTotalNumberOfRows: true) 495 508 6 0.0 49462.5 1.0X +RocksDB (trackTotalNumberOfRows: false) 169 175 3 0.1 16896.6 2.9X 
================================================================================================ @@ -43,33 +76,33 @@ OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor trying to delete 10000 rows from 10000 rows(10000 rows are non-existing - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 0 1 0 20.9 47.9 1.0X -RocksDB (trackTotalNumberOfRows: true) 40 40 1 0.3 3956.8 0.0X -RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.6 1541.9 0.0X +In-memory 0 1 0 26.3 38.0 1.0X +RocksDB (trackTotalNumberOfRows: true) 39 41 1 0.3 3942.0 0.0X +RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1529.2 0.0X OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor trying to delete 10000 rows from 10000 rows(5000 rows are non-existing - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 8 10 1 1.3 773.4 1.0X -RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.2 4024.1 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1537.8 0.5X +In-memory 8 9 1 1.3 790.4 1.0X +RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.2 4036.7 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1536.9 0.5X OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor trying to delete 10000 rows from 10000 rows(1000 rows are non-existing - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------------------------------------- 
-In-memory 8 10 1 1.2 817.7 1.0X -RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4111.7 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.6 1540.5 0.5X +In-memory 8 10 1 1.2 847.0 1.0X +RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4099.8 0.2X +RocksDB (trackTotalNumberOfRows: false) 16 16 0 0.6 1563.3 0.5X OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor trying to delete 10000 rows from 10000 rows(0 rows are non-existing - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 8 10 1 1.2 820.0 1.0X -RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4133.0 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1526.2 0.5X +In-memory 9 10 1 1.2 859.4 1.0X +RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4118.9 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1507.8 0.6X ================================================================================================ @@ -80,32 +113,30 @@ OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor evicting 10000 rows (maxTimestampToEvictInMillis: 9999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 8 9 0 1.2 805.5 1.0X -RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3888.6 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1538.2 0.5X +In-memory 8 9 1 1.2 831.0 1.0X +RocksDB (trackTotalNumberOfRows: true) 40 40 1 0.3 3956.6 0.2X +RocksDB (trackTotalNumberOfRows: false) 16 16 0 0.6 1571.3 0.5X OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor evicting 5000 rows 
(maxTimestampToEvictInMillis: 4999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------------------ -In-memory 8 8 0 1.3 754.7 1.0X -RocksDB (trackTotalNumberOfRows: true) 21 22 0 0.5 2091.7 0.4X -RocksDB (trackTotalNumberOfRows: false) 9 9 0 1.1 916.1 0.8X +In-memory 8 8 1 1.3 787.6 1.0X +RocksDB (trackTotalNumberOfRows: true) 21 22 0 0.5 2112.6 0.4X +RocksDB (trackTotalNumberOfRows: false) 9 9 0 1.1 932.9 0.8X OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor evicting 1000 rows (maxTimestampToEvictInMillis: 999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 7 8 1 1.4 692.8 1.0X -RocksDB (trackTotalNumberOfRows: true) 7 7 0 1.5 654.6 1.1X -RocksDB (trackTotalNumberOfRows: false) 4 4 0 2.4 423.8 1.6X +In-memory 7 8 0 1.4 715.7 1.0X +RocksDB (trackTotalNumberOfRows: true) 7 7 0 1.5 676.3 1.1X +RocksDB (trackTotalNumberOfRows: false) 4 5 0 2.3 442.3 1.6X OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor evicting 0 rows (maxTimestampToEvictInMillis: -1) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 0 0 0 24.2 41.2 1.0X -RocksDB (trackTotalNumberOfRows: true) 3 3 0 3.4 290.1 0.1X -RocksDB (trackTotalNumberOfRows: false) 3 3 0 3.4 290.6 0.1X - - +In-memory 0 0 0 23.8 41.9 1.0X +RocksDB (trackTotalNumberOfRows: true) 3 3 0 3.2 309.5 0.1X +RocksDB (trackTotalNumberOfRows: false) 3 3 0 3.2 309.9 0.1X diff --git 
a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt index 86d3d4400331..d2aa646d5ec1 100644 --- a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt +++ b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt @@ -6,33 +6,66 @@ OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor putting 10000 rows (10000 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -In-memory 9 10 1 1.1 907.3 1.0X -RocksDB (trackTotalNumberOfRows: true) 40 42 2 0.2 4048.4 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1508.4 0.6X +In-memory 10 12 1 1.0 960.1 1.0X +RocksDB (trackTotalNumberOfRows: true) 42 43 2 0.2 4173.9 0.2X +RocksDB (trackTotalNumberOfRows: false) 16 16 1 0.6 1551.6 0.6X OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor putting 10000 rows (5000 rows to overwrite - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------- -In-memory 9 10 1 1.1 901.9 1.0X -RocksDB (trackTotalNumberOfRows: true) 39 41 1 0.3 3922.0 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1513.5 0.6X +In-memory 10 12 1 1.0 970.1 1.0X +RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4095.8 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 17 1 0.6 1544.6 0.6X OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor putting 10000 rows (1000 rows to overwrite - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
------------------------------------------------------------------------------------------------------------------------------------- -In-memory 9 9 0 1.1 880.2 1.0X -RocksDB (trackTotalNumberOfRows: true) 38 39 1 0.3 3829.7 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1506.0 0.6X +In-memory 9 11 1 1.1 933.3 1.0X +RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.3 3966.2 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.6 1540.2 0.6X OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor putting 10000 rows (0 rows to overwrite - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -In-memory 9 9 0 1.1 878.6 1.0X -RocksDB (trackTotalNumberOfRows: true) 38 39 1 0.3 3803.4 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1500.3 0.6X +In-memory 9 11 1 1.1 936.1 1.0X +RocksDB (trackTotalNumberOfRows: true) 39 41 1 0.3 3942.4 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1530.1 0.6X + + +================================================================================================ +merge rows +================================================================================================ + +OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure +AMD EPYC 7763 64-Core Processor +merging 10000 rows with 10 values per key (10000 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------------------------------------- +RocksDB (trackTotalNumberOfRows: true) 525 538 6 0.0 52516.4 1.0X +RocksDB (trackTotalNumberOfRows: false) 170 177 4 0.1 16960.4 3.1X + +OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure +AMD EPYC 7763 64-Core Processor +merging 10000 
rows with 10 values per key (5000 rows to overwrite - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------------------------------------------ +RocksDB (trackTotalNumberOfRows: true) 514 528 6 0.0 51351.9 1.0X +RocksDB (trackTotalNumberOfRows: false) 168 174 4 0.1 16794.0 3.1X + +OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure +AMD EPYC 7763 64-Core Processor +merging 10000 rows with 10 values per key (1000 rows to overwrite - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------------------------------------------ +RocksDB (trackTotalNumberOfRows: true) 500 513 6 0.0 49955.1 1.0X +RocksDB (trackTotalNumberOfRows: false) 169 174 2 0.1 16867.1 3.0X + +OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure +AMD EPYC 7763 64-Core Processor +merging 10000 rows with 10 values per key (0 rows to overwrite - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------------------------------- +RocksDB (trackTotalNumberOfRows: true) 492 508 8 0.0 49225.8 1.0X +RocksDB (trackTotalNumberOfRows: false) 168 173 3 0.1 16757.2 2.9X ================================================================================================ @@ -43,33 +76,33 @@ OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor trying to delete 10000 rows from 10000 rows(10000 rows are non-existing - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
------------------------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 0 1 0 26.5 37.8 1.0X -RocksDB (trackTotalNumberOfRows: true) 38 39 1 0.3 3779.3 0.0X -RocksDB (trackTotalNumberOfRows: false) 15 15 1 0.7 1462.9 0.0X +In-memory 0 1 0 26.1 38.3 1.0X +RocksDB (trackTotalNumberOfRows: true) 38 40 1 0.3 3835.6 0.0X +RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1455.7 0.0X OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor trying to delete 10000 rows from 10000 rows(5000 rows are non-existing - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 7 8 0 1.3 742.4 1.0X -RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3913.8 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 15 0 0.7 1461.1 0.5X +In-memory 8 9 1 1.3 793.9 1.0X +RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.2 4018.1 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1505.6 0.5X OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor trying to delete 10000 rows from 10000 rows(1000 rows are non-existing - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 8 8 0 1.3 794.8 1.0X -RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.2 4012.8 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 15 0 0.7 1461.5 0.5X +In-memory 8 10 1 1.2 837.2 1.0X +RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4073.9 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1470.6 0.6X OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 
6.5.0-1016-azure AMD EPYC 7763 64-Core Processor trying to delete 10000 rows from 10000 rows(0 rows are non-existing - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 8 9 0 1.2 809.7 1.0X -RocksDB (trackTotalNumberOfRows: true) 40 42 1 0.2 4043.4 0.2X -RocksDB (trackTotalNumberOfRows: false) 14 15 1 0.7 1445.2 0.6X +In-memory 8 9 0 1.2 843.6 1.0X +RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4088.7 0.2X +RocksDB (trackTotalNumberOfRows: false) 15 15 0 0.7 1466.1 0.6X ================================================================================================ @@ -80,32 +113,30 @@ OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor evicting 10000 rows (maxTimestampToEvictInMillis: 9999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 8 9 0 1.2 806.2 1.0X -RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.3 3980.0 0.2X -RocksDB (trackTotalNumberOfRows: false) 16 16 0 0.6 1599.4 0.5X +In-memory 8 9 0 1.2 833.5 1.0X +RocksDB (trackTotalNumberOfRows: true) 40 41 0 0.3 3976.5 0.2X +RocksDB (trackTotalNumberOfRows: false) 16 16 0 0.6 1588.1 0.5X OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor evicting 5000 rows (maxTimestampToEvictInMillis: 4999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------------------ -In-memory 8 8 0 1.3 752.2 1.0X -RocksDB (trackTotalNumberOfRows: true) 22 22 0 0.5 2170.8 0.3X -RocksDB 
(trackTotalNumberOfRows: false) 10 10 0 1.0 967.7 0.8X +In-memory 8 8 0 1.3 784.3 1.0X +RocksDB (trackTotalNumberOfRows: true) 22 22 0 0.5 2155.1 0.4X +RocksDB (trackTotalNumberOfRows: false) 10 10 0 1.0 986.9 0.8X OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor evicting 1000 rows (maxTimestampToEvictInMillis: 999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 7 7 0 1.4 694.9 1.0X -RocksDB (trackTotalNumberOfRows: true) 7 7 0 1.4 700.1 1.0X -RocksDB (trackTotalNumberOfRows: false) 5 5 0 2.2 465.0 1.5X +In-memory 7 8 0 1.4 722.3 1.0X +RocksDB (trackTotalNumberOfRows: true) 7 7 0 1.4 718.8 1.0X +RocksDB (trackTotalNumberOfRows: false) 5 5 0 2.0 488.7 1.5X OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure AMD EPYC 7763 64-Core Processor evicting 0 rows (maxTimestampToEvictInMillis: -1) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------------- -In-memory 1 1 0 19.7 50.7 1.0X -RocksDB (trackTotalNumberOfRows: true) 3 3 0 3.0 332.1 0.2X -RocksDB (trackTotalNumberOfRows: false) 3 3 0 3.0 331.7 0.2X - - +In-memory 0 1 0 21.3 46.9 1.0X +RocksDB (trackTotalNumberOfRows: true) 4 4 0 2.8 358.9 0.1X +RocksDB (trackTotalNumberOfRows: false) 4 4 0 2.8 358.7 0.1X diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala index a5c393ac0567..36035e35ee25 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala @@ -38,7 +38,8 @@ import org.apache.spark.util.Utils * 2. build/sbt "sql/Test/runMain <this class>" * 3. generate result: * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain <this class>" - * Results will be written to "benchmarks/StateStoreBasicOperationsBenchmark-results.txt". + * Results will be written to: + * "sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt". * }}} */ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { @@ -52,6 +53,7 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runPutBenchmark() + runMergeBenchmark() runDeleteBenchmark() runEvictBenchmark() } @@ -133,6 +135,81 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { } } + private def runMergeBenchmark(): Unit = { + def registerMergeBenchmarkCase( + benchmark: Benchmark, + testName: String, + provider: StateStoreProvider, + version: Long, + rows: Seq[(UnsafeRow, Seq[UnsafeRow])]): Unit = { + benchmark.addTimerCase(testName) { timer => + val store = provider.getStore(version) + + timer.startTiming() + mergeRows(store, rows) + timer.stopTiming() + + store.abort() + } + } + + runBenchmark("merge rows") { + val numOfRows = Seq(10000) + val numValuesPerKey = 10 + val overwriteRates = Seq(100, 50, 10, 0) + + numOfRows.foreach { numOfRow => + val testData = constructRandomizedTestDataWithMultipleValues(numOfRow, + (1 to numOfRow).map(_ * 1000L).toList, numValuesPerKey, 0) + + // note that merge is only supported for RocksDB state store provider + val rocksDBProvider = newRocksDBStateProvider(trackTotalNumberOfRows = true, + useColumnFamilies = true, + useMultipleValuesPerKey = true) + val rocksDBWithNoTrackProvider = newRocksDBStateProvider(trackTotalNumberOfRows = false, + useColumnFamilies = true, + useMultipleValuesPerKey = true) + + val 
committedRocksDBVersion = loadInitialDataWithMultipleValues(rocksDBProvider, testData) + val committedRocksDBWithNoTrackVersion = loadInitialDataWithMultipleValues( + rocksDBWithNoTrackProvider, testData) + + overwriteRates.foreach { overwriteRate => + val numOfRowsToOverwrite = numOfRow * overwriteRate / 100 + + val numOfNewRows = numOfRow - numOfRowsToOverwrite + val newRows = if (numOfNewRows > 0) { + constructRandomizedTestDataWithMultipleValues(numOfNewRows, + (1 to numOfNewRows).map(_ * 1000L).toList, numValuesPerKey, 0) + } else { + Seq.empty[(UnsafeRow, Seq[UnsafeRow])] + } + val existingRows = if (numOfRowsToOverwrite > 0) { + Random.shuffle(testData).take(numOfRowsToOverwrite) + } else { + Seq.empty[(UnsafeRow, Seq[UnsafeRow])] + } + val rowsToPut = Random.shuffle(newRows ++ existingRows) + + val benchmark = new Benchmark(s"merging $numOfRow rows " + + s"with $numValuesPerKey values per key " + + s"($numOfRowsToOverwrite rows to overwrite - rate $overwriteRate)", + numOfRow, minNumIters = 1000, output = output) + + registerMergeBenchmarkCase(benchmark, "RocksDB (trackTotalNumberOfRows: true)", + rocksDBProvider, committedRocksDBVersion, rowsToPut) + registerMergeBenchmarkCase(benchmark, "RocksDB (trackTotalNumberOfRows: false)", + rocksDBWithNoTrackProvider, committedRocksDBWithNoTrackVersion, rowsToPut) + + benchmark.run() + } + + rocksDBProvider.close() + rocksDBWithNoTrackProvider.close() + } + } + } + private def runDeleteBenchmark(): Unit = { def registerDeleteBenchmarkCase( benchmark: Benchmark, @@ -288,6 +365,14 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { store.commit() } + private def loadInitialDataWithMultipleValues( + provider: StateStoreProvider, + data: Seq[(UnsafeRow, Seq[UnsafeRow])]): Long = { + val store = provider.getStore(0) + mergeRows(store, data) + store.commit() + } + private def updateRows( store: StateStore, rows: Seq[(UnsafeRow, UnsafeRow)]): Unit = { @@ -296,6 +381,16 @@ object 
StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { } } + private def mergeRows( + store: StateStore, + rows: Seq[(UnsafeRow, Seq[UnsafeRow])]): Unit = { + rows.foreach { case (key, values) => + values.foreach { value => + store.merge(key, value) + } + } + } + private def deleteRows( store: StateStore, rows: Seq[UnsafeRow]): Unit = { @@ -341,6 +436,32 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { } } + private def constructRandomizedTestDataWithMultipleValues( + numRows: Int, + timestamps: List[Long], + numValues: Int, + minIdx: Int = 0): Seq[(UnsafeRow, Seq[UnsafeRow])] = { + assert(numRows >= timestamps.length) + assert(numRows % timestamps.length == 0) + + (1 to numRows).map { idx => + val keyRow = new GenericInternalRow(2) + keyRow.setInt(0, Random.nextInt(Int.MaxValue)) + keyRow.setLong(1, timestamps((minIdx + idx) % timestamps.length)) // microseconds + + val valRows: Seq[UnsafeRow] = (1 to numValues).map { valueIdx => + val valueRow = new GenericInternalRow(1) + valueRow.setInt(0, valueIdx + Random.nextInt(Int.MaxValue)) + val valueUnsafeRow = valueProjection(valueRow).copy() + valueUnsafeRow + }.toSeq + + val keyUnsafeRow = keyProjection(keyRow).copy() + + (keyUnsafeRow, valRows) + } + } + private def newHDFSBackedStateStoreProvider(): StateStoreProvider = { val storeId = StateStoreId(newDir(), Random.nextInt(), 0) val provider = new HDFSBackedStateStoreProvider() @@ -352,7 +473,9 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { } private def newRocksDBStateProvider( - trackTotalNumberOfRows: Boolean = true): StateStoreProvider = { + trackTotalNumberOfRows: Boolean = true, + useColumnFamilies: Boolean = false, + useMultipleValuesPerKey: Boolean = false): StateStoreProvider = { val storeId = StateStoreId(newDir(), Random.nextInt(), 0) val provider = new RocksDBStateStoreProvider() val sqlConf = new SQLConf() @@ -362,7 +485,8 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark { 
provider.init( storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), - useColumnFamilies = false, storeConf, new Configuration) + useColumnFamilies = useColumnFamilies, storeConf, new Configuration, + useMultipleValuesPerKey = useMultipleValuesPerKey) provider } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org