This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push: new faf7cc4 [FLINK-22015][table-planner-blink] Exclude IS NULL from SEARCH operators (#16140) faf7cc4 is described below commit faf7cc43beebce3fee528ec5637e9387b95bec99 Author: TsReaper <tsreape...@gmail.com> AuthorDate: Fri Jun 11 18:19:37 2021 +0800 [FLINK-22015][table-planner-blink] Exclude IS NULL from SEARCH operators (#16140) --- .../java/org/apache/calcite/rex/RexSimplify.java | 28 ++++++++--------- .../table/planner/plan/batch/sql/CalcTest.xml | 35 ++++++++++++++++++++++ .../table/planner/plan/stream/sql/CalcTest.xml | 35 ++++++++++++++++++++++ .../table/planner/plan/batch/sql/CalcTest.scala | 10 +++++++ .../table/planner/plan/stream/sql/CalcTest.scala | 9 ++++++ .../planner/runtime/batch/sql/CalcITCase.scala | 34 +++++++++++++++++++++ 6 files changed, 136 insertions(+), 15 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rex/RexSimplify.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rex/RexSimplify.java index 57b9fb7..ea59497 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rex/RexSimplify.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rex/RexSimplify.java @@ -68,9 +68,14 @@ import static org.apache.calcite.rex.RexUnknownAs.UNKNOWN; /** * Context required to simplify a row-expression. * - * <p>Copied to fix CALCITE-4364, should be removed for the next Calcite upgrade. + * <p>Copied to fix Calcite 1.26 bugs, should be removed for the next Calcite upgrade. * - * <p>Changes: Line 1307, Line 1764, Line 2638 ~ Line 2656. + * <p>Changes (line numbers are from the original RexSimplify file): + * + * <ol> + * <li>CALCITE-4364 & FLINK-19811: Line 1307, Line 1764, Line 2638 ~ Line 2656. + * <li>CALCITE-4446 & FLINK-22015: Line 2542 ~ Line 2548, Line 2614 ~ Line 2619. + * </ol> */ public class RexSimplify { private final boolean paranoid; @@ -2669,13 +2674,9 @@ public class RexSimplify { ((RexCall) e).operands.get(1), e.getKind(), newTerms); - case IS_NULL: - if (negate) { - return false; - } - final RexNode arg = ((RexCall) e).operands.get(0); - return accept1( - arg, e.getKind(), rexBuilder.makeNullLiteral(arg.getType()), newTerms); + // CHANGED: we remove IS_NULL here + // because SEARCH operator in Calcite 1.26 handles UNKNOWNs incorrectly + // see CALCITE-4446 default: return false; } @@ -2741,12 +2742,9 @@ public class RexSimplify { final Sarg sarg = literal.getValueAs(Sarg.class); b.addSarg(sarg, negate, literal.getType()); return true; - case IS_NULL: - if (negate) { - throw new AssertionError("negate is not supported for IS_NULL"); - } - b.containsNull = true; - return true; + // CHANGED: we remove IS_NULL here + // because SEARCH operator in Calcite 1.26 handles UNKNOWNs incorrectly + // see CALCITE-4446 default: throw new AssertionError("unexpected " + kind); } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml index f8ebe6b..3eb1d5e 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml @@ -374,4 +374,39 @@ Calc(select=[ROW(1, _UTF-16LE'Hi', a) AS EXPR$0]) ]]> </Resource> </TestCase> + <TestCase name="testOrWithIsNullPredicate"> + <Resource name="sql"> + <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 10 OR a IS NULL]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalFilter(condition=[OR(=($0, 1), =($0, 10), IS NULL($0))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a, b, c], where=[OR(IS NULL(a), SEARCH(a, Sarg[1L:BIGINT, 10L:BIGINT]:BIGINT))]) ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testOrWithIsNullInIf"> + <Resource name="sql"> + <![CDATA[SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[IF(OR(=($2, _UTF-16LE''), IS NULL($2)), _UTF-16LE'a', _UTF-16LE'b')]) ++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[IF(OR(=(c, _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NULL(c)), _UTF-16LE'a', _UTF-16LE'b') AS EXPR$0]) ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> </Root> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml index 32045fb..6b6f346 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml @@ -375,4 +375,39 @@ Calc(select=[ROW(1, _UTF-16LE'Hi', a) AS EXPR$0]) ]]> </Resource> </TestCase> + <TestCase name="testOrWithIsNullPredicate"> + <Resource name="sql"> + <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 10 OR a IS NULL]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalFilter(condition=[OR(=($0, 1), =($0, 10), IS NULL($0))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a, b, c], where=[OR(IS NULL(a), SEARCH(a, Sarg[1L:BIGINT, 10L:BIGINT]:BIGINT))]) ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testOrWithIsNullInIf"> + <Resource name="sql"> + <![CDATA[SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[IF(OR(=($2, _UTF-16LE''), IS NULL($2)), _UTF-16LE'a', _UTF-16LE'b')]) ++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[IF(OR(=(c, _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NULL(c)), _UTF-16LE'a', _UTF-16LE'b') AS EXPR$0]) ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> </Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala index 8d3130f..036a3f6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala @@ -161,4 +161,14 @@ class CalcTest extends TableTestBase { def testCollationDeriveOnCalc(): Unit = { util.verifyPlan("SELECT CAST(a AS INT), CAST(b AS VARCHAR) FROM (VALUES (3, 'c')) T(a,b)") } + + @Test + def testOrWithIsNullPredicate(): Unit = { + util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 10 OR a IS NULL") + } + + @Test + def testOrWithIsNullInIf(): Unit = { + util.verifyPlan("SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable") + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala index 14380cf..9818aa0 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala @@ -161,4 +161,13 @@ class CalcTest extends TableTestBase { "WHERE (a, b, c) = ('foo', 12, TIMESTAMP '1984-07-12 14:34:24')") } + @Test + def testOrWithIsNullPredicate(): Unit = { + util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 10 OR a IS NULL") + } + + @Test + def testOrWithIsNullInIf(): Unit = { + util.verifyPlan("SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable") + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala index 5ea0810..6504177 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala @@ -1379,4 +1379,38 @@ class CalcITCase extends BatchTestBase { row(localDateTime("2021-03-30 10:00:00"), localDateTime("2023-03-30 09:59:59")), row(localDateTime("2021-03-30 10:00:00"), localDateTime("2023-03-30 10:00:00")))) } + + @Test + def testOrWithIsNullPredicate(): Unit = { + checkResult( + """ + |SELECT * FROM NullTable3 AS T + |WHERE T.a = 1 OR T.a = 3 OR T.a IS NULL + |""".stripMargin, + Seq( + row(1, 1L, "Hi"), + row(3, 2L, "Hello world"), + row(null, 999L, "NullTuple"), + row(null, 999L, "NullTuple"))) + } + + @Test + def testOrWithIsNullInIf(): Unit = { + val data = Seq( + row("", "N"), + row("X", "Y"), + row(null, "Y")) + registerCollection( + "MyTable", data, new RowTypeInfo(STRING_TYPE_INFO, STRING_TYPE_INFO), "a, b") + + checkResult( + "SELECT IF(a = '', 'a', 'b') FROM MyTable", + Seq(row('a'), row('b'), row('b'))) + checkResult( + "SELECT IF(a IS NULL, 'a', 'b') FROM MyTable", + Seq(row('b'), row('b'), row('a'))) + checkResult( + "SELECT IF(a = '' OR a IS NULL, 'a', 'b') FROM MyTable", + Seq(row('a'), row('b'), row('a'))) + } }