andygrove opened a new issue, #3064:
URL: https://github.com/apache/datafusion-comet/issues/3064
### Describe the bug

# Comet writes significantly more shuffle data than Spark

## Summary

Comet's shuffle manager writes 1.5x-3x more shuffle data than Spark's native shuffle manager for equivalent queries. This was observed with both nested complex types and simple flat schemas.

## Environment

- PySpark 3.5.7
- Comet 0.13.0-SNAPSHOT (comet-spark-spark3.5_2.12)
- Java 17 (OpenJDK 17.0.17)
- Ubuntu 22.04

## Reproduction

### Prerequisites

- Python 3.8+
- Java 17
- A Spark standalone cluster (optional - will use local mode if not set)

### Setup

```bash
# Create virtual environment
python3 -m venv venv
source venv/bin/activate

# Install PySpark 3.5.x
pip install pyspark==3.5.7

# Download Comet JAR (adjust version as needed)
# From: https://github.com/apache/datafusion-comet/releases
# Or build from source
export COMET_JAR=/path/to/comet-spark-spark3.5_2.12-0.13.0.jar
```
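### Optional: verify Comet is active

Before comparing shuffle sizes, it can be worth confirming that the plugin actually loads. The sketch below reuses the same configuration as the reproduction script further down; it is only a sanity check, and the exact `Comet*` operator names printed by `explain()` may vary between versions:

```python
# Sanity check: confirm Comet operators appear in the physical plan.
# Assumes COMET_JAR is set as in the setup above.
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

comet_jar = os.environ["COMET_JAR"]
spark = (
    SparkSession.builder.appName("comet-sanity-check")
    .config("spark.jars", comet_jar)
    .config("spark.driver.extraClassPath", comet_jar)
    .config("spark.executor.extraClassPath", comet_jar)
    .config("spark.plugins", "org.apache.spark.CometPlugin")
    .config("spark.shuffle.manager",
            "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .getOrCreate()
)

# If the plugin is active, the plan shows operators with "Comet" in their
# names instead of the stock Spark ones.
df = spark.range(1000).groupBy((F.col("id") % 10).alias("k")).count()
df.explain()
spark.stop()
```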
### Run reproduction

Save the script below as `comet_shuffle_repro.py` and run:

```bash
# Optional: connect to standalone cluster
# export SPARK_MASTER=spark://hostname:7077

# Run the reproduction
python comet_shuffle_repro.py
```

### Reproduction script

```python
#!/usr/bin/env python3
"""
Minimal reproduction for Comet shuffle size issue.

Demonstrates that Comet writes significantly more shuffle data than Spark
for the same query.
"""
import json
import os
import subprocess
import sys
import tempfile

REPRO_SCRIPT = '''
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def create_spark_session(app_name, enable_comet=False):
    """Create SparkSession with optional Comet support."""
    spark_master = os.environ.get("SPARK_MASTER")
    event_log_dir = os.environ.get("SPARK_EVENT_LOG_DIR", "/tmp/spark-events")
    comet_jar = os.environ.get("COMET_JAR")

    os.makedirs(event_log_dir, exist_ok=True)

    builder = SparkSession.builder.appName(app_name)
    if spark_master:
        builder = builder.master(spark_master)

    builder = (
        builder
        .config("spark.eventLog.enabled", "true")
        .config("spark.eventLog.dir", f"file://{event_log_dir}")
        .config("spark.sql.shuffle.partitions", "200")
    )

    if enable_comet:
        if not comet_jar:
            raise ValueError("COMET_JAR environment variable not set")
        builder = (
            builder
            .config("spark.jars", comet_jar)
            .config("spark.driver.extraClassPath", comet_jar)
            .config("spark.executor.extraClassPath", comet_jar)
            .config("spark.plugins", "org.apache.spark.CometPlugin")
            .config("spark.shuffle.manager",
                    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
            .config("spark.memory.offHeap.enabled", "true")
            .config("spark.memory.offHeap.size", "2g")
        )

    return builder.getOrCreate()


def run_test(enable_comet):
    """Generate data and run shuffle query."""
    mode = "Comet" if enable_comet else "Spark"
    print(f"Running with: {mode}")

    spark = create_spark_session(f"ShuffleRepro-{mode}", enable_comet=enable_comet)
    spark.sparkContext.setLogLevel("WARN")

    try:
        # Generate simple test data
        print("Generating test data...")
        df = spark.range(500000).select(
            F.col("id"),
            (F.col("id") % 1000).alias("group_key"),
            F.rand(42).alias("value1"),
            F.rand(43).alias("value2"),
            F.concat(F.lit("user_"), (F.col("id") % 10000).cast("string")).alias("user_id"),
        )
        df.cache()
        df.count()

        # Run query that triggers shuffle
        print("Running shuffle query...")
        import time
        start = time.time()

        result = (
            df
            .groupBy("group_key")
            .agg(
                F.count("*").alias("cnt"),
                F.sum("value1").alias("sum_v1"),
                F.avg("value2").alias("avg_v2"),
                F.countDistinct("user_id").alias("distinct_users"),
            )
            .orderBy("group_key")
        )
        count = result.count()
        elapsed = (time.time() - start) * 1000

        print(f"Result rows: {count}")
        print(f"Execution time: {elapsed:.2f} ms")
    finally:
        spark.stop()


if __name__ == "__main__":
    import sys

    enable_comet = "--comet" in sys.argv
    run_test(enable_comet)
'''


def parse_event_log(log_path):
    """Parse Spark event log and extract shuffle metrics."""
    metrics = {"total_shuffle_write_bytes": 0, "total_shuffle_read_bytes": 0}
    with open(log_path, 'r') as f:
        for line in f:
            try:
                event = json.loads(line.strip())
            except json.JSONDecodeError:
                continue
            if event.get("Event") == "SparkListenerTaskEnd":
                task_metrics = event.get("Task Metrics", {})
                write_metrics = task_metrics.get("Shuffle Write Metrics", {})
                read_metrics = task_metrics.get("Shuffle Read Metrics", {})
                metrics["total_shuffle_write_bytes"] += write_metrics.get("Shuffle Bytes Written", 0)
                metrics["total_shuffle_read_bytes"] += (
                    read_metrics.get("Remote Bytes Read", 0)
                    + read_metrics.get("Local Bytes Read", 0)
                )
    return metrics


def find_latest_log(log_dir):
    """Find the most recent event log file."""
    logs = [
        (os.path.getmtime(os.path.join(log_dir, f)), os.path.join(log_dir, f))
        for f in os.listdir(log_dir)
        if os.path.isfile(os.path.join(log_dir, f)) and not f.startswith('.')
    ]
    logs.sort(reverse=True)
    return logs[0][1] if logs else None


def format_bytes(num_bytes):
    """Format bytes as human-readable string."""
    for unit in ['B', 'KB', 'MB', 'GB']:
        if abs(num_bytes) < 1024.0:
            return f"{num_bytes:.2f} {unit}"
        num_bytes /= 1024.0
    return f"{num_bytes:.2f} TB"


def run_repro(enable_comet):
    """Run the reproduction script and return metrics."""
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(REPRO_SCRIPT)
        script_path = f.name
    try:
        cmd = [sys.executable, script_path]
        if enable_comet:
            cmd.append("--comet")

        env = os.environ.copy()
        if not enable_comet:
            env.pop("COMET_JAR", None)

        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        print(result.stdout)
        if result.returncode != 0:
            print(f"STDERR: {result.stderr}")
            return None

        event_log_dir = os.environ.get("SPARK_EVENT_LOG_DIR", "/tmp/spark-events")
        log_path = find_latest_log(event_log_dir)
        return parse_event_log(log_path) if log_path else None
    finally:
        os.unlink(script_path)


def main():
    print("=" * 60)
    print("Comet Shuffle Size Reproduction")
    print("=" * 60)

    comet_jar = os.environ.get("COMET_JAR")
    if not comet_jar or not os.path.exists(comet_jar):
        print("\nError: Set COMET_JAR environment variable to Comet JAR path")
        sys.exit(1)
    print(f"\nComet JAR: {comet_jar}")

    # Run Spark baseline
    print("\n" + "-" * 60)
    print("Running Spark (baseline)...")
    print("-" * 60)
    spark_metrics = run_repro(enable_comet=False)

    # Run Comet
    print("\n" + "-" * 60)
    print("Running Comet...")
    print("-" * 60)
    comet_metrics = run_repro(enable_comet=True)

    if not spark_metrics or not comet_metrics:
        print("Failed to collect metrics")
        sys.exit(1)

    # Print comparison
    spark_write = spark_metrics["total_shuffle_write_bytes"]
    comet_write = comet_metrics["total_shuffle_write_bytes"]
    ratio = comet_write / spark_write if spark_write > 0 else 0

    print("\n" + "=" * 60)
    print("RESULTS")
    print("=" * 60)
    print(f"\n{'Metric':<20} {'Spark':<15} {'Comet':<15} {'Ratio':<10}")
    print("-" * 60)
    print(f"{'Shuffle Write':<20} {format_bytes(spark_write):<15} "
          f"{format_bytes(comet_write):<15} {ratio:.2f}x")
    print(f"{'Shuffle Read':<20} {format_bytes(spark_metrics['total_shuffle_read_bytes']):<15} "
          f"{format_bytes(comet_metrics['total_shuffle_read_bytes']):<15}")

    if ratio > 1.1:
        print(f"\n⚠️ Comet writes {ratio:.2f}x MORE shuffle data than Spark")


if __name__ == "__main__":
    main()
```

## Expected output

```
============================================================
Comet Shuffle Size Reproduction
============================================================

Comet JAR: /path/to/comet-spark-spark3.5_2.12-0.13.0-SNAPSHOT.jar

------------------------------------------------------------
Running Spark (baseline)...
------------------------------------------------------------
Running with: Spark
Generating test data...
Running shuffle query...
Result rows: 1000
Execution time: 417.17 ms

------------------------------------------------------------
Running Comet...
------------------------------------------------------------
Running with: Comet
Generating test data...
Running shuffle query...
Result rows: 1000
Execution time: 535.16 ms

============================================================
RESULTS
============================================================

Metric               Spark           Comet           Ratio
------------------------------------------------------------
Shuffle Write        36.04 KB        118.65 KB       3.29x
Shuffle Read         36.04 KB        118.65 KB

⚠️ Comet writes 3.29x MORE shuffle data than Spark
```

## Additional observations

- The shuffle size increase was also observed with larger datasets (~1GB):
  - Spark: 14.23 MB
  - Comet: 21.92 MB (1.54x)
- Verified by monitoring actual disk usage during query execution, not just reported metrics (a polling sketch is included after the questions below)
- Occurs with both simple flat schemas and nested complex types (structs, arrays)

## Questions

1. Is this expected behavior due to Comet's shuffle format?
2. Is there a configuration to reduce shuffle size? (Candidate knobs are sketched below.)
3. Could this impact performance for shuffle-heavy workloads on disk-constrained systems?
### Steps to reproduce

_No response_

### Expected behavior

_No response_

### Additional context

_No response_

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]