This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 77fdeb75d chore: Add consistency checks and result hashing to TPC benchmarks (#3582)
77fdeb75d is described below

commit 77fdeb75d1cd6041a655f98f4c2313fecdc616d7
Author: Andy Grove <[email protected]>
AuthorDate: Tue Feb 24 12:43:28 2026 -0700

    chore: Add consistency checks and result hashing to TPC benchmarks (#3582)
---
 .../engines/{comet.toml => comet-hashjoin.toml}    |   2 +-
 ...et-iceberg.toml => comet-iceberg-hashjoin.toml} |   2 +-
 benchmarks/tpc/engines/comet-iceberg.toml          |   1 -
 benchmarks/tpc/engines/comet.toml                  |   1 -
 benchmarks/tpc/generate-comparison.py              | 130 ++++++++++++++++-----
 .../tpc/infra/docker/docker-compose-laptop.yml     |   1 +
 benchmarks/tpc/infra/docker/docker-compose.yml     |   1 +
 benchmarks/tpc/queries/tpcds/q12.sql               |   2 +-
 benchmarks/tpc/queries/tpcds/q16.sql               |   8 +-
 benchmarks/tpc/queries/tpcds/q20.sql               |   2 +-
 benchmarks/tpc/queries/tpcds/q21.sql               |   4 +-
 benchmarks/tpc/queries/tpcds/q32.sql               |   6 +-
 benchmarks/tpc/queries/tpcds/q35.sql               |  12 +-
 benchmarks/tpc/queries/tpcds/q37.sql               |   2 +-
 benchmarks/tpc/queries/tpcds/q40.sql               |   4 +-
 benchmarks/tpc/queries/tpcds/q5.sql                |   6 +-
 benchmarks/tpc/queries/tpcds/q50.sql               |  10 +-
 benchmarks/tpc/queries/tpcds/q62.sql               |  10 +-
 benchmarks/tpc/queries/tpcds/q77.sql               |  12 +-
 benchmarks/tpc/queries/tpcds/q80.sql               |   6 +-
 benchmarks/tpc/queries/tpcds/q82.sql               |   2 +-
 benchmarks/tpc/queries/tpcds/q92.sql               |   6 +-
 benchmarks/tpc/queries/tpcds/q94.sql               |   8 +-
 benchmarks/tpc/queries/tpcds/q95.sql               |   8 +-
 benchmarks/tpc/queries/tpcds/q98.sql               |   2 +-
 benchmarks/tpc/queries/tpcds/q99.sql               |  10 +-
 benchmarks/tpc/run.py                              |   2 +-
 benchmarks/tpc/tpcbench.py                         |  21 +++-
 28 files changed, 185 insertions(+), 96 deletions(-)

diff --git a/benchmarks/tpc/engines/comet.toml b/benchmarks/tpc/engines/comet-hashjoin.toml
similarity index 98%
copy from benchmarks/tpc/engines/comet.toml
copy to benchmarks/tpc/engines/comet-hashjoin.toml
index 8e19165eb..1aa495724 100644
--- a/benchmarks/tpc/engines/comet.toml
+++ b/benchmarks/tpc/engines/comet-hashjoin.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [engine]
-name = "comet"
+name = "comet-hashjoin"
 
 [env]
 required = ["COMET_JAR"]
diff --git a/benchmarks/tpc/engines/comet-iceberg.toml b/benchmarks/tpc/engines/comet-iceberg-hashjoin.toml
similarity index 98%
copy from benchmarks/tpc/engines/comet-iceberg.toml
copy to benchmarks/tpc/engines/comet-iceberg-hashjoin.toml
index 2e01270f1..84a533372 100644
--- a/benchmarks/tpc/engines/comet-iceberg.toml
+++ b/benchmarks/tpc/engines/comet-iceberg-hashjoin.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [engine]
-name = "comet-iceberg"
+name = "comet-iceberg-hashjoin"
 
 [env]
 required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"]
diff --git a/benchmarks/tpc/engines/comet-iceberg.toml b/benchmarks/tpc/engines/comet-iceberg.toml
index 2e01270f1..3654f359e 100644
--- a/benchmarks/tpc/engines/comet-iceberg.toml
+++ b/benchmarks/tpc/engines/comet-iceberg.toml
@@ -33,7 +33,6 @@ driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"]
 "spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
 "spark.plugins" = "org.apache.spark.CometPlugin"
 "spark.shuffle.manager" = 
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
-"spark.comet.exec.replaceSortMergeJoin" = "true"
 "spark.comet.expression.Cast.allowIncompatible" = "true"
 "spark.comet.enabled" = "true"
 "spark.comet.exec.enabled" = "true"
diff --git a/benchmarks/tpc/engines/comet.toml b/benchmarks/tpc/engines/comet.toml
index 8e19165eb..05b2cb22b 100644
--- a/benchmarks/tpc/engines/comet.toml
+++ b/benchmarks/tpc/engines/comet.toml
@@ -31,5 +31,4 @@ driver_class_path = ["$COMET_JAR"]
 "spark.plugins" = "org.apache.spark.CometPlugin"
 "spark.shuffle.manager" = 
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
 "spark.comet.scan.impl" = "native_datafusion"
-"spark.comet.exec.replaceSortMergeJoin" = "true"
 "spark.comet.expression.Cast.allowIncompatible" = "true"
diff --git a/benchmarks/tpc/generate-comparison.py b/benchmarks/tpc/generate-comparison.py
index eb57cc1e4..e5058a3bf 100644
--- a/benchmarks/tpc/generate-comparison.py
+++ b/benchmarks/tpc/generate-comparison.py
@@ -17,19 +17,88 @@
 
 import argparse
 import json
+import logging
 import matplotlib.pyplot as plt
 import numpy as np
 
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
 def geomean(data):
     return np.prod(data) ** (1 / len(data))
 
-def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str):
+def get_durations(result, query_key):
+    """Extract durations from a query result, supporting both old (list) and new (dict) formats."""
+    value = result[query_key]
+    if isinstance(value, dict):
+        return value["durations"]
+    return value
+
+def get_all_queries(results):
+    """Return the sorted union of all query keys across all result sets."""
+    all_keys = set()
+    for result in results:
+        all_keys.update(result.keys())
+    # Filter to numeric query keys and sort numerically
+    numeric_keys = []
+    for k in all_keys:
+        try:
+            numeric_keys.append(int(k))
+        except ValueError:
+            pass
+    return sorted(numeric_keys)
+
+def get_common_queries(results, labels):
+    """Return queries present in ALL result sets, warning about queries missing from some files."""
+    all_queries = get_all_queries(results)
+    common = []
+    for query in all_queries:
+        key = str(query)
+        present = [labels[i] for i, r in enumerate(results) if key in r]
+        missing = [labels[i] for i, r in enumerate(results) if key not in r]
+        if missing:
+            logger.warning(f"Query {query}: present in [{', '.join(present)}] but missing from [{', '.join(missing)}]")
+        if not missing:
+            common.append(query)
+    return common
+
+def check_result_consistency(results, labels, benchmark):
+    """Log warnings if row counts or result hashes differ across result sets."""
+    all_queries = get_all_queries(results)
+    for query in all_queries:
+        key = str(query)
+        row_counts = []
+        hashes = []
+        for i, result in enumerate(results):
+            if key not in result:
+                continue
+            value = result[key]
+            if not isinstance(value, dict):
+                continue
+            if "row_count" in value:
+                row_counts.append((labels[i], value["row_count"]))
+            if "result_hash" in value:
+                hashes.append((labels[i], value["result_hash"]))
+
+        if len(row_counts) > 1:
+            counts = set(rc for _, rc in row_counts)
+            if len(counts) > 1:
+                details = ", ".join(f"{label}={rc}" for label, rc in row_counts)
+                logger.warning(f"Query {query}: row count mismatch: {details}")
+
+        if len(hashes) > 1:
+            hash_values = set(h for _, h in hashes)
+            if len(hash_values) > 1:
+                details = ", ".join(f"{label}={h}" for label, h in hashes)
+                logger.warning(f"Query {query}: result hash mismatch: {details}")
+
+def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str, common_queries=None):
+    if common_queries is None:
+        common_queries = range(1, query_count(benchmark)+1)
     results = []
