This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 76eb039  [FLINK-11802][table-runtime-blink] Create TypeInfo and 
TypeSerializer for blink data format
76eb039 is described below

commit 76eb0397041915c867df57b7e2478caca3afd7bd
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Mon Mar 4 10:01:58 2019 +0800

    [FLINK-11802][table-runtime-blink] Create TypeInfo and TypeSerializer for 
blink data format
    
    This closes #7887
---
 flink-table/flink-table-runtime-blink/pom.xml      |  17 ++
 .../apache/flink/table/dataformat/BinaryArray.java |  10 +
 .../apache/flink/table/dataformat/BinaryMap.java   |  10 +
 .../apache/flink/table/dataformat/BinaryRow.java   |   8 +-
 .../flink/table/dataformat/BinaryString.java       |  15 ++
 .../flink/table/dataformat/BinaryWriter.java       |  39 +++
 .../flink/table/dataformat/TypeGetterSetters.java  |  39 +++
 .../org/apache/flink/table/type/ArrayType.java     |   0
 .../org/apache/flink/table/type/AtomicType.java    |   0
 .../org/apache/flink/table/type/BooleanType.java   |   0
 .../java/org/apache/flink/table/type/ByteType.java |   0
 .../java/org/apache/flink/table/type/CharType.java |   0
 .../java/org/apache/flink/table/type/DateType.java |   0
 .../org/apache/flink/table/type/DecimalType.java   |   0
 .../org/apache/flink/table/type/DoubleType.java    |   0
 .../org/apache/flink/table/type/FloatType.java     |   0
 .../org/apache/flink/table/type/GenericType.java   |   0
 .../java/org/apache/flink/table/type/IntType.java  |   0
 .../org/apache/flink/table/type/InternalType.java  |   0
 .../org/apache/flink/table/type/InternalTypes.java |   0
 .../java/org/apache/flink/table/type/LongType.java |   0
 .../java/org/apache/flink/table/type/MapType.java  |   0
 .../org/apache/flink/table/type/PrimitiveType.java |   0
 .../java/org/apache/flink/table/type/RowType.java  |   2 +-
 .../org/apache/flink/table/type/ShortType.java     |   0
 .../org/apache/flink/table/type/StringType.java    |   0
 .../java/org/apache/flink/table/type/TimeType.java |   0
 .../org/apache/flink/table/type/TimestampType.java |   0
 .../apache/flink/table/type/TypeConverters.java    |  60 ++++-
 .../flink/table/typeutils/BaseRowSerializer.java   | 282 +++++++++++++++++++++
 .../flink/table/typeutils/BaseRowTypeInfo.java     | 145 +++++++++++
 .../table/typeutils/BinaryArraySerializer.java     | 107 ++++++++
 .../flink/table/typeutils/BinaryArrayTypeInfo.java | 105 ++++++++
 .../flink/table/typeutils/BinaryMapSerializer.java | 107 ++++++++
 .../flink/table/typeutils/BinaryMapTypeInfo.java   | 113 +++++++++
 .../flink/table/typeutils/BinaryRowSerializer.java | 184 ++++++++++++++
 .../table/typeutils/BinaryStringSerializer.java    | 114 +++++++++
 .../table/typeutils/BinaryStringTypeInfo.java      |  89 +++++++
 .../org/apache/flink/table/util/SegmentsUtil.java  |  40 +++
 .../apache/flink/table/type/InternalTypeTest.java  |   0
 .../table/typeutils/BaseRowSerializerTest.java     | 229 +++++++++++++++++
 .../table/typeutils/BinaryArraySerializerTest.java |  66 +++++
 .../table/typeutils/BinaryMapSerializerTest.java   |  66 +++++
 .../table/typeutils/BinaryRowSerializerTest.java   |  64 +++++
 .../table/typeutils/BinaryRowTypeInfoTest.java     |  77 ++++++
 .../typeutils/BinaryStringSerializerTest.java}     |  45 ++--
 .../flink/table/typeutils/BinaryTypeInfoTest.java} |  35 ++-
 47 files changed, 2017 insertions(+), 51 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/pom.xml 
b/flink-table/flink-table-runtime-blink/pom.xml
index 29fb8a7..e8b1623 100644
--- a/flink-table/flink-table-runtime-blink/pom.xml
+++ b/flink-table/flink-table-runtime-blink/pom.xml
@@ -60,5 +60,22 @@ under the License.
                        <artifactId>janino</artifactId>
                        <version>${janino.version}</version>
                </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 </project>
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
index e708710..57ce04b 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -347,6 +347,16 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
                return values;
        }
 
