mbutrovich opened a new issue, #1906: URL: https://github.com/apache/datafusion-comet/issues/1906
### Describe the bug

As noted by @Kontinuation, the RangePartitioning implementation that just got merged has a bug, most obvious when trying to RangePartition pre-sorted data:

> This implementation of RangePartitioning may be incorrect. RangePartitioning should partition the input DataFrame into partitions with consecutive and non-overlapping ranges, this requires scanning the entire DataFrame to obtain the ranges of each partition before performing the actual shuffle writing.
>
> Here is the PySpark code to illustrate the difference between the behavior of Comet and Vanilla Spark.
>
> ```python
> spark.range(0, 100000).write.format("parquet").mode("overwrite").save("range-partitioning")
>
> df = spark.read.parquet("range-partitioning")
> df_range_partitioned = df.repartitionByRange(10, "id")
>
> df_range_partitioned.explain()
>
> # Show the min and max of each range
> def get_partition_bounds(idx, iterator):
>     min = None
>     max = None
>     for row in iterator:
>         if min is None or row.id < min:
>             min = row.id
>         if max is None or row.id > max:
>             max = row.id
>     yield idx, min, max
>
> partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()
>
> # Print the results
> for partition_id, min_id, max_id in sorted(partition_bounds):
>     print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")
> ```
>
> **Comet**:
>
> ```
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
>    +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
>
> Partition 0: min_id=0, max_id=90799
> Partition 1: min_id=753, max_id=91680
> Partition 2: min_id=1527, max_id=92520
> Partition 3: min_id=2399, max_id=93284
> Partition 4: min_id=3274, max_id=94123
> Partition 5: min_id=4053, max_id=94844
> Partition 6: min_id=4851, max_id=95671
> Partition 7: min_id=5738, max_id=96522
> Partition 8: min_id=6571, max_id=97335
> Partition 9: min_id=7408, max_id=99999
> ```
>
> **Spark**:
>
> ```
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, [plan_id=197]
>    +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
>
> Partition 0: min_id=0, max_id=9974
> Partition 1: min_id=9975, max_id=19981
> Partition 2: min_id=19982, max_id=29993
> Partition 3: min_id=29994, max_id=39997
> Partition 4: min_id=39998, max_id=49959
> Partition 5: min_id=49960, max_id=59995
> Partition 6: min_id=59996, max_id=69898
> Partition 7: min_id=69899, max_id=79970
> Partition 8: min_id=79971, max_id=89976
> Partition 9: min_id=89977, max_id=99999
> ```

### Steps to reproduce

_No response_

### Expected behavior

_No response_

### Additional context

_No response_
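
For reference, the requirement in the quoted comment (bounds are computed from the whole input first, then every row is routed by those bounds) can be sketched in plain Python. This is only an illustration, not Comet's or Spark's implementation: `range_bounds`, `assign_partition`, and `is_non_overlapping` are hypothetical helpers, and the sketch sorts every key for simplicity where a real engine would typically sample the input.

```python
from bisect import bisect_left


def range_bounds(keys, num_partitions):
    """Hypothetical helper: derive num_partitions - 1 inclusive upper bounds.

    Assumption: sorts every key for simplicity; a real engine would
    typically sample the input instead of sorting all of it.
    """
    ordered = sorted(keys)
    n = len(ordered)
    if n == 0:
        return []
    # Bound i is the largest key that belongs to partition i - 1.
    return [ordered[max((i * n) // num_partitions - 1, 0)]
            for i in range(1, num_partitions)]


def assign_partition(key, bounds):
    """Map a key to its partition index using the precomputed bounds."""
    return bisect_left(bounds, key)


def is_non_overlapping(partition_bounds):
    """Check (idx, min, max) tuples: each max must be below the next min.

    Assumes no partition is empty (no None min/max values).
    """
    ordered = sorted(partition_bounds)
    return all(prev[2] < nxt[1] for prev, nxt in zip(ordered, ordered[1:]))


# Bounds are fixed before any row is routed, so pre-sorted input
# still lands in consecutive, disjoint ranges.
bounds = range_bounds(range(0, 100000), 10)
print(bounds[:3])
print(assign_partition(9999, bounds), assign_partition(10000, bounds))
```

Feeding the `(idx, min, max)` tuples collected by `get_partition_bounds` into `is_non_overlapping` gives a quick pass/fail check: the Spark bounds reported above pass, while the Comet bounds do not.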