-    for query in range(1, query_count(benchmark)+1):
-        if query == 999:
-            continue
-        a = np.median(np.array(baseline[str(query)]))
-        b = np.median(np.array(comparison[str(query)]))
+    for query in common_queries:
+        a = np.median(np.array(get_durations(baseline, str(query))))
+        b = np.median(np.array(get_durations(comparison, str(query))))
         if a > b:
             speedup = a/b-1
         else:
@@ -80,13 +149,13 @@ def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2:
     # Save the plot as an image file
     plt.savefig(f'{benchmark}_queries_speedup_rel.png', format='png')
 
-def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str):
+def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str, common_queries=None):
+    if common_queries is None:
+        common_queries = range(1, query_count(benchmark)+1)
     results = []
-    for query in range(1, query_count(benchmark)+1):
-        if query == 999:
-            continue
-        a = np.median(np.array(baseline[str(query)]))
-        b = np.median(np.array(comparison[str(query)]))
+    for query in common_queries:
+        a = np.median(np.array(get_durations(baseline, str(query))))
+        b = np.median(np.array(get_durations(comparison, str(query))))
         speedup = a-b
         results.append(("q" + str(query), round(speedup, 1)))
 
@@ -130,17 +199,17 @@ def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2:
     # Save the plot as an image file
     plt.savefig(f'{benchmark}_queries_speedup_abs.png', format='png')
 
-def generate_query_comparison_chart(results, labels, benchmark: str, title: str):
+def generate_query_comparison_chart(results, labels, benchmark: str, title: str, common_queries=None):
+    if common_queries is None:
+        common_queries = range(1, query_count(benchmark)+1)
     queries = []
     benches = []
     for _ in results:
         benches.append([])
-    for query in range(1, query_count(benchmark)+1):
-        if query == 999:
-            continue
+    for query in common_queries:
         queries.append("q" + str(query))
         for i in range(0, len(results)):
-            benches[i].append(np.median(np.array(results[i][str(query)])))
+            benches[i].append(np.median(np.array(get_durations(results[i], str(query)))))
 
     # Define the width of the bars
     bar_width = 0.3
@@ -168,17 +237,17 @@ def generate_query_comparison_chart(results, labels, benchmark: str, title: str)
     # Save the plot as an image file
     plt.savefig(f'{benchmark}_queries_compare.png', format='png')
 
-def generate_summary(results, labels, benchmark: str, title: str):
+def generate_summary(results, labels, benchmark: str, title: str, common_queries=None):
+    if common_queries is None:
+        common_queries = range(1, query_count(benchmark)+1)
     timings = []
     for _ in results:
         timings.append(0)
 
