This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e20a9f8  [FLINK-11881][table-planner-blink] Introduce code generated 
typed sorter to blink table (#7958)
e20a9f8 is described below

commit e20a9f8947244315f7732c719ebf8f77e7699a57
Author: Jingsong Lee <lzljs3620...@aliyun.com>
AuthorDate: Thu Mar 14 21:36:23 2019 +0800

    [FLINK-11881][table-planner-blink] Introduce code generated typed sorter to 
blink table (#7958)
---
 .../flink/table/calcite/FlinkPlannerImpl.scala     |  24 +-
 .../apache/flink/table/codegen/CodeGenUtils.scala  | 203 ++++++++
 .../flink/table/codegen/SortCodeGenerator.scala    | 485 ++++++++++++++++++
 .../flink/table/codegen/SortCodeGeneratorTest.java | 550 +++++++++++++++++++++
 .../table/dataformat/AbstractBinaryWriter.java     |  14 +-
 .../apache/flink/table/dataformat/BinaryArray.java |   9 +
 .../flink/table/dataformat/BinaryFormat.java       |  81 +++
 .../flink/table/dataformat/BinaryGeneric.java      |  13 +
 .../apache/flink/table/dataformat/BinaryRow.java   |  28 ++
 .../flink/table/dataformat/BinaryString.java       |  38 --
 .../flink/table/dataformat/BinaryWriter.java       |   6 +-
 .../table/dataformat/DataFormatConverters.java     |  55 +--
 .../apache/flink/table/dataformat/JoinedRow.java   |   9 +
 .../apache/flink/table/dataformat/NestedRow.java   |   8 +
 .../flink/table/dataformat/ObjectArrayRow.java     |   5 +
 .../flink/table/dataformat/TypeGetterSetters.java  |   7 +
 .../apache/flink/table/generated/CompileUtils.java |  14 +
 .../flink/table/generated/GeneratedClass.java      |   5 +-
 .../GeneratedNormalizedKeyComputer.java}           |  28 +-
 .../GeneratedRecordComparator.java}                |  28 +-
 .../sort => generated}/NormalizedKeyComputer.java  |  27 +-
 .../sort => generated}/RecordComparator.java       |  20 +-
 .../table/runtime/sort/BinaryExternalMerger.java   |   1 +
 .../table/runtime/sort/BinaryExternalSorter.java   |   3 +-
 .../runtime/sort/BinaryInMemorySortBuffer.java     |   2 +
 .../table/runtime/sort/BinaryIndexedSortable.java  |   2 +
 .../table/runtime/sort/BinaryKVExternalMerger.java |   1 +
 .../runtime/sort/BinaryKVInMemorySortBuffer.java   |   2 +
 .../table/runtime/sort/BinaryMergeIterator.java    |   1 +
 .../runtime/sort/BufferedKVExternalSorter.java     |   2 +
 .../apache/flink/table/runtime/sort/SortUtil.java  | 101 +++-
 .../org/apache/flink/table/type/ArrayType.java     |   2 +
 .../table/type/{TimeType.java => BinaryType.java}  |  18 +-
 .../org/apache/flink/table/type/BooleanType.java   |   2 +
 .../java/org/apache/flink/table/type/ByteType.java |   2 +
 .../java/org/apache/flink/table/type/CharType.java |   2 +
 .../java/org/apache/flink/table/type/DateType.java |   2 +
 .../org/apache/flink/table/type/DecimalType.java   |   2 +
 .../org/apache/flink/table/type/DoubleType.java    |   2 +
 .../org/apache/flink/table/type/FloatType.java     |   2 +
 .../org/apache/flink/table/type/GenericType.java   |   2 +
 .../java/org/apache/flink/table/type/IntType.java  |   2 +
 .../org/apache/flink/table/type/InternalTypes.java |   2 +-
 .../java/org/apache/flink/table/type/LongType.java |   2 +
 .../java/org/apache/flink/table/type/MapType.java  |   2 +
 .../org/apache/flink/table/type/PrimitiveType.java |   2 +
 .../java/org/apache/flink/table/type/RowType.java  |   2 +
 .../org/apache/flink/table/type/ShortType.java     |   2 +
 .../org/apache/flink/table/type/StringType.java    |   2 +
 .../java/org/apache/flink/table/type/TimeType.java |   2 +
 .../org/apache/flink/table/type/TimestampType.java |   2 +
 .../apache/flink/table/type/TypeConverters.java    |   1 +
 .../apache/flink/table/dataformat/BaseRowTest.java |  20 +-
 .../flink/table/dataformat/BinaryArrayTest.java    |  15 +
 .../flink/table/dataformat/BinaryRowTest.java      |  14 +
 .../flink/table/dataformat/BinaryStringTest.java   |   6 +-
 .../runtime/sort/BinaryMergeIteratorTest.java      |   1 +
 .../runtime/sort/BufferedKVExternalSorterTest.java |   2 +
 .../runtime/sort/IntNormalizedKeyComputer.java     |   3 +-
 .../table/runtime/sort/IntRecordComparator.java    |   3 +-
 .../flink/table/runtime/sort/SortUtilTest.java     |   4 +-
 61 files changed, 1730 insertions(+), 167 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 951bc05..8560a1c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -20,9 +20,8 @@ package org.apache.flink.table.calcite
 
 import java.util
 import java.util.Properties
-
 import com.google.common.collect.ImmutableList
-import org.apache.calcite.config.{CalciteConnectionConfigImpl, 
CalciteConnectionProperty}
+import org.apache.calcite.config.{CalciteConnectionConfigImpl, 
CalciteConnectionProperty, NullCollation}
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptTable.ViewExpander
 import org.apache.calcite.plan._
@@ -221,4 +220,25 @@ object FlinkPlannerImpl {
       rootSchema(schema.getParentSchema)
     }
   }
+
+  /**
+    * The default null ordering when none is specified. Consistent with
+    * HIVE/SPARK/MYSQL/BLINK-RUNTIME: the default value is set to
+    * [[NullCollation.LOW]] to keep consistent with the BLINK runtime.
+    * [[NullCollation.LOW]] means null values appear first when the order is 
ASC (ascending), and
+    * ordered last when the order is DESC (descending).
+    */
+  val defaultNullCollation: NullCollation = NullCollation.LOW
+
+  /** Returns the default null direction if not specified. */
+  def getNullDefaultOrders(ascendings: Array[Boolean]): Array[Boolean] = {
+    ascendings.map { asc =>
+      FlinkPlannerImpl.defaultNullCollation.last(!asc)
+    }
+  }
+
+  /** Returns the default null direction if not specified. */
+  def getNullDefaultOrder(ascending: Boolean): Boolean = {
+    FlinkPlannerImpl.defaultNullCollation.last(!ascending)
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index 94236ed..d07815f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -18,8 +18,12 @@
 
 package org.apache.flink.table.codegen
 
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType => AtomicTypeInfo}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.core.memory.MemorySegment
 import org.apache.flink.table.`type`._
+import org.apache.flink.table.calcite.FlinkPlannerImpl
 import org.apache.flink.table.dataformat._
 import org.apache.flink.table.typeutils.TypeCheckUtils
 
