This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 149f4fd  [FLINK-24803][table-planner] Fix cast BINARY/VARBINARY to 
STRING
149f4fd is described below

commit 149f4fd3009641ee081ea5c6c05ddc281e84ba2e
Author: Marios Trivyzas <mat...@gmail.com>
AuthorDate: Tue Dec 28 12:23:11 2021 +0200

    [FLINK-24803][table-planner] Fix cast BINARY/VARBINARY to STRING
    
    Use a hex string representation when casting any kind of
    `BINARY`, `VARBINARY` or `BYTES` to `CHAR`/`VARCHAR`/`STRING`, e.g.:
    
    ```
    SELECT CAST(CAST(x'68656C6C6F20636F6465' AS BINARY(10)) AS VARCHAR)
    ```
    gives:
    ```
    68656c6c6f20636f6465
    ```
    
    Apply padding or trimming if needed, and also implement the inverse
    cast from a hex string to a
    `BINARY`/`VARBINARY`/`BYTES` type.
    
    With legacy behaviour enabled, the bytes are still interpreted as a
    UTF-8 encoded string, and vice versa for the inverse cast.
    
    This closes #18221.
---
 .../apache/flink/table/utils/EncodingUtils.java    |  56 +++++++++
 .../functions/casting/ArrayToStringCastRule.java   |   2 +-
 .../functions/casting/BinaryToStringCastRule.java  |  77 ++++++++++++-
 .../planner/functions/casting/CastRuleUtils.java   |  13 ++-
 .../casting/MapAndMultisetToStringCastRule.java    |   2 +-
 .../functions/casting/StringToBinaryCastRule.java  |  29 +++--
 .../planner/functions/CastFunctionITCase.java      |  32 +++---
 .../planner/functions/CastFunctionMiscITCase.java  |   4 +-
 .../planner/functions/casting/CastRulesTest.java   | 125 ++++++++++++---------
 .../planner/expressions/ScalarFunctionsTest.scala  |  19 +---
 10 files changed, 252 insertions(+), 107 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index c114062..d47779e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -182,6 +182,62 @@ public abstract class EncodingUtils {
         return new String(hexChars);
     }
 
+    /**
+     * Converts a string of hexadecimal digits into an array of bytes of those same values. The
+     * returned array will be half the length of the passed string, as it takes two characters to
+     * represent any given byte. An exception is thrown if the passed string has an odd number of
+     * characters.
+     *
+     * <p>Copied from
+     * https://github.com/apache/commons-codec/blob/master/src/main/java/org/apache/commons/codec/binary/Hex.java.
+     *
+     * @param str A string containing hexadecimal digits
+     * @return A byte array containing the binary data decoded from the supplied string.
+     * @throws TableException Thrown if an odd number of characters or illegal characters are
+     *     supplied
+     */
+    public static byte[] decodeHex(final String str) throws TableException {
+        final int len = str.length();
+
+        if ((len & 0x01) != 0) {
+            throw new TableException("Odd number of characters.");
+        }
+
+        final int outLen = len >> 1;
+        final byte[] out = new byte[outLen];
+
+        // two characters form the hex value.
+        for (int i = 0, j = 0; j < len; i++) {
+            int f = toDigit(str.charAt(j), j) << 4;
+            j++;
+            f = f | toDigit(str.charAt(j), j);
+            j++;
+            out[i] = (byte) (f & 0xFF);
+        }
+
+        return out;
+    }
+
+    /**
+     * Converts a hexadecimal character to an integer.
+     *
+     * <p>Copied from
+     * https://github.com/apache/commons-codec/blob/master/src/main/java/org/apache/commons/codec/binary/Hex.java.
+     *
+     * @param ch A character to convert to an integer digit
+     * @param idx The index of the character in the source, used only in the error message
+     * @return The value of the character as a digit in radix 16
+     * @throws TableException Thrown if ch is an illegal hex character
+     */
+    private static int toDigit(final char ch, final int idx) throws 
TableException {
+        final int digit = Character.digit(ch, 16);
+        if (digit == -1) {
+            throw new TableException(
+                    "Illegal hexadecimal character: [" + ch + "] at index: [" 
+ idx + "]");
+        }
+        return digit;
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Java String Repetition
     //
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
index 7df8b76..33d7209 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
@@ -148,7 +148,7 @@ class ArrayToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<Array
                                         // Break if the target length is 
already exceeded
                                         loopBodyWriter.ifStmt(
                                                 
stringExceedsLength(builderTerm, length),
-                                                thenBodyWriter -> 
thenBodyWriter.stmt("break"));
+                                                
CastRuleUtils.CodeWriter::breakStmt);
                                     }
                                     loopBodyWriter
                                             // Write the comma
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
index 126e3c0..566584c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
@@ -20,17 +20,21 @@ package org.apache.flink.table.planner.functions.casting;
 
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.utils.EncodingUtils;
 
 import java.nio.charset.StandardCharsets;
 
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
 import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
 import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
-import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
+import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
 
 /**
  * {@link LogicalTypeFamily#BINARY_STRING} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule.
  */
-class BinaryToStringCastRule extends AbstractCharacterFamilyTargetRule<byte[]> 
{
+class BinaryToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<byte[], String> {
 
     static final BinaryToStringCastRule INSTANCE = new 
BinaryToStringCastRule();
 
@@ -38,17 +42,78 @@ class BinaryToStringCastRule extends 
AbstractCharacterFamilyTargetRule<byte[]> {
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeFamily.BINARY_STRING)
-                        .target(STRING_TYPE)
+                        .target(LogicalTypeFamily.CHARACTER_STRING)
                         .build());
     }
 
+    /* Example generated code
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        java.lang.String hexString$0;
+        hexString$0 = org.apache.flink.table.utils.EncodingUtils.hex(_myInput);
+        java.lang.String resultString$152;
+        resultString$152 = hexString$0.toString();
+        if (hexString$0.length() > 3) {
+            resultString$152 = hexString$0.substring(0, 
java.lang.Math.min(hexString$0.length(), 3));
+        } else {
+            if (resultString$152.length() < 12) {
+                int padLength$3;
+                padLength$3 = 12 - resultString$152.length();
+                java.lang.StringBuilder sbPadding$4;
+                sbPadding$4 = new java.lang.StringBuilder();
+                for (int i$5 = 0; i$5 < padLength$3; i$5++) {
+                    sbPadding$4.append(" ");
+                }
+                resultString$152 = resultString$152 + sbPadding$4.toString();
+            }
+        }
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$152);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+     */
+
     @Override