-    num_queries = query_count(benchmark)
-    for query in range(1, num_queries + 1):
-        if query == 999:
-            continue
+    num_queries = len([q for q in common_queries])
+    for query in common_queries:
         for i in range(0, len(results)):
-            timings[i] += np.median(np.array(results[i][str(query)]))
+            timings[i] += np.median(np.array(get_durations(results[i], str(query))))
 
     # Create figure and axis
     fig, ax = plt.subplots()
@@ -186,7 +255,7 @@ def generate_summary(results, labels, benchmark: str, title: str):
 
     # Add title and labels
     ax.set_title(title)
-    ax.set_ylabel(f'Time in seconds to run all {num_queries} {benchmark} queries (lower is better)')
+    ax.set_ylabel(f'Time in seconds to run {num_queries} {benchmark} queries (lower is better)')
 
     times = [round(x,0) for x in timings]
 
@@ -213,11 +282,16 @@ def main(files, labels, benchmark: str, title: str):
     for filename in files:
         with open(filename) as f:
             results.append(json.load(f))
-    generate_summary(results, labels, benchmark, title)
-    generate_query_comparison_chart(results, labels, benchmark, title)
+    check_result_consistency(results, labels, benchmark)
+    common_queries = get_common_queries(results, labels)
+    if not common_queries:
+        logger.error("No queries found in common across all result files")
+        return
+    generate_summary(results, labels, benchmark, title, common_queries)
+    generate_query_comparison_chart(results, labels, benchmark, title, common_queries)
     if len(files) == 2:
-        generate_query_abs_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title)
-        generate_query_rel_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title)
+        generate_query_abs_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title, common_queries)
+        generate_query_rel_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title, common_queries)
 
 if __name__ == '__main__':
     argparse = argparse.ArgumentParser(description='Generate comparison')
diff --git a/benchmarks/tpc/infra/docker/docker-compose-laptop.yml b/benchmarks/tpc/infra/docker/docker-compose-laptop.yml
index 6c5d8dbaf..bc882ae7b 100644
--- a/benchmarks/tpc/infra/docker/docker-compose-laptop.yml
+++ b/benchmarks/tpc/infra/docker/docker-compose-laptop.yml
@@ -93,5 +93,6 @@ services:
       - ICEBERG_JAR=/jars/iceberg.jar
       - TPCH_DATA=/data
       - TPCDS_DATA=/data
+      - SPARK_EVENT_LOG_DIR=/results/spark-events
     mem_limit: 4g
     memswap_limit: 4g
diff --git a/benchmarks/tpc/infra/docker/docker-compose.yml 
b/benchmarks/tpc/infra/docker/docker-compose.yml
index cca8cffa1..5a76a5d6e 100644
--- a/benchmarks/tpc/infra/docker/docker-compose.yml
+++ b/benchmarks/tpc/infra/docker/docker-compose.yml
@@ -107,6 +107,7 @@ services:
       - ICEBERG_JAR=/jars/iceberg.jar
       - TPCH_DATA=/data
       - TPCDS_DATA=/data
+      - SPARK_EVENT_LOG_DIR=/results/spark-events
     mem_limit: ${BENCH_MEM_LIMIT:-10g}
     memswap_limit: ${BENCH_MEM_LIMIT:-10g}
 
diff --git a/benchmarks/tpc/queries/tpcds/q12.sql 
b/benchmarks/tpc/queries/tpcds/q12.sql
index e44b5de0b..e2e46d92d 100644
--- a/benchmarks/tpc/queries/tpcds/q12.sql
+++ b/benchmarks/tpc/queries/tpcds/q12.sql
@@ -18,7 +18,7 @@ where
        and i_category in ('Jewelry', 'Books', 'Women')
        and ws_sold_date_sk = d_date_sk
        and d_date between cast('2002-03-22' as date) 
