andygrove opened a new issue, #3064:
URL: https://github.com/apache/datafusion-comet/issues/3064

   ### Describe the bug
   
   # Comet writes significantly more shuffle data than Spark
   
   ## Summary
   
   Comet's shuffle manager writes roughly 1.5x to 3.3x more shuffle data than Spark's built-in shuffle manager for equivalent queries. This was observed with both nested complex types and simple flat schemas.
   
   ## Environment
   
   - PySpark 3.5.7
   - Comet 0.13.0-SNAPSHOT (comet-spark-spark3.5_2.12)
   - Java 17 (OpenJDK 17.0.17)
   - Ubuntu 22.04
   
   ## Reproduction
   
   ### Prerequisites
   
   - Python 3.8+
   - Java 17
   - A Spark standalone cluster (optional - will use local mode if not set)
   
   ### Setup
   
   ```bash
   # Create virtual environment
   python3 -m venv venv
   source venv/bin/activate
   
   # Install PySpark 3.5.x
   pip install pyspark==3.5.7
   
   # Download Comet JAR (adjust version as needed)
   # From: https://github.com/apache/datafusion-comet/releases
   # Or build from source
   export COMET_JAR=/path/to/comet-spark-spark3.5_2.12-0.13.0.jar
   ```
   
   ### Run reproduction
   
   Save the script below as `comet_shuffle_repro.py` and run:
   
   ```bash
   # Optional: connect to standalone cluster
   # export SPARK_MASTER=spark://hostname:7077
   
   # Run the reproduction
   python comet_shuffle_repro.py
   ```
   
   ### Reproduction script
   
   ```python
   #!/usr/bin/env python3
   """
   Minimal reproduction for Comet shuffle size issue.
   
   Demonstrates that Comet writes significantly more shuffle data
   than Spark for the same query.
   """
   
   import json
   import os
   import subprocess
   import sys
   import tempfile
   
   
   REPRO_SCRIPT = '''
   import os
   from pyspark.sql import SparkSession
   from pyspark.sql import functions as F
   
   def create_spark_session(app_name, enable_comet=False):
       """Create SparkSession with optional Comet support."""
       spark_master = os.environ.get("SPARK_MASTER")
       event_log_dir = os.environ.get("SPARK_EVENT_LOG_DIR", "/tmp/spark-events")
       comet_jar = os.environ.get("COMET_JAR")
   
       os.makedirs(event_log_dir, exist_ok=True)
   
       builder = SparkSession.builder.appName(app_name)
   
       if spark_master:
           builder = builder.master(spark_master)
   
       builder = (
           builder
           .config("spark.eventLog.enabled", "true")
           .config("spark.eventLog.dir", f"file://{event_log_dir}")
           .config("spark.sql.shuffle.partitions", "200")
       )
   
       if enable_comet:
           if not comet_jar:
               raise ValueError("COMET_JAR environment variable not set")
           builder = (
               builder
               .config("spark.jars", comet_jar)
               .config("spark.driver.extraClassPath", comet_jar)
               .config("spark.executor.extraClassPath", comet_jar)
               .config("spark.plugins", "org.apache.spark.CometPlugin")
               .config(
                   "spark.shuffle.manager",
                   "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
               )
               .config("spark.memory.offHeap.enabled", "true")
               .config("spark.memory.offHeap.size", "2g")
           )
   
       return builder.getOrCreate()
   
   
   def run_test(enable_comet):
       """Generate data and run shuffle query."""
       mode = "Comet" if enable_comet else "Spark"
       print(f"Running with: {mode}")
   
       spark = create_spark_session(f"ShuffleRepro-{mode}", enable_comet=enable_comet)
       spark.sparkContext.setLogLevel("WARN")
   
       try:
           # Generate simple test data
           print("Generating test data...")
           df = spark.range(500000).select(
               F.col("id"),
               (F.col("id") % 1000).alias("group_key"),
               F.rand(42).alias("value1"),
               F.rand(43).alias("value2"),
               F.concat(
                   F.lit("user_"), (F.col("id") % 10000).cast("string")
               ).alias("user_id"),
           )
           df.cache()
           df.count()
   
           # Run query that triggers shuffle
           print("Running shuffle query...")
           import time
           start = time.time()
   
           result = (
               df
               .groupBy("group_key")
               .agg(
                   F.count("*").alias("cnt"),
                   F.sum("value1").alias("sum_v1"),
                   F.avg("value2").alias("avg_v2"),
                   F.countDistinct("user_id").alias("distinct_users"),
               )
               .orderBy("group_key")
           )
           count = result.count()
   
           elapsed = (time.time() - start) * 1000
           print(f"Result rows: {count}")
           print(f"Execution time: {elapsed:.2f} ms")
   
       finally:
           spark.stop()
   
   
   if __name__ == "__main__":
       import sys
       enable_comet = "--comet" in sys.argv
       run_test(enable_comet)
   '''
   
   
   def parse_event_log(log_path):
       """Parse Spark event log and extract shuffle metrics."""
       metrics = {"total_shuffle_write_bytes": 0, "total_shuffle_read_bytes": 0}
   
       with open(log_path, 'r') as f:
           for line in f:
               try:
                   event = json.loads(line.strip())
               except json.JSONDecodeError:
                   continue
   
               if event.get("Event") == "SparkListenerTaskEnd":
                   task_metrics = event.get("Task Metrics", {})
                   write_metrics = task_metrics.get("Shuffle Write Metrics", {})
                   read_metrics = task_metrics.get("Shuffle Read Metrics", {})
   
                   metrics["total_shuffle_write_bytes"] += write_metrics.get(
                       "Shuffle Bytes Written", 0
                   )
                   metrics["total_shuffle_read_bytes"] += (
                       read_metrics.get("Remote Bytes Read", 0) +
                       read_metrics.get("Local Bytes Read", 0)
                   )
   
       return metrics
   
   
   def find_latest_log(log_dir):
       """Find the most recent event log file."""
       logs = [(os.path.getmtime(os.path.join(log_dir, f)), os.path.join(log_dir, f))
               for f in os.listdir(log_dir)
               if os.path.isfile(os.path.join(log_dir, f)) and not f.startswith('.')]
       logs.sort(reverse=True)
       return logs[0][1] if logs else None
   
   
   def format_bytes(num_bytes):
       """Format bytes as human-readable string."""
       for unit in ['B', 'KB', 'MB', 'GB']:
           if abs(num_bytes) < 1024.0:
               return f"{num_bytes:.2f} {unit}"
           num_bytes /= 1024.0
       return f"{num_bytes:.2f} TB"
   
   
   def run_repro(enable_comet):
       """Run the reproduction script and return metrics."""
       with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
           f.write(REPRO_SCRIPT)
           script_path = f.name
   
       try:
           cmd = [sys.executable, script_path]
           if enable_comet:
               cmd.append("--comet")
   
           env = os.environ.copy()
           if not enable_comet:
               env.pop("COMET_JAR", None)
   
           result = subprocess.run(cmd, env=env, capture_output=True, text=True)
           print(result.stdout)
           if result.returncode != 0:
               print(f"STDERR: {result.stderr}")
               return None
   
           event_log_dir = os.environ.get("SPARK_EVENT_LOG_DIR", "/tmp/spark-events")
           log_path = find_latest_log(event_log_dir)
           return parse_event_log(log_path) if log_path else None
   
       finally:
           os.unlink(script_path)
   
   
   def main():
       print("=" * 60)
       print("Comet Shuffle Size Reproduction")
       print("=" * 60)
   
       comet_jar = os.environ.get("COMET_JAR")
       if not comet_jar or not os.path.exists(comet_jar):
           print("\nError: Set COMET_JAR environment variable to Comet JAR path")
           sys.exit(1)
   
       print(f"\nComet JAR: {comet_jar}")
   
       # Run Spark baseline
       print("\n" + "-" * 60)
       print("Running Spark (baseline)...")
       print("-" * 60)
       spark_metrics = run_repro(enable_comet=False)
   
       # Run Comet
       print("\n" + "-" * 60)
       print("Running Comet...")
       print("-" * 60)
       comet_metrics = run_repro(enable_comet=True)
   
       if not spark_metrics or not comet_metrics:
           print("Failed to collect metrics")
           sys.exit(1)
   
       # Print comparison
       spark_write = spark_metrics["total_shuffle_write_bytes"]
       comet_write = comet_metrics["total_shuffle_write_bytes"]
       ratio = comet_write / spark_write if spark_write > 0 else 0
   
       print("\n" + "=" * 60)
       print("RESULTS")
       print("=" * 60)
       print(f"\n{'Metric':<20} {'Spark':<15} {'Comet':<15} {'Ratio':<10}")
       print("-" * 60)
       print(f"{'Shuffle Write':<20} {format_bytes(spark_write):<15} "
             f"{format_bytes(comet_write):<15} {ratio:.2f}x")
       print(f"{'Shuffle Read':<20} {format_bytes(spark_metrics['total_shuffle_read_bytes']):<15} "
             f"{format_bytes(comet_metrics['total_shuffle_read_bytes']):<15}")
   
       if ratio > 1.1:
           print(f"\n⚠️  Comet writes {ratio:.2f}x MORE shuffle data than Spark")
   
   
   if __name__ == "__main__":
       main()
   ```
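
The script above reports only job-wide totals. To see which shuffle stage accounts for the extra bytes, the same event log can be grouped by stage. The following is a minimal sketch and not part of the original reproduction: `shuffle_write_by_stage` is a hypothetical helper, and it assumes each `SparkListenerTaskEnd` event carries a `Stage ID` field and that `Shuffle Write Metrics` includes `Shuffle Records Written` next to `Shuffle Bytes Written`.

```python
import json
from collections import defaultdict


def shuffle_write_by_stage(log_path):
    """Sum shuffle bytes and records written per stage from a Spark event log."""
    per_stage = defaultdict(lambda: {"bytes": 0, "records": 0})

    with open(log_path, "r") as f:
        for line in f:
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip blank or partially written lines

            if event.get("Event") != "SparkListenerTaskEnd":
                continue

            # "Task Metrics" can be missing or null (e.g. for failed tasks)
            task_metrics = event.get("Task Metrics") or {}
            write = task_metrics.get("Shuffle Write Metrics") or {}

            stage = per_stage[event.get("Stage ID")]
            stage["bytes"] += write.get("Shuffle Bytes Written", 0)
            stage["records"] += write.get("Shuffle Records Written", 0)

    return dict(per_stage)
```

Running this on the Spark and Comet event logs and comparing the per-stage byte counts side by side may help narrow down whether the difference comes from one exchange or from all of them.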
   
   ## Expected output
   
   ```
   ============================================================
   Comet Shuffle Size Reproduction
   ============================================================
   
   Comet JAR: /path/to/comet-spark-spark3.5_2.12-0.13.0-SNAPSHOT.jar
   
   ------------------------------------------------------------
   Running Spark (baseline)...
   ------------------------------------------------------------
   Running with: Spark
   Generating test data...
   Running shuffle query...
   Result rows: 1000
   Execution time: 417.17 ms
   
   ------------------------------------------------------------
   Running Comet...
   ------------------------------------------------------------
   Running with: Comet
   Generating test data...
   Running shuffle query...
   Result rows: 1000
   Execution time: 535.16 ms
   
   ============================================================
   RESULTS
   ============================================================
   
   Metric               Spark           Comet           Ratio
   ------------------------------------------------------------
   Shuffle Write        36.04 KB        118.65 KB       3.29x
   Shuffle Read         36.04 KB        118.65 KB
   
   ⚠️  Comet writes 3.29x MORE shuffle data than Spark
   ```
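
As a quick check on the table above, the ratio can be recomputed from the rounded sizes. This sketch uses only figures reported in this issue (the run above and the ~1GB run listed under "Additional observations"); the `overhead` helper is illustrative and not part of the reproduction script.

```python
def overhead(spark_size, comet_size):
    """Return (ratio, extra) for two sizes expressed in the same unit."""
    return comet_size / spark_size, comet_size - spark_size


# (label, Spark shuffle write, Comet shuffle write, unit) as reported in this issue
runs = [
    ("500k rows", 36.04, 118.65, "KB"),
    ("~1GB dataset", 14.23, 21.92, "MB"),
]

for label, spark_size, comet_size, unit in runs:
    ratio, extra = overhead(spark_size, comet_size)
    print(f"{label}: {ratio:.2f}x, +{extra:.2f} {unit}")

# 500k rows: 3.29x, +82.61 KB
# ~1GB dataset: 1.54x, +7.69 MB
```

The ratio is smaller for the larger run, while the absolute difference is larger.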
   
   ## Additional observations
   
   - The shuffle size increase was also observed with larger datasets (~1GB):
     - Spark: 14.23 MB
     - Comet: 21.92 MB (1.54x)
   
   - Verified by monitoring actual disk usage during query execution (not just metrics)
   
   - Occurs with both simple flat schemas and nested complex types (structs, arrays)
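
The issue does not say how disk usage was monitored. One way to cross-check the metrics, sketched here under assumptions, is to sum the sizes of shuffle data files while the query is running. This assumes the files follow Spark's usual `shuffle_*.data` naming under `spark.local.dir` (default `/tmp`) and that Comet's shuffle manager writes files with the same naming; `shuffle_bytes_on_disk` is a hypothetical helper.

```python
import os


def shuffle_bytes_on_disk(local_dir="/tmp"):
    """Sum the sizes of shuffle_*.data files found anywhere under local_dir."""
    total = 0
    for root, _dirs, files in os.walk(local_dir):
        for name in files:
            # Assumed naming: shuffle_<shuffleId>_<mapId>_<reduceId>.data
            if name.startswith("shuffle_") and name.endswith(".data"):
                try:
                    total += os.path.getsize(os.path.join(root, name))
                except OSError:
                    pass  # file was removed between listing and stat
    return total
```

Shuffle files are typically cleaned up when the application stops, so this would need to be sampled from a separate process while the query is still running.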
   
   ## Questions
   
   1. Is this expected behavior due to Comet's shuffle format?
   2. Is there a configuration to reduce shuffle size?
   3. Could this impact performance for shuffle-heavy workloads on disk-constrained systems?
   
   
   ### Steps to reproduce
   
   _No response_
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

