Re: [PR] feat: support merge for `Distribution` [datafusion]
berkaysynnada commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2007079688
##
datafusion/expr-common/src/statistics.rs:
##

@@ -203,6 +203,138 @@ impl Distribution {
        };
        Ok(dt)
    }
+
+    /// Merges two distributions into a single distribution that represents their combined statistics.
+    /// This creates a more general distribution that approximates the mixture of the input distributions.
+    ///
+    /// # Important Notes
+    ///
+    /// - The resulting mean, median, and variance are approximations of the mixture
+    ///   distribution parameters. They are calculated using weighted averages based on
+    ///   the input distributions. Users should not make definitive assumptions based on these values.
+    ///
+    /// - The range of the merged distribution is computed as the union of the input ranges,
+    ///   and its accuracy directly depends on the accuracy of the input ranges.
+    ///
+    /// - The result is always a [`Generic`] distribution, which may lose some specific
+    ///   properties of the original distribution types.
+    ///
+    /// # Returns
+    ///
+    /// Returns a new [`Distribution`] that approximates the combined statistics of the
+    /// input distributions.
+    pub fn merge(&self, other: &Self) -> Result<Self> {
+        let range_a = self.range()?;
+        let range_b = other.range()?;
+
+        // Determine data type and create combined range
+        let combined_range = range_a.union(&range_b)?;
+
+        // Calculate weights for the mixture distribution
+        let (weight_a, weight_b) = match (range_a.cardinality(), range_b.cardinality()) {
+            (Some(ca), Some(cb)) => {
+                let total = (ca + cb) as f64;
+                ((ca as f64) / total, (cb as f64) / total)
+            }
+            _ => (0.5, 0.5), // Equal weights if cardinalities are not available
+        };
+
+        // Get the original statistics
+        let mean_a = self.mean()?;
+        let mean_b = other.mean()?;
+        let median_a = self.median()?;
+        let median_b = other.median()?;
+        let var_a = self.variance()?;
+        let var_b = other.variance()?;
+
+        // Always use Float64 for intermediate calculations to avoid truncation.
+        // I assume that the target type is always numeric.
+        // TODO: maybe we can keep all `ScalarValue`s as `Float64` in `Distribution`?
+        let calc_type = DataType::Float64;
Review Comment:
   Why float? Decimals have higher precision. We've thought about this a lot: relaxing the data type during computations, or when representing intermediate or final results, is not a good approach. Rather than presuming a target type, we need to rely on the data type of the original quantity and its standard coercions.
##
datafusion/expr-common/src/statistics.rs:
##
@@ -203,6 +203,138 @@ impl Distribution {
};
Ok(dt)
}
+
+/// Merges two distributions into a single distribution that represents
their combined statistics.
+/// This creates a more general distribution that approximates the mixture
of the input distributions.
+///
+/// # Important Notes
+///
+/// - The resulting mean, median, and variance are approximations of the
mixture
+/// distribution parameters. They are calculated using weighted averages
based on
+/// the input distributions. Users should not make definitive
assumptions based on these values.
Review Comment:
I'm afraid we cannot allow such approximations. This whole "Distribution"
context is implemented to represent these uncertainties. If we also allow
uncertainties here, the things are becoming more complicated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#issuecomment-2743831182
> Attribute `total_count` is derivable from `counts`, so we may not want to store it for normalization/consistency reasons. Same goes for `range`; it can be constructed from `bins` in O(1) time.

Yes, we don't need to store them.
```rust
pub struct HistogramDistribution {
    bins: Vec<HistogramBin>,
}

pub struct HistogramBin {
    upper: ScalarValue,
    count: u64,
    // Maybe other fields, such as ndv
}
```
How do we plan to generate the `HistogramDistribution`? Let's assume we can get the exact min/max from the Parquet file: https://github.com/apache/datafusion/blob/main/datafusion/datasource-parquet/src/file_format.rs#L827. Should we generate the `HistogramDistribution` based on the min/max? Or do we have alternative ways?
Re: [PR] feat: support merge for `Distribution` [datafusion]
ozankabak commented on PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#issuecomment-2743175661

This API, as it currently stands, does not seem to make sense. It makes the assumption that outcomes (i.e. individual items in the range) of the `Distribution`s are equally likely, which is not necessarily the case.

We can only merge two statistical objects in certain special circumstances. For example, if we have a statistical object that tracks sample averages along with counts, we can merge two instances of it. Our distributions are not merge-able quantities in this sense. They are *mixable* (with a given weight), but not *merge-able*.

One of the follow-ups we previously discussed was adding a `HistogramDistribution` object that tracks bins and ranges. These objects will be merge-able. Therefore, we should start off by adding a `HistogramDistribution` object first; then we can add a `merge` API to that object. If you think we should have a `mix` API for the general `Distribution` object, we can add it too. Such an API will need to include a mixing weight in its signature.
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#issuecomment-2742833876

FYI @berkaysynnada @ozankabak
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#issuecomment-2743593315

> Do you know any use cases where this method would be especially useful? If so, maybe we can study one of those cases in more detail. That could help us understand the real need and guide us toward a more solid algorithm.

Yes, we're considering restarting the [work](https://github.com/apache/datafusion/pull/13296/files#diff-8d786f45bc2d5bf629754a119ed6fa7998dcff7faacd954c45945b7047b87fa1R498), and given that `Precision` will be replaced with `Distribution`, I opened this proposal to discuss how to do **merge** for `Distribution`.

> a) Mixture Model (Weighted Average of PDFs)
> This is a method for combining different probability distributions.
> If p1(x) and p2(x) are some PDFs, and we give them equal weight (0.5), the combined PDF would be:
> _pmix(x) = 0.5 * p1(x) + 0.5 * p2(x)_
> This creates a probabilistic blend of the two. The result is still a valid PDF (non-negative and integrates to 1).

It seems that my current approach is close to the mixture model.
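As a hedged illustration of the mixture-model formula quoted above (toy code, not a DataFusion API; the function name `mix_pdfs` and the discrete PDFs are made up for this sketch):

```rust
// Mixture model over two discrete PDFs with the same support:
// p_mix(x) = w * p1(x) + (1 - w) * p2(x).
fn mix_pdfs(p1: &[f64], p2: &[f64], w: f64) -> Vec<f64> {
    p1.iter()
        .zip(p2)
        .map(|(a, b)| w * a + (1.0 - w) * b)
        .collect()
}

fn main() {
    let p1 = [0.5, 0.5, 0.0]; // mass on the first two outcomes
    let p2 = [0.0, 0.5, 0.5]; // mass on the last two outcomes
    let mixed = mix_pdfs(&p1, &p2, 0.5);
    // The mixture is still a valid PDF: non-negative and sums to 1.
    assert!(mixed.iter().all(|p| *p >= 0.0));
    assert!((mixed.iter().sum::<f64>() - 1.0).abs() < 1e-12);
    assert_eq!(mixed, vec![0.25, 0.5, 0.25]);
}
```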
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#issuecomment-2745345742

Thanks for your suggestions!! @alamb @ozankabak @berkaysynnada and @kosiew

I'll continue this work after the `Migrate to Distribution from Precision` effort is done. I think that once it's done, the new statistics framework will be relatively stable and we can see how `Distribution` is integrated into the DataFusion core. It'll definitely be helpful for me to do some work around the new statistics framework.

Again, I sincerely appreciate that you took the time to review and discuss ❤️
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 closed pull request #15296: feat: support merge for `Distribution`
URL: https://github.com/apache/datafusion/pull/15296
Re: [PR] feat: support merge for `Distribution` [datafusion]
ozankabak commented on PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#issuecomment-2744132824

The most likely way we will end up with `HistogramDistribution`s will be via sampling. We can also leverage statistics in file metadata if a file format stores this information. AFAICT Parquet doesn't store histogram information.

If your use case is specific to Parquet files and you can't do sampling, what we can do is add an optional `num_samples` field to `GenericDistribution`. This way, you can `merge` two `GenericDistribution` objects **if** both have a value for `num_samples`. In such a scenario, you can update the `mean`, `variance` and `range` fields with [appropriate formulas](https://math.stackexchange.com/questions/2971315/how-do-i-combine-standard-deviations-of-two-groups) and add the `num_samples` fields. The `median` value will always be set to `None`; it is not merge-able.

In an expression tree, the `num_samples` information of children `GenericDistribution`s will combine multiplicatively (due to the independence assumption). When a `GenericDistribution` combines with another `Distribution`, the information will be lost and set to `None` for the resulting `GenericDistribution`. For example, the resulting `GenericDistribution` for `2 * x` will preserve `num_samples` (w.r.t. that of `x`), but the same for `x + y` will be `num_samples_x * num_samples_y`.
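The "appropriate formulas" linked above are the standard pooled-sample formulas. A minimal sketch, assuming population (1/n) variances and illustrative names (`merge_stats` is not a DataFusion API):

```rust
// Merge sample statistics of two groups given their counts, means, and
// population variances. Returns (combined count, combined mean, combined
// variance). Illustrative only; not an actual DataFusion function.
fn merge_stats(n1: u64, m1: f64, v1: f64, n2: u64, m2: f64, v2: f64) -> (u64, f64, f64) {
    let (n1f, n2f) = (n1 as f64, n2 as f64);
    let n = n1f + n2f;
    // Combined mean: count-weighted average of the group means.
    let mean = (n1f * m1 + n2f * m2) / n;
    // Combined variance: within-group variance plus between-group spread.
    let var = (n1f * (v1 + (m1 - mean).powi(2)) + n2f * (v2 + (m2 - mean).powi(2))) / n;
    (n1 + n2, mean, var)
}

fn main() {
    // Two groups: {1, 3} (mean 2, variance 1) and {5, 7} (mean 6, variance 1).
    let (n, mean, var) = merge_stats(2, 2.0, 1.0, 2, 6.0, 1.0);
    assert_eq!(n, 4);
    assert_eq!(mean, 4.0); // mean of {1, 3, 5, 7}
    assert_eq!(var, 5.0);  // population variance of {1, 3, 5, 7}
}
```

Note that this works precisely because counts are tracked alongside the averages, which is the "certain special circumstances" condition mentioned earlier in the thread.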
Re: [PR] feat: support merge for `Distribution` [datafusion]
ozankabak commented on PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#issuecomment-2743724228
> I confused `merge` and `mix`; after reviewing the information, "merge" suggests combining datasets that maintain their original properties, but what's implemented is actually closer to a weighted mixture of probability distributions. Do I understand correctly?

Right -- `merge` coalesces partial information about a single quantity, while `mix` models a probabilistic selection between two quantities. Your use case seems to fall in the first category. Use cases for mixtures arise when modeling things like filters that depend on composite expressions involving random functions, etc.

> Yes, I agree. HistogramDistribution is merge-able. Does it look like this?

```rust
pub struct HistogramDistribution {
    bins: Vec<ScalarValue>, // The bin boundaries
    counts: Vec<u64>,       // Frequency in each bin
    total_count: u64,       // Sum of all bin counts
    range: Interval,        // Overall range covered by the histogram
}
```

I haven't thought about it in detail, but this seems reasonable. We'd probably want an attribute specifying the maximum number of bins one can have, because many operations (including `merge`) will have a tendency to increase bins unless special care is taken to coalesce when necessary. Attribute `total_count` is derivable from `counts`, so we may not want to store it for normalization/consistency reasons. Same goes for `range`; it can be constructed from `bins` in O(1) time.
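To make concrete why histograms are merge-able in the strict sense discussed above, here is a hedged sketch using simplified stand-in types (not the proposed DataFusion structs): when two histograms share bin boundaries, merging is just element-wise addition of counts.

```rust
// Simplified stand-in for the proposed HistogramDistribution.
#[derive(Debug, PartialEq)]
struct Histogram {
    bounds: Vec<f64>, // bin boundaries
    counts: Vec<u64>, // frequency per bin
}

// Merge only the aligned case; unequal boundaries would need re-binning
// (and possibly coalescing to respect a maximum-bin limit, as noted above).
fn merge_histograms(a: &Histogram, b: &Histogram) -> Option<Histogram> {
    if a.bounds != b.bounds {
        return None;
    }
    let counts = a.counts.iter().zip(&b.counts).map(|(x, y)| x + y).collect();
    Some(Histogram { bounds: a.bounds.clone(), counts })
}

fn main() {
    let a = Histogram { bounds: vec![0.0, 10.0, 20.0], counts: vec![3, 7] };
    let b = Histogram { bounds: vec![0.0, 10.0, 20.0], counts: vec![1, 9] };
    let merged = merge_histograms(&a, &b).unwrap();
    // No information about the underlying data is lost or approximated.
    assert_eq!(merged.counts, vec![4, 16]);
}
```

Unlike the weighted mean/median blending, this loses no per-bin information, which is what makes the histogram representation merge-able rather than merely mixable.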
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#issuecomment-2743665328
> We can only merge two statistical objects in certain special circumstances. For example, if we have a statistical object that tracks sample averages along with counts, we can merge two instances of it. Our distributions are not merge-able quantities in this sense. They are _mixable_ (with a given weight), but not _merge-able_.

I confused `merge` and `mix`; after reviewing the information, "merge" suggests combining datasets that maintain their original properties, but what's implemented is actually closer to a weighted mixture of probability distributions. Do I understand correctly?

> One of the follow-ups we previously discussed was adding a `HistogramDistribution` object that tracks bins and ranges. These objects will be merge-able. Therefore, we should start off by adding a `HistogramDistribution` object first. Then, we can add a `merge` API to that object.

Yes, I agree. `HistogramDistribution` is merge-able. Does it look like this?

```rust
pub struct HistogramDistribution {
    bins: Vec<ScalarValue>, // The bin boundaries
    counts: Vec<u64>,       // Frequency in each bin
    total_count: u64,       // Sum of all bin counts
    range: Interval,        // Overall range covered by the histogram
}
```

> If you think we should have a `mix` API for the general `Distribution` object, we can add it too. Such an API will need to include a mixing weight in its signature.

This is my use case: https://github.com/apache/datafusion/pull/13296/files#diff-8d786f45bc2d5bf629754a119ed6fa7998dcff7faacd954c45945b7047b87fa1R498, merging the file statistics across the whole file group. I'm still thinking about whether a `mix` API can satisfy my requirement.
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2007748334
##
datafusion/expr-common/src/statistics.rs:
##

@@ -203,6 +203,138 @@ impl Distribution {
        };
        Ok(dt)
    }
+
+    /// Merges two distributions into a single distribution that represents their combined statistics.
+    /// This creates a more general distribution that approximates the mixture of the input distributions.
+    ///
+    /// # Important Notes
+    ///
+    /// - The resulting mean, median, and variance are approximations of the mixture
+    ///   distribution parameters. They are calculated using weighted averages based on
+    ///   the input distributions. Users should not make definitive assumptions based on these values.
Review Comment:
   I agree; this will result in larger uncertainties.
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2004911809
##
datafusion/expr-common/src/statistics.rs:
##

@@ -857,6 +857,143 @@ pub fn compute_variance(
    ScalarValue::try_from(target_type)
}

+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
+pub fn merge_distributions(a: &Distribution, b: &Distribution) -> Result<Distribution> {
+    let range_a = a.range()?;
+    let range_b = b.range()?;
+
+    // Determine data type and create combined range
+    let combined_range = if range_a.is_unbounded() || range_b.is_unbounded() {
+        Interval::make_unbounded(&range_a.data_type())?
+    } else {
+        // Take the widest possible range conservatively
+        let lower_a = range_a.lower();
+        let lower_b = range_b.lower();
+        let upper_a = range_a.upper();
+        let upper_b = range_b.upper();
+
+        let combined_lower = if lower_a.lt(lower_b) {
+            lower_a.clone()
+        } else {
+            lower_b.clone()
+        };
+
+        let combined_upper = if upper_a.gt(upper_b) {
+            upper_a.clone()
+        } else {
+            upper_b.clone()
+        };
+
+        Interval::try_new(combined_lower, combined_upper)?
+    };
+
+    // Calculate weights for the mixture distribution
Review Comment:
   Also pinging @kosiew: do you have any thoughts on the new way to compute the weight?
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002959262
##
datafusion/expr-common/src/statistics.rs:
##

@@ -203,6 +203,121 @@ impl Distribution {
        };
        Ok(dt)
    }
+
+    /// Merges two distributions into a single distribution that represents their combined statistics.
+    /// This creates a more general distribution that approximates the mixture of the input distributions.
+    pub fn merge(&self, other: &Self) -> Result<Self> {
+        let range_a = self.range()?;
+        let range_b = other.range()?;
+
+        // Determine data type and create combined range
+        let combined_range = range_a.union(&range_b)?;
+
+        // Calculate weights for the mixture distribution
+        let (weight_a, weight_b) = match (range_a.cardinality(), range_b.cardinality()) {
+            (Some(ca), Some(cb)) => {
+                let total = (ca + cb) as f64;
+                ((ca as f64) / total, (cb as f64) / total)
+            }
+            _ => (0.5, 0.5), // Equal weights if cardinalities are not available
+        };
+
+        // Get the original statistics
+        let mean_a = self.mean()?;
+        let mean_b = other.mean()?;
+        let median_a = self.median()?;
+        let median_b = other.median()?;
+        let var_a = self.variance()?;
+        let var_b = other.variance()?;
+
+        // Always use Float64 for intermediate calculations to avoid truncation.
+        // I assume that the target type is always numeric.
+        // TODO: maybe we can keep all `ScalarValue`s as `Float64` in `Distribution`?
+        let calc_type = DataType::Float64;
+
+        // Create weight scalars using Float64 to avoid truncation
+        let weight_a_scalar = ScalarValue::from(weight_a);
+        let weight_b_scalar = ScalarValue::from(weight_b);
+
+        // Calculate combined mean
+        let combined_mean = if mean_a.is_null() || mean_b.is_null() {
+            if mean_a.is_null() {
+                mean_b.clone()
+            } else {
+                mean_a.clone()
+            }
+        } else {
+            // Cast to Float64 for calculation
+            let mean_a_f64 = mean_a.cast_to(&calc_type)?;
+            let mean_b_f64 = mean_b.cast_to(&calc_type)?;
+
+            // Calculate weighted mean
+            mean_a_f64
+                .mul_checked(&weight_a_scalar)?
+                .add_checked(&mean_b_f64.mul_checked(&weight_b_scalar)?)?
+        };
+
+        // Calculate combined median
+        let combined_median = if median_a.is_null() || median_b.is_null() {
+            if median_a.is_null() {
+                median_b
+            } else {
+                median_a
+            }
+        } else {
+            // Cast to Float64 for calculation
+            let median_a_f64 = median_a.cast_to(&calc_type)?;
+            let median_b_f64 = median_b.cast_to(&calc_type)?;
+
+            // Calculate weighted median
+            median_a_f64
+                .mul_checked(&weight_a_scalar)?
+                .add_checked(&median_b_f64.mul_checked(&weight_b_scalar)?)?
Review Comment:
   I added some notes for the method: https://github.com/apache/datafusion/pull/15296/commits/5b98f4c5efe5404a6ae4c8609a357491e9ab
##
datafusion/expr-common/src/statistics.rs:
##

@@ -857,6 +857,143 @@ pub fn compute_variance(
    ScalarValue::try_from(target_type)
}

+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
Review Comment:
   I added some notes for the method: https://github.com/apache/datafusion/pull/15296/commits/5b98f4c5efe5404a6ae4c8609a357491e9ab
Re: [PR] feat: support merge for `Distribution` [datafusion]
kosiew commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002828639
##
datafusion/expr-common/src/statistics.rs:
##

@@ -203,6 +203,121 @@ impl Distribution {
        };
        Ok(dt)
    }
+
+    /// Merges two distributions into a single distribution that represents their combined statistics.
+    /// This creates a more general distribution that approximates the mixture of the input distributions.
+    pub fn merge(&self, other: &Self) -> Result<Self> {
+        let range_a = self.range()?;
+        let range_b = other.range()?;
+
+        // Determine data type and create combined range
+        let combined_range = range_a.union(&range_b)?;
+
+        // Calculate weights for the mixture distribution
+        let (weight_a, weight_b) = match (range_a.cardinality(), range_b.cardinality()) {
+            (Some(ca), Some(cb)) => {
+                let total = (ca + cb) as f64;
+                ((ca as f64) / total, (cb as f64) / total)
+            }
+            _ => (0.5, 0.5), // Equal weights if cardinalities are not available
+        };
+
+        // Get the original statistics
+        let mean_a = self.mean()?;
+        let mean_b = other.mean()?;
+        let median_a = self.median()?;
+        let median_b = other.median()?;
+        let var_a = self.variance()?;
+        let var_b = other.variance()?;
+
+        // Always use Float64 for intermediate calculations to avoid truncation.
+        // I assume that the target type is always numeric.
+        // TODO: maybe we can keep all `ScalarValue`s as `Float64` in `Distribution`?
+        let calc_type = DataType::Float64;
+
+        // Create weight scalars using Float64 to avoid truncation
+        let weight_a_scalar = ScalarValue::from(weight_a);
+        let weight_b_scalar = ScalarValue::from(weight_b);
+
+        // Calculate combined mean
+        let combined_mean = if mean_a.is_null() || mean_b.is_null() {
+            if mean_a.is_null() {
+                mean_b.clone()
+            } else {
+                mean_a.clone()
+            }
+        } else {
+            // Cast to Float64 for calculation
+            let mean_a_f64 = mean_a.cast_to(&calc_type)?;
+            let mean_b_f64 = mean_b.cast_to(&calc_type)?;
+
+            // Calculate weighted mean
+            mean_a_f64
+                .mul_checked(&weight_a_scalar)?
+                .add_checked(&mean_b_f64.mul_checked(&weight_b_scalar)?)?
+        };
+
+        // Calculate combined median
+        let combined_median = if median_a.is_null() || median_b.is_null() {
+            if median_a.is_null() {
+                median_b
+            } else {
+                median_a
+            }
+        } else {
+            // Cast to Float64 for calculation
+            let median_a_f64 = median_a.cast_to(&calc_type)?;
+            let median_b_f64 = median_b.cast_to(&calc_type)?;
+
+            // Calculate weighted median
+            median_a_f64
+                .mul_checked(&weight_a_scalar)?
+                .add_checked(&median_b_f64.mul_checked(&weight_b_scalar)?)?
Review Comment:
   Without access to the full data, there isn't a universally "better" method than the weighted-average approach you adopted here. The key, as you mentioned, is to document these assumptions clearly so that downstream users of the code understand that the computed median is an approximation and may not capture the true central tendency if the underlying distributions differ significantly.
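A hedged toy example (not DataFusion code) of the caveat above: the count-weighted average of two medians can land far from the median of the pooled data when the distributions differ.

```rust
// Median of an already-sorted slice.
fn median(sorted: &[f64]) -> f64 {
    let n = sorted.len();
    if n % 2 == 1 {
        sorted[n / 2]
    } else {
        (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0
    }
}

fn main() {
    let a = [1.0, 2.0, 3.0];           // median 2.0
    let b = [10.0, 11.0, 12.0, 100.0]; // median 11.5
    // Count-weighted average of the two medians:
    let weighted = (3.0 * median(&a) + 4.0 * median(&b)) / 7.0;
    // True median of the pooled, sorted data:
    let mut pooled = [1.0, 2.0, 3.0, 10.0, 11.0, 12.0, 100.0];
    pooled.sort_by(|x, y| x.partial_cmp(y).unwrap());
    assert_eq!(median(&pooled), 10.0);
    // The weighted estimate (52/7 ≈ 7.43) differs from the true median 10.0.
    assert!((weighted - 52.0 / 7.0).abs() < 1e-12);
}
```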
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002526236
##
datafusion/expr-common/src/statistics.rs:
##

@@ -203,6 +203,121 @@ impl Distribution {
        };
        Ok(dt)
    }
+
+    /// Merges two distributions into a single distribution that represents their combined statistics.
+    /// This creates a more general distribution that approximates the mixture of the input distributions.
+    pub fn merge(&self, other: &Self) -> Result<Self> {
+        let range_a = self.range()?;
+        let range_b = other.range()?;
+
+        // Determine data type and create combined range
+        let combined_range = range_a.union(&range_b)?;
+
+        // Calculate weights for the mixture distribution
+        let (weight_a, weight_b) = match (range_a.cardinality(), range_b.cardinality()) {
+            (Some(ca), Some(cb)) => {
+                let total = (ca + cb) as f64;
+                ((ca as f64) / total, (cb as f64) / total)
+            }
+            _ => (0.5, 0.5), // Equal weights if cardinalities are not available
+        };
+
+        // Get the original statistics
+        let mean_a = self.mean()?;
+        let mean_b = other.mean()?;
+        let median_a = self.median()?;
+        let median_b = other.median()?;
+        let var_a = self.variance()?;
+        let var_b = other.variance()?;
+
+        // Always use Float64 for intermediate calculations to avoid truncation.
+        // I assume that the target type is always numeric.
+        // TODO: maybe we can keep all `ScalarValue`s as `Float64` in `Distribution`?
+        let calc_type = DataType::Float64;
+
+        // Create weight scalars using Float64 to avoid truncation
+        let weight_a_scalar = ScalarValue::from(weight_a);
+        let weight_b_scalar = ScalarValue::from(weight_b);
+
+        // Calculate combined mean
+        let combined_mean = if mean_a.is_null() || mean_b.is_null() {
+            if mean_a.is_null() {
+                mean_b.clone()
+            } else {
+                mean_a.clone()
+            }
+        } else {
+            // Cast to Float64 for calculation
+            let mean_a_f64 = mean_a.cast_to(&calc_type)?;
+            let mean_b_f64 = mean_b.cast_to(&calc_type)?;
+
+            // Calculate weighted mean
+            mean_a_f64
+                .mul_checked(&weight_a_scalar)?
+                .add_checked(&mean_b_f64.mul_checked(&weight_b_scalar)?)?
+        };
+
+        // Calculate combined median
+        let combined_median = if median_a.is_null() || median_b.is_null() {
+            if median_a.is_null() {
+                median_b
+            } else {
+                median_a
+            }
+        } else {
+            // Cast to Float64 for calculation
+            let median_a_f64 = median_a.cast_to(&calc_type)?;
+            let median_b_f64 = median_b.cast_to(&calc_type)?;
+
+            // Calculate weighted median
+            median_a_f64
+                .mul_checked(&weight_a_scalar)?
+                .add_checked(&median_b_f64.mul_checked(&weight_b_scalar)?)?
Review Comment:
   Yes, I agree with you; thank you for your review, @kosiew.
   IMO, the computation of the weight isn't accurate, so we can't guarantee that the `mean`, `median`, and `variance` are exact; we should let downstream users know by adding some comments to the `merge` method.
   Also, do you have any thoughts on improving the way the combined weight is computed, to make the result as accurate as possible?
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002503571
##
datafusion/expr-common/src/statistics.rs:
##
@@ -203,6 +203,121 @@ impl Distribution {
};
Ok(dt)
}
+
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
+pub fn merge(&self, other: &Self) -> Result<Self> {
+    let range_a = self.range()?;
+    let range_b = other.range()?;
+
+    // Determine data type and create combined range
+    let combined_range = range_a.union(&range_b)?;
+
+    // Calculate weights for the mixture distribution
+    let (weight_a, weight_b) = match (range_a.cardinality(), range_b.cardinality()) {
+        (Some(ca), Some(cb)) => {
+            let total = (ca + cb) as f64;
+            ((ca as f64) / total, (cb as f64) / total)
Review Comment:
Yeah, the way the weight is computed is discussed in this thread:
https://github.com/apache/datafusion/pull/15296#discussion_r2002041862. There I
propose a new approach that includes a safeguard.
Re: [PR] feat: support merge for `Distribution` [datafusion]
kosiew commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002421418
##
datafusion/expr-common/src/statistics.rs:
##
@@ -203,6 +203,121 @@ impl Distribution {
};
Ok(dt)
}
+
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
+pub fn merge(&self, other: &Self) -> Result<Self> {
+    let range_a = self.range()?;
+    let range_b = other.range()?;
+
+    // Determine data type and create combined range
+    let combined_range = range_a.union(&range_b)?;
+
+    // Calculate weights for the mixture distribution
+    let (weight_a, weight_b) = match (range_a.cardinality(), range_b.cardinality()) {
+        (Some(ca), Some(cb)) => {
+            let total = (ca + cb) as f64;
+            ((ca as f64) / total, (cb as f64) / total)
+        }
+        _ => (0.5, 0.5), // Equal weights if cardinalities not available
+    };
+
+    // Get the original statistics
+    let mean_a = self.mean()?;
+    let mean_b = other.mean()?;
+    let median_a = self.median()?;
+    let median_b = other.median()?;
+    let var_a = self.variance()?;
+    let var_b = other.variance()?;
+
+    // Always use Float64 for intermediate calculations to avoid truncation
+    // I assume that the target type is always numeric
+    // Todo: maybe we can keep all `ScalarValue` as `Float64` in `Distribution`?
+    let calc_type = DataType::Float64;
+
+    // Create weight scalars using Float64 to avoid truncation
+    let weight_a_scalar = ScalarValue::from(weight_a);
+    let weight_b_scalar = ScalarValue::from(weight_b);
+
+    // Calculate combined mean
+    let combined_mean = if mean_a.is_null() || mean_b.is_null() {
+        if mean_a.is_null() {
+            mean_b.clone()
+        } else {
+            mean_a.clone()
+        }
+    } else {
+        // Cast to Float64 for calculation
+        let mean_a_f64 = mean_a.cast_to(&calc_type)?;
+        let mean_b_f64 = mean_b.cast_to(&calc_type)?;
+
+        // Calculate weighted mean
+        mean_a_f64
+            .mul_checked(&weight_a_scalar)?
+            .add_checked(&mean_b_f64.mul_checked(&weight_b_scalar)?)?
+    };
+
+    // Calculate combined median
+    let combined_median = if median_a.is_null() || median_b.is_null() {
+        if median_a.is_null() {
+            median_b
+        } else {
+            median_a
+        }
+    } else {
+        // Cast to Float64 for calculation
+        let median_a_f64 = median_a.cast_to(&calc_type)?;
+        let median_b_f64 = median_b.cast_to(&calc_type)?;
+
+        // Calculate weighted median
+        median_a_f64
+            .mul_checked(&weight_a_scalar)?
+            .add_checked(&median_b_f64.mul_checked(&weight_b_scalar)?)?
Review Comment:
Medians are not linear statistics. This currently calculates the combined
median as a weighted average, which might not represent the true median of the
combined distribution. Consider adding a comment discussing this approximation
and any potential impact on downstream results.
For instance, consider two distributions where one is symmetric and the
other is highly skewed. The weighted average of their medians may not represent
the true central tendency of the merged distribution because the skewness can
cause the overall median to shift in a non-linear fashion.
Potential Impact on Downstream Results:
- Accuracy: Downstream processes that rely on the combined median might be
misled by this approximation, especially in cases where the data's distribution
shapes differ significantly.
- Interpretability: Users expecting an exact median might misinterpret the
results, leading to potential errors in statistical analysis or decision-making.
- Statistical Validity: For critical applications, the inaccuracy of a
weighted median approximation might necessitate alternative methods, such as
reconstructing the combined distribution's CDF and computing the median
directly from it.
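The non-linearity is easy to demonstrate with a small standalone sketch (plain Rust, independent of DataFusion; the `median` helper here is illustrative). With one symmetric and one skewed input, the weighted average of the two medians lands far from the median of the pooled values:

```rust
// Median of an already-sorted slice.
fn median(sorted: &[f64]) -> f64 {
    let n = sorted.len();
    if n % 2 == 1 {
        sorted[n / 2]
    } else {
        (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0
    }
}

fn main() {
    let a = [1.0, 2.0, 3.0]; // symmetric, median 2.0
    let b = [10.0, 11.0, 100.0, 1000.0, 10000.0]; // skewed, median 100.0

    let mut pooled: Vec<f64> = a.iter().chain(b.iter()).copied().collect();
    pooled.sort_by(|x, y| x.partial_cmp(y).unwrap());

    // Equal-weight average of the medians, as in the PR's approximation.
    let approx = 0.5 * median(&a) + 0.5 * median(&b);
    // True median of the merged data.
    let exact = median(&pooled);

    assert_eq!(approx, 51.0);
    assert_eq!(exact, 10.5);
}
```

The weighted average puts the merged median at 51, while the actual pooled median is 10.5 — exactly the kind of gap a doc comment on `merge` should warn about.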
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#issuecomment-2735210417
> I think eventually it would be nice to add some tests for this code

Yes, as the ticket description said: I'll do it after we reach agreement.
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002299377
##
datafusion/expr-common/src/statistics.rs:
##
@@ -857,6 +857,143 @@ pub fn compute_variance(
ScalarValue::try_from(target_type)
}
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
+pub fn merge_distributions(a: &Distribution, b: &Distribution) -> Result<Distribution> {
+    let range_a = a.range()?;
+    let range_b = b.range()?;
+
+    // Determine data type and create combined range
+    let combined_range = if range_a.is_unbounded() || range_b.is_unbounded() {
Review Comment:
Great. One concern: I found that `Interval::union` only works with intervals of
the same data type.
It seems we could loosen that requirement so that, e.g., `Int64` with `Int32`,
or `int` with `float`, can also be unioned.
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002335941
##
datafusion/expr-common/src/statistics.rs:
##
@@ -857,6 +857,143 @@ pub fn compute_variance(
ScalarValue::try_from(target_type)
}
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
+pub fn merge_distributions(a: &Distribution, b: &Distribution) -> Result<Distribution> {
+    let range_a = a.range()?;
+    let range_b = b.range()?;
+
+    // Determine data type and create combined range
+    let combined_range = if range_a.is_unbounded() || range_b.is_unbounded() {
+        Interval::make_unbounded(&range_a.data_type())?
+    } else {
+        // Take the widest possible range conservatively
+        let lower_a = range_a.lower();
+        let lower_b = range_b.lower();
+        let upper_a = range_a.upper();
+        let upper_b = range_b.upper();
+
+        let combined_lower = if lower_a.lt(lower_b) {
+            lower_a.clone()
+        } else {
+            lower_b.clone()
+        };
+
+        let combined_upper = if upper_a.gt(upper_b) {
+            upper_a.clone()
+        } else {
+            upper_b.clone()
+        };
+
+        Interval::try_new(combined_lower, combined_upper)?
+    };
+
+    // Calculate weights for the mixture distribution
Review Comment:
Your point is correct.
IMO, the best way to compute the weights would be based on the row count behind
each interval, but those counts are unknown here.
After some thought, I have a new idea: maybe we can use the variance to
approximate the weights, on the assumption that **lower variance generally
indicates more samples**:
```rust
let (weight_a, weight_b) = {
    // Lower variance generally indicates more samples
    let var_a = self.variance()?.cast_to(&DataType::Float64)?;
    let var_b = other.variance()?.cast_to(&DataType::Float64)?;
    match (var_a, var_b) {
        (ScalarValue::Float64(Some(va)), ScalarValue::Float64(Some(vb))) => {
            // Weight inversely by variance (with safeguards against division by zero)
            let va_safe = va.max(f64::EPSILON);
            let vb_safe = vb.max(f64::EPSILON);
            let wa = 1.0 / va_safe;
            let wb = 1.0 / vb_safe;
            let total = wa + wb;
            (wa / total, wb / total)
        }
        _ => (0.5, 0.5), // Fall back to equal weights
    }
};
```
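Extracted as a standalone sketch (plain `f64` math, no `ScalarValue` handling; the function name is illustrative), the inverse-variance weighting with its zero-variance safeguard behaves like this:

```rust
// Inverse-variance weights: the lower-variance input gets the larger weight.
fn inverse_variance_weights(var_a: f64, var_b: f64) -> (f64, f64) {
    // Safeguard against division by zero for (near-)constant inputs.
    let wa = 1.0 / var_a.max(f64::EPSILON);
    let wb = 1.0 / var_b.max(f64::EPSILON);
    let total = wa + wb;
    (wa / total, wb / total)
}

fn main() {
    // Variance 1.0 vs variance 3.0: the tighter input dominates.
    let (wa, wb) = inverse_variance_weights(1.0, 3.0);
    assert!((wa - 0.75).abs() < 1e-12);
    assert!((wb - 0.25).abs() < 1e-12);

    // Degenerate case: two zero-variance inputs still split evenly
    // instead of dividing by zero.
    let (wa0, wb0) = inverse_variance_weights(0.0, 0.0);
    assert!((wa0 - 0.5).abs() < 1e-12 && (wb0 - 0.5).abs() < 1e-12);
}
```

Whether low variance actually correlates with sample count is itself an assumption, so this is best treated as a heuristic fallback rather than an accuracy guarantee.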
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002309255
##
datafusion/expr-common/src/statistics.rs:
##
@@ -857,6 +857,143 @@ pub fn compute_variance(
ScalarValue::try_from(target_type)
}
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
Review Comment:
Maybe we can add some notes to `GenericDistribution`:
```
/// # Range Guarantees
/// The provided range is assumed to be conservative - all possible values of the
/// distribution must lie within this range. However, the range itself might be
/// an approximation, as the actual distribution could occupy only a subset of the range.
```
Re: [PR] feat: support merge for `Distribution` [datafusion]
xudong963 commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002296888
##
datafusion/expr-common/src/statistics.rs:
##
@@ -857,6 +857,143 @@ pub fn compute_variance(
ScalarValue::try_from(target_type)
}
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
+pub fn merge_distributions(a: &Distribution, b: &Distribution) -> Result<Distribution> {
Review Comment:
Yes, that's also what I wanted to do
Re: [PR] feat: support merge for `Distribution` [datafusion]
alamb commented on code in PR #15296:
URL: https://github.com/apache/datafusion/pull/15296#discussion_r2002037023
##
datafusion/expr-common/src/statistics.rs:
##
@@ -857,6 +857,143 @@ pub fn compute_variance(
ScalarValue::try_from(target_type)
}
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
+pub fn merge_distributions(a: &Distribution, b: &Distribution) -> Result<Distribution> {
+    let range_a = a.range()?;
+    let range_b = b.range()?;
+
+    // Determine data type and create combined range
+    let combined_range = if range_a.is_unbounded() || range_b.is_unbounded() {
Review Comment:
I think we could use `Interval::union` here:
https://docs.rs/datafusion/latest/datafusion/logical_expr/interval_arithmetic/struct.Interval.html#method.union
##
datafusion/expr-common/src/statistics.rs:
##
@@ -857,6 +857,143 @@ pub fn compute_variance(
ScalarValue::try_from(target_type)
}
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
+pub fn merge_distributions(a: &Distribution, b: &Distribution) -> Result<Distribution> {
Review Comment:
I wonder if this should be a method on `Distribution` rather than a free
function 🤔
##
datafusion/expr-common/src/statistics.rs:
##
@@ -857,6 +857,143 @@ pub fn compute_variance(
ScalarValue::try_from(target_type)
}
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
Review Comment:
I think it would help to explain in comments what assumptions can be made
from the combined distribution
For example, is it guaranteed that the `range` is conservative (as in, it is
known that there are no values that lie outside the range)?
Though now that I ask it seems like maybe we need to clarify if the range of
`GenericDistribution` is conservative 🤔
https://github.com/apache/datafusion/blob/8a2e83eb74f89a4e3387817943749f3894e7141a/datafusion/expr-common/src/statistics.rs#L273-L272
##
datafusion/expr-common/src/statistics.rs:
##
@@ -857,6 +857,143 @@ pub fn compute_variance(
ScalarValue::try_from(target_type)
}
+/// Merges two distributions into a single distribution that represents their combined statistics.
+/// This creates a more general distribution that approximates the mixture of the input distributions.
+pub fn merge_distributions(a: &Distribution, b: &Distribution) -> Result<Distribution> {
+    let range_a = a.range()?;
+    let range_b = b.range()?;
+
+    // Determine data type and create combined range
+    let combined_range = if range_a.is_unbounded() || range_b.is_unbounded() {
+        Interval::make_unbounded(&range_a.data_type())?
+    } else {
+        // Take the widest possible range conservatively
+        let lower_a = range_a.lower();
+        let lower_b = range_b.lower();
+        let upper_a = range_a.upper();
+        let upper_b = range_b.upper();
+
+        let combined_lower = if lower_a.lt(lower_b) {
+            lower_a.clone()
+        } else {
+            lower_b.clone()
+        };
+
+        let combined_upper = if upper_a.gt(upper_b) {
+            upper_a.clone()
+        } else {
+            upper_b.clone()
+        };
+
+        Interval::try_new(combined_lower, combined_upper)?
+    };
+
+    // Calculate weights for the mixture distribution
Review Comment:
What does "mixture distribution" mean in this context?
It seems like this code weighs the input distributions by number of distinct
values (cardinality), which seems not right. For example, if we have two inputs:
1. 1M rows, 3 distinct values
2. 10 rows, 10 distinct values
I think this code is going to assume the mean is close to the second input,
even though it contains only 10 values.
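Putting numbers on this example (a hypothetical sketch; computing the true pooled mean requires row counts, which `Distribution` does not currently carry):

```rust
// Cardinality-based weighted mean, mirroring the PR's weighting scheme.
fn cardinality_weighted_mean(card_a: f64, mean_a: f64, card_b: f64, mean_b: f64) -> f64 {
    let total = card_a + card_b;
    (card_a / total) * mean_a + (card_b / total) * mean_b
}

// Row-count-weighted (true pooled) mean, for comparison.
fn pooled_mean(rows_a: f64, mean_a: f64, rows_b: f64, mean_b: f64) -> f64 {
    (rows_a * mean_a + rows_b * mean_b) / (rows_a + rows_b)
}

fn main() {
    // Input 1: 1M rows over 3 distinct values, mean 1.0.
    // Input 2: 10 rows over 10 distinct values, mean 104.5.
    let approx = cardinality_weighted_mean(3.0, 1.0, 10.0, 104.5);
    let exact = pooled_mean(1_000_000.0, 1.0, 10.0, 104.5);

    // Cardinality weighting lands near the tiny 10-row input...
    assert!(approx > 80.0);
    // ...while the true pooled mean stays near 1.0.
    assert!((exact - 1.0).abs() < 0.01);
}
```

Cardinality weighting pulls the merged mean to roughly 80.6, while the row-weighted answer is roughly 1.001 — supporting the point that distinct-value counts are a poor proxy for row counts.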