-                               and (cast('2002-03-22' as date) + 30 days)
+                               and (cast('2002-03-22' as date) + INTERVAL '30 
DAYS')
 group by 
        i_item_id
         ,i_item_desc 
diff --git a/benchmarks/tpc/queries/tpcds/q16.sql 
b/benchmarks/tpc/queries/tpcds/q16.sql
index 2f5db58ff..5a4cb1cec 100644
--- a/benchmarks/tpc/queries/tpcds/q16.sql
+++ b/benchmarks/tpc/queries/tpcds/q16.sql
@@ -2,9 +2,9 @@
 -- TPC-DS queries are Copyright 2021 Transaction Processing Performance 
Council.
 -- This query was generated at scale factor 1.
 select  
-   count(distinct cs_order_number) as "order count"
-  ,sum(cs_ext_ship_cost) as "total shipping cost"
-  ,sum(cs_net_profit) as "total net profit"
+   count(distinct cs_order_number) as `order count`
+  ,sum(cs_ext_ship_cost) as `total shipping cost`
+  ,sum(cs_net_profit) as `total net profit`
 from
    catalog_sales cs1
   ,date_dim
@@ -12,7 +12,7 @@ from
   ,call_center
 where
     d_date between '1999-5-01' and 
-           (cast('1999-5-01' as date) + 60 days)
+           (cast('1999-5-01' as date) + INTERVAL '60 DAYS')
 and cs1.cs_ship_date_sk = d_date_sk
 and cs1.cs_ship_addr_sk = ca_address_sk
 and ca_state = 'ID'
diff --git a/benchmarks/tpc/queries/tpcds/q20.sql 
b/benchmarks/tpc/queries/tpcds/q20.sql
index 12e61a06f..47531bd9b 100644
--- a/benchmarks/tpc/queries/tpcds/q20.sql
+++ b/benchmarks/tpc/queries/tpcds/q20.sql
@@ -16,7 +16,7 @@ select  i_item_id
    and i_category in ('Children', 'Sports', 'Music')
    and cs_sold_date_sk = d_date_sk
  and d_date between cast('2002-04-01' as date) 
-                               and (cast('2002-04-01' as date) + 30 days)
+                               and (cast('2002-04-01' as date) + INTERVAL '30 
DAYS')
  group by i_item_id
          ,i_item_desc 
          ,i_category
diff --git a/benchmarks/tpc/queries/tpcds/q21.sql 
b/benchmarks/tpc/queries/tpcds/q21.sql
index b7c2a5eb2..ecc7b2ea4 100644
--- a/benchmarks/tpc/queries/tpcds/q21.sql
+++ b/benchmarks/tpc/queries/tpcds/q21.sql
@@ -18,8 +18,8 @@ select  *
      and i_item_sk          = inv_item_sk
      and inv_warehouse_sk   = w_warehouse_sk
      and inv_date_sk    = d_date_sk
-     and d_date between (cast ('2000-05-19' as date) - 30 days)
-                    and (cast ('2000-05-19' as date) + 30 days)
+     and d_date between (cast ('2000-05-19' as date) - INTERVAL '30 DAYS')
+                    and (cast ('2000-05-19' as date) + INTERVAL '30 DAYS')
    group by w_warehouse_name, i_item_id) x
  where (case when inv_before > 0 
              then inv_after / inv_before 
diff --git a/benchmarks/tpc/queries/tpcds/q32.sql 
b/benchmarks/tpc/queries/tpcds/q32.sql
index 058756465..3b968974f 100644
--- a/benchmarks/tpc/queries/tpcds/q32.sql
+++ b/benchmarks/tpc/queries/tpcds/q32.sql
@@ -1,7 +1,7 @@
 -- CometBench-DS query 32 derived from TPC-DS query 32 under the terms of the 
TPC Fair Use Policy.
 -- TPC-DS queries are Copyright 2021 Transaction Processing Performance 
Council.
 -- This query was generated at scale factor 1.
-select  sum(cs_ext_discount_amt)  as "excess discount amount" 
+select  sum(cs_ext_discount_amt)  as `excess discount amount` 
 from 
    catalog_sales 
    ,item 
@@ -10,7 +10,7 @@ where
 i_manufact_id = 283
 and i_item_sk = cs_item_sk 
 and d_date between '1999-02-22' and 
-        (cast('1999-02-22' as date) + 90 days)
+        (cast('1999-02-22' as date) + INTERVAL '90 DAYS')
 and d_date_sk = cs_sold_date_sk 
 and cs_ext_discount_amt  
      > ( 
@@ -22,7 +22,7 @@ and cs_ext_discount_amt
          where 
               cs_item_sk = i_item_sk 
           and d_date between '1999-02-22' and
-                             (cast('1999-02-22' as date) + 90 days)
+                             (cast('1999-02-22' as date) + INTERVAL '90 DAYS')
           and d_date_sk = cs_sold_date_sk 
       ) 
  LIMIT 100;
diff --git a/benchmarks/tpc/queries/tpcds/q35.sql 
b/benchmarks/tpc/queries/tpcds/q35.sql
index 5876bb94a..f116d84b2 100644
--- a/benchmarks/tpc/queries/tpcds/q35.sql
+++ b/benchmarks/tpc/queries/tpcds/q35.sql
@@ -7,19 +7,19 @@ select
   cd_marital_status,
   cd_dep_count,
   count(*) cnt1,
-  min(cd_dep_count),
   max(cd_dep_count),
-  avg(cd_dep_count),
+  stddev_samp(cd_dep_count),
+  stddev_samp(cd_dep_count),
   cd_dep_employed_count,
   count(*) cnt2,
-  min(cd_dep_employed_count),
   max(cd_dep_employed_count),
-  avg(cd_dep_employed_count),
+  stddev_samp(cd_dep_employed_count),
+  stddev_samp(cd_dep_employed_count),
   cd_dep_college_count,
   count(*) cnt3,
-  min(cd_dep_college_count),
   max(cd_dep_college_count),
-  avg(cd_dep_college_count)
+  stddev_samp(cd_dep_college_count),
+  stddev_samp(cd_dep_college_count)
  from
   customer c,customer_address ca,customer_demographics
  where
diff --git a/benchmarks/tpc/queries/tpcds/q37.sql 
b/benchmarks/tpc/queries/tpcds/q37.sql
index 4cc1d66d2..25ca6689a 100644
--- a/benchmarks/tpc/queries/tpcds/q37.sql
+++ b/benchmarks/tpc/queries/tpcds/q37.sql
@@ -8,7 +8,7 @@ select  i_item_id
  where i_current_price between 26 and 26 + 30
  and inv_item_sk = i_item_sk
  and d_date_sk=inv_date_sk
- and d_date between cast('2001-06-09' as date) and (cast('2001-06-09' as date) 
+  60 days)
+ and d_date between cast('2001-06-09' as date) and (cast('2001-06-09' as date) 
+  INTERVAL '60 DAYS')
  and i_manufact_id in (744,884,722,693)
  and inv_quantity_on_hand between 100 and 500
  and cs_item_sk = i_item_sk
diff --git a/benchmarks/tpc/queries/tpcds/q40.sql 
b/benchmarks/tpc/queries/tpcds/q40.sql
index f1bd48aa7..b52966894 100644
--- a/benchmarks/tpc/queries/tpcds/q40.sql
+++ b/benchmarks/tpc/queries/tpcds/q40.sql
@@ -20,8 +20,8 @@ select
  and i_item_sk          = cs_item_sk
  and cs_warehouse_sk    = w_warehouse_sk 
  and cs_sold_date_sk    = d_date_sk
- and d_date between (cast ('2002-05-18' as date) - 30 days)
-                and (cast ('2002-05-18' as date) + 30 days) 
+ and d_date between (cast ('2002-05-18' as date) - INTERVAL '30 DAYS')
+                and (cast ('2002-05-18' as date) + INTERVAL '30 DAYS') 
  group by
     w_state,i_item_id
  order by w_state,i_item_id
diff --git a/benchmarks/tpc/queries/tpcds/q5.sql 
b/benchmarks/tpc/queries/tpcds/q5.sql
index 6b750b4c7..68fda358e 100644
--- a/benchmarks/tpc/queries/tpcds/q5.sql
+++ b/benchmarks/tpc/queries/tpcds/q5.sql
@@ -28,7 +28,7 @@ with ssr as
      store
  where date_sk = d_date_sk
        and d_date between cast('2001-08-04' as date) 
-                  and (cast('2001-08-04' as date) +  14 days)
+                  and (cast('2001-08-04' as date) +  INTERVAL '14 DAYS')
        and store_sk = s_store_sk
  group by s_store_id)
  ,
@@ -59,7 +59,7 @@ with ssr as
      catalog_page
  where date_sk = d_date_sk
        and d_date between cast('2001-08-04' as date)
-                  and (cast('2001-08-04' as date) +  14 days)
+                  and (cast('2001-08-04' as date) +  INTERVAL '14 DAYS')
        and page_sk = cp_catalog_page_sk
  group by cp_catalog_page_id)
  ,
