This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 265d8174165 [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal
265d8174165 is described below

commit 265d81741654ff70836485e8e8d31ce33a87e960
Author: zoudan <zou...@bytedance.com>
AuthorDate: Thu Oct 19 17:48:05 2023 +0800

    [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal

    Close apache/flink#23551
---
 .../table/planner/plan/utils/RexNodeExtractor.scala |  4 ++++
 .../plan/utils/NestedProjectionUtilTest.scala       | 10 ++++++----
 .../planner/plan/utils/RexNodeExtractorTest.scala   | 21 +++++++++++++++++++++
 .../planner/plan/utils/RexNodeRewriterTest.scala    |  7 ++++---
 .../table/planner/plan/utils/RexNodeTestBase.scala  |  6 ++++--
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index a7cbb4a9ffc..4b57796e792 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.YearMonthIntervalType
 import org.apache.flink.util.Preconditions
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlPostfixOperator}
@@ -489,6 +490,9 @@ class RexNodeToExpressionConverter(
         // convert to BigDecimal
         literal.getValueAs(classOf[java.math.BigDecimal])
 
+      case BINARY | VARBINARY =>
+        literal.getValueAs(classOf[Array[Byte]])
+
       case _ => literal.getValue
     }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
index ec8214f5b91..9cd44c9fea7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
@@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
         "$2",
         "$3",
         "$4",
+        "$5",
         "*($t2, $t3)",
         "100",
-        "<($t5, $t6)",
+        "<($t6, $t7)",
         "6",
-        ">($t1, $t8)",
-        "AND($t7, $t9)")))
+        ">($t1, $t9)",
+        "AND($t8, $t10)")))
 
     val nestedField = NestedProjectionUtil.build(exprs, rexProgram.getInputRowType)
     val paths = NestedProjectionUtil.convertToIndexArray(nestedField)
@@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
       Array(1),
       Array(2),
      Array(3),
-      Array(4)
+      Array(4),
+      Array(5)
     )
     assertArray(paths, orderedPaths)
     val builder = new FlinkRexBuilder(typeFactory)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 2eb87e35cc8..bd5f15d3bed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction
 import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.CatalogManagerMocks
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
@@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase {
     assertEquals(0, unconvertedRexNodes.length)
   }
 
+  @Test
+  def testExtractConditionWithBinaryLiteral(): Unit = {
+    // blob
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(5), 5)
+
+    // X'616263'
+    val t1 = rexBuilder.makeBinaryLiteral(ByteString.of("616263", 16))
+
+    // blob = X'616263'
+    val a = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t0, t1)
+
+    val relBuilder: RexBuilder = new FlinkRexBuilder(typeFactory)
+    val (convertedExpressions, unconvertedRexNodes) =
+      extractConjunctiveConditions(a, -1, allFieldNames, relBuilder, functionCatalog)
+
+    val expected: Array[Expression] = Array($"blob" === Array[Byte](97, 98, 99))
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
   // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d)
   @Test
   def testExtractCnfCondition(): Unit = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
index 0cea5c8be69..57a0edda39e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
@@ -39,12 +39,13 @@ class RexNodeRewriterTest extends RexNodeTestBase {
         "$2",
         "$3",
         "$4",
+        "$5",
         "*($t2, $t3)",
         "100",
-        "<($t5, $t6)",
+        "<($t6, $t7)",
         "6",
-        ">($t1, $t8)",
-        "AND($t7, $t9)")))
+        ">($t1, $t9)",
+        "AND($t8, $t10)")))
 
     // use amount, id, price fields to create a new RexProgram
     val usedFields = Array(2, 3, 1)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
index ec326c2a540..1ba968b3253 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
@@ -39,14 +39,16 @@ abstract class RexNodeTestBase {
   val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(
     Thread.currentThread().getContextClassLoader)
 
-  val allFieldNames: java.util.List[String] = List("name", "id", "amount", "price", "flag").asJava
+  val allFieldNames: java.util.List[String] =
+    List("name", "id", "amount", "price", "flag", "blob").asJava
 
   val allFieldTypes: java.util.List[RelDataType] = List(
     DataTypes.VARCHAR(100),
     DataTypes.BIGINT(),
     DataTypes.INT(),
     DataTypes.DOUBLE(),
-    DataTypes.BOOLEAN())
+    DataTypes.BOOLEAN(),
+    DataTypes.BYTES())
     .map(LogicalTypeDataTypeConverter.fromDataTypeToLogicalType)
     .map(typeFactory.createFieldTypeFromLogicalType)
     .asJava
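
For context only (not part of the patch above): a minimal, illustrative Scala sketch of the kind of predicate this change is about, an equality filter on a binary column against the hex literal X'616263' (the bytes 97, 98, 99, i.e. "abc"), mirroring the new testExtractConditionWithBinaryLiteral case. The query, table, and column names are made up, and whether a particular query actually reaches RexNodeExtractor#extractConjunctiveConditions depends on the planner and on filter push-down into the source.

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object BinaryLiteralFilterSketch {
      def main(args: Array[String]): Unit = {
        // Standalone batch TableEnvironment; assumes flink-table-planner is on the classpath.
        val tEnv = TableEnvironment.create(
          EnvironmentSettings.newInstance().inBatchMode().build())

        // X'616263' is the hexadecimal binary literal for the bytes of "abc".
        // The WHERE clause has the same shape as the new test case: blob = X'616263'.
        tEnv
          .executeSql(
            "SELECT id FROM (VALUES (1, X'616263'), (2, X'646566')) AS t(id, blob) " +
              "WHERE blob = X'616263'")
          .print()
      }
    }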