mengdou created CALCITE-6329:
--------------------------------

             Summary: Use weighted-average calculation for the columns in Union 
operator
                 Key: CALCITE-6329
                 URL: https://issues.apache.org/jira/browse/CALCITE-6329
             Project: Calcite
          Issue Type: Improvement
          Components: core
    Affects Versions: 1.37.0
            Reporter: mengdou
            Assignee: mengdou


In the method averageColumnSizes(Union rel, RelMetadataQuery mq) of class 
RelMdSize, a simple (unweighted) average over the inputs is used to compute the 
column_size of every column of the current Union operator; row_size, which is 
derived from all the column sizes, is skewed in the same way.
{code:java}
public List<Double> averageColumnSizes(Union rel, RelMetadataQuery mq) {
  final int fieldCount = rel.getRowType().getFieldCount();
  List<List<Double>> inputColumnSizeList = new ArrayList<>();
  for (RelNode input : rel.getInputs()) {
    final List<Double> inputSizes = mq.getAverageColumnSizes(input);
    if (inputSizes != null) {
      inputColumnSizeList.add(inputSizes);
    }
  }
  switch (inputColumnSizeList.size()) {
  case 0:
    return null; // all were null
  case 1:
    return inputColumnSizeList.get(0); // all but one were null
  }
  final ImmutableNullableList.Builder<Double> sizes =
      ImmutableNullableList.builder();
  int nn = 0;
  for (int i = 0; i < fieldCount; i++) {
    double d = 0d;
    int n = 0;
    for (List<Double> inputColumnSizes : inputColumnSizeList) {
      Double d2 = inputColumnSizes.get(i);
      if (d2 != null) {
        d += d2;
        ++n;
        ++nn;
      }
    }
    sizes.add(n > 0 ? d / n : null);
  }
  if (nn == 0) {
    return null; // all columns are null
  }
  return sizes.build();
} {code}
But it doesn't take the row count of each input into account, which can 
produce a bad estimate and adversely affect the downstream operators. For 
example:

 
{code:java}
# We have two tables A and B

# Logical Plan
ShuffleWrite
  Union
    TableScan(table=A)
    TableScan(table=B)

# stats
row_count(A) = 1E9, row_size(A) = 10
row_count(B) = 1E5, row_size(B) = 100
row_count(Union) = 1E9 + 1E5 = 1.0001E9
row_size(Union) = (10 + 100) / 2 = 55                  # simple average
row_count(ShuffleWrite) = row_count(Union) = 1.0001E9  # inherits from Union
row_size(ShuffleWrite) = row_size(Union) = 55          # inherits from Union


# cost estimation of ShuffleWrite, much larger than the real input bytes
input_bytes(ShuffleWrite) = 55 * 1.0001E9 = 5.50055E10
real input_bytes(Union) = 1E9 * 10 + 1E5 * 100 = 1.001E10{code}
 

So I suggest that we take the row count of each Union input into consideration 
and use a weighted average to calculate every column size, and hence the final 
row size, instead.
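A minimal, self-contained sketch of the proposed weighted-average formula (the class and method here are hypothetical helpers for illustration, not Calcite API; a real patch would obtain the weights via mq.getRowCount(input) inside RelMdSize):
{code:java}
import java.util.Arrays;
import java.util.List;

/** Illustrates weighting each input's column sizes by its row count. */
public class WeightedColumnSizes {
  /**
   * Returns per-column average sizes across inputs, weighted by row count:
   * size(col) = sum(rowCount_j * size_j(col)) / sum(rowCount_j).
   */
  public static List<Double> compute(double[] rowCounts, double[][] columnSizes) {
    final int fieldCount = columnSizes[0].length;
    final Double[] result = new Double[fieldCount];
    for (int i = 0; i < fieldCount; i++) {
      double weightedSum = 0d;
      double totalRows = 0d;
      for (int j = 0; j < rowCounts.length; j++) {
        weightedSum += rowCounts[j] * columnSizes[j][i];
        totalRows += rowCounts[j];
      }
      result[i] = totalRows > 0 ? weightedSum / totalRows : null;
    }
    return Arrays.asList(result);
  }

  public static void main(String[] args) {
    // Numbers from the example above: A has 1E9 rows of size 10,
    // B has 1E5 rows of size 100.
    List<Double> sizes = compute(new double[] {1e9, 1e5},
        new double[][] {{10d}, {100d}});
    // Weighted average is about 10.009, versus 55 with a simple average.
    System.out.println(sizes.get(0));
  }
}
{code}
With these weights the Union's row_size tracks the dominant input A, so the ShuffleWrite cost estimate stays close to the real input bytes.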

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
