mengdou created CALCITE-6329:
--------------------------------
Summary: Use weighted-average calculation for the columns in Union operator
Key: CALCITE-6329
URL: https://issues.apache.org/jira/browse/CALCITE-6329
Project: Calcite
Issue Type: Improvement
Components: core
Affects Versions: 1.37.0
Reporter: mengdou
Assignee: mengdou
The method averageColumnSizes(Union rel, RelMetadataQuery mq) in class
RelMdSize computes the size of each column of a Union as a simple (unweighted)
average over the Union's inputs. The row size, which is derived from the
column sizes, is therefore effectively a simple average as well.
{code:java}
public List<Double> averageColumnSizes(Union rel, RelMetadataQuery mq) {
  final int fieldCount = rel.getRowType().getFieldCount();
  // Collect the column sizes of every input that has an estimate.
  List<List<Double>> inputColumnSizeList = new ArrayList<>();
  for (RelNode input : rel.getInputs()) {
    final List<Double> inputSizes = mq.getAverageColumnSizes(input);
    if (inputSizes != null) {
      inputColumnSizeList.add(inputSizes);
    }
  }
  switch (inputColumnSizeList.size()) {
  case 0:
    return null; // all were null
  case 1:
    return inputColumnSizeList.get(0); // all but one were null
  }
  final ImmutableNullableList.Builder<Double> sizes =
      ImmutableNullableList.builder();
  int nn = 0;
  for (int i = 0; i < fieldCount; i++) {
    // Simple (unweighted) average of column i over the inputs that know it;
    // the row count of each input is not used.
    double d = 0d;
    int n = 0;
    for (List<Double> inputColumnSizes : inputColumnSizeList) {
      Double d2 = inputColumnSizes.get(i);
      if (d2 != null) {
        d += d2;
        ++n;
        ++nn;
      }
    }
    sizes.add(n > 0 ? d / n : null);
  }
  if (nn == 0) {
    return null; // all columns are null
  }
  return sizes.build();
}
{code}
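Written as a formula, the loop above computes each column size as the unweighted mean over the inputs that report a size for that column. Here s_{k,i} is the size of column i in input k and K_i is the set of inputs whose size for column i is non-null (notation introduced only for illustration):
{code}
\bar{s}_i = \frac{1}{|K_i|} \sum_{k \in K_i} s_{k,i}
{code}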
However, it does not take the row count of each input into account. When the
inputs differ greatly in row count, the estimate can be far off, which in turn
distorts the cost estimates of downstream operators. For example:
{code:java}
# We have two tables, A and B
# Logical plan
ShuffleWrite
  Union
    TableScan(table=A)
    TableScan(table=B)
# Stats
row_count(A) = 1E9, row_size(A) = 10
row_count(B) = 1E5, row_size(B) = 100
row_count(Union) = 1E9 + 1E5 = 1.0001E9, row_size(Union) = (10 + 100) / 2 = 55 # simple average
row_count(ShuffleWrite) = row_count(Union) = 1.0001E9 # inherited from Union
row_size(ShuffleWrite) = row_size(Union) = 55 # inherited from Union
# Estimated input bytes of ShuffleWrite, about 5.5 times the real input bytes
input_bytes(ShuffleWrite) = 55 * 1.0001E9 = 5.50055E10
input_bytes(Union) = 1E9 * 10 + 1E5 * 100 = 1.001E10 # actual bytes from A and B{code}
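The arithmetic of this example can be reproduced with a small standalone Java sketch. It models each input only by a row count and a row size; the class UnionSizeExample and its methods are hypothetical and not part of Calcite. It compares the simple average used today with a row-count-weighted average:
{code:java}
/** Hypothetical standalone sketch reproducing the example; not Calcite code. */
final class UnionSizeExample {
  private UnionSizeExample() {}

  /** Unweighted mean of the input row sizes, as the current logic effectively does. */
  static double simpleAverage(double[] rowSizes) {
    double sum = 0d;
    for (double s : rowSizes) {
      sum += s;
    }
    return sum / rowSizes.length;
  }

  /** Mean of the input row sizes, weighted by each input's row count. */
  static double weightedAverage(double[] rowCounts, double[] rowSizes) {
    double totalBytes = 0d;
    double totalRows = 0d;
    for (int k = 0; k < rowCounts.length; k++) {
      totalBytes += rowCounts[k] * rowSizes[k];
      totalRows += rowCounts[k];
    }
    return totalBytes / totalRows;
  }

  public static void main(String[] args) {
    double[] rowCounts = {1E9, 1E5}; // row_count(A), row_count(B)
    double[] rowSizes = {10, 100};   // row_size(A), row_size(B)
    double unionRows = rowCounts[0] + rowCounts[1];
    double actualBytes = rowCounts[0] * rowSizes[0] + rowCounts[1] * rowSizes[1];
    double simpleBytes = unionRows * simpleAverage(rowSizes);
    double weightedBytes = unionRows * weightedAverage(rowCounts, rowSizes);
    System.out.printf("actual=%.6g simple=%.6g weighted=%.6g%n",
        actualBytes, simpleBytes, weightedBytes);
  }
}
{code}
With the weighted average, the estimated bytes (row count times weighted row size) equal the actual bytes, whereas the simple average overestimates them by roughly 5.5 times.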
I therefore suggest taking the row count of each Union input into account and
using a row-count-weighted average to compute each column size, and from those
the final row size.
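A minimal sketch of the proposal follows. It assumes the caller has already collected each input's column sizes (as the current code does via mq.getAverageColumnSizes) and each input's row count (assumed non-null here), both passed in as plain lists. The class WeightedUnionSizes and its signature are hypothetical, not existing Calcite API. For column i it computes sum_k(r_k * s_{k,i}) / sum_k(r_k) over the inputs k that report a size for that column:
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of a row-count-weighted column-size average; not Calcite code. */
final class WeightedUnionSizes {
  private WeightedUnionSizes() {}

  /**
   * @param inputColumnSizes column sizes per input; a list or entry may be null if unknown
   * @param inputRowCounts   estimated row count per input (non-null), same order
   * @param fieldCount       number of columns in the Union's row type
   * @return one size per column (null where no input knows it), or null if none is known
   */
  static List<Double> averageColumnSizes(List<List<Double>> inputColumnSizes,
      List<Double> inputRowCounts, int fieldCount) {
    final List<Double> sizes = new ArrayList<>(fieldCount);
    boolean anyKnown = false;
    for (int i = 0; i < fieldCount; i++) {
      double weightedSum = 0d; // sum of rowCount * columnSize
      double totalWeight = 0d; // sum of rowCount over inputs that know column i
      double plainSum = 0d;    // unweighted sum, used only as a fallback
      int n = 0;
      for (int k = 0; k < inputColumnSizes.size(); k++) {
        final List<Double> columnSizes = inputColumnSizes.get(k);
        if (columnSizes == null || columnSizes.get(i) == null) {
          continue; // this input has no estimate for column i
        }
        final double size = columnSizes.get(i);
        final double rowCount = inputRowCounts.get(k);
        weightedSum += rowCount * size;
        totalWeight += rowCount;
        plainSum += size;
        ++n;
      }
      if (n == 0) {
        sizes.add(null);
      } else if (totalWeight > 0) {
        sizes.add(weightedSum / totalWeight);
        anyKnown = true;
      } else {
        // All contributing inputs have zero rows: avoid 0/0, use the simple average.
        sizes.add(plainSum / n);
        anyKnown = true;
      }
    }
    return anyKnown ? Collections.unmodifiableList(sizes) : null;
  }

  public static void main(String[] args) {
    // Example from this issue: A has 1E9 rows of size 10, B has 1E5 rows of size 100.
    List<Double> sizes = averageColumnSizes(
        Arrays.asList(Arrays.asList(10d), Arrays.asList(100d)),
        Arrays.asList(1E9, 1E5), 1);
    System.out.println(sizes);
  }
}
{code}
If the row size is the sum of the column sizes and every input reports every column, summing these weighted column sizes gives exactly the row-count-weighted average of the input row sizes: for the example above, 1.001E10 / 1.0001E9, roughly 10.009 instead of 55. The zero-row fallback is a design choice of this sketch, not something the issue specifies.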