@@ -92,7 +92,7 @@ with ssr as
      web_site
  where date_sk = d_date_sk
        and d_date between cast('2001-08-04' as date)
-                  and (cast('2001-08-04' as date) +  14 days)
+                  and (cast('2001-08-04' as date) +  INTERVAL '14 DAYS')
        and wsr_web_site_sk = web_site_sk
  group by web_site_id)
   select  channel
diff --git a/benchmarks/tpc/queries/tpcds/q50.sql 
b/benchmarks/tpc/queries/tpcds/q50.sql
index 0fd43d113..82aa47200 100644
--- a/benchmarks/tpc/queries/tpcds/q50.sql
+++ b/benchmarks/tpc/queries/tpcds/q50.sql
@@ -12,14 +12,14 @@ select
   ,s_county
   ,s_state
   ,s_zip
-  ,sum(case when (sr_returned_date_sk - ss_sold_date_sk <= 30 ) then 1 else 0 
end)  as "30 days" 
+  ,sum(case when (sr_returned_date_sk - ss_sold_date_sk <= 30 ) then 1 else 0 
end)  as `30 days` 
   ,sum(case when (sr_returned_date_sk - ss_sold_date_sk > 30) and 
-                 (sr_returned_date_sk - ss_sold_date_sk <= 60) then 1 else 0 
end )  as "31-60 days" 
+                 (sr_returned_date_sk - ss_sold_date_sk <= 60) then 1 else 0 
end )  as `31-60 days` 
   ,sum(case when (sr_returned_date_sk - ss_sold_date_sk > 60) and 
-                 (sr_returned_date_sk - ss_sold_date_sk <= 90) then 1 else 0 
end)  as "61-90 days" 
+                 (sr_returned_date_sk - ss_sold_date_sk <= 90) then 1 else 0 
end)  as `61-90 days` 
   ,sum(case when (sr_returned_date_sk - ss_sold_date_sk > 90) and
-                 (sr_returned_date_sk - ss_sold_date_sk <= 120) then 1 else 0 
end)  as "91-120 days" 
-  ,sum(case when (sr_returned_date_sk - ss_sold_date_sk  > 120) then 1 else 0 
end)  as ">120 days" 
+                 (sr_returned_date_sk - ss_sold_date_sk <= 120) then 1 else 0 
end)  as `91-120 days` 
+  ,sum(case when (sr_returned_date_sk - ss_sold_date_sk  > 120) then 1 else 0 
end)  as `>120 days` 
 from
    store_sales
   ,store_returns
diff --git a/benchmarks/tpc/queries/tpcds/q62.sql 
b/benchmarks/tpc/queries/tpcds/q62.sql
index 15a081527..5d1445176 100644
--- a/benchmarks/tpc/queries/tpcds/q62.sql
+++ b/benchmarks/tpc/queries/tpcds/q62.sql
@@ -5,14 +5,14 @@ select
    substr(w_warehouse_name,1,20)
   ,sm_type
   ,web_name
-  ,sum(case when (ws_ship_date_sk - ws_sold_date_sk <= 30 ) then 1 else 0 end) 
 as "30 days" 
