This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ee24d6  [FLINK-12834][table-planner-blink] Support CharType and 
BinaryType
1ee24d6 is described below

commit 1ee24d6c626e8d361354721ad3de41d18e33bb70
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Thu Jun 13 13:34:14 2019 +0800

    [FLINK-12834][table-planner-blink] Support CharType and BinaryType
    
    This closes #8730
---
 .../flink/table/expressions/RexNodeConverter.java  |   6 +-
 .../aggfunctions/RankLikeAggFunctionBase.java      |   1 +
 .../table/functions/sql/FlinkSqlOperatorTable.java |  21 +-
 .../flink/table/calcite/FlinkTypeFactory.scala     |  37 +++-
 .../apache/flink/table/codegen/CodeGenUtils.scala  |  22 +-
 .../flink/table/codegen/ExprCodeGenerator.scala    |   5 +-
 .../apache/flink/table/codegen/GenerateUtils.scala |  10 +-
 .../table/codegen/agg/batch/AggCodeGenHelper.scala |   2 +-
 .../table/codegen/calls/FunctionGenerator.scala    | 239 +++------------------
 .../flink/table/codegen/calls/PrintCallGen.scala   |   5 +-
 .../table/codegen/calls/ScalarOperatorGens.scala   |  54 ++---
 ...naryStringCallGen.scala => StringCallGen.scala} | 120 ++++++++++-
 .../table/codegen/sort/SortCodeGenerator.scala     |   8 +-
 .../flink/table/plan/util/AggFunctionFactory.scala |   4 +-
 .../flink/table/plan/util/AggregateUtil.scala      |   2 +-
 .../flink/table/typeutils/TypeCheckUtils.scala     | 100 ---------
 .../flink/table/typeutils/TypeCoercion.scala       |  24 ++-
 .../flink/table/dataformat/BinaryWriter.java       |   2 +
 .../table/dataformat/DataFormatConverters.java     |   2 +
 .../flink/table/dataformat/TypeGetterSetters.java  |   2 +
 .../dataformat/vector/heap/AbstractHeapVector.java |   2 +
 .../table/types/ClassLogicalTypeConverter.java     |   4 +
 .../flink/table/types/InternalSerializers.java     |   2 +
 .../apache/flink/table/types/PlannerTypeUtils.java |  15 +-
 .../table/types/TypeInfoDataTypeConverter.java     |   2 +
 .../flink/table/typeutils/TypeCheckUtils.java      | 162 ++++++++++++++
 26 files changed, 436 insertions(+), 417 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index d7f42e0..6fc1ba8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -61,9 +61,9 @@ import java.util.stream.Collectors;
 import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
 import static org.apache.flink.table.calcite.FlinkTypeFactory.toLogicalType;
 import static 
org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+import static 
org.apache.flink.table.typeutils.TypeCheckUtils.isCharacterString;
 import static org.apache.flink.table.typeutils.TypeCheckUtils.isTemporal;
 import static org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval;
-import static org.apache.flink.table.typeutils.TypeCheckUtils.isVarchar;
 
 /**
  * Visit expression to generator {@link RexNode}.
@@ -129,12 +129,12 @@ public class RexNodeConverter implements 
ExpressionVisitor<RexNode> {
                } else if (BuiltInFunctionDefinitions.IS_NULL.equals(def)) {
                        return relBuilder.isNull(child.get(0));
                } else if (BuiltInFunctionDefinitions.PLUS.equals(def)) {
-                       if (isVarchar(toLogicalType(child.get(0).getType()))) {
+                       if 
(isCharacterString(toLogicalType(child.get(0).getType()))) {
                                return relBuilder.call(
                                                FlinkSqlOperatorTable.CONCAT,
                                                child.get(0),
                                                relBuilder.cast(child.get(1), 
VARCHAR));
-                       } else if 
(isVarchar(toLogicalType(child.get(1).getType()))) {
+                       } else if 
(isCharacterString(toLogicalType(child.get(1).getType()))) {
                                return relBuilder.call(
                                                FlinkSqlOperatorTable.CONCAT,
                                                relBuilder.cast(child.get(0), 
VARCHAR),
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
index 053f75e..20541a3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
@@ -115,6 +115,7 @@ public abstract class RankLikeAggFunctionBase extends 
DeclarativeAggregateFuncti
                                return literal(0.0d);
                        case DECIMAL:
                                return literal(java.math.BigDecimal.ZERO);
+                       case CHAR:
                        case VARCHAR:
                                return literal("");
                        case DATE:
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
index 3a4cf1e..f6c8d6a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
@@ -433,7 +433,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
        public static final SqlFunction MD5 = new SqlFunction(
                "MD5",
                SqlKind.OTHER_FUNCTION,
-               ARG0_VARCHAR_FORCE_NULLABLE,
+               VARCHAR_2000_NULLABLE,
                null,
                OperandTypes.or(
                        OperandTypes.family(SqlTypeFamily.STRING),
@@ -443,7 +443,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
        public static final SqlFunction SHA1 = new SqlFunction(
                "SHA1",
                SqlKind.OTHER_FUNCTION,
-               ARG0_VARCHAR_FORCE_NULLABLE,
+               VARCHAR_2000_NULLABLE,
                null,
                OperandTypes.or(
                        OperandTypes.family(SqlTypeFamily.STRING),
@@ -453,7 +453,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
        public static final SqlFunction SHA224 = new SqlFunction(
                "SHA224",
                SqlKind.OTHER_FUNCTION,
-               ARG0_VARCHAR_FORCE_NULLABLE,
+               VARCHAR_2000_NULLABLE,
                null,
                OperandTypes.or(
                        OperandTypes.family(SqlTypeFamily.STRING),
@@ -463,7 +463,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
        public static final SqlFunction SHA256 = new SqlFunction(
                "SHA256",
                SqlKind.OTHER_FUNCTION,
-               ARG0_VARCHAR_FORCE_NULLABLE,
+               VARCHAR_2000_NULLABLE,
                null,
                OperandTypes.or(
                        OperandTypes.family(SqlTypeFamily.STRING),
@@ -473,7 +473,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
        public static final SqlFunction SHA384 = new SqlFunction(
                "SHA384",
                SqlKind.OTHER_FUNCTION,
-               ARG0_VARCHAR_FORCE_NULLABLE,
+               VARCHAR_2000_NULLABLE,
                null,
                OperandTypes.or(
                        OperandTypes.family(SqlTypeFamily.STRING),
@@ -483,7 +483,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
        public static final SqlFunction SHA512 = new SqlFunction(
                "SHA512",
                SqlKind.OTHER_FUNCTION,
-               ARG0_VARCHAR_FORCE_NULLABLE,
+               VARCHAR_2000_NULLABLE,
                null,
                OperandTypes.or(
                        OperandTypes.family(SqlTypeFamily.STRING),
@@ -493,7 +493,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
        public static final SqlFunction SHA2 = new SqlFunction(
                "SHA2",
                SqlKind.OTHER_FUNCTION,
-               ARG0_VARCHAR_FORCE_NULLABLE,
+               VARCHAR_2000_NULLABLE,
                null,
                OperandTypes.or(
                        OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
@@ -839,11 +839,11 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                                SqlTypeFamily.INTEGER)),
                SqlFunctionCategory.NUMERIC);
 
-
        public static final SqlFunction LTRIM = new SqlFunction(
                "LTRIM",
                SqlKind.OTHER_FUNCTION,
-               ARG0_VARCHAR_FORCE_NULLABLE,
+               ReturnTypes.cascade(ReturnTypes.ARG0, 
SqlTypeTransforms.TO_NULLABLE,
+                       SqlTypeTransforms.TO_VARYING),
                null,
                OperandTypes.or(
                        OperandTypes.family(SqlTypeFamily.STRING),
@@ -853,7 +853,8 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
        public static final SqlFunction RTRIM = new SqlFunction(
                "RTRIM",
                SqlKind.OTHER_FUNCTION,
-               ARG0_VARCHAR_FORCE_NULLABLE,
+               ReturnTypes.cascade(ReturnTypes.ARG0, 
SqlTypeTransforms.TO_NULLABLE,
+                       SqlTypeTransforms.TO_VARYING),
                null,
                OperandTypes.or(
                        OperandTypes.family(SqlTypeFamily.STRING),
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 9fdefe0..4f6f831 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -68,8 +68,8 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends 
JavaTypeFactoryImp
       case LogicalTypeRoot.BIGINT => createSqlType(BIGINT)
       case LogicalTypeRoot.FLOAT => createSqlType(FLOAT)
       case LogicalTypeRoot.DOUBLE => createSqlType(DOUBLE)
-      case LogicalTypeRoot.VARCHAR =>
-        createSqlType(VARCHAR, t.asInstanceOf[VarCharType].getLength)
+      case LogicalTypeRoot.VARCHAR => createSqlType(VARCHAR, 
t.asInstanceOf[VarCharType].getLength)
+      case LogicalTypeRoot.CHAR => createSqlType(CHAR, 
t.asInstanceOf[CharType].getLength)
 
       // temporal types
       case LogicalTypeRoot.DATE => createSqlType(DATE)
@@ -83,6 +83,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends 
JavaTypeFactoryImp
         createSqlIntervalType(
           new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, 
SqlParserPos.ZERO))
 
+      case LogicalTypeRoot.BINARY => createSqlType(BINARY, 
t.asInstanceOf[BinaryType].getLength)
       case LogicalTypeRoot.VARBINARY =>
         createSqlType(VARBINARY, t.asInstanceOf[VarBinaryType].getLength)
 
@@ -379,14 +380,30 @@ object FlinkTypeFactory {
       case BIGINT => new BigIntType()
       case FLOAT => new FloatType()
       case DOUBLE => new DoubleType()
-      case VARCHAR | CHAR =>
-        // TODO we use VarCharType to support sql CHAR, VarCharType don't 
support 0 length
-        new VarCharType(
-          if (relDataType.getPrecision == 0) VarCharType.MAX_LENGTH else 
relDataType.getPrecision)
-      case VARBINARY | BINARY =>
-        // TODO we use VarBinaryType to support sql BINARY, VarBinaryType 
don't support 0 length
-        new VarBinaryType(
-          if (relDataType.getPrecision == 0) VarBinaryType.MAX_LENGTH else 
relDataType.getPrecision)
+      case CHAR =>
+        if (relDataType.getPrecision == 0) {
+          CharType.ofEmptyLiteral
+        } else {
+          new CharType(relDataType.getPrecision)
+        }
+      case VARCHAR =>
+        if (relDataType.getPrecision == 0) {
+          VarCharType.ofEmptyLiteral
+        } else {
+          new VarCharType(relDataType.getPrecision)
+        }
+      case BINARY =>
+        if (relDataType.getPrecision == 0) {
+          BinaryType.ofEmptyLiteral
+        } else {
+          new BinaryType(relDataType.getPrecision)
+        }
+      case VARBINARY =>
+        if (relDataType.getPrecision == 0) {
+          VarBinaryType.ofEmptyLiteral
+        } else {
+          new VarBinaryType(relDataType.getPrecision)
+        }
       case DECIMAL => new DecimalType(relDataType.getPrecision, 
relDataType.getScale)
 
       // time indicators
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index ddac75a..b9ab71c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -146,8 +146,8 @@ object CodeGenUtils {
     case INTERVAL_YEAR_MONTH => className[JInt]
     case INTERVAL_DAY_TIME => className[JLong]
 
-    case VARCHAR => BINARY_STRING
-    case VARBINARY => "byte[]"
+    case VARCHAR | CHAR => BINARY_STRING
+    case VARBINARY | BINARY => "byte[]"
 
     case DECIMAL => className[Decimal]
     case ARRAY => className[BinaryArray]
@@ -178,7 +178,7 @@ object CodeGenUtils {
     case FLOAT => "-1.0f"
     case DOUBLE => "-1.0d"
     case BOOLEAN => "false"
-    case VARCHAR => s"$BINARY_STRING.EMPTY_UTF8"
+    case VARCHAR | CHAR => s"$BINARY_STRING.EMPTY_UTF8"
 
     case DATE | TIME_WITHOUT_TIME_ZONE => "-1"
     case TIMESTAMP_WITHOUT_TIME_ZONE => "-1L"
@@ -205,8 +205,8 @@ object CodeGenUtils {
     case BIGINT => s"${className[JLong]}.hashCode($term)"
     case FLOAT => s"${className[JFloat]}.hashCode($term)"
     case DOUBLE => s"${className[JDouble]}.hashCode($term)"
-    case VARCHAR => s"$term.hashCode()"
-    case VARBINARY => s"${className[MurmurHashUtil]}.hashUnsafeBytes(" +
+    case VARCHAR | CHAR => s"$term.hashCode()"
+    case VARBINARY | BINARY => 
s"${className[MurmurHashUtil]}.hashUnsafeBytes(" +
       s"$term, $BYTE_ARRAY_BASE_OFFSET, $term.length)"
     case DECIMAL => s"$term.hashCode()"
     case DATE => s"${className[JInt]}.hashCode($term)"
@@ -330,8 +330,8 @@ object CodeGenUtils {
       throw new CodeGenException(s"Comparable type expected, but was 
'${genExpr.resultType}'.")
     }
 
-  def requireString(genExpr: GeneratedExpression): Unit =
-    if (!TypeCheckUtils.isVarchar(genExpr.resultType)) {
+  def requireCharacterString(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isCharacterString(genExpr.resultType)) {
       throw new CodeGenException("String expression type expected.")
     }
 
@@ -392,8 +392,8 @@ object CodeGenUtils {
       case BIGINT => s"$rowTerm.getLong($indexTerm)"
       case FLOAT => s"$rowTerm.getFloat($indexTerm)"
       case DOUBLE => s"$rowTerm.getDouble($indexTerm)"
-      case VARCHAR => s"$rowTerm.getString($indexTerm)"
-      case VARBINARY => s"$rowTerm.getBinary($indexTerm)"
+      case VARCHAR | CHAR => s"$rowTerm.getString($indexTerm)"
+      case VARBINARY | BINARY => s"$rowTerm.getBinary($indexTerm)"
       case DECIMAL =>
         val dt = t.asInstanceOf[DecimalType]
         s"$rowTerm.getDecimal($indexTerm, ${dt.getPrecision}, ${dt.getScale})"
@@ -616,8 +616,8 @@ object CodeGenUtils {
       case FLOAT => s"$writerTerm.writeFloat($indexTerm, $fieldValTerm)"
       case DOUBLE => s"$writerTerm.writeDouble($indexTerm, $fieldValTerm)"
       case BOOLEAN => s"$writerTerm.writeBoolean($indexTerm, $fieldValTerm)"
-      case VARBINARY => s"$writerTerm.writeBinary($indexTerm, $fieldValTerm)"
-      case VARCHAR => s"$writerTerm.writeString($indexTerm, $fieldValTerm)"
+      case VARBINARY | BINARY => s"$writerTerm.writeBinary($indexTerm, 
$fieldValTerm)"
+      case VARCHAR | CHAR => s"$writerTerm.writeString($indexTerm, 
$fieldValTerm)"
       case DECIMAL =>
         val dt = t.asInstanceOf[DecimalType]
         s"$writerTerm.writeDecimal($indexTerm, $fieldValTerm, 
${dt.getPrecision})"
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ExprCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ExprCodeGenerator.scala
index 16d0f54..d4ff105 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ExprCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ExprCodeGenerator.scala
@@ -27,13 +27,12 @@ import org.apache.flink.table.calcite.{FlinkTypeFactory, 
RexAggLocalVariable, Re
 import org.apache.flink.table.codegen.CodeGenUtils.{requireTemporal, 
requireTimeInterval, _}
 import org.apache.flink.table.codegen.GenerateUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
-import org.apache.flink.table.codegen.calls.{BinaryStringCallGen, 
FunctionGenerator, ScalarFunctionCallGen, TableFunctionCallGen}
+import org.apache.flink.table.codegen.calls.{StringCallGen, FunctionGenerator, 
ScalarFunctionCallGen, TableFunctionCallGen}
 import org.apache.flink.table.codegen.calls.ScalarOperatorGens._
 import org.apache.flink.table.dataformat._
 import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable._
 import org.apache.flink.table.functions.utils.{ScalarSqlFunction, 
TableSqlFunction}
 import org.apache.flink.table.types.PlannerTypeUtils.isInteroperable
-import org.apache.flink.table.types.logical.LogicalTypeRoot.{VARBINARY, 
VARCHAR}
 import org.apache.flink.table.types.logical._
 import org.apache.flink.table.typeutils.TypeCheckUtils.{isNumeric, isTemporal, 
isTimeInterval}
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
@@ -733,7 +732,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, 
nullableInput: Boolean)
 
       // advanced scalar functions
       case sqlOperator: SqlOperator =>
-        BinaryStringCallGen.generateCallExpression(ctx, operator, operands, 
resultType).getOrElse {
+        StringCallGen.generateCallExpression(ctx, operator, operands, 
resultType).getOrElse {
           FunctionGenerator
             .getCallGenerator(
               sqlOperator,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
index 6a77070d..4b39c05 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.plan.util.SortUtil
 import org.apache.flink.table.types.PlannerTypeUtils
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical._
-import org.apache.flink.table.typeutils.TypeCheckUtils.{isReference, 
isTemporal}
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isCharacterString, 
isReference, isTemporal}
 
 import org.apache.calcite.avatica.util.ByteString
 import org.apache.commons.lang3.StringEscapeUtils
@@ -173,7 +173,7 @@ object GenerateUtils {
 
     // TODO: should we also consider other types?
     val parameters = operands.map(x =>
-      if (x.resultType.isInstanceOf[VarCharType]){
+      if (isCharacterString(x.resultType)){
         "( " + x.nullTerm + " ) ? null : (" + x.resultTerm + ")"
       } else {
         x.resultTerm
@@ -351,12 +351,12 @@ object GenerateUtils {
           literalValue.asInstanceOf[JBigDecimal], precision, scale)
         generateNonNullLiteral(literalType, fieldTerm, value)
 
-      case VARCHAR =>
+      case VARCHAR | CHAR =>
         val escapedValue = 
StringEscapeUtils.ESCAPE_JAVA.translate(literalValue.toString)
         val field = ctx.addReusableStringConstants(escapedValue)
         generateNonNullLiteral(literalType, field, 
BinaryString.fromString(escapedValue))
 
-      case VARBINARY =>
+      case VARBINARY | BINARY =>
         val bytesVal = literalValue.asInstanceOf[ByteString].getBytes
         val fieldTerm = ctx.addReusableObject(
           bytesVal, "binary", bytesVal.getClass.getCanonicalName)
@@ -658,7 +658,7 @@ object GenerateUtils {
       s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? -1 : 0)"
     case _ if PlannerTypeUtils.isPrimitive(t) =>
       s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? -1 : 0)"
-    case VARBINARY =>
+    case VARBINARY | BINARY =>
       val sortUtil = 
classOf[org.apache.flink.table.runtime.sort.SortUtil].getCanonicalName
       s"$sortUtil.compareBinary($leftTerm, $rightTerm)"
     case ARRAY =>
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
index a58a1fe..58c5654 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
@@ -396,7 +396,7 @@ object AggCodeGenHelper {
     aggBufferExprs.zip(initAggBufferExprs).map {
       case (aggBufVar, initExpr) =>
         val resultCode = aggBufVar.resultType.getTypeRoot match {
-          case VARCHAR | ROW | ARRAY | MULTISET | MAP =>
+          case VARCHAR | CHAR | ROW | ARRAY | MULTISET | MAP =>
             val serializer = InternalSerializers.create(
               aggBufVar.resultType, new ExecutionConfig)
             val term = ctx.addReusableObject(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 9cc1e72..f06cbf1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -357,10 +357,8 @@ object FunctionGenerator {
     Seq(BIGINT),
     BuiltInMethods.HEX_LONG)
 
-  addSqlFunctionMethod(
-    HEX,
-    Seq(VARCHAR),
-    BuiltInMethods.HEX_STRING)
+  addSqlFunctionMethod(HEX, Seq(VARCHAR), BuiltInMethods.HEX_STRING)
+  addSqlFunctionMethod(HEX, Seq(CHAR), BuiltInMethods.HEX_STRING)
 
   // 
----------------------------------------------------------------------------------------------
   // Temporal functions
@@ -606,73 +604,6 @@ object FunctionGenerator {
     Seq(BIGINT, BIGINT),
     BuiltInMethods.BITXOR_LONG)
 
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, VARCHAR),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, BOOLEAN),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, INTEGER),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, SMALLINT),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, INTEGER),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, BIGINT),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, FLOAT),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, DOUBLE),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, DATE),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, TIMESTAMP_WITHOUT_TIME_ZONE),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, TIME_WITHOUT_TIME_ZONE),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(VARCHAR, DECIMAL),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
-    Seq(
-      VARCHAR,
-      VARBINARY),
-    new PrintCallGen())
-
   addSqlFunctionMethod(
     NOW,
     Seq(),
@@ -699,16 +630,6 @@ object FunctionGenerator {
       TIMESTAMP_WITHOUT_TIME_ZONE),
     BuiltInMethods.DATEDIFF_T_T)
 
-  addSqlFunction(
-    IF,
-    Seq(BOOLEAN, VARCHAR, VARCHAR),
-    new IfCallGen())
-
-  addSqlFunction(
-    IF,
-    Seq(BOOLEAN, BOOLEAN, BOOLEAN),
-    new IfCallGen())
-
   // This sequence must be in sync with [[NumericOrDefaultReturnTypeInference]]
   val numericTypes = Seq(
     INTEGER,
@@ -751,6 +672,14 @@ object FunctionGenerator {
       VARBINARY),
     new IfCallGen())
 
+  addSqlFunction(
+    IF,
+    Seq(
+      BOOLEAN,
+      BINARY,
+      BINARY),
+    new IfCallGen())
+
   addSqlFunctionMethod(
     DIV_INT,
     Seq(INTEGER, INTEGER),
@@ -837,6 +766,11 @@ object FunctionGenerator {
 
   addSqlFunction(
     HASH_CODE,
+    Seq(BINARY),
+    new HashCodeCallGen())
+
+  addSqlFunction(
+    HASH_CODE,
     Seq(DECIMAL),
     new HashCodeCallGen())
 
@@ -864,37 +798,6 @@ object FunctionGenerator {
     Seq(TIMESTAMP_WITHOUT_TIME_ZONE),
     BuiltInMethods.TIMESTAMP_TO_BIGINT)
 
-  // Date/Time & BinaryString Converting -- start
-  addSqlFunctionMethod(
-    TO_DATE,
-    Seq(VARCHAR),
-    BuiltInMethod.STRING_TO_DATE.method)
-
-  addSqlFunctionMethod(
-    TO_DATE,
-    Seq(VARCHAR, VARCHAR),
-    BuiltInMethods.STRING_TO_DATE_WITH_FORMAT)
-
-  addSqlFunctionMethod(
-    TO_TIMESTAMP,
-    Seq(VARCHAR),
-    BuiltInMethods.STRING_TO_TIMESTAMP)
-
-  addSqlFunctionMethod(
-    TO_TIMESTAMP,
-    Seq(VARCHAR, VARCHAR),
-    BuiltInMethods.STRING_TO_TIMESTAMP_WITH_FORMAT)
-
-  addSqlFunctionMethod(
-    UNIX_TIMESTAMP,
-    Seq(VARCHAR),
-    BuiltInMethods.UNIX_TIMESTAMP_STR)
-
-  addSqlFunctionMethod(
-    UNIX_TIMESTAMP,
-    Seq(VARCHAR, VARCHAR),
-    BuiltInMethods.UNIX_TIMESTAMP_FORMAT)
-
   INTEGRAL_TYPES foreach (
     dt => addSqlFunctionMethod(
       FROM_UNIXTIME,
@@ -912,108 +815,20 @@ object FunctionGenerator {
     Seq(DECIMAL),
     BuiltInMethods.FROM_UNIXTIME_AS_DECIMAL)
 
-  addSqlFunctionMethod(
-    FROM_UNIXTIME,
-    Seq(BIGINT, VARCHAR),
-    BuiltInMethods.FROM_UNIXTIME_FORMAT)
-
-  addSqlFunctionMethod(
-    DATEDIFF,
-    Seq(TIMESTAMP_WITHOUT_TIME_ZONE, VARCHAR),
-    BuiltInMethods.DATEDIFF_T_S)
-
-  addSqlFunctionMethod(
-    DATEDIFF,
-    Seq(VARCHAR, TIMESTAMP_WITHOUT_TIME_ZONE),
-    BuiltInMethods.DATEDIFF_S_T)
-
-  addSqlFunctionMethod(
-    DATEDIFF,
-    Seq(VARCHAR, VARCHAR),
-    BuiltInMethods.DATEDIFF_S_S)
-
-  addSqlFunctionMethod(
-    DATE_FORMAT,
-    Seq(TIMESTAMP_WITHOUT_TIME_ZONE, VARCHAR),
-    BuiltInMethods.DATE_FORMAT_LONG_STRING)
-
-  addSqlFunctionMethod(
-    DATE_FORMAT,
-    Seq(VARCHAR, VARCHAR),
-    BuiltInMethods.DATE_FORMAT_STIRNG_STRING)
-
-  addSqlFunctionMethod(
-    DATE_FORMAT,
-    Seq(
-      VARCHAR,
-      VARCHAR,
-      VARCHAR),
-    BuiltInMethods.DATE_FORMAT_STRING_STRING_STRING)
-
-  addSqlFunctionMethod(
-    DATE_SUB,
-    Seq(VARCHAR, INTEGER),
-    BuiltInMethods.DATE_SUB_S)
-
-  addSqlFunctionMethod(
-    DATE_SUB,
-    Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER),
-    BuiltInMethods.DATE_SUB_T)
-
-  addSqlFunctionMethod(
-    DATE_ADD,
-    Seq(VARCHAR, INTEGER),
-    BuiltInMethods.DATE_ADD_S)
-
-  addSqlFunctionMethod(
-    DATE_ADD,
-    Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER),
-    BuiltInMethods.DATE_ADD_T)
-
-  addSqlFunctionMethod(
-    TO_TIMESTAMP_TZ,
-    Seq(
-      VARCHAR,
-      VARCHAR),
-    BuiltInMethods.STRING_TO_TIMESTAMP_TZ)
+  addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), 
BuiltInMethods.FROM_UNIXTIME_FORMAT)
+  addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), 
BuiltInMethods.FROM_UNIXTIME_FORMAT)
 
-  addSqlFunctionMethod(
-    TO_TIMESTAMP_TZ,
-    Seq(
-      VARCHAR,
-      VARCHAR,
-      VARCHAR),
-    BuiltInMethods.STRING_TO_TIMESTAMP_FORMAT_TZ)
+  addSqlFunctionMethod(DATE_SUB, Seq(VARCHAR, INTEGER), 
BuiltInMethods.DATE_SUB_S)
+  addSqlFunctionMethod(DATE_SUB, Seq(CHAR, INTEGER), BuiltInMethods.DATE_SUB_S)
 
   addSqlFunctionMethod(
-    DATE_FORMAT_TZ,
-    Seq(TIMESTAMP_WITHOUT_TIME_ZONE, VARCHAR),
-    BuiltInMethods.DATE_FORMAT_LONG_ZONE)
+    DATE_SUB, Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER), 
BuiltInMethods.DATE_SUB_T)
 
-  addSqlFunctionMethod(
-    DATE_FORMAT_TZ,
-    Seq(
-      TIMESTAMP_WITHOUT_TIME_ZONE,
-      VARCHAR,
-      VARCHAR),
-    BuiltInMethods.DATE_FORMAT_LONG_STRING_ZONE)
+  addSqlFunctionMethod(DATE_ADD, Seq(VARCHAR, INTEGER), 
BuiltInMethods.DATE_ADD_S)
+  addSqlFunctionMethod(DATE_ADD, Seq(CHAR, INTEGER), BuiltInMethods.DATE_ADD_S)
 
   addSqlFunctionMethod(
-    CONVERT_TZ,
-    Seq(
-      VARCHAR,
-      VARCHAR,
-      VARCHAR),
-    BuiltInMethods.CONVERT_TZ)
-
-  addSqlFunctionMethod(
-    CONVERT_TZ,
-    Seq(
-      VARCHAR,
-      VARCHAR,
-      VARCHAR,
-      VARCHAR),
-    BuiltInMethods.CONVERT_FORMAT_TZ)
+    DATE_ADD, Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER), 
BuiltInMethods.DATE_ADD_T)
 
   // 
----------------------------------------------------------------------------------------------
 
@@ -1071,18 +886,14 @@ object FunctionGenerator {
   private def addSqlFunctionMethod(
     sqlOperator: SqlOperator,
     operandTypes: Seq[LogicalTypeRoot],
-    method: Method)
-  : Unit = {
+    method: Method): Unit = {
     sqlFunctions((sqlOperator, operandTypes)) = new MethodCallGen(method)
   }
 
   private def addSqlFunction(
     sqlOperator: SqlOperator,
     operandTypes: Seq[LogicalTypeRoot],
-    callGenerator: CallGenerator)
-  : Unit = {
+    callGenerator: CallGenerator): Unit = {
     sqlFunctions((sqlOperator, operandTypes)) = callGenerator
   }
-
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/PrintCallGen.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/PrintCallGen.scala
index 843b976..c6491a8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/PrintCallGen.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/PrintCallGen.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.codegen.calls
 
 import org.apache.flink.table.codegen.CodeGenUtils.{newNames, 
primitiveTypeTermForType}
 import org.apache.flink.table.codegen.{CodeGeneratorContext, 
GeneratedExpression}
-import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
+import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.typeutils.TypeCheckUtils.isBinaryString
 
 /**
   * Generates PRINT function call.
@@ -39,7 +40,7 @@ class PrintCallGen extends CallGenerator {
     val logTerm = "logger$"
     ctx.addReusableLogger(logTerm, "_Print$_")
 
-    val outputCode = if (returnType.getTypeRoot == LogicalTypeRoot.VARBINARY) {
+    val outputCode = if (isBinaryString(returnType)) {
       s"new String($resultTerm, java.nio.charset.Charset.defaultCharset())"
     } else {
       s"String.valueOf(${operands(1).resultTerm})"
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
index 14fae9b..52532fd 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
@@ -325,7 +325,7 @@ object ScalarOperatorGens {
       right: GeneratedExpression)
     : GeneratedExpression = {
     val canEqual = isInteroperable(left.resultType, right.resultType)
-    if (left.resultType.getTypeRoot == VARCHAR && right.resultType.getTypeRoot 
== VARCHAR) {
+    if (isCharacterString(left.resultType) && 
isCharacterString(right.resultType)) {
       generateOperatorIfNotNull(ctx, new BooleanType(), left, right) {
         (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
       }
@@ -348,14 +348,14 @@ object ScalarOperatorGens {
     }
     // support date/time/timestamp equalTo string.
     // for performance, we cast literal string to literal time.
-    else if (isTimePoint(left.resultType) && right.resultType.getTypeRoot == 
VARCHAR) {
+    else if (isTimePoint(left.resultType) && 
isCharacterString(right.resultType)) {
       if (right.literal) {
         generateEquals(ctx, left, generateCastStringLiteralToDateTime(ctx, 
right, left.resultType))
       } else {
         generateEquals(ctx, left, generateCast(ctx, right, left.resultType))
       }
     }
-    else if (isTimePoint(right.resultType) && left.resultType.getTypeRoot == 
VARCHAR) {
+    else if (isTimePoint(right.resultType) && 
isCharacterString(left.resultType)) {
       if (left.literal) {
         generateEquals(
           ctx,
@@ -368,10 +368,10 @@ object ScalarOperatorGens {
     // non comparable types
     else {
       generateOperatorIfNotNull(ctx, new BooleanType(), left, right) {
-        if (isReference(left)) {
+        if (isReference(left.resultType)) {
           (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
         }
-        else if (isReference(right)) {
+        else if (isReference(right.resultType)) {
           (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)"
         }
         else {
@@ -387,7 +387,7 @@ object ScalarOperatorGens {
       left: GeneratedExpression,
       right: GeneratedExpression)
     : GeneratedExpression = {
-    if (left.resultType.isInstanceOf[VarCharType] && 
right.resultType.isInstanceOf[VarCharType]) {
+    if (isCharacterString(left.resultType) && 
isCharacterString(right.resultType)) {
       generateOperatorIfNotNull(ctx, new BooleanType(), left, right) {
         (leftTerm, rightTerm) => s"!$leftTerm.equals($rightTerm)"
       }
@@ -421,10 +421,10 @@ object ScalarOperatorGens {
     // non-comparable types
     else {
       generateOperatorIfNotNull(ctx, new BooleanType(), left, right) {
-        if (isReference(left)) {
+        if (isReference(left.resultType)) {
           (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
         }
-        else if (isReference(right)) {
+        else if (isReference(right.resultType)) {
           (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))"
         }
         else {
@@ -472,7 +472,7 @@ object ScalarOperatorGens {
         }
       }
       // both sides are binary type
-      else if (isBinary(left.resultType) &&
+      else if (isBinaryString(left.resultType) &&
           isInteroperable(left.resultType, right.resultType)) {
         (leftTerm, rightTerm) =>
           s"java.util.Arrays.equals($leftTerm, $rightTerm)"
@@ -497,7 +497,7 @@ object ScalarOperatorGens {
     if (ctx.nullCheck) {
       GeneratedExpression(operand.nullTerm, NEVER_NULL, operand.code, new 
BooleanType())
     }
-    else if (!ctx.nullCheck && isReference(operand)) {
+    else if (!ctx.nullCheck && isReference(operand.resultType)) {
       val resultTerm = newName("isNull")
       val operatorCode =
         s"""
@@ -523,7 +523,7 @@ object ScalarOperatorGens {
            |""".stripMargin.trim
       GeneratedExpression(resultTerm, NEVER_NULL, operatorCode, new 
BooleanType())
     }
-    else if (!ctx.nullCheck && isReference(operand)) {
+    else if (!ctx.nullCheck && isReference(operand.resultType)) {
       val resultTerm = newName("result")
       val operatorCode =
         s"""
@@ -772,7 +772,7 @@ object ScalarOperatorGens {
       operand.copy(resultType = targetType)
 
     // Date/Time/Timestamp -> String
-    case (_, VARCHAR) if TypeCheckUtils.isTimePoint(operand.resultType) =>
+    case (_, VARCHAR | CHAR) if TypeCheckUtils.isTimePoint(operand.resultType) 
=>
       generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) {
         operandTerm =>
           val zoneTerm = ctx.addReusableTimeZone()
@@ -780,7 +780,7 @@ object ScalarOperatorGens {
       }
 
     // Interval Months -> String
-    case (INTERVAL_YEAR_MONTH, VARCHAR) =>
+    case (INTERVAL_YEAR_MONTH, VARCHAR | CHAR) =>
       val method = 
qualifyMethod(BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method)
       val timeUnitRange = qualifyEnum(TimeUnitRange.YEAR_TO_MONTH)
       generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) {
@@ -788,7 +788,7 @@ object ScalarOperatorGens {
       }
 
     // Interval Millis -> String
-    case (INTERVAL_DAY_TIME, VARCHAR) =>
+    case (INTERVAL_DAY_TIME, VARCHAR | CHAR) =>
       val method = 
qualifyMethod(BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method)
       val timeUnitRange = qualifyEnum(TimeUnitRange.DAY_TO_SECOND)
       generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) {
@@ -796,11 +796,11 @@ object ScalarOperatorGens {
       }
 
     // Array -> String
-    case (ARRAY, VARCHAR) =>
+    case (ARRAY, VARCHAR | CHAR) =>
       generateCastArrayToString(ctx, operand, 
operand.resultType.asInstanceOf[ArrayType])
 
     // Byte array -> String UTF-8
-    case (VARBINARY, VARCHAR) =>
+    case (VARBINARY, VARCHAR | CHAR) =>
       val charset = classOf[StandardCharsets].getCanonicalName
       generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) {
         terms => s"(new String(${terms.head}, $charset.UTF_8))"
@@ -808,14 +808,14 @@ object ScalarOperatorGens {
 
 
     // Map -> String
-    case (MAP, VARCHAR) =>
+    case (MAP, VARCHAR | CHAR) =>
       generateCastMapToString(ctx, operand, 
operand.resultType.asInstanceOf[MapType])
 
     // composite type -> String
-    case (ROW, VARCHAR) =>
+    case (ROW, VARCHAR | CHAR) =>
       generateCastBaseRowToString(ctx, operand, 
operand.resultType.asInstanceOf[RowType])
 
-    case (ANY, VARCHAR) =>
+    case (ANY, VARCHAR | CHAR) =>
       generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) {
         terms =>
           val converter = DataFormatConverters.getConverterForDataType(
@@ -826,13 +826,13 @@ object ScalarOperatorGens {
 
     // * (not Date/Time/Timestamp) -> String
     // TODO: GenericType with Date/Time/Timestamp -> String would call 
toString implicitly
-    case (_, VARCHAR) =>
+    case (_, VARCHAR | CHAR) =>
       generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) {
         terms => s""" "" + ${terms.head}"""
       }
 
     // String -> Boolean
-    case (VARCHAR, BOOLEAN) =>
+    case (VARCHAR | CHAR, BOOLEAN) =>
       generateUnaryOperatorIfNotNull(
         ctx,
         targetType,
@@ -842,7 +842,7 @@ object ScalarOperatorGens {
       }
 
     // String -> NUMERIC TYPE (not Character)
-    case (VARCHAR, _)
+    case (VARCHAR | CHAR, _)
       if TypeCheckUtils.isNumeric(targetType) =>
       targetType match {
         case dt: DecimalType =>
@@ -870,7 +870,7 @@ object ScalarOperatorGens {
       }
 
     // String -> Date
-    case (VARCHAR, DATE) =>
+    case (VARCHAR | CHAR, DATE) =>
       generateUnaryOperatorIfNotNull(
         ctx,
         targetType,
@@ -881,7 +881,7 @@ object ScalarOperatorGens {
       }
 
     // String -> Time
-    case (VARCHAR, TIME_WITHOUT_TIME_ZONE) =>
+    case (VARCHAR | CHAR, TIME_WITHOUT_TIME_ZONE) =>
       generateUnaryOperatorIfNotNull(
         ctx, 
         targetType,
@@ -892,7 +892,7 @@ object ScalarOperatorGens {
       }
 
     // String -> Timestamp
-    case (VARCHAR, TIMESTAMP_WITHOUT_TIME_ZONE) =>
+    case (VARCHAR | CHAR, TIMESTAMP_WITHOUT_TIME_ZONE) =>
       generateUnaryOperatorIfNotNull(
         ctx, 
         targetType,
@@ -905,7 +905,7 @@ object ScalarOperatorGens {
       }
 
     // String -> binary
-    case (VARCHAR, VARBINARY) =>
+    case (VARCHAR | CHAR, VARBINARY | BINARY) =>
       generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
         operandTerm => s"$operandTerm.getBytes()"
       }
@@ -1147,7 +1147,7 @@ object ScalarOperatorGens {
     }
 
     checkArgument(operands(1).literal)
-    checkArgument(operands(1).resultType.isInstanceOf[VarCharType])
+    checkArgument(isCharacterString(operands(1).resultType))
     checkArgument(operands.head.resultType.isInstanceOf[RowType])
 
     val fieldName = operands(1).literalValue.get.toString
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BinaryStringCallGen.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/StringCallGen.scala
similarity index 83%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BinaryStringCallGen.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/StringCallGen.scala
index 6761f3a..0d73ff1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BinaryStringCallGen.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/StringCallGen.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.codegen.calls
 
-import org.apache.flink.api.common.typeinfo.Types
-import org.apache.flink.api.java.typeutils.MapTypeInfo
 import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.codegen.CodeGenUtils._
 import 
org.apache.flink.table.codegen.GenerateUtils.{generateCallIfArgsNotNull, 
generateCallIfArgsNullable, generateStringResultCallIfArgsNotNull}
@@ -29,10 +27,14 @@ import 
org.apache.flink.table.dataformat.DataFormatConverters
 import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable._
 import org.apache.flink.table.runtime.functions.SqlFunctionUtils
 import org.apache.flink.table.types.logical.{BooleanType, IntType, 
LogicalType, MapType, VarBinaryType, VarCharType}
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isCharacterString, 
isTimestamp}
 
 import org.apache.calcite.runtime.SqlFunctions
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, 
TRAILING}
+import org.apache.calcite.util.BuiltInMethod
+
+import java.lang.reflect.Method
 
 /**
   * Code generator for call with string parameters or return value.
@@ -42,13 +44,18 @@ import 
org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING}
   * <p>TODO Need to rewrite most of the methods here, calculated directly on 
the BinaryString
   * instead of convert BinaryString to String.
   */
-object BinaryStringCallGen {
+object StringCallGen {
 
   def generateCallExpression(
       ctx: CodeGeneratorContext,
       operator: SqlOperator,
       operands: Seq[GeneratedExpression],
       returnType: LogicalType): Option[GeneratedExpression] = {
+
+    def methodGen(method: Method): GeneratedExpression = {
+      new MethodCallGen(method).generate(ctx, operands, returnType)
+    }
+
     val generator = operator match {
 
       case LIKE =>
@@ -105,7 +112,7 @@ object BinaryStringCallGen {
 
       case KEYVALUE => generateKeyValue(ctx, operands)
 
-      case HASH_CODE if operands.head.resultType.isInstanceOf[VarCharType] =>
+      case HASH_CODE if isCharacterString(operands.head.resultType) =>
         generateHashCode(ctx, operands)
 
       case MD5 => generateMd5(ctx, operands)
@@ -137,11 +144,11 @@ object BinaryStringCallGen {
       case BIN => generateBin(ctx, operands)
 
       case CONCAT_FUNCTION =>
-        operands.foreach(requireString)
+        operands.foreach(requireCharacterString)
         generateConcat(ctx, operands)
 
       case CONCAT_WS =>
-        operands.foreach(requireString)
+        operands.foreach(requireCharacterString)
         generateConcatWs(ctx, operands)
 
       case STR_TO_MAP => generateStrToMap(ctx, operands)
@@ -155,7 +162,7 @@ object BinaryStringCallGen {
       case CONCAT =>
         val left = operands.head
         val right = operands(1)
-        requireString(left)
+        requireCharacterString(left)
         generateArithmeticConcat(ctx, left, right)
 
       case UUID => generateUuid(ctx, operands)
@@ -168,6 +175,101 @@ object BinaryStringCallGen {
 
       case INSTR => generateInstr(ctx, operands)
 
+      case PRINT => new PrintCallGen().generate(ctx, operands, returnType)
+
+      case IF =>
+        requireBoolean(operands.head)
+        new IfCallGen().generate(ctx, operands, returnType)
+
+      // Date/Time & BinaryString Converting -- start
+
+      case TO_DATE if operands.size == 1 && 
isCharacterString(operands.head.resultType) =>
+        methodGen(BuiltInMethod.STRING_TO_DATE.method)
+
+      case TO_DATE if operands.size == 2 &&
+          isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) =>
+        methodGen(BuiltInMethods.STRING_TO_DATE_WITH_FORMAT)
+
+      case TO_TIMESTAMP if operands.size == 1 && 
isCharacterString(operands.head.resultType) =>
+        methodGen(BuiltInMethods.STRING_TO_TIMESTAMP)
+
+      case TO_TIMESTAMP if operands.size == 2 &&
+          isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) =>
+        methodGen(BuiltInMethods.STRING_TO_TIMESTAMP_WITH_FORMAT)
+
+      case UNIX_TIMESTAMP if operands.size == 1 && 
isCharacterString(operands.head.resultType) =>
+        methodGen(BuiltInMethods.UNIX_TIMESTAMP_STR)
+
+      case UNIX_TIMESTAMP if operands.size == 2 &&
+          isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) =>
+        methodGen(BuiltInMethods.UNIX_TIMESTAMP_FORMAT)
+
+      case DATEDIFF if isTimestamp(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) =>
+        methodGen(BuiltInMethods.DATEDIFF_T_S)
+
+      case DATEDIFF if isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) =>
+        methodGen(BuiltInMethods.DATEDIFF_S_S)
+
+      case DATEDIFF if isCharacterString(operands.head.resultType) &&
+          isTimestamp(operands(1).resultType) =>
+        methodGen(BuiltInMethods.DATEDIFF_S_T)
+
+      case DATE_FORMAT if operands.size == 2 &&
+          isTimestamp(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) =>
+        methodGen(BuiltInMethods.DATE_FORMAT_LONG_STRING)
+
+      case DATE_FORMAT if operands.size == 2 &&
+          isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) =>
+        methodGen(BuiltInMethods.DATE_FORMAT_STIRNG_STRING)
+
+      case DATE_FORMAT if operands.size == 3 &&
+          isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) &&
+          isCharacterString(operands(2).resultType) =>
+        methodGen(BuiltInMethods.DATE_FORMAT_STRING_STRING_STRING)
+
+      case TO_TIMESTAMP_TZ if operands.size == 2 &&
+          isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) =>
+        methodGen(BuiltInMethods.STRING_TO_TIMESTAMP_TZ)
+
+      case TO_TIMESTAMP_TZ if operands.size == 3 &&
+          isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) &&
+          isCharacterString(operands(2).resultType) =>
+        methodGen(BuiltInMethods.STRING_TO_TIMESTAMP_FORMAT_TZ)
+
+      case DATE_FORMAT_TZ if operands.size == 2 &&
+          isTimestamp(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) =>
+        methodGen(BuiltInMethods.DATE_FORMAT_LONG_ZONE)
+
+      case DATE_FORMAT_TZ if operands.size == 3 &&
+          isTimestamp(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) &&
+          isCharacterString(operands(2).resultType) =>
+        methodGen(BuiltInMethods.DATE_FORMAT_LONG_STRING_ZONE)
+
+      case CONVERT_TZ if operands.size == 3 &&
+          isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) &&
+          isCharacterString(operands(2).resultType) =>
+        methodGen(BuiltInMethods.CONVERT_TZ)
+
+      case CONVERT_TZ if operands.size == 4 &&
+          isCharacterString(operands.head.resultType) &&
+          isCharacterString(operands(1).resultType) &&
+          isCharacterString(operands(2).resultType) &&
+          isCharacterString(operands(3).resultType) =>
+        methodGen(BuiltInMethods.CONVERT_FORMAT_TZ)
+
       case _ => null
     }
 
@@ -176,7 +278,7 @@ object BinaryStringCallGen {
 
   private def toStringTerms(terms: Seq[String], operands: 
Seq[GeneratedExpression]) = {
     terms.zipWithIndex.map { case (term, index) =>
-      if (operands(index).resultType.isInstanceOf[VarCharType]) {
+      if (isCharacterString(operands(index).resultType)) {
         s"$term.toString()"
       } else {
         term
@@ -186,7 +288,7 @@ object BinaryStringCallGen {
 
   private def safeToStringTerms(terms: Seq[String], operands: 
Seq[GeneratedExpression]) = {
     terms.zipWithIndex.map { case (term, index) =>
-      if (operands(index).resultType.isInstanceOf[VarCharType]) {
+      if (isCharacterString(operands(index).resultType)) {
         s"$BINARY_STRING.safeToString($term)"
       } else {
         term
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala
index e456171..99c52f1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala
@@ -410,8 +410,8 @@ class SortCodeGenerator(
     case FLOAT => "Float"
     case DOUBLE => "Double"
     case BOOLEAN => "Boolean"
-    case VARCHAR => "String"
-    case VARBINARY => "Binary"
+    case VARCHAR | CHAR => "String"
+    case VARBINARY | BINARY => "Binary"
     case DECIMAL => "Decimal"
     case DATE => "Int"
     case TIME_WITHOUT_TIME_ZONE => "Int"
@@ -436,7 +436,7 @@ class SortCodeGenerator(
   def supportNormalizedKey(t: LogicalType): Boolean = {
     t.getTypeRoot match {
       case _ if PlannerTypeUtils.isPrimitive(t) => true
-      case VARCHAR | VARBINARY |
+      case VARCHAR | CHAR | VARBINARY | BINARY |
            DATE | TIME_WITHOUT_TIME_ZONE | TIMESTAMP_WITHOUT_TIME_ZONE => true
       case DECIMAL => 
Decimal.isCompact(t.asInstanceOf[DecimalType].getPrecision)
       case _ => false
@@ -458,7 +458,7 @@ class SortCodeGenerator(
       case DATE => 4
       case TIME_WITHOUT_TIME_ZONE => 4
       case DECIMAL if 
Decimal.isCompact(t.asInstanceOf[DecimalType].getPrecision) => 8
-      case VARCHAR | VARBINARY => Int.MaxValue
+      case VARCHAR | CHAR | VARBINARY | BINARY => Int.MaxValue
     }
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggFunctionFactory.scala
index 84b84c2..611d59c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggFunctionFactory.scala
@@ -284,7 +284,7 @@ class AggFunctionFactory(
           new DoubleMinWithRetractAggFunction
         case BOOLEAN =>
           new BooleanMinWithRetractAggFunction
-        case VARCHAR =>
+        case VARCHAR | CHAR =>
           new StringMinWithRetractAggFunction
         case DECIMAL =>
           val d = argTypes(0).asInstanceOf[DecimalType]
@@ -315,7 +315,7 @@ class AggFunctionFactory(
           new MinAggFunction.DoubleMinAggFunction
         case BOOLEAN =>
           new MinAggFunction.BooleanMinAggFunction
-        case VARCHAR =>
+        case VARCHAR | CHAR =>
           new MinAggFunction.StringMinAggFunction
         case DATE =>
           new MinAggFunction.DateMinAggFunction
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index a80c0a7..c325eba 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -501,7 +501,7 @@ object AggregateUtil extends Enumeration {
       case INTERVAL_YEAR_MONTH => DataTypes.INT
       case INTERVAL_DAY_TIME => DataTypes.BIGINT
 
-      case VARCHAR => fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE)
+      case VARCHAR | CHAR => 
fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE)
       case DECIMAL =>
         val dt = argTypes(0).asInstanceOf[DecimalType]
         DataTypes.DECIMAL(dt.getPrecision, dt.getScale)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
deleted file mode 100644
index e5ff5bb..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils
-
-import org.apache.flink.table.codegen.GeneratedExpression
-import org.apache.flink.table.types.logical.LogicalTypeRoot._
-import org.apache.flink.table.types.logical._
-
-object TypeCheckUtils {
-
-  def isNumeric(dataType: LogicalType): Boolean =
-    dataType.getTypeRoot.getFamilies.contains(LogicalTypeFamily.NUMERIC)
-
-  def isTemporal(dataType: LogicalType): Boolean =
-    isTimePoint(dataType) || isTimeInterval(dataType)
-
-  def isTimePoint(dataType: LogicalType): Boolean = dataType.getTypeRoot match 
{
-    case TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE => true
-    case _ => false
-  }
-
-  def isRowTime(dataType: LogicalType): Boolean =
-    dataType match {
-      case t: TimestampType => t.getKind == TimestampKind.ROWTIME
-      case _ => false
-    }
-
-  def isProcTime(dataType: LogicalType): Boolean =
-    dataType match {
-      case t: TimestampType => t.getKind == TimestampKind.PROCTIME
-      case _ => false
-    }
-
-  def isTimeInterval(dataType: LogicalType): Boolean = dataType.getTypeRoot 
match {
-    case INTERVAL_YEAR_MONTH | INTERVAL_DAY_TIME => true
-    case _ => false
-  }
-
-  def isVarchar(dataType: LogicalType): Boolean = dataType.getTypeRoot == 
VARCHAR
-
-  def isBinary(dataType: LogicalType): Boolean = dataType.getTypeRoot == BINARY
-
-  def isBoolean(dataType: LogicalType): Boolean = dataType.getTypeRoot == 
BOOLEAN
-
-  def isDecimal(dataType: LogicalType): Boolean = dataType.getTypeRoot == 
DECIMAL
-
-  def isInteger(dataType: LogicalType): Boolean = dataType.getTypeRoot == 
INTEGER
-
-  def isLong(dataType: LogicalType): Boolean = dataType.getTypeRoot == BIGINT
-
-  def isArray(dataType: LogicalType): Boolean = dataType.getTypeRoot == ARRAY
-
-  def isMap(dataType: LogicalType): Boolean = dataType.getTypeRoot == MAP
-
-  def isAny(dataType: LogicalType): Boolean = dataType.getTypeRoot == ANY
-
-  def isRow(dataType: LogicalType): Boolean = dataType.getTypeRoot == ROW
-
-  def isComparable(dataType: LogicalType): Boolean =
-    !isAny(dataType) && !isMap(dataType) && !isRow(dataType) && 
!isArray(dataType)
-
-  def isMutable(dataType: LogicalType): Boolean = dataType.getTypeRoot match {
-    // the internal representation of String is BinaryString which is mutable
-    case VARCHAR => true
-    case ARRAY | MULTISET | MAP | ROW | ANY => true
-    case _ => false
-  }
-
-  def isReference(t: LogicalType): Boolean = t.getTypeRoot match {
-    case INTEGER |
-         TINYINT |
-         SMALLINT |
-         BIGINT |
-         FLOAT |
-         DOUBLE |
-         BOOLEAN |
-         DATE |
-         TIME_WITHOUT_TIME_ZONE |
-         TIMESTAMP_WITHOUT_TIME_ZONE => false
-    case _ => true
-  }
-
-  def isReference(genExpr: GeneratedExpression): Boolean = 
isReference(genExpr.resultType)
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
index 3348191..c9e7faa 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.typeutils.TypeCheckUtils._
   */
 object TypeCoercion {
 
-  val numericWideningPrecedence: IndexedSeq[LogicalType] =
+  var numericWideningPrecedence: IndexedSeq[LogicalType] =
     IndexedSeq(
       new TinyIntType(),
       new SmallIntType(),
@@ -36,12 +36,14 @@ object TypeCoercion {
       new FloatType(),
       new DoubleType())
 
+  numericWideningPrecedence ++= numericWideningPrecedence.map(_.copy(false))
+
   def widerTypeOf(tp1: LogicalType, tp2: LogicalType): Option[LogicalType] = {
     (tp1.getTypeRoot, tp2.getTypeRoot) match {
       case (_, _) if tp1 == tp2 => Some(tp1)
 
-      case (_, VARCHAR) => Some(tp2)
-      case (VARCHAR, _) => Some(tp1)
+      case (_, VARCHAR | CHAR) => Some(tp2)
+      case (VARCHAR | CHAR, _) => Some(tp1)
 
       case (_, DECIMAL) => Some(tp2)
       case (DECIMAL, _) => Some(tp1)
@@ -63,7 +65,7 @@ object TypeCoercion {
     */
   def canSafelyCast(
       from: LogicalType, to: LogicalType): Boolean = (from.getTypeRoot, 
to.getTypeRoot) match {
-    case (_, VARCHAR) => true
+    case (_, VARCHAR | CHAR) => true
 
     case (_, DECIMAL) if isNumeric(from) => true
 
@@ -89,14 +91,14 @@ object TypeCoercion {
       from: LogicalType, to: LogicalType): Boolean = (from.getTypeRoot, 
to.getTypeRoot) match {
     case (_, _) if from == to => true
 
-    case (_, VARCHAR) => true
+    case (_, VARCHAR | CHAR) => true
 
-    case (VARCHAR, _) if isNumeric(to) => true
-    case (VARCHAR, BOOLEAN) => true
-    case (VARCHAR, DECIMAL) => true
-    case (VARCHAR, DATE) => true
-    case (VARCHAR, TIME_WITHOUT_TIME_ZONE) => true
-    case (VARCHAR, TIMESTAMP_WITHOUT_TIME_ZONE) => true
+    case (VARCHAR | CHAR, _) if isNumeric(to) => true
+    case (VARCHAR | CHAR, BOOLEAN) => true
+    case (VARCHAR | CHAR, DECIMAL) => true
+    case (VARCHAR | CHAR, DATE) => true
+    case (VARCHAR | CHAR, TIME_WITHOUT_TIME_ZONE) => true
+    case (VARCHAR | CHAR, TIMESTAMP_WITHOUT_TIME_ZONE) => true
 
     case (BOOLEAN, _) if isNumeric(to) => true
     case (BOOLEAN, DECIMAL) => true
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
index 728c79a..16fef15 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -100,6 +100,7 @@ public interface BinaryWriter {
                        case DOUBLE:
                                writer.writeDouble(pos, (double) o);
                                break;
+                       case CHAR:
                        case VARCHAR:
                                writer.writeString(pos, (BinaryString) o);
                                break;
@@ -121,6 +122,7 @@ public interface BinaryWriter {
                        case ANY:
                                writer.writeGeneric(pos, (BinaryGeneric) o);
                                break;
+                       case BINARY:
                        case VARBINARY:
                                writer.writeBinary(pos, (byte[]) o);
                                break;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index d94bdf5..35f9b70 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -136,6 +136,7 @@ public class DataFormatConverters {
                Class<?> clazz = dataType.getConversionClass();
                LogicalType logicalType = dataType.getLogicalType();
                switch (logicalType.getTypeRoot()) {
+                       case CHAR:
                        case VARCHAR:
                                if (clazz == String.class) {
                                        return StringConverter.INSTANCE;
@@ -144,6 +145,7 @@ public class DataFormatConverters {
                                } else {
                                        throw new RuntimeException("Not support 
class for VARCHAR: " + clazz);
                                }
+                       case BINARY:
                        case VARBINARY:
                                return PrimitiveByteArrayConverter.INSTANCE;
                        case DECIMAL:
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
index a94111c..92217ed 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
@@ -181,6 +181,7 @@ public interface TypeGetterSetters {
                                return row.getFloat(ordinal);
                        case DOUBLE:
                                return row.getDouble(ordinal);
+                       case CHAR:
                        case VARCHAR:
                                return row.getString(ordinal);
                        case DECIMAL:
@@ -193,6 +194,7 @@ public interface TypeGetterSetters {
                                return row.getMap(ordinal);
                        case ROW:
                                return row.getRow(ordinal, ((RowType) 
type).getFieldCount());
+                       case BINARY:
                        case VARBINARY:
                                return row.getBinary(ordinal);
                        case ANY:
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java
index 2af5be5..b857a34 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java
@@ -122,7 +122,9 @@ public abstract class AbstractHeapVector extends 
AbstractColumnVector {
                                }
                        case SMALLINT:
                                return new HeapShortVector(maxRows);
+                       case CHAR:
                        case VARCHAR:
+                       case BINARY:
                        case VARBINARY:
                                return new HeapBytesVector(maxRows);
                        default:
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
index 9b8cf36..9050435 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
@@ -79,6 +79,7 @@ public class ClassLogicalTypeConverter {
                                return Float.class;
                        case DOUBLE:
                                return Double.class;
+                       case CHAR:
                        case VARCHAR:
                                return String.class;
                        case DECIMAL:
@@ -116,6 +117,7 @@ public class ClassLogicalTypeConverter {
                                return Map.class;
                        case ROW:
                                return Row.class;
+                       case BINARY:
                        case VARBINARY:
                                return byte[].class;
                        case ANY:
@@ -152,6 +154,7 @@ public class ClassLogicalTypeConverter {
                                return Float.class;
                        case DOUBLE:
                                return Double.class;
+                       case CHAR:
                        case VARCHAR:
                                return BinaryString.class;
                        case DECIMAL:
@@ -163,6 +166,7 @@ public class ClassLogicalTypeConverter {
                                return BinaryMap.class;
                        case ROW:
                                return BaseRow.class;
+                       case BINARY:
                        case VARBINARY:
                                return byte[].class;
                        case ANY:
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
index 16840b3..c871038 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
@@ -65,6 +65,7 @@ public class InternalSerializers {
                                return FloatSerializer.INSTANCE;
                        case DOUBLE:
                                return DoubleSerializer.INSTANCE;
+                       case CHAR:
                        case VARCHAR:
                                return BinaryStringSerializer.INSTANCE;
                        case DECIMAL:
@@ -78,6 +79,7 @@ public class InternalSerializers {
                        case ROW:
                                RowType rowType = (RowType) type;
                                return new BaseRowSerializer(config, rowType);
+                       case BINARY:
                        case VARBINARY:
                                return BytePrimitiveArraySerializer.INSTANCE;
                        case ANY:
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java
index 92f1d42..08622aa 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java
@@ -24,6 +24,9 @@ import org.apache.flink.table.types.logical.RowType;
 
 import java.util.List;
 
+import static 
org.apache.flink.table.types.logical.LogicalTypeFamily.BINARY_STRING;
+import static 
org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING;
+
 /**
  * Utilities for {@link LogicalType} and {@link DataType}.
  */
@@ -74,15 +77,19 @@ public class PlannerTypeUtils {
         * 2.Join keys.
         */
        public static boolean isInteroperable(LogicalType t1, LogicalType t2) {
+               if (t1.getTypeRoot().getFamilies().contains(CHARACTER_STRING) &&
+                               
t2.getTypeRoot().getFamilies().contains(CHARACTER_STRING)) {
+                       return true;
+               }
+               if (t1.getTypeRoot().getFamilies().contains(BINARY_STRING) &&
+                               
t2.getTypeRoot().getFamilies().contains(BINARY_STRING)) {
+                       return true;
+               }
                if (t1.getTypeRoot() != t2.getTypeRoot()) {
                        return false;
                }
 
                switch (t1.getTypeRoot()) {
-                       // VARCHAR VARBINARY ignore length.
-                       case VARCHAR:
-                       case VARBINARY:
-                               return true;
                        case ARRAY:
                        case MAP:
                        case MULTISET:
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
index 5d984d2..d233083 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
@@ -70,10 +70,12 @@ public class TypeInfoDataTypeConverter {
                                return clazz == Decimal.class ?
                                                new 
DecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale()) :
                                                new 
BigDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
+                       case CHAR:
                        case VARCHAR: // ignore precision
                                return clazz == BinaryString.class ?
                                                BinaryStringTypeInfo.INSTANCE :
                                                BasicTypeInfo.STRING_TYPE_INFO;
+                       case BINARY:
                        case VARBINARY: // ignore precision
                                return 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
                        case ARRAY:
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/TypeCheckUtils.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/TypeCheckUtils.java
new file mode 100644
index 0000000..3c93838
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/TypeCheckUtils.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ANY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ARRAY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BOOLEAN;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+
+/**
+ * Utils for type.
+ */
+public class TypeCheckUtils {
+
+       public static boolean isNumeric(LogicalType type) {
+               return 
type.getTypeRoot().getFamilies().contains(LogicalTypeFamily.NUMERIC);
+       }
+
+       public static boolean isTemporal(LogicalType type) {
+               return isTimePoint(type) || isTimeInterval(type);
+       }
+
+       public static boolean isTimePoint(LogicalType type) {
+               switch (type.getTypeRoot()) {
+                       case DATE:
+                       case TIME_WITHOUT_TIME_ZONE:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return true;
+                       default:
+                               return false;
+               }
+       }
+
+       public static boolean isRowTime(LogicalType type) {
+               return type instanceof TimestampType && ((TimestampType) 
type).getKind() == TimestampKind.ROWTIME;
+       }
+
+       public static boolean isProcTime(LogicalType type) {
+               return type instanceof TimestampType && ((TimestampType) 
type).getKind() == TimestampKind.PROCTIME;
+       }
+
+       public static boolean isTimeInterval(LogicalType type) {
+               switch (type.getTypeRoot()) {
+                       case INTERVAL_DAY_TIME:
+                       case INTERVAL_YEAR_MONTH:
+                               return true;
+                       default:
+                               return false;
+               }
+       }
+
+       public static boolean isCharacterString(LogicalType type) {
+               return 
type.getTypeRoot().getFamilies().contains(LogicalTypeFamily.CHARACTER_STRING);
+       }
+
+       public static boolean isBinaryString(LogicalType type) {
+               return 
type.getTypeRoot().getFamilies().contains(LogicalTypeFamily.BINARY_STRING);
+       }
+
+       public static boolean isTimestamp(LogicalType type) {
+               return type.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE;
+       }
+
+       public static boolean isBoolean(LogicalType type) {
+               return type.getTypeRoot() == BOOLEAN;
+       }
+
+       public static boolean isDecimal(LogicalType type) {
+               return type.getTypeRoot() == DECIMAL;
+       }
+
+       public static boolean isInteger(LogicalType type) {
+               return type.getTypeRoot() == INTEGER;
+       }
+
+       public static boolean isLong(LogicalType type) {
+               return type.getTypeRoot() == BIGINT;
+       }
+
+       public static boolean isArray(LogicalType type) {
+               return type.getTypeRoot() == ARRAY;
+       }
+
+       public static boolean isMap(LogicalType type) {
+               return type.getTypeRoot() == MAP;
+       }
+
+       public static boolean isAny(LogicalType type) {
+               return type.getTypeRoot() == ANY;
+       }
+
+       public static boolean isRow(LogicalType type) {
+               return type.getTypeRoot() == ROW;
+       }
+
+       public static boolean isComparable(LogicalType type) {
+               return !isAny(type) && !isMap(type) && !isRow(type) && 
!isArray(type);
+       }
+
+       public static boolean isMutable(LogicalType type) {
+               // the internal representation of String is BinaryString which 
is mutable
+               switch (type.getTypeRoot()) {
+                       case VARCHAR:
+                       case CHAR:
+                       case ARRAY:
+                       case MULTISET:
+                       case MAP:
+                       case ROW:
+                       case ANY:
+                               return true;
+                       default:
+                               return false;
+               }
+       }
+
+       public static boolean isReference(LogicalType type) {
+               switch (type.getTypeRoot()) {
+                       case BOOLEAN:
+                       case TINYINT:
+                       case SMALLINT:
+                       case INTEGER:
+                       case BIGINT:
+                       case FLOAT:
+                       case DOUBLE:
+                       case DATE:
+                       case TIME_WITHOUT_TIME_ZONE:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                       case INTERVAL_YEAR_MONTH:
+                       case INTERVAL_DAY_TIME:
+                               return false;
+                       default:
+                               return true;
+               }
+       }
+}

Reply via email to