This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 52d1adc  [FLINK-20366][table-planner-blink] ColumnIntervalUtil#getColumnIntervalWithFilter should consider constant predicate
52d1adc is described below

commit 52d1adcfd159bf8e952818eb08baddc568d7658a
Author: godfrey he <[email protected]>
AuthorDate: Fri Nov 27 17:30:52 2020 +0800

    [FLINK-20366][table-planner-blink] ColumnIntervalUtil#getColumnIntervalWithFilter should consider constant predicate
    
    (cherry picked from commit 7bf76c0b41a68ace751d4af48efc1edc2ed2d6c7)
---
 .../planner/plan/utils/ColumnIntervalUtil.scala    | 28 ++++---
 .../{ => plan}/utils/ColumnIntervalUtilTest.scala  | 89 +++++++++++++++++++++-
 2 files changed, 106 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
index 5bb762d..ee1703d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
@@ -19,8 +19,7 @@
 package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.planner.plan.stats._
-import 
org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.ColumnRelatedVisitor
-import 
org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.getLiteralValueByBroadType
+import 
org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.{ColumnRelatedVisitor,
 getLiteralValueByBroadType}
 
 import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral, 
RexNode, RexUtil}
 import org.apache.calcite.sql.SqlKind
@@ -210,13 +209,24 @@ object ColumnIntervalUtil {
     }
     val interval = relatedSubRexNode match {
       case Some(rexNode) =>
-        val orParts = RexUtil.flattenOr(Vector(RexUtil.toDnf(rexBuilder, 
rexNode)))
-        orParts.map(or => {
-          val andParts = RexUtil.flattenAnd(Vector(or))
-          val andIntervals = andParts.map(and => 
columnIntervalOfSinglePredicate(and))
-          val res = andIntervals.filter(_ != 
null).foldLeft(beginInterval)(ValueInterval.intersect)
-          res
-        }).reduceLeft(ValueInterval.union)
+        if (rexNode.isAlwaysTrue) {
+          beginInterval
+        } else if (rexNode.isAlwaysFalse) {
+          ValueInterval.empty
+        } else if (RexUtil.isConstant(rexNode)) {
+          // this should not happen, just protect the following code
+          ValueInterval.infinite
+        } else {
+          val orParts = RexUtil.flattenOr(Vector(RexUtil.toDnf(rexBuilder, 
rexNode)))
+          orParts.map(or => {
+            val andParts = RexUtil.flattenAnd(Vector(or))
+            val andIntervals = andParts.map(and => 
columnIntervalOfSinglePredicate(and))
+            val res = andIntervals
+              .filter(_ != null)
+              .foldLeft(beginInterval)(ValueInterval.intersect)
+            res
+          }).reduceLeft(ValueInterval.union)
+        }
       case _ => beginInterval
     }
     if (interval == ValueInterval.infinite) null else interval
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtilTest.scala
similarity index 70%
rename from 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
rename to 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtilTest.scala
index 183634c..cf1964c 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtilTest.scala
@@ -16,12 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.utils
+package org.apache.flink.table.planner.plan.utils
 
+import org.apache.flink.table.planner.calcite.{FlinkRexBuilder, 
FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.planner.plan.stats._
 import org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil._
 
-import org.junit.Assert.{assertEquals, assertTrue}
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.junit.Assert.{assertEquals, assertNull}
 import org.junit.Test
 
 import java.lang
@@ -189,4 +193,85 @@ class ColumnIntervalUtilTest {
     )
   }
 
+  @Test
+  def testGetColumnIntervalWithFilter(): Unit = {
+    val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(new 
FlinkTypeSystem)
+    val rexBuilder: RexBuilder = new FlinkRexBuilder(typeFactory)
+
+    // ($1 >= 1 and $1 < 10) or (not($1 > 5))
+    val predicate = rexBuilder.makeCall(
+      SqlStdOperatorTable.OR,
+      rexBuilder.makeCall(
+        SqlStdOperatorTable.AND,
+        rexBuilder.makeCall(
+          SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+          
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.BIGINT), 1),
+          rexBuilder.makeBigintLiteral(java.math.BigDecimal.valueOf(1))),
+        rexBuilder.makeCall(
+          SqlStdOperatorTable.LESS_THAN,
+          
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.BIGINT), 1),
+          rexBuilder.makeBigintLiteral(java.math.BigDecimal.valueOf(10)))),
+      rexBuilder.makeCall(
+        SqlStdOperatorTable.NOT,
+        rexBuilder.makeCall(
+          SqlStdOperatorTable.GREATER_THAN,
+          
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.BIGINT), 1),
+          rexBuilder.makeBigintLiteral(java.math.BigDecimal.valueOf(5))))
+    )
+
+    assertEquals(
+      toBigDecimalInterval(ValueInterval.apply(null, 10L, includeUpper = 
false)),
+      ColumnIntervalUtil.getColumnIntervalWithFilter(
+        None,
+        predicate,
+        1,
+        rexBuilder))
+
+    assertEquals(
+      toBigDecimalInterval(ValueInterval.apply(3L, 8L, includeLower = false, 
includeUpper = false)),
+      ColumnIntervalUtil.getColumnIntervalWithFilter(
+        Some(toBigDecimalInterval(
+          ValueInterval.apply(3L, 8L, includeLower = false, includeUpper = 
false))),
+        predicate,
+        1,
+        rexBuilder))
+
+    assertEquals(
+      ValueInterval.empty,
+      ColumnIntervalUtil.getColumnIntervalWithFilter(
+        None,
+        rexBuilder.makeLiteral(false),
+        0,
+        rexBuilder))
+
+    assertEquals(
+      ValueInterval.empty,
+      ColumnIntervalUtil.getColumnIntervalWithFilter(
+        Some(ValueInterval.apply(1L, 10L)),
+        rexBuilder.makeLiteral(false),
+        0,
+        rexBuilder))
+
+    assertNull(
+      ColumnIntervalUtil.getColumnIntervalWithFilter(
+        None,
+        rexBuilder.makeLiteral(true),
+        0,
+        rexBuilder))
+
+    assertEquals(
+      ValueInterval.apply(1L, 10L),
+      ColumnIntervalUtil.getColumnIntervalWithFilter(
+        Some(ValueInterval.apply(1L, 10L)),
+        rexBuilder.makeLiteral(true),
+        0,
+        rexBuilder))
+
+    assertNull(
+      ColumnIntervalUtil.getColumnIntervalWithFilter(
+        None,
+        rexBuilder.makeBigintLiteral(java.math.BigDecimal.ONE),
+        0,
+        rexBuilder))
+  }
 }

Reply via email to