+  ,sum(case when (ws_ship_date_sk - ws_sold_date_sk <= 30 ) then 1 else 0 end) 
 as `30 days` 
   ,sum(case when (ws_ship_date_sk - ws_sold_date_sk > 30) and 
-                 (ws_ship_date_sk - ws_sold_date_sk <= 60) then 1 else 0 end ) 
 as "31-60 days" 
+                 (ws_ship_date_sk - ws_sold_date_sk <= 60) then 1 else 0 end ) 
 as `31-60 days` 
   ,sum(case when (ws_ship_date_sk - ws_sold_date_sk > 60) and 
-                 (ws_ship_date_sk - ws_sold_date_sk <= 90) then 1 else 0 end)  
as "61-90 days" 
+                 (ws_ship_date_sk - ws_sold_date_sk <= 90) then 1 else 0 end)  
as `61-90 days` 
   ,sum(case when (ws_ship_date_sk - ws_sold_date_sk > 90) and
-                 (ws_ship_date_sk - ws_sold_date_sk <= 120) then 1 else 0 end) 
 as "91-120 days" 
-  ,sum(case when (ws_ship_date_sk - ws_sold_date_sk  > 120) then 1 else 0 end) 
 as ">120 days" 
+                 (ws_ship_date_sk - ws_sold_date_sk <= 120) then 1 else 0 end) 
 as `91-120 days` 
+  ,sum(case when (ws_ship_date_sk - ws_sold_date_sk  > 120) then 1 else 0 end) 
 as `>120 days` 
 from
    web_sales
   ,warehouse
diff --git a/benchmarks/tpc/queries/tpcds/q77.sql 
b/benchmarks/tpc/queries/tpcds/q77.sql
index fac28a26a..81e8ccf80 100644
--- a/benchmarks/tpc/queries/tpcds/q77.sql
+++ b/benchmarks/tpc/queries/tpcds/q77.sql
@@ -10,7 +10,7 @@ with ss as
       store
  where ss_sold_date_sk = d_date_sk
        and d_date between cast('2001-08-11' as date) 
-                  and (cast('2001-08-11' as date) +  30 days) 
+                  and (cast('2001-08-11' as date) +  INTERVAL '30 DAYS') 
        and ss_store_sk = s_store_sk
  group by s_store_sk)
  ,
@@ -23,7 +23,7 @@ with ss as
       store
  where sr_returned_date_sk = d_date_sk
        and d_date between cast('2001-08-11' as date)
-                  and (cast('2001-08-11' as date) +  30 days)
+                  and (cast('2001-08-11' as date) +  INTERVAL '30 DAYS')
        and sr_store_sk = s_store_sk
  group by s_store_sk), 
  cs as
@@ -34,7 +34,7 @@ with ss as
       date_dim
  where cs_sold_date_sk = d_date_sk
        and d_date between cast('2001-08-11' as date)
-                  and (cast('2001-08-11' as date) +  30 days)
+                  and (cast('2001-08-11' as date) +  INTERVAL '30 DAYS')
  group by cs_call_center_sk 
  ), 
  cr as
@@ -45,7 +45,7 @@ with ss as
       date_dim
  where cr_returned_date_sk = d_date_sk
        and d_date between cast('2001-08-11' as date)
-                  and (cast('2001-08-11' as date) +  30 days)
+                  and (cast('2001-08-11' as date) +  INTERVAL '30 DAYS')
  group by cr_call_center_sk
  ), 
  ws as
@@ -57,7 +57,7 @@ with ss as
       web_page
  where ws_sold_date_sk = d_date_sk
        and d_date between cast('2001-08-11' as date)
-                  and (cast('2001-08-11' as date) +  30 days)
+                  and (cast('2001-08-11' as date) +  INTERVAL '30 DAYS')
        and ws_web_page_sk = wp_web_page_sk
  group by wp_web_page_sk), 
  wr as
@@ -69,7 +69,7 @@ with ss as
       web_page
  where wr_returned_date_sk = d_date_sk
        and d_date between cast('2001-08-11' as date)
-                  and (cast('2001-08-11' as date) +  30 days)
+                  and (cast('2001-08-11' as date) +  INTERVAL '30 DAYS')
        and wr_web_page_sk = wp_web_page_sk
  group by wp_web_page_sk)
   select  channel
diff --git a/benchmarks/tpc/queries/tpcds/q80.sql 
b/benchmarks/tpc/queries/tpcds/q80.sql
index 484aa4b28..8f0a9c63d 100644
--- a/benchmarks/tpc/queries/tpcds/q80.sql
+++ b/benchmarks/tpc/queries/tpcds/q80.sql
@@ -14,7 +14,7 @@ with ssr as
      promotion
  where ss_sold_date_sk = d_date_sk
        and d_date between cast('2002-08-04' as date) 
-                  and (cast('2002-08-04' as date) +  30 days)
+                  and (cast('2002-08-04' as date) +  INTERVAL '30 DAYS')
        and ss_store_sk = s_store_sk
        and ss_item_sk = i_item_sk
        and i_current_price > 50
@@ -35,7 +35,7 @@ with ssr as
      promotion
  where cs_sold_date_sk = d_date_sk
        and d_date between cast('2002-08-04' as date)
-                  and (cast('2002-08-04' as date) +  30 days)
+                  and (cast('2002-08-04' as date) +  INTERVAL '30 DAYS')
         and cs_catalog_page_sk = cp_catalog_page_sk
        and cs_item_sk = i_item_sk
        and i_current_price > 50
