adriangb commented on PR #18393:
URL: https://github.com/apache/datafusion/pull/18393#issuecomment-3474879801
Here's one interesting benchmark:
```sql
-- Small table: 5 rows, each with a freshly generated UUID key `k` and value `v`.
COPY (SELECT uuid() as k, uuid() as v FROM generate_series(1, 5) t(i))
TO 'small_table_uuids.parquet'
OPTIONS (
'MAX_ROW_GROUP_SIZE' '50000',
'BLOOM_FILTER_ENABLED::k' 'true'
);
-- Large table: 100,000,000 rows with two random text columns and a UUID key `k`.
-- Written with the same row-group size and bloom filter on `k` as the small table.
COPY (SELECT random()::text as v1, random()::text as v2, uuid() as k FROM
generate_series(1, 100000000) t(i))
TO 'large_table_uuids.parquet'
OPTIONS (
'MAX_ROW_GROUP_SIZE' '50000',
'BLOOM_FILTER_ENABLED::k' 'true'
);
-- Register both Parquet files as external tables.
CREATE EXTERNAL TABLE small_table STORED AS PARQUET LOCATION
'small_table_uuids.parquet';
CREATE EXTERNAL TABLE large_table STORED AS PARQUET LOCATION
'large_table_uuids.parquet';
-- Session settings used for the measurement.
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.execution.parquet.reorder_filters = true;
-- NOTE(review): the full benchmark script below sets this to '0M' rather than 0;
-- confirm which form the CLI accepts.
SET datafusion.runtime.metadata_cache_limit = 0;
-- Join the two tables on k (the query itself carries no WHERE filter).
SELECT *
FROM small_table s JOIN large_table l ON s.k = l.k;
```
<details>
<summary>Full benchmark script</summary>
```python
#!/usr/bin/env python3
"""
Benchmark script comparing DataFusion with/without inlist pushdown vs DuckDB.
Groups:
1. branch (no inlist): hash_join_inlist_pushdown_max_size = 0
2. branch (w/ inlist): hash_join_inlist_pushdown_max_size = default (999999)
3. main: using datafusion-cli-main
4. duckdb: using duckdb CLI
"""
import subprocess
import tempfile
import time
import os
import sys
from pathlib import Path
# Configuration
# Executables under comparison: main() uses DATAFUSION_CLI for the two
# "branch" groups, DATAFUSION_CLI_MAIN for the "main" group, and DUCKDB_CLI
# (looked up by name) for the DuckDB group.
DATAFUSION_CLI = "./target/release/datafusion-cli"
DATAFUSION_CLI_MAIN = "./datafusion-cli-main"
DUCKDB_CLI = "duckdb"
NUM_RUNS = 5 # Number of times to run each benchmark
# Data generation settings
# Row counts passed to generate_series() when the Parquet files are created.
SMALL_TABLE_SIZE = 5
LARGE_TABLE_SIZE = 100_000_000
# Output files, written relative to the current working directory.
SMALL_TABLE_FILE = "small_table_uuids.parquet"
LARGE_TABLE_FILE = "large_table_uuids.parquet"
def run_command(cmd, input_sql=None, description="", timeout=600):
    """Run a command to completion and return how long it took.

    Args:
        cmd: Argument list handed to ``subprocess.run`` (no shell is used).
        input_sql: Text piped to the child's stdin. When empty or ``None``
            nothing is piped and the child inherits the parent's stdin.
        description: Label shown in the progress line.
        timeout: Seconds to wait before abandoning the run. Defaults to 600
            (10 minutes).

    Returns:
        Elapsed seconds as a float, or ``None`` when the command exits with a
        non-zero status, exceeds ``timeout``, or cannot be started.
    """
    print(f" Running: {description}...", end=" ", flush=True)
    # perf_counter() is monotonic and high resolution; time.time() can jump
    # when the system clock is adjusted, which would corrupt a measurement.
    start = time.perf_counter()
    try:
        result = subprocess.run(
            cmd,
            # input=None is subprocess.run's default, so one call covers both
            # the "pipe SQL" and "no stdin" cases.
            input=input_sql if input_sql else None,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        print("TIMEOUT")
        return None
    except Exception as e:
        # Best-effort boundary: a command that cannot be launched is reported
        # as a failed run instead of aborting the whole benchmark.
        print(f"ERROR: {e}")
        return None
    elapsed = time.perf_counter() - start

    if result.returncode != 0:
        print(f"FAILED (exit code {result.returncode})")
        print(f" stderr: {result.stderr}")
        return None

    print(f"{elapsed:.3f}s")
    return elapsed
def create_data():
    """Write the two Parquet input files via datafusion-cli if needed.

    Generation is skipped when both files already exist on disk.

    Returns:
        ``True`` when both files were already present or the CLI exited with
        status 0; ``False`` when the CLI reported a non-zero exit status.
    """
    required_files = (SMALL_TABLE_FILE, LARGE_TABLE_FILE)
    if all(os.path.exists(path) for path in required_files):
        print("Data files already exist, skipping creation.")
        return True

    print("Creating test data...")
    data_gen_sql = f"""
COPY (SELECT uuid() as k, uuid() as v FROM generate_series(1,
{SMALL_TABLE_SIZE}) t(i))
TO '{SMALL_TABLE_FILE}'
OPTIONS (
'MAX_ROW_GROUP_SIZE' '50000',
'BLOOM_FILTER_ENABLED::k' 'true'
);
COPY (SELECT random()::text as v1, random()::text as v2, uuid() as k FROM
generate_series(1, {LARGE_TABLE_SIZE}) t(i))
TO '{LARGE_TABLE_FILE}'
OPTIONS (
'MAX_ROW_GROUP_SIZE' '50000',
'BLOOM_FILTER_ENABLED::k' 'true'
);
"""
    # The generation script is fed to the CLI on stdin.
    proc = subprocess.run(
        [DATAFUSION_CLI],
        input=data_gen_sql,
        capture_output=True,
        text=True,
    )
    succeeded = proc.returncode == 0
    if succeeded:
        print("Data created successfully.")
    else:
        print(f"Failed to create data: {proc.stderr}")
    return succeeded
def create_datafusion_sql(inlist_size):
    """Return the DataFusion SQL script for one benchmark run.

    The script registers both Parquet files as external tables, applies the
    session settings, and runs the join on ``k``.

    Args:
        inlist_size: Value substituted into
            ``datafusion.optimizer.hash_join_inlist_pushdown_max_size``.
    """
    statements = [
        "CREATE EXTERNAL TABLE small_table STORED AS PARQUET LOCATION",
        f"'{SMALL_TABLE_FILE}';",
        "CREATE EXTERNAL TABLE large_table STORED AS PARQUET LOCATION",
        f"'{LARGE_TABLE_FILE}';",
        "SET datafusion.execution.parquet.pushdown_filters = true;",
        "SET datafusion.execution.parquet.reorder_filters = true;",
        f"SET datafusion.optimizer.hash_join_inlist_pushdown_max_size = {inlist_size};",
        "SET datafusion.runtime.metadata_cache_limit = '0M';",
        "SELECT *",
        "FROM small_table s JOIN large_table l ON s.k = l.k;",
    ]
    # The empty first and last entries give the script its leading and
    # trailing newline.
    return "\n".join(["", *statements, ""])
def create_duckdb_sql():
    """Return the DuckDB query that joins the two Parquet files on ``k``.

    The files are referenced directly by path in the FROM clause.
    """
    query_lines = [
        "SELECT *",
        f"FROM '{SMALL_TABLE_FILE}' s JOIN '{LARGE_TABLE_FILE}' l ON s.k = l.k;",
    ]
    # The empty first and last entries give the script its leading and
    # trailing newline.
    return "\n".join(["", *query_lines, ""])
def run_benchmark_group(name, cmd, sql_content, num_runs=NUM_RUNS):
    """Run one benchmark configuration repeatedly and report its timings.

    Args:
        name: Heading printed before the runs.
        cmd: Argument list for the CLI to execute.
        sql_content: SQL script piped to the CLI's stdin on every run.
        num_runs: How many times to execute the command.

    Returns:
        The elapsed seconds of every successful run, in execution order.
        Failed runs are dropped; the list is empty when none succeeded.
    """
    print(f"\n{name}:")
    times = []
    for i in range(num_runs):
        # Keep the label on a single line: a string literal split across
        # physical lines is a syntax error.
        elapsed = run_command(
            cmd,
            input_sql=sql_content,
            description=f"Run {i+1}/{num_runs}",
        )
        if elapsed is not None:
            times.append(elapsed)

    if not times:
        print(" No successful runs")
        return []

    avg = sum(times) / len(times)
    min_time = min(times)
    max_time = max(times)
    print(f" Results: min={min_time:.3f}s, avg={avg:.3f}s, max={max_time:.3f}s")
    return times
def main():
    """Check prerequisites, create the data, run every group, print a summary.

    Exits with status 1 when a required executable is missing, the duckdb
    version probe fails, or data creation fails.
    """
    rule = "=" * 60
    print(rule)
    print("DataFusion Inlist Pushdown Benchmark")
    print(rule)

    # Verify executables exist
    for cli_path in (DATAFUSION_CLI, DATAFUSION_CLI_MAIN):
        if not os.path.exists(cli_path):
            print(f"Error: {cli_path} not found")
            sys.exit(1)
    # duckdb is resolved by name, so probe it by running it instead of
    # checking a path.
    try:
        subprocess.run([DUCKDB_CLI, "--version"], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Error: duckdb CLI not found or not working")
        sys.exit(1)

    # Create data
    if not create_data():
        sys.exit(1)

    # Run benchmarks: (result key, heading, executable, SQL script), in the
    # order they are executed and later summarized.
    groups = (
        (
            "branch_no_inlist",
            "Branch (no inlist, size=0)",
            DATAFUSION_CLI,
            create_datafusion_sql(0),
        ),
        (
            "branch_with_inlist",
            "Branch (w/ inlist, size=999999)",
            DATAFUSION_CLI,
            create_datafusion_sql(999999),
        ),
        (
            "main",
            "Main branch",
            DATAFUSION_CLI_MAIN,
            create_datafusion_sql(999999),
        ),
        (
            "duckdb",
            "DuckDB",
            DUCKDB_CLI,
            create_duckdb_sql(),
        ),
    )
    results = {}
    for key, heading, executable, sql in groups:
        results[key] = run_benchmark_group(heading, [executable], sql)

    # Summary
    print("\n" + rule)
    print("SUMMARY")
    print(rule)
    for name, times in results.items():
        if not times:
            print(f"{name:25s}: No successful runs")
            continue
        avg = sum(times) / len(times)
        print(f"{name:25s}: {avg:.3f}s avg over {len(times)} runs")

    print("\nAll times (seconds):")
    for name, times in results.items():
        if not times:
            continue
        times_str = ", ".join(f"{t:.3f}" for t in times)
        print(f" {name}: [{times_str}]")


if __name__ == "__main__":
    main()
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]