This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new faf7cc4  [FLINK-22015][table-planner-blink] Exclude IS NULL from SEARCH operators (#16140)
faf7cc4 is described below

commit faf7cc43beebce3fee528ec5637e9387b95bec99
Author: TsReaper <tsreape...@gmail.com>
AuthorDate: Fri Jun 11 18:19:37 2021 +0800

    [FLINK-22015][table-planner-blink] Exclude IS NULL from SEARCH operators (#16140)
---
 .../java/org/apache/calcite/rex/RexSimplify.java   | 28 ++++++++---------
 .../table/planner/plan/batch/sql/CalcTest.xml      | 35 ++++++++++++++++++++++
 .../table/planner/plan/stream/sql/CalcTest.xml     | 35 ++++++++++++++++++++++
 .../table/planner/plan/batch/sql/CalcTest.scala    | 10 +++++++
 .../table/planner/plan/stream/sql/CalcTest.scala   |  9 ++++++
 .../planner/runtime/batch/sql/CalcITCase.scala     | 34 +++++++++++++++++++++
 6 files changed, 136 insertions(+), 15 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rex/RexSimplify.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rex/RexSimplify.java
index 57b9fb7..ea59497 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rex/RexSimplify.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rex/RexSimplify.java
@@ -68,9 +68,14 @@ import static org.apache.calcite.rex.RexUnknownAs.UNKNOWN;
 /**
  * Context required to simplify a row-expression.
  *
- * <p>Copied to fix CALCITE-4364, should be removed for the next Calcite upgrade.
+ * <p>Copied to fix Calcite 1.26 bugs, should be removed for the next Calcite upgrade.
  *
- * <p>Changes: Line 1307, Line 1764, Line 2638 ~ Line 2656.
+ * <p>Changes (line numbers are from the original RexSimplify file):
+ *
+ * <ol>
+ *   <li>CALCITE-4364 & FLINK-19811: Line 1307, Line 1764, Line 2638 ~ Line 2656.
+ *   <li>CALCITE-4446 & FLINK-22015: Line 2542 ~ Line 2548, Line 2614 ~ Line 2619.
+ * </ol>
  */
 public class RexSimplify {
     private final boolean paranoid;
@@ -2669,13 +2674,9 @@ public class RexSimplify {
                             ((RexCall) e).operands.get(1),
                             e.getKind(),
                             newTerms);
-                case IS_NULL:
-                    if (negate) {
-                        return false;
-                    }
-                    final RexNode arg = ((RexCall) e).operands.get(0);
-                    return accept1(
-                            arg, e.getKind(), rexBuilder.makeNullLiteral(arg.getType()), newTerms);
+                    // CHANGED: we remove IS_NULL here
+                    // because SEARCH operator in Calcite 1.26 handles UNKNOWNs incorrectly
+                    // see CALCITE-4446
                 default:
                     return false;
             }
@@ -2741,12 +2742,9 @@ public class RexSimplify {
                     final Sarg sarg = literal.getValueAs(Sarg.class);
                     b.addSarg(sarg, negate, literal.getType());
                     return true;
-                case IS_NULL:
-                    if (negate) {
-                        throw new AssertionError("negate is not supported for IS_NULL");
-                    }
-                    b.containsNull = true;
-                    return true;
+                    // CHANGED: we remove IS_NULL here
+                    // because SEARCH operator in Calcite 1.26 handles UNKNOWNs incorrectly
+                    // see CALCITE-4446
                 default:
                     throw new AssertionError("unexpected " + kind);
             }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
index f8ebe6b..3eb1d5e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
@@ -374,4 +374,39 @@ Calc(select=[ROW(1, _UTF-16LE'Hi', a) AS EXPR$0])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testOrWithIsNullPredicate">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 10 OR a IS NULL]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 10), IS NULL($0))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, b, c], where=[OR(IS NULL(a), SEARCH(a, Sarg[1L:BIGINT, 10L:BIGINT]:BIGINT))])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOrWithIsNullInIf">
+    <Resource name="sql">
+      <![CDATA[SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[IF(OR(=($2, _UTF-16LE''), IS NULL($2)), _UTF-16LE'a', _UTF-16LE'b')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[IF(OR(=(c, _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NULL(c)), _UTF-16LE'a', _UTF-16LE'b') AS EXPR$0])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
index 32045fb..6b6f346 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
@@ -375,4 +375,39 @@ Calc(select=[ROW(1, _UTF-16LE'Hi', a) AS EXPR$0])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testOrWithIsNullPredicate">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 10 OR a IS NULL]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 10), IS NULL($0))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, b, c], where=[OR(IS NULL(a), SEARCH(a, Sarg[1L:BIGINT, 10L:BIGINT]:BIGINT))])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOrWithIsNullInIf">
+    <Resource name="sql">
+      <![CDATA[SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[IF(OR(=($2, _UTF-16LE''), IS NULL($2)), _UTF-16LE'a', _UTF-16LE'b')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[IF(OR(=(c, _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NULL(c)), _UTF-16LE'a', _UTF-16LE'b') AS EXPR$0])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
index 8d3130f..036a3f6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
@@ -161,4 +161,14 @@ class CalcTest extends TableTestBase {
   def testCollationDeriveOnCalc(): Unit = {
     util.verifyPlan("SELECT CAST(a AS INT), CAST(b AS VARCHAR) FROM (VALUES (3, 'c')) T(a,b)")
   }
+
+  @Test
+  def testOrWithIsNullPredicate(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 10 OR a IS NULL")
+  }
+
+  @Test
+  def testOrWithIsNullInIf(): Unit = {
+    util.verifyPlan("SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable")
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
index 14380cf..9818aa0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
@@ -161,4 +161,13 @@ class CalcTest extends TableTestBase {
       "WHERE (a, b, c) = ('foo', 12, TIMESTAMP '1984-07-12 14:34:24')")
   }
 
+  @Test
+  def testOrWithIsNullPredicate(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 10 OR a IS NULL")
+  }
+
+  @Test
+  def testOrWithIsNullInIf(): Unit = {
+    util.verifyPlan("SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable")
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 5ea0810..6504177 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1379,4 +1379,38 @@ class CalcITCase extends BatchTestBase {
         row(localDateTime("2021-03-30 10:00:00"), localDateTime("2023-03-30 09:59:59")),
         row(localDateTime("2021-03-30 10:00:00"), localDateTime("2023-03-30 10:00:00"))))
   }
+
+  @Test
+  def testOrWithIsNullPredicate(): Unit = {
+    checkResult(
+      """
+        |SELECT * FROM NullTable3 AS T
+        |WHERE T.a = 1 OR T.a = 3 OR T.a IS NULL
+        |""".stripMargin,
+      Seq(
+        row(1, 1L, "Hi"),
+        row(3, 2L, "Hello world"),
+        row(null, 999L, "NullTuple"),
+        row(null, 999L, "NullTuple")))
+  }
+
+  @Test
+  def testOrWithIsNullInIf(): Unit = {
+    val data = Seq(
+      row("", "N"),
+      row("X", "Y"),
+      row(null, "Y"))
+    registerCollection(
+      "MyTable", data, new RowTypeInfo(STRING_TYPE_INFO, STRING_TYPE_INFO), "a, b")
+
+    checkResult(
+      "SELECT IF(a = '', 'a', 'b') FROM MyTable",
+      Seq(row('a'), row('b'), row('b')))
+    checkResult(
+      "SELECT IF(a IS NULL, 'a', 'b') FROM MyTable",
+      Seq(row('b'), row('b'), row('a')))
+    checkResult(
+      "SELECT IF(a = '' OR a IS NULL, 'a', 'b') FROM MyTable",
+      Seq(row('a'), row('b'), row('a')))
+  }
 }

Reply via email to