@@ -56,7 +56,7 @@ group by cp_catalog_page_id)
      promotion
  where ws_sold_date_sk = d_date_sk
        and d_date between cast('2002-08-04' as date)
-                  and (cast('2002-08-04' as date) +  30 days)
+                  and (cast('2002-08-04' as date) +  INTERVAL '30 DAYS')
         and ws_web_site_sk = web_site_sk
        and ws_item_sk = i_item_sk
        and i_current_price > 50
diff --git a/benchmarks/tpc/queries/tpcds/q82.sql 
b/benchmarks/tpc/queries/tpcds/q82.sql
index c967802b3..98c002cec 100644
--- a/benchmarks/tpc/queries/tpcds/q82.sql
+++ b/benchmarks/tpc/queries/tpcds/q82.sql
@@ -8,7 +8,7 @@ select  i_item_id
  where i_current_price between 69 and 69+30
  and inv_item_sk = i_item_sk
  and d_date_sk=inv_date_sk
- and d_date between cast('1998-06-06' as date) and (cast('1998-06-06' as date) 
+  60 days)
+ and d_date between cast('1998-06-06' as date) and (cast('1998-06-06' as date) 
+  INTERVAL '60 DAYS')
  and i_manufact_id in (105,513,180,137)
  and inv_quantity_on_hand between 100 and 500
  and ss_item_sk = i_item_sk
diff --git a/benchmarks/tpc/queries/tpcds/q92.sql 
b/benchmarks/tpc/queries/tpcds/q92.sql
index 2a858fc11..63dab6f47 100644
--- a/benchmarks/tpc/queries/tpcds/q92.sql
+++ b/benchmarks/tpc/queries/tpcds/q92.sql
@@ -2,7 +2,7 @@
 -- TPC-DS queries are Copyright 2021 Transaction Processing Performance 
Council.
 -- This query was generated at scale factor 1.
 select  
-   sum(ws_ext_discount_amt)  as "Excess Discount Amount" 
+   sum(ws_ext_discount_amt)  as `Excess Discount Amount` 
 from 
     web_sales 
    ,item 
@@ -11,7 +11,7 @@ where
 i_manufact_id = 914
 and i_item_sk = ws_item_sk 
 and d_date between '2001-01-25' and 
-        (cast('2001-01-25' as date) + 90 days)
+        (cast('2001-01-25' as date) + INTERVAL '90 DAYS')
 and d_date_sk = ws_sold_date_sk 
 and ws_ext_discount_amt  
      > ( 
@@ -23,7 +23,7 @@ and ws_ext_discount_amt
          WHERE 
               ws_item_sk = i_item_sk 
           and d_date between '2001-01-25' and
-                             (cast('2001-01-25' as date) + 90 days)
+                             (cast('2001-01-25' as date) + INTERVAL '90 DAYS')
           and d_date_sk = ws_sold_date_sk 
       ) 
 order by sum(ws_ext_discount_amt)
diff --git a/benchmarks/tpc/queries/tpcds/q94.sql 
b/benchmarks/tpc/queries/tpcds/q94.sql
index ba9a1d902..79e528352 100644
--- a/benchmarks/tpc/queries/tpcds/q94.sql
+++ b/benchmarks/tpc/queries/tpcds/q94.sql
@@ -2,9 +2,9 @@
 -- TPC-DS queries are Copyright 2021 Transaction Processing Performance 
Council.
 -- This query was generated at scale factor 1.
 select  
-   count(distinct ws_order_number) as "order count"
-  ,sum(ws_ext_ship_cost) as "total shipping cost"
-  ,sum(ws_net_profit) as "total net profit"
+   count(distinct ws_order_number) as `order count`
+  ,sum(ws_ext_ship_cost) as `total shipping cost`
+  ,sum(ws_net_profit) as `total net profit`
 from
    web_sales ws1
   ,date_dim
@@ -12,7 +12,7 @@ from
   ,web_site
 where
     d_date between '1999-4-01' and 
-           (cast('1999-4-01' as date) + 60 days)
+           (cast('1999-4-01' as date) + INTERVAL '60 DAYS')
 and ws1.ws_ship_date_sk = d_date_sk
 and ws1.ws_ship_addr_sk = ca_address_sk
 and ca_state = 'WI'
diff --git a/benchmarks/tpc/queries/tpcds/q95.sql 
b/benchmarks/tpc/queries/tpcds/q95.sql
index 01c3c8ccf..f29717711 100644
--- a/benchmarks/tpc/queries/tpcds/q95.sql
+++ b/benchmarks/tpc/queries/tpcds/q95.sql
@@ -7,9 +7,9 @@ with ws_wh as
  where ws1.ws_order_number = ws2.ws_order_number
    and ws1.ws_warehouse_sk <> ws2.ws_warehouse_sk)
  select  
-   count(distinct ws_order_number) as "order count"
-  ,sum(ws_ext_ship_cost) as "total shipping cost"
-  ,sum(ws_net_profit) as "total net profit"
+   count(distinct ws_order_number) as `order count`
+  ,sum(ws_ext_ship_cost) as `total shipping cost`
+  ,sum(ws_net_profit) as `total net profit`
 from
    web_sales ws1
   ,date_dim
@@ -17,7 +17,7 @@ from
   ,web_site
 where
     d_date between '2002-5-01' and 
