This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 77fdeb75d chore: Add consistency checks and result hashing to TPC benchmarks (#3582)
77fdeb75d is described below
commit 77fdeb75d1cd6041a655f98f4c2313fecdc616d7
Author: Andy Grove <[email protected]>
AuthorDate: Tue Feb 24 12:43:28 2026 -0700
chore: Add consistency checks and result hashing to TPC benchmarks (#3582)
---
.../engines/{comet.toml => comet-hashjoin.toml} | 2 +-
...et-iceberg.toml => comet-iceberg-hashjoin.toml} | 2 +-
benchmarks/tpc/engines/comet-iceberg.toml | 1 -
benchmarks/tpc/engines/comet.toml | 1 -
benchmarks/tpc/generate-comparison.py | 130 ++++++++++++++++-----
.../tpc/infra/docker/docker-compose-laptop.yml | 1 +
benchmarks/tpc/infra/docker/docker-compose.yml | 1 +
benchmarks/tpc/queries/tpcds/q12.sql | 2 +-
benchmarks/tpc/queries/tpcds/q16.sql | 8 +-
benchmarks/tpc/queries/tpcds/q20.sql | 2 +-
benchmarks/tpc/queries/tpcds/q21.sql | 4 +-
benchmarks/tpc/queries/tpcds/q32.sql | 6 +-
benchmarks/tpc/queries/tpcds/q35.sql | 12 +-
benchmarks/tpc/queries/tpcds/q37.sql | 2 +-
benchmarks/tpc/queries/tpcds/q40.sql | 4 +-
benchmarks/tpc/queries/tpcds/q5.sql | 6 +-
benchmarks/tpc/queries/tpcds/q50.sql | 10 +-
benchmarks/tpc/queries/tpcds/q62.sql | 10 +-
benchmarks/tpc/queries/tpcds/q77.sql | 12 +-
benchmarks/tpc/queries/tpcds/q80.sql | 6 +-
benchmarks/tpc/queries/tpcds/q82.sql | 2 +-
benchmarks/tpc/queries/tpcds/q92.sql | 6 +-
benchmarks/tpc/queries/tpcds/q94.sql | 8 +-
benchmarks/tpc/queries/tpcds/q95.sql | 8 +-
benchmarks/tpc/queries/tpcds/q98.sql | 2 +-
benchmarks/tpc/queries/tpcds/q99.sql | 10 +-
benchmarks/tpc/run.py | 2 +-
benchmarks/tpc/tpcbench.py | 21 +++-
28 files changed, 185 insertions(+), 96 deletions(-)
diff --git a/benchmarks/tpc/engines/comet.toml b/benchmarks/tpc/engines/comet-hashjoin.toml
similarity index 98%
copy from benchmarks/tpc/engines/comet.toml
copy to benchmarks/tpc/engines/comet-hashjoin.toml
index 8e19165eb..1aa495724 100644
--- a/benchmarks/tpc/engines/comet.toml
+++ b/benchmarks/tpc/engines/comet-hashjoin.toml
@@ -16,7 +16,7 @@
# under the License.
[engine]
-name = "comet"
+name = "comet-hashjoin"
[env]
required = ["COMET_JAR"]
diff --git a/benchmarks/tpc/engines/comet-iceberg.toml b/benchmarks/tpc/engines/comet-iceberg-hashjoin.toml
similarity index 98%
copy from benchmarks/tpc/engines/comet-iceberg.toml
copy to benchmarks/tpc/engines/comet-iceberg-hashjoin.toml
index 2e01270f1..84a533372 100644
--- a/benchmarks/tpc/engines/comet-iceberg.toml
+++ b/benchmarks/tpc/engines/comet-iceberg-hashjoin.toml
@@ -16,7 +16,7 @@
# under the License.
[engine]
-name = "comet-iceberg"
+name = "comet-iceberg-hashjoin"
[env]
required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"]
diff --git a/benchmarks/tpc/engines/comet-iceberg.toml b/benchmarks/tpc/engines/comet-iceberg.toml
index 2e01270f1..3654f359e 100644
--- a/benchmarks/tpc/engines/comet-iceberg.toml
+++ b/benchmarks/tpc/engines/comet-iceberg.toml
@@ -33,7 +33,6 @@ driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"]
"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" =
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
-"spark.comet.exec.replaceSortMergeJoin" = "true"
"spark.comet.expression.Cast.allowIncompatible" = "true"
"spark.comet.enabled" = "true"
"spark.comet.exec.enabled" = "true"
diff --git a/benchmarks/tpc/engines/comet.toml b/benchmarks/tpc/engines/comet.toml
index 8e19165eb..05b2cb22b 100644
--- a/benchmarks/tpc/engines/comet.toml
+++ b/benchmarks/tpc/engines/comet.toml
@@ -31,5 +31,4 @@ driver_class_path = ["$COMET_JAR"]
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" =
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
"spark.comet.scan.impl" = "native_datafusion"
-"spark.comet.exec.replaceSortMergeJoin" = "true"
"spark.comet.expression.Cast.allowIncompatible" = "true"
diff --git a/benchmarks/tpc/generate-comparison.py b/benchmarks/tpc/generate-comparison.py
index eb57cc1e4..e5058a3bf 100644
--- a/benchmarks/tpc/generate-comparison.py
+++ b/benchmarks/tpc/generate-comparison.py
@@ -17,19 +17,88 @@
import argparse
import json
+import logging
import matplotlib.pyplot as plt
import numpy as np
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
def geomean(data):
return np.prod(data) ** (1 / len(data))
-def generate_query_rel_speedup_chart(baseline, comparison, label1: str,
label2: str, benchmark: str, title: str):
+def get_durations(result, query_key):
+ """Extract durations from a query result, supporting both old (list) and
new (dict) formats."""
+ value = result[query_key]
+ if isinstance(value, dict):
+ return value["durations"]
+ return value
+
+def get_all_queries(results):
+ """Return the sorted union of all query keys across all result sets."""
+ all_keys = set()
+ for result in results:
+ all_keys.update(result.keys())
+ # Filter to numeric query keys and sort numerically
+ numeric_keys = []
+ for k in all_keys:
+ try:
+ numeric_keys.append(int(k))
+ except ValueError:
+ pass
+ return sorted(numeric_keys)
+
+def get_common_queries(results, labels):
+ """Return queries present in ALL result sets, warning about queries
missing from some files."""
+ all_queries = get_all_queries(results)
+ common = []
+ for query in all_queries:
+ key = str(query)
+ present = [labels[i] for i, r in enumerate(results) if key in r]
+ missing = [labels[i] for i, r in enumerate(results) if key not in r]
+ if missing:
+ logger.warning(f"Query {query}: present in [{', '.join(present)}]
but missing from [{', '.join(missing)}]")
+ if not missing:
+ common.append(query)
+ return common
+
+def check_result_consistency(results, labels, benchmark):
+ """Log warnings if row counts or result hashes differ across result
sets."""
+ all_queries = get_all_queries(results)
+ for query in all_queries:
+ key = str(query)
+ row_counts = []
+ hashes = []
+ for i, result in enumerate(results):
+ if key not in result:
+ continue
+ value = result[key]
+ if not isinstance(value, dict):
+ continue
+ if "row_count" in value:
+ row_counts.append((labels[i], value["row_count"]))
+ if "result_hash" in value:
+ hashes.append((labels[i], value["result_hash"]))
+
+ if len(row_counts) > 1:
+ counts = set(rc for _, rc in row_counts)
+ if len(counts) > 1:
+ details = ", ".join(f"{label}={rc}" for label, rc in
row_counts)
+ logger.warning(f"Query {query}: row count mismatch: {details}")
+
+ if len(hashes) > 1:
+ hash_values = set(h for _, h in hashes)
+ if len(hash_values) > 1:
+ details = ", ".join(f"{label}={h}" for label, h in hashes)
+ logger.warning(f"Query {query}: result hash mismatch:
{details}")
+
+def generate_query_rel_speedup_chart(baseline, comparison, label1: str,
label2: str, benchmark: str, title: str, common_queries=None):
+ if common_queries is None:
+ common_queries = range(1, query_count(benchmark)+1)
results = []
- for query in range(1, query_count(benchmark)+1):
- if query == 999:
- continue
- a = np.median(np.array(baseline[str(query)]))
- b = np.median(np.array(comparison[str(query)]))
+ for query in common_queries:
+ a = np.median(np.array(get_durations(baseline, str(query))))
+ b = np.median(np.array(get_durations(comparison, str(query))))
if a > b:
speedup = a/b-1
else:
@@ -80,13 +149,13 @@ def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2:
# Save the plot as an image file
plt.savefig(f'{benchmark}_queries_speedup_rel.png', format='png')
-def generate_query_abs_speedup_chart(baseline, comparison, label1: str,
label2: str, benchmark: str, title: str):
+def generate_query_abs_speedup_chart(baseline, comparison, label1: str,
label2: str, benchmark: str, title: str, common_queries=None):
+ if common_queries is None:
+ common_queries = range(1, query_count(benchmark)+1)
results = []
- for query in range(1, query_count(benchmark)+1):
- if query == 999:
- continue
- a = np.median(np.array(baseline[str(query)]))
- b = np.median(np.array(comparison[str(query)]))
+ for query in common_queries:
+ a = np.median(np.array(get_durations(baseline, str(query))))
+ b = np.median(np.array(get_durations(comparison, str(query))))
speedup = a-b
results.append(("q" + str(query), round(speedup, 1)))
@@ -130,17 +199,17 @@ def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2:
# Save the plot as an image file
plt.savefig(f'{benchmark}_queries_speedup_abs.png', format='png')
-def generate_query_comparison_chart(results, labels, benchmark: str, title:
str):
+def generate_query_comparison_chart(results, labels, benchmark: str, title:
str, common_queries=None):
+ if common_queries is None:
+ common_queries = range(1, query_count(benchmark)+1)
queries = []
benches = []
for _ in results:
benches.append([])
- for query in range(1, query_count(benchmark)+1):
- if query == 999:
- continue
+ for query in common_queries:
queries.append("q" + str(query))
for i in range(0, len(results)):
- benches[i].append(np.median(np.array(results[i][str(query)])))
+ benches[i].append(np.median(np.array(get_durations(results[i],
str(query)))))
# Define the width of the bars
bar_width = 0.3
@@ -168,17 +237,17 @@ def generate_query_comparison_chart(results, labels, benchmark: str, title: str)
# Save the plot as an image file
plt.savefig(f'{benchmark}_queries_compare.png', format='png')
-def generate_summary(results, labels, benchmark: str, title: str):
+def generate_summary(results, labels, benchmark: str, title: str,
common_queries=None):
+ if common_queries is None:
+ common_queries = range(1, query_count(benchmark)+1)
timings = []
for _ in results:
timings.append(0)
- num_queries = query_count(benchmark)
- for query in range(1, num_queries + 1):
- if query == 999:
- continue
+ num_queries = len([q for q in common_queries])
+ for query in common_queries:
for i in range(0, len(results)):
- timings[i] += np.median(np.array(results[i][str(query)]))
+ timings[i] += np.median(np.array(get_durations(results[i],
str(query))))
# Create figure and axis
fig, ax = plt.subplots()
@@ -186,7 +255,7 @@ def generate_summary(results, labels, benchmark: str, title: str):
# Add title and labels
ax.set_title(title)
- ax.set_ylabel(f'Time in seconds to run all {num_queries} {benchmark}
queries (lower is better)')
+ ax.set_ylabel(f'Time in seconds to run {num_queries} {benchmark} queries
(lower is better)')
times = [round(x,0) for x in timings]
@@ -213,11 +282,16 @@ def main(files, labels, benchmark: str, title: str):
for filename in files:
with open(filename) as f:
results.append(json.load(f))
- generate_summary(results, labels, benchmark, title)
- generate_query_comparison_chart(results, labels, benchmark, title)
+ check_result_consistency(results, labels, benchmark)
+ common_queries = get_common_queries(results, labels)
+ if not common_queries:
+ logger.error("No queries found in common across all result files")
+ return
+ generate_summary(results, labels, benchmark, title, common_queries)
+ generate_query_comparison_chart(results, labels, benchmark, title,
common_queries)
if len(files) == 2:
- generate_query_abs_speedup_chart(results[0], results[1], labels[0],
labels[1], benchmark, title)
- generate_query_rel_speedup_chart(results[0], results[1], labels[0],
labels[1], benchmark, title)
+ generate_query_abs_speedup_chart(results[0], results[1], labels[0],
labels[1], benchmark, title, common_queries)
+ generate_query_rel_speedup_chart(results[0], results[1], labels[0],
labels[1], benchmark, title, common_queries)
if __name__ == '__main__':
argparse = argparse.ArgumentParser(description='Generate comparison')
diff --git a/benchmarks/tpc/infra/docker/docker-compose-laptop.yml b/benchmarks/tpc/infra/docker/docker-compose-laptop.yml
index 6c5d8dbaf..bc882ae7b 100644
--- a/benchmarks/tpc/infra/docker/docker-compose-laptop.yml
+++ b/benchmarks/tpc/infra/docker/docker-compose-laptop.yml
@@ -93,5 +93,6 @@ services:
- ICEBERG_JAR=/jars/iceberg.jar
- TPCH_DATA=/data
- TPCDS_DATA=/data
+ - SPARK_EVENT_LOG_DIR=/results/spark-events
mem_limit: 4g
memswap_limit: 4g
diff --git a/benchmarks/tpc/infra/docker/docker-compose.yml b/benchmarks/tpc/infra/docker/docker-compose.yml
index cca8cffa1..5a76a5d6e 100644
--- a/benchmarks/tpc/infra/docker/docker-compose.yml
+++ b/benchmarks/tpc/infra/docker/docker-compose.yml
@@ -107,6 +107,7 @@ services:
- ICEBERG_JAR=/jars/iceberg.jar
- TPCH_DATA=/data
- TPCDS_DATA=/data
+ - SPARK_EVENT_LOG_DIR=/results/spark-events
mem_limit: ${BENCH_MEM_LIMIT:-10g}
memswap_limit: ${BENCH_MEM_LIMIT:-10g}
diff --git a/benchmarks/tpc/queries/tpcds/q12.sql b/benchmarks/tpc/queries/tpcds/q12.sql
index e44b5de0b..e2e46d92d 100644
--- a/benchmarks/tpc/queries/tpcds/q12.sql
+++ b/benchmarks/tpc/queries/tpcds/q12.sql
@@ -18,7 +18,7 @@ where
and i_category in ('Jewelry', 'Books', 'Women')
and ws_sold_date_sk = d_date_sk
and d_date between cast('2002-03-22' as date)
- and (cast('2002-03-22' as date) + 30 days)
+ and (cast('2002-03-22' as date) + INTERVAL '30
DAYS')
group by
i_item_id
,i_item_desc
diff --git a/benchmarks/tpc/queries/tpcds/q16.sql b/benchmarks/tpc/queries/tpcds/q16.sql
index 2f5db58ff..5a4cb1cec 100644
--- a/benchmarks/tpc/queries/tpcds/q16.sql
+++ b/benchmarks/tpc/queries/tpcds/q16.sql
@@ -2,9 +2,9 @@
-- TPC-DS queries are Copyright 2021 Transaction Processing Performance
Council.
-- This query was generated at scale factor 1.
select
- count(distinct cs_order_number) as "order count"
- ,sum(cs_ext_ship_cost) as "total shipping cost"
- ,sum(cs_net_profit) as "total net profit"
+ count(distinct cs_order_number) as `order count`
+ ,sum(cs_ext_ship_cost) as `total shipping cost`
+ ,sum(cs_net_profit) as `total net profit`
from
catalog_sales cs1
,date_dim
@@ -12,7 +12,7 @@ from
,call_center
where
d_date between '1999-5-01' and
- (cast('1999-5-01' as date) + 60 days)
+ (cast('1999-5-01' as date) + INTERVAL '60 DAYS')
and cs1.cs_ship_date_sk = d_date_sk
and cs1.cs_ship_addr_sk = ca_address_sk
and ca_state = 'ID'
diff --git a/benchmarks/tpc/queries/tpcds/q20.sql b/benchmarks/tpc/queries/tpcds/q20.sql
index 12e61a06f..47531bd9b 100644
--- a/benchmarks/tpc/queries/tpcds/q20.sql
+++ b/benchmarks/tpc/queries/tpcds/q20.sql
@@ -16,7 +16,7 @@ select i_item_id
and i_category in ('Children', 'Sports', 'Music')
and cs_sold_date_sk = d_date_sk
and d_date between cast('2002-04-01' as date)
- and (cast('2002-04-01' as date) + 30 days)
+ and (cast('2002-04-01' as date) + INTERVAL '30
DAYS')
group by i_item_id
,i_item_desc
,i_category
diff --git a/benchmarks/tpc/queries/tpcds/q21.sql b/benchmarks/tpc/queries/tpcds/q21.sql
index b7c2a5eb2..ecc7b2ea4 100644
--- a/benchmarks/tpc/queries/tpcds/q21.sql
+++ b/benchmarks/tpc/queries/tpcds/q21.sql
@@ -18,8 +18,8 @@ select *
and i_item_sk = inv_item_sk
and inv_warehouse_sk = w_warehouse_sk
and inv_date_sk = d_date_sk
- and d_date between (cast ('2000-05-19' as date) - 30 days)
- and (cast ('2000-05-19' as date) + 30 days)
+ and d_date between (cast ('2000-05-19' as date) - INTERVAL '30 DAYS')
+ and (cast ('2000-05-19' as date) + INTERVAL '30 DAYS')
group by w_warehouse_name, i_item_id) x
where (case when inv_before > 0
then inv_after / inv_before
diff --git a/benchmarks/tpc/queries/tpcds/q32.sql b/benchmarks/tpc/queries/tpcds/q32.sql
index 058756465..3b968974f 100644
--- a/benchmarks/tpc/queries/tpcds/q32.sql
+++ b/benchmarks/tpc/queries/tpcds/q32.sql
@@ -1,7 +1,7 @@
-- CometBench-DS query 32 derived from TPC-DS query 32 under the terms of the
TPC Fair Use Policy.
-- TPC-DS queries are Copyright 2021 Transaction Processing Performance
Council.
-- This query was generated at scale factor 1.
-select sum(cs_ext_discount_amt) as "excess discount amount"
+select sum(cs_ext_discount_amt) as `excess discount amount`
from
catalog_sales
,item
@@ -10,7 +10,7 @@ where
i_manufact_id = 283
and i_item_sk = cs_item_sk
and d_date between '1999-02-22' and
- (cast('1999-02-22' as date) + 90 days)
+ (cast('1999-02-22' as date) + INTERVAL '90 DAYS')
and d_date_sk = cs_sold_date_sk
and cs_ext_discount_amt
> (
@@ -22,7 +22,7 @@ and cs_ext_discount_amt
where
cs_item_sk = i_item_sk
and d_date between '1999-02-22' and
- (cast('1999-02-22' as date) + 90 days)
+ (cast('1999-02-22' as date) + INTERVAL '90 DAYS')
and d_date_sk = cs_sold_date_sk
)
LIMIT 100;
diff --git a/benchmarks/tpc/queries/tpcds/q35.sql b/benchmarks/tpc/queries/tpcds/q35.sql
index 5876bb94a..f116d84b2 100644
--- a/benchmarks/tpc/queries/tpcds/q35.sql
+++ b/benchmarks/tpc/queries/tpcds/q35.sql
@@ -7,19 +7,19 @@ select
cd_marital_status,
cd_dep_count,
count(*) cnt1,
- min(cd_dep_count),
max(cd_dep_count),
- avg(cd_dep_count),
+ stddev_samp(cd_dep_count),
+ stddev_samp(cd_dep_count),
cd_dep_employed_count,
count(*) cnt2,
- min(cd_dep_employed_count),
max(cd_dep_employed_count),
- avg(cd_dep_employed_count),
+ stddev_samp(cd_dep_employed_count),
+ stddev_samp(cd_dep_employed_count),
cd_dep_college_count,
count(*) cnt3,
- min(cd_dep_college_count),
max(cd_dep_college_count),
- avg(cd_dep_college_count)
+ stddev_samp(cd_dep_college_count),
+ stddev_samp(cd_dep_college_count)
from
customer c,customer_address ca,customer_demographics
where
diff --git a/benchmarks/tpc/queries/tpcds/q37.sql b/benchmarks/tpc/queries/tpcds/q37.sql
index 4cc1d66d2..25ca6689a 100644
--- a/benchmarks/tpc/queries/tpcds/q37.sql
+++ b/benchmarks/tpc/queries/tpcds/q37.sql
@@ -8,7 +8,7 @@ select i_item_id
where i_current_price between 26 and 26 + 30
and inv_item_sk = i_item_sk
and d_date_sk=inv_date_sk
- and d_date between cast('2001-06-09' as date) and (cast('2001-06-09' as date)
+ 60 days)
+ and d_date between cast('2001-06-09' as date) and (cast('2001-06-09' as date)
+ INTERVAL '60 DAYS')
and i_manufact_id in (744,884,722,693)
and inv_quantity_on_hand between 100 and 500
and cs_item_sk = i_item_sk
diff --git a/benchmarks/tpc/queries/tpcds/q40.sql b/benchmarks/tpc/queries/tpcds/q40.sql
index f1bd48aa7..b52966894 100644
--- a/benchmarks/tpc/queries/tpcds/q40.sql
+++ b/benchmarks/tpc/queries/tpcds/q40.sql
@@ -20,8 +20,8 @@ select
and i_item_sk = cs_item_sk
and cs_warehouse_sk = w_warehouse_sk
and cs_sold_date_sk = d_date_sk
- and d_date between (cast ('2002-05-18' as date) - 30 days)
- and (cast ('2002-05-18' as date) + 30 days)
+ and d_date between (cast ('2002-05-18' as date) - INTERVAL '30 DAYS')
+ and (cast ('2002-05-18' as date) + INTERVAL '30 DAYS')
group by
w_state,i_item_id
order by w_state,i_item_id
diff --git a/benchmarks/tpc/queries/tpcds/q5.sql b/benchmarks/tpc/queries/tpcds/q5.sql
index 6b750b4c7..68fda358e 100644
--- a/benchmarks/tpc/queries/tpcds/q5.sql
+++ b/benchmarks/tpc/queries/tpcds/q5.sql
@@ -28,7 +28,7 @@ with ssr as
store
where date_sk = d_date_sk
and d_date between cast('2001-08-04' as date)
- and (cast('2001-08-04' as date) + 14 days)
+ and (cast('2001-08-04' as date) + INTERVAL '14 DAYS')
and store_sk = s_store_sk
group by s_store_id)
,
@@ -59,7 +59,7 @@ with ssr as
catalog_page
where date_sk = d_date_sk
and d_date between cast('2001-08-04' as date)
- and (cast('2001-08-04' as date) + 14 days)
+ and (cast('2001-08-04' as date) + INTERVAL '14 DAYS')
and page_sk = cp_catalog_page_sk
group by cp_catalog_page_id)
,
@@ -92,7 +92,7 @@ with ssr as
web_site
where date_sk = d_date_sk
and d_date between cast('2001-08-04' as date)
- and (cast('2001-08-04' as date) + 14 days)
+ and (cast('2001-08-04' as date) + INTERVAL '14 DAYS')
and wsr_web_site_sk = web_site_sk
group by web_site_id)
select channel
diff --git a/benchmarks/tpc/queries/tpcds/q50.sql b/benchmarks/tpc/queries/tpcds/q50.sql
index 0fd43d113..82aa47200 100644
--- a/benchmarks/tpc/queries/tpcds/q50.sql
+++ b/benchmarks/tpc/queries/tpcds/q50.sql
@@ -12,14 +12,14 @@ select
,s_county
,s_state
,s_zip
- ,sum(case when (sr_returned_date_sk - ss_sold_date_sk <= 30 ) then 1 else 0
end) as "30 days"
+ ,sum(case when (sr_returned_date_sk - ss_sold_date_sk <= 30 ) then 1 else 0
end) as `30 days`
,sum(case when (sr_returned_date_sk - ss_sold_date_sk > 30) and
- (sr_returned_date_sk - ss_sold_date_sk <= 60) then 1 else 0
end ) as "31-60 days"
+ (sr_returned_date_sk - ss_sold_date_sk <= 60) then 1 else 0
end ) as `31-60 days`
,sum(case when (sr_returned_date_sk - ss_sold_date_sk > 60) and
- (sr_returned_date_sk - ss_sold_date_sk <= 90) then 1 else 0
end) as "61-90 days"
+ (sr_returned_date_sk - ss_sold_date_sk <= 90) then 1 else 0
end) as `61-90 days`
,sum(case when (sr_returned_date_sk - ss_sold_date_sk > 90) and
- (sr_returned_date_sk - ss_sold_date_sk <= 120) then 1 else 0
end) as "91-120 days"
- ,sum(case when (sr_returned_date_sk - ss_sold_date_sk > 120) then 1 else 0
end) as ">120 days"
+ (sr_returned_date_sk - ss_sold_date_sk <= 120) then 1 else 0
end) as `91-120 days`
+ ,sum(case when (sr_returned_date_sk - ss_sold_date_sk > 120) then 1 else 0
end) as `>120 days`
from
store_sales
,store_returns
diff --git a/benchmarks/tpc/queries/tpcds/q62.sql b/benchmarks/tpc/queries/tpcds/q62.sql
index 15a081527..5d1445176 100644
--- a/benchmarks/tpc/queries/tpcds/q62.sql
+++ b/benchmarks/tpc/queries/tpcds/q62.sql
@@ -5,14 +5,14 @@ select
substr(w_warehouse_name,1,20)
,sm_type
,web_name
- ,sum(case when (ws_ship_date_sk - ws_sold_date_sk <= 30 ) then 1 else 0 end)
as "30 days"
+ ,sum(case when (ws_ship_date_sk - ws_sold_date_sk <= 30 ) then 1 else 0 end)
as `30 days`
,sum(case when (ws_ship_date_sk - ws_sold_date_sk > 30) and
- (ws_ship_date_sk - ws_sold_date_sk <= 60) then 1 else 0 end )
as "31-60 days"
+ (ws_ship_date_sk - ws_sold_date_sk <= 60) then 1 else 0 end )
as `31-60 days`
,sum(case when (ws_ship_date_sk - ws_sold_date_sk > 60) and
- (ws_ship_date_sk - ws_sold_date_sk <= 90) then 1 else 0 end)
as "61-90 days"
+ (ws_ship_date_sk - ws_sold_date_sk <= 90) then 1 else 0 end)
as `61-90 days`
,sum(case when (ws_ship_date_sk - ws_sold_date_sk > 90) and
- (ws_ship_date_sk - ws_sold_date_sk <= 120) then 1 else 0 end)
as "91-120 days"
- ,sum(case when (ws_ship_date_sk - ws_sold_date_sk > 120) then 1 else 0 end)
as ">120 days"
+ (ws_ship_date_sk - ws_sold_date_sk <= 120) then 1 else 0 end)
as `91-120 days`
+ ,sum(case when (ws_ship_date_sk - ws_sold_date_sk > 120) then 1 else 0 end)
as `>120 days`
from
web_sales
,warehouse
diff --git a/benchmarks/tpc/queries/tpcds/q77.sql b/benchmarks/tpc/queries/tpcds/q77.sql
index fac28a26a..81e8ccf80 100644
--- a/benchmarks/tpc/queries/tpcds/q77.sql
+++ b/benchmarks/tpc/queries/tpcds/q77.sql
@@ -10,7 +10,7 @@ with ss as
store
where ss_sold_date_sk = d_date_sk
and d_date between cast('2001-08-11' as date)
- and (cast('2001-08-11' as date) + 30 days)
+ and (cast('2001-08-11' as date) + INTERVAL '30 DAYS')
and ss_store_sk = s_store_sk
group by s_store_sk)
,
@@ -23,7 +23,7 @@ with ss as
store
where sr_returned_date_sk = d_date_sk
and d_date between cast('2001-08-11' as date)
- and (cast('2001-08-11' as date) + 30 days)
+ and (cast('2001-08-11' as date) + INTERVAL '30 DAYS')
and sr_store_sk = s_store_sk
group by s_store_sk),
cs as
@@ -34,7 +34,7 @@ with ss as
date_dim
where cs_sold_date_sk = d_date_sk
and d_date between cast('2001-08-11' as date)
- and (cast('2001-08-11' as date) + 30 days)
+ and (cast('2001-08-11' as date) + INTERVAL '30 DAYS')
group by cs_call_center_sk
),
cr as
@@ -45,7 +45,7 @@ with ss as
date_dim
where cr_returned_date_sk = d_date_sk
and d_date between cast('2001-08-11' as date)
- and (cast('2001-08-11' as date) + 30 days)
+ and (cast('2001-08-11' as date) + INTERVAL '30 DAYS')
group by cr_call_center_sk
),
ws as
@@ -57,7 +57,7 @@ with ss as
web_page
where ws_sold_date_sk = d_date_sk
and d_date between cast('2001-08-11' as date)
- and (cast('2001-08-11' as date) + 30 days)
+ and (cast('2001-08-11' as date) + INTERVAL '30 DAYS')
and ws_web_page_sk = wp_web_page_sk
group by wp_web_page_sk),
wr as
@@ -69,7 +69,7 @@ with ss as
web_page
where wr_returned_date_sk = d_date_sk
and d_date between cast('2001-08-11' as date)
- and (cast('2001-08-11' as date) + 30 days)
+ and (cast('2001-08-11' as date) + INTERVAL '30 DAYS')
and wr_web_page_sk = wp_web_page_sk
group by wp_web_page_sk)
select channel
diff --git a/benchmarks/tpc/queries/tpcds/q80.sql b/benchmarks/tpc/queries/tpcds/q80.sql
index 484aa4b28..8f0a9c63d 100644
--- a/benchmarks/tpc/queries/tpcds/q80.sql
+++ b/benchmarks/tpc/queries/tpcds/q80.sql
@@ -14,7 +14,7 @@ with ssr as
promotion
where ss_sold_date_sk = d_date_sk
and d_date between cast('2002-08-04' as date)
- and (cast('2002-08-04' as date) + 30 days)
+ and (cast('2002-08-04' as date) + INTERVAL '30 DAYS')
and ss_store_sk = s_store_sk
and ss_item_sk = i_item_sk
and i_current_price > 50
@@ -35,7 +35,7 @@ with ssr as
promotion
where cs_sold_date_sk = d_date_sk
and d_date between cast('2002-08-04' as date)
- and (cast('2002-08-04' as date) + 30 days)
+ and (cast('2002-08-04' as date) + INTERVAL '30 DAYS')
and cs_catalog_page_sk = cp_catalog_page_sk
and cs_item_sk = i_item_sk
and i_current_price > 50
@@ -56,7 +56,7 @@ group by cp_catalog_page_id)
promotion
where ws_sold_date_sk = d_date_sk
and d_date between cast('2002-08-04' as date)
- and (cast('2002-08-04' as date) + 30 days)
+ and (cast('2002-08-04' as date) + INTERVAL '30 DAYS')
and ws_web_site_sk = web_site_sk
and ws_item_sk = i_item_sk
and i_current_price > 50
diff --git a/benchmarks/tpc/queries/tpcds/q82.sql b/benchmarks/tpc/queries/tpcds/q82.sql
index c967802b3..98c002cec 100644
--- a/benchmarks/tpc/queries/tpcds/q82.sql
+++ b/benchmarks/tpc/queries/tpcds/q82.sql
@@ -8,7 +8,7 @@ select i_item_id
where i_current_price between 69 and 69+30
and inv_item_sk = i_item_sk
and d_date_sk=inv_date_sk
- and d_date between cast('1998-06-06' as date) and (cast('1998-06-06' as date)
+ 60 days)
+ and d_date between cast('1998-06-06' as date) and (cast('1998-06-06' as date)
+ INTERVAL '60 DAYS')
and i_manufact_id in (105,513,180,137)
and inv_quantity_on_hand between 100 and 500
and ss_item_sk = i_item_sk
diff --git a/benchmarks/tpc/queries/tpcds/q92.sql b/benchmarks/tpc/queries/tpcds/q92.sql
index 2a858fc11..63dab6f47 100644
--- a/benchmarks/tpc/queries/tpcds/q92.sql
+++ b/benchmarks/tpc/queries/tpcds/q92.sql
@@ -2,7 +2,7 @@
-- TPC-DS queries are Copyright 2021 Transaction Processing Performance
Council.
-- This query was generated at scale factor 1.
select
- sum(ws_ext_discount_amt) as "Excess Discount Amount"
+ sum(ws_ext_discount_amt) as `Excess Discount Amount`
from
web_sales
,item
@@ -11,7 +11,7 @@ where
i_manufact_id = 914
and i_item_sk = ws_item_sk
and d_date between '2001-01-25' and
- (cast('2001-01-25' as date) + 90 days)
+ (cast('2001-01-25' as date) + INTERVAL '90 DAYS')
and d_date_sk = ws_sold_date_sk
and ws_ext_discount_amt
> (
@@ -23,7 +23,7 @@ and ws_ext_discount_amt
WHERE
ws_item_sk = i_item_sk
and d_date between '2001-01-25' and
- (cast('2001-01-25' as date) + 90 days)
+ (cast('2001-01-25' as date) + INTERVAL '90 DAYS')
and d_date_sk = ws_sold_date_sk
)
order by sum(ws_ext_discount_amt)
diff --git a/benchmarks/tpc/queries/tpcds/q94.sql b/benchmarks/tpc/queries/tpcds/q94.sql
index ba9a1d902..79e528352 100644
--- a/benchmarks/tpc/queries/tpcds/q94.sql
+++ b/benchmarks/tpc/queries/tpcds/q94.sql
@@ -2,9 +2,9 @@
-- TPC-DS queries are Copyright 2021 Transaction Processing Performance
Council.
-- This query was generated at scale factor 1.
select
- count(distinct ws_order_number) as "order count"
- ,sum(ws_ext_ship_cost) as "total shipping cost"
- ,sum(ws_net_profit) as "total net profit"
+ count(distinct ws_order_number) as `order count`
+ ,sum(ws_ext_ship_cost) as `total shipping cost`
+ ,sum(ws_net_profit) as `total net profit`
from
web_sales ws1
,date_dim
@@ -12,7 +12,7 @@ from
,web_site
where
d_date between '1999-4-01' and
- (cast('1999-4-01' as date) + 60 days)
+ (cast('1999-4-01' as date) + INTERVAL '60 DAYS')
and ws1.ws_ship_date_sk = d_date_sk
and ws1.ws_ship_addr_sk = ca_address_sk
and ca_state = 'WI'
diff --git a/benchmarks/tpc/queries/tpcds/q95.sql b/benchmarks/tpc/queries/tpcds/q95.sql
index 01c3c8ccf..f29717711 100644
--- a/benchmarks/tpc/queries/tpcds/q95.sql
+++ b/benchmarks/tpc/queries/tpcds/q95.sql
@@ -7,9 +7,9 @@ with ws_wh as
where ws1.ws_order_number = ws2.ws_order_number
and ws1.ws_warehouse_sk <> ws2.ws_warehouse_sk)
select
- count(distinct ws_order_number) as "order count"
- ,sum(ws_ext_ship_cost) as "total shipping cost"
- ,sum(ws_net_profit) as "total net profit"
+ count(distinct ws_order_number) as `order count`
+ ,sum(ws_ext_ship_cost) as `total shipping cost`
+ ,sum(ws_net_profit) as `total net profit`
from
web_sales ws1
,date_dim
@@ -17,7 +17,7 @@ from
,web_site
where
d_date between '2002-5-01' and
- (cast('2002-5-01' as date) + 60 days)
+ (cast('2002-5-01' as date) + INTERVAL '60 DAYS')
and ws1.ws_ship_date_sk = d_date_sk
and ws1.ws_ship_addr_sk = ca_address_sk
and ca_state = 'MA'
diff --git a/benchmarks/tpc/queries/tpcds/q98.sql b/benchmarks/tpc/queries/tpcds/q98.sql
index c364ca900..cfaf54de3 100644
--- a/benchmarks/tpc/queries/tpcds/q98.sql
+++ b/benchmarks/tpc/queries/tpcds/q98.sql
@@ -18,7 +18,7 @@ where
and i_category in ('Shoes', 'Music', 'Men')
and ss_sold_date_sk = d_date_sk
and d_date between cast('2000-01-05' as date)
- and (cast('2000-01-05' as date) + 30 days)
+ and (cast('2000-01-05' as date) + INTERVAL '30
DAYS')
group by
i_item_id
,i_item_desc
diff --git a/benchmarks/tpc/queries/tpcds/q99.sql b/benchmarks/tpc/queries/tpcds/q99.sql
index b4713752b..a72305b0b 100644
--- a/benchmarks/tpc/queries/tpcds/q99.sql
+++ b/benchmarks/tpc/queries/tpcds/q99.sql
@@ -5,14 +5,14 @@ select
substr(w_warehouse_name,1,20)
,sm_type
,cc_name
- ,sum(case when (cs_ship_date_sk - cs_sold_date_sk <= 30 ) then 1 else 0 end)
as "30 days"
+ ,sum(case when (cs_ship_date_sk - cs_sold_date_sk <= 30 ) then 1 else 0 end)
as `30 days`
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 30) and
- (cs_ship_date_sk - cs_sold_date_sk <= 60) then 1 else 0 end )
as "31-60 days"
+ (cs_ship_date_sk - cs_sold_date_sk <= 60) then 1 else 0 end )
as `31-60 days`
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 60) and
- (cs_ship_date_sk - cs_sold_date_sk <= 90) then 1 else 0 end)
as "61-90 days"
+ (cs_ship_date_sk - cs_sold_date_sk <= 90) then 1 else 0 end)
as `61-90 days`
,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 90) and
- (cs_ship_date_sk - cs_sold_date_sk <= 120) then 1 else 0 end)
as "91-120 days"
- ,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 120) then 1 else 0 end)
as ">120 days"
+ (cs_ship_date_sk - cs_sold_date_sk <= 120) then 1 else 0 end)
as `91-120 days`
+ ,sum(case when (cs_ship_date_sk - cs_sold_date_sk > 120) then 1 else 0 end)
as `>120 days`
from
catalog_sales
,warehouse
diff --git a/benchmarks/tpc/run.py b/benchmarks/tpc/run.py
index d98d1693a..38b0ed500 100755
--- a/benchmarks/tpc/run.py
+++ b/benchmarks/tpc/run.py
@@ -110,7 +110,7 @@ COMMON_SPARK_CONF = {
"spark.memory.offHeap.enabled": "true",
"spark.memory.offHeap.size": "16g",
"spark.eventLog.enabled": "true",
- "spark.eventLog.dir": "/results/spark-events",
+ "spark.eventLog.dir": os.environ.get("SPARK_EVENT_LOG_DIR",
"/tmp/spark-events"),
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3a.aws.credentials.provider":
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
}
diff --git a/benchmarks/tpc/tpcbench.py b/benchmarks/tpc/tpcbench.py
index f043afb1c..036d7b0e9 100644
--- a/benchmarks/tpc/tpcbench.py
+++ b/benchmarks/tpc/tpcbench.py
@@ -25,6 +25,7 @@ Supports two data sources:
import argparse
from datetime import datetime
+import hashlib
import json
import os
from pyspark.sql import SparkSession
@@ -46,6 +47,15 @@ def dedup_columns(df):
return df.toDF(*new_cols)
+def result_hash(rows):
+ """Compute a deterministic MD5 hash from collected rows."""
+ sorted_rows = sorted(rows, key=lambda r: str(r))
+ h = hashlib.md5()
+ for row in sorted_rows:
+ h.update(str(row).encode("utf-8"))
+ return h.hexdigest()
+
+
def main(
benchmark: str,
data_path: str,
@@ -162,14 +172,19 @@ def main(
print(f"Results written to {output_path}")
else:
rows = df.collect()
- print(f"Query {query} returned {len(rows)} rows")
+ row_count = len(rows)
+ row_hash = result_hash(rows)
+ print(f"Query {query} returned {row_count} rows,
hash={row_hash}")
end_time = time.time()
elapsed = end_time - start_time
print(f"Query {query} took {elapsed:.2f} seconds")
- query_timings = results.setdefault(query, [])
- query_timings.append(elapsed)
+ query_result = results.setdefault(query, {"durations": []})
+ query_result["durations"].append(round(elapsed, 3))
+ if "row_count" not in query_result and not write_path:
+ query_result["row_count"] = row_count
+ query_result["result_hash"] = row_hash
iter_end_time = time.time()
print(f"\nIteration {iteration + 1} took {iter_end_time -
iter_start_time:.2f} seconds")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]