Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2985161552
@Kontinuation thank you for bringing this up! Let me investigate. In the meantime I suspect we'll revert this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
Kontinuation commented on PR #1862:
URL:
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2985113515
This implementation of RangePartitioning may be incorrect. RangePartitioning
should split the input DataFrame into partitions covering consecutive,
non-overlapping ranges; this requires scanning the entire DataFrame to obtain
the range bounds of each partition before performing the actual shuffle writing.
Here is the PySpark code to illustrate the difference between the behavior
of Comet and Vanilla Spark.
```python
spark.range(0, 10).write.format("parquet").mode("overwrite").save("range-partitioning")
df = spark.read.parquet("range-partitioning")
df_range_partitioned = df.repartitionByRange(10, "id")
df_range_partitioned.explain()

# Show the min and max of each range
def get_partition_bounds(idx, iterator):
    min = None
    max = None
    for row in iterator:
        if min is None or row.id < min:
            min = row.id
        if max is None or row.id > max:
            max = row.id
    yield idx, min, max

partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()

# Print the results
for partition_id, min_id, max_id in sorted(partition_bounds):
    print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")
```
**Comet**:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
   +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
Partition 0: min_id=0, max_id=90799
Partition 1: min_id=753, max_id=91680
Partition 2: min_id=1527, max_id=92520
Partition 3: min_id=2399, max_id=93284
Partition 4: min_id=3274, max_id=94123
Partition 5: min_id=4053, max_id=94844
Partition 6: min_id=4851, max_id=95671
Partition 7: min_id=5738, max_id=96522
Partition 8: min_id=6571, max_id=97335
Partition 9: min_id=7408, max_id=9
```
**Spark**:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, [plan_id=197]
   +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
Partition 0: min_id=0, max_id=9974
Partition 1: min_id=9975, max_id=19981
Partition 2: min_id=19982, max_id=29993
Partition 3: min_id=29994, max_id=39997
Partition 4: min_id=39998, max_id=49959
Partition 5: min_id=49960, max_id=59995
Partition 6: min_id=59996, max_id=69898
Partition 7: min_id=69899, max_id=79970
Partition 8: min_id=79971, max_id=89976
Partition 9: min_id=89977, max_id=9
```
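The behavior Spark expects (a single set of global bounds computed from a sample of the whole input, then a binary search per row) can be sketched in plain Python. This is a minimal illustration with simplified sampling, not Spark's actual `RangePartitioner`; the function name and parameters are hypothetical:

```python
import bisect
import random

def range_partition(values, num_partitions, sample_size=100, seed=42):
    """Assign each value a partition id such that partitions cover
    consecutive, non-overlapping ranges (hypothetical helper)."""
    rng = random.Random(seed)
    # Pass 1: sample the whole input and derive num_partitions - 1 bounds.
    sample = sorted(rng.sample(values, min(sample_size, len(values))))
    bounds = [sample[len(sample) * i // num_partitions]
              for i in range(1, num_partitions)]
    # Pass 2: binary-search each value against the global bounds,
    # analogous to partition_point over row bounds in the native code.
    return [bisect.bisect_right(bounds, v) for v in values]

parts = range_partition(list(range(1000)), 4)
```

Because every row is compared against the same global bounds, the resulting partitions cannot overlap, which appears to be the property the Comet output above violates.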
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich merged PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146947020
##
common/src/main/scala/org/apache/comet/CometConf.scala:
##
@@ -307,6 +307,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
Review Comment:
That's basically every unit test already (including the updated native
shuffle suite and fuzz test).
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
parthchandra commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146243533
##
common/src/main/scala/org/apache/comet/CometConf.scala:
##
@@ -307,6 +307,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
Review Comment:
That's what I thought. Is there a way to add a unit test with both enabled?
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146241932
##
common/src/main/scala/org/apache/comet/CometConf.scala:
##
@@ -307,6 +307,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
Review Comment:
The default is both enabled. They individually control whether hash or range
partitioning, respectively, falls back to Spark.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
parthchandra commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146236444
##
spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala:
##
@@ -120,29 +120,51 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper
     }
   }
-  test("native operator after native shuffle") {
-    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-      val df = sql("SELECT * FROM tbl")
-
-      val shuffled1 = df
-        .repartition(10, $"_2")
-        .select($"_1", $"_1" + 1, $"_2" + 2)
-        .repartition(10, $"_1")
-        .filter($"_1" > 1)
-
-      // 2 Comet shuffle exchanges are expected
-      checkShuffleAnswer(shuffled1, 2)
-
-      val shuffled2 = df
-        .repartitionByRange(10, $"_2")
-        .select($"_1", $"_1" + 1, $"_2" + 2)
-        .repartition(10, $"_1")
-        .filter($"_1" > 1)
-
-      // Because the first exchange from the bottom is a range exchange, which native
-      // shuffle doesn't support, Comet exec operators stop before the first exchange
-      // and thus there is no Comet exchange.
-      checkShuffleAnswer(shuffled2, 0)
+  test("native operator after native shuffle with hash partitioning") {
+    Seq("true", "false").foreach { hashPartitioningEnabled =>
+      withSQLConf(
+        CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED.key -> hashPartitioningEnabled) {
Review Comment:
We could probably merge these two tests?
```
Seq("true", "false").foreach { partitioningEnabled =>
  Seq(
    CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED,
    CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED).foreach { partitioningType =>
    withSQLConf(partitioningType.key -> partitioningEnabled)
```
##
common/src/main/scala/org/apache/comet/CometConf.scala:
##
@@ -307,6 +307,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
Review Comment:
Can a user have both configs enabled? What happens?
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
andygrove commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146006687
##
dev/diffs/3.4.3.diff:
##
@@ -2404,7 +2411,31 @@ index 266bb343526..c3e3d155813 100644
        checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
      }
    }
-@@ -1026,15 +1057,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -895,6 +928,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+
+  test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
+    withSQLConf(
++      CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
Review Comment:
Could you add comments where this is disabled, explaining why we must do
this? I know this is needed, but I will likely forget why.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2970529921
Looking at the 3 Spark SQL test failures (all related to bucket scan) now that there are fewer 3.5.x diffs to update.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2964065141
Last thing I am waiting on is to do a new set of Spark diffs to turn off native RangePartitioning in the 3 bucketing-related tests. Because of the different random number generator (for sampling), we get different results. I'll do that after #1870 and #1873 merge.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2139997704
##
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:
##
@@ -2904,6 +2903,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
supported
case SinglePartition =>
inputs.forall(attr => supportedShuffleDataType(attr.dataType))
+    case RangePartitioning(_, _) =>
+      true
Review Comment:
I had intended to write fallback rules there, but haven't found a scenario
to add to that yet.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
andygrove commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138821947
##
native/core/src/execution/shuffle/range_partitioner.rs:
##
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, UInt64Array};
+use arrow::compute::{take_arrays, TakeOptions};
+use arrow::row::{Row, RowConverter, Rows, SortField};
+use datafusion::physical_expr::LexOrdering;
+use rand::{rngs::SmallRng, Rng, SeedableRng};
+
+pub struct RangePartitioner;
+
+impl RangePartitioner {
+    /// Given a number of rows, sample size, and a random seed, generates unique indices to take
+    /// from an input batch to act as a random sample.
+    /// Adapted from https://en.wikipedia.org/wiki/Reservoir_sampling#Optimal:_Algorithm_L
+    /// We use sample_size instead of k and num_rows instead of n.
+    /// We use indices instead of actual values in the reservoir since we'll do one take() on the
+    /// input arrays at the end.
+    pub fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: u64) -> Vec<u64> {
+        assert!(sample_size > 0);
+        assert!(
+            num_rows > sample_size,
+            "Sample size > num_rows yields original batch."
+        );
+
+        // Initialize our reservoir with indices of the first |sample_size| elements.
+        let mut reservoir: Vec<u64> = (0..sample_size as u64).collect();
+
+        let mut rng = SmallRng::seed_from_u64(seed);
+        let mut w = (rng.random::<f64>().ln() / sample_size as f64).exp();
+        let mut i = sample_size - 1;
+
+        while i < num_rows {
+            i += (rng.random::<f64>().ln() / (1.0 - w).ln()).floor() as usize + 1;
+
+            if i < num_rows {
+                // Replace a random item in the reservoir with i
+                let random_index = rng.random_range(0..sample_size);
+                reservoir[random_index] = i as u64;
+                w *= (rng.random::<f64>().ln() / sample_size as f64).exp();
+            }
+        }
+
+        reservoir
+    }
+
+    /// Given a batch of Rows, an ordered vector of Rows that represent partition boundaries, and
+    /// a slice with enough space for the input batch, determines a partition id for every input
+    /// Row using binary search.
+    pub fn partition_indices_for_batch(
+        row_batch: &Rows,
+        partition_bounds_vec: &Vec<Row>,
+        partition_ids: &mut [u32],
+    ) {
+        row_batch.iter().enumerate().for_each(|(row_idx, row)| {
+            partition_ids[row_idx] =
+                partition_bounds_vec.partition_point(|bound| *bound <= row) as u32
+        });
+    }
+
+    /// Given input arrays and range partitioning metadata: samples the input arrays, generates
+    /// partition bounds, and returns Rows (for comparison against) and a RowConverter (for
+    /// adapting future incoming batches).
+    pub fn generate_bounds(
+        partition_arrays: &Vec<ArrayRef>,
+        lex_ordering: &LexOrdering,
+        num_output_partitions: usize,
+        num_rows: usize,
+        sample_size: usize,
+        seed: u64,
+    ) -> (Rows, RowConverter) {
+        let sampled_columns = if sample_size < num_rows {
+            // Construct our sample indices.
+            let sample_indices = UInt64Array::from(RangePartitioner::reservoir_sample_indices(
+                num_rows,
+                sample_size,
+                seed,
+            ));
+
+            // Extract our sampled data from the input data.
+            take_arrays(
+                partition_arrays,
+                &sample_indices,
+                Some(TakeOptions {
+                    check_bounds: false,
+                }),
+            )
+            .unwrap()
+        } else {
+            // Requested sample_size is larger than the batch, so just use the batch.
+            partition_arrays.clone()
+        };
+
+        // Generate our bounds indices.
+        let sort_fields: Vec<SortField> = partition_arrays
+            .iter()
+            .zip(lex_ordering)
+            .map(|(array, sort_expr)| {
+                SortField::new_with_options(array.data_type().clone(), sort_expr.options)
+            })
+            .collect();
+
+        let (bo
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
andygrove commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138822721
##
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:
##
@@ -2904,6 +2903,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
supported
case SinglePartition =>
inputs.forall(attr => supportedShuffleDataType(attr.dataType))
+    case RangePartitioning(_, _) =>
+      true
Review Comment:
This looks like a placeholder for additional checks?
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
andygrove commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138821450
##
native/core/src/execution/shuffle/range_partitioner.rs:
##
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, UInt64Array};
+use arrow::compute::{take_arrays, TakeOptions};
+use arrow::row::{Row, RowConverter, Rows, SortField};
+use datafusion::physical_expr::LexOrdering;
+use rand::{rngs::SmallRng, Rng, SeedableRng};
+
+pub struct RangePartitioner;
+
+impl RangePartitioner {
+    /// Given a number of rows, sample size, and a random seed, generates unique indices to take
+    /// from an input batch to act as a random sample.
+    /// Adapted from https://en.wikipedia.org/wiki/Reservoir_sampling#Optimal:_Algorithm_L
+    /// We use sample_size instead of k and num_rows instead of n.
+    /// We use indices instead of actual values in the reservoir since we'll do one take() on the
+    /// input arrays at the end.
+    pub fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: u64) -> Vec<u64> {
+        assert!(sample_size > 0);
+        assert!(
+            num_rows > sample_size,
+            "Sample size > num_rows yields original batch."
+        );
+
+        // Initialize our reservoir with indices of the first |sample_size| elements.
+        let mut reservoir: Vec<u64> = (0..sample_size as u64).collect();
+
+        let mut rng = SmallRng::seed_from_u64(seed);
+        let mut w = (rng.random::<f64>().ln() / sample_size as f64).exp();
+        let mut i = sample_size - 1;
+
+        while i < num_rows {
+            i += (rng.random::<f64>().ln() / (1.0 - w).ln()).floor() as usize + 1;
+
+            if i < num_rows {
+                // Replace a random item in the reservoir with i
+                let random_index = rng.random_range(0..sample_size);
+                reservoir[random_index] = i as u64;
+                w *= (rng.random::<f64>().ln() / sample_size as f64).exp();
+            }
+        }
+
+        reservoir
+    }
+
+    /// Given a batch of Rows, an ordered vector of Rows that represent partition boundaries, and
+    /// a slice with enough space for the input batch, determines a partition id for every input
+    /// Row using binary search.
+    pub fn partition_indices_for_batch(
+        row_batch: &Rows,
+        partition_bounds_vec: &Vec<Row>,
+        partition_ids: &mut [u32],
+    ) {
+        row_batch.iter().enumerate().for_each(|(row_idx, row)| {
+            partition_ids[row_idx] =
+                partition_bounds_vec.partition_point(|bound| *bound <= row) as u32
+        });
+    }
+
+    /// Given input arrays and range partitioning metadata: samples the input arrays, generates
+    /// partition bounds, and returns Rows (for comparison against) and a RowConverter (for
+    /// adapting future incoming batches).
+    pub fn generate_bounds(
+        partition_arrays: &Vec<ArrayRef>,
+        lex_ordering: &LexOrdering,
+        num_output_partitions: usize,
+        num_rows: usize,
+        sample_size: usize,
+        seed: u64,
+    ) -> (Rows, RowConverter) {
Review Comment:
We should return a `Result` here and replace the unwraps with `?` in this
function
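The Algorithm L sampling quoted above can be transliterated to Python for illustration. This is a sketch only: it mirrors the structure of `reservoir_sample_indices` but uses Python's RNG, so it will not reproduce the Rust output for a given seed.

```python
import math
import random

def reservoir_sample_indices(num_rows, sample_size, seed):
    """Algorithm L: sample `sample_size` distinct indices from range(num_rows)."""
    assert 0 < sample_size < num_rows
    rng = random.Random(seed)
    # Reservoir starts as the first sample_size indices.
    reservoir = list(range(sample_size))
    w = math.exp(math.log(rng.random()) / sample_size)
    i = sample_size - 1
    while i < num_rows:
        # Skip ahead a geometrically distributed number of rows.
        i += math.floor(math.log(rng.random()) / math.log(1.0 - w)) + 1
        if i < num_rows:
            # Replace a random reservoir slot with the new index.
            reservoir[rng.randrange(sample_size)] = i
            w *= math.exp(math.log(rng.random()) / sample_size)
    return reservoir
```

Indices stay unique because `i` strictly increases, so every replacement writes a value larger than anything already in the reservoir.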
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138343547
##
native/core/benches/shuffle_writer.rs:
##
@@ -66,10 +67,40 @@ fn criterion_benchmark(c: &mut Criterion) {
         CompressionCodec::Zstd(6),
     ] {
         group.bench_function(
-            format!("shuffle_writer: end to end (compression = {compression_codec:?}"),
+            format!("shuffle_writer: end to end (compression = {compression_codec:?})"),
+            |b| {
+                let ctx = SessionContext::new();
+                let exec = create_shuffle_writer_exec(
+                    compression_codec.clone(),
+                    CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
+                );
+                b.iter(|| {
+                    let task_ctx = ctx.task_ctx();
+                    let stream = exec.execute(0, task_ctx).unwrap();
+                    let rt = Runtime::new().unwrap();
+                    rt.block_on(collect(stream)).unwrap();
+                });
+            },
+        );
+    }
+
+    for partitioning in [
+        CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
+        CometPartitioning::RangePartitioning(
Review Comment:
Added RangePartitioning benchmark here.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
andygrove commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2957023096
I ran fresh benchmarks, but I do not see any change in performance. Perhaps the range partitioning shuffles are not a significant cost in these benchmarks.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2135992681
##
native/core/benches/shuffle_writer.rs:
##
@@ -42,20 +45,18 @@ fn criterion_benchmark(c: &mut Criterion) {
         CompressionCodec::Zstd(1),
         CompressionCodec::Zstd(6),
     ] {
-        for enable_fast_encoding in [true, false] {
Review Comment:
Remove this `enable_fast_encoding` loop since we don't have that flag
anymore after #1703.
##
native/core/benches/shuffle_writer.rs:
##
@@ -42,20 +45,18 @@ fn criterion_benchmark(c: &mut Criterion) {
         CompressionCodec::Zstd(1),
         CompressionCodec::Zstd(6),
     ] {
-        for enable_fast_encoding in [true, false] {
Review Comment:
Removed this `enable_fast_encoding` loop since we don't have that flag
anymore after #1703.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
andygrove commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2954116663
It would be interesting to use our new tracing feature to compare on-heap vs off-heap memory usage with range partitioning supported natively versus falling back to Spark.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
mbutrovich commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2952887778
> I ran TPC-H benchmarks and saw shuffles with range partitioning run natively. I did not see any difference in performance compared to the last set of benchmarks I ran some time ago, but I have not compared to the main branch yet.

Thanks Andy. I'm doing some pretty inefficient stuff to get around ownership issues of `Rows`, `Vec`, `Vec`, etc. that I aim to improve, so performance should improve. The microbenchmark for shuffle_writer shows range partitioning taking almost twice as long as hash partitioning on my laptop at the moment, but I aim to fix that.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
andygrove commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2952814148
I ran TPC-H benchmarks and saw shuffles with range partitioning run natively. I did not see any difference in performance compared to the last set of benchmarks I ran some time ago, but I have not compared to the main branch yet.
Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]
codecov-commenter commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2952754584
## [Codecov](https://app.codecov.io/gh/apache/datafusion-comet/pull/1862?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
Attention: Patch coverage is `64.70588%` with `6 lines` in your changes missing coverage. Please review.
> Project coverage is 32.15%. Comparing base (`f09f8af`) to head (`45cafd2`).
> Report is 247 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 0.00% | 1 Missing and 2 partials |
| spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala | 78.57% | 2 Missing and 1 partial |

Additional details and impacted files
```diff
@@              Coverage Diff              @@
##               main    #1862       +/-   ##
=============================================
- Coverage     56.12%   32.15%   -23.98%
+ Complexity      976      653      -323
=============================================
  Files           119      130       +11
  Lines         11743    12674      +931
  Branches       2251     2362      +111
=============================================
- Hits           6591     4075     -2516
- Misses         4012     7804     +3792
+ Partials       1140      795      -345
```
