0xPoe commented on code in PR #16956:
URL: https://github.com/apache/datafusion/pull/16956#discussion_r2485023283
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -892,21 +892,56 @@ impl ExecutionPlan for HashJoinExec {
}
fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- if partition.is_some() {
- return Ok(Statistics::new_unknown(&self.schema()));
+ match (partition, self.mode) {
+ // For CollectLeft mode, the left side is collected into a single
partition,
+ // so all left partitions are available to each output partition.
+ // For the right side, we need the specific partition statistics.
+ (Some(partition), PartitionMode::CollectLeft) => {
+ let left_stats = self.left.partition_statistics(None)?;
+ let right_stats =
self.right.partition_statistics(Some(partition))?;
+
+ let stats = estimate_join_statistics(
+ left_stats,
+ right_stats,
+ self.on.clone(),
+ &self.join_type,
+ &self.join_schema,
+ )?;
+ Ok(stats.project(self.projection.as_ref()))
+ }
+
+ // For Partitioned mode, both sides are partitioned, so each
output partition
+ // only has access to the corresponding partition from both sides.
+ (Some(partition), PartitionMode::Partitioned) => {
+ let left_stats =
self.left.partition_statistics(Some(partition))?;
+ let right_stats =
self.right.partition_statistics(Some(partition))?;
+
+ let stats = estimate_join_statistics(
+ left_stats,
+ right_stats,
+ self.on.clone(),
+ &self.join_type,
+ &self.join_schema,
+ )?;
+ Ok(stats.project(self.projection.as_ref()))
+ }
+
+ // For Auto mode or when no specific partition is requested, fall
back to
Review Comment:
@xudong963 Do you have time to look at the question I asked above? Thank
you. Also, could you please help me reopen the PR?
For this question, I've thought about it a little more.
> however, the method may be called first,
I searched the code base; no optimizer rules currently call
partition_statistics(Some(partition)) before JoinSelection.
I’m also wondering — based on this, should we assert that Auto mode can
never occur in this code path?
Or do we still want to determine the PartitionMode here? If so, that would
mean we need to store collect_threshold_byte_size and
collect_threshold_num_rows in HashJoinExec. I’m not sure if it’s a good design
to expose these optimizer-related configs to the executor layer.
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -892,21 +892,56 @@ impl ExecutionPlan for HashJoinExec {
}
fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- if partition.is_some() {
- return Ok(Statistics::new_unknown(&self.schema()));
+ match (partition, self.mode) {
+ // For CollectLeft mode, the left side is collected into a single
partition,
+ // so all left partitions are available to each output partition.
+ // For the right side, we need the specific partition statistics.
+ (Some(partition), PartitionMode::CollectLeft) => {
+ let left_stats = self.left.partition_statistics(None)?;
+ let right_stats =
self.right.partition_statistics(Some(partition))?;
+
+ let stats = estimate_join_statistics(
+ left_stats,
+ right_stats,
+ self.on.clone(),
+ &self.join_type,
+ &self.join_schema,
+ )?;
+ Ok(stats.project(self.projection.as_ref()))
+ }
+
+ // For Partitioned mode, both sides are partitioned, so each
output partition
+ // only has access to the corresponding partition from both sides.
+ (Some(partition), PartitionMode::Partitioned) => {
+ let left_stats =
self.left.partition_statistics(Some(partition))?;
+ let right_stats =
self.right.partition_statistics(Some(partition))?;
+
+ let stats = estimate_join_statistics(
+ left_stats,
+ right_stats,
+ self.on.clone(),
+ &self.join_type,
+ &self.join_schema,
+ )?;
+ Ok(stats.project(self.projection.as_ref()))
+ }
+
+ // For Auto mode or when no specific partition is requested, fall
back to
Review Comment:
@xudong963 Do you have time to look at the question I asked above? Thank
you. Also, could you please help me reopen the PR?
For this question, I've thought about it a little more.
> however, the method may be called first,
I searched the code base; no optimizer rules currently call
partition_statistics(Some(partition)) before JoinSelection.
I’m also wondering — based on this, should we assert that Auto mode can
never occur in this code path?
Or do we still want to determine the PartitionMode here? If so, that would
mean we need to store collect_threshold_byte_size and
collect_threshold_num_rows in HashJoinExec. I’m not sure if it’s a good design
to expose these optimizer-related configs to the executor layer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]