zabetak commented on code in PR #6293:
URL: https://github.com/apache/hive/pull/6293#discussion_r2839954476


##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -51,24 +56,73 @@
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
 import java.util.Collections;
-
+import java.util.Objects;
+
+import static org.apache.calcite.sql.type.SqlTypeName.BIGINT;
+import static org.apache.calcite.sql.type.SqlTypeName.DOUBLE;
+import static org.apache.calcite.sql.type.SqlTypeName.FLOAT;
+import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
+import static org.apache.calcite.sql.type.SqlTypeName.SMALLINT;
+import static org.apache.calcite.sql.type.SqlTypeName.TINYINT;
 import static 
org.apache.hadoop.hive.ql.optimizer.calcite.stats.FilterSelectivityEstimator.betweenSelectivity;
 import static 
org.apache.hadoop.hive.ql.optimizer.calcite.stats.FilterSelectivityEstimator.greaterThanOrEqualSelectivity;
 import static 
org.apache.hadoop.hive.ql.optimizer.calcite.stats.FilterSelectivityEstimator.greaterThanSelectivity;
 import static 
org.apache.hadoop.hive.ql.optimizer.calcite.stats.FilterSelectivityEstimator.isHistogramAvailable;
 import static 
org.apache.hadoop.hive.ql.optimizer.calcite.stats.FilterSelectivityEstimator.lessThanOrEqualSelectivity;
 import static 
org.apache.hadoop.hive.ql.optimizer.calcite.stats.FilterSelectivityEstimator.lessThanSelectivity;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TestFilterSelectivityEstimator {
 
+  private static final SqlBinaryOperator GT = SqlStdOperatorTable.GREATER_THAN;
+  private static final SqlBinaryOperator GE = 
SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
+  private static final SqlBinaryOperator LT = SqlStdOperatorTable.LESS_THAN;
+  private static final SqlBinaryOperator LE = 
SqlStdOperatorTable.LESS_THAN_OR_EQUAL;
+  private static final SqlOperator BETWEEN = HiveBetween.INSTANCE;

Review Comment:
   nit: The operators are used only in one place, so there is no need to keep 
static aliases here.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +595,292 @@ public void 
testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     
doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, 
inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast() {
+    useFieldWithValues("f_numeric", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(10 / 13.f, lt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(2 / 13.f, gt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(11 / 13.f, le(cast("f_numeric", TINYINT), int5));
+
+    checkSelectivity(12 / 13f, ge(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(1 / 13f, lt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(5 / 13f, gt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(8 / 13f, le(cast("f_numeric", TINYINT), int2));
+
+    // check some types
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", BIGINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", FLOAT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", DOUBLE), int5));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast2() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    RelDataType decimal3s1 = decimalType(3, 1);
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // values from -99.94999 to 99.94999 (both inclusive)
+    checkSelectivity(7 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(7 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+
+    RelDataType decimal4s1 = decimalType(4, 1);
+    checkSelectivity(10 / 28.f, lt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(20 / 28.f, le(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(13 / 28.f, ge(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+
+    RelDataType decimal2s1 = decimalType(2, 1);
+    checkSelectivity(2 / 28.f, lt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(2 / 28.f, le(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+
+    // expected: 100_000f
+    RelDataType decimal7s1 = decimalType(7, 1);
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 10_000f, 100_000f, because CAST(1_000_000 AS DECIMAL(7,1)) = 
NULL, and similar for even larger values
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(9999)));
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 100_000f
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10001)));
+
+    // expected 1f, 10f, 99.94998f, 99.94999f
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    // expected -99.94999f, -99.94998f, 0f, 1f
+    checkSelectivity(4 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // the cast would apply a modulo operation to the values outside the range 
of the cast
+    // so instead a default selectivity should be returned
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+  }
+
+  private void checkTimeFieldOnMidnightTimestamps(RexNode field) {
+    // note: use only values from VALUES_TIME that specify a date without 
hh:mm:ss!
+    checkSelectivity(7 / 7.f, ge(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(5 / 7.f, ge(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(1 / 7.f, ge(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(6 / 7.f, gt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(4 / 7.f, gt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(0 / 7.f, gt(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(1 / 7.f, le(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(3 / 7.f, le(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(7 / 7.f, le(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(0 / 7.f, lt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(2 / 7.f, lt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(6 / 7.f, lt(field, literalTimestamp("2020-11-07")));
+  }
+
+  private void checkTimeFieldOnIntraDayTimestamps(RexNode field) {
+    checkSelectivity(3 / 7.f, ge(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(2 / 7.f, gt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(5 / 7.f, le(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(4 / 7.f, lt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestamp() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+    checkTimeFieldOnIntraDayTimestamps(currentInputRef);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDate() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+
+    // it does not make sense to compare with "2020-11-05T11:23:45Z",
+    // as that value would not be stored as-is in a date column, but as 
"2020-11-05" instead
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDateWithCast() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    RexNode field1 = cast("f_date", SqlTypeName.DATE);
+    checkTimeFieldOnMidnightTimestamps(field1);
+    checkTimeFieldOnIntraDayTimestamps(field1);
+
+    RexNode field2 = cast("f_date", SqlTypeName.TIMESTAMP);
+    checkTimeFieldOnMidnightTimestamps(field2);
+    checkTimeFieldOnIntraDayTimestamps(field2);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestampWithCast() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", SqlTypeName.DATE));
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", 
SqlTypeName.TIMESTAMP));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal2_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 2; // the number of values that "survive" the cast
+    RexNode cast = REX_BUILDER.makeCast(decimalType(2, 1), inputRef0);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 1000f);
+    checkBetweenSelectivity(1, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal3_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 7;
+    RexNode cast = REX_BUILDER.makeCast(decimalType(3, 1), inputRef0);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 1000f);
+    checkBetweenSelectivity(4, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal4_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 23;
+    RexNode cast = REX_BUILDER.makeCast(decimalType(4, 1), inputRef0);
+    // the values between -999.94999... and 999.94999... (both inclusive) pass 
through the cast
+    // the values between 99.95 and 100 are rounded up to 100, so they fulfill 
the BETWEEN
+    checkBetweenSelectivity(13, universe, total, cast, 100, 1000);
+    checkBetweenSelectivity(14, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal7_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 26;
+    RexNode cast = REX_BUILDER.makeCast(decimalType(7, 1), inputRef0);
+    checkBetweenSelectivity(14, universe, total, cast, 100, 1000);
+    checkBetweenSelectivity(14, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  private void checkSelectivity(float expectedSelectivity, RexNode filter) {
+    FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
+    Assert.assertEquals(filter.toString(), expectedSelectivity, 
estimator.estimateSelectivity(filter), DELTA);
+
+    // swap equation, e.g., col < 5 becomes 5 > col; selectivity stays the same
+    RexCall call = (RexCall) filter;
+    SqlOperator operator = ((RexCall) filter).getOperator();
+    SqlOperator swappedOp;
+    if (operator == LE) {
+      swappedOp = GE;
+    } else if (operator == LT) {
+      swappedOp = GT;
+    } else if (operator == GE) {
+      swappedOp = LE;
+    } else if (operator == GT) {
+      swappedOp = LT;
+    } else if (operator == BETWEEN) {
+      // BETWEEN cannot be swapped
+      return;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+    RexNode swapped = REX_BUILDER.makeCall(swappedOp, 
call.getOperands().get(1), call.getOperands().get(0));
+    Assert.assertEquals(filter.toString(), expectedSelectivity, 
estimator.estimateSelectivity(swapped), DELTA);
+  }
+
+  private void checkBetweenSelectivity(float expectedEntries, float universe, 
float total, RexNode value, float lower,
+      float upper) {
+    RexNode betweenFilter =
+        REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolFalse, value, 
literalFloat(lower), literalFloat(upper));
+    FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
+    String between = "BETWEEN " + lower + " AND " + upper;
+    float expectedSelectivity = expectedEntries / total;
+    String message = between + ": calcite filter " + betweenFilter.toString();
+    Assert.assertEquals(message, expectedSelectivity, 
estimator.estimateSelectivity(betweenFilter), DELTA);
+
+    // invert the filter to a NOT BETWEEN
+    RexNode invBetween =
+        REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, value, 
literalFloat(lower), literalFloat(upper));
+    String invMessage = "NOT " + between + ": calcite filter " + 
invBetween.toString();
+    float invExpectedSelectivity = (universe - expectedEntries) / total;
+    Assert.assertEquals(invMessage, invExpectedSelectivity, 
estimator.estimateSelectivity(invBetween), DELTA);
+  }
+
+  private RexNode cast(String fieldname, SqlTypeName typeName) {
+    return cast(fieldname, type(typeName));
   }
+
+  private RexNode cast(String fieldname, RelDataType type) {
+    int fieldIndex = scan.getRowType().getFieldNames().indexOf(fieldname);
+    RexNode column = REX_BUILDER.makeInputRef(scan, fieldIndex);
+    return REX_BUILDER.makeCast(type, column);
+  }
+
+  private RexNode ge(RexNode expr, RexNode value) {
+    return REX_BUILDER.makeCall(GE, expr, value);
+  }
+
+  private RexNode gt(RexNode expr, RexNode value) {
+    return REX_BUILDER.makeCall(GT, expr, value);
+  }
+
+  private RexNode le(RexNode expr, RexNode value) {
+    return REX_BUILDER.makeCall(LE, expr, value);
+  }
+
+  private RexNode lt(RexNode expr, RexNode value) {
+    return REX_BUILDER.makeCall(LT, expr, value);
+  }
+
+  private static RelDataType type(SqlTypeName typeName) {
+    return REX_BUILDER.getTypeFactory().createSqlType(typeName);
+  }
+
+  private static RelDataType decimalType(int precision, int scale) {
+    return REX_BUILDER.getTypeFactory().createSqlType(SqlTypeName.DECIMAL, 
precision, scale);
+  }
+
+  private static RexLiteral literalTimestamp(String timestamp) {
+    return REX_BUILDER.makeLiteral(timestampMillis(timestamp),
+        REX_BUILDER.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP));
+  }
+
+  private static RexLiteral literalDate(String date) {

Review Comment:
   Is this method (`literalDate`) used?



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +595,292 @@ public void 
testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     
doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, 
inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast() {
+    useFieldWithValues("f_numeric", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(10 / 13.f, lt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(2 / 13.f, gt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(11 / 13.f, le(cast("f_numeric", TINYINT), int5));
+
+    checkSelectivity(12 / 13f, ge(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(1 / 13f, lt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(5 / 13f, gt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(8 / 13f, le(cast("f_numeric", TINYINT), int2));
+
+    // check some types
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", BIGINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", FLOAT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", DOUBLE), int5));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast2() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    RelDataType decimal3s1 = decimalType(3, 1);
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // values from -99.94999 to 99.94999 (both inclusive)
+    checkSelectivity(7 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(7 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+
+    RelDataType decimal4s1 = decimalType(4, 1);
+    checkSelectivity(10 / 28.f, lt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(20 / 28.f, le(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(13 / 28.f, ge(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+
+    RelDataType decimal2s1 = decimalType(2, 1);
+    checkSelectivity(2 / 28.f, lt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(2 / 28.f, le(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+
+    // expected: 100_000f
+    RelDataType decimal7s1 = decimalType(7, 1);
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 10_000f, 100_000f, because CAST(1_000_000 AS DECIMAL(7,1)) = 
NULL, and similar for even larger values
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(9999)));
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 100_000f
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10001)));
+
+    // expected 1f, 10f, 99.94998f, 99.94999f
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    // expected -99.94999f, -99.94998f, 0f, 1f
+    checkSelectivity(4 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // the cast would apply a modulo operation to the values outside the range 
of the cast
+    // so instead a default selectivity should be returned
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+  }
+
+  private void checkTimeFieldOnMidnightTimestamps(RexNode field) {
+    // note: use only values from VALUES_TIME that specify a date without 
hh:mm:ss!
+    checkSelectivity(7 / 7.f, ge(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(5 / 7.f, ge(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(1 / 7.f, ge(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(6 / 7.f, gt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(4 / 7.f, gt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(0 / 7.f, gt(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(1 / 7.f, le(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(3 / 7.f, le(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(7 / 7.f, le(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(0 / 7.f, lt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(2 / 7.f, lt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(6 / 7.f, lt(field, literalTimestamp("2020-11-07")));
+  }
+
+  private void checkTimeFieldOnIntraDayTimestamps(RexNode field) {
+    checkSelectivity(3 / 7.f, ge(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(2 / 7.f, gt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(5 / 7.f, le(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(4 / 7.f, lt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestamp() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+    checkTimeFieldOnIntraDayTimestamps(currentInputRef);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDate() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+
+    // it does not make sense to compare with "2020-11-05T11:23:45Z",
+    // as that value would not be stored as-is in a date column, but as 
"2020-11-05" instead
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDateWithCast() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    RexNode field1 = cast("f_date", SqlTypeName.DATE);
+    checkTimeFieldOnMidnightTimestamps(field1);
+    checkTimeFieldOnIntraDayTimestamps(field1);
+
+    RexNode field2 = cast("f_date", SqlTypeName.TIMESTAMP);
+    checkTimeFieldOnMidnightTimestamps(field2);
+    checkTimeFieldOnIntraDayTimestamps(field2);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestampWithCast() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", SqlTypeName.DATE));
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", 
SqlTypeName.TIMESTAMP));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal2_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 2; // the number of values that "survive" the cast
+    RexNode cast = REX_BUILDER.makeCast(decimalType(2, 1), inputRef0);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 1000f);
+    checkBetweenSelectivity(1, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal3_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 7;
+    RexNode cast = REX_BUILDER.makeCast(decimalType(3, 1), inputRef0);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 1000f);
+    checkBetweenSelectivity(4, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal4_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 23;
+    RexNode cast = REX_BUILDER.makeCast(decimalType(4, 1), inputRef0);
+    // the values between -999.94999... and 999.94999... (both inclusive) pass 
through the cast
+    // the values between 99.95 and 100 are rounded up to 100, so they fulfill 
the BETWEEN
+    checkBetweenSelectivity(13, universe, total, cast, 100, 1000);
+    checkBetweenSelectivity(14, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal7_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 26;
+    RexNode cast = REX_BUILDER.makeCast(decimalType(7, 1), inputRef0);
+    checkBetweenSelectivity(14, universe, total, cast, 100, 1000);
+    checkBetweenSelectivity(14, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  private void checkSelectivity(float expectedSelectivity, RexNode filter) {
+    FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
+    Assert.assertEquals(filter.toString(), expectedSelectivity, 
estimator.estimateSelectivity(filter), DELTA);
+
+    // swap equation, e.g., col < 5 becomes 5 > col; selectivity stays the same
+    RexCall call = (RexCall) filter;
+    SqlOperator operator = ((RexCall) filter).getOperator();
+    SqlOperator swappedOp;
+    if (operator == LE) {
+      swappedOp = GE;
+    } else if (operator == LT) {
+      swappedOp = GT;
+    } else if (operator == GE) {
+      swappedOp = LE;
+    } else if (operator == GT) {
+      swappedOp = LT;
+    } else if (operator == BETWEEN) {
+      // BETWEEN cannot be swapped
+      return;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+    RexNode swapped = REX_BUILDER.makeCall(swappedOp, 
call.getOperands().get(1), call.getOperands().get(0));
+    Assert.assertEquals(filter.toString(), expectedSelectivity, 
estimator.estimateSelectivity(swapped), DELTA);
+  }
+
+  private void checkBetweenSelectivity(float expectedEntries, float universe, 
float total, RexNode value, float lower,
+      float upper) {
+    RexNode betweenFilter =
+        REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolFalse, value, 
literalFloat(lower), literalFloat(upper));
+    FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
+    String between = "BETWEEN " + lower + " AND " + upper;
+    float expectedSelectivity = expectedEntries / total;
+    String message = between + ": calcite filter " + betweenFilter.toString();

Review Comment:
   nit: When the tests fail the message looks like the following:
   ```
   java.lang.AssertionError: NOT BETWEEN 100.0 AND 0.0: calcite filter 
BETWEEN(true, CAST($0):DECIMAL(2, 1) NOT NULL, 1E2:FLOAT, 0E0:FLOAT) 
   ```
   We are printing the expression twice with different formatting, which is a 
bit confusing. I think it is sufficient to keep just the Calcite toString 
serialization. This will also make things more uniform with `checkSelectivity`, 
which does this already.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +595,292 @@ public void 
testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     
doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, 
inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast() {
+    useFieldWithValues("f_numeric", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(10 / 13.f, lt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(2 / 13.f, gt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(11 / 13.f, le(cast("f_numeric", TINYINT), int5));
+
+    checkSelectivity(12 / 13f, ge(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(1 / 13f, lt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(5 / 13f, gt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(8 / 13f, le(cast("f_numeric", TINYINT), int2));
+
+    // check some types
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", BIGINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", FLOAT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", DOUBLE), int5));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast2() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    RelDataType decimal3s1 = decimalType(3, 1);
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // values from -99.94999 to 99.94999 (both inclusive)
+    checkSelectivity(7 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(7 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+
+    RelDataType decimal4s1 = decimalType(4, 1);
+    checkSelectivity(10 / 28.f, lt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(20 / 28.f, le(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(13 / 28.f, ge(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+
+    RelDataType decimal2s1 = decimalType(2, 1);
+    checkSelectivity(2 / 28.f, lt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(2 / 28.f, le(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+
+    // expected: 100_000f
+    RelDataType decimal7s1 = decimalType(7, 1);
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 10_000f, 100_000f, because CAST(1_000_000 AS DECIMAL(7,1)) = 
NULL, and similar for even larger values
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(9999)));
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 100_000f
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10001)));
+
+    // expected 1f, 10f, 99.94998f, 99.94999f
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    // expected -99.94999f, -99.94998f, 0f, 1f
+    checkSelectivity(4 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // the cast would apply a modulo operation to the values outside the range 
of the cast
+    // so instead a default selectivity should be returned
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+  }
+
+  private void checkTimeFieldOnMidnightTimestamps(RexNode field) {
+    // note: use only values from VALUES_TIME that specify a date without 
hh:mm:ss!
+    checkSelectivity(7 / 7.f, ge(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(5 / 7.f, ge(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(1 / 7.f, ge(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(6 / 7.f, gt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(4 / 7.f, gt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(0 / 7.f, gt(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(1 / 7.f, le(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(3 / 7.f, le(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(7 / 7.f, le(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(0 / 7.f, lt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(2 / 7.f, lt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(6 / 7.f, lt(field, literalTimestamp("2020-11-07")));
+  }
+
+  private void checkTimeFieldOnIntraDayTimestamps(RexNode field) {
+    checkSelectivity(3 / 7.f, ge(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(2 / 7.f, gt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(5 / 7.f, le(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(4 / 7.f, lt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestamp() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+    checkTimeFieldOnIntraDayTimestamps(currentInputRef);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDate() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+
+    // it does not make sense to compare with "2020-11-05T11:23:45Z",
+    // as that value would not be stored as-is in a date column, but as 
"2020-11-05" instead
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDateWithCast() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    RexNode field1 = cast("f_date", SqlTypeName.DATE);
+    checkTimeFieldOnMidnightTimestamps(field1);
+    checkTimeFieldOnIntraDayTimestamps(field1);
+
+    RexNode field2 = cast("f_date", SqlTypeName.TIMESTAMP);
+    checkTimeFieldOnMidnightTimestamps(field2);
+    checkTimeFieldOnIntraDayTimestamps(field2);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestampWithCast() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", SqlTypeName.DATE));
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", 
SqlTypeName.TIMESTAMP));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal2_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 2; // the number of values that "survive" the cast
+    RexNode cast = REX_BUILDER.makeCast(decimalType(2, 1), inputRef0);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 1000f);
+    checkBetweenSelectivity(1, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal3_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 7;
+    RexNode cast = REX_BUILDER.makeCast(decimalType(3, 1), inputRef0);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 1000f);
+    checkBetweenSelectivity(4, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal4_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 23;
+    RexNode cast = REX_BUILDER.makeCast(decimalType(4, 1), inputRef0);
+    // the values between -999.94999... and 999.94999... (both inclusive) pass 
through the cast
+    // the values between 99.95 and 100 are rounded up to 100, so they fulfill 
the BETWEEN
+    checkBetweenSelectivity(13, universe, total, cast, 100, 1000);
+    checkBetweenSelectivity(14, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal7_1() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+    float universe = 26;
+    RexNode cast = REX_BUILDER.makeCast(decimalType(7, 1), inputRef0);
+    checkBetweenSelectivity(14, universe, total, cast, 100, 1000);
+    checkBetweenSelectivity(14, universe, total, cast, 1f, 100f);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  private void checkSelectivity(float expectedSelectivity, RexNode filter) {
+    FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
+    Assert.assertEquals(filter.toString(), expectedSelectivity, 
estimator.estimateSelectivity(filter), DELTA);
+
+    // swap equation, e.g., col < 5 becomes 5 > col; selectivity stays the same
+    RexCall call = (RexCall) filter;
+    SqlOperator operator = ((RexCall) filter).getOperator();
+    SqlOperator swappedOp;
+    if (operator == LE) {
+      swappedOp = GE;
+    } else if (operator == LT) {
+      swappedOp = GT;
+    } else if (operator == GE) {
+      swappedOp = LE;
+    } else if (operator == GT) {
+      swappedOp = LT;
+    } else if (operator == BETWEEN) {
+      // BETWEEN cannot be swapped
+      return;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+    RexNode swapped = REX_BUILDER.makeCall(swappedOp, 
call.getOperands().get(1), call.getOperands().get(0));
+    Assert.assertEquals(filter.toString(), expectedSelectivity, 
estimator.estimateSelectivity(swapped), DELTA);
+  }
+
+  private void checkBetweenSelectivity(float expectedEntries, float universe, 
float total, RexNode value, float lower,
+      float upper) {
+    RexNode betweenFilter =
+        REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolFalse, value, 
literalFloat(lower), literalFloat(upper));
+    FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
+    String between = "BETWEEN " + lower + " AND " + upper;
+    float expectedSelectivity = expectedEntries / total;
+    String message = between + ": calcite filter " + betweenFilter.toString();
+    Assert.assertEquals(message, expectedSelectivity, 
estimator.estimateSelectivity(betweenFilter), DELTA);
+
+    // invert the filter to a NOT BETWEEN
+    RexNode invBetween =
+        REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, value, 
literalFloat(lower), literalFloat(upper));
+    String invMessage = "NOT " + between + ": calcite filter " + 
invBetween.toString();
+    float invExpectedSelectivity = (universe - expectedEntries) / total;
+    Assert.assertEquals(invMessage, invExpectedSelectivity, 
estimator.estimateSelectivity(invBetween), DELTA);

Review Comment:
   After HIVE-27102, `NOT BETWEEN` should not appear during planning, so I am 
not sure if we should keep adding code & tests for this expression. I guess we 
can keep them for now, but in the future we may opt to remove all code and 
logic around this.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -184,91 +203,284 @@ public Double visitCall(RexCall call) {
     return selectivity;
   }
 
+  /**
+   * If the cast can be removed, just return its operand and adjust the 
boundaries if necessary.
+   *
+   * <p>
+   *   In Hive, if a value cannot be represented by the cast, the result of 
the cast is NULL,
+   *   and therefore cannot fulfill the predicate. So the possible range of 
the values
+   *   is limited by the range of possible values of the type.
+   * </p>
+   *
+   * <p>
+   *   Special care is taken to support the cast to DECIMAL(precision, scale):
+   *   The cast to DECIMAL rounds the value the same way as {@link 
RoundingMode#HALF_UP}.
+   *   The boundaries are adjusted accordingly.
+   * </p>
+   *
+   * @param cast a RexCall of type {@link SqlKind#CAST}
+   * @param tableScan the table that provides the statistics
+   * @param rangeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @param typeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @return the operand if the cast can be removed, otherwise the cast itself
+   */
+  private RexNode removeCastIfPossible(RexCall cast, HiveTableScan tableScan,
+      MutableObject<FloatInterval> rangeBoundaries, 
MutableObject<FloatInterval> typeBoundaries) {
+    RexNode op0 = cast.getOperands().getFirst();
+    if (!(op0 instanceof RexInputRef)) {
+      return cast;
+    }
+    int index = ((RexInputRef) op0).getIndex();
+    final List<ColStatistics> colStats = 
tableScan.getColStat(Collections.singletonList(index));
+    if (colStats.isEmpty()) {
+      return cast;
+    }
+
+    // we need to check that the possible values of the input to the cast are 
all within the type range of the cast
+    // otherwise the CAST introduces some modulo-like behavior (*)
+    ColStatistics colStat = colStats.getFirst();
+    ColStatistics.Range range = colStat.getRange();
+    if (range == null || range.minValue == null || Double.isNaN(
+        range.minValue.doubleValue()) || range.maxValue == null || 
Double.isNaN(range.maxValue.doubleValue())) {
+      return cast;
+    }
+
+    SqlTypeName type = cast.getType().getSqlTypeName();
+
+    double min;
+    double max;
+    switch (type) {
+    case TINYINT, SMALLINT, INTEGER, BIGINT:
+      min = ((Number) type.getLimit(false, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      max = ((Number) type.getLimit(true, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      break;
+    case TIMESTAMP, DATE:
+      min = Long.MIN_VALUE;
+      max = Long.MAX_VALUE;
+      break;
+    case FLOAT:
+      min = -Float.MAX_VALUE;
+      max = Float.MAX_VALUE;
+      break;
+    case DOUBLE, DECIMAL:
+      min = -Double.MAX_VALUE;
+      max = Double.MAX_VALUE;
+      break;
+    default:
+      // unknown type, do not remove the cast
+      return cast;
+    }
+
+    // see (*)
+    if (range.minValue.doubleValue() < min || range.maxValue.doubleValue() > 
max) {
+      return cast;
+    }
+
+    if (type == SqlTypeName.DECIMAL) {
+      adjustBoundariesForDecimal(cast, rangeBoundaries, typeBoundaries);
+    }
+
+    return op0;
+  }
+
+  /**
+   * Adjust the boundaries for a DECIMAL cast.
+   *
+   * @param rangeBoundaries boundaries of the range predicate
+   * @param typeBoundaries if not null, will be set to the boundaries of the 
type range
+   */
+  private static void adjustBoundariesForDecimal(RexCall cast, 
MutableObject<FloatInterval> rangeBoundaries,
+      MutableObject<FloatInterval> typeBoundaries) {
+    // values outside the representable range are cast to NULL, so adapt the 
boundaries
+    int precision = cast.getType().getPrecision();
+    int scale = cast.getType().getScale();
+    int digits = precision - scale;
+    // the cast does some rounding, i.e., CAST(99.9499 AS DECIMAL(3,1)) = 99.9
+    // but CAST(99.95 AS DECIMAL(3,1)) = NULL
+    float adjust = (float) (5 * Math.pow(10, -(scale + 1)));
+    // the range of values supported by the type is interval 
[-typeRangeExtent, typeRangeExtent] (both inclusive)
+    // e.g., the typeRangeExt is 99.94999 for DECIMAL(3,1)
+    float typeRangeExtent = Math.nextDown((float) (Math.pow(10, digits) - 
adjust));
+
+    FloatInterval range = rangeBoundaries.getValue();
+    // the resulting value of +- adjust would be rounded up, so in some cases 
we need to use Math.nextDown
+    float adjusted1 = range.lowerInclusive ? range.lower - adjust : 
Math.nextDown(range.lower + adjust);
+    float adjusted2 = range.upperInclusive ? Math.nextDown(range.upper + 
adjust) : range.upper - adjust;
+
+    float lowerUniverse = range.lowerInclusive ? -typeRangeExtent : 
Math.nextDown(-typeRangeExtent);
+    float upperUniverse = range.upperInclusive ? typeRangeExtent : 
Math.nextUp(typeRangeExtent);
+    float lower = Math.max(adjusted1, lowerUniverse);
+    float upper = Math.min(adjusted2, upperUniverse);
+    rangeBoundaries.setValue(range.withValues(lower, upper));
+    if (typeBoundaries != null) {
+      typeBoundaries.setValue(
+          new FloatInterval(lowerUniverse, range.lowerInclusive, 
upperUniverse, range.upperInclusive));
+    }
+  }
+
   private double computeRangePredicateSelectivity(RexCall call, SqlKind op) {
-    final boolean isLiteralLeft = 
call.getOperands().get(0).getKind().equals(SqlKind.LITERAL);
-    final boolean isLiteralRight = 
call.getOperands().get(1).getKind().equals(SqlKind.LITERAL);
-    final boolean isInputRefLeft = 
call.getOperands().get(0).getKind().equals(SqlKind.INPUT_REF);
-    final boolean isInputRefRight = 
call.getOperands().get(1).getKind().equals(SqlKind.INPUT_REF);
+    double defaultSelectivity = ((double) 1 / (double) 3);
+    if (!(childRel instanceof HiveTableScan)) {
+      return defaultSelectivity;
+    }
 
-    if (childRel instanceof HiveTableScan && isLiteralLeft != isLiteralRight 
&& isInputRefLeft != isInputRefRight) {
-      final HiveTableScan t = (HiveTableScan) childRel;
-      final int inputRefIndex = ((RexInputRef) 
call.getOperands().get(isInputRefLeft ? 0 : 1)).getIndex();
-      final List<ColStatistics> colStats = 
t.getColStat(Collections.singletonList(inputRefIndex));
+    // search for the literal
+    List<RexNode> operands = call.getOperands();
+    final Optional<Float> leftLiteral = extractLiteral(operands.get(0));
+    final Optional<Float> rightLiteral = extractLiteral(operands.get(1));
+    if ((leftLiteral.isPresent()) == (rightLiteral.isPresent())) {
+      return defaultSelectivity;
+    }
+    int literalOpIdx = leftLiteral.isPresent() ? 0 : 1;
+
+    // analyze the predicate
+    float value = leftLiteral.orElseGet(rightLiteral::get);

Review Comment:
   Is there anything preventing both `leftLiteral` and `rightLiteral` from being 
null? It seems that the previous version of the code had some logic for this case.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -299,10 +511,10 @@ private float extractLiteral(SqlTypeName typeName, Object 
boundValueObject) {
       value = ((GregorianCalendar) 
boundValueObject).toInstant().getEpochSecond();
       break;
     default:
-      throw new IllegalStateException(
-          "Unsupported type for comparator selectivity evaluation using 
histogram: " + typeName);
+      LOG.warn("Unsupported type for comparator selectivity evaluation using 
histogram: {}", typeName);

Review Comment:
   Should we really raise a warning here? With the changes in this PR, it seems 
that we can reach this default branch pretty often (e.g., character types), so 
logging a warning will raise false alarms for developers.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +595,292 @@ public void 
testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     
doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, 
inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast() {
+    useFieldWithValues("f_numeric", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(10 / 13.f, lt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(2 / 13.f, gt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(11 / 13.f, le(cast("f_numeric", TINYINT), int5));
+
+    checkSelectivity(12 / 13f, ge(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(1 / 13f, lt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(5 / 13f, gt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(8 / 13f, le(cast("f_numeric", TINYINT), int2));
+
+    // check some types
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", BIGINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", FLOAT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", DOUBLE), int5));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast2() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    RelDataType decimal3s1 = decimalType(3, 1);
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // values from -99.94999 to 99.94999 (both inclusive)
+    checkSelectivity(7 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(7 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+
+    RelDataType decimal4s1 = decimalType(4, 1);
+    checkSelectivity(10 / 28.f, lt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(20 / 28.f, le(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(13 / 28.f, ge(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+
+    RelDataType decimal2s1 = decimalType(2, 1);
+    checkSelectivity(2 / 28.f, lt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(2 / 28.f, le(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+
+    // expected: 100_000f
+    RelDataType decimal7s1 = decimalType(7, 1);
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 10_000f, 100_000f, because CAST(1_000_000 AS DECIMAL(7,1)) = 
NULL, and similar for even larger values
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(9999)));
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 100_000f
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10001)));
+
+    // expected 1f, 10f, 99.94998f, 99.94999f
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    // expected -99.94999f, -99.94998f, 0f, 1f
+    checkSelectivity(4 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // the cast would apply a modulo operation to the values outside the range 
of the cast
+    // so instead a default selectivity should be returned
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+  }
+
+  private void checkTimeFieldOnMidnightTimestamps(RexNode field) {
+    // note: use only values from VALUES_TIME that specify a date without 
hh:mm:ss!
+    checkSelectivity(7 / 7.f, ge(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(5 / 7.f, ge(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(1 / 7.f, ge(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(6 / 7.f, gt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(4 / 7.f, gt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(0 / 7.f, gt(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(1 / 7.f, le(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(3 / 7.f, le(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(7 / 7.f, le(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(0 / 7.f, lt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(2 / 7.f, lt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(6 / 7.f, lt(field, literalTimestamp("2020-11-07")));
+  }
+
+  private void checkTimeFieldOnIntraDayTimestamps(RexNode field) {
+    checkSelectivity(3 / 7.f, ge(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(2 / 7.f, gt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(5 / 7.f, le(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(4 / 7.f, lt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestamp() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+    checkTimeFieldOnIntraDayTimestamps(currentInputRef);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDate() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+
+    // it does not make sense to compare with "2020-11-05T11:23:45Z",
+    // as that value would not be stored as-is in a date column, but as 
"2020-11-05" instead
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDateWithCast() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    RexNode field1 = cast("f_date", SqlTypeName.DATE);
+    checkTimeFieldOnMidnightTimestamps(field1);
+    checkTimeFieldOnIntraDayTimestamps(field1);
+
+    RexNode field2 = cast("f_date", SqlTypeName.TIMESTAMP);
+    checkTimeFieldOnMidnightTimestamps(field2);
+    checkTimeFieldOnIntraDayTimestamps(field2);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestampWithCast() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", SqlTypeName.DATE));
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", 
SqlTypeName.TIMESTAMP));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCastDecimal2_1() {

Review Comment:
   The test name for this and the following methods testing `BETWEEN` is a bit 
strange. Since we are testing the `BETWEEN` expression I would assume that we 
test 
`org.apache.hadoop.hive.ql.optimizer.calcite.stats.FilterSelectivityEstimator#computeBetweenPredicateSelectivity`
 and not 
`org.apache.hadoop.hive.ql.optimizer.calcite.stats.FilterSelectivityEstimator#computeRangePredicateSelectivity`.
   
   How about one of the following alternatives:
   * `testComputeBetweenPredicateSelectivityWithCastDecimal2_1`
   * `testEstimateSelectivityBetweenWithCastDecimal2_1`



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +599,215 @@ public void 
testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     
doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, 
inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast() {
+    useFieldWithValues("f_numeric", VALUES, KLL);
+    checkSelectivity(3 / 13.f, castAndCompare(TINYINT, GE, int5));
+    checkSelectivity(10 / 13.f, castAndCompare(TINYINT, LT, int5));
+    checkSelectivity(2 / 13.f, castAndCompare(TINYINT, GT, int5));
+    checkSelectivity(11 / 13.f, castAndCompare(TINYINT, LE, int5));
+
+    checkSelectivity(12 / 13f, castAndCompare(TINYINT, GE, int2));
+    checkSelectivity(1 / 13f, castAndCompare(TINYINT, LT, int2));
+    checkSelectivity(5 / 13f, castAndCompare(TINYINT, GT, int2));
+    checkSelectivity(8 / 13f, castAndCompare(TINYINT, LE, int2));
+
+    // check some types
+    checkSelectivity(3 / 13.f, castAndCompare(INTEGER, GE, int5));
+    checkSelectivity(3 / 13.f, castAndCompare(BIGINT, GE, int5));
+    checkSelectivity(3 / 13.f, castAndCompare(FLOAT, GE, int5));
+    checkSelectivity(3 / 13.f, castAndCompare(DOUBLE, GE, int5));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast2() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    checkSelectivity(4 / 28.f, castAndCompare(DECIMAL_3_1, GE, 
literalFloat(1)));
+
+    // values from -99.94999 to 99.94999 (both inclusive)
+    checkSelectivity(7 / 28.f, castAndCompare(DECIMAL_3_1, LT, 
literalFloat(100)));
+    checkSelectivity(7 / 28.f, castAndCompare(DECIMAL_3_1, LE, 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, castAndCompare(DECIMAL_3_1, GT, 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, castAndCompare(DECIMAL_3_1, GE, 
literalFloat(100)));
+
+    checkSelectivity(10 / 28.f, castAndCompare(DECIMAL_4_1, LT, 
literalFloat(100)));
+    checkSelectivity(20 / 28.f, castAndCompare(DECIMAL_4_1, LE, 
literalFloat(100)));
+    checkSelectivity(3 / 28.f, castAndCompare(DECIMAL_4_1, GT, 
literalFloat(100)));
+    checkSelectivity(13 / 28.f, castAndCompare(DECIMAL_4_1, GE, 
literalFloat(100)));
+
+    checkSelectivity(2 / 28.f, castAndCompare(DECIMAL_2_1, LT, 
literalFloat(100)));
+    checkSelectivity(2 / 28.f, castAndCompare(DECIMAL_2_1, LE, 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, castAndCompare(DECIMAL_2_1, GT, 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, castAndCompare(DECIMAL_2_1, GE, 
literalFloat(100)));
+
+    // expected: 100_000f
+    checkSelectivity(1 / 28.f, castAndCompare(DECIMAL_7_1, GT, 
literalFloat(10000)));
+
+    // expected: 10_000f, 100_000f, because CAST(1_000_000 AS DECIMAL(7,1)) = 
NULL, and similar for even larger values
+    checkSelectivity(2 / 28.f, castAndCompare(DECIMAL_7_1, GE, 
literalFloat(9999)));
+    checkSelectivity(2 / 28.f, castAndCompare(DECIMAL_7_1, GE, 
literalFloat(10000)));
+
+    // expected: 100_000f
+    checkSelectivity(1 / 28.f, castAndCompare(DECIMAL_7_1, GT, 
literalFloat(10000)));
+    checkSelectivity(1 / 28.f, castAndCompare(DECIMAL_7_1, GT, 
literalFloat(10001)));
+
+    // expected 1f, 10f, 99.94998f, 99.94999f
+    checkSelectivity(4 / 28.f, castAndCompare(DECIMAL_3_1, GE, 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, castAndCompare(DECIMAL_3_1, GT, 
literalFloat(1)));
+    // expected -99.94999f, -99.94998f, 0f, 1f
+    checkSelectivity(4 / 28.f, castAndCompare(DECIMAL_3_1, LE, 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, castAndCompare(DECIMAL_3_1, LT, 
literalFloat(1)));
+
+    // the cast would apply a modulo operation to the values outside the range 
of the cast
+    // so instead a default selectivity should be returned
+    checkSelectivity(1 / 3.f, castAndCompare(TINYINT, LT, literalFloat(100)));
+    checkSelectivity(1 / 3.f, castAndCompare(TINYINT, LT, literalFloat(100)));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestamp() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+
+    checkSelectivity(5 / 7.f, REX_BUILDER.makeCall(GE, currentInputRef, 
literalTimestamp("2020-11-03")));
+    checkSelectivity(4 / 7.f, REX_BUILDER.makeCall(GT, currentInputRef, 
literalTimestamp("2020-11-03")));
+    checkSelectivity(5 / 7.f, REX_BUILDER.makeCall(LE, currentInputRef, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(4 / 7.f, REX_BUILDER.makeCall(LT, currentInputRef, 
literalTimestamp("2020-11-05T11:23:45Z")));
   }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDate() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+
+    checkSelectivity(5 / 7.f, REX_BUILDER.makeCall(GE, currentInputRef, 
literalDate("2020-11-03")));
+    checkSelectivity(4 / 7.f, REX_BUILDER.makeCall(GT, currentInputRef, 
literalDate("2020-11-03")));
+    checkSelectivity(4 / 7.f, REX_BUILDER.makeCall(LE, currentInputRef, 
literalDate("2020-11-05")));
+    checkSelectivity(4 / 7.f, REX_BUILDER.makeCall(LT, currentInputRef, 
literalDate("2020-11-05")));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityBetweenWithCast() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    float total = VALUES2.length;
+
+    {
+      float universe = 2; // the number of values that "survive" the cast
+      RexNode cast = REX_BUILDER.makeCast(DECIMAL_2_1, inputRef0);
+      checkBetweenSelectivity(0, universe, total, cast, 100f, 1000f);
+      checkBetweenSelectivity(1, universe, total, cast, 1f, 100f);
+      checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+    }
+
+    {
+      float universe = 7;
+      RexNode cast = REX_BUILDER.makeCast(DECIMAL_3_1, inputRef0);
+      checkBetweenSelectivity(0, universe, total, cast, 100f, 1000f);
+      checkBetweenSelectivity(4, universe, total, cast, 1f, 100f);
+      checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+    }
+
+    {
+      float universe = 23;
+      RexNode cast = REX_BUILDER.makeCast(DECIMAL_4_1, inputRef0);
+      // the values between -999.94999... and 999.94999... (both inclusive) 
pass through the cast
+      // the values between 99.95 and 100 are rounded up to 100, so they 
fulfill the BETWEEN
+      checkBetweenSelectivity(13, universe, total, cast, 100, 1000);
+      checkBetweenSelectivity(14, universe, total, cast, 1f, 100f);
+      checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+    }
+
+    {
+      float universe = 26;
+      RexNode cast = REX_BUILDER.makeCast(DECIMAL_7_1, inputRef0);
+      checkBetweenSelectivity(14, universe, total, cast, 100, 1000);
+      checkBetweenSelectivity(14, universe, total, cast, 1f, 100f);
+      checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+    }
+  }
+
+  private void checkSelectivity(float expectedSelectivity, RexNode filter) {
+    FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
+    Assert.assertEquals(filter.toString(), expectedSelectivity, 
estimator.estimateSelectivity(filter), DELTA);
+
+    // swap equation, e.g., col < 5 becomes 5 > col; selectivity stays the same
+    RexCall call = (RexCall) filter;
+    SqlOperator operator = ((RexCall) filter).getOperator();
+    SqlOperator swappedOp;
+    if (operator == LE) {
+      swappedOp = GE;
+    } else if (operator == LT) {
+      swappedOp = GT;
+    } else if (operator == GE) {
+      swappedOp = LE;
+    } else if (operator == GT) {
+      swappedOp = LT;
+    } else if (operator == BETWEEN) {
+      // BETWEEN cannot be swapped
+      return;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+    RexNode swapped = REX_BUILDER.makeCall(swappedOp, 
call.getOperands().get(1), call.getOperands().get(0));
+    Assert.assertEquals(filter.toString(), expectedSelectivity, 
estimator.estimateSelectivity(swapped), DELTA);
+  }

Review Comment:
   Sounds good. In this case I think you can use 
`org.apache.calcite.rex.RexUtil#invert`, which does the same thing.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -184,91 +203,284 @@ public Double visitCall(RexCall call) {
     return selectivity;
   }
 
+  /**
+   * If the cast can be removed, just return its operand and adjust the 
boundaries if necessary.
+   *
+   * <p>
+   *   In Hive, if a value cannot be represented by the cast, the result of 
the cast is NULL,
+   *   and therefore cannot fulfill the predicate. So the possible range of 
the values
+   *   is limited by the range of possible values of the type.
+   * </p>
+   *
+   * <p>
+   *   Special care is taken to support the cast to DECIMAL(precision, scale):
+   *   The cast to DECIMAL rounds the value the same way as {@link 
RoundingMode#HALF_UP}.
+   *   The boundaries are adjusted accordingly.
+   * </p>
+   *
+   * @param cast a RexCall of type {@link SqlKind#CAST}
+   * @param tableScan the table that provides the statistics
+   * @param rangeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @param typeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @return the operand if the cast can be removed, otherwise the cast itself
+   */
+  private RexNode removeCastIfPossible(RexCall cast, HiveTableScan tableScan,
+      MutableObject<FloatInterval> rangeBoundaries, 
MutableObject<FloatInterval> typeBoundaries) {
+    RexNode op0 = cast.getOperands().getFirst();
+    if (!(op0 instanceof RexInputRef)) {
+      return cast;
+    }
+    int index = ((RexInputRef) op0).getIndex();
+    final List<ColStatistics> colStats = 
tableScan.getColStat(Collections.singletonList(index));
+    if (colStats.isEmpty()) {
+      return cast;
+    }
+
+    // we need to check that the possible values of the input to the cast are 
all within the type range of the cast
+    // otherwise the CAST introduces some modulo-like behavior (*)
+    ColStatistics colStat = colStats.getFirst();
+    ColStatistics.Range range = colStat.getRange();
+    if (range == null || range.minValue == null || Double.isNaN(
+        range.minValue.doubleValue()) || range.maxValue == null || 
Double.isNaN(range.maxValue.doubleValue())) {
+      return cast;
+    }
+
+    SqlTypeName type = cast.getType().getSqlTypeName();
+
+    double min;
+    double max;
+    switch (type) {
+    case TINYINT, SMALLINT, INTEGER, BIGINT:
+      min = ((Number) type.getLimit(false, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      max = ((Number) type.getLimit(true, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      break;
+    case TIMESTAMP, DATE:
+      min = Long.MIN_VALUE;
+      max = Long.MAX_VALUE;
+      break;
+    case FLOAT:
+      min = -Float.MAX_VALUE;
+      max = Float.MAX_VALUE;
+      break;
+    case DOUBLE, DECIMAL:
+      min = -Double.MAX_VALUE;
+      max = Double.MAX_VALUE;
+      break;
+    default:
+      // unknown type, do not remove the cast
+      return cast;
+    }
+
+    // see (*)
+    if (range.minValue.doubleValue() < min || range.maxValue.doubleValue() > 
max) {
+      return cast;
+    }
+
+    if (type == SqlTypeName.DECIMAL) {
+      adjustBoundariesForDecimal(cast, rangeBoundaries, typeBoundaries);
+    }
+
+    return op0;
+  }
+
+  /**
+   * Adjust the boundaries for a DECIMAL cast.
+   *
+   * @param rangeBoundaries boundaries of the range predicate
+   * @param typeBoundaries if not null, will be set to the boundaries of the 
type range
+   */
+  private static void adjustBoundariesForDecimal(RexCall cast, 
MutableObject<FloatInterval> rangeBoundaries,
+      MutableObject<FloatInterval> typeBoundaries) {
+    // values outside the representable range are cast to NULL, so adapt the 
boundaries
+    int precision = cast.getType().getPrecision();
+    int scale = cast.getType().getScale();
+    int digits = precision - scale;
+    // the cast does some rounding, i.e., CAST(99.9499 AS DECIMAL(3,1)) = 99.9
+    // but CAST(99.95 AS DECIMAL(3,1)) = NULL
+    float adjust = (float) (5 * Math.pow(10, -(scale + 1)));
+    // the range of values supported by the type is interval 
[-typeRangeExtent, typeRangeExtent] (both inclusive)
+    // e.g., the typeRangeExt is 99.94999 for DECIMAL(3,1)
+    float typeRangeExtent = Math.nextDown((float) (Math.pow(10, digits) - 
adjust));
+
+    FloatInterval range = rangeBoundaries.getValue();
+    // the resulting value of +- adjust would be rounded up, so in some cases 
we need to use Math.nextDown
+    float adjusted1 = range.lowerInclusive ? range.lower - adjust : 
Math.nextDown(range.lower + adjust);
+    float adjusted2 = range.upperInclusive ? Math.nextDown(range.upper + 
adjust) : range.upper - adjust;
+
+    float lowerUniverse = range.lowerInclusive ? -typeRangeExtent : 
Math.nextDown(-typeRangeExtent);
+    float upperUniverse = range.upperInclusive ? typeRangeExtent : 
Math.nextUp(typeRangeExtent);
+    float lower = Math.max(adjusted1, lowerUniverse);
+    float upper = Math.min(adjusted2, upperUniverse);
+    rangeBoundaries.setValue(range.withValues(lower, upper));
+    if (typeBoundaries != null) {
+      typeBoundaries.setValue(
+          new FloatInterval(lowerUniverse, range.lowerInclusive, 
upperUniverse, range.upperInclusive));
+    }
+  }
+
   private double computeRangePredicateSelectivity(RexCall call, SqlKind op) {
-    final boolean isLiteralLeft = 
call.getOperands().get(0).getKind().equals(SqlKind.LITERAL);
-    final boolean isLiteralRight = 
call.getOperands().get(1).getKind().equals(SqlKind.LITERAL);
-    final boolean isInputRefLeft = 
call.getOperands().get(0).getKind().equals(SqlKind.INPUT_REF);
-    final boolean isInputRefRight = 
call.getOperands().get(1).getKind().equals(SqlKind.INPUT_REF);
+    double defaultSelectivity = ((double) 1 / (double) 3);
+    if (!(childRel instanceof HiveTableScan)) {
+      return defaultSelectivity;
+    }
 
-    if (childRel instanceof HiveTableScan && isLiteralLeft != isLiteralRight 
&& isInputRefLeft != isInputRefRight) {

Review Comment:
   I am not sure whether the `isInputRefLeft != isInputRefRight` case is 
handled in the new code.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -94,12 +147,14 @@ public class TestFilterSelectivityEstimator {
   @Mock
   private RelMetadataQuery mq;
 
-  private HiveTableScan tableScan;
+  private ColStatistics stats;
   private RelNode scan;
+  private RexNode currentInputRef;
+  private final MutableObject<float[]> currentValues = new MutableObject<>();

Review Comment:
   I don't think we need the `MutableObject` wrapper. In fact, I think that we 
don't need to store the array either. It seems that we could replace this with 
a single integer variable (`int rowCount`).



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +595,292 @@ public void 
testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     
doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, 
inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new 
FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast() {
+    useFieldWithValues("f_numeric", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(10 / 13.f, lt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(2 / 13.f, gt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(11 / 13.f, le(cast("f_numeric", TINYINT), int5));
+
+    checkSelectivity(12 / 13f, ge(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(1 / 13f, lt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(5 / 13f, gt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(8 / 13f, le(cast("f_numeric", TINYINT), int2));
+
+    // check some types
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", BIGINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", FLOAT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", DOUBLE), int5));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityWithCast2() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    RelDataType decimal3s1 = decimalType(3, 1);
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // values from -99.94999 to 99.94999 (both inclusive)
+    checkSelectivity(7 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(7 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+
+    RelDataType decimal4s1 = decimalType(4, 1);
+    checkSelectivity(10 / 28.f, lt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(20 / 28.f, le(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(13 / 28.f, ge(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+
+    RelDataType decimal2s1 = decimalType(2, 1);
+    checkSelectivity(2 / 28.f, lt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(2 / 28.f, le(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal2s1), 
literalFloat(100)));
+
+    // expected: 100_000f
+    RelDataType decimal7s1 = decimalType(7, 1);
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 10_000f, 100_000f, because CAST(1_000_000 AS DECIMAL(7,1)) = 
NULL, and similar for even larger values
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(9999)));
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+
+    // expected: 100_000f
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10000)));
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), 
literalFloat(10001)));
+
+    // expected 1f, 10f, 99.94998f, 99.94999f
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    // expected -99.94999f, -99.94998f, 0f, 1f
+    checkSelectivity(4 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+    checkSelectivity(3 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // the cast would apply a modulo operation to the values outside the range 
of the cast
+    // so instead a default selectivity should be returned
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+    checkSelectivity(1 / 3.f, lt(cast("f_numeric", TINYINT), 
literalFloat(100)));
+  }
+
+  private void checkTimeFieldOnMidnightTimestamps(RexNode field) {
+    // note: use only values from VALUES_TIME that specify a date without 
hh:mm:ss!
+    checkSelectivity(7 / 7.f, ge(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(5 / 7.f, ge(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(1 / 7.f, ge(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(6 / 7.f, gt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(4 / 7.f, gt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(0 / 7.f, gt(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(1 / 7.f, le(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(3 / 7.f, le(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(7 / 7.f, le(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(0 / 7.f, lt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(2 / 7.f, lt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(6 / 7.f, lt(field, literalTimestamp("2020-11-07")));
+  }
+
+  private void checkTimeFieldOnIntraDayTimestamps(RexNode field) {
+    checkSelectivity(3 / 7.f, ge(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(2 / 7.f, gt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(5 / 7.f, le(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(4 / 7.f, lt(field, 
literalTimestamp("2020-11-05T11:23:45Z")));
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestamp() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+    checkTimeFieldOnIntraDayTimestamps(currentInputRef);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDate() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+
+    // it does not make sense to compare with "2020-11-05T11:23:45Z",
+    // as that value would not be stored as-is in a date column, but as 
"2020-11-05" instead
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityDateWithCast() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    RexNode field1 = cast("f_date", SqlTypeName.DATE);
+    checkTimeFieldOnMidnightTimestamps(field1);
+    checkTimeFieldOnIntraDayTimestamps(field1);
+
+    RexNode field2 = cast("f_date", SqlTypeName.TIMESTAMP);
+    checkTimeFieldOnMidnightTimestamps(field2);
+    checkTimeFieldOnIntraDayTimestamps(field2);
+  }
+
+  @Test
+  public void testComputeRangePredicateSelectivityTimestampWithCast() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", SqlTypeName.DATE));
+    checkTimeFieldOnMidnightTimestamps(cast("f_timestamp", 
SqlTypeName.TIMESTAMP));

Review Comment:
   Why are the `checkTimeFieldOnIntraDayTimestamps` checks not relevant here?



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -184,91 +203,284 @@ public Double visitCall(RexCall call) {
     return selectivity;
   }
 
+  /**
+   * If the cast can be removed, just return its operand and adjust the 
boundaries if necessary.
+   *
+   * <p>
+   *   In Hive, if a value cannot be represented by the cast, the result of 
the cast is NULL,
+   *   and therefore cannot fulfill the predicate. So the possible range of 
the values
+   *   is limited by the range of possible values of the type.
+   * </p>
+   *
+   * <p>
+   *   Special care is taken to support the cast to DECIMAL(precision, scale):
+   *   The cast to DECIMAL rounds the value the same way as {@link 
RoundingMode#HALF_UP}.
+   *   The boundaries are adjusted accordingly.
+   * </p>
+   *
+   * @param cast a RexCall of type {@link SqlKind#CAST}
+   * @param tableScan the table that provides the statistics
+   * @param rangeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @param typeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @return the operand if the cast can be removed, otherwise the cast itself
+   */
+  private RexNode removeCastIfPossible(RexCall cast, HiveTableScan tableScan,
+      MutableObject<FloatInterval> rangeBoundaries, 
MutableObject<FloatInterval> typeBoundaries) {
+    RexNode op0 = cast.getOperands().getFirst();
+    if (!(op0 instanceof RexInputRef)) {
+      return cast;
+    }
+    int index = ((RexInputRef) op0).getIndex();
+    final List<ColStatistics> colStats = 
tableScan.getColStat(Collections.singletonList(index));
+    if (colStats.isEmpty()) {
+      return cast;
+    }
+
+    // we need to check that the possible values of the input to the cast are 
all within the type range of the cast
+    // otherwise the CAST introduces some modulo-like behavior (*)
+    ColStatistics colStat = colStats.getFirst();
+    ColStatistics.Range range = colStat.getRange();
+    if (range == null || range.minValue == null || Double.isNaN(
+        range.minValue.doubleValue()) || range.maxValue == null || 
Double.isNaN(range.maxValue.doubleValue())) {
+      return cast;
+    }
+
+    SqlTypeName type = cast.getType().getSqlTypeName();
+
+    double min;
+    double max;
+    switch (type) {
+    case TINYINT, SMALLINT, INTEGER, BIGINT:
+      min = ((Number) type.getLimit(false, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      max = ((Number) type.getLimit(true, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      break;
+    case TIMESTAMP, DATE:
+      min = Long.MIN_VALUE;
+      max = Long.MAX_VALUE;
+      break;
+    case FLOAT:
+      min = -Float.MAX_VALUE;
+      max = Float.MAX_VALUE;
+      break;
+    case DOUBLE, DECIMAL:
+      min = -Double.MAX_VALUE;
+      max = Double.MAX_VALUE;
+      break;
+    default:
+      // unknown type, do not remove the cast
+      return cast;
+    }
+
+    // see (*)
+    if (range.minValue.doubleValue() < min || range.maxValue.doubleValue() > 
max) {
+      return cast;
+    }
+
+    if (type == SqlTypeName.DECIMAL) {
+      adjustBoundariesForDecimal(cast, rangeBoundaries, typeBoundaries);
+    }
+
+    return op0;
+  }
+
+  /**
+   * Adjust the boundaries for a DECIMAL cast.
+   *
+   * @param rangeBoundaries boundaries of the range predicate
+   * @param typeBoundaries if not null, will be set to the boundaries of the 
type range
+   */
+  private static void adjustBoundariesForDecimal(RexCall cast, 
MutableObject<FloatInterval> rangeBoundaries,
+      MutableObject<FloatInterval> typeBoundaries) {
+    // values outside the representable range are cast to NULL, so adapt the 
boundaries
+    int precision = cast.getType().getPrecision();
+    int scale = cast.getType().getScale();
+    int digits = precision - scale;
+    // the cast does some rounding, i.e., CAST(99.9499 AS DECIMAL(3,1)) = 99.9
+    // but CAST(99.95 AS DECIMAL(3,1)) = NULL
+    float adjust = (float) (5 * Math.pow(10, -(scale + 1)));
+    // the range of values supported by the type is interval 
[-typeRangeExtent, typeRangeExtent] (both inclusive)
+    // e.g., the typeRangeExt is 99.94999 for DECIMAL(3,1)
+    float typeRangeExtent = Math.nextDown((float) (Math.pow(10, digits) - 
adjust));
+
+    FloatInterval range = rangeBoundaries.getValue();
+    // the resulting value of +- adjust would be rounded up, so in some cases 
we need to use Math.nextDown
+    float adjusted1 = range.lowerInclusive ? range.lower - adjust : 
Math.nextDown(range.lower + adjust);
+    float adjusted2 = range.upperInclusive ? Math.nextDown(range.upper + 
adjust) : range.upper - adjust;
+
+    float lowerUniverse = range.lowerInclusive ? -typeRangeExtent : 
Math.nextDown(-typeRangeExtent);
+    float upperUniverse = range.upperInclusive ? typeRangeExtent : 
Math.nextUp(typeRangeExtent);
+    float lower = Math.max(adjusted1, lowerUniverse);
+    float upper = Math.min(adjusted2, upperUniverse);
+    rangeBoundaries.setValue(range.withValues(lower, upper));
+    if (typeBoundaries != null) {
+      typeBoundaries.setValue(
+          new FloatInterval(lowerUniverse, range.lowerInclusive, 
upperUniverse, range.upperInclusive));
+    }
+  }
+
   private double computeRangePredicateSelectivity(RexCall call, SqlKind op) {
-    final boolean isLiteralLeft = 
call.getOperands().get(0).getKind().equals(SqlKind.LITERAL);
-    final boolean isLiteralRight = 
call.getOperands().get(1).getKind().equals(SqlKind.LITERAL);
-    final boolean isInputRefLeft = 
call.getOperands().get(0).getKind().equals(SqlKind.INPUT_REF);
-    final boolean isInputRefRight = 
call.getOperands().get(1).getKind().equals(SqlKind.INPUT_REF);
+    double defaultSelectivity = ((double) 1 / (double) 3);
+    if (!(childRel instanceof HiveTableScan)) {
+      return defaultSelectivity;
+    }
 
-    if (childRel instanceof HiveTableScan && isLiteralLeft != isLiteralRight 
&& isInputRefLeft != isInputRefRight) {
-      final HiveTableScan t = (HiveTableScan) childRel;
-      final int inputRefIndex = ((RexInputRef) 
call.getOperands().get(isInputRefLeft ? 0 : 1)).getIndex();
-      final List<ColStatistics> colStats = 
t.getColStat(Collections.singletonList(inputRefIndex));
+    // search for the literal
+    List<RexNode> operands = call.getOperands();
+    final Optional<Float> leftLiteral = extractLiteral(operands.get(0));
+    final Optional<Float> rightLiteral = extractLiteral(operands.get(1));
+    if ((leftLiteral.isPresent()) == (rightLiteral.isPresent())) {
+      return defaultSelectivity;
+    }

Review Comment:
   This retains the existing behavior before the patch but is a bit strange for 
two reasons. Firstly, if we have a range expression with two literals 
(e.g., `5 < 8`), then it is either true or false and we know this directly, 
so we could return 1 or 0 respectively. Secondly, this kind of expression 
should be constant folded, so I am not even sure we can ever hit this case.
   Anyway, we can treat it as a follow-up; I just wanted to highlight it 
since it caught my attention.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -184,91 +203,284 @@ public Double visitCall(RexCall call) {
     return selectivity;
   }
 
+  /**
+   * If the cast can be removed, just return its operand and adjust the 
boundaries if necessary.
+   *
+   * <p>
+   *   In Hive, if a value cannot be represented by the cast, the result of 
the cast is NULL,
+   *   and therefore cannot fulfill the predicate. So the possible range of 
the values
+   *   is limited by the range of possible values of the type.
+   * </p>
+   *
+   * <p>
+   *   Special care is taken to support the cast to DECIMAL(precision, scale):
+   *   The cast to DECIMAL rounds the value the same way as {@link 
RoundingMode#HALF_UP}.
+   *   The boundaries are adjusted accordingly.
+   * </p>
+   *
+   * @param cast a RexCall of type {@link SqlKind#CAST}
+   * @param tableScan the table that provides the statistics
+   * @param rangeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @param typeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @return the operand if the cast can be removed, otherwise the cast itself
+   */
+  private RexNode removeCastIfPossible(RexCall cast, HiveTableScan tableScan,
+      MutableObject<FloatInterval> rangeBoundaries, 
MutableObject<FloatInterval> typeBoundaries) {
+    RexNode op0 = cast.getOperands().getFirst();
+    if (!(op0 instanceof RexInputRef)) {
+      return cast;
+    }
+    int index = ((RexInputRef) op0).getIndex();
+    final List<ColStatistics> colStats = 
tableScan.getColStat(Collections.singletonList(index));
+    if (colStats.isEmpty()) {
+      return cast;
+    }
+
+    // we need to check that the possible values of the input to the cast are 
all within the type range of the cast
+    // otherwise the CAST introduces some modulo-like behavior (*)
+    ColStatistics colStat = colStats.getFirst();
+    ColStatistics.Range range = colStat.getRange();
+    if (range == null || range.minValue == null || Double.isNaN(
+        range.minValue.doubleValue()) || range.maxValue == null || 
Double.isNaN(range.maxValue.doubleValue())) {
+      return cast;
+    }
+
+    SqlTypeName type = cast.getType().getSqlTypeName();
+
+    double min;
+    double max;
+    switch (type) {
+    case TINYINT, SMALLINT, INTEGER, BIGINT:
+      min = ((Number) type.getLimit(false, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      max = ((Number) type.getLimit(true, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      break;
+    case TIMESTAMP, DATE:
+      min = Long.MIN_VALUE;
+      max = Long.MAX_VALUE;
+      break;
+    case FLOAT:
+      min = -Float.MAX_VALUE;
+      max = Float.MAX_VALUE;
+      break;
+    case DOUBLE, DECIMAL:
+      min = -Double.MAX_VALUE;
+      max = Double.MAX_VALUE;
+      break;
+    default:
+      // unknown type, do not remove the cast
+      return cast;
+    }
+
+    // see (*)
+    if (range.minValue.doubleValue() < min || range.maxValue.doubleValue() > 
max) {
+      return cast;
+    }
+
+    if (type == SqlTypeName.DECIMAL) {
+      adjustBoundariesForDecimal(cast, rangeBoundaries, typeBoundaries);
+    }
+
+    return op0;
+  }
+
+  /**
+   * Adjust the boundaries for a DECIMAL cast.
+   *
+   * @param rangeBoundaries boundaries of the range predicate
+   * @param typeBoundaries if not null, will be set to the boundaries of the 
type range
+   */
+  private static void adjustBoundariesForDecimal(RexCall cast, 
MutableObject<FloatInterval> rangeBoundaries,
+      MutableObject<FloatInterval> typeBoundaries) {
+    // values outside the representable range are cast to NULL, so adapt the 
boundaries
+    int precision = cast.getType().getPrecision();
+    int scale = cast.getType().getScale();
+    int digits = precision - scale;
+    // the cast does some rounding, i.e., CAST(99.9499 AS DECIMAL(3,1)) = 99.9
+    // but CAST(99.95 AS DECIMAL(3,1)) = NULL
+    float adjust = (float) (5 * Math.pow(10, -(scale + 1)));
+    // the range of values supported by the type is interval 
[-typeRangeExtent, typeRangeExtent] (both inclusive)
+    // e.g., the typeRangeExt is 99.94999 for DECIMAL(3,1)
+    float typeRangeExtent = Math.nextDown((float) (Math.pow(10, digits) - 
adjust));
+
+    FloatInterval range = rangeBoundaries.getValue();
+    // the resulting value of +- adjust would be rounded up, so in some cases 
we need to use Math.nextDown
+    float adjusted1 = range.lowerInclusive ? range.lower - adjust : 
Math.nextDown(range.lower + adjust);
+    float adjusted2 = range.upperInclusive ? Math.nextDown(range.upper + 
adjust) : range.upper - adjust;
+
+    float lowerUniverse = range.lowerInclusive ? -typeRangeExtent : 
Math.nextDown(-typeRangeExtent);
+    float upperUniverse = range.upperInclusive ? typeRangeExtent : 
Math.nextUp(typeRangeExtent);
+    float lower = Math.max(adjusted1, lowerUniverse);
+    float upper = Math.min(adjusted2, upperUniverse);
+    rangeBoundaries.setValue(range.withValues(lower, upper));
+    if (typeBoundaries != null) {
+      typeBoundaries.setValue(
+          new FloatInterval(lowerUniverse, range.lowerInclusive, 
upperUniverse, range.upperInclusive));
+    }
+  }
+
   private double computeRangePredicateSelectivity(RexCall call, SqlKind op) {
-    final boolean isLiteralLeft = 
call.getOperands().get(0).getKind().equals(SqlKind.LITERAL);
-    final boolean isLiteralRight = 
call.getOperands().get(1).getKind().equals(SqlKind.LITERAL);
-    final boolean isInputRefLeft = 
call.getOperands().get(0).getKind().equals(SqlKind.INPUT_REF);
-    final boolean isInputRefRight = 
call.getOperands().get(1).getKind().equals(SqlKind.INPUT_REF);
+    double defaultSelectivity = ((double) 1 / (double) 3);
+    if (!(childRel instanceof HiveTableScan)) {
+      return defaultSelectivity;
+    }
 
-    if (childRel instanceof HiveTableScan && isLiteralLeft != isLiteralRight 
&& isInputRefLeft != isInputRefRight) {
-      final HiveTableScan t = (HiveTableScan) childRel;
-      final int inputRefIndex = ((RexInputRef) 
call.getOperands().get(isInputRefLeft ? 0 : 1)).getIndex();
-      final List<ColStatistics> colStats = 
t.getColStat(Collections.singletonList(inputRefIndex));
+    // search for the literal
+    List<RexNode> operands = call.getOperands();
+    final Optional<Float> leftLiteral = extractLiteral(operands.get(0));
+    final Optional<Float> rightLiteral = extractLiteral(operands.get(1));
+    if ((leftLiteral.isPresent()) == (rightLiteral.isPresent())) {
+      return defaultSelectivity;
+    }
+    int literalOpIdx = leftLiteral.isPresent() ? 0 : 1;
+
+    // analyze the predicate
+    float value = leftLiteral.orElseGet(rightLiteral::get);
+    int boundaryIdx;
+    boolean openBound = op == SqlKind.LESS_THAN || op == SqlKind.GREATER_THAN;
+    switch (op) {
+    case LESS_THAN, LESS_THAN_OR_EQUAL:
+      boundaryIdx = literalOpIdx;
+      break;
+    case GREATER_THAN, GREATER_THAN_OR_EQUAL:
+      boundaryIdx = 1 - literalOpIdx;
+      break;
+    default:
+      return defaultSelectivity;
+    }
+    float[] boundaryValues = new float[] { Float.NEGATIVE_INFINITY, 
Float.POSITIVE_INFINITY };
+    boolean[] inclusive = new boolean[] { true, true };
+    boundaryValues[boundaryIdx] = value;
+    inclusive[boundaryIdx] = !openBound;
+    MutableObject<FloatInterval> boundaries =
+        new MutableObject<>(new FloatInterval(boundaryValues[0], inclusive[0], 
boundaryValues[1], inclusive[1]));
+
+    // extract the column index from the other operator
+    final HiveTableScan scan = (HiveTableScan) childRel;
+    int inputRefOpIndex = 1 - literalOpIdx;
+    RexNode node = operands.get(inputRefOpIndex);
+    if (node.getKind().equals(SqlKind.CAST)) {
+      node = removeCastIfPossible((RexCall) node, scan, boundaries, null);
+    }
 
-      if (!colStats.isEmpty() && isHistogramAvailable(colStats.get(0))) {
-        final KllFloatsSketch kll = 
KllFloatsSketch.heapify(Memory.wrap(colStats.get(0).getHistogram()));
-        final Object boundValueObject = ((RexLiteral) 
call.getOperands().get(isLiteralLeft ? 0 : 1)).getValue();
-        final SqlTypeName typeName = call.getOperands().get(isInputRefLeft ? 0 
: 1).getType().getSqlTypeName();
-        float value = extractLiteral(typeName, boundValueObject);
-        boolean closedBound = op.equals(SqlKind.LESS_THAN_OR_EQUAL) || 
op.equals(SqlKind.GREATER_THAN_OR_EQUAL);
-
-        double selectivity;
-        if (op.equals(SqlKind.LESS_THAN_OR_EQUAL) || 
op.equals(SqlKind.LESS_THAN)) {
-          selectivity = closedBound ? lessThanOrEqualSelectivity(kll, value) : 
lessThanSelectivity(kll, value);
-        } else {
-          selectivity = closedBound ? greaterThanOrEqualSelectivity(kll, 
value) : greaterThanSelectivity(kll, value);
-        }
+    int inputRefIndex = -1;
+    if (node.getKind().equals(SqlKind.INPUT_REF)) {
+      inputRefIndex = ((RexInputRef) node).getIndex();
+    }
 
-        // selectivity does not account for null values, we multiply for the 
number of non-null values (getN)
-        // and we divide by the total (non-null + null values) to get the 
overall selectivity.
-        //
-        // Example: consider a filter "col < 3", and the following table rows:
-        //  _____
-        // | col |
-        // |_____|
-        // |1    |
-        // |null |
-        // |null |
-        // |3    |
-        // |4    |
-        // -------
-        // kll.getN() would be 3, selectivity 1/3, t.getTable().getRowCount() 5
-        // so the final result would be 3 * 1/3 / 5 = 1/5, as expected.
-        return kll.getN() * selectivity / t.getTable().getRowCount();
-      }
+    if (inputRefIndex < 0) {
+      return defaultSelectivity;
+    }
+
+    final List<ColStatistics> colStats = 
scan.getColStat(Collections.singletonList(inputRefIndex));
+    if (colStats.isEmpty() || !isHistogramAvailable(colStats.get(0))) {
+      return defaultSelectivity;
     }
-    return ((double) 1 / (double) 3);
+
+    final KllFloatsSketch kll = 
KllFloatsSketch.heapify(Memory.wrap(colStats.get(0).getHistogram()));
+    // convert the condition to a range val1 <= x < val2 for 
rangedSelectivity(...)
+    double rawSelectivity = rangedSelectivity(kll, boundaries.getValue());
+    return scaleSelectivityToNullableValues(kll, rawSelectivity, scan);
+  }
+
+  /**
+   * Adjust the selectivity estimate to take NULL values into account.
+   * <p>
+   * The rawSelectivity does not account for null values. We multiply with the 
number of non-null values (getN)
+   * and we divide by the total number (non-null + null values) to get the 
overall selectivity.
+   * <p>
+   * Example: consider a filter "col < 3", and the following table rows:
+   * <pre>
+   *  _____
+   * | col |
+   * |_____|
+   * |1    |
+   * |null |
+   * |null |
+   * |3    |
+   * |4    |
+   * -------
+   * </pre>
+   * kll.getN() would be 3, rawSelectivity 1/3, scan.getTable().getRowCount() 5
+   * so the final result would be 3 * 1/3 / 5 = 1/5, as expected.
+   */
+  private static double scaleSelectivityToNullableValues(KllFloatsSketch kll, 
double rawSelectivity,
+      HiveTableScan scan) {
+    if (scan.getTable() == null) {
+      return rawSelectivity;
+    }
+    return kll.getN() * rawSelectivity / scan.getTable().getRowCount();
   }
 
   private Double computeBetweenPredicateSelectivity(RexCall call) {
-    final boolean hasLiteralBool = 
call.getOperands().get(0).getKind().equals(SqlKind.LITERAL);
-    final boolean hasInputRef = 
call.getOperands().get(1).getKind().equals(SqlKind.INPUT_REF);
-    final boolean hasLiteralLeft = 
call.getOperands().get(2).getKind().equals(SqlKind.LITERAL);
-    final boolean hasLiteralRight = 
call.getOperands().get(3).getKind().equals(SqlKind.LITERAL);
+    if (!(childRel instanceof HiveTableScan)) {
+      return computeFunctionSelectivity(call);
+    }
+
+    List<RexNode> operands = call.getOperands();
+    final boolean hasLiteralBool = 
operands.get(0).getKind().equals(SqlKind.LITERAL);
+    Optional<Float> leftLiteral = extractLiteral(operands.get(2));
+    Optional<Float> rightLiteral = extractLiteral(operands.get(3));
+
+    if (hasLiteralBool && leftLiteral.isPresent() && rightLiteral.isPresent()) 
{
+      final HiveTableScan scan = (HiveTableScan) childRel;
+      float leftValue = leftLiteral.get();
+      float rightValue = rightLiteral.get();
+
+      final Object inverseBoolValueObject = ((RexLiteral) 
operands.getFirst()).getValue();
+      boolean inverseBool = 
Boolean.parseBoolean(inverseBoolValueObject.toString());
+      // when they are equal it's an equality predicate, we cannot handle it 
as "BETWEEN"
+      if (Objects.equals(leftValue, rightValue)) {
+        return inverseBool ? computeNotEqualitySelectivity(call) : 
computeFunctionSelectivity(call);
+      }

Review Comment:
   This is a simplification that happens already or should happen elsewhere. 
Given that the previous version had this code as well, we can consider its removal 
in a follow-up.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -184,91 +203,284 @@ public Double visitCall(RexCall call) {
     return selectivity;
   }
 
+  /**
+   * If the cast can be removed, just return its operand and adjust the 
boundaries if necessary.
+   *
+   * <p>
+   *   In Hive, if a value cannot be represented by the cast, the result of 
the cast is NULL,
+   *   and therefore cannot fulfill the predicate. So the possible range of 
the values
+   *   is limited by the range of possible values of the type.
+   * </p>
+   *
+   * <p>
+   *   Special care is taken to support the cast to DECIMAL(precision, scale):
+   *   The cast to DECIMAL rounds the value the same way as {@link 
RoundingMode#HALF_UP}.
+   *   The boundaries are adjusted accordingly.
+   * </p>
+   *
+   * @param cast a RexCall of type {@link SqlKind#CAST}
+   * @param tableScan the table that provides the statistics
+   * @param rangeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @param typeBoundaries see {@link #adjustBoundariesForDecimal(RexCall, 
MutableObject, MutableObject)}; might get modified
+   * @return the operand if the cast can be removed, otherwise the cast itself
+   */
+  private RexNode removeCastIfPossible(RexCall cast, HiveTableScan tableScan,
+      MutableObject<FloatInterval> rangeBoundaries, 
MutableObject<FloatInterval> typeBoundaries) {
+    RexNode op0 = cast.getOperands().getFirst();
+    if (!(op0 instanceof RexInputRef)) {
+      return cast;
+    }
+    int index = ((RexInputRef) op0).getIndex();
+    final List<ColStatistics> colStats = 
tableScan.getColStat(Collections.singletonList(index));
+    if (colStats.isEmpty()) {
+      return cast;
+    }
+
+    // we need to check that the possible values of the input to the cast are 
all within the type range of the cast
+    // otherwise the CAST introduces some modulo-like behavior (*)
+    ColStatistics colStat = colStats.getFirst();
+    ColStatistics.Range range = colStat.getRange();
+    if (range == null || range.minValue == null || Double.isNaN(
+        range.minValue.doubleValue()) || range.maxValue == null || 
Double.isNaN(range.maxValue.doubleValue())) {
+      return cast;
+    }
+
+    SqlTypeName type = cast.getType().getSqlTypeName();
+
+    double min;
+    double max;
+    switch (type) {
+    case TINYINT, SMALLINT, INTEGER, BIGINT:
+      min = ((Number) type.getLimit(false, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      max = ((Number) type.getLimit(true, SqlTypeName.Limit.OVERFLOW, false, 
-1, -1)).doubleValue();
+      break;
+    case TIMESTAMP, DATE:
+      min = Long.MIN_VALUE;
+      max = Long.MAX_VALUE;
+      break;
+    case FLOAT:
+      min = -Float.MAX_VALUE;
+      max = Float.MAX_VALUE;
+      break;
+    case DOUBLE, DECIMAL:
+      min = -Double.MAX_VALUE;
+      max = Double.MAX_VALUE;
+      break;
+    default:
+      // unknown type, do not remove the cast
+      return cast;
+    }
+
+    // see (*)
+    if (range.minValue.doubleValue() < min || range.maxValue.doubleValue() > 
max) {
+      return cast;
+    }
+
+    if (type == SqlTypeName.DECIMAL) {
+      adjustBoundariesForDecimal(cast, rangeBoundaries, typeBoundaries);
+    }
+
+    return op0;
+  }
+
+  /**
+   * Adjust the boundaries for a DECIMAL cast.
+   *
+   * @param rangeBoundaries boundaries of the range predicate
+   * @param typeBoundaries if not null, will be set to the boundaries of the 
type range
+   */
+  private static void adjustBoundariesForDecimal(RexCall cast, 
MutableObject<FloatInterval> rangeBoundaries,
+      MutableObject<FloatInterval> typeBoundaries) {

Review Comment:
   I wanted to check if it is possible to simplify this method such that we:
   * avoid the `MutableObject`
   * make the mutation explicit by returning an object
   * separate/decouple handling of range and type boundaries
   
   The signature would become something like:
   ```java
   private static FloatInterval adjustBoundariesForDecimal(RexCall cast, 
FloatInterval boundaries);
   ```
   I may give it a bit more thought once we finalize the remaining points.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to