adriangb commented on PR #18393:
URL: https://github.com/apache/datafusion/pull/18393#issuecomment-3474879801

   Here's one interesting benchmark:
   
   ```sql
   COPY (SELECT uuid() as k, uuid() as v FROM generate_series(1, 5) t(i))
   TO 'small_table_uuids.parquet'
   OPTIONS (
     'MAX_ROW_GROUP_SIZE' '50000',
     'BLOOM_FILTER_ENABLED::k' 'true'
   );
   COPY (SELECT random()::text as v1, random()::text as v2, uuid() as k FROM generate_series(1, 100000000) t(i))
   TO 'large_table_uuids.parquet'
   OPTIONS (
     'MAX_ROW_GROUP_SIZE' '50000',
     'BLOOM_FILTER_ENABLED::k' 'true'
   );
   
   CREATE EXTERNAL TABLE small_table STORED AS PARQUET LOCATION 'small_table_uuids.parquet';
   CREATE EXTERNAL TABLE large_table STORED AS PARQUET LOCATION 'large_table_uuids.parquet';
   
   SET datafusion.execution.parquet.pushdown_filters = true;
   SET datafusion.execution.parquet.reorder_filters = true;
   SET datafusion.runtime.metadata_cache_limit = 0;
   
   -- Join the two tables, with a filter on small_table
   SELECT *
   FROM small_table s JOIN large_table l ON s.k = l.k;
   ```
   
   <details>
   <summary>Full benchmark script</summary>
   
   
   ```python
   #!/usr/bin/env python3
   """
   Benchmark script comparing DataFusion with/without inlist pushdown vs DuckDB.
   
   Groups:
   1. branch (no inlist): hash_join_inlist_pushdown_max_size = 0
   2. branch (w/ inlist): hash_join_inlist_pushdown_max_size = default (999999)
   3. main: using datafusion-cli-main
   4. duckdb: using duckdb CLI
   """
   
   import subprocess
   import tempfile
   import time
   import os
   import sys
   from pathlib import Path
   
    # Configuration
    DATAFUSION_CLI = "./target/release/datafusion-cli"  # branch build under test
    DATAFUSION_CLI_MAIN = "./datafusion-cli-main"  # baseline build from main
    DUCKDB_CLI = "duckdb"  # resolved via PATH
    NUM_RUNS = 5  # Number of times to run each benchmark

    # Data generation settings (tiny probe side joined against a huge build side)
    SMALL_TABLE_SIZE = 5
    LARGE_TABLE_SIZE = 100_000_000
    SMALL_TABLE_FILE = "small_table_uuids.parquet"
    LARGE_TABLE_FILE = "large_table_uuids.parquet"
   
   
   def run_command(cmd, input_sql=None, description=""):
       """Run a command and measure execution time."""
       print(f"  Running: {description}...", end=" ", flush=True)
   
       start = time.time()
       try:
           if input_sql:
               result = subprocess.run(
                   cmd,
                   input=input_sql,
                   capture_output=True,
                   text=True,
                   timeout=600  # 10 minute timeout
               )
           else:
               result = subprocess.run(
                   cmd,
                   capture_output=True,
                   text=True,
                   timeout=600
               )
   
           elapsed = time.time() - start
   
           if result.returncode != 0:
               print(f"FAILED (exit code {result.returncode})")
               print(f"    stderr: {result.stderr}")
               return None
   
           print(f"{elapsed:.3f}s")
           return elapsed
       except subprocess.TimeoutExpired:
           print("TIMEOUT")
           return None
       except Exception as e:
           print(f"ERROR: {e}")
           return None
   
   
   def create_data():
       """Create test data files if they don't exist."""
       if os.path.exists(SMALL_TABLE_FILE) and os.path.exists(LARGE_TABLE_FILE):
           print(f"Data files already exist, skipping creation.")
           return True
   
       print(f"Creating test data...")
   
       data_gen_sql = f"""
   COPY (SELECT uuid() as k, uuid() as v FROM generate_series(1, 
{SMALL_TABLE_SIZE}) t(i))
   TO '{SMALL_TABLE_FILE}'
   OPTIONS (
     'MAX_ROW_GROUP_SIZE' '50000',
     'BLOOM_FILTER_ENABLED::k' 'true'
   );
   
   COPY (SELECT random()::text as v1, random()::text as v2, uuid() as k FROM 
generate_series(1, {LARGE_TABLE_SIZE}) t(i))
   TO '{LARGE_TABLE_FILE}'
   OPTIONS (
     'MAX_ROW_GROUP_SIZE' '50000',
     'BLOOM_FILTER_ENABLED::k' 'true'
   );
   """
   
       result = subprocess.run(
           [DATAFUSION_CLI],
           input=data_gen_sql,
           capture_output=True,
           text=True
       )
   
       if result.returncode != 0:
           print(f"Failed to create data: {result.stderr}")
           return False
   
       print(f"Data created successfully.")
       return True
   
   
   def create_datafusion_sql(inlist_size):
       """Create SQL for DataFusion with specified inlist pushdown size."""
       return f"""
   CREATE EXTERNAL TABLE small_table STORED AS PARQUET LOCATION 
'{SMALL_TABLE_FILE}';
   CREATE EXTERNAL TABLE large_table STORED AS PARQUET LOCATION 
'{LARGE_TABLE_FILE}';
   
   SET datafusion.execution.parquet.pushdown_filters = true;
   SET datafusion.execution.parquet.reorder_filters = true;
   SET datafusion.optimizer.hash_join_inlist_pushdown_max_size = {inlist_size};
   SET datafusion.runtime.metadata_cache_limit = '0M';
   
   SELECT *
   FROM small_table s JOIN large_table l ON s.k = l.k;
   """
   
   
   def create_duckdb_sql():
       """Create SQL for DuckDB."""
       return f"""
   SELECT *
   FROM '{SMALL_TABLE_FILE}' s JOIN '{LARGE_TABLE_FILE}' l ON s.k = l.k;
   """
   
   
   def run_benchmark_group(name, cmd, sql_content, num_runs=NUM_RUNS):
       """Run a benchmark group multiple times and collect results."""
       print(f"\n{name}:")
       times = []
   
       for i in range(num_runs):
           elapsed = run_command(cmd, input_sql=sql_content, description=f"Run 
{i+1}/{num_runs}")
           if elapsed is not None:
               times.append(elapsed)
   
       if times:
           avg = sum(times) / len(times)
           min_time = min(times)
           max_time = max(times)
           print(f"  Results: min={min_time:.3f}s, avg={avg:.3f}s, 
max={max_time:.3f}s")
           return times
       else:
           print(f"  No successful runs")
           return []
   
   
   def main():
       print("=" * 60)
       print("DataFusion Inlist Pushdown Benchmark")
       print("=" * 60)
   
       # Verify executables exist
       if not os.path.exists(DATAFUSION_CLI):
           print(f"Error: {DATAFUSION_CLI} not found")
           sys.exit(1)
   
       if not os.path.exists(DATAFUSION_CLI_MAIN):
           print(f"Error: {DATAFUSION_CLI_MAIN} not found")
           sys.exit(1)
   
       try:
           subprocess.run([DUCKDB_CLI, "--version"], capture_output=True, 
check=True)
       except (subprocess.CalledProcessError, FileNotFoundError):
           print(f"Error: duckdb CLI not found or not working")
           sys.exit(1)
   
       # Create data
       if not create_data():
           sys.exit(1)
   
       # Run benchmarks
       results = {}
   
       # 1. Branch without inlist pushdown
       results["branch_no_inlist"] = run_benchmark_group(
           "Branch (no inlist, size=0)",
           [DATAFUSION_CLI],
           create_datafusion_sql(0)
       )
   
       # 2. Branch with inlist pushdown
       results["branch_with_inlist"] = run_benchmark_group(
           "Branch (w/ inlist, size=999999)",
           [DATAFUSION_CLI],
           create_datafusion_sql(999999)
       )
   
       # 3. Main branch
       results["main"] = run_benchmark_group(
           "Main branch",
           [DATAFUSION_CLI_MAIN],
           create_datafusion_sql(999999)
       )
   
       # 4. DuckDB
       results["duckdb"] = run_benchmark_group(
           "DuckDB",
           [DUCKDB_CLI],
           create_duckdb_sql()
       )
   
       # Summary
       print("\n" + "=" * 60)
       print("SUMMARY")
       print("=" * 60)
   
       for name, times in results.items():
           if times:
               avg = sum(times) / len(times)
               print(f"{name:25s}: {avg:.3f}s avg over {len(times)} runs")
           else:
               print(f"{name:25s}: No successful runs")
   
       print("\nAll times (seconds):")
       for name, times in results.items():
           if times:
               times_str = ", ".join(f"{t:.3f}" for t in times)
               print(f"  {name}: [{times_str}]")
   
   
   if __name__ == "__main__":
       main()
   ```
   
   </details>
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to