This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d37b8c34ff [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal
4d37b8c34ff is described below

commit 4d37b8c34ff062b7505ab8c0ca8f2181768aab60
Author: zoudan <zou...@bytedance.com>
AuthorDate: Thu Oct 19 17:48:05 2023 +0800

    [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal
    
    Close apache/flink#23551
---
 .../table/planner/plan/utils/RexNodeExtractor.scala |  4 ++++
 .../plan/utils/NestedProjectionUtilTest.scala       | 10 ++++++----
 .../planner/plan/utils/RexNodeExtractorTest.scala   | 21 +++++++++++++++++++++
 .../planner/plan/utils/RexNodeRewriterTest.scala    |  7 ++++---
 .../table/planner/plan/utils/RexNodeTestBase.scala  |  6 ++++--
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index 482ce56dc63..481cbda8b82 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -37,6 +37,7 @@ import org.apache.flink.table.types.logical.YearMonthIntervalType
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.util.Preconditions
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
@@ -502,6 +503,9 @@ class RexNodeToExpressionConverter(
         // convert to BigDecimal
         literal.getValueAs(classOf[java.math.BigDecimal])
 
+      case BINARY | VARBINARY =>
+        literal.getValueAs(classOf[Array[Byte]])
+
       case _ =>
         literal.getValue
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
index ec8214f5b91..9cd44c9fea7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
@@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
           "$2",
           "$3",
           "$4",
+          "$5",
           "*($t2, $t3)",
           "100",
-          "<($t5, $t6)",
+          "<($t6, $t7)",
           "6",
-          ">($t1, $t8)",
-          "AND($t7, $t9)")))
+          ">($t1, $t9)",
+          "AND($t8, $t10)")))
 
     val nestedField = NestedProjectionUtil.build(exprs, rexProgram.getInputRowType)
     val paths = NestedProjectionUtil.convertToIndexArray(nestedField)
@@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
       Array(1),
       Array(2),
       Array(3),
-      Array(4)
+      Array(4),
+      Array(5)
     )
     assertArray(paths, orderedPaths)
     val builder = new FlinkRexBuilder(typeFactory)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 2eb87e35cc8..bd5f15d3bed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction
 import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.CatalogManagerMocks
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
@@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase {
     assertEquals(0, unconvertedRexNodes.length)
   }
 
+  @Test
+  def testExtractConditionWithBinaryLiteral(): Unit = {
+    // blob
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(5), 5)
+
+    // X'616263'
+    val t1 = rexBuilder.makeBinaryLiteral(ByteString.of("616263", 16))
+
+    // blob = X'616263'
+    val a = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t0, t1)
+
+    val relBuilder: RexBuilder = new FlinkRexBuilder(typeFactory)
+    val (convertedExpressions, unconvertedRexNodes) =
+      extractConjunctiveConditions(a, -1, allFieldNames, relBuilder, functionCatalog)
+
+    val expected: Array[Expression] = Array($"blob" === Array[Byte](97, 98, 99))
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
   // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d)
   @Test
   def testExtractCnfCondition(): Unit = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
index 0cea5c8be69..57a0edda39e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
@@ -39,12 +39,13 @@ class RexNodeRewriterTest extends RexNodeTestBase {
           "$2",
           "$3",
           "$4",
+          "$5",
           "*($t2, $t3)",
           "100",
-          "<($t5, $t6)",
+          "<($t6, $t7)",
           "6",
-          ">($t1, $t8)",
-          "AND($t7, $t9)")))
+          ">($t1, $t9)",
+          "AND($t8, $t10)")))
 
     // use amount, id, price fields to create a new RexProgram
     val usedFields = Array(2, 3, 1)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
index ec326c2a540..1ba968b3253 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
@@ -39,14 +39,16 @@ abstract class RexNodeTestBase {
   val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(
     Thread.currentThread().getContextClassLoader)
 
-  val allFieldNames: java.util.List[String] = List("name", "id", "amount", "price", "flag").asJava
+  val allFieldNames: java.util.List[String] =
+    List("name", "id", "amount", "price", "flag", "blob").asJava
 
   val allFieldTypes: java.util.List[RelDataType] = List(
     DataTypes.VARCHAR(100),
     DataTypes.BIGINT(),
     DataTypes.INT(),
     DataTypes.DOUBLE(),
-    DataTypes.BOOLEAN())
+    DataTypes.BOOLEAN(),
+    DataTypes.BYTES())
     .map(LogicalTypeDataTypeConverter.fromDataTypeToLogicalType)
     .map(typeFactory.createFieldTypeFromLogicalType)
     .asJava

Reply via email to