+       public BinaryArray copy() {
+               return copy(new BinaryArray());
+       }
+
+       public BinaryArray copy(BinaryArray reuse) {
+               byte[] bytes = SegmentsUtil.copyToBytes(segments, offset, 
sizeInBytes);
+               reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+               return reuse;
+       }
+
        private static BinaryArray fromPrimitiveArray(
                        Object arr, int offset, int length, int elementSize) {
                final long headerInBytes = calculateHeaderInBytes(length);
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
index fefdc27..7c10193 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
@@ -72,6 +72,16 @@ public class BinaryMap extends BinaryFormat {
                return values;
        }
 
+       public BinaryMap copy() {
+               return copy(new BinaryMap());
+       }
+
+       public BinaryMap copy(BinaryMap reuse) {
+               byte[] bytes = SegmentsUtil.copyToBytes(segments, offset, 
sizeInBytes);
+               reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+               return reuse;
+       }
+
        public static BinaryMap valueOf(BinaryArray key, BinaryArray value) {
                checkArgument(key.getSegments().length == 1 && 
value.getSegments().length == 1);
                byte[] bytes = new byte[4 + key.getSizeInBytes() + 
value.getSizeInBytes()];
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
index 472f7b22..b88739e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
@@ -59,6 +59,10 @@ public final class BinaryRow extends BinaryFormat<Object> 
implements BaseRow {
                return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
        }
 
+       public static int calculateFixPartSizeInBytes(int arity) {
+               return calculateBitSetWidthInBytes(arity) + 8 * arity;
+       }
+
        private final int arity;
        private final int nullBitsSizeInBytes;
 
@@ -287,8 +291,8 @@ public final class BinaryRow extends BinaryFormat<Object> 
implements BaseRow {
                return copy(new BinaryRow(arity));
        }
 
-       public BinaryRow copy(BaseRow reuse) {
-               return copyInternal((BinaryRow) reuse);
+       public BinaryRow copy(BinaryRow reuse) {
+               return copyInternal(reuse);
        }
 
        private BinaryRow copyInternal(BinaryRow reuse) {
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
index 4b388f6..152907e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
@@ -49,6 +49,16 @@ public class BinaryString extends BinaryFormat<String> {
                }
        }
 
+       public static BinaryString fromBytes(byte[] bytes) {
+               return new BinaryString(
+                               new MemorySegment[] 
{MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+       }
+
+       public static BinaryString fromBytes(byte[] bytes, int offset, int 
numBytes) {
+               return new BinaryString(
+                               new MemorySegment[] 
{MemorySegmentFactory.wrap(bytes)}, offset, numBytes);
+       }
+
        @Override
        public String toString() {
                byte[] bytes = BinaryStringUtil.allocateReuseBytes(sizeInBytes);
@@ -56,6 +66,11 @@ public class BinaryString extends BinaryFormat<String> {
                return new String(bytes, 0, sizeInBytes);
        }
 
+       public BinaryString copy() {
+               byte[] copy = SegmentsUtil.copyToBytes(segments, offset, 
sizeInBytes);
+               return BinaryString.fromBytes(copy, 0, copy.length);
+       }
+
        /**
         * Get binary string, if len less than 8, will be include in 
variablePartOffsetAndLen.
         *
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
index 435b064..08ba9eb 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -17,6 +17,11 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.table.type.ArrayType;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.InternalTypes;
+import org.apache.flink.table.type.MapType;
+
 /**
  * Writer to write a composite data format, like row, array.
  * 1. Invoke {@link #reset()}.
@@ -61,4 +66,38 @@ public interface BinaryWriter {
         * Finally, complete write to set real size to binary.
         */
        void complete();
+
+       static void write(BinaryWriter writer, int pos, Object o, InternalType 
type) {
+               if (type.equals(InternalTypes.BOOLEAN)) {
+                       writer.writeBoolean(pos, (boolean) o);
+               } else if (type.equals(InternalTypes.BYTE)) {
+                       writer.writeByte(pos, (byte) o);
+               } else if (type.equals(InternalTypes.SHORT)) {
+                       writer.writeShort(pos, (short) o);
+               } else if (type.equals(InternalTypes.INT)) {
+                       writer.writeInt(pos, (int) o);
+               } else if (type.equals(InternalTypes.LONG)) {
+                       writer.writeLong(pos, (long) o);
+               } else if (type.equals(InternalTypes.FLOAT)) {
+                       writer.writeFloat(pos, (float) o);
+               } else if (type.equals(InternalTypes.DOUBLE)) {
+                       writer.writeDouble(pos, (double) o);
+               } else if (type.equals(InternalTypes.STRING)) {
+                       writer.writeString(pos, (BinaryString) o);
+               } else if (type.equals(InternalTypes.CHAR)) {
+                       writer.writeChar(pos, (char) o);
+               } else if (type.equals(InternalTypes.DATE)) {
+                       writer.writeInt(pos, (int) o);
+               } else if (type.equals(InternalTypes.TIME)) {
+                       writer.writeInt(pos, (int) o);
+               } else if (type.equals(InternalTypes.TIMESTAMP)) {
+                       writer.writeLong(pos, (long) o);
+               } else if (type instanceof ArrayType) {
+                       writer.writeArray(pos, (BinaryArray) o);
+               } else if (type instanceof MapType) {
+                       writer.writeMap(pos, (BinaryMap) o);
+               } else {
+                       throw new RuntimeException("Not support type: " + type);
+               }
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
index e567add..73d3d73 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
@@ -17,6 +17,11 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.table.type.ArrayType;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.InternalTypes;
+import org.apache.flink.table.type.MapType;
+
 /**
  * Provide type specialized getters and setters to reduce if/else and 
eliminate box and unbox.
  *
@@ -135,4 +140,38 @@ public interface TypeGetterSetters {
         * Set char value.
         */
        void setChar(int ordinal, char value);
+
+       static Object get(TypeGetterSetters row, int ordinal, InternalType 
type) {
+               if (type.equals(InternalTypes.BOOLEAN)) {
+                       return row.getBoolean(ordinal);
+               } else if (type.equals(InternalTypes.BYTE)) {
+                       return row.getByte(ordinal);
+               } else if (type.equals(InternalTypes.SHORT)) {
+                       return row.getShort(ordinal);
+               } else if (type.equals(InternalTypes.INT)) {
+                       return row.getInt(ordinal);
+               } else if (type.equals(InternalTypes.LONG)) {
+                       return row.getLong(ordinal);
+               } else if (type.equals(InternalTypes.FLOAT)) {
+                       return row.getFloat(ordinal);
+               } else if (type.equals(InternalTypes.DOUBLE)) {
+                       return row.getDouble(ordinal);
+               } else if (type.equals(InternalTypes.STRING)) {
+                       return row.getString(ordinal);
+               } else if (type.equals(InternalTypes.CHAR)) {
+                       return row.getChar(ordinal);
+               } else if (type.equals(InternalTypes.DATE)) {
+                       return row.getInt(ordinal);
+               } else if (type.equals(InternalTypes.TIME)) {
+                       return row.getInt(ordinal);
+               } else if (type.equals(InternalTypes.TIMESTAMP)) {
+                       return row.getLong(ordinal);
+               } else if (type instanceof ArrayType) {
+                       return row.getArray(ordinal);
+               } else if (type instanceof MapType) {
+                       return row.getMap(ordinal);
+               } else {
+                       throw new RuntimeException("Not support type: " + type);
+               }
+       }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
similarity index 100%
copy from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
copy to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/AtomicType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/AtomicType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/AtomicType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/AtomicType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ByteType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ByteType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/CharType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/CharType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DateType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DateType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/FloatType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/FloatType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/GenericType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/GenericType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/IntType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/IntType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/LongType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/LongType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/MapType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/MapType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/RowType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java
similarity index 97%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/RowType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java
index 6c02996..3fee7c2 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/RowType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java
@@ -46,7 +46,7 @@ public class RowType implements InternalType {
                }
        }
 
-       private static String[] generateDefaultFieldNames(int length) {
+       public static String[] generateDefaultFieldNames(int length) {
                String[] fieldNames = new String[length];
                for (int i = 0; i < length; i++) {
                        fieldNames[i] = "f" + i;
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ShortType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ShortType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/StringType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/StringType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java
similarity index 100%
copy from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java
copy to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
similarity index 64%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
index 5bcf99d..9f9d055 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
@@ -29,6 +29,10 @@ import 
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.table.typeutils.BinaryArrayTypeInfo;
+import org.apache.flink.table.typeutils.BinaryMapTypeInfo;
+import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,7 +45,8 @@ import java.util.stream.Stream;
  */
 public class TypeConverters {
 
-       public static final Map<TypeInformation, InternalType> 
TYPE_INFO_TO_INTERNAL_TYPE;
+       private static final Map<TypeInformation, InternalType> 
TYPE_INFO_TO_INTERNAL_TYPE;
+       private static final Map<InternalType, TypeInformation> 
INTERNAL_TYPE_TO_INTERNAL_TYPE_INFO;
        static {
                Map<TypeInformation, InternalType> tiToType = new HashMap<>();
                tiToType.put(BasicTypeInfo.STRING_TYPE_INFO, 
InternalTypes.STRING);
@@ -62,7 +67,26 @@ public class TypeConverters {
                // Decimal(38, 18). If the user's BigDecimal is more precision 
than this, we will
                // throw Exception to remind user to use GenericType in real 
data conversion.
                tiToType.put(BasicTypeInfo.BIG_DEC_TYPE_INFO, 
InternalTypes.SYSTEM_DEFAULT_DECIMAL);
+
+               // Internal type info
+               tiToType.put(BinaryStringTypeInfo.INSTANCE, 
InternalTypes.STRING);
+
                TYPE_INFO_TO_INTERNAL_TYPE = 
Collections.unmodifiableMap(tiToType);
+
+               Map<InternalType, TypeInformation> internalTypeToInfo = new 
HashMap<>();
+               internalTypeToInfo.put(InternalTypes.STRING, 
BinaryStringTypeInfo.INSTANCE);
+               internalTypeToInfo.put(InternalTypes.BOOLEAN, 
BasicTypeInfo.BOOLEAN_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.DOUBLE, 
BasicTypeInfo.DOUBLE_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.FLOAT, 
BasicTypeInfo.FLOAT_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.BYTE, 
BasicTypeInfo.BYTE_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.INT, 
BasicTypeInfo.INT_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.LONG, 
BasicTypeInfo.LONG_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.SHORT, 
BasicTypeInfo.SHORT_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.CHAR, 
BasicTypeInfo.CHAR_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.DATE, 
BasicTypeInfo.INT_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.TIMESTAMP, 
BasicTypeInfo.LONG_TYPE_INFO);
+               internalTypeToInfo.put(InternalTypes.TIME, 
BasicTypeInfo.INT_TYPE_INFO);
+               INTERNAL_TYPE_TO_INTERNAL_TYPE_INFO = 
Collections.unmodifiableMap(internalTypeToInfo);
        }
 
        /**
@@ -110,8 +134,42 @@ public class TypeConverters {
                        return InternalTypes.createMapType(
                                        
createInternalTypeFromTypeInfo(mapType.getKeyTypeInfo()),
                                        
createInternalTypeFromTypeInfo(mapType.getValueTypeInfo()));
+               } else if (typeInfo instanceof BinaryMapTypeInfo) {
+                       BinaryMapTypeInfo mapType = (BinaryMapTypeInfo) 
typeInfo;
+                       return InternalTypes.createMapType(
+                                       mapType.getKeyType(), 
mapType.getValueType());
+               } else if (typeInfo instanceof BinaryArrayTypeInfo) {
+                       BinaryArrayTypeInfo arrayType = (BinaryArrayTypeInfo) 
typeInfo;
+                       return 
InternalTypes.createArrayType(arrayType.getElementType());
                } else {
                        return InternalTypes.createGenericType(typeInfo);
                }
        }
+
+       /**
+        * Create a internal {@link TypeInformation} from a {@link 
InternalType}.
+        *
+        * <p>eg:
+        * {@link InternalTypes#STRING} => {@link BinaryStringTypeInfo}.
+        * {@link RowType} => {@link BaseRowTypeInfo}.
+        */
+       public static TypeInformation 
createInternalTypeInfoFromInternalType(InternalType type) {
+               TypeInformation typeInfo = 
INTERNAL_TYPE_TO_INTERNAL_TYPE_INFO.get(type);
+               if (typeInfo != null) {
+                       return typeInfo;
+               }
+
+               if (type instanceof RowType) {
+                       RowType rowType = (RowType) type;
+                       return new BaseRowTypeInfo(rowType.getFieldTypes(), 
rowType.getFieldNames());
+               } else if (type instanceof ArrayType) {
+                       return new BinaryArrayTypeInfo(((ArrayType) 
type).getElementType());
+               } else if (type instanceof MapType) {
+                       MapType mapType = (MapType) type;
+                       return new BinaryMapTypeInfo(mapType.getKeyType(), 
mapType.getValueType());
+               } else {
+                       // TODO support decimal and generic type.
+                       throw new UnsupportedOperationException("Not support 
yet!");
+               }
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
new file mode 100644
index 0000000..55ecb6f
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.BinaryRowWriter;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.TypeConverters;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Serializer for BaseRow.
+ */
+public class BaseRowSerializer extends TypeSerializer<BaseRow> {
+
+       private BinaryRowSerializer binarySerializer;
+       private final InternalType[] types;
+       private final TypeSerializer[] fieldSerializers;
+
+       public BaseRowSerializer(InternalType[] types, ExecutionConfig config) {
+               this(types, Arrays.stream(types)
+                               
.map(TypeConverters::createInternalTypeInfoFromInternalType)
+                               .map(ti -> ti.createSerializer(config))
+                               .toArray(TypeSerializer[]::new));
+       }
+
+       public BaseRowSerializer(InternalType[] types, TypeSerializer[] 
fieldSerializers) {
+               this.types = types;
+               this.fieldSerializers = fieldSerializers;
+               this.binarySerializer = new BinaryRowSerializer(types.length);
+       }
+
+       public int getArity() {
+               return types.length;
+       }
+
+       @Override
+       public TypeSerializer<BaseRow> duplicate() {
+               return new BaseRowSerializer(types, fieldSerializers);
+       }
+
+       @Override
+       public BaseRow createInstance() {
+               // default use binary row to deserializer
+               return new BinaryRow(types.length);
+       }
+
+       @Override
+       public BaseRow copy(BaseRow from) {
+               if (from.getArity() != types.length) {
+                       throw new IllegalArgumentException("Row arity: " + 
from.getArity() +
+                                       ", but serializer arity: " + 
types.length);
+               }
+               if (from instanceof BinaryRow) {
+                       return ((BinaryRow) from).copy();
+               } else {
+                       return copyBaseRow(from, new 
GenericRow(from.getArity()));
+               }
+       }
+
+       @Override
+       public BaseRow copy(BaseRow from, BaseRow reuse) {
+               if (from.getArity() != types.length || reuse.getArity() != 
types.length) {
+                       throw new IllegalArgumentException("Row arity: " + 
from.getArity() +
+                                       ", Ruese Row arity: " + 
reuse.getArity() +
+                                       ", but serializer arity: " + 
types.length);
+               }
+               if (from instanceof BinaryRow) {
+                       return reuse instanceof BinaryRow ?
+                                       ((BinaryRow) from).copy((BinaryRow) 
reuse) :
+                                       ((BinaryRow) from).copy();
+               } else {
+                       return copyBaseRow(from, reuse);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private BaseRow copyBaseRow(BaseRow from, BaseRow reuse) {
+               GenericRow ret;
+               if (reuse instanceof GenericRow) {
+                       ret = (GenericRow) reuse;
+               } else {
+                       ret = new GenericRow(from.getArity());
+               }
+               ret.setHeader(from.getHeader());
+               for (int i = 0; i < from.getArity(); i++) {
+                       if (!from.isNullAt(i)) {
+                               ret.setField(i, 
fieldSerializers[i].copy((TypeGetterSetters.get(from, i, types[i]))));
+                       } else {
+                               ret.setNullAt(i);
+                       }
+               }
+               return ret;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               binarySerializer.copy(source, target);
+       }
+
+       /**
+        * Convert base row to binary row.
+        * TODO modify it to code gen, and reuse BinaryRow&BinaryRowWriter.
+        */
+       public BinaryRow baseRowToBinary(BaseRow row) {
+               if (row instanceof BinaryRow) {
+                       return (BinaryRow) row;
+               }
+               BinaryRow binaryRow = new BinaryRow(types.length);
+               BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+               for (int i = 0; i < types.length; i++) {
+                       if (row.isNullAt(i)) {
+                               writer.setNullAt(i);
+                       } else {
+                               BinaryWriter.write(writer, i, 
TypeGetterSetters.get(row, i, types[i]), types[i]);
+                       }
+               }
+               writer.complete();
+               return binaryRow;
+       }
+
+       @Override
+       public void serialize(BaseRow row, DataOutputView target) throws 
IOException {
+               binarySerializer.serialize(baseRowToBinary(row), target);
+       }
+
+       @Override
+       public BaseRow deserialize(DataInputView source) throws IOException {
+               return binarySerializer.deserialize(source);
+       }
+
+       @Override
+       public BaseRow deserialize(BaseRow reuse, DataInputView source) throws 
IOException {
+               if (reuse instanceof BinaryRow) {
+                       return binarySerializer.deserialize((BinaryRow) reuse, 
source);
+               } else {
+                       return binarySerializer.deserialize(source);
+               }
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof BaseRowSerializer) {
+                       BaseRowSerializer other = (BaseRowSerializer) obj;
+                       return Arrays.equals(types, other.types);
+               }
+
+               return false;
+       }
+
+       @Override
+       public int hashCode() {
+               return Arrays.hashCode(types);
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public TypeSerializerSnapshot<BaseRow> snapshotConfiguration() {
+               return new BaseRowSerializerSnapshot(types, fieldSerializers);
+       }
+
+       /**
+        * {@link TypeSerializerSnapshot} for {@link BinaryRowSerializer}.
+        */
+       public static final class BaseRowSerializerSnapshot implements 
TypeSerializerSnapshot<BaseRow> {
+               private static final int CURRENT_VERSION = 3;
+
+               private InternalType[] previousTypes;
+               private NestedSerializersSnapshotDelegate 
nestedSerializersSnapshotDelegate;
+
+               @SuppressWarnings("unused")
+               public BaseRowSerializerSnapshot() {
+                       // this constructor is used when restoring from a 
checkpoint/savepoint.
+               }
+
+               BaseRowSerializerSnapshot(InternalType[] types, 
TypeSerializer[] serializers) {
+                       this.previousTypes = types;
+                       this.nestedSerializersSnapshotDelegate = new 
NestedSerializersSnapshotDelegate(serializers);
+               }
+
+               @Override
+               public int getCurrentVersion() {
+                       return CURRENT_VERSION;
+               }
+
+               @Override
+               public void writeSnapshot(DataOutputView out) throws 
IOException {
+                       out.writeInt(previousTypes.length);
+                       DataOutputViewStream stream = new 
DataOutputViewStream(out);
+                       for (InternalType previousType : previousTypes) {
+                               InstantiationUtil.serializeObject(stream, 
previousType);
+                       }
+                       
nestedSerializersSnapshotDelegate.writeNestedSerializerSnapshots(out);
+               }
+
+               @Override
+               public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {
+                       int length = in.readInt();
+                       DataInputViewStream stream = new 
DataInputViewStream(in);
+                       previousTypes = new InternalType[length];
+                       for (int i = 0; i < length; i++) {
+                               try {
+                                       previousTypes[i] = 
InstantiationUtil.deserializeObject(stream, userCodeClassLoader);
+                               } catch (ClassNotFoundException e) {
+                                       throw new IOException(e);
+                               }
+                       }
+                       this.nestedSerializersSnapshotDelegate = 
NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, 
userCodeClassLoader);
+               }
+
+               @Override
+               public BaseRowSerializer restoreSerializer() {
+                       return new BaseRowSerializer(previousTypes, 
nestedSerializersSnapshotDelegate.getRestoredNestedSerializers());
+               }
+
+               @Override
+               public TypeSerializerSchemaCompatibility<BaseRow> 
resolveSchemaCompatibility(TypeSerializer<BaseRow> newSerializer) {
+                       if (!(newSerializer instanceof BaseRowSerializer)) {
+                               return 
TypeSerializerSchemaCompatibility.incompatible();
+                       }
+
+                       BaseRowSerializer newRowSerializer = 
(BaseRowSerializer) newSerializer;
+                       if (!Arrays.equals(previousTypes, 
newRowSerializer.types)) {
+                               return 
TypeSerializerSchemaCompatibility.incompatible();
+                       }
+
+                       
CompositeTypeSerializerUtil.IntermediateCompatibilityResult<BaseRow> 
intermediateResult =
+                                       
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(
+                                                       
newRowSerializer.fieldSerializers,
+                                                       
nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
+
+                       if 
(intermediateResult.isCompatibleWithReconfiguredSerializer()) {
+                               BaseRowSerializer 
reconfiguredCompositeSerializer = restoreSerializer();
+                               return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
+                       }
+
+                       return intermediateResult.getFinalResult();
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
new file mode 100644
index 0000000..b942c6a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.RowType;
+import org.apache.flink.table.type.TypeConverters;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base row type info.
+ */
+public class BaseRowTypeInfo extends TupleTypeInfoBase<BaseRow> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final String[] fieldNames;
+       private final InternalType[] internalTypes;
+
+       public BaseRowTypeInfo(InternalType... internalTypes) {
+               this(internalTypes, 
RowType.generateDefaultFieldNames(internalTypes.length));
+       }
+
+       public BaseRowTypeInfo(InternalType[] internalTypes, String[] 
fieldNames) {
+               super(BaseRow.class, Arrays.stream(internalTypes)
+                               
.map(TypeConverters::createInternalTypeInfoFromInternalType)
+                               .toArray(TypeInformation[]::new));
+               this.internalTypes = internalTypes;
+               checkNotNull(fieldNames, "FieldNames should not be null.");
+               checkArgument(internalTypes.length == fieldNames.length,
+                               "Number of field types and names is 
different.");
+               checkArgument(!hasDuplicateFieldNames(fieldNames),
+                               "Field names are not unique.");
+               this.fieldNames = Arrays.copyOf(fieldNames, fieldNames.length);
+       }
+
+       @Override
+       public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+               throw new UnsupportedOperationException("Not support!");
+       }
+
+       @Override
+       public TypeComparator<BaseRow> createComparator(
+               int[] logicalKeyFields,
+               boolean[] orders,
+               int logicalFieldOffset,
+               ExecutionConfig config) {
+               // TODO support it.
+               throw new UnsupportedOperationException("Not support yet!");
+       }
+
+       @Override
+       public String[] getFieldNames() {
+               return fieldNames;
+       }
+
+       @Override
+       public int getFieldIndex(String fieldName) {
+               for (int i = 0; i < fieldNames.length; i++) {
+                       if (fieldNames[i].equals(fieldName)) {
+                               return i;
+                       }
+               }
+               return -1;
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof BaseRowTypeInfo;
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder bld = new StringBuilder("BaseRow");
+               if (types.length > 0) {
+                       bld.append('(').append(fieldNames[0]).append(": 
").append(types[0]);
+
+                       for (int i = 1; i < types.length; i++) {
+                               bld.append(", 
").append(fieldNames[i]).append(": ").append(types[i]);
+                       }
+
+                       bld.append(')');
+               }
+               return bld.toString();
+       }
+
+       /**
+        * Returns the field types of the row. The order matches the order of 
the field names.
+        */
+       public TypeInformation<?>[] getFieldTypes() {
+               return types;
+       }
+
+       private boolean hasDuplicateFieldNames(String[] fieldNames) {
+               HashSet<String> names = new HashSet<>();
+               for (String field : fieldNames) {
+                       if (!names.add(field)) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       @Override
+       public CompositeType.TypeComparatorBuilder<BaseRow> 
createTypeComparatorBuilder() {
+               throw new UnsupportedOperationException("Not support!");
+       }
+
+       @Override
+       public BaseRowSerializer createSerializer(ExecutionConfig config) {
+               return new BaseRowSerializer(internalTypes, config);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArraySerializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArraySerializer.java
new file mode 100644
index 0000000..9526e30
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArraySerializer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link BinaryArray}.
+ */
+public class BinaryArraySerializer extends 
TypeSerializerSingleton<BinaryArray> {
+
+       public static final BinaryArraySerializer INSTANCE = new 
BinaryArraySerializer();
+
+       private BinaryArraySerializer() {}
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public BinaryArray createInstance() {
+               return new BinaryArray();
+       }
+
+       @Override
+       public BinaryArray copy(BinaryArray from) {
+               return from.copy();
+       }
+
+       @Override
+       public BinaryArray copy(BinaryArray from, BinaryArray reuse) {
+               return from.copy(reuse);
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public void serialize(BinaryArray record, DataOutputView target) throws 
IOException {
+               target.writeInt(record.getSizeInBytes());
+               SegmentsUtil.serializeToView(record.getSegments(), 
record.getOffset(), record.getSizeInBytes(), target);
+       }
+
+       @Override
+       public BinaryArray deserialize(DataInputView source) throws IOException 
{
+               return deserialize(new BinaryArray(), source);
+       }
+
+       @Override
+       public BinaryArray deserialize(BinaryArray reuse, DataInputView source) 
throws IOException {
+               int length = source.readInt();
+               byte[] bytes = new byte[length];
+               source.readFully(bytes);
+               reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, 
bytes.length);
+               return reuse;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               int length = source.readInt();
+               target.writeInt(length);
+               target.write(source, length);
+       }
+
+       @Override
+       public TypeSerializerSnapshot<BinaryArray> snapshotConfiguration() {
+               return new BinaryArraySerializerSnapshot();
+       }
+
+       /**
+        * Serializer configuration snapshot for compatibility and format 
evolution.
+        */
+       @SuppressWarnings("WeakerAccess")
+       public static final class BinaryArraySerializerSnapshot extends 
SimpleTypeSerializerSnapshot<BinaryArray> {
+
+               public BinaryArraySerializerSnapshot() {
+                       super(() -> INSTANCE);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArrayTypeInfo.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArrayTypeInfo.java
new file mode 100644
index 0000000..524b807
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArrayTypeInfo.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.type.InternalType;
+
+/**
+ * TypeInfo for BaseArray.
+ */
+public class BinaryArrayTypeInfo extends TypeInformation<BinaryArray> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final InternalType elementType;
+
+       public BinaryArrayTypeInfo(InternalType elementType) {
+               this.elementType = elementType;
+       }
+
+       public InternalType getElementType() {
+               return elementType;
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 1;
+       }
+
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @Override
+       public Class<BinaryArray> getTypeClass() {
+               return BinaryArray.class;
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<BinaryArray> createSerializer(ExecutionConfig 
executionConfig) {
+               return BinaryArraySerializer.INSTANCE;
+       }
+
+       @Override
+       public String toString() {
+               return this.getClass().getSimpleName() + "<" + this.elementType 
+ ">";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof BinaryArrayTypeInfo) {
+                       BinaryArrayTypeInfo typeInfo = (BinaryArrayTypeInfo) 
obj;
+
+                       return typeInfo.canEqual(this) &&
+                                       
elementType.equals(typeInfo.elementType);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof BinaryArrayTypeInfo;
+       }
+
+       @Override
+       public int hashCode() {
+               return this.elementType.hashCode();
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapSerializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapSerializer.java
new file mode 100644
index 0000000..6030026
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapSerializer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link BinaryMap}.
+ */
+public class BinaryMapSerializer extends TypeSerializerSingleton<BinaryMap> {
+
+       public static final BinaryMapSerializer INSTANCE = new 
BinaryMapSerializer();
+
+       private BinaryMapSerializer() {}
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public BinaryMap createInstance() {
+               return new BinaryMap();
+       }
+
+       @Override
+       public BinaryMap copy(BinaryMap from) {
+               return from.copy();
+       }
+
+       @Override
+       public BinaryMap copy(BinaryMap from, BinaryMap reuse) {
+               return from.copy(reuse);
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public void serialize(BinaryMap record, DataOutputView target) throws 
IOException {
+               target.writeInt(record.getSizeInBytes());
+               SegmentsUtil.serializeToView(record.getSegments(), 
record.getOffset(), record.getSizeInBytes(), target);
+       }
+
+       @Override
+       public BinaryMap deserialize(DataInputView source) throws IOException {
+               return deserialize(new BinaryMap(), source);
+       }
+
+       @Override
+       public BinaryMap deserialize(BinaryMap reuse, DataInputView source) 
throws IOException {
+               int length = source.readInt();
+               byte[] bytes = new byte[length];
+               source.readFully(bytes);
+               reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, 
bytes.length);
+               return reuse;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               int length = source.readInt();
+               target.writeInt(length);
+               target.write(source, length);
+       }
+
+       @Override
+       public TypeSerializerSnapshot<BinaryMap> snapshotConfiguration() {
+               return new BinaryMapSerializerSnapshot();
+       }
+
+       /**
+        * Serializer configuration snapshot for compatibility and format 
evolution.
+        */
+       @SuppressWarnings("WeakerAccess")
+       public static final class BinaryMapSerializerSnapshot extends 
SimpleTypeSerializerSnapshot<BinaryMap> {
+
+               public BinaryMapSerializerSnapshot() {
+                       super(() -> INSTANCE);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapTypeInfo.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapTypeInfo.java
new file mode 100644
index 0000000..ca6dbed
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapTypeInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.type.InternalType;
+
+/**
+ * TypeInfo for BaseMap.
+ */
+public class BinaryMapTypeInfo extends TypeInformation<BinaryMap> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final InternalType keyType;
+       private final InternalType valueType;
+
+       public BinaryMapTypeInfo(InternalType keyType, InternalType valueType) {
+               this.keyType = keyType;
+               this.valueType = valueType;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public InternalType getKeyType() {
+               return keyType;
+       }
+
+       public InternalType getValueType() {
+               return valueType;
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 1;
+       }
+
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @Override
+       public Class<BinaryMap> getTypeClass() {
+               return BinaryMap.class;
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<BinaryMap> createSerializer(ExecutionConfig 
executionConfig) {
+               return BinaryMapSerializer.INSTANCE;
+       }
+
+       @Override
+       public String toString() {
+               return  this.getClass().getSimpleName() +
+                               "<" + keyType + ", " + valueType + ">";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof BinaryMapTypeInfo) {
+                       BinaryMapTypeInfo typeInfo = (BinaryMapTypeInfo) obj;
+
+                       return keyType.equals(typeInfo.keyType) && 
valueType.equals(typeInfo.valueType);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof BinaryMapTypeInfo;
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * keyType.hashCode() + valueType.hashCode();
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
new file mode 100644
index 0000000..078b948
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
 * Serializer for {@link BinaryRow}.
 *
 * <p>Wire format: a 4-byte length prefix followed by the row's raw bytes.
 * The serializer is parameterized only by the row arity ({@code numFields}),
 * which is also what the snapshot persists for schema-compatibility checks.
 */
public class BinaryRowSerializer extends TypeSerializer<BinaryRow> {

	private static final long serialVersionUID = 1L;

	// Arity of the rows this serializer handles; drives createInstance()
	// and snapshot compatibility.
	private final int numFields;

	public BinaryRowSerializer(int numFields) {
		this.numFields = numFields;
	}

	@Override
	public boolean isImmutableType() {
		return false;
	}

	@Override
	public TypeSerializer<BinaryRow> duplicate() {
		return new BinaryRowSerializer(numFields);
	}

	@Override
	public BinaryRow createInstance() {
		return new BinaryRow(numFields);
	}

	@Override
	public BinaryRow copy(BinaryRow from) {
		return copy(from, new BinaryRow(numFields));
	}

	@Override
	public BinaryRow copy(BinaryRow from, BinaryRow reuse) {
		return from.copy(reuse);
	}

	@Override
	public int getLength() {
		// Variable-length records.
		return -1;
	}

	@Override
	public void serialize(BinaryRow record, DataOutputView target) throws IOException {
		// Length prefix first, then the raw row bytes from its backing segments.
		target.writeInt(record.getSizeInBytes());
		SegmentsUtil.serializeToView(record.getSegments(), record.getOffset(), record.getSizeInBytes(), target);
	}

	@Override
	public BinaryRow deserialize(DataInputView source) throws IOException {
		BinaryRow row = new BinaryRow(numFields);
		int length = source.readInt();
		byte[] bytes = new byte[length];
		source.readFully(bytes);
		row.pointTo(MemorySegmentFactory.wrap(bytes), 0, length);
		return row;
	}

	@Override
	public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException {
		// A reusable row must either be unbacked or backed by exactly one
		// segment starting at offset 0, otherwise in-place reuse is unsafe.
		MemorySegment[] segments = reuse.getSegments();
		checkArgument(segments == null || (segments.length == 1 && reuse.getOffset() == 0),
				"Reuse BinaryRow should have no segments or only one segment and offset start at 0.");

		int length = source.readInt();
		// Grow (re-allocate) only when the existing segment is too small.
		if (segments == null || segments[0].size() < length) {
			segments = new MemorySegment[] {MemorySegmentFactory.wrap(new byte[length])};
		}
		// NOTE(review): getArray() assumes the reuse segment is heap-backed —
		// confirm off-heap-backed reuse rows never reach this path.
		source.readFully(segments[0].getArray(), 0, length);
		reuse.pointTo(segments, 0, length);
		return reuse;
	}

	@Override
	public void copy(DataInputView source, DataOutputView target) throws IOException {
		// Stream one serialized record through without materializing a BinaryRow.
		int length = source.readInt();
		target.writeInt(length);
		target.write(source, length);
	}

	@Override
	public boolean equals(Object obj) {
		return obj instanceof BinaryRowSerializer
				&& numFields == ((BinaryRowSerializer) obj).numFields;
	}

	@Override
	public int hashCode() {
		return Integer.hashCode(numFields);
	}

	@Override
	public TypeSerializerSnapshot<BinaryRow> snapshotConfiguration() {
		return new BinaryRowSerializerSnapshot(numFields);
	}

	/**
	 * {@link TypeSerializerSnapshot} for {@link BinaryRowSerializer}.
	 *
	 * <p>Persists only the row arity; two serializers are compatible as-is
	 * iff their arities match, otherwise they are incompatible.
	 */
	public static final class BinaryRowSerializerSnapshot implements TypeSerializerSnapshot<BinaryRow> {
		private static final int CURRENT_VERSION = 3;

		private int previousNumFields;

		@SuppressWarnings("unused")
		public BinaryRowSerializerSnapshot() {
			// this constructor is used when restoring from a checkpoint/savepoint.
		}

		BinaryRowSerializerSnapshot(int numFields) {
			this.previousNumFields = numFields;
		}

		@Override
		public int getCurrentVersion() {
			return CURRENT_VERSION;
		}

		@Override
		public void writeSnapshot(DataOutputView out) throws IOException {
			out.writeInt(previousNumFields);
		}

		@Override
		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
			this.previousNumFields = in.readInt();
		}

		@Override
		public TypeSerializer<BinaryRow> restoreSerializer() {
			return new BinaryRowSerializer(previousNumFields);
		}

		@Override
		public TypeSerializerSchemaCompatibility<BinaryRow> resolveSchemaCompatibility(TypeSerializer<BinaryRow> newSerializer) {
			if (!(newSerializer instanceof BinaryRowSerializer)) {
				return TypeSerializerSchemaCompatibility.incompatible();
			}

			BinaryRowSerializer newBinaryRowSerializer = (BinaryRowSerializer) newSerializer;
			if (previousNumFields != newBinaryRowSerializer.numFields) {
				return TypeSerializerSchemaCompatibility.incompatible();
			} else {
				return TypeSerializerSchemaCompatibility.compatibleAsIs();
			}
		}
	}

}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryStringSerializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryStringSerializer.java
new file mode 100644
index 0000000..0a45c8a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryStringSerializer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link BinaryString}.
+ */
+@Internal
+public final class BinaryStringSerializer extends 
TypeSerializerSingleton<BinaryString> {
+
+       private static final long serialVersionUID = 1L;
+
+       public static final BinaryStringSerializer INSTANCE = new 
BinaryStringSerializer();
+
+       private BinaryStringSerializer() {}
+
+       @Override
+       public boolean isImmutableType() {
+               return true;
+       }
+
+       @Override
+       public BinaryString createInstance() {
+               return BinaryString.fromString("");
+       }
+
+       @Override
+       public BinaryString copy(BinaryString from) {
+               return from.copy();
+       }
+
+       @Override
+       public BinaryString copy(BinaryString from, BinaryString reuse) {
+               return from.copy();
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public void serialize(BinaryString record, DataOutputView target) 
throws IOException {
+               target.writeInt(record.getSizeInBytes());
+               SegmentsUtil.serializeToView(record.getSegments(), 
record.getOffset(), record.getSizeInBytes(), target);
+       }
+
+       @Override
+       public BinaryString deserialize(DataInputView source) throws 
IOException {
+               return deserializeInternal(source);
+       }
+
+       public static BinaryString deserializeInternal(DataInputView source) 
throws IOException {
+               int length = source.readInt();
+               byte[] bytes = new byte[length];
+               source.readFully(bytes);
+               return BinaryString.fromBytes(bytes);
+       }
+
+       @Override
+       public BinaryString deserialize(BinaryString record, DataInputView 
source) throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               int length = source.readInt();
+               target.writeInt(length);
+               target.write(source, length);
+       }
+
+       @Override
+       public TypeSerializerSnapshot<BinaryString> snapshotConfiguration() {
+               return new BinaryStringSerializerSnapshot();
+       }
+
+       /**
+        * Serializer configuration snapshot for compatibility and format 
evolution.
+        */
+       @SuppressWarnings("WeakerAccess")
+       public static final class BinaryStringSerializerSnapshot extends 
SimpleTypeSerializerSnapshot<BinaryString> {
+
+               public BinaryStringSerializerSnapshot() {
+                       super(() -> INSTANCE);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryStringTypeInfo.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryStringTypeInfo.java
new file mode 100644
index 0000000..f4a2a19
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryStringTypeInfo.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.dataformat.BinaryString;
+
+/**
+ * TypeInfo for BinaryString.
+ */
+public class BinaryStringTypeInfo extends TypeInformation<BinaryString> {
+
+       public static final BinaryStringTypeInfo INSTANCE = new 
BinaryStringTypeInfo();
+
+       private BinaryStringTypeInfo() {}
+
+       @Override
+       public boolean isBasicType() {
+               return true;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 1;
+       }
+
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @Override
+       public Class<BinaryString> getTypeClass() {
+               return BinaryString.class;
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return true;
+       }
+
+       @Override
+       public TypeSerializer<BinaryString> createSerializer(ExecutionConfig 
config) {
+               return BinaryStringSerializer.INSTANCE;
+       }
+
+       @Override
+       public String toString() {
+               return "BinaryString";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj instanceof BinaryStringTypeInfo;
+       }
+
+       @Override
+       public int hashCode() {
+               return 0;
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof BinaryStringTypeInfo;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
index 4ebb401..bc1148f 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
@@ -18,8 +18,10 @@
 package org.apache.flink.table.util;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 
+import java.io.IOException;
 import java.nio.ByteOrder;
 
 /**
@@ -808,4 +810,42 @@ public class SegmentsUtil {
                }
                segment.put(segOffset, (byte) (LITTLE_ENDIAN ? b2 : b1));
        }
+
	/**
	 * Serialize segments to output view with offset and size.
	 * Note: It just copies the data in, not include the length.
	 *
	 * @param segments source segments
	 * @param offset offset for segments
	 * @param sizeInBytes size in bytes
	 * @param target target output view
	 */
	public static void serializeToView(MemorySegment[] segments, int offset,
			int sizeInBytes, DataOutputView target) throws IOException {
		// Walk the segments in order; 'offset' is consumed by skipping whole
		// segments until the first one containing data, then reset to 0.
		for (MemorySegment sourceSegment : segments) {
			int curSegRemain = sourceSegment.size() - offset;
			if (curSegRemain > 0) {
				int copySize = Math.min(curSegRemain, sizeInBytes);

				// TODO after FLINK-11724
				// Interim copy through a heap array; replace once the view
				// supports segment writes directly.
				byte[] bytes = new byte[copySize];
				sourceSegment.get(offset, bytes);
				target.write(bytes);

				sizeInBytes -= copySize;
				offset = 0;
			} else {
				// Entire segment precedes 'offset': skip it.
				offset -= sourceSegment.size();
			}

			if (sizeInBytes == 0) {
				return;
			}
		}

		// Reaching here means the segments held fewer bytes than requested.
		if (sizeInBytes != 0) {
			throw new RuntimeException("No copy finished, this should be a bug, " +
					"The remaining length is: " + sizeInBytes);
		}
	}
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java
similarity index 100%
rename from 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java
rename to 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java
new file mode 100644
index 0000000..7cc8cde
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.type.InternalTypes;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link BaseRowSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class BaseRowSerializerTest extends SerializerTestInstance<BaseRow> {
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       private final BaseRowSerializer serializer;
+       private final BaseRow[] testData;
+
+       public BaseRowSerializerTest(BaseRowSerializer serializer, BaseRow[] 
testData) {
+               super(serializer, BaseRow.class, -1, testData);
+               this.serializer = serializer;
+               this.testData = testData;
+       }
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> parameters() {
+               return Arrays.asList(
+                               testBaseRowSerializer(),
+                               testLargeBaseRowSerializer(),
+                               testBaseRowSerializerWithComplexTypes());
+       }
+
+       private static Object[] testBaseRowSerializer() {
+               BaseRowTypeInfo typeInfo = new 
BaseRowTypeInfo(InternalTypes.INT, InternalTypes.STRING);
+               GenericRow row1 = new GenericRow(2);
+               row1.setField(0, 1);
+               row1.setField(1, fromString("a"));
+
+               GenericRow row2 = new GenericRow(2);
+               row2.setField(0, 2);
+               row2.setField(1, null);
+
+               BaseRowSerializer serializer = typeInfo.createSerializer(new 
ExecutionConfig());
+               return new Object[] {serializer, new BaseRow[]{row1, row2}};
+       }
+
+       private static Object[] testLargeBaseRowSerializer() {
+               BaseRowTypeInfo typeInfo = new BaseRowTypeInfo(
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.INT,
+                       InternalTypes.STRING);
+
+               GenericRow row = new GenericRow(13);
+               row.setField(0, 2);
+               row.setField(1, null);
+               row.setField(3, null);
+               row.setField(4, null);
+               row.setField(5, null);
+               row.setField(6, null);
+               row.setField(7, null);
+               row.setField(8, null);
+               row.setField(9, null);
+               row.setField(10, null);
+               row.setField(11, null);
+               row.setField(12, fromString("Test"));
+
+               BaseRowSerializer serializer = typeInfo.createSerializer(new 
ExecutionConfig());
+               return new Object[] {serializer, new BaseRow[]{row}};
+       }
+
+       private static Object[] testBaseRowSerializerWithComplexTypes() {
+               BaseRowTypeInfo typeInfo = new BaseRowTypeInfo(
+                       InternalTypes.INT,
+                       InternalTypes.DOUBLE,
+                       InternalTypes.STRING,
+                       InternalTypes.createArrayType(InternalTypes.INT),
+                       InternalTypes.createMapType(InternalTypes.INT, 
InternalTypes.INT));
+
+               GenericRow[] data = new GenericRow[]{
+                       createRow(null, null, null, null, null),
+                       createRow(0, null, null, null, null),
+                       createRow(0, 0.0, null, null, null),
+                       createRow(0, 0.0, fromString("a"), null, null),
+                       createRow(1, 0.0, fromString("a"), null, null),
+                       createRow(1, 1.0, fromString("a"), null, null),
+                       createRow(1, 1.0, fromString("b"), null, null),
+                       createRow(1, 1.0, fromString("b"), createArray(1), 
createMap(new int[]{1}, new int[]{1})),
+                       createRow(1, 1.0, fromString("b"), createArray(1, 2), 
createMap(new int[]{1, 4}, new int[]{1, 2})),
+                       createRow(1, 1.0, fromString("b"), createArray(1, 2, 
3), createMap(new int[]{1, 5}, new int[]{1, 3})),
+                       createRow(1, 1.0, fromString("b"), createArray(1, 2, 3, 
4), createMap(new int[]{1, 6}, new int[]{1, 4})),
+                       createRow(1, 1.0, fromString("b"), createArray(1, 2, 3, 
4, 5), createMap(new int[]{1, 7}, new int[]{1, 5})),
+                       createRow(1, 1.0, fromString("b"), createArray(1, 2, 3, 
4, 5, 6), createMap(new int[]{1, 8}, new int[]{1, 6}))
+               };
+
+               BaseRowSerializer serializer = typeInfo.createSerializer(new 
ExecutionConfig());
+               return new Object[] {serializer, data};
+       }
+
+       // 
----------------------------------------------------------------------------------------------
+
+       private static BinaryArray createArray(int... ints) {
+               BinaryArray array = new BinaryArray();
+               BinaryArrayWriter writer = new BinaryArrayWriter(array, 
ints.length, 4);
+               for (int i = 0; i < ints.length; i++) {
+                       writer.writeInt(i, ints[i]);
+               }
+               writer.complete();
+               return array;
+       }
+
+       private static BinaryMap createMap(int[] keys, int[] values) {
+               return BinaryMap.valueOf(createArray(keys), 
createArray(values));
+       }
+
+       private static GenericRow createRow(Object f0, Object f1, Object f2, 
Object f3, Object f4) {
+               GenericRow row = new GenericRow(5);
+               row.setField(0, f0);
+               row.setField(1, f1);
+               row.setField(2, f2);
+               row.setField(3, f3);
+               row.setField(4, f4);
+               return row;
+       }
+
+       @Override
+       protected void deepEquals(String message, BaseRow should, BaseRow is) {
+               int arity = should.getArity();
+               assertEquals(message, arity, is.getArity());
+               assertEquals(serializer.baseRowToBinary(should), 
serializer.baseRowToBinary(is));
+       }
+
+       private void deepEquals(BaseRow should, BaseRow is) {
+               deepEquals("", should, is);
+       }
+
+       /**
+        * Override testDuplicate, Because it uses Object equals, deserialize 
BaseRow to BinaryRow,
+        * which cannot be directly equals.
+        * See {@link BaseRowSerializer#deserialize}.
+        */
+       @Test
+       @Override
+       public void testDuplicate() throws Exception {}
+
+       @Test
+       public void testCopy() {
+               for (BaseRow row : testData) {
+                       deepEquals(row, serializer.copy(row));
+               }
+
+               for (BaseRow row : testData) {
+                       deepEquals(row, serializer.copy(row, new 
GenericRow(row.getArity())));
+               }
+
+               for (BaseRow row : testData) {
+                       deepEquals(row, 
serializer.copy(serializer.baseRowToBinary(row),
+                                       new GenericRow(row.getArity())));
+               }
+
+               for (BaseRow row : testData) {
+                       deepEquals(row, 
serializer.copy(serializer.baseRowToBinary(row)));
+               }
+
+               for (BaseRow row : testData) {
+                       deepEquals(row, 
serializer.copy(serializer.baseRowToBinary(row),
+                                       new BinaryRow(row.getArity())));
+               }
+       }
+
+       @Test
+       public void testWrongCopy() {
+               thrown.expect(IllegalArgumentException.class);
+               serializer.copy(new GenericRow(serializer.getArity() + 1));
+       }
+
+       @Test
+       public void testWrongCopyReuse() {
+               thrown.expect(IllegalArgumentException.class);
+               for (BaseRow row : testData) {
+                       deepEquals(row, serializer.copy(row, new 
GenericRow(row.getArity() + 1)));
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryArraySerializerTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryArraySerializerTest.java
new file mode 100644
index 0000000..d1e5b35
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryArraySerializerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryString;
+
+/**
+ * A test for the {@link BinaryArraySerializer}.
+ */
+public class BinaryArraySerializerTest extends SerializerTestBase<BinaryArray> 
{
+
+       @Override
+       protected BinaryArraySerializer createSerializer() {
+               return BinaryArraySerializer.INSTANCE;
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<BinaryArray> getTypeClass() {
+               return BinaryArray.class;
+       }
+
+       @Override
+       protected BinaryArray[] getTestData() {
+               return new BinaryArray[] {
+                               createArray("11"),
+                               createArray("11", "haa"),
+                               createArray("11", "haa", "ke"),
+                               createArray("11", "haa", "ke"),
+                               createArray("11", "lele", "haa", "ke"),
+               };
+       }
+
+       static BinaryArray createArray(String... vs) {
+               BinaryArray array = new BinaryArray();
+               BinaryArrayWriter writer = new BinaryArrayWriter(array, 
vs.length, 8);
+               for (int i = 0; i < vs.length; i++) {
+                       writer.writeString(i, BinaryString.fromString(vs[i]));
+               }
+               writer.complete();
+               return array;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryMapSerializerTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryMapSerializerTest.java
new file mode 100644
index 0000000..fe6b4e5
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryMapSerializerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+
+/**
+ * A test for the {@link BinaryMapSerializer}.
+ */
+public class BinaryMapSerializerTest extends SerializerTestBase<BinaryMap> {
+
+       @Override
+       protected BinaryMapSerializer createSerializer() {
+               return BinaryMapSerializer.INSTANCE;
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<BinaryMap> getTypeClass() {
+               return BinaryMap.class;
+       }
+
+       @Override
+       protected BinaryMap[] getTestData() {
+               return new BinaryMap[] {
+                               BinaryMap.valueOf(createArray(1), 
BinaryArraySerializerTest.createArray("11")),
+                               BinaryMap.valueOf(createArray(1, 2), 
BinaryArraySerializerTest.createArray("11", "haa")),
+                               BinaryMap.valueOf(createArray(1, 3, 4), 
BinaryArraySerializerTest.createArray("11", "haa", "ke")),
+                               BinaryMap.valueOf(createArray(1, 4, 2), 
BinaryArraySerializerTest.createArray("11", "haa", "ke")),
+                               BinaryMap.valueOf(createArray(1, 5, 6, 7), 
BinaryArraySerializerTest.createArray("11", "lele", "haa", "ke"))
+               };
+       }
+
+       private static BinaryArray createArray(int... vs) {
+               BinaryArray array = new BinaryArray();
+               BinaryArrayWriter writer = new BinaryArrayWriter(array, 
vs.length, 8);
+               for (int i = 0; i < vs.length; i++) {
+                       writer.writeInt(i, vs[i]);
+               }
+               writer.complete();
+               return array;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
new file mode 100644
index 0000000..11f3ea3
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.BinaryRowWriter;
+import org.apache.flink.table.dataformat.BinaryString;
+
+/**
+ * A test for the {@link BinaryRowSerializer}.
+ */
+public class BinaryRowSerializerTest extends SerializerTestBase<BinaryRow> {
+
+       @Override
+       protected BinaryRowSerializer createSerializer() {
+               return new BinaryRowSerializer(2);
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<BinaryRow> getTypeClass() {
+               return BinaryRow.class;
+       }
+
+       @Override
+       protected BinaryRow[] getTestData() {
+               return new BinaryRow[] {
+                               createRow("11", 1),
+                               createRow("12", 2),
+                               createRow("132", 3),
+                               createRow("13", 4)
+               };
+       }
+
+       private static BinaryRow createRow(String f0, int f1) {
+               BinaryRow row = new BinaryRow(2);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+               writer.writeString(0, BinaryString.fromString(f0));
+               writer.writeInt(1, f1);
+               writer.complete();
+               return row;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowTypeInfoTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowTypeInfoTest.java
new file mode 100644
index 0000000..5aa9b44
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowTypeInfoTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.InternalTypes;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test for {@link BaseRowTypeInfo}.
+ */
+public class BinaryRowTypeInfoTest {
+       private static InternalType[] typeList = new InternalType[]{
+                       InternalTypes.INT,
+                       InternalTypes.STRING
+       };
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testWrongNumberOfFieldNames() {
+               new BaseRowTypeInfo(typeList, new String[]{"int", "string", 
"int"});
+               // number of field names should be equal to number of types, 
so this should fail
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDuplicateCustomFieldNames() {
+               new BaseRowTypeInfo(typeList, new String[]{"int", "int"});
+               // field names should not be the same, so this should fail
+       }
+
+       @Test
+       public void testBinaryRowTypeInfoEquality() {
+               BaseRowTypeInfo typeInfo1 = new BaseRowTypeInfo(
+                               InternalTypes.INT,
+                               InternalTypes.STRING);
+
+               BaseRowTypeInfo typeInfo2 = new BaseRowTypeInfo(
+                               InternalTypes.INT,
+                               InternalTypes.STRING);
+
+               assertEquals(typeInfo1, typeInfo2);
+               assertEquals(typeInfo1.hashCode(), typeInfo2.hashCode());
+       }
+
+       @Test
+       public void testBinaryRowTypeInfoInequality() {
+               BaseRowTypeInfo typeInfo1 = new BaseRowTypeInfo(
+                               InternalTypes.INT,
+                               InternalTypes.STRING);
+
+               BaseRowTypeInfo typeInfo2 = new BaseRowTypeInfo(
+                               InternalTypes.INT,
+                               InternalTypes.BOOLEAN);
+
+               assertNotEquals(typeInfo1, typeInfo2);
+               assertNotEquals(typeInfo1.hashCode(), typeInfo2.hashCode());
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryStringSerializerTest.java
similarity index 53%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
rename to 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryStringSerializerTest.java
index f825b22..2c87e77 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryStringSerializerTest.java
@@ -16,46 +16,37 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.type;
+package org.apache.flink.table.typeutils;
 
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.table.dataformat.BinaryString;
+
+import java.util.Arrays;
 
 /**
- * Type for Array.
+ * A test for the {@link BinaryStringSerializer}.
  */
-public class ArrayType implements InternalType {
-
-       private final InternalType elementType;
-
-       public ArrayType(InternalType elementType) {
-               this.elementType = Preconditions.checkNotNull(elementType);
-       }
+public class BinaryStringSerializerTest extends 
SerializerTestBase<BinaryString> {
 
-       public InternalType getElementType() {
-               return elementType;
+       @Override
+       protected BinaryStringSerializer createSerializer() {
+               return BinaryStringSerializer.INSTANCE;
        }
 
        @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               ArrayType arrayType = (ArrayType) o;
-
-               return elementType.equals(arrayType.elementType);
+       protected int getLength() {
+               return -1;
        }
 
        @Override
-       public int hashCode() {
-               return elementType.hashCode();
+       protected Class<BinaryString> getTypeClass() {
+               return BinaryString.class;
        }
 
        @Override
-       public String toString() {
-               return "ArrayType{elementType=" + elementType + '}';
+       protected BinaryString[] getTestData() {
+               return Arrays.stream(
+                               new String[] {"a", "", "bcd", "jbmbmner8 jhk hj 
\n \t üäßß@µ", "", "non-empty"})
+                               
.map(BinaryString::fromString).toArray(BinaryString[]::new);
        }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryTypeInfoTest.java
similarity index 52%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java
rename to 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryTypeInfoTest.java
index b3cdf5b..a3741ef 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryTypeInfoTest.java
@@ -16,30 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.type;
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+import org.apache.flink.table.type.InternalTypes;
 
 /**
- * Sql time type.
+ * Test for {@link BinaryStringTypeInfo}, {@link BinaryArrayTypeInfo}, {@link 
BinaryMapTypeInfo}.
  */
-public class TimeType implements AtomicType {
-
-       public static final TimeType INSTANCE = new TimeType();
-
-       private TimeType() {}
-
-       @Override
-       public boolean equals(Object o) {
-               return this == o || o != null && getClass() == o.getClass();
-       }
+public class BinaryTypeInfoTest extends 
TypeInformationTestBase<TypeInformation<?>> {
 
        @Override
-       public int hashCode() {
-               return getClass().hashCode();
+       protected TypeInformation[] getTestData() {
+               return new TypeInformation[] {
+                               BinaryStringTypeInfo.INSTANCE,
+                               new BinaryArrayTypeInfo(InternalTypes.INT),
+                               new BinaryArrayTypeInfo(InternalTypes.STRING),
+                               new BinaryMapTypeInfo(InternalTypes.STRING, 
InternalTypes.INT),
+                               new BinaryMapTypeInfo(InternalTypes.DOUBLE, 
InternalTypes.INT)
+               };
        }
-
-       @Override
-       public String toString() {
-               return getClass().getSimpleName();
-       }
-
 }

Reply via email to