@@ -27,6 +31,8 @@ import java.lang.reflect.Method
 import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar, 
Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => 
JShort}
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.mutable
+
 object CodeGenUtils {
 
   // ------------------------------- DEFAULT TERMS 
------------------------------------------
@@ -36,9 +42,12 @@ object CodeGenUtils {
   // -------------------------- CANONICAL CLASS NAMES 
---------------------------------------
 
   val BINARY_ROW: String = className[BinaryRow]
+  val BINARY_ARRAY: String = className[BinaryArray]
+  val BINARY_GENERIC: String = className[BinaryGeneric[_]]
   val BINARY_STRING: String = className[BinaryString]
   val BASE_ROW: String = className[BaseRow]
   val GENERIC_ROW: String = className[GenericRow]
+  val SEGMENT: String = className[MemorySegment]
 
   // 
----------------------------------------------------------------------------------------
 
@@ -108,6 +117,7 @@ object CodeGenUtils {
     case InternalTypes.TIMESTAMP => boxedTypeTermForType(InternalTypes.LONG)
 
     case InternalTypes.STRING => BINARY_STRING
+    case InternalTypes.BINARY => "byte[]"
 
     case _: DecimalType => className[Decimal]
     // BINARY is also an ArrayType and uses BinaryArray internally too
@@ -230,4 +240,197 @@ object CodeGenUtils {
     }
   }
 
+  def baseRowFieldReadAccess(
+      ctx: CodeGeneratorContext, pos: Int, rowTerm: String, fieldType: 
InternalType) : String =
+    baseRowFieldReadAccess(ctx, pos.toString, rowTerm, fieldType)
+
+  def baseRowFieldReadAccess(
+      ctx: CodeGeneratorContext, pos: String, rowTerm: String, fieldType: 
InternalType) : String =
+    fieldType match {
+      case InternalTypes.INT => s"$rowTerm.getInt($pos)"
+      case InternalTypes.LONG => s"$rowTerm.getLong($pos)"
+      case InternalTypes.SHORT => s"$rowTerm.getShort($pos)"
+      case InternalTypes.BYTE => s"$rowTerm.getByte($pos)"
+      case InternalTypes.FLOAT => s"$rowTerm.getFloat($pos)"
+      case InternalTypes.DOUBLE => s"$rowTerm.getDouble($pos)"
+      case InternalTypes.BOOLEAN => s"$rowTerm.getBoolean($pos)"
+      case InternalTypes.STRING => s"$rowTerm.getString($pos)"
+      case InternalTypes.BINARY => s"$rowTerm.getBinary($pos)"
+      case dt: DecimalType => s"$rowTerm.getDecimal($pos, ${dt.precision()}, 
${dt.scale()})"
+      case InternalTypes.CHAR => s"$rowTerm.getChar($pos)"
+      case _: TimestampType => s"$rowTerm.getLong($pos)"
+      case _: DateType => s"$rowTerm.getInt($pos)"
+      case InternalTypes.TIME => s"$rowTerm.getInt($pos)"
+      case _: ArrayType => s"$rowTerm.getArray($pos)"
+      case _: MapType  => s"$rowTerm.getMap($pos)"
+      case rt: RowType => s"$rowTerm.getRow($pos, ${rt.getArity})"
+      case _: GenericType[_] => s"$rowTerm.getGeneric($pos)"
+    }
+
+  /**
+    * Generates code for comparing two fields.
+    */
+  def genCompare(
+      ctx: CodeGeneratorContext,
+      t: InternalType,
+      nullsIsLast: Boolean,
+      c1: String,
+      c2: String): String = t match {
+    case InternalTypes.BOOLEAN => s"($c1 == $c2 ? 0 : ($c1 ? 1 : -1))"
+    case _: PrimitiveType | _: DateType | _: TimeType | _: TimestampType =>
+      s"($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)"
+    case InternalTypes.BINARY =>
+      val sortUtil = 
classOf[org.apache.flink.table.runtime.sort.SortUtil].getCanonicalName
+      s"$sortUtil.compareBinary($c1, $c2)"
+    case at: ArrayType =>
+      val compareFunc = newName("compareArray")
+      val compareCode = genArrayCompare(
+        ctx,
+        FlinkPlannerImpl.getNullDefaultOrder(true), at, "a", "b")
+      val funcCode: String =
+        s"""
+          public int $compareFunc($BINARY_ARRAY a, $BINARY_ARRAY b) {
+            $compareCode
+            return 0;
+          }
+        """
+      ctx.addReusableMember(funcCode)
+      s"$compareFunc($c1, $c2)"
+    case rowType: RowType =>
+      val orders = rowType.getFieldTypes.map(_ => true)
+      val comparisons = genRowCompare(
+        ctx,
+        rowType.getFieldTypes.indices.toArray,
+        rowType.getFieldTypes,
+        orders,
+        FlinkPlannerImpl.getNullDefaultOrders(orders),
+        "a",
+        "b")
+      val compareFunc = newName("compareRow")
+      val funcCode: String =
+        s"""
+          public int $compareFunc($BASE_ROW a, $BASE_ROW b) {
+            $comparisons
+            return 0;
+          }
+        """
+      ctx.addReusableMember(funcCode)
+      s"$compareFunc($c1, $c2)"
+    case gt: GenericType[_] =>
+      val ser = ctx.addReusableObject(gt.getSerializer, "serializer")
+      val comp = ctx.addReusableObject(
+        gt.getTypeInfo.asInstanceOf[AtomicTypeInfo[_]].createComparator(true, 
new ExecutionConfig),
+        "comparator")
+      s"""
+         |$comp.compare(
+         |  $BINARY_GENERIC.getJavaObjectFromBinaryGeneric($c1, $ser),
+         |  $BINARY_GENERIC.getJavaObjectFromBinaryGeneric($c2, $ser)
+         |)
+       """.stripMargin
+    case other if other.isInstanceOf[AtomicType] => s"$c1.compareTo($c2)"
+  }
+
+  /**
+    * Generates code for comparing array.
+    */
+  def genArrayCompare(
+      ctx: CodeGeneratorContext, nullsIsLast: Boolean, t: ArrayType, a: 
String, b: String)
+    : String = {
+    val nullIsLastRet = if (nullsIsLast) 1 else -1
+    val elementType = t.getElementType
+    val fieldA = newName("fieldA")
+    val isNullA = newName("isNullA")
+    val lengthA = newName("lengthA")
+    val fieldB = newName("fieldB")
+    val isNullB = newName("isNullB")
+    val lengthB = newName("lengthB")
+    val minLength = newName("minLength")
+    val i = newName("i")
+    val comp = newName("comp")
+    val typeTerm = primitiveTypeTermForType(elementType)
+    s"""
+        int $lengthA = a.numElements();
+        int $lengthB = b.numElements();
+        int $minLength = ($lengthA > $lengthB) ? $lengthB : $lengthA;
+        for (int $i = 0; $i < $minLength; $i++) {
+          boolean $isNullA = a.isNullAt($i);
+          boolean $isNullB = b.isNullAt($i);
+          if ($isNullA && $isNullB) {
+            // Continue to compare the next element
+          } else if ($isNullA) {
+            return $nullIsLastRet;
+          } else if ($isNullB) {
+            return ${-nullIsLastRet};
+          } else {
+            $typeTerm $fieldA = ${baseRowFieldReadAccess(ctx, i, a, 
elementType)};
+            $typeTerm $fieldB = ${baseRowFieldReadAccess(ctx, i, b, 
elementType)};
+            int $comp = ${genCompare(ctx, elementType, nullsIsLast, fieldA, 
fieldB)};
+            if ($comp != 0) {
+              return $comp;
+            }
+          }
+        }
+
+        if ($lengthA < $lengthB) {
+          return -1;
+        } else if ($lengthA > $lengthB) {
+          return 1;
+        }
+      """
+  }
+
+  /**
+    * Generates code for comparing row keys.
+    */
+  def genRowCompare(
+      ctx: CodeGeneratorContext,
+      keys: Array[Int],
+      keyTypes: Array[InternalType],
+      orders: Array[Boolean],
+      nullsIsLast: Array[Boolean],
+      row1: String,
+      row2: String): String = {
+
+    val compares = new mutable.ArrayBuffer[String]
+
+    for (i <- keys.indices) {
+      val index = keys(i)
+
+      val symbol = if (orders(i)) "" else "-"
+
+      val nullIsLastRet = if (nullsIsLast(i)) 1 else -1
+
+      val t = keyTypes(i)
+
+      val typeTerm = primitiveTypeTermForType(t)
+      val fieldA = newName("fieldA")
+      val isNullA = newName("isNullA")
+      val fieldB = newName("fieldB")
+      val isNullB = newName("isNullB")
+      val comp = newName("comp")
+
+      val code =
+        s"""
+           |boolean $isNullA = $row1.isNullAt($index);
+           |boolean $isNullB = $row2.isNullAt($index);
+           |if ($isNullA && $isNullB) {
+           |  // Continue to compare the next element
+           |} else if ($isNullA) {
+           |  return $nullIsLastRet;
+           |} else if ($isNullB) {
+           |  return ${-nullIsLastRet};
+           |} else {
+           |  $typeTerm $fieldA = ${baseRowFieldReadAccess(ctx, index, row1, 
t)};
+           |  $typeTerm $fieldB = ${baseRowFieldReadAccess(ctx, index, row2, 
t)};
+           |  int $comp = ${genCompare(ctx, t, nullsIsLast(i), fieldA, 
fieldB)};
+           |  if ($comp != 0) {
+           |    return $symbol$comp;
+           |  }
+           |}
+         """.stripMargin
+      compares += code
+    }
+    compares.mkString
+  }
+
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SortCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SortCodeGenerator.scala
new file mode 100644
index 0000000..2e3df95
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SortCodeGenerator.scala
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.`type`.{DateType, DecimalType, InternalType, 
InternalTypes, PrimitiveType, TimeType, TimestampType}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils.{BASE_ROW, SEGMENT, newName}
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.dataformat.{BinaryRow, Decimal}
+import org.apache.flink.table.generated.{GeneratedNormalizedKeyComputer, 
GeneratedRecordComparator, NormalizedKeyComputer, RecordComparator}
+import org.apache.flink.table.runtime.sort.SortUtil
+
+import scala.collection.mutable
+
+/**
+  * A code generator for generating [[NormalizedKeyComputer]] and 
[[RecordComparator]].
+  *
+  * @param keys        key positions describe which fields are keys in what 
order.
+  * @param keyTypes       types for the key fields, in the same order as the 
key fields.
+  * @param orders      sorting orders for the key fields.
+  * @param nullsIsLast      Ordering of nulls.
+  */
+class SortCodeGenerator(
+    conf: TableConfig,
+    val keys: Array[Int],
+    val keyTypes: Array[InternalType],
+    val orders: Array[Boolean],
+    val nullsIsLast: Array[Boolean]) {
+
+  private val MAX_NORMALIZED_KEY_LEN = 16
+
+  private val SORT_UTIL = classOf[SortUtil].getCanonicalName
+
+  /** Chunks for long, int, short, byte */
+  private val POSSIBLE_CHUNK_SIZES = Array(8, 4, 2, 1)
+
+  /** For get${operator} set${operator} of 
[[org.apache.flink.core.memory.MemorySegment]] */
+  private val BYTE_OPERATOR_MAPPING = Map(8 -> "Long", 4 -> "Int", 2 -> 
"Short", 1 -> "")
+
+  /** For primitive define */
+  private val BYTE_DEFINE_MAPPING = Map(8 -> "long", 4 -> "int", 2 -> "short", 
1 -> "byte")
+
+  /** For Class of primitive type */
+  private val BYTE_CLASS_MAPPING = Map(8 -> "Long", 4 -> "Integer", 2 -> 
"Short", 1 -> "Byte")
+
+  /** Normalized meta */
+  val (nullAwareNormalizedKeyLen, normalizedKeyNum, invertNormalizedKey, 
normalizedKeyLengths) = {
+    var keyLen = 0
+    var keyNum = 0
+    var inverted = false
+    val keyLengths = new mutable.ArrayBuffer[Int]
+    var break = false
+    var i = 0
+    while (i < keys.length && !break) {
+      val t = keyTypes(i)
+      if (supportNormalizedKey(t)) {
+        val invert = !orders(i)
+
+        if (i == 0) {
+          // the first comparator decides whether we need to invert the key 
direction
+          inverted = invert
+        }
+
+        if (invert != inverted) {
+          // if a successor does not agree on the inversion direction,
+          // it cannot be part of the normalized key
+          break = true
+        } else {
+          keyNum += 1
+          // Need add null aware 1 byte
+          val len = safeAddLength(getNormalizeKeyLen(t), 1)
+          if (len < 0) {
+            throw new RuntimeException(
+              s"$t specifies an invalid length for the normalized key: " + len)
+          }
+          keyLengths += len
+          keyLen = safeAddLength(keyLen, len)
+          if (keyLen == Integer.MAX_VALUE) {
+            break = true
+          }
+        }
+      } else {
+        break = true
+      }
+      i += 1
+    }
+
+    (keyLen, keyNum, inverted, keyLengths)
+  }
+
+  def getKeyFullyDeterminesAndBytes: (Boolean, Int) = {
+    if (nullAwareNormalizedKeyLen > 18) {
+      // The maximum setting is 18 because we want to fit two null-aware longs
+      // whenever possible. Since that does not fit here, align to the most
+      // efficient 8-byte chunks instead.
+      (false, Math.min(MAX_NORMALIZED_KEY_LEN, 8 * normalizedKeyNum))
+    } else {
+      (normalizedKeyNum == keys.length, nullAwareNormalizedKeyLen)
+    }
+  }
+
+  /**
+    * Generates a [[NormalizedKeyComputer]] that can be passed to a Java 
compiler.
+    *
+    * @param name Class name of the function.
+    *             Does not need to be unique but has to be a valid Java class 
identifier.
+    * @return A GeneratedNormalizedKeyComputer
+    */
+  def generateNormalizedKeyComputer(name: String): 
GeneratedNormalizedKeyComputer = {
+
+    val className = newName(name)
+
+    val (keyFullyDetermines, numKeyBytes) = getKeyFullyDeterminesAndBytes
+
+    val putKeys = generatePutNormalizedKeys(numKeyBytes)
+
+    val chunks = calculateChunks(numKeyBytes)
+
+    val reverseKeys = generateReverseNormalizedKeys(chunks)
+
+    val compareKeys = generateCompareNormalizedKeys(chunks)
+
+    val swapKeys = generateSwapNormalizedKeys(chunks)
+
+    val baseClass = classOf[NormalizedKeyComputer]
+
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        public $className(Object[] references) {
+          // useless
+        }
+
+        @Override
+        public void putKey($BASE_ROW record, $SEGMENT target, int offset) {
+          ${putKeys.mkString}
+          ${reverseKeys.mkString}
+        }
+
+        @Override
+        public int compareKey($SEGMENT segI, int offsetI, $SEGMENT segJ, int 
offsetJ) {
+          ${compareKeys.mkString}
+        }
+
+        @Override
+        public void swapKey($SEGMENT segI, int offsetI, $SEGMENT segJ, int 
offsetJ) {
+          ${swapKeys.mkString}
+        }
+
+        @Override
+        public int getNumKeyBytes() {
+          return $numKeyBytes;
+        }
+
+        @Override
+        public boolean isKeyFullyDetermines() {
+          return $keyFullyDetermines;
+        }
+
+        @Override
+        public boolean invertKey() {
+          return $invertNormalizedKey;
+        }
+
+      }
+    """.stripMargin
+
+    new GeneratedNormalizedKeyComputer(className, code)
+  }
+
+  def generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String] 
= {
+    /* Example generated code, for int:
+    if (record.isNullAt(0)) {
+      
org.apache.flink.table.dataformat.util.BinaryRowUtil.minNormalizedKey(target, 
offset+0, 5);
+    } else {
+      target.put(offset+0, (byte) 1);
+      org.apache.flink.table.dataformat.util.BinaryRowUtil.putIntNormalizedKey(
+        record.getInt(0), target, offset+1, 4);
+    }
+     */
+    val putKeys = new mutable.ArrayBuffer[String]
+    var bytesLeft = numKeyBytes
+    var currentOffset = 0
+    var keyIndex = 0
+    while (bytesLeft > 0 && keyIndex < normalizedKeyNum) {
+      var len = normalizedKeyLengths(keyIndex)
+      val index = keys(keyIndex)
+      val nullIsMaxValue = orders(keyIndex) == nullsIsLast(keyIndex)
+      len = if (bytesLeft >= len) len else bytesLeft
+      val t = keyTypes(keyIndex)
+      val prefix = prefixGetFromBinaryRow(t)
+      val putCode = t match {
+        case _ if getNormalizeKeyLen(t) != Int.MaxValue =>
+          val get = getter(t, index)
+          s"""
+             |target.put(offset+$currentOffset, (byte) 1);
+             |$SORT_UTIL.put${prefixPutNormalizedKey(t)}NormalizedKey(
+             |  record.$get, target, offset+${currentOffset + 1}, ${len - 1});
+             |
+         """.stripMargin
+        case _ =>
+          // For BinaryString/byte[] we can omit the null-aware byte (zero is
+          // the smallest value), because no other field follows it and the
+          // key does not fully determine the order.
+          s"""
+             |$SORT_UTIL.put${prefixPutNormalizedKey(t)}NormalizedKey(
+             |  record.get$prefix($index), target, offset+$currentOffset, 
$len);
+             |""".stripMargin
+      }
+      val nullCode = if (nullIsMaxValue) {
+        s"$SORT_UTIL.maxNormalizedKey(target, offset+$currentOffset, $len);"
+      } else {
+        s"$SORT_UTIL.minNormalizedKey(target, offset+$currentOffset, $len);"
+      }
+
+      val code =
+        s"""
+           |if (record.isNullAt($index)) {
+           | $nullCode
+           |} else {
+           | $putCode
+           |}
+           |""".stripMargin
+
+      putKeys += code
+      bytesLeft -= len
+      currentOffset += len
+      keyIndex += 1
+    }
+    putKeys
+  }
+
+  /**
+    * To get better performance and avoid MemorySegment's compare() and swap(),
+    * we code-generate more efficient chunk-based methods.
+    */
+  def calculateChunks(numKeyBytes: Int): Array[Int] = {
+    /* Example chunks, for int:
+      calculateChunks(5) = Array(4, 1)
+     */
+    val chunks = new mutable.ArrayBuffer[Int]
+    var i = 0
+    var remainBytes = numKeyBytes
+    while (remainBytes > 0) {
+      val bytes = POSSIBLE_CHUNK_SIZES(i)
+      if (bytes <= remainBytes) {
+        chunks += bytes
+        remainBytes -= bytes
+      } else {
+        i += 1
+      }
+    }
+    chunks.toArray
+  }
+
+  /**
+    * Because we write normalized keys in big-endian order, on a little-endian
+    * machine we need to reverse the data chunk by chunk before comparison.
+    */
+  def generateReverseNormalizedKeys(chunks: Array[Int]): 
mutable.ArrayBuffer[String] = {
+    /* Example generated code, for int:
+    target.putInt(offset+0, Integer.reverseBytes(target.getInt(offset+0)));
+    //byte don't need reverse.
+     */
+    val reverseKeys = new mutable.ArrayBuffer[String]
+    // On big-endian machines no reversal is needed.
+    if (BinaryRow.LITTLE_ENDIAN) {
+      var reverseOffset = 0
+      for (chunk <- chunks) {
+        val operator = BYTE_OPERATOR_MAPPING(chunk)
+        val className = BYTE_CLASS_MAPPING(chunk)
+        if (chunk != 1) {
+          val reverseKey =
+            s"""
+               |target.put$operator(offset+$reverseOffset,
+               |  
$className.reverseBytes(target.get$operator(offset+$reverseOffset)));
+            """.stripMargin
+          reverseKeys += reverseKey
+        }
+        reverseOffset += chunk
+      }
+    }
+    reverseKeys
+  }
+
+  /**
+    * Compare bytes with chunks as unsigned values.
+    */
+  def generateCompareNormalizedKeys(chunks: Array[Int]): 
mutable.ArrayBuffer[String] = {
+    /* Example generated code, for int:
+    int l_0_1 = segI.getInt(offsetI+0);
+    int l_0_2 = segJ.getInt(offsetJ+0);
+    if (l_0_1 != l_0_2) {
+      return ((l_0_1 < l_0_2) ^ (l_0_1 < 0) ^
+        (l_0_2 < 0) ? -1 : 1);
+    }
+
+    byte l_1_1 = segI.get(offsetI+4);
+    byte l_1_2 = segJ.get(offsetJ+4);
+    if (l_1_1 != l_1_2) {
+      return ((l_1_1 < l_1_2) ^ (l_1_1 < 0) ^
+        (l_1_2 < 0) ? -1 : 1);
+    }
+    return 0;
+     */
+    val compareKeys = new mutable.ArrayBuffer[String]
+    var compareOffset = 0
+    for (i <- chunks.indices) {
+      val chunk = chunks(i)
+      val operator = BYTE_OPERATOR_MAPPING(chunk)
+      val define = BYTE_DEFINE_MAPPING(chunk)
+      val compareKey =
+        s"""
+           |$define l_${i}_1 = segI.get$operator(offsetI+$compareOffset);
+           |$define l_${i}_2 = segJ.get$operator(offsetJ+$compareOffset);
+           |if (l_${i}_1 != l_${i}_2) {
+           |  return ((l_${i}_1 < l_${i}_2) ^ (l_${i}_1 < 0) ^
+           |    (l_${i}_2 < 0) ? -1 : 1);
+           |}
+            """.stripMargin
+      compareKeys += compareKey
+      compareOffset += chunk
+    }
+    compareKeys += "return 0;"
+    compareKeys
+  }
+
+  /**
+    * Swap bytes with chunks.
+    */
+  def generateSwapNormalizedKeys(chunks: Array[Int]): 
mutable.ArrayBuffer[String] = {
+    /* Example generated code, for int:
+    int temp0 = segI.getInt(offsetI+0);
+    segI.putInt(offsetI+0, segJ.getInt(offsetJ+0));
+    segJ.putInt(offsetJ+0, temp0);
+
+    byte temp1 = segI.get(offsetI+4);
+    segI.put(offsetI+4, segJ.get(offsetJ+4));
+    segJ.put(offsetJ+4, temp1);
+     */
+    val swapKeys = new mutable.ArrayBuffer[String]
+    var swapOffset = 0
+    for (i <- chunks.indices) {
+      val chunk = chunks(i)
+      val operator = BYTE_OPERATOR_MAPPING(chunk)
+      val define = BYTE_DEFINE_MAPPING(chunk)
+      val swapKey =
+        s"""
+           |$define temp$i = segI.get$operator(offsetI+$swapOffset);
+           |segI.put$operator(offsetI+$swapOffset, 
segJ.get$operator(offsetJ+$swapOffset));
+           |segJ.put$operator(offsetJ+$swapOffset, temp$i);
+            """.stripMargin
+      swapKeys += swapKey
+      swapOffset += chunk
+    }
+    swapKeys
+  }
+
+  /**
+    * Generates a [[RecordComparator]] that can be passed to a Java compiler.
+    *
+    * @param name Class name of the function.
+    *             Does not need to be unique but has to be a valid Java class 
identifier.
+    * @return A GeneratedRecordComparator
+    */
+  def generateRecordComparator(name: String): GeneratedRecordComparator = {
+    val className = newName(name)
+    val baseClass = classOf[RecordComparator]
+
+    val ctx = new CodeGeneratorContext(conf)
+    val compareCode = CodeGenUtils.genRowCompare(
+      ctx, keys, keyTypes, orders, nullsIsLast, "o1", "o2")
+
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        private final Object[] references;
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) {
+          this.references = references;
+          ${ctx.reuseInitCode()}
+          ${ctx.reuseOpenCode()}
+        }
+
+        @Override
+        public int compare($BASE_ROW o1, $BASE_ROW o2) {
+          $compareCode
+          return 0;
+        }
+
+      }
+      """.stripMargin
+
+    new GeneratedRecordComparator(className, code, ctx.references.toArray)
+  }
+
+  def getter(t: InternalType, index: Int): String = {
+    val prefix = prefixGetFromBinaryRow(t)
+    t match {
+      case dt: DecimalType =>
+        s"get$prefix($index, ${dt.precision()}, ${dt.scale()})"
+      case _ =>
+        s"get$prefix($index)"
+    }
+  }
+
+  /**
+    * For put${prefix}NormalizedKey() and compare$prefix() of [[SortUtil]].
+    */
+  def prefixPutNormalizedKey(t: InternalType): String = 
prefixGetFromBinaryRow(t)
+
+  /**
+    * For get$prefix() of 
[[org.apache.flink.table.dataformat.TypeGetterSetters]].
+    */
+  def prefixGetFromBinaryRow(t: InternalType): String = t match {
+    case InternalTypes.INT => "Int"
+    case InternalTypes.LONG => "Long"
+    case InternalTypes.SHORT => "Short"
+    case InternalTypes.BYTE => "Byte"
+    case InternalTypes.FLOAT => "Float"
+    case InternalTypes.DOUBLE => "Double"
+    case InternalTypes.BOOLEAN => "Boolean"
+    case InternalTypes.CHAR => "Char"
+    case InternalTypes.STRING => "String"
+    case InternalTypes.BINARY => "Binary"
+    case _: DecimalType => "Decimal"
+    case _: DateType => "Int"
+    case InternalTypes.TIME => "Int"
+    case _: TimestampType => "Long"
+    case _ => null
+  }
+
+  /**
+    * Preventing overflow.
+    */
+  def safeAddLength(i: Int, j: Int): Int = {
+    val sum = i + j
+    if (sum < i || sum < j) {
+      Integer.MAX_VALUE
+    } else {
+      sum
+    }
+  }
+
+  def supportNormalizedKey(t: InternalType): Boolean = {
+    t match {
+      case _: PrimitiveType | InternalTypes.STRING | InternalTypes.BINARY |
+           _: DateType | _: TimeType | _: TimestampType => true
+      case dt: DecimalType => Decimal.isCompact(dt.precision())
+      case _ => false
+    }
+  }
+
+  def getNormalizeKeyLen(t: InternalType): Int = {
+    t match {
+      case InternalTypes.BOOLEAN => 1
+      case InternalTypes.BYTE => 1
+      case InternalTypes.SHORT => 2
+      case InternalTypes.CHAR => 2
+      case InternalTypes.INT => 4
+      case InternalTypes.FLOAT => 4
+      case InternalTypes.DOUBLE => 8
+      case InternalTypes.LONG => 8
+      case dt: DecimalType if Decimal.isCompact(dt.precision()) => 8
+      case InternalTypes.STRING | InternalTypes.BINARY => Int.MaxValue
+    }
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
new file mode 100644
index 0000000..0023831
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryGeneric;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.BinaryRowWriter;
+import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.generated.GeneratedNormalizedKeyComputer;
+import org.apache.flink.table.generated.GeneratedRecordComparator;
+import org.apache.flink.table.generated.NormalizedKeyComputer;
+import org.apache.flink.table.generated.RecordComparator;
+import org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer;
+import org.apache.flink.table.type.ArrayType;
+import org.apache.flink.table.type.DecimalType;
+import org.apache.flink.table.type.GenericType;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.InternalTypes;
+import org.apache.flink.table.type.RowType;
+import org.apache.flink.table.typeutils.AbstractRowSerializer;
+import org.apache.flink.table.typeutils.BinaryRowSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Random test for sort code generator.
+ */
+public class SortCodeGeneratorTest {
+
	/** Number of rows generated per test round. */
	private static final int RECORD_NUM = 3000;

	/** Candidate field types; each round samples a random row schema from these. */
	private final InternalType[] types = new InternalType[]{
			InternalTypes.BOOLEAN,
			InternalTypes.BYTE,
			InternalTypes.SHORT,
			InternalTypes.INT,
			InternalTypes.LONG,
			InternalTypes.FLOAT,
			InternalTypes.DOUBLE,
			InternalTypes.CHAR,
			InternalTypes.STRING,
			new DecimalType(18, 2),
			new DecimalType(38, 18),
			InternalTypes.BINARY,
			new ArrayType(InternalTypes.BYTE),
			new RowType(InternalTypes.INT),
			new RowType(new RowType(InternalTypes.INT)),
			new GenericType<>(Types.INT)
	};

	// Current round's schema: indexes into {@link #types}.
	private int[] fields;
	// Field positions used as sort keys.
	private int[] keys;
	// Ascending (true) / descending (false) per key.
	private boolean[] orders;
	// Null ordering per key, derived from the orders.
	private boolean[] nullsIsLast;

	// Reference converters/comparators used as the expected ordering for
	// RowType keys (flat int row and nested row).
	private static final DataFormatConverters.DataFormatConverter INT_ROW_CONV =
			DataFormatConverters.getConverterForTypeInfo(new RowTypeInfo(Types.INT));
	private static final TypeComparator INT_ROW_COMP = new RowTypeInfo(Types.INT).createComparator(
			new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig());
	private static final DataFormatConverters.DataFormatConverter NEST_ROW_CONV =
			DataFormatConverters.getConverterForTypeInfo(new RowTypeInfo(new RowTypeInfo(Types.INT)));
	private static final TypeComparator NEST_ROW_COMP = new RowTypeInfo(new RowTypeInfo(Types.INT)).createComparator(
			new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig());
+
+       @Test
+       public void testMultiKeys() throws Exception {
+               for (int i = 0; i < 100; i++) {
+                       randomKeysAndOrders();
+                       testInner();
+               }
+       }
+
	/** Runs 100 rounds with a random schema and a single sort key on field 0. */
	@Test
	public void testOneKey() throws Exception {
		for (int time = 0; time < 100; time++) {
			Random rnd = new Random();
			// 1..9 fields, each of a random candidate type.
			fields = new int[rnd.nextInt(9) + 1];
			for (int i = 0; i < fields.length; i++) {
				fields[i] = rnd.nextInt(types.length);
			}

			// Sort only on the first field, with a random direction.
			keys = new int[] {0};
			orders = new boolean[] {rnd.nextBoolean()};
			nullsIsLast = FlinkPlannerImpl.getNullDefaultOrders(orders);
			testInner();
		}
	}
+
	/** Randomizes the row schema, the sort-key fields and their directions. */
	private void randomKeysAndOrders() {
		Random rnd = new Random();
		// 1..9 fields, each of a random candidate type.
		fields = new int[rnd.nextInt(9) + 1];
		for (int i = 0; i < fields.length; i++) {
			fields[i] = rnd.nextInt(types.length);
		}

		// Draw a random-size subset of distinct field positions as keys.
		keys = new int[rnd.nextInt(fields.length) + 1];
		LinkedList<Integer> indexQueue = new LinkedList<>();
		for (int i = 0; i < fields.length; i++) {
			indexQueue.add(i);
		}
		Collections.shuffle(indexQueue);
		orders = new boolean[keys.length];
		for (int i = 0; i < keys.length; i++) {
			keys[i] = indexQueue.poll();
			orders[i] = rnd.nextBoolean();
		}
		nullsIsLast = FlinkPlannerImpl.getNullDefaultOrders(orders);
	}
+
+       private Object[] shuffle(Object[] objects) {
+               Collections.shuffle(Arrays.asList(objects));
+               return objects;
+       }
+
	/**
	 * Builds the i-th test row, taking one value per field from the
	 * pre-generated value pools; a null value sets the field's null bit.
	 */
	private BinaryRow row(int i, Object[][] values) {
		BinaryRow row = new BinaryRow(fields.length);
		BinaryRowWriter writer = new BinaryRowWriter(row);
		for (int j = 0; j < fields.length; j++) {
			Object value = values[j][i];
			if (value == null) {
				writer.setNullAt(j);
			} else {
				BinaryWriter.write(writer, j, value, types[fields[j]]);
			}
		}

		writer.complete();
		return row;
	}
+
	/** Generates {@link #RECORD_NUM} rows from shuffled per-field value pools. */
	private BinaryRow[] getTestData() {
		BinaryRow[] result = new BinaryRow[RECORD_NUM];
		Object[][] values = new Object[fields.length][];
		for (int i = 0; i < fields.length; i++) {
			values[i] = shuffle(generateValues(types[fields[i]]));
		}
		for (int i = 0; i < RECORD_NUM; i++) {
			result[i] = row(i, values);
		}
		return result;
	}
+
	/**
	 * Generates {@link #RECORD_NUM} values of the given type, sampled from a
	 * small seed pool (null, three edge values, then random values) so that
	 * duplicates are frequent and multi-key tie-breaking gets exercised.
	 */
	private Object[] generateValues(InternalType type) {

		Random rnd = new Random();

		// Small pool => heavy duplication across the generated rows.
		int seedNum = RECORD_NUM / 5;
		Object[] seeds = new Object[seedNum];
		seeds[0] = null;
		seeds[1] = value1(type, rnd);
		seeds[2] = value2(type, rnd);
		seeds[3] = value3(type, rnd);
		for (int i = 4; i < seeds.length; i++) {
			if (type.equals(InternalTypes.BOOLEAN)) {
				seeds[i] = rnd.nextBoolean();
			} else if (type.equals(InternalTypes.BYTE)) {
				seeds[i] = (byte) rnd.nextLong();
			} else if (type.equals(InternalTypes.SHORT)) {
				seeds[i] = (short) rnd.nextLong();
			} else if (type.equals(InternalTypes.INT)) {
				seeds[i] = rnd.nextInt();
			} else if (type.equals(InternalTypes.LONG)) {
				seeds[i] = rnd.nextLong();
			} else if (type.equals(InternalTypes.FLOAT)) {
				seeds[i] = rnd.nextFloat() * rnd.nextLong();
			} else if (type.equals(InternalTypes.DOUBLE)) {
				seeds[i] = rnd.nextDouble() * rnd.nextLong();
			} else if (type.equals(InternalTypes.CHAR)) {
				seeds[i] = (char) rnd.nextInt();
			} else if (type.equals(InternalTypes.STRING)) {
				seeds[i] = BinaryString.fromString(RandomStringUtils.random(rnd.nextInt(20)));
			} else if (type instanceof DecimalType) {
				DecimalType decimalType = (DecimalType) type;
				// Random decimal with a random scale in [1, 30).
				BigDecimal decimal = new BigDecimal(
						rnd.nextInt()).divide(
						new BigDecimal(ThreadLocalRandom.current().nextInt(1, 256)),
						ThreadLocalRandom.current().nextInt(1, 30), BigDecimal.ROUND_HALF_EVEN);
				seeds[i] = Decimal.fromBigDecimal(decimal, decimalType.precision(), decimalType.scale());
			} else if (type instanceof ArrayType || type.equals(InternalTypes.BINARY)) {
				byte[] bytes = new byte[rnd.nextInt(16) + 1];
				rnd.nextBytes(bytes);
				seeds[i] = type.equals(InternalTypes.BINARY) ? bytes : BinaryArray.fromPrimitiveArray(bytes);
			} else if (type instanceof RowType) {
				RowType rowType = (RowType) type;
				if (rowType.getTypeAt(0).equals(InternalTypes.INT)) {
					seeds[i] = GenericRow.of(rnd.nextInt());
				} else {
					seeds[i] = GenericRow.of(GenericRow.of(rnd.nextInt()));
				}
			} else if (type instanceof GenericType) {
				seeds[i] = new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
			} else {
				throw new RuntimeException("Not support!");
			}
		}

		// Result values: RECORD_NUM random draws from the seed pool.
		Object[] results = new Object[RECORD_NUM];
		for (int i = 0; i < RECORD_NUM; i++) {
			results[i] = seeds[rnd.nextInt(seedNum)];
		}
		return results;
	}
+
	/** First edge seed: a "minimal"/empty value for the given type. */
	private Object value1(InternalType type, Random rnd) {
		if (type.equals(InternalTypes.BOOLEAN)) {
			return false;
		} else if (type.equals(InternalTypes.BYTE)) {
			return Byte.MIN_VALUE;
		} else if (type.equals(InternalTypes.SHORT)) {
			return Short.MIN_VALUE;
		} else if (type.equals(InternalTypes.INT)) {
			return Integer.MIN_VALUE;
		} else if (type.equals(InternalTypes.LONG)) {
			return Long.MIN_VALUE;
		} else if (type.equals(InternalTypes.FLOAT)) {
			// NOTE(review): Float/Double.MIN_VALUE are the smallest POSITIVE
			// values, not the most negative ones — presumably fine as just
			// another seed value; confirm if "minimum" was intended.
			return Float.MIN_VALUE;
		} else if (type.equals(InternalTypes.DOUBLE)) {
			return Double.MIN_VALUE;
		} else if (type.equals(InternalTypes.CHAR)) {
			return '1';
		} else if (type.equals(InternalTypes.STRING)) {
			return BinaryString.fromString("");
		} else if (type instanceof DecimalType) {
			DecimalType decimalType = (DecimalType) type;
			return Decimal.fromBigDecimal(new BigDecimal(Integer.MIN_VALUE),
					decimalType.precision(), decimalType.scale());
		} else if (type instanceof ArrayType) {
			// Short array whose elements are all marked null.
			byte[] bytes = new byte[rnd.nextInt(7) + 1];
			rnd.nextBytes(bytes);
			BinaryArray array = BinaryArray.fromPrimitiveArray(bytes);
			for (int i = 0; i < bytes.length; i++) {
				array.setNullByte(i);
			}
			return array;
		} else if (type.equals(InternalTypes.BINARY)) {
			// Short binary (fits the fixed-length part of a binary row).
			byte[] bytes = new byte[rnd.nextInt(7) + 1];
			rnd.nextBytes(bytes);
			return bytes;
		} else if (type instanceof RowType) {
			return GenericRow.of(new Object[]{null});
		} else if (type instanceof GenericType) {
			return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
		} else {
			throw new RuntimeException("Not support!");
		}
	}
+
	/** Second edge seed: a "zero"/mid-range value for the given type. */
	private Object value2(InternalType type, Random rnd) {
		if (type.equals(InternalTypes.BOOLEAN)) {
			return false;
		} else if (type.equals(InternalTypes.BYTE)) {
			return (byte) 0;
		} else if (type.equals(InternalTypes.SHORT)) {
			return (short) 0;
		} else if (type.equals(InternalTypes.INT)) {
			return 0;
		} else if (type.equals(InternalTypes.LONG)) {
			return 0L;
		} else if (type.equals(InternalTypes.FLOAT)) {
			return 0f;
		} else if (type.equals(InternalTypes.DOUBLE)) {
			return 0d;
		} else if (type.equals(InternalTypes.CHAR)) {
			return '0';
		} else if (type.equals(InternalTypes.STRING)) {
			return BinaryString.fromString("0");
		} else if (type instanceof DecimalType) {
			DecimalType decimalType = (DecimalType) type;
			return Decimal.fromBigDecimal(new BigDecimal(0),
					decimalType.precision(), decimalType.scale());
		} else if (type instanceof ArrayType || type.equals(InternalTypes.BINARY)) {
			// Medium length (10..16): spills into the variable-length part.
			byte[] bytes = new byte[rnd.nextInt(7) + 10];
			rnd.nextBytes(bytes);
			return type.equals(InternalTypes.BINARY) ? bytes : BinaryArray.fromPrimitiveArray(bytes);
		} else if (type instanceof RowType) {
			RowType rowType = (RowType) type;
			if (rowType.getTypeAt(0).equals(InternalTypes.INT)) {
				return GenericRow.of(rnd.nextInt());
			} else {
				// Nested row whose inner field is null.
				return GenericRow.of(GenericRow.of(new Object[]{null}));
			}
		} else if (type instanceof GenericType) {
			return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
		} else {
			throw new RuntimeException("Not support!");
		}
	}
+
	/** Third edge seed: a "maximal"/large value for the given type. */
	private Object value3(InternalType type, Random rnd) {
		if (type.equals(InternalTypes.BOOLEAN)) {
			return true;
		} else if (type.equals(InternalTypes.BYTE)) {
			return Byte.MAX_VALUE;
		} else if (type.equals(InternalTypes.SHORT)) {
			return Short.MAX_VALUE;
		} else if (type.equals(InternalTypes.INT)) {
			return Integer.MAX_VALUE;
		} else if (type.equals(InternalTypes.LONG)) {
			return Long.MAX_VALUE;
		} else if (type.equals(InternalTypes.FLOAT)) {
			return Float.MAX_VALUE;
		} else if (type.equals(InternalTypes.DOUBLE)) {
			return Double.MAX_VALUE;
		} else if (type.equals(InternalTypes.CHAR)) {
			// A high (non-ASCII) code point.
			return '鼎';
		} else if (type.equals(InternalTypes.STRING)) {
			return BinaryString.fromString(RandomStringUtils.random(100));
		} else if (type instanceof DecimalType) {
			DecimalType decimalType = (DecimalType) type;
			return Decimal.fromBigDecimal(new BigDecimal(Integer.MAX_VALUE),
					decimalType.precision(), decimalType.scale());
		} else if (type instanceof ArrayType || type.equals(InternalTypes.BINARY)) {
			// Long payload (100..199 bytes): definitely variable-length part.
			byte[] bytes = new byte[rnd.nextInt(100) + 100];
			rnd.nextBytes(bytes);
			return type.equals(InternalTypes.BINARY) ? bytes : BinaryArray.fromPrimitiveArray(bytes);
		} else if (type instanceof RowType) {
			RowType rowType = (RowType) type;
			if (rowType.getTypeAt(0).equals(InternalTypes.INT)) {
				return GenericRow.of(rnd.nextInt());
			} else {
				return GenericRow.of(GenericRow.of(rnd.nextInt()));
			}
		} else if (type instanceof GenericType) {
			return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
		} else {
			throw new RuntimeException("Not support!");
		}
	}
+
+       private InternalType[] getFieldTypes() {
+               InternalType[] result = new InternalType[fields.length];
+               for (int i = 0; i < fields.length; i++) {
+                       result[i] = types[fields[i]];
+               }
+               return result;
+       }
+
+       private InternalType[] getKeyTypes() {
+               InternalType[] result = new InternalType[keys.length];
+               for (int i = 0; i < keys.length; i++) {
+                       result[i] = types[fields[keys[i]]];
+               }
+               return result;
+       }
+
	/**
	 * One full round: writes random rows into a binary in-memory sort buffer,
	 * sorts it with the generated normalized-key computer / record comparator,
	 * and checks the key columns against a straightforward reference sort.
	 */
	private void testInner() throws Exception {
		// 100 x 32KB memory segments back the sort buffer.
		List<MemorySegment> segments = new ArrayList<>();
		for (int i = 0; i < 100; i++) {
			segments.add(MemorySegmentFactory.wrap(new byte[32768]));
		}

		InternalType[] fieldTypes = getFieldTypes();
		InternalType[] keyTypes = getKeyTypes();

		Tuple2<NormalizedKeyComputer, RecordComparator> tuple2 = getSortBaseWithNulls(
				this.getClass().getSimpleName(), keyTypes, keys, orders, nullsIsLast);

		BinaryRowSerializer serializer = new BinaryRowSerializer(fieldTypes.length);

		BinaryInMemorySortBuffer sortBuffer = BinaryInMemorySortBuffer.createBuffer(
				tuple2.f0, (AbstractRowSerializer) serializer, serializer,
				tuple2.f1, segments);

		BinaryRow[] dataArray = getTestData();

		// Two views of the same rows: one for the reference sort, one shuffled
		// and pushed through the buffer under test.
		List<BinaryRow> data = Arrays.asList(dataArray.clone());
		List<BinaryRow> binaryRows = Arrays.asList(dataArray.clone());
		Collections.shuffle(binaryRows);

		for (BinaryRow row : binaryRows) {
			if (!sortBuffer.write(row)) {
				// Buffer is sized generously; a failed write is a test bug.
				throw new RuntimeException();
			}
		}

		new QuickSort().sort(sortBuffer);

		// Drain the sorted buffer; the iterator reuses the row, so copy each.
		MutableObjectIterator<BinaryRow> iter = sortBuffer.getIterator();
		List<BinaryRow> result = new ArrayList<>();
		BinaryRow row = serializer.createInstance();
		while ((row = iter.next(row)) != null) {
			result.add(row.copy());
		}

		// Reference ordering: compare key by key; nulls use the default null
		// order derived from the direction (presumably matching
		// FlinkPlannerImpl.getNullDefaultOrders — nulls first on ascending).
		data.sort((o1, o2) -> {
			for (int i = 0; i < keys.length; i++) {
				InternalType t = types[fields[keys[i]]];
				boolean order = orders[i];
				Object first = null;
				Object second = null;
				if (!o1.isNullAt(keys[i])) {
					first = TypeGetterSetters.get(o1, keys[i], keyTypes[i]);
				}
				if (!o2.isNullAt(keys[i])) {
					second = TypeGetterSetters.get(o2, keys[i], keyTypes[i]);
				}

				if (first == null && second == null) {
					// Equal on this key; fall through to the next key.
				} else if (first == null) {
					return order ? -1 : 1;
				} else if (second == null) {
					return order ? 1 : -1;
				} else if (first instanceof Comparable) {
					int ret = ((Comparable) first).compareTo(second);
					if (ret != 0) {
						return order ? ret : -ret;
					}
				} else if (t instanceof ArrayType) {
					// Element-wise byte comparison; null elements first,
					// shorter array first on ties.
					BinaryArray leftArray = (BinaryArray) first;
					BinaryArray rightArray = (BinaryArray) second;
					int minLength = Math.min(leftArray.numElements(), rightArray.numElements());
					for (int j = 0; j < minLength; j++) {
						boolean isNullLeft = leftArray.isNullAt(j);
						boolean isNullRight = rightArray.isNullAt(j);
						if (isNullLeft && isNullRight) {
							// Do nothing.
						} else if (isNullLeft) {
							return order ? -1 : 1;
						} else if (isNullRight) {
							return order ? 1 : -1;
						} else {
							int comp = Byte.compare(leftArray.getByte(j), rightArray.getByte(j));
							if (comp != 0) {
								return order ? comp : -comp;
							}
						}
					}
					if (leftArray.numElements() < rightArray.numElements()) {
						return order ? -1 : 1;
					} else if (leftArray.numElements() > rightArray.numElements()) {
						return order ? 1 : -1;
					}
				} else if (t.equals(InternalTypes.BINARY)) {
					int comp = org.apache.flink.table.runtime.sort.SortUtil.compareBinary(
							(byte[]) first, (byte[]) second);
					if (comp != 0) {
						return order ? comp : -comp;
					}
				} else if (t instanceof RowType) {
					// Delegate nested-row ordering to the DataSet comparators.
					RowType rowType = (RowType) t;
					int comp;
					if (rowType.getTypeAt(0).equals(InternalTypes.INT)) {
						comp = INT_ROW_COMP.compare(INT_ROW_CONV.toExternal(first),
								INT_ROW_CONV.toExternal(second));
					} else {
						comp = NEST_ROW_COMP.compare(NEST_ROW_CONV.toExternal(first),
								NEST_ROW_CONV.toExternal(second));
					}
					if (comp != 0) {
						return order ? comp : -comp;
					}
				} else if (t instanceof GenericType) {
					// Generic values in this test are always boxed ints.
					Integer i1 = BinaryGeneric.getJavaObjectFromBinaryGeneric((BinaryGeneric) first, IntSerializer.INSTANCE);
					Integer i2 = BinaryGeneric.getJavaObjectFromBinaryGeneric((BinaryGeneric) second, IntSerializer.INSTANCE);
					int comp = Integer.compare(i1, i2);
					if (comp != 0) {
						return order ? comp : -comp;
					}
				} else {
					throw new RuntimeException();
				}
			}
			return 0;
		});

		// Failure message: full expected/actual sequence plus schema and keys.
		StringBuilder builder = new StringBuilder();
		for (int i = 0; i < data.size(); i++) {
			builder.append("\n")
					.append("expect: ")
					.append(data.get(i).toOriginString(fieldTypes))
					.append("; actual: ")
					.append(result.get(i).toOriginString(fieldTypes));
		}
		builder.append("\n").append("types: ").append(Arrays.asList(fieldTypes));
		builder.append("\n").append("keys: ").append(Arrays.toString(keys));
		String msg = builder.toString();
		// Only the key columns are checked: rows equal on all keys may legally
		// appear in any relative order, so non-key columns can differ.
		for (int i = 0; i < data.size(); i++) {
			for (int j = 0; j < keys.length; j++) {
				boolean isNull1 = data.get(i).isNullAt(keys[j]);
				boolean isNull2 = result.get(i).isNullAt(keys[j]);
				Assert.assertEquals(msg, isNull1, isNull2);
				if (!isNull1 || !isNull2) {
					Object o1 = TypeGetterSetters.get(data.get(i), keys[j], keyTypes[j]);
					Object o2 = TypeGetterSetters.get(result.get(i), keys[j], keyTypes[j]);
					if (keyTypes[j].equals(InternalTypes.BINARY)) {
						Assert.assertArrayEquals(msg, (byte[]) o1, (byte[]) o2);
					} else {
						Assert.assertEquals(msg, o1, o2);
					}
				}
			}
		}
	}
+
	/**
	 * Generates, compiles and instantiates the normalized-key computer and
	 * record comparator for the given key specification.
	 *
	 * @param namePrefix prefix for the generated class names
	 * @param keyTypes types of the sort-key fields
	 * @param keys field positions of the sort keys
	 * @param orders ascending (true) / descending (false) per key
	 * @param nullsIsLast null ordering per key
	 */
	public static Tuple2<NormalizedKeyComputer, RecordComparator> getSortBaseWithNulls(
			String namePrefix, InternalType[] keyTypes, int[] keys, boolean[] orders, boolean[] nullsIsLast)
			throws IllegalAccessException, InstantiationException {
		SortCodeGenerator generator = new SortCodeGenerator(new TableConfig(), keys, keyTypes, orders, nullsIsLast);
		GeneratedNormalizedKeyComputer computer = generator.generateNormalizedKeyComputer(namePrefix + "Computer");
		GeneratedRecordComparator comparator = generator.generateRecordComparator(namePrefix + "Comparator");
		ClassLoader cl = Thread.currentThread().getContextClassLoader();
		return new Tuple2<>(computer.newInstance(cl), comparator.newInstance(cl));
	}
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
index 663bb26..613d294 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 
+import static 
org.apache.flink.table.dataformat.BinaryFormat.MAX_FIX_PART_DATA_SIZE;
+
 /**
  * Use the special format to write data to a {@link MemorySegment} (its 
capacity grows
  * automatically).
@@ -86,7 +88,7 @@ public abstract class AbstractBinaryWriter implements 
BinaryWriter {
 
        private void writeBytes(int pos, byte[] bytes) {
                int len = bytes.length;
-               if (len <= 7) {
+               if (len <= MAX_FIX_PART_DATA_SIZE) {
                        writeBytesToFixLenPart(segment, getFieldOffset(pos), 
bytes, len);
                } else {
                        writeBytesToVarLenPart(pos, bytes, len);
@@ -142,6 +144,16 @@ public abstract class AbstractBinaryWriter implements 
BinaryWriter {
        }
 
        @Override
+       public void writeBinary(int pos, byte[] bytes) {
+               int len = bytes.length;
+               if (len <= MAX_FIX_PART_DATA_SIZE) {
+                       writeBytesToFixLenPart(segment, getFieldOffset(pos), 
bytes, len);
+               } else {
+                       writeBytesToVarLenPart(pos, bytes, len);
+               }
+       }
+
+       @Override
        public void writeDecimal(int pos, Decimal value, int precision) {
                assert value == null || (value.getPrecision() == precision);
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
index 5055bdc..a02c68b 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -201,6 +201,15 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
        }
 
        @Override
+       public byte[] getBinary(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getElementOffset(pos, 8);
+               final long offsetAndSize = SegmentsUtil.getLong(segments, 
fieldOffset);
+               return readBinaryFieldFromSegments(
+                               segments, offset, fieldOffset, offsetAndSize);
+       }
+
+       @Override
        public BinaryArray getArray(int pos) {
                assertIndexIsValid(pos);
                return BinaryArray.readBinaryArrayFieldFromSegments(segments, 
offset, getLong(pos));
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
index 7d07c09..fbb0fc4 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
@@ -25,6 +25,31 @@ import org.apache.flink.table.util.SegmentsUtil;
  */
 public abstract class BinaryFormat {
 
+       /**
+        * It decides whether to put data in FixLenPart or VarLenPart. See more 
in {@link BinaryRow}.
+        *
+        * <p>If len is less than 8, its binary format is:
+        * 1bit mark(1) = 1, 7bits len, and 7bytes data.
+        * Data is stored in fix-length part.
+        *
+        * <p>If len is greater or equal to 8, its binary format is:
+        * 1bit mark(1) = 0, 31bits offset, and 4bytes len.
+        * Data is stored in variable-length part.
+        */
+       static final int MAX_FIX_PART_DATA_SIZE = 7;
+
+       /**
+        * To get the mark in highest first bit.
+        * Form: 1000 0000 0000 0000 ...
+        */
+       private static final long HIGHEST_FIRST_BIT = Long.MIN_VALUE;
+
+       /**
+        * To get the 7 bits length in second bit to eighth bit.
+        * Form: 0111 1111 0000 0000 ...
+        */
+       private static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
+
        protected MemorySegment[] segments;
        protected int offset;
        protected int sizeInBytes;
@@ -75,4 +100,60 @@ public abstract class BinaryFormat {
        public int hashCode() {
                return SegmentsUtil.hash(segments, offset, sizeInBytes);
        }
+
+       /**
+        * Get binary, if len less than 8, will be include in 
variablePartOffsetAndLen.
+        *
+        * <p>Note: Need to consider the ByteOrder.
+        *
+        * @param baseOffset base offset of composite binary format.
+        * @param fieldOffset absolute start offset of 
'variablePartOffsetAndLen'.
+        * @param variablePartOffsetAndLen a long value, real data or offset 
and len.
+        */
+       static byte[] readBinaryFieldFromSegments(
+                       MemorySegment[] segments, int baseOffset, int 
fieldOffset,
+                       long variablePartOffsetAndLen) {
+               long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+               if (mark == 0) {
+                       final int subOffset = (int) (variablePartOffsetAndLen 
>> 32);
+                       final int len = (int) variablePartOffsetAndLen;
+                       return SegmentsUtil.copyToBytes(segments, baseOffset + 
subOffset, len);
+               } else {
+                       int len = (int) ((variablePartOffsetAndLen & 
HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+                       if (SegmentsUtil.LITTLE_ENDIAN) {
+                               return SegmentsUtil.copyToBytes(segments, 
fieldOffset, len);
+                       } else {
+                               // fieldOffset + 1 to skip header.
+                               return SegmentsUtil.copyToBytes(segments, 
fieldOffset + 1, len);
+                       }
+               }
+       }
+
+       /**
+        * Get binary string, if len less than 8, will be include in 
variablePartOffsetAndLen.
+        *
+        * <p>Note: Need to consider the ByteOrder.
+        *
+        * @param baseOffset base offset of composite binary format.
+        * @param fieldOffset absolute start offset of 
'variablePartOffsetAndLen'.
+        * @param variablePartOffsetAndLen a long value, real data or offset 
and len.
+        */
+       static BinaryString readBinaryStringFieldFromSegments(
+                       MemorySegment[] segments, int baseOffset, int 
fieldOffset,
+                       long variablePartOffsetAndLen) {
+               long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+               if (mark == 0) {
+                       final int subOffset = (int) (variablePartOffsetAndLen 
>> 32);
+                       final int len = (int) variablePartOffsetAndLen;
+                       return new BinaryString(segments, baseOffset + 
subOffset, len);
+               } else {
+                       int len = (int) ((variablePartOffsetAndLen & 
HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+                       if (SegmentsUtil.LITTLE_ENDIAN) {
+                               return new BinaryString(segments, fieldOffset, 
len);
+                       } else {
+                               // fieldOffset + 1 to skip header.
+                               return new BinaryString(segments, fieldOffset + 
1, len);
+                       }
+               }
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
index 8519ba9..9dc1313 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.type.GenericType;
 import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
@@ -79,4 +80,16 @@ public class BinaryGeneric<T> extends LazyBinaryFormat<T> {
                int offset = (int) (offsetAndSize >> 32);
                return new BinaryGeneric<>(segments, offset + baseOffset, size, 
null);
        }
+
+       public static <T> T getJavaObjectFromBinaryGeneric(BinaryGeneric<T> 
value, TypeSerializer<T> ser) {
+               if (value.getJavaObject() == null) {
+                       try {
+                               
value.setJavaObject(InstantiationUtil.deserializeFromByteArray(ser,
+                                               
SegmentsUtil.copyToBytes(value.getSegments(), value.getOffset(), 
value.getSizeInBytes())));
+                       } catch (IOException e) {
+                               throw new FlinkRuntimeException(e);
+                       }
+               }
+               return value.getJavaObject();
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
index bc4b536..7108548 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
@@ -334,6 +334,14 @@ public final class BinaryRow extends BinaryFormat 
implements BaseRow {
        }
 
        @Override
+       public byte[] getBinary(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndLen = segments[0].getLong(fieldOffset);
+               return readBinaryFieldFromSegments(segments, offset, 
fieldOffset, offsetAndLen);
+       }
+
+       @Override
        public BinaryArray getArray(int pos) {
                assertIndexIsValid(pos);
                return BinaryArray.readBinaryArrayFieldFromSegments(segments, 
offset, getLong(pos));
@@ -394,4 +402,24 @@ public final class BinaryRow extends BinaryFormat 
implements BaseRow {
        public int hashCode() {
                return SegmentsUtil.hashByWords(segments, offset, sizeInBytes);
        }
+
+       public String toOriginString(InternalType... types) {
+               return toOriginString(this, types);
+       }
+
+       public static String toOriginString(BaseRow row, InternalType[] types) {
+               checkArgument(types.length == row.getArity());
+               StringBuilder build = new StringBuilder("[");
+               build.append(row.getHeader());
+               for (int i = 0; i < row.getArity(); i++) {
+                       build.append(',');
+                       if (row.isNullAt(i)) {
+                               build.append("null");
+                       } else {
+                               build.append(TypeGetterSetters.get(row, i, 
types[i]));
+                       }
+               }
+               build.append(']');
+               return build.toString();
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
index 0db18b4..64edc6d 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
@@ -36,9 +36,6 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  */
 public class BinaryString extends LazyBinaryFormat<String> implements 
Comparable<BinaryString> {
 
-       private static final long HIGHEST_FIRST_BIT = Long.MIN_VALUE;
-       private static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
-
        public static final BinaryString EMPTY_UTF8 = 
BinaryString.fromBytes("".getBytes());
 
        public BinaryString(MemorySegment[] segments, int offset, int 
sizeInBytes) {
@@ -245,39 +242,4 @@ public class BinaryString extends LazyBinaryFormat<String> 
implements Comparable
 
                return sizeInBytes - other.sizeInBytes;
        }
-
-       /**
-        * Get binary string, if len less than 8, will be include in 
variablePartOffsetAndLen.
-        *
-        * <p>If len is less than 8, its binary format is:
-        * 1bit mark(1) = 1, 7bits len, and 7bytes data.
-        *
-        * <p>If len is greater or equal to 8, its binary format is:
-        * 1bit mark(1) = 0, 31bits offset, and 4bytes len.
-        * Data is stored in variable-length part.
-        *
-        * <p>Note: Need to consider the ByteOrder.
-        *
-        * @param baseOffset base offset of composite binary format.
-        * @param fieldOffset absolute start offset of 
'variablePartOffsetAndLen'.
-        * @param variablePartOffsetAndLen a long value, real data or offset 
and len.
-        */
-       static BinaryString readBinaryStringFieldFromSegments(
-                       MemorySegment[] segments, int baseOffset, int 
fieldOffset,
-                       long variablePartOffsetAndLen) {
-               long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
-               if (mark == 0) {
-                       final int subOffset = (int) (variablePartOffsetAndLen 
>> 32);
-                       final int len = (int) variablePartOffsetAndLen;
-                       return new BinaryString(segments, baseOffset + 
subOffset, len);
-               } else {
-                       int len = (int) ((variablePartOffsetAndLen & 
HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
-                       if (SegmentsUtil.LITTLE_ENDIAN) {
-                               return new BinaryString(segments, fieldOffset, 
len);
-                       } else {
-                               // fieldOffset + 1 to skip header.
-                               return new BinaryString(segments, fieldOffset + 
1, len);
-                       }
-               }
-       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
index deb5e3c..b1730ec 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -62,6 +62,8 @@ public interface BinaryWriter {
 
        void writeString(int pos, BinaryString value);
 
+       void writeBinary(int pos, byte[] bytes);
+
        void writeDecimal(int pos, Decimal value, int precision);
 
        void writeArray(int pos, BinaryArray value);
@@ -112,8 +114,10 @@ public interface BinaryWriter {
                } else if (type instanceof RowType) {
                        RowType rowType = (RowType) type;
                        writer.writeRow(pos, (BaseRow) o, 
rowType.getBaseRowSerializer());
-               }  else if (type instanceof GenericType) {
+               } else if (type instanceof GenericType) {
                        writer.writeGeneric(pos, (BinaryGeneric) o);
+               } else if (type.equals(InternalTypes.BINARY)) {
+                       writer.writeBinary(pos, (byte[]) o);
                } else {
                        throw new RuntimeException("Not support type: " + type);
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index d568cfe..e3f371b 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -44,11 +44,8 @@ import 
org.apache.flink.table.typeutils.BinaryGenericTypeInfo;
 import org.apache.flink.table.typeutils.BinaryMapTypeInfo;
 import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
 import org.apache.flink.table.typeutils.DecimalTypeInfo;
-import org.apache.flink.table.util.SegmentsUtil;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.InstantiationUtil;
 
-import java.io.IOException;
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.sql.Date;
@@ -488,15 +485,7 @@ public class DataFormatConverters {
 
                @Override
                T toExternalImpl(BinaryGeneric<T> value) {
-                       if (value.getJavaObject() == null) {
-                               try {
-                                       
value.setJavaObject(InstantiationUtil.deserializeFromByteArray(serializer,
-                                                       
SegmentsUtil.copyToBytes(value.getSegments(), value.getOffset(), 
value.getSizeInBytes())));
-                               } catch (IOException e) {
-                                       throw new RuntimeException(e);
-                               }
-                       }
-                       return value.getJavaObject();
+                       return 
BinaryGeneric.getJavaObjectFromBinaryGeneric(value, serializer);
                }
 
                @Override
@@ -633,25 +622,15 @@ public class DataFormatConverters {
        /**
         * Converter for primitive byte array.
         */
-       public static class PrimitiveByteArrayConverter extends 
DataFormatConverter<BinaryArray, byte[]> {
+       public static class PrimitiveByteArrayConverter extends 
IdentityConverter<byte[]> {
 
                public static final PrimitiveByteArrayConverter INSTANCE = new 
PrimitiveByteArrayConverter();
 
                private PrimitiveByteArrayConverter() {}
 
                @Override
-               BinaryArray toInternalImpl(byte[] value) {
-                       return BinaryArray.fromPrimitiveArray(value);
-               }
-
-               @Override
-               byte[] toExternalImpl(BinaryArray value) {
-                       return value.toByteArray();
-               }
-
-               @Override
                byte[] toExternalImpl(BaseRow row, int column) {
-                       return toExternalImpl(row.getArray(column));
+                       return row.getBinary(column);
                }
        }
 
@@ -917,7 +896,7 @@ public class DataFormatConverters {
        /**
         * Abstract converter for internal base row.
         */
-       public abstract static class AbstractBaseRowConverter<I extends 
BaseRow, E> extends DataFormatConverter<I, E> {
+       public abstract static class AbstractBaseRowConverter<E> extends 
DataFormatConverter<BaseRow, E> {
 
                protected final DataFormatConverter[] converters;
 
@@ -930,7 +909,7 @@ public class DataFormatConverters {
 
                @Override
                E toExternalImpl(BaseRow row, int column) {
-                       throw new RuntimeException("Not support yet!");
+                       return toExternalImpl(row.getRow(column, 
converters.length));
                }
        }
 
@@ -952,7 +931,7 @@ public class DataFormatConverters {
        /**
         * Converter for pojo.
         */
-       public static class PojoConverter<T> extends 
AbstractBaseRowConverter<GenericRow, T> {
+       public static class PojoConverter<T> extends 
AbstractBaseRowConverter<T> {
 
                private final PojoTypeInfo<T> t;
                private final PojoField[] fields;
@@ -968,7 +947,7 @@ public class DataFormatConverters {
                }
 
                @Override
-               GenericRow toInternalImpl(T value) {
+               BaseRow toInternalImpl(T value) {
                        GenericRow genericRow = new GenericRow(t.getArity());
                        for (int i = 0; i < t.getArity(); i++) {
                                try {
@@ -982,7 +961,7 @@ public class DataFormatConverters {
                }
 
                @Override
-               T toExternalImpl(GenericRow value) {
+               T toExternalImpl(BaseRow value) {
                        try {
                                T pojo = t.getTypeClass().newInstance();
                                for (int i = 0; i < t.getArity(); i++) {
@@ -998,7 +977,7 @@ public class DataFormatConverters {
        /**
         * Converter for row.
         */
-       public static class RowConverter extends 
AbstractBaseRowConverter<GenericRow, Row> {
+       public static class RowConverter extends AbstractBaseRowConverter<Row> {
 
                private final RowTypeInfo t;
 
@@ -1008,7 +987,7 @@ public class DataFormatConverters {
                }
 
                @Override
-               GenericRow toInternalImpl(Row value) {
+               BaseRow toInternalImpl(Row value) {
                        GenericRow genericRow = new GenericRow(t.getArity());
                        for (int i = 0; i < t.getArity(); i++) {
                                genericRow.setField(i, 
converters[i].toInternal(value.getField(i)));
@@ -1017,7 +996,7 @@ public class DataFormatConverters {
                }
 
                @Override
-               Row toExternalImpl(GenericRow value) {
+               Row toExternalImpl(BaseRow value) {
                        Row row = new Row(t.getArity());
                        for (int i = 0; i < t.getArity(); i++) {
                                row.setField(i, converters[i].toExternal(value, 
i));
@@ -1029,7 +1008,7 @@ public class DataFormatConverters {
        /**
         * Converter for flink tuple.
         */
-       public static class TupleConverter extends 
AbstractBaseRowConverter<GenericRow, Tuple> {
+       public static class TupleConverter extends 
AbstractBaseRowConverter<Tuple> {
 
                private final TupleTypeInfo t;
 
@@ -1039,7 +1018,7 @@ public class DataFormatConverters {
                }
 
                @Override
-               GenericRow toInternalImpl(Tuple value) {
+               BaseRow toInternalImpl(Tuple value) {
                        GenericRow genericRow = new GenericRow(t.getArity());
                        for (int i = 0; i < t.getArity(); i++) {
                                genericRow.setField(i, 
converters[i].toInternal(value.getField(i)));
@@ -1048,7 +1027,7 @@ public class DataFormatConverters {
                }
 
                @Override
-               Tuple toExternalImpl(GenericRow value) {
+               Tuple toExternalImpl(BaseRow value) {
                        try {
                                Tuple tuple = (Tuple) 
t.getTypeClass().newInstance();
                                for (int i = 0; i < t.getArity(); i++) {
@@ -1065,7 +1044,7 @@ public class DataFormatConverters {
        /**
         * Converter for case class.
         */
-       public static class CaseClassConverter extends 
AbstractBaseRowConverter<GenericRow, Product> {
+       public static class CaseClassConverter extends 
AbstractBaseRowConverter<Product> {
 
                private final TupleTypeInfoBase t;
                private final TupleSerializerBase serializer;
@@ -1077,7 +1056,7 @@ public class DataFormatConverters {
                }
 
                @Override
-               GenericRow toInternalImpl(Product value) {
+               BaseRow toInternalImpl(Product value) {
                        GenericRow genericRow = new GenericRow(t.getArity());
                        for (int i = 0; i < t.getArity(); i++) {
                                genericRow.setField(i, 
converters[i].toInternal(value.productElement(i)));
@@ -1086,7 +1065,7 @@ public class DataFormatConverters {
                }
 
                @Override
-               Product toExternalImpl(GenericRow value) {
+               Product toExternalImpl(BaseRow value) {
                        Object[] fields = new Object[t.getArity()];
                        for (int i = 0; i < t.getArity(); i++) {
                                fields[i] = converters[i].toExternal(value, i);
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
index 85dd2c9..0747c7b 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
@@ -154,6 +154,15 @@ public final class JoinedRow implements BaseRow {
        }
 
        @Override
+       public byte[] getBinary(int i) {
+               if (i < row1.getArity()) {
+                       return row1.getBinary(i);
+               } else {
+                       return row2.getBinary(i - row1.getArity());
+               }
+       }
+
+       @Override
        public BinaryString getString(int i) {
                if (i < row1.getArity()) {
                        return row1.getString(i);
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
index 163216b..1c8b43c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
@@ -257,6 +257,14 @@ public final class NestedRow extends BinaryFormat 
implements BaseRow {
        }
 
        @Override
+       public byte[] getBinary(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndLen = segments[0].getLong(fieldOffset);
+               return readBinaryFieldFromSegments(segments, offset, 
fieldOffset, offsetAndLen);
+       }
+
+       @Override
        public BaseRow getRow(int pos, int numFields) {
                assertIndexIsValid(pos);
                return NestedRow.readNestedRowFieldFromSegments(segments, 
numFields, offset, getLong(pos));
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
index 28edebe..42fe8b6 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
@@ -70,6 +70,11 @@ public abstract class ObjectArrayRow implements BaseRow {
        }
 
        @Override
+       public byte[] getBinary(int ordinal) {
+               return (byte[]) this.fields[ordinal];
+       }
+
+       @Override
        public BinaryArray getArray(int ordinal) {
                return (BinaryArray) this.fields[ordinal];
        }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
index be572b4..f1e080c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
@@ -105,6 +105,11 @@ public interface TypeGetterSetters {
        <T> BinaryGeneric<T> getGeneric(int ordinal);
 
        /**
+        * Get binary value, internal format is byte[].
+        */
+       byte[] getBinary(int ordinal);
+
+       /**
         * Get array value, internal format is BinaryArray.
         */
        BinaryArray getArray(int ordinal);
@@ -205,6 +210,8 @@ public interface TypeGetterSetters {
                        return row.getRow(ordinal, ((RowType) type).getArity());
                } else if (type instanceof GenericType) {
                        return row.getGeneric(ordinal);
+               } else if (type.equals(InternalTypes.BINARY)) {
+                       return row.getBinary(ordinal);
                } else {
                        throw new RuntimeException("Not support type: " + type);
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
index c81d2cc..99f5dba 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
@@ -77,6 +77,7 @@ public final class CompileUtils {
                try {
                        compiler.cook(code);
                } catch (Throwable t) {
+                       System.out.println(addLineNumber(code));
                        throw new InvalidProgramException(
                                "Table program cannot be compiled. This is a 
bug. Please file an issue.", t);
                }
@@ -87,4 +88,17 @@ public final class CompileUtils {
                        throw new RuntimeException("Can not load class " + 
name, e);
                }
        }
+
+       /**
+        * To output more information when an error occurs.
+        * Generally, when cook fails, it shows which line is wrong. This line 
number starts at 1.
+        */
+       private static String addLineNumber(String code) {
+               String[] lines = code.split("\n");
+               StringBuilder builder = new StringBuilder();
+               for (int i = 0; i < lines.length; i++) {
+                       builder.append(i + 1).append(" 
").append(lines[i]).append("\n");
+               }
+               return builder.toString();
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
index 8f26e14..043340b 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
@@ -49,7 +49,10 @@ public abstract class GeneratedClass<T> implements 
Serializable {
        @SuppressWarnings("unchecked")
        public T newInstance(ClassLoader classLoader) {
                try {
-                       return (T) 
compile(classLoader).getConstructor(Object[].class).newInstance(references);
+                       return (T) 
compile(classLoader).getConstructor(Object[].class)
+                                       // Because 
Constructor.newInstance(Object... initargs), we need to load
+                                       // references into a new Object[], 
otherwise it cannot be compiled.
+                                       .newInstance(new Object[] {references});
                } catch (Exception e) {
                        throw new RuntimeException(
                                "Could not instantiate generated class '" + 
className + "'", e);
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedNormalizedKeyComputer.java
similarity index 61%
copy from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
copy to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedNormalizedKeyComputer.java
index 002f4e0..e524d40 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedNormalizedKeyComputer.java
@@ -16,26 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.type;
+package org.apache.flink.table.generated;
 
 /**
- * Primitive type.
+ * Describes a generated {@link NormalizedKeyComputer}.
  */
-public abstract class PrimitiveType implements AtomicType {
+public class GeneratedNormalizedKeyComputer extends 
GeneratedClass<NormalizedKeyComputer> {
 
-       @Override
-       public boolean equals(Object o) {
-               return this == o || o != null && getClass() == o.getClass();
-       }
-
-       @Override
-       public int hashCode() {
-               return getClass().hashCode();
-       }
+       private static final long serialVersionUID = 1L;
 
-       @Override
-       public String toString() {
-               return getClass().getSimpleName();
+       /**
+        * Creates a GeneratedNormalizedKeyComputer.
+        *
+        * @param className class name of the generated class.
+        * @param code code of the generated class.
+        */
+       public GeneratedNormalizedKeyComputer(String className, String code) {
+               super(className, code, new Object[0]);
        }
-
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedRecordComparator.java
similarity index 59%
copy from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
copy to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedRecordComparator.java
index 002f4e0..2db63cc 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedRecordComparator.java
@@ -16,26 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.type;
+package org.apache.flink.table.generated;
 
 /**
- * Primitive type.
+ * Describes a generated {@link RecordComparator}.
  */
-public abstract class PrimitiveType implements AtomicType {
+public class GeneratedRecordComparator extends 
GeneratedClass<RecordComparator> {
 
-       @Override
-       public boolean equals(Object o) {
-               return this == o || o != null && getClass() == o.getClass();
-       }
-
-       @Override
-       public int hashCode() {
-               return getClass().hashCode();
-       }
+       private static final long serialVersionUID = 1L;
 
-       @Override
-       public String toString() {
-               return getClass().getSimpleName();
+       /**
+        * Creates a GeneratedRecordComparator.
+        *
+        * @param className class name of the generated class.
+        * @param code code of the generated class.
+        * @param references referenced objects of the generated class.
+        */
+       public GeneratedRecordComparator(String className, String code, 
Object[] references) {
+               super(className, code, references);
        }
 
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/NormalizedKeyComputer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/NormalizedKeyComputer.java
similarity index 64%
rename from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/NormalizedKeyComputer.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/NormalizedKeyComputer.java
index 121fb50..d960a74 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/NormalizedKeyComputer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/NormalizedKeyComputer.java
@@ -16,54 +16,45 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.sort;
+package org.apache.flink.table.generated;
 
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer;
 
 /**
  * Normalized key computer for {@link BinaryInMemorySortBuffer}.
  * For performance, subclasses are usually implemented through CodeGenerator.
  */
-public abstract class NormalizedKeyComputer {
-
-       protected TypeSerializer[] serializers;
-       protected TypeComparator[] comparators;
-
-       public void init(TypeSerializer[] serializers, TypeComparator[] 
comparators) {
-               this.serializers = serializers;
-               this.comparators = comparators;
-       }
+public interface NormalizedKeyComputer {
 
        /**
         * Writes a normalized key for the given record into the target {@link 
MemorySegment}.
         */
-       public abstract void putKey(BaseRow record, MemorySegment target, int 
offset);
+       void putKey(BaseRow record, MemorySegment target, int offset);
 
        /**
         * Compares two normalized keys in respective {@link MemorySegment}.
         */
-       public abstract int compareKey(MemorySegment segI, int offsetI, 
MemorySegment segJ, int offsetJ);
+       int compareKey(MemorySegment segI, int offsetI, MemorySegment segJ, int 
offsetJ);
 
        /**
         * Swaps two normalized keys in respective {@link MemorySegment}.
         */
-       public abstract void swapKey(MemorySegment segI, int offsetI, 
MemorySegment segJ, int offsetJ);
+       void swapKey(MemorySegment segI, int offsetI, MemorySegment segJ, int 
offsetJ);
 
        /**
         * Get normalized keys bytes length.
         */
-       public abstract int getNumKeyBytes();
+       int getNumKeyBytes();
 
        /**
         * Whether the normalized key fully determines the comparison.
         */
-       public abstract boolean isKeyFullyDetermines();
+       boolean isKeyFullyDetermines();
 
        /**
         * Flag indicating whether normalized key comparisons should be inverted.
         */
-       public abstract boolean invertKey();
+       boolean invertKey();
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/RecordComparator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/RecordComparator.java
similarity index 64%
rename from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/RecordComparator.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/RecordComparator.java
index c5a274e..5331bcf 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/RecordComparator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/RecordComparator.java
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.sort;
+package org.apache.flink.table.generated;
 
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer;
 
 import java.io.Serializable;
 import java.util.Comparator;
@@ -28,19 +27,10 @@ import java.util.Comparator;
 /**
  * Record comparator for {@link BinaryInMemorySortBuffer}.
  * For performance, subclasses are usually implemented through CodeGenerator.
+ * A new interface that helps the JVM inline implementations.
  */
-public abstract class RecordComparator implements Comparator<BaseRow>, 
Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       protected TypeSerializer[] serializers;
-       protected TypeComparator[] comparators;
-
-       public void init(TypeSerializer[] serializers, TypeComparator[] 
comparators) {
-               this.serializers = serializers;
-               this.comparators = comparators;
-       }
+public interface RecordComparator extends Comparator<BaseRow>, Serializable {
 
        @Override
-       public abstract int compare(BaseRow o1, BaseRow o2);
+       int compare(BaseRow o1, BaseRow o2);
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalMerger.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalMerger.java
index 366d822..97a9b75 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalMerger.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalMerger.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
 import org.apache.flink.util.MutableObjectIterator;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java
index d04f0ae..cf32bb0 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java
@@ -35,6 +35,8 @@ import 
org.apache.flink.runtime.util.EmptyMutableObjectIterator;
 import org.apache.flink.table.api.TableConfigOptions;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.NormalizedKeyComputer;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
 import org.apache.flink.table.runtime.io.ChannelWithMeta;
 import org.apache.flink.table.runtime.util.FileChannelUtil;
@@ -970,7 +972,6 @@ public class BinaryExternalSorter implements 
Sorter<BinaryRow> {
                        // loop as long as the thread is marked alive and we do 
not see the final currWriteBuffer
                        while (isRunning()) {
                                try {
-                                       // TODO let cache in memory instead of 
disk.
                                        element = cache.isEmpty() ? 
queues.spill.take() : cache.poll();
                                } catch (InterruptedException iex) {
                                        if (isRunning()) {
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryInMemorySortBuffer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryInMemorySortBuffer.java
index 166afca..f51cc1a 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryInMemorySortBuffer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryInMemorySortBuffer.java
@@ -22,6 +22,8 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.NormalizedKeyComputer;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.typeutils.AbstractRowSerializer;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryIndexedSortable.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryIndexedSortable.java
index a6e13f0..76cf216 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryIndexedSortable.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryIndexedSortable.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.runtime.operators.sort.IndexedSortable;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.NormalizedKeyComputer;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVExternalMerger.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVExternalMerger.java
index 487651c..137dcdd 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVExternalMerger.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVExternalMerger.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
 import org.apache.flink.util.MutableObjectIterator;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVInMemorySortBuffer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVInMemorySortBuffer.java
index d4ca5b2..ce1dd90 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVInMemorySortBuffer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVInMemorySortBuffer.java
@@ -21,6 +21,8 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.RandomAccessInputView;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.NormalizedKeyComputer;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryMergeIterator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryMergeIterator.java
index fe3e1c0..9e447aa 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryMergeIterator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryMergeIterator.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.sort;
 
 import org.apache.flink.runtime.operators.sort.MergeIterator;
 import org.apache.flink.runtime.operators.sort.PartialOrderPriorityQueue;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.IOException;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java
index 3c23738..c601a4b 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.operators.sort.IndexedSorter;
 import org.apache.flink.runtime.operators.sort.QuickSort;
 import org.apache.flink.table.api.TableConfigOptions;
 import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.NormalizedKeyComputer;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
 import org.apache.flink.table.runtime.io.ChannelWithMeta;
 import org.apache.flink.table.runtime.util.FileChannelUtil;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java
index a8e5607..e199efe 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java
@@ -23,11 +23,19 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.dataformat.Decimal;
 
+import java.nio.ByteOrder;
+
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+
 /**
- * Util for data formats.
+ * Utilities for sorting.
  */
 public class SortUtil {
 
+       private static final int BYTE_ARRAY_BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
+       private static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN;
+       private static final int LONG_BYTES = 8;
+
        public static void minNormalizedKey(MemorySegment target, int offset, 
int numBytes) {
                //write min value.
                for (int i = 0; i < numBytes; i++) {
@@ -63,7 +71,7 @@ public class SortUtil {
        /**
         * UTF-8 supports byte-wise comparison.
         */
-       public static void putBinaryStringNormalizedKey(
+       public static void putStringNormalizedKey(
                        BinaryString value, MemorySegment target, int offset, 
int numBytes) {
                final int limit = offset + numBytes;
                final int end = value.getSizeInBytes();
@@ -117,4 +125,93 @@ public class SortUtil {
        public static void putCharNormalizedKey(char value, MemorySegment 
target, int offset, int numBytes) {
                NormalizedKeyUtil.putCharNormalizedKey(value, target, offset, 
numBytes);
        }
+
+       public static void putBinaryNormalizedKey(
+                       byte[] value, MemorySegment target, int offset, int 
numBytes) {
+               final int limit = offset + numBytes;
+               final int end = value.length;
+               for (int i = 0; i < end && offset < limit; i++) {
+                       target.put(offset++, value[i]);
+               }
+
+               for (int i = offset; i < limit; i++) {
+                       target.put(i, (byte) 0);
+               }
+       }
+
+       public static int compareBinary(byte[] a, byte[] b) {
+               return compareBinary(a, 0, a.length, b, 0, b.length);
+       }
+
+       public static int compareBinary(
+                       byte[] buffer1, int offset1, int length1,
+                       byte[] buffer2, int offset2, int length2) {
+               // Short circuit equal case
+               if (buffer1 == buffer2 &&
+                               offset1 == offset2 &&
+                               length1 == length2) {
+                       return 0;
+               }
+               int minLength = Math.min(length1, length2);
+               int minWords = minLength / LONG_BYTES;
+               int offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET;
+               int offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET;
+
+        /*
+         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+         * time is no slower than comparing 4 bytes at a time even on 32-bit.
+         * On the other hand, it is substantially faster on 64-bit.
+         */
+               for (int i = 0; i < minWords * LONG_BYTES; i += LONG_BYTES) {
+                       long lw = UNSAFE.getLong(buffer1, offset1Adj + (long) 
i);
+                       long rw = UNSAFE.getLong(buffer2, offset2Adj + (long) 
i);
+                       long diff = lw ^ rw;
+
+                       if (diff != 0) {
+                               if (!LITTLE_ENDIAN) {
+                                       return lessThanUnsigned(lw, rw) ? -1 : 
1;
+                               }
+
+                               // Use binary search
+                               int n = 0;
+                               int y;
+                               int x = (int) diff;
+                               if (x == 0) {
+                                       x = (int) (diff >>> 32);
+                                       n = 32;
+                               }
+
+                               y = x << 16;
+                               if (y == 0) {
+                                       n += 16;
+                               } else {
+                                       x = y;
+                               }
+
+                               y = x << 8;
+                               if (y == 0) {
+                                       n += 8;
+                               }
+                               return (int) (((lw >>> n) & 0xFFL) - ((rw >>> 
n) & 0xFFL));
+                       }
+               }
+
+               // The epilogue to cover the last (minLength % 8) elements.
+               for (int i = minWords * LONG_BYTES; i < minLength; i++) {
+                       int result = unsignedByteToInt(buffer1[offset1 + i]) -
+                                       unsignedByteToInt(buffer2[offset2 + i]);
+                       if (result != 0) {
+                               return result;
+                       }
+               }
+               return length1 - length2;
+       }
+
+       private static int unsignedByteToInt(byte value) {
+               return value & 0xff;
+       }
+
+       private static boolean lessThanUnsigned(long x1, long x2) {
+               return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE);
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
index f825b22..a768a75 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
@@ -25,6 +25,8 @@ import org.apache.flink.util.Preconditions;
  */
 public class ArrayType implements InternalType {
 
+       private static final long serialVersionUID = 1L;
+
        private final InternalType elementType;
 
        public ArrayType(InternalType elementType) {
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BinaryType.java
similarity index 63%
copy from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java
copy to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BinaryType.java
index b3cdf5b..7c79243 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BinaryType.java
@@ -19,13 +19,22 @@
 package org.apache.flink.table.type;
 
 /**
- * Sql time type.
+ * Binary type. It differs from ArrayType(Byte):
+ *
+ * <p>1. Comparisons: Unsigned comparisons, not signed byte comparisons.
+ * According to: 
https://docs.oracle.com/cd/E11882_01/timesten.112/e21642/types.htm#TTSQL148
+ * The BINARY data type is a fixed-length binary value with a length of n bytes. In databases,
+ * a byte value usually expresses a range of 0-255, so comparisons are unsigned.
+ *
+ * <p>2. Its elements cannot have null values.
  */
-public class TimeType implements AtomicType {
+public class BinaryType implements AtomicType {
 
-       public static final TimeType INSTANCE = new TimeType();
+       private static final long serialVersionUID = 1L;
 
-       private TimeType() {}
+       public static final BinaryType INSTANCE = new BinaryType();
+
+       private BinaryType() {}
 
        @Override
        public boolean equals(Object o) {
@@ -41,5 +50,4 @@ public class TimeType implements AtomicType {
        public String toString() {
                return getClass().getSimpleName();
        }
-
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
index 02416ed..5bfc550 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class BooleanType extends PrimitiveType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final BooleanType INSTANCE = new BooleanType();
 
        private BooleanType() {}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java
index 343acb1..6a15d81 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class ByteType extends PrimitiveType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final ByteType INSTANCE = new ByteType();
 
        private ByteType() {}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java
index bdb523c..90aced6 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class CharType extends PrimitiveType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final CharType INSTANCE = new CharType();
 
        private CharType() {}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java
index 2c86a85..da36035 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class DateType implements AtomicType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final DateType DATE = new DateType(0, "DateType");
        public static final DateType INTERVAL_MONTHS = new DateType(1, 
"IntervalMonths");
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
index bb07118..5c66d19 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
@@ -27,6 +27,8 @@ import static java.lang.String.format;
  */
 public class DecimalType implements AtomicType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final int MAX_PRECISION = 38;
 
        public static final int MAX_COMPACT_PRECISION = 18;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
index fb1e46b..9262371 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class DoubleType extends PrimitiveType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final DoubleType INSTANCE = new DoubleType();
 
        private DoubleType() {}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java
index a250f161..26a36e4 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class FloatType extends PrimitiveType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final FloatType INSTANCE = new FloatType();
 
        private FloatType() {}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java
index d647480..f344e06 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java
@@ -30,6 +30,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class GenericType<T> implements AtomicType {
 
+       private static final long serialVersionUID = 1L;
+
        private final TypeInformation<T> typeInfo;
 
        private transient TypeSerializer<T> serializer;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java
index e89032b..5b39ea7 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class IntType extends PrimitiveType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final IntType INSTANCE = new IntType();
 
        private IntType() {}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
index 7d750e6..644f428 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
@@ -43,7 +43,7 @@ public class InternalTypes {
 
        public static final CharType CHAR = CharType.INSTANCE;
 
-       public static final ArrayType BINARY = new ArrayType(BYTE);
+       public static final BinaryType BINARY = BinaryType.INSTANCE;
 
        public static final DateType DATE = DateType.DATE;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java
index c21936f..7be209d 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class LongType extends PrimitiveType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final LongType INSTANCE = new LongType();
 
        private LongType() {}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java
index 4498d5d..f5dc0a1 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class MapType implements InternalType {
 
+       private static final long serialVersionUID = 1L;
+
        private final InternalType keyType;
        private final InternalType valueType;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
index 002f4e0..03c78ab 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public abstract class PrimitiveType implements AtomicType {
 
+       private static final long serialVersionUID = 1L;
+
        @Override
        public boolean equals(Object o) {
                return this == o || o != null && getClass() == o.getClass();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java
index 3401573..380ec44 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java
@@ -31,6 +31,8 @@ import java.util.Arrays;
  */
 public class RowType implements InternalType {
 
+       private static final long serialVersionUID = 1L;
+
        private final InternalType[] types;
 
        private final String[] fieldNames;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java
index 32d242d..173c688 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class ShortType extends PrimitiveType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final ShortType INSTANCE = new ShortType();
 
        private ShortType() {}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java
index 8893dc2..504a494 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class StringType implements AtomicType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final StringType INSTANCE = new StringType();
 
        private StringType() {
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java
index b3cdf5b..a313c3a 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class TimeType implements AtomicType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final TimeType INSTANCE = new TimeType();
 
        private TimeType() {}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
index 20b3652..34737c1 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
@@ -23,6 +23,8 @@ package org.apache.flink.table.type;
  */
 public class TimestampType implements AtomicType {
 
+       private static final long serialVersionUID = 1L;
+
        public static final TimestampType TIMESTAMP = new TimestampType(0, 
"TimestampType");
        public static final TimestampType INTERVAL_MILLIS =
                        new TimestampType(1, "IntervalMillis");
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
index c016590..ea90a77 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
@@ -88,6 +88,7 @@ public class TypeConverters {
                internalTypeToInfo.put(InternalTypes.DATE, 
BasicTypeInfo.INT_TYPE_INFO);
                internalTypeToInfo.put(InternalTypes.TIMESTAMP, 
BasicTypeInfo.LONG_TYPE_INFO);
                internalTypeToInfo.put(InternalTypes.TIME, 
BasicTypeInfo.INT_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.BINARY, 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
                INTERNAL_TYPE_TO_INTERNAL_TYPE_INFO = 
Collections.unmodifiableMap(internalTypeToInfo);
        }
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
index 96057af..35dc8d6 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import java.math.BigDecimal;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -43,6 +44,7 @@ public class BaseRowTest {
        private BinaryArray array;
        private BinaryMap map;
        private BinaryRow underRow;
+       private byte[] bytes;
 
        @Before
        public void before() {
@@ -65,6 +67,7 @@ public class BaseRowTest {
                        writer.writeInt(1, 16);
                        writer.complete();
                }
+               bytes = new byte[] {1, 5, 6};
        }
 
        @Test
@@ -78,11 +81,11 @@ public class BaseRowTest {
                BinaryRowWriter writer = new BinaryRowWriter(row);
                writer.writeRow(0, getBinaryRow(), null);
                writer.complete();
-               testAll(row.getRow(0, 15));
+               testAll(row.getRow(0, 16));
        }
 
        private BinaryRow getBinaryRow() {
-               BinaryRow row = new BinaryRow(15);
+               BinaryRow row = new BinaryRow(16);
                BinaryRowWriter writer = new BinaryRowWriter(row);
                writer.writeBoolean(0, true);
                writer.writeByte(1, (byte) 1);
@@ -100,12 +103,13 @@ public class BaseRowTest {
                writer.writeMap(13, map);
                writer.writeRow(14, underRow, new BaseRowSerializer(
                                new ExecutionConfig(), InternalTypes.INT, 
InternalTypes.INT));
+               writer.writeBinary(15, bytes);
                return row;
        }
 
        @Test
        public void testGenericRow() {
-               GenericRow row = new GenericRow(15);
+               GenericRow row = new GenericRow(16);
                row.setField(0, true);
                row.setField(1, (byte) 1);
                row.setField(2, (short) 2);
@@ -121,12 +125,13 @@ public class BaseRowTest {
                row.setField(12, array);
                row.setField(13, map);
                row.setField(14, underRow);
+               row.setField(15, bytes);
                testAll(row);
        }
 
        @Test
        public void testBoxedWrapperRow() {
-               BoxedWrapperRow row = new BoxedWrapperRow(15);
+               BoxedWrapperRow row = new BoxedWrapperRow(16);
                row.setBoolean(0, true);
                row.setByte(1, (byte) 1);
                row.setShort(2, (short) 2);
@@ -142,6 +147,7 @@ public class BaseRowTest {
                row.setNonPrimitiveValue(12, array);
                row.setNonPrimitiveValue(13, map);
                row.setNonPrimitiveValue(14, underRow);
+               row.setNonPrimitiveValue(15, bytes);
                testAll(row);
        }
 
@@ -154,7 +160,7 @@ public class BaseRowTest {
                row1.setField(3, 3);
                row1.setField(4, (long) 4);
 
-               GenericRow row2 = new GenericRow(10);
+               GenericRow row2 = new GenericRow(11);
                row2.setField(0, (float) 5);
                row2.setField(1, (double) 6);
                row2.setField(2, (char) 7);
@@ -165,11 +171,12 @@ public class BaseRowTest {
                row2.setField(7, array);
                row2.setField(8, map);
                row2.setField(9, underRow);
+               row2.setField(10, bytes);
                testAll(new JoinedRow(row1, row2));
        }
 
        private void testAll(BaseRow row) {
-               assertEquals(15, row.getArity());
+               assertEquals(16, row.getArity());
 
                // test header
                assertEquals(0, row.getHeader());
@@ -193,6 +200,7 @@ public class BaseRowTest {
                assertEquals(map, row.getMap(13));
                assertEquals(15, row.getRow(14, 2).getInt(0));
                assertEquals(16, row.getRow(14, 2).getInt(1));
+               assertArrayEquals(bytes, row.getBinary(15));
 
                // test set
                row.setBoolean(0, false);
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
index abc23ce..beeb8c0 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.type.InternalTypes;
 import org.apache.flink.table.typeutils.BaseRowSerializer;
 import org.apache.flink.table.util.SegmentsUtil;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.math.BigDecimal;
@@ -502,4 +503,18 @@ public class BinaryArrayTest {
                assertEquals(1, nestedRow.getInt(1));
                assertTrue(array.isNullAt(1));
        }
+
+       @Test
+       public void testBinary() {
+               BinaryArray array = new BinaryArray();
+               BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+               byte[] bytes1 = new byte[] {1, -1, 5};
+               byte[] bytes2 = new byte[] {1, -1, 5, 5, 1, 5, 1, 5};
+               writer.writeBinary(0, bytes1);
+               writer.writeBinary(1, bytes2);
+               writer.complete();
+
+               Assert.assertArrayEquals(bytes1, array.getBinary(0));
+               Assert.assertArrayEquals(bytes2, array.getBinary(1));
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index 1803e5a..15334f1 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -450,4 +450,18 @@ public class BinaryRowTest {
                assertEquals(1, nestedRow.getInt(1));
                assertTrue(row.isNullAt(1));
        }
+
+       @Test
+       public void testBinary() {
+               BinaryRow row = new BinaryRow(2);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+               byte[] bytes1 = new byte[] {1, -1, 5};
+               byte[] bytes2 = new byte[] {1, -1, 5, 5, 1, 5, 1, 5};
+               writer.writeBinary(0, bytes1);
+               writer.writeBinary(1, bytes2);
+               writer.complete();
+
+               Assert.assertArrayEquals(bytes1, row.getBinary(0));
+               Assert.assertArrayEquals(bytes2, row.getBinary(1));
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
index 1b78155..4570608 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
@@ -166,10 +166,10 @@ public class BinaryStringTest {
 
                MemorySegment segment1 = 
MemorySegmentFactory.allocateUnpooledSegment(1024);
                MemorySegment segment2 = 
MemorySegmentFactory.allocateUnpooledSegment(1024);
-               SortUtil.putBinaryStringNormalizedKey(fromString("abcabcabc"), 
segment1, 0, 9);
-               SortUtil.putBinaryStringNormalizedKey(fromString("abcabcabC"), 
segment2, 0, 9);
+               SortUtil.putStringNormalizedKey(fromString("abcabcabc"), 
segment1, 0, 9);
+               SortUtil.putStringNormalizedKey(fromString("abcabcabC"), 
segment2, 0, 9);
                assertTrue(segment1.compare(segment2, 0, 0, 9) > 0);
-               SortUtil.putBinaryStringNormalizedKey(fromString("abcab"), 
segment1, 0, 9);
+               SortUtil.putStringNormalizedKey(fromString("abcab"), segment1, 
0, 9);
                assertTrue(segment1.compare(segment2, 0, 0, 9) < 0);
        }
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryMergeIteratorTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryMergeIteratorTest.java
index 30bde3e..d170f5c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryMergeIteratorTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryMergeIteratorTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.dataformat.BinaryRowWriter;
 import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
index 13356cc..1928696 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.api.TableConfigOptions;
 import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.dataformat.BinaryRowWriter;
 import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.generated.NormalizedKeyComputer;
+import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntNormalizedKeyComputer.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntNormalizedKeyComputer.java
index 9cad238..42706e7 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntNormalizedKeyComputer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntNormalizedKeyComputer.java
@@ -20,11 +20,12 @@ package org.apache.flink.table.runtime.sort;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.NormalizedKeyComputer;
 
 /**
  * Example for int {@link NormalizedKeyComputer}.
  */
-public class IntNormalizedKeyComputer extends 
org.apache.flink.table.runtime.sort.NormalizedKeyComputer {
+public class IntNormalizedKeyComputer implements NormalizedKeyComputer {
 
        public static final IntNormalizedKeyComputer INSTANCE = new 
IntNormalizedKeyComputer();
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntRecordComparator.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntRecordComparator.java
index b41de22..5bbc48e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntRecordComparator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntRecordComparator.java
@@ -19,11 +19,12 @@
 package org.apache.flink.table.runtime.sort;
 
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.RecordComparator;
 
 /**
  * Example String {@link RecordComparator}.
  */
-public class IntRecordComparator extends RecordComparator {
+public class IntRecordComparator implements RecordComparator {
 
        public static final IntRecordComparator INSTANCE = new 
IntRecordComparator();
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/SortUtilTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/SortUtilTest.java
index de2f61e..73a491e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/SortUtilTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/SortUtilTest.java
@@ -115,12 +115,12 @@ public class SortUtilTest {
                        BinaryString[] arr = new BinaryString[len];
                        for (int i = 0; i < len; i++) {
                                arr[i] = 
BinaryString.fromString(String.valueOf(random.nextLong()));
-                               SortUtil.putBinaryStringNormalizedKey(arr[i], 
segments[i], 0, 8);
+                               SortUtil.putStringNormalizedKey(arr[i], 
segments[i], 0, 8);
                        }
 
                        Arrays.sort(arr, BinaryString::compareTo);
                        for (int i = 0; i < len; i++) {
-                               SortUtil.putBinaryStringNormalizedKey(arr[i], 
compareSegs[i], 0, 8);
+                               SortUtil.putStringNormalizedKey(arr[i], 
compareSegs[i], 0, 8);
                        }
 
                        Arrays.sort(segments, (o1, o2) -> o1.compare(o2, 0, 0, 
8));

Reply via email to