-           (cast('2002-5-01' as date) + 60 days)
+           (cast('2002-5-01' as date) + INTERVAL '60 DAYS')
 and ws1.ws_ship_date_sk = d_date_sk
 and ws1.ws_ship_addr_sk = ca_address_sk
 and ca_state = 'MA'
diff --git a/benchmarks/tpc/queries/tpcds/q98.sql b/benchmarks/tpc/queries/tpcds/q98.sql
index c364ca900..cfaf54de3 100644
--- a/benchmarks/tpc/queries/tpcds/q98.sql
+++ b/benchmarks/tpc/queries/tpcds/q98.sql
@@ -18,7 +18,7 @@ where
        and i_category in ('Shoes', 'Music', 'Men')
        and ss_sold_date_sk = d_date_sk
        and d_date between cast('2000-01-05' as date) 
-                               and (cast('2000-01-05' as date) + 30 days)
+                               and (cast('2000-01-05' as date) + INTERVAL '30 DAYS')
 group by 
        i_item_id
         ,i_item_desc 
diff --git a/benchmarks/tpc/queries/tpcds/q99.sql b/benchmarks/tpc/queries/tpcds/q99.sql
index b4713752b..a72305b0b 100644
--- a/benchmarks/tpc/queries/tpcds/q99.sql
+++ b/benchmarks/tpc/queries/tpcds/q99.sql
@@ -5,14 +5,14 @@ select
    substr(w_warehouse_name,1,20)
   ,sm_type
   ,cc_name
-  ,sum(case when (cs_ship_date_sk - cs_sold_date_sk <= 30 ) then 1 else 0 end)  as "30 days" 
+  ,sum(case when (cs_ship_date_sk - cs_sold_date_sk <= 30 ) then 1 else 0 end)  as `30 days` 
   ,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 30) and 
-                 (cs_ship_date_sk - cs_sold_date_sk <= 60) then 1 else 0 end )  as "31-60 days" 
+                 (cs_ship_date_sk - cs_sold_date_sk <= 60) then 1 else 0 end )  as `31-60 days` 
   ,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 60) and 
-                 (cs_ship_date_sk - cs_sold_date_sk <= 90) then 1 else 0 end)  as "61-90 days" 
+                 (cs_ship_date_sk - cs_sold_date_sk <= 90) then 1 else 0 end)  as `61-90 days` 
   ,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 90) and
-                 (cs_ship_date_sk - cs_sold_date_sk <= 120) then 1 else 0 end)  as "91-120 days" 
-  ,sum(case when (cs_ship_date_sk - cs_sold_date_sk  > 120) then 1 else 0 end)  as ">120 days" 
+                 (cs_ship_date_sk - cs_sold_date_sk <= 120) then 1 else 0 end)  as `91-120 days` 
+  ,sum(case when (cs_ship_date_sk - cs_sold_date_sk  > 120) then 1 else 0 end)  as `>120 days` 
 from
    catalog_sales
   ,warehouse
diff --git a/benchmarks/tpc/run.py b/benchmarks/tpc/run.py
index d98d1693a..38b0ed500 100755
--- a/benchmarks/tpc/run.py
+++ b/benchmarks/tpc/run.py
@@ -110,7 +110,7 @@ COMMON_SPARK_CONF = {
     "spark.memory.offHeap.enabled": "true",
     "spark.memory.offHeap.size": "16g",
     "spark.eventLog.enabled": "true",
-    "spark.eventLog.dir": "/results/spark-events",
+    "spark.eventLog.dir": os.environ.get("SPARK_EVENT_LOG_DIR", "/tmp/spark-events"),
     "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
     "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
 }
diff --git a/benchmarks/tpc/tpcbench.py b/benchmarks/tpc/tpcbench.py
index f043afb1c..036d7b0e9 100644
--- a/benchmarks/tpc/tpcbench.py
+++ b/benchmarks/tpc/tpcbench.py
@@ -25,6 +25,7 @@ Supports two data sources:
 
 import argparse
 from datetime import datetime
+import hashlib
 import json
 import os
 from pyspark.sql import SparkSession
@@ -46,6 +47,15 @@ def dedup_columns(df):
     return df.toDF(*new_cols)
 
 
+def result_hash(rows):
+    """Compute a deterministic MD5 hash from collected rows."""
+    sorted_rows = sorted(rows, key=lambda r: str(r))
+    h = hashlib.md5()
+    for row in sorted_rows:
+        h.update(str(row).encode("utf-8"))
+    return h.hexdigest()
+
+
 def main(
     benchmark: str,
     data_path: str,
@@ -162,14 +172,19 @@ def main(
                                 print(f"Results written to {output_path}")
                         else:
                             rows = df.collect()
-                            print(f"Query {query} returned {len(rows)} rows")
+                            row_count = len(rows)
+                            row_hash = result_hash(rows)
+                            print(f"Query {query} returned {row_count} rows, hash={row_hash}")
 
                 end_time = time.time()
                 elapsed = end_time - start_time
                 print(f"Query {query} took {elapsed:.2f} seconds")
 
-                query_timings = results.setdefault(query, [])
-                query_timings.append(elapsed)
+                query_result = results.setdefault(query, {"durations": []})
+                query_result["durations"].append(round(elapsed, 3))
+                if "row_count" not in query_result and not write_path:
+                    query_result["row_count"] = row_count
+                    query_result["result_hash"] = row_hash
 
         iter_end_time = time.time()
         print(f"\nIteration {iteration + 1} took {iter_end_time - iter_start_time:.2f} seconds")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to