mbutrovich opened a new issue, #1906:
URL: https://github.com/apache/datafusion-comet/issues/1906

   ### Describe the bug
   
   As noted by @Kontinuation, the RangePartitioning implementation that was just merged has a bug, which is most obvious when range-partitioning pre-sorted data:
   
   > This implementation of RangePartitioning may be incorrect. RangePartitioning should partition the input DataFrame into partitions with consecutive and non-overlapping ranges; this requires scanning the entire DataFrame to obtain the ranges of each partition before performing the actual shuffle writing.
   > 
   > Here is the PySpark code to illustrate the difference between the behavior 
of Comet and Vanilla Spark.
   > 
   > ```python
   > spark.range(0, 100000).write.format("parquet").mode("overwrite").save("range-partitioning")
   > 
   > df = spark.read.parquet("range-partitioning")
   > df_range_partitioned = df.repartitionByRange(10, "id")
   > 
   > df_range_partitioned.explain()
   > 
   > # Show the min and max of each range
   > def get_partition_bounds(idx, iterator):
   >     min_id = None
   >     max_id = None
   >     for row in iterator:
   >         if min_id is None or row.id < min_id:
   >             min_id = row.id
   >         if max_id is None or row.id > max_id:
   >             max_id = row.id
   >     yield idx, min_id, max_id
   > 
   > partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()
   > 
   > # Print the results
   > for partition_id, min_id, max_id in sorted(partition_bounds):
   >     print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")
   > ```
   > 
   > **Comet**:
   > 
   > ```
   > == Physical Plan ==
   > AdaptiveSparkPlan isFinalPlan=false
   > +- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
   >    +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
   > 
   > Partition 0: min_id=0, max_id=90799
   > Partition 1: min_id=753, max_id=91680
   > Partition 2: min_id=1527, max_id=92520
   > Partition 3: min_id=2399, max_id=93284
   > Partition 4: min_id=3274, max_id=94123
   > Partition 5: min_id=4053, max_id=94844
   > Partition 6: min_id=4851, max_id=95671
   > Partition 7: min_id=5738, max_id=96522
   > Partition 8: min_id=6571, max_id=97335
   > Partition 9: min_id=7408, max_id=99999
   > ```
   > 
   > **Spark**:
   > 
   > ```
   > == Physical Plan ==
   > AdaptiveSparkPlan isFinalPlan=false
   > +- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, [plan_id=197]
   >    +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
   > 
   > Partition 0: min_id=0, max_id=9974
   > Partition 1: min_id=9975, max_id=19981
   > Partition 2: min_id=19982, max_id=29993
   > Partition 3: min_id=29994, max_id=39997
   > Partition 4: min_id=39998, max_id=49959
   > Partition 5: min_id=49960, max_id=59995
   > Partition 6: min_id=59996, max_id=69898
   > Partition 7: min_id=69899, max_id=79970
   > Partition 8: min_id=79971, max_id=89976
   > Partition 9: min_id=89977, max_id=99999
   > ```
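   For reference, Spark's `RangePartitioner` achieves the non-overlapping output above by first sampling the partitioning keys to compute global split points, and only then routing each row to a partition by comparing its key against those bounds. Below is a minimal pure-Python sketch of that two-step approach (the function names `compute_range_bounds` and `partition_for` are illustrative, not Spark's actual API, and the sampling here is simplified compared to Spark's reservoir sampling):

   ```python
   import bisect
   import random

   def compute_range_bounds(keys, num_partitions, sample_size=1000):
       # Step 1: sample the keys up front (Spark does a distributed
       # reservoir sample over all input partitions).
       sample = sorted(random.sample(keys, min(sample_size, len(keys))))
       # Pick num_partitions - 1 split points at evenly spaced quantiles
       # of the sorted sample.
       return [sample[(i * len(sample)) // num_partitions]
               for i in range(1, num_partitions)]

   def partition_for(key, bounds):
       # Step 2: route each row by binary search against the global bounds,
       # so partition i holds a contiguous, non-overlapping key range.
       return bisect.bisect_left(bounds, key)

   keys = list(range(100_000))
   bounds = compute_range_bounds(keys, 10)
   ```

   Because every row is compared against the same globally computed `bounds`, sorted input cannot produce overlapping per-partition ranges; skipping step 1 and partitioning each batch independently is what yields the overlapping Comet output shown above.
   
   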
   
   
   ### Steps to reproduce
   
   _No response_
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_