-    public String generateStringExpression(
+    protected String generateCodeBlockInternal(
             CodeGeneratorCastRule.Context context,
             String inputTerm,
+            String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        return constructorCall(
-                String.class, inputTerm, 
accessStaticField(StandardCharsets.class, "UTF_8"));
+        final String resultStringTerm = newName("resultString");
+        CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();
+        if (context.legacyBehaviour()) {
+            writer.declStmt(String.class, resultStringTerm)
+                    .assignStmt(
+                            resultStringTerm,
+                            constructorCall(
+                                    String.class,
+                                    inputTerm,
+                                    accessStaticField(StandardCharsets.class, 
"UTF_8")));
+        } else {
+            final int length = LogicalTypeChecks.getLength(targetLogicalType);
+
+            final String hexStringTerm = newName("hexString");
+            writer.declStmt(String.class, hexStringTerm)
+                    .assignStmt(hexStringTerm, staticCall(EncodingUtils.class, 
"hex", inputTerm));
+            writer =
+                    CharVarCharTrimPadCastRule.padAndTrimStringIfNeeded(
+                            writer,
+                            targetLogicalType,
+                            context.legacyBehaviour(),
+                            length,
+                            resultStringTerm,
+                            hexStringTerm);
+        }
+        return writer
+                // Assign the result value
+                .assignStmt(
+                        returnVariable,
+                        CastRuleUtils.staticCall(
+                                BINARY_STRING_DATA_FROM_STRING(), 
resultStringTerm))
+                .toString();
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleUtils.java
index 8bbba2b..df2e08e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleUtils.java
@@ -78,8 +78,12 @@ final class CastRuleUtils {
         return className(clazz) + "." + fieldName;
     }
 
-    static String arrayLength(String instanceTerm) {
-        return instanceTerm + ".length";
+    static String arrayLength(String arrayTerm) {
+        return arrayTerm + ".length";
+    }
+
+    static String arrayElement(String arrayTerm, String indexTerm) {
+        return arrayTerm + "[" + indexTerm + "]";
     }
 
     static String ternaryOperator(String condition, String ifTrue, String 
ifFalse) {
@@ -233,6 +237,11 @@ final class CastRuleUtils {
             return this;
         }
 
+        public CodeWriter breakStmt() {
+            builder.append("break;\n");
+            return this;
+        }
+
         public CodeWriter ifStmt(String condition, Consumer<CodeWriter> 
bodyWriterConsumer) {
             final CodeWriter innerWriter = new CodeWriter();
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapAndMultisetToStringCastRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapAndMultisetToStringCastRule.java
index 23c015d..24ba056 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapAndMultisetToStringCastRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapAndMultisetToStringCastRule.java
@@ -219,7 +219,7 @@ class MapAndMultisetToStringCastRule
                                                 // exceeded
                                                 .ifStmt(
                                                 
stringExceedsLength(builderTerm, length),
-                                                thenBodyWriter -> 
thenBodyWriter.stmt("break"));
+                                                
CastRuleUtils.CodeWriter::breakStmt);
                                     }
                                     loopBodyWriter
                                             // Write the comma
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
index a0c7bb0..ad95a41 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
@@ -22,13 +22,14 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.utils.EncodingUtils;
 
 import static org.apache.flink.table.codesplit.CodeSplitUtil.newName;
 import static 
org.apache.flink.table.planner.functions.casting.BinaryToBinaryCastRule.couldPad;
-import static 
org.apache.flink.table.planner.functions.casting.BinaryToBinaryCastRule.couldTrim;
 import static 
org.apache.flink.table.planner.functions.casting.BinaryToBinaryCastRule.trimOrPadByteArray;
 import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.arrayLength;
 import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
 
 /**
  * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link 
LogicalTypeFamily#BINARY_STRING} cast rule.
@@ -45,7 +46,7 @@ class StringToBinaryCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<Stri
                         .build());
     }
 
-    /* Example generated code for BINARY(2):
+    /* Example generated code for VARBINARY(2):
 
     // legacy behavior
     isNull$0 = _myInputIsNull;
@@ -59,9 +60,12 @@ class StringToBinaryCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<Stri
     // new behavior
     isNull$0 = _myInputIsNull;
     if (!isNull$0) {
-        byte[] byteArrayTerm$0 = _myInput.toBytes();
+        java.lang.String hexStringTerm$10 = _myInput.toString();
+        byte[] byteArrayTerm$0 = 
org.apache.flink.table.utils.EncodingUtils.decodeHex(hexStringTerm$10);
         if (byteArrayTerm$0.length <= 2) {
-            result$1 = byteArrayTerm$0;
+            // If could pad
+            result$1 = java.util.Arrays.copyOf(byteArrayTerm$0, 2);
+            // result$1 = byteArrayTerm$0 // If could not pad
         } else {
             result$1 = java.util.Arrays.copyOf(byteArrayTerm$0, 2);
         }
@@ -79,18 +83,21 @@ class StringToBinaryCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<Stri
             String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        final int targetLength = 
LogicalTypeChecks.getLength(targetLogicalType);
-
-        final String byteArrayTerm = newName("byteArrayTerm");
-
-        if (context.legacyBehaviour()
-                || !(couldTrim(targetLength) || couldPad(targetLogicalType, 
targetLength))) {
+        if (context.legacyBehaviour()) {
             return new CastRuleUtils.CodeWriter()
                     .assignStmt(returnVariable, methodCall(inputTerm, 
"toBytes"))
                     .toString();
         } else {
+            final int targetLength = 
LogicalTypeChecks.getLength(targetLogicalType);
+            final String byteArrayTerm = newName("byteArrayTerm");
+            final String hexStringTerm = newName("hexStringTerm");
+
             return new CastRuleUtils.CodeWriter()
-                    .declStmt(byte[].class, byteArrayTerm, 
methodCall(inputTerm, "toBytes"))
+                    .declStmt(String.class, hexStringTerm, 
methodCall(inputTerm, "toString"))
+                    .declStmt(
+                            byte[].class,
+                            byteArrayTerm,
+                            staticCall(EncodingUtils.class, "decodeHex", 
hexStringTerm))
                     .ifStmt(
                             arrayLength(byteArrayTerm) + " <= " + targetLength,
                             thenWriter -> {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
index 42c042d..a0449a8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
@@ -157,11 +157,11 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                         .fromCase(STRING(), "Apache Flink", "Apache Flink")
                         .fromCase(STRING(), null, null)
                         .fromCase(BOOLEAN(), true, "TRUE")
-                        .fromCase(BINARY(2), DEFAULT_BINARY, "\u0000\u0001")
-                        .fromCase(BINARY(3), DEFAULT_BINARY, 
"\u0000\u0001\u0000")
-                        .fromCase(VARBINARY(3), DEFAULT_VARBINARY, 
"\u0000\u0001\u0002")
-                        .fromCase(VARBINARY(5), DEFAULT_VARBINARY, 
"\u0000\u0001\u0002")
-                        .fromCase(BYTES(), DEFAULT_BYTES, 
"\u0000\u0001\u0002\u0003\u0004")
+                        .fromCase(BINARY(2), DEFAULT_BINARY, "0001")
+                        .fromCase(BINARY(3), DEFAULT_BINARY, "000100")
+                        .fromCase(VARBINARY(3), DEFAULT_VARBINARY, "000102")
+                        .fromCase(VARBINARY(5), DEFAULT_VARBINARY, "000102")
+                        .fromCase(BYTES(), DEFAULT_BYTES, "0001020304")
                         .fromCase(DECIMAL(4, 3), 9.87, "9.870")
                         .fromCase(DECIMAL(10, 5), 1, "1.00000")
                         .fromCase(
@@ -298,11 +298,11 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                         .build(),
                 CastTestSpecBuilder.testCastTo(BINARY(2))
                         .fromCase(BINARY(5), null, null)
-                        .fromCase(CHAR(3), "foo", new byte[] {102, 111})
-                        .fromCase(VARCHAR(5), "Flink", new byte[] {70, 108})
-                        .fromCase(STRING(), "Apache", new byte[] {65, 112})
-                        .fromCase(VARCHAR(5), "f", new byte[] {102, 0})
-                        .fromCase(STRING(), "f", new byte[] {102, 0})
+                        .fromCase(CHAR(4), "666F", new byte[] {102, 111})
+                        .fromCase(VARCHAR(8), "666f", new byte[] {102, 111})
+                        .fromCase(STRING(), "AAbbcCdD", new byte[] {-86, -69})
+                        .fromCase(VARCHAR(4), "FC", new byte[] {-4, 0})
+                        .fromCase(STRING(), "df", new byte[] {-33, 0})
                         // Not supported - no fix
                         .fail(BOOLEAN(), true)
                         //
@@ -336,9 +336,9 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                         .build(),
                 CastTestSpecBuilder.testCastTo(VARBINARY(4))
                         .fromCase(VARBINARY(5), null, null)
-                        .fromCase(CHAR(3), "foo", new byte[] {102, 111, 111})
-                        .fromCase(VARCHAR(5), "Flink", new byte[] {70, 108, 
105, 110})
-                        .fromCase(STRING(), "Apache", new byte[] {65, 112, 97, 
99})
+                        .fromCase(CHAR(4), "666F", new byte[] {102, 111})
+                        .fromCase(VARCHAR(8), "666f", new byte[] {102, 111})
+                        .fromCase(STRING(), "AAbbCcDdEe", new byte[] {-86, 
-69, -52, -35})
                         // Not supported - no fix
                         .fail(BOOLEAN(), true)
                         //
@@ -370,9 +370,9 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                         .build(),
                 CastTestSpecBuilder.testCastTo(BYTES())
                         .fromCase(BYTES(), null, null)
-                        .fromCase(CHAR(3), "foo", new byte[] {102, 111, 111})
-                        .fromCase(VARCHAR(5), "Flink", new byte[] {70, 108, 
105, 110, 107})
-                        .fromCase(STRING(), "Apache", new byte[] {65, 112, 97, 
99, 104, 101})
+                        .fromCase(CHAR(4), "666f", new byte[] {102, 111})
+                        .fromCase(VARCHAR(8), "666F", new byte[] {102, 111})
+                        .fromCase(STRING(), "aaBBCcDdEe", new byte[] {-86, 
-69, -52, -35, -18})
                         // Not supported - no fix
                         .fail(BOOLEAN(), true)
                         //
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
index 3ca2021..54050ad 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
@@ -198,14 +198,14 @@ public class CastFunctionMiscITCase extends 
BuiltInFunctionTestBase {
                         .onFieldsWithData("foo")
                         .testSqlResult(
                                 "CAST(CAST(x'68656C6C6F20636F6465' AS 
BINARY(10)) AS VARCHAR)",
-                                "hello code",
+                                "68656c6c6f20636f6465",
                                 STRING().notNull()),
                 TestSpec.forFunction(
                                 BuiltInFunctionDefinitions.CAST, "test the 
x'....' binary syntax")
                         .onFieldsWithData("foo")
                         .testSqlResult(
                                 "CAST(CAST(x'68656C6C6F2063617374' AS 
BINARY(10)) AS VARCHAR)",
-                                "hello cast",
+                                "68656c6c6f2063617374",
                                 STRING().notNull()));
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 48d2b75..e82436a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -490,19 +490,23 @@ class CastRulesTest {
                         .fromCase(BOOLEAN(), false, fromString("FALSE"))
                         .fromCaseLegacy(BOOLEAN(), true, fromString("true"))
                         .fromCaseLegacy(BOOLEAN(), false, fromString("false"))
-                        .fromCase(BINARY(2), new byte[] {0, 1}, 
fromString("\u0000\u0001"))
-                        .fromCase(
+                        .fromCase(BINARY(2), new byte[] {0, 1}, 
fromString("0001"))
+                        .fromCaseLegacy(BINARY(2), new byte[] {0, 1}, 
fromString("\u0000\u0001"))
+                        .fromCase(VARBINARY(3), new byte[] {0, 1, 2}, 
fromString("000102"))
+                        .fromCaseLegacy(
                                 VARBINARY(3),
                                 new byte[] {0, 1, 2},
                                 fromString("\u0000\u0001\u0002"))
-                        .fromCase(
-                                VARBINARY(5),
-                                new byte[] {0, 1, 2},
-                                fromString("\u0000\u0001\u0002"))
+                        .fromCase(VARBINARY(5), new byte[] {0, -1, -2}, 
fromString("00fffe"))
+                        .fromCaseLegacy(VARBINARY(5), new byte[] {102, 111, 
111}, fromString("foo"))
                         .fromCase(
                                 BYTES(),
-                                new byte[] {0, 1, 2, 3, 4},
-                                fromString("\u0000\u0001\u0002\u0003\u0004"))
+                                new byte[] {-123, 43, -4, 125, 5},
+                                fromString("852bfc7d05"))
+                        .fromCaseLegacy(
+                                BYTES(), new byte[] {70, 108, 105, 110, 107}, 
fromString("Flink"))
+                        .fromCase(BOOLEAN(), true, 
StringData.fromString("TRUE"))
+                        .fromCase(BOOLEAN(), false, 
StringData.fromString("FALSE"))
                         .fromCase(
                                 DECIMAL(4, 3),
                                 fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -660,28 +664,12 @@ class CastRulesTest {
                         .fromCaseLegacy(BOOLEAN(), true, fromString("true"))
                         .fromCase(BOOLEAN(), false, fromString("FALSE "))
                         .fromCaseLegacy(BOOLEAN(), false, fromString("false"))
-                        .fromCase(
-                                BINARY(3),
-                                new byte[] {0, 1, 2},
-                                fromString("\u0000\u0001\u0002   "))
-                        .fromCaseLegacy(
-                                BINARY(3), new byte[] {0, 1, 2}, 
fromString("\u0000\u0001\u0002"))
-                        .fromCase(
-                                VARBINARY(4),
-                                new byte[] {0, 1, 2, 3},
-                                fromString("\u0000\u0001\u0002\u0003  "))
-                        .fromCaseLegacy(
-                                VARBINARY(4),
-                                new byte[] {0, 1, 2, 3},
-                                fromString("\u0000\u0001\u0002\u0003"))
-                        .fromCase(
-                                BYTES(),
-                                new byte[] {0, 1, 2, 3, 4},
-                                fromString("\u0000\u0001\u0002\u0003\u0004 "))
-                        .fromCaseLegacy(
-                                BYTES(),
-                                new byte[] {0, 1, 2, 3, 4},
-                                fromString("\u0000\u0001\u0002\u0003\u0004"))
+                        .fromCase(BINARY(1), new byte[] {-12}, fromString("f4  
  "))
+                        .fromCaseLegacy(BINARY(1), new byte[] {102}, 
fromString("f"))
+                        .fromCase(VARBINARY(1), new byte[] {23}, 
fromString("17    "))
+                        .fromCaseLegacy(VARBINARY(1), new byte[] {33}, 
fromString("\u0021"))
+                        .fromCase(BYTES(), new byte[] {32}, fromString("20    
"))
+                        .fromCaseLegacy(BYTES(), new byte[] {32}, fromString(" 
"))
                         .fromCase(TINYINT(), (byte) -125, fromString("-125  "))
                         .fromCaseLegacy(TINYINT(), (byte) -125, 
fromString("-125"))
                         .fromCase(SMALLINT(), (short) 32767, fromString("32767 
"))
@@ -700,6 +688,22 @@ class CastRulesTest {
                         .fromCaseLegacy(INTERVAL(MONTH()), 5, 
fromString("+0-05")),
                 CastTestSpecBuilder.testCastTo(CHAR(12))
                         .fromCase(
+                                BINARY(4),
+                                new byte[] {-12, 32, 46, -72},
+                                fromString("f4202eb8    "))
+                        .fromCaseLegacy(
+                                BINARY(4),
+                                new byte[] {1, 11, 111, 2},
+                                fromString("\u0001\u000B\u006F\u0002"))
+                        .fromCase(VARBINARY(4), new byte[] {1, 11, 22}, 
fromString("010b16      "))
+                        .fromCaseLegacy(
+                                VARBINARY(4),
+                                new byte[] {1, 11, 22},
+                                fromString("\u0001\u000B\u0016"))
+                        .fromCase(BYTES(), new byte[] {1, 11}, 
fromString("010b        "))
+                        .fromCaseLegacy(
+                                BYTES(), new byte[] {1, 11, 111}, 
fromString("\u0001\u000B\u006F"))
+                        .fromCase(
                                 ARRAY(INT()),
                                 new GenericArrayData(new int[] {-1, 2, 3}),
                                 fromString("[-1, 2, 3]  "))
@@ -785,26 +789,17 @@ class CastRulesTest {
                         .fromCaseLegacy(BOOLEAN(), true, fromString("true"))
                         .fromCase(BOOLEAN(), false, fromString("FAL"))
                         .fromCaseLegacy(BOOLEAN(), false, fromString("false"))
-                        .fromCase(
-                                BINARY(5),
-                                new byte[] {0, 1, 2, 3, 4},
-                                fromString("\u0000\u0001\u0002"))
+                        .fromCase(BINARY(5), new byte[] {0, 1, 2, 3, 4}, 
fromString("000"))
                         .fromCaseLegacy(
                                 BINARY(5),
                                 new byte[] {0, 1, 2, 3, 4},
                                 fromString("\u0000\u0001\u0002\u0003\u0004"))
-                        .fromCase(
-                                VARBINARY(5),
-                                new byte[] {0, 1, 2, 3, 4},
-                                fromString("\u0000\u0001\u0002"))
+                        .fromCase(VARBINARY(5), new byte[] {0, 1, 2, 3, 4}, 
fromString("000"))
                         .fromCaseLegacy(
                                 VARBINARY(5),
                                 new byte[] {0, 1, 2, 3, 4},
                                 fromString("\u0000\u0001\u0002\u0003\u0004"))
-                        .fromCase(
-                                BYTES(),
-                                new byte[] {0, 1, 2, 3, 4},
-                                fromString("\u0000\u0001\u0002"))
+                        .fromCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, 
fromString("000"))
                         .fromCaseLegacy(
                                 BYTES(),
                                 new byte[] {0, 1, 2, 3, 4},
@@ -958,24 +953,30 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), 0.0d, false)
                         .fromCase(DOUBLE(), -0.12345678d, true),
                 CastTestSpecBuilder.testCastTo(BINARY(4))
-                        .fromCase(CHAR(3), fromString("foo"), new byte[] {102, 111, 111, 0})
+                        .fromCase(CHAR(4), fromString("66"), new byte[] {102, 0, 0, 0})
                         .fromCaseLegacy(CHAR(3), fromString("foo"), new byte[] {102, 111, 111})
-                        .fromCase(CHAR(1), fromString("f"), new byte[] {102, 0, 0, 0})
+                        .fromCase(CHAR(10), fromString("66A2"), new byte[] {102, -94, 0, 0})
                         .fromCaseLegacy(CHAR(1), fromString("f"), new byte[] {102})
-                        .fromCase(CHAR(3), fromString("f"), new byte[] {102, 0, 0, 0})
+                        .fromCase(CHAR(16), fromString("12f4aBc7"), new byte[] {18, -12, -85, -57})
                         .fromCaseLegacy(CHAR(3), fromString("f"), new byte[] {102})
-                        .fromCase(VARCHAR(5), fromString("Flink"), new byte[] {70, 108, 105, 110})
+                        .fromCase(VARCHAR(8), fromString("bACd"), new byte[] {-70, -51, 0, 0})
                         .fromCaseLegacy(
                                 VARCHAR(5),
                                 fromString("Flink"),
                                 new byte[] {70, 108, 105, 110, 107})
-                        .fromCase(STRING(), fromString("Apache"), new byte[] {65, 112, 97, 99})
+                        .fromCase(
+                                STRING(),
+                                fromString("12f4ABc71232"),
+                                new byte[] {18, -12, -85, -57})
                         .fromCaseLegacy(
                                 STRING(),
                                 fromString("Apache"),
                                 new byte[] {65, 112, 97, 99, 104, 101})
-                        .fromCase(STRING(), fromString("bar"), new byte[] {98, 97, 114, 0})
+                        .fromCase(STRING(), fromString("12F4ab"), new byte[] {18, -12, -85, 0})
                         .fromCaseLegacy(STRING(), fromString("bar"), new byte[] {98, 97, 114})
+                        .fail(STRING(), fromString("123"), TableException.class)
+                        .fail(STRING(), fromString("12P9"), TableException.class)
+                        .fail(STRING(), fromString("12  A9"), TableException.class)
                         .fromCase(BINARY(2), new byte[] {1, 2}, new byte[] {1, 2, 0, 0})
                         .fromCaseLegacy(BINARY(2), new byte[] {1, 2}, new byte[] {1, 2})
                         .fromCase(VARBINARY(3), new byte[] {1, 2, 3}, new byte[] {1, 2, 3, 0})
@@ -983,17 +984,24 @@ class CastRulesTest {
                         .fromCase(BYTES(), new byte[] {1, 2, 3}, new byte[] {1, 2, 3, 0})
                         .fromCaseLegacy(BYTES(), new byte[] {1, 2, 3}, new byte[] {1, 2, 3}),
                 CastTestSpecBuilder.testCastTo(VARBINARY(4))
-                        .fromCase(CHAR(3), fromString("foo"), new byte[] {102, 111, 111})
+                        .fromCase(CHAR(4), fromString("c9"), new byte[] {-55})
+                        .fromCaseLegacy(CHAR(3), fromString("foo"), new byte[] {102, 111, 111})
+                        .fromCase(VARCHAR(8), fromString("7de2"), new byte[] {125, -30})
                         .fromCaseLegacy(
                                 VARCHAR(5),
                                 fromString("Flink"),
                                 new byte[] {70, 108, 105, 110, 107})
-                        .fromCase(VARCHAR(5), fromString("Flink"), new byte[] {70, 108, 105, 110})
-                        .fromCase(STRING(), fromString("Apache"), new byte[] {65, 112, 97, 99})
+                        .fromCase(
+                                STRING(),
+                                fromString("12F4abC71232"),
+                                new byte[] {18, -12, -85, -57})
                         .fromCaseLegacy(
                                 STRING(),
                                 fromString("Apache"),
                                 new byte[] {65, 112, 97, 99, 104, 101})
+                        .fail(STRING(), fromString("123"), TableException.class)
+                        .fail(STRING(), fromString("12P9"), TableException.class)
+                        .fail(STRING(), fromString("12  A9"), TableException.class)
                         // We assume that the input length is respected, therefore, no trimming is
                         // applied
                         .fromCase(BINARY(2), new byte[] {1, 2, 3, 4, 5}, new byte[] {1, 2, 3, 4, 5})
@@ -1008,15 +1016,24 @@ class CastRulesTest {
                                 new byte[] {1, 2, 3, 4, 5},
                                 new byte[] {1, 2, 3, 4, 5}),
                 CastTestSpecBuilder.testCastTo(BYTES())
-                        .fromCase(CHAR(3), fromString("foo"), new byte[] {102, 111, 111})
-                        .fromCase(
+                        .fromCase(CHAR(4), fromString("9C"), new byte[] {-100})
+                        .fromCaseLegacy(CHAR(3), fromString("foo"), new byte[] {102, 111, 111})
+                        .fromCase(VARCHAR(8), fromString("3ee3"), new byte[] {62, -29})
+                        .fromCaseLegacy(
                                 VARCHAR(5),
                                 fromString("Flink"),
                                 new byte[] {70, 108, 105, 110, 107})
                         .fromCase(
                                 STRING(),
+                                fromString("AAbbCcDdff"),
+                                new byte[] {-86, -69, -52, -35, -1})
+                        .fromCaseLegacy(
+                                STRING(),
                                 fromString("Apache"),
-                                new byte[] {65, 112, 97, 99, 104, 101}),
+                                new byte[] {65, 112, 97, 99, 104, 101})
+                        .fail(STRING(), fromString("123"), TableException.class)
+                        .fail(STRING(), fromString("12P9"), TableException.class)
+                        .fail(STRING(), fromString("12  A9"), TableException.class),
                 CastTestSpecBuilder.testCastTo(DECIMAL(5, 3))
                         .fail(CHAR(3), fromString("foo"), TableException.class)
                         .fail(VARCHAR(5), fromString("Flink"), TableException.class)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 10f57d3..e09918b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -3862,18 +3862,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "SHA2('test', f44)",
       expectedSha256)
 
-    // bytes test
-    testSqlApi("MD5(cast('test' as varbinary))", expectedMd5)
-    testSqlApi("SHA1(cast('test' as varbinary))", expectedSha1)
-    testSqlApi("SHA224(cast('test' as varbinary))", expectedSha224)
-    testSqlApi("SHA2(cast('test' as varbinary), 224)", expectedSha224)
-    testSqlApi("SHA256(cast('test' as varbinary))", expectedSha256)
-    testSqlApi("SHA2(cast('test' as varbinary), 256)", expectedSha256)
-    testSqlApi("SHA384(cast('test' as varbinary))", expectedSha384)
-    testSqlApi("SHA2(cast('test' as varbinary), 384)", expectedSha384)
-    testSqlApi("SHA512(cast('test' as varbinary))", expectedSha512)
-    testSqlApi("SHA2(cast('test' as varbinary), 512)", expectedSha512)
-
+    // null test
     testSqlApi("MD5(cast(null as varbinary))", "NULL")
     testSqlApi("SHA1(cast(null as varbinary))", "NULL")
     testSqlApi("SHA224(cast(null as varbinary))", "NULL")
@@ -4157,7 +4146,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     // the answer BINARY will cast to STRING in ExpressionTestBase.scala
     testSqlApi(
       "IF(f7 < 5, f53, f54)",
-      "hello world") // hello world
+      "68656c6c6f20776f726c64") // hello world
 
     // test DATE, DATE
     testSqlApi(
@@ -4363,6 +4352,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "2021-04-06 11:05:30")
     testSqlApi(s"IFNULL(CAST(INTERVAL '2' YEAR AS VARCHAR(20)), $str2)", "+2-00")
     testSqlApi(s"IFNULL(CAST(INTERVAL '2' DAY AS VARCHAR(20)), $str2)", "+2 00:00:00.000")
-    testSqlApi(s"IFNULL(CAST(f53 AS VARCHAR(100)), $str2)", "hello world")
+    testSqlApi(
+      s"IFNULL(CAST(f53 AS VARCHAR(100)), $str2)",
+      "68656c6c6f20776f726c64")
   }
 }

Reply via email to