This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6070817  [FLINK-11702][table-planner-blink] Introduce a new blink table type system: InternalType.
6070817 is described below

commit 607081782471e0989432c1fce3c592f4d79a3871
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Mon Feb 25 16:15:48 2019 +0800

    [FLINK-11702][table-planner-blink] Introduce a new blink table type system: InternalType.
    
    This closes #7817.
---
 .../org/apache/flink/table/type/ArrayType.java     |  61 +++++++++++
 .../org/apache/flink/table/type/AtomicType.java    |  25 +++++
 .../org/apache/flink/table/type/BooleanType.java   |  29 +++++
 .../java/org/apache/flink/table/type/ByteType.java |  29 +++++
 .../java/org/apache/flink/table/type/CharType.java |  29 +++++
 .../java/org/apache/flink/table/type/DateType.java |  44 ++++++++
 .../org/apache/flink/table/type/DecimalType.java   | 105 ++++++++++++++++++
 .../org/apache/flink/table/type/DoubleType.java    |  29 +++++
 .../org/apache/flink/table/type/FloatType.java     |  29 +++++
 .../org/apache/flink/table/type/GenericType.java   |  83 +++++++++++++++
 .../java/org/apache/flink/table/type/IntType.java  |  29 +++++
 .../org/apache/flink/table/type/InternalType.java  |  38 +++++++
 .../org/apache/flink/table/type/InternalTypes.java |  84 +++++++++++++++
 .../java/org/apache/flink/table/type/LongType.java |  29 +++++
 .../java/org/apache/flink/table/type/MapType.java  |  77 ++++++++++++++
 .../org/apache/flink/table/type/PrimitiveType.java |  41 ++++++++
 .../java/org/apache/flink/table/type/RowType.java  | 109 +++++++++++++++++++
 .../org/apache/flink/table/type/ShortType.java     |  29 +++++
 .../org/apache/flink/table/type/StringType.java    |  45 ++++++++
 .../java/org/apache/flink/table/type/TimeType.java |  45 ++++++++
 .../org/apache/flink/table/type/TimestampType.java |  44 ++++++++
 .../apache/flink/table/type/TypeConverters.java    | 117 +++++++++++++++++++++
 .../apache/flink/table/type/InternalTypeTest.java  | 112 ++++++++++++++++++++
 23 files changed, 1262 insertions(+)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
new file mode 100644
index 0000000..f825b22
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Internal type describing an array whose elements share a single {@link InternalType}.
+ *
+ * <p>The element type is fixed at construction time. Two array types are equal
+ * exactly when they are of the same class and their element types are equal.
+ */
+public class ArrayType implements InternalType {
+
+       private final InternalType elementType;
+
+       /**
+        * Creates an array type for the given element type.
+        *
+        * @param elementType type of the array elements; checked with
+        *                    {@code Preconditions.checkNotNull}
+        */
+       public ArrayType(InternalType elementType) {
+               this.elementType = Preconditions.checkNotNull(elementType);
+       }
+
+       /**
+        * Returns the element type this array type was created with.
+        */
+       public InternalType getElementType() {
+               return elementType;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (o == this) {
+                       return true;
+               }
+               if (o != null && o.getClass() == getClass()) {
+                       ArrayType other = (ArrayType) o;
+                       return elementType.equals(other.elementType);
+               }
+               return false;
+       }
+
+       @Override
+       public int hashCode() {
+               // The element type is the only state, so its hash is used directly.
+               return elementType.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return "ArrayType{elementType=" + elementType + '}';
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/AtomicType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/AtomicType.java
new file mode 100644
index 0000000..d499f54
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/AtomicType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Atomic type: a marker interface extending {@link InternalType}.
+ *
+ * <p>It declares no members of its own; implementing it only tags a type as atomic.
+ */
+public interface AtomicType extends InternalType {
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
new file mode 100644
index 0000000..02416ed
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/BooleanType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Boolean type of the internal type system.
+ *
+ * <p>The constructor is private, so {@link #INSTANCE} is the only instance
+ * created by this class.
+ */
+public class BooleanType extends PrimitiveType {
+
+       private BooleanType() {}
+
+       /** Shared instance of this type. */
+       public static final BooleanType INSTANCE = new BooleanType();
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ByteType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ByteType.java
new file mode 100644
index 0000000..343acb1
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ByteType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Byte type of the internal type system.
+ *
+ * <p>The constructor is private, so {@link #INSTANCE} is the only instance
+ * created by this class.
+ */
+public class ByteType extends PrimitiveType {
+
+       private ByteType() {}
+
+       /** Shared instance of this type. */
+       public static final ByteType INSTANCE = new ByteType();
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/CharType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/CharType.java
new file mode 100644
index 0000000..bdb523c
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/CharType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Char type of the internal type system.
+ *
+ * <p>The constructor is private, so {@link #INSTANCE} is the only instance
+ * created by this class.
+ */
+public class CharType extends PrimitiveType {
+
+       private CharType() {}
+
+       /** Shared instance of this type. */
+       public static final CharType INSTANCE = new CharType();
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DateType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DateType.java
new file mode 100644
index 0000000..45588a6
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DateType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Sql date type.
+ */
+public class DateType implements AtomicType {
+
+       public static final DateType INSTANCE = new DateType();
+
+       private DateType() {}
+
+       @Override
+       public boolean equals(Object o) {
+               return this == o || o != null && getClass() == o.getClass();
+       }
+
+       @Override
+       public int hashCode() {
+               return getClass().hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return getClass().getSimpleName();
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
new file mode 100644
index 0000000..b47534f
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DecimalType.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+import java.math.BigDecimal;
+
+import static java.lang.String.format;
+
+/**
+ * Decimal type of the internal type system, described by a precision and a scale.
+ *
+ * <p>The constructor accepts a precision between 0 and {@link #MAX_PRECISION}
+ * (inclusive) and a scale that does not exceed the precision. Both values are
+ * fixed at construction time.
+ */
+public class DecimalType implements AtomicType {
+
+       /** Largest precision accepted by the constructor. */
+       public static final int MAX_PRECISION = 38;
+
+       private final int precision;
+       private final int scale;
+
+       // Default to be same with Integer.
+       public static final DecimalType USER_DEFAULT = new DecimalType(10, 0);
+
+       // Mainly used for implicitly type cast and test.
+       public static final DecimalType SYSTEM_DEFAULT = new DecimalType(MAX_PRECISION, 18);
+
+       /**
+        * Creates a decimal type.
+        *
+        * <p>Note that a negative scale is not rejected by these checks.
+        *
+        * @param precision total number of digits
+        * @param scale number of digits after the decimal point
+        * @throws IllegalArgumentException if {@code precision} is negative, if
+        *         {@code scale} is greater than {@code precision}, or if
+        *         {@code precision} exceeds {@link #MAX_PRECISION}
+        */
+       public DecimalType(int precision, int scale) {
+               if (precision < 0) {
+                       throw new IllegalArgumentException(
+                               format("Decimal precision (%s) cannot be negative.", precision));
+               }
+
+               if (scale > precision) {
+                       throw new IllegalArgumentException(
+                               format("Decimal scale (%s) cannot be greater than " +
+                                       "precision (%s).", scale, precision));
+               }
+
+               if (precision > MAX_PRECISION) {
+                       throw new IllegalArgumentException(
+                               "DecimalType can only support precision up to " + MAX_PRECISION);
+               }
+
+               this.precision = precision;
+               this.scale = scale;
+       }
+
+       /** Returns the precision given at construction. */
+       public int precision() {
+               return this.precision;
+       }
+
+       /** Returns the scale given at construction. */
+       public int scale() {
+               return this.scale;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               DecimalType that = (DecimalType) o;
+
+               return precision == that.precision && scale == that.scale;
+       }
+
+       @Override
+       public int hashCode() {
+               int result = precision;
+               result = 31 * result + scale;
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "DecimalType{" +
+                               "precision=" + precision +
+                               ", scale=" + scale +
+                               '}';
+       }
+
+       /**
+        * Factory equivalent of {@link #DecimalType(int, int)}.
+        *
+        * @throws IllegalArgumentException under the same conditions as the constructor
+        */
+       public static DecimalType of(int precision, int scale) {
+               return new DecimalType(precision, scale);
+       }
+
+       /**
+        * Derives a decimal type from the precision and scale of the given value.
+        *
+        * <p>{@link BigDecimal#precision()} counts only the digits of the unscaled value,
+        * so a purely fractional value such as {@code 0.001} reports precision 1 with
+        * scale 3. Passing that pair to the constructor unchanged would fail its
+        * {@code scale > precision} check, so the precision is widened to the scale
+        * in that case (for example {@code 0.001} yields precision 3, scale 3).
+        *
+        * @param value the value to derive the type from
+        * @return a type whose precision is {@code max(value.precision(), value.scale())}
+        *         and whose scale is {@code value.scale()}
+        * @throws IllegalArgumentException if the resulting precision exceeds
+        *         {@link #MAX_PRECISION}
+        * @throws NullPointerException if {@code value} is {@code null}
+        */
+       public static DecimalType of(BigDecimal value) {
+               int valueScale = value.scale();
+               int valuePrecision = Math.max(value.precision(), valueScale);
+               return of(valuePrecision, valueScale);
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
new file mode 100644
index 0000000..fb1e46b
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DoubleType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Double type of the internal type system.
+ *
+ * <p>The constructor is private, so {@link #INSTANCE} is the only instance
+ * created by this class.
+ */
+public class DoubleType extends PrimitiveType {
+
+       private DoubleType() {}
+
+       /** Shared instance of this type. */
+       public static final DoubleType INSTANCE = new DoubleType();
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/FloatType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/FloatType.java
new file mode 100644
index 0000000..a250f161
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/FloatType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Float type of the internal type system.
+ *
+ * <p>The constructor is private, so {@link #INSTANCE} is the only instance
+ * created by this class.
+ */
+public class FloatType extends PrimitiveType {
+
+       private FloatType() {}
+
+       /** Shared instance of this type. */
+       public static final FloatType INSTANCE = new FloatType();
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/GenericType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/GenericType.java
new file mode 100644
index 0000000..828b935
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/GenericType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Generic type of the internal type system, backed by a {@link TypeInformation}.
+ *
+ * <p>Equality and hashing are delegated to the wrapped type information.
+ *
+ * @param <T> the class described by the wrapped type information
+ */
+public class GenericType<T> implements AtomicType {
+
+       private final TypeInformation<T> typeInfo;
+
+       // Created on the first call to getSerializer() and reused afterwards.
+       private TypeSerializer<T> serializer;
+
+       /**
+        * Creates a generic type for the given class by wrapping it in a
+        * {@link GenericTypeInfo}.
+        */
+       public GenericType(Class<T> typeClass) {
+               this(new GenericTypeInfo<>(typeClass));
+       }
+
+       /**
+        * Creates a generic type backed by the given type information.
+        *
+        * @param typeInfo the type information to wrap; checked with {@code checkNotNull}
+        */
+       public GenericType(TypeInformation<T> typeInfo) {
+               this.typeInfo = checkNotNull(typeInfo);
+       }
+
+       /** Returns the wrapped type information. */
+       public TypeInformation<T> getTypeInfo() {
+               return typeInfo;
+       }
+
+       /** Returns the type class reported by the wrapped type information. */
+       public Class<T> getTypeClass() {
+               return typeInfo.getTypeClass();
+       }
+
+       /**
+        * Returns the serializer for this type.
+        *
+        * <p>The serializer is created from the type information with a new
+        * {@link ExecutionConfig} on first use and cached in a field. The lazy
+        * initialization is not synchronized.
+        */
+       public TypeSerializer<T> getSerializer() {
+               if (serializer != null) {
+                       return serializer;
+               }
+               serializer = typeInfo.createSerializer(new ExecutionConfig());
+               return serializer;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (o == this) {
+                       return true;
+               }
+               if (o != null && o.getClass() == getClass()) {
+                       GenericType<?> other = (GenericType<?>) o;
+                       return typeInfo.equals(other.typeInfo);
+               }
+               return false;
+       }
+
+       @Override
+       public int hashCode() {
+               return typeInfo.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return "GenericType{typeClass=" + typeInfo.getTypeClass() + '}';
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/IntType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/IntType.java
new file mode 100644
index 0000000..e89032b
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/IntType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Int type of the internal type system.
+ *
+ * <p>The constructor is private, so {@link #INSTANCE} is the only instance
+ * created by this class.
+ */
+public class IntType extends PrimitiveType {
+
+       private IntType() {}
+
+       /** Shared instance of this type. */
+       public static final IntType INSTANCE = new IntType();
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalType.java
new file mode 100644
index 0000000..452f2ee
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+import java.io.Serializable;
+
+/**
+ * InternalType is the base type of all Flink SQL types. It is used for internal
+ * computation and by the code generator.
+ *
+ * <p>An InternalType may correspond to multiple data formats. Users work with the
+ * basic Java data formats, while a more efficient binary format is used inside the
+ * table runtime for performance. For example, StringType corresponds to BinaryString
+ * internally and to the JDK String in the user layer. An InternalType may therefore
+ * correspond to multiple TypeSerializers.
+ *
+ * <p>Only a limited set of data formats is supported, because all other data formats
+ * are converted into internally supported formats when executed inside the table.
+ * They are converted back for the user when needed (for example, for UDFs).
+ */
+public interface InternalType extends Serializable {
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
new file mode 100644
index 0000000..f3429b0
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Central access point for {@link InternalType}s: shared constants for the
+ * parameterless types and factory methods for the parameterized ones.
+ */
+public class InternalTypes {
+
+       /** Shared {@link StringType} instance. */
+       public static final StringType STRING = StringType.INSTANCE;
+
+       /** Shared {@link BooleanType} instance. */
+       public static final BooleanType BOOLEAN = BooleanType.INSTANCE;
+
+       /** Shared {@link DoubleType} instance. */
+       public static final DoubleType DOUBLE = DoubleType.INSTANCE;
+
+       /** Shared {@link FloatType} instance. */
+       public static final FloatType FLOAT = FloatType.INSTANCE;
+
+       /** Shared {@link ByteType} instance. */
+       public static final ByteType BYTE = ByteType.INSTANCE;
+
+       /** Shared {@link IntType} instance. */
+       public static final IntType INT = IntType.INSTANCE;
+
+       /** Shared {@link LongType} instance. */
+       public static final LongType LONG = LongType.INSTANCE;
+
+       /** Shared {@link ShortType} instance. */
+       public static final ShortType SHORT = ShortType.INSTANCE;
+
+       /** Shared {@link CharType} instance. */
+       public static final CharType CHAR = CharType.INSTANCE;
+
+       /** Binary data, modelled as an array of {@link #BYTE}. */
+       public static final ArrayType BINARY = new ArrayType(BYTE);
+
+       /** Shared {@link DateType} instance. */
+       public static final DateType DATE = DateType.INSTANCE;
+
+       /** Shared {@link TimestampType} instance. */
+       public static final TimestampType TIMESTAMP = TimestampType.INSTANCE;
+
+       /** Shared {@link TimeType} instance. */
+       public static final TimeType TIME = TimeType.INSTANCE;
+
+       /** Alias for {@link DecimalType#SYSTEM_DEFAULT}. */
+       public static final DecimalType SYSTEM_DEFAULT_DECIMAL = DecimalType.SYSTEM_DEFAULT;
+
+       /** Creates a new {@link ArrayType} with the given element type. */
+       public static ArrayType createArrayType(InternalType elementType) {
+               return new ArrayType(elementType);
+       }
+
+       /** Creates a new {@link DecimalType} with the given precision and scale. */
+       public static DecimalType createDecimalType(int precision, int scale) {
+               return new DecimalType(precision, scale);
+       }
+
+       /** Creates a new {@link MapType} with the given key and value types. */
+       public static MapType createMapType(InternalType keyType, InternalType valueType) {
+               return new MapType(keyType, valueType);
+       }
+
+       /** Creates a new {@link GenericType} for the given class. */
+       public static <T> GenericType<T> createGenericType(Class<T> cls) {
+               return new GenericType<>(cls);
+       }
+
+       /** Creates a new {@link GenericType} backed by the given type information. */
+       public static <T> GenericType<T> createGenericType(TypeInformation<T> typeInfo) {
+               return new GenericType<>(typeInfo);
+       }
+
+       /** Creates a new {@link RowType} from the given field types and field names. */
+       public static RowType createRowType(InternalType[] types, String[] fieldNames) {
+               return new RowType(types, fieldNames);
+       }
+
+       /** Creates a new {@link RowType} from the given field types only. */
+       public static RowType createRowType(InternalType... types) {
+               return new RowType(types);
+       }
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/LongType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/LongType.java
new file mode 100644
index 0000000..c21936f
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/LongType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Long type of the internal type system.
+ *
+ * <p>The constructor is private, so {@link #INSTANCE} is the only instance
+ * created by this class.
+ */
+public class LongType extends PrimitiveType {
+
+       private LongType() {}
+
+       /** Shared instance of this type. */
+       public static final LongType INSTANCE = new LongType();
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/MapType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/MapType.java
new file mode 100644
index 0000000..4498d5d
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/MapType.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Map type.
+ */
+public class MapType implements InternalType {
+
+       private final InternalType keyType;
+       private final InternalType valueType;
+
+       public MapType(InternalType keyType, InternalType valueType) {
+               if (keyType == null) {
+                       throw new IllegalArgumentException("keyType should not 
be null.");
+               }
+               if (valueType == null) {
+                       throw new IllegalArgumentException("valueType should 
not be null.");
+               }
+               this.keyType = keyType;
+               this.valueType = valueType;
+       }
+
+       public InternalType getKeyType() {
+               return keyType;
+       }
+
+       public InternalType getValueType() {
+               return valueType;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               MapType mapType = (MapType) o;
+
+               return getKeyType().equals(mapType.getKeyType()) &&
+                               getValueType().equals(mapType.getValueType());
+       }
+
+       @Override
+       public int hashCode() {
+               int result = getKeyType().hashCode();
+               result = 31 * result + getValueType().hashCode();
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "MapType{" +
+                               "keyType=" + keyType +
+                               ", valueType=" + valueType +
+                               '}';
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
new file mode 100644
index 0000000..002f4e0
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Primitive type.
+ */
+public abstract class PrimitiveType implements AtomicType {
+
+       @Override
+       public boolean equals(Object o) {
+               return this == o || o != null && getClass() == o.getClass();
+       }
+
+       @Override
+       public int hashCode() {
+               return getClass().hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return getClass().getSimpleName();
+       }
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/RowType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/RowType.java
new file mode 100644
index 0000000..6c02996
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/RowType.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+
+/**
+ * Row type for row.
+ *
+ * <p>Its internal data structure is BaseRow, and its external data 
structure is {@link Row}.
+ */
+public class RowType implements InternalType {
+
+       private final InternalType[] types;
+
+       private final String[] fieldNames;
+
+       public RowType(InternalType... types) {
+               this(types, generateDefaultFieldNames(types.length));
+       }
+
+       public RowType(InternalType[] types, String[] fieldNames) {
+               this.types = types;
+               this.fieldNames = fieldNames;
+               if (types.length != fieldNames.length) {
+                       throw new IllegalArgumentException("Types should be the 
same length as names, types is: "
+                                       + Arrays.toString(types) + ", and the 
names: " + Arrays.toString(fieldNames));
+               }
+       }
+
+       private static String[] generateDefaultFieldNames(int length) {
+               String[] fieldNames = new String[length];
+               for (int i = 0; i < length; i++) {
+                       fieldNames[i] = "f" + i;
+               }
+               return fieldNames;
+       }
+
+       public int getArity() {
+               return types.length;
+       }
+
+       public InternalType[] getFieldTypes() {
+               return types;
+       }
+
+       public InternalType getTypeAt(int i) {
+               return types[i];
+       }
+
+       public String[] getFieldNames() {
+               return fieldNames;
+       }
+
+       public int getFieldIndex(String fieldName) {
+               for (int i = 0; i < fieldNames.length; i++) {
+                       if (fieldNames[i].equals(fieldName)) {
+                               return i;
+                       }
+               }
+               return -1;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               RowType that = (RowType) o;
+
+               // RowType equality intentionally ignores field names, which is 
compatible with the behavior of CompositeTypeInfo.
+               return Arrays.equals(getFieldTypes(), that.getFieldTypes());
+       }
+
+       @Override
+       public int hashCode() {
+               return Arrays.hashCode(types);
+       }
+
+       @Override
+       public String toString() {
+               return "RowType{" +
+                               "types=" + Arrays.toString(types) +
+                               ", fieldNames=" + Arrays.toString(fieldNames) +
+                               '}';
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ShortType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ShortType.java
new file mode 100644
index 0000000..32d242d
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ShortType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * Short type.
+ */
+public class ShortType extends PrimitiveType {
+
+       public static final ShortType INSTANCE = new ShortType();
+
+       private ShortType() {}
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/StringType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/StringType.java
new file mode 100644
index 0000000..8893dc2
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/StringType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * String type.
+ */
+public class StringType implements AtomicType {
+
+       public static final StringType INSTANCE = new StringType();
+
+       private StringType() {
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               return this == o || o != null && getClass() == o.getClass();
+       }
+
+       @Override
+       public int hashCode() {
+               return getClass().hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return getClass().getSimpleName();
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java
new file mode 100644
index 0000000..b3cdf5b
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * SQL time type.
+ */
+public class TimeType implements AtomicType {
+
+       public static final TimeType INSTANCE = new TimeType();
+
+       private TimeType() {}
+
+       @Override
+       public boolean equals(Object o) {
+               return this == o || o != null && getClass() == o.getClass();
+       }
+
+       @Override
+       public int hashCode() {
+               return getClass().hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return getClass().getSimpleName();
+       }
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
new file mode 100644
index 0000000..12aa689
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+/**
+ * SQL timestamp type.
+ */
+public class TimestampType implements AtomicType {
+
+       public static final TimestampType INSTANCE = new TimestampType();
+
+       private TimestampType() {}
+
+       @Override
+       public boolean equals(Object o) {
+               return this == o || o != null && getClass() == o.getClass();
+       }
+
+       @Override
+       public int hashCode() {
+               return getClass().hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return getClass().getSimpleName();
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
new file mode 100644
index 0000000..5bcf99d
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+/**
+ * Converters of {@link InternalType} and {@link TypeInformation}.
+ */
+public class TypeConverters {
+
+       public static final Map<TypeInformation, InternalType> 
TYPE_INFO_TO_INTERNAL_TYPE;
+       static {
+               Map<TypeInformation, InternalType> tiToType = new HashMap<>();
+               tiToType.put(BasicTypeInfo.STRING_TYPE_INFO, 
InternalTypes.STRING);
+               tiToType.put(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
InternalTypes.BOOLEAN);
+               tiToType.put(BasicTypeInfo.DOUBLE_TYPE_INFO, 
InternalTypes.DOUBLE);
+               tiToType.put(BasicTypeInfo.FLOAT_TYPE_INFO, 
InternalTypes.FLOAT);
+               tiToType.put(BasicTypeInfo.BYTE_TYPE_INFO, InternalTypes.BYTE);
+               tiToType.put(BasicTypeInfo.INT_TYPE_INFO, InternalTypes.INT);
+               tiToType.put(BasicTypeInfo.LONG_TYPE_INFO, InternalTypes.LONG);
+               tiToType.put(BasicTypeInfo.SHORT_TYPE_INFO, 
InternalTypes.SHORT);
+               tiToType.put(BasicTypeInfo.CHAR_TYPE_INFO, InternalTypes.CHAR);
+               
tiToType.put(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, 
InternalTypes.BINARY);
+               tiToType.put(SqlTimeTypeInfo.DATE, InternalTypes.DATE);
+               tiToType.put(SqlTimeTypeInfo.TIMESTAMP, 
InternalTypes.TIMESTAMP);
+               tiToType.put(SqlTimeTypeInfo.TIME, InternalTypes.TIME);
+
+               // BigDecimal has arbitrary precision and scale, but we 
convert it into a limited
+               // Decimal(38, 18). If the user's BigDecimal has more precision 
than this, we will
+               // throw an exception to remind the user to use GenericType in real 
data conversion.
+               tiToType.put(BasicTypeInfo.BIG_DEC_TYPE_INFO, 
InternalTypes.SYSTEM_DEFAULT_DECIMAL);
+               TYPE_INFO_TO_INTERNAL_TYPE = 
Collections.unmodifiableMap(tiToType);
+       }
+
+       /**
+        * Create an {@link InternalType} from a {@link TypeInformation}.
+        *
+        * <p>Note: Information may be lost. For example, after a Pojo is 
converted to an InternalType,
+        * we no longer know that it was a Pojo and treat it simply as a Row.
+        *
+        * <p>E.g.:
+        * {@link BasicTypeInfo#STRING_TYPE_INFO} => {@link 
InternalTypes#STRING}.
+        * {@link BasicTypeInfo#BIG_DEC_TYPE_INFO} => {@link DecimalType}.
+        * {@link RowTypeInfo} => {@link RowType}.
+        * {@link PojoTypeInfo} (CompositeType) => {@link RowType}.
+        * {@link TupleTypeInfo} (CompositeType) => {@link RowType}.
+        */
+       public static InternalType 
createInternalTypeFromTypeInfo(TypeInformation typeInfo) {
+               InternalType type = TYPE_INFO_TO_INTERNAL_TYPE.get(typeInfo);
+               if (type != null) {
+                       return type;
+               }
+
+               if (typeInfo instanceof CompositeType) {
+                       CompositeType compositeType = (CompositeType) typeInfo;
+                       return InternalTypes.createRowType(
+                                       Stream.iterate(0, x -> x + 
1).limit(compositeType.getArity())
+                                                       .map((Function<Integer, 
TypeInformation>) compositeType::getTypeAt)
+                                                       
.map(TypeConverters::createInternalTypeFromTypeInfo)
+                                                       
.toArray(InternalType[]::new),
+                                       compositeType.getFieldNames()
+                       );
+               } else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
+                       PrimitiveArrayTypeInfo arrayType = 
(PrimitiveArrayTypeInfo) typeInfo;
+                       return InternalTypes.createArrayType(
+                                       
createInternalTypeFromTypeInfo(arrayType.getComponentType()));
+               } else if (typeInfo instanceof BasicArrayTypeInfo) {
+                       BasicArrayTypeInfo arrayType = (BasicArrayTypeInfo) 
typeInfo;
+                       return InternalTypes.createArrayType(
+                                       
createInternalTypeFromTypeInfo(arrayType.getComponentInfo()));
+               } else if (typeInfo instanceof ObjectArrayTypeInfo) {
+                       ObjectArrayTypeInfo arrayType = (ObjectArrayTypeInfo) 
typeInfo;
+                       return InternalTypes.createArrayType(
+                                       
createInternalTypeFromTypeInfo(arrayType.getComponentInfo()));
+               } else if (typeInfo instanceof MapTypeInfo) {
+                       MapTypeInfo mapType = (MapTypeInfo) typeInfo;
+                       return InternalTypes.createMapType(
+                                       
createInternalTypeFromTypeInfo(mapType.getKeyTypeInfo()),
+                                       
createInternalTypeFromTypeInfo(mapType.getValueTypeInfo()));
+               } else {
+                       return InternalTypes.createGenericType(typeInfo);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java
new file mode 100644
index 0000000..be2d148
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.type;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static 
org.apache.flink.table.type.TypeConverters.createInternalTypeFromTypeInfo;
+
+/**
+ * Test for {@link InternalType}s.
+ */
+public class InternalTypeTest {
+
+       @Test
+       public void testHashCodeAndEquals() throws IOException, 
ClassNotFoundException {
+               testHashCodeAndEquals(InternalTypes.INT);
+               testHashCodeAndEquals(InternalTypes.DATE);
+               testHashCodeAndEquals(InternalTypes.TIME);
+               testHashCodeAndEquals(InternalTypes.TIMESTAMP);
+               testHashCodeAndEquals(InternalTypes.BINARY);
+               testHashCodeAndEquals(InternalTypes.STRING);
+               testHashCodeAndEquals(new DecimalType(15, 5));
+               testHashCodeAndEquals(new 
GenericType<>(InternalTypeTest.class));
+               testHashCodeAndEquals(new RowType(InternalTypes.STRING, 
InternalTypes.INT, InternalTypes.INT));
+               testHashCodeAndEquals(new ArrayType(InternalTypes.INT));
+               testHashCodeAndEquals(new ArrayType(InternalTypes.STRING));
+               testHashCodeAndEquals(new MapType(InternalTypes.STRING, 
InternalTypes.INT));
+       }
+
+       private void testHashCodeAndEquals(InternalType type) throws 
IOException, ClassNotFoundException {
+               InternalType newType = InstantiationUtil.deserializeObject
+                               (InstantiationUtil.serializeObject(type),
+                                               
Thread.currentThread().getContextClassLoader());
+
+               Assert.assertEquals(type.hashCode(), newType.hashCode());
+               Assert.assertEquals(type, newType);
+
+               // toString must be overridden: the default Object#toString output contains "@".
+               Assert.assertFalse(newType.toString().contains("@"));
+       }
+
+       @Test
+       public void testConverter() throws IOException, ClassNotFoundException {
+               testConvertToRowType(new RowTypeInfo(
+                               new TypeInformation[] {Types.INT, Types.STRING},
+                               new String[] {"field1", "field2"}));
+               testConvertToRowType((CompositeType) 
TypeInformation.of(MyPojo.class));
+
+               
testConvertCompare(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
+                               new ArrayType(InternalTypes.DOUBLE));
+               testConvertCompare(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, 
new ArrayType(InternalTypes.DOUBLE));
+               
testConvertCompare(ObjectArrayTypeInfo.getInfoFor(TypeInformation.of(MyPojo.class)),
+                               new 
ArrayType(createInternalTypeFromTypeInfo(TypeInformation.of(MyPojo.class))));
+               testConvertCompare(new MapTypeInfo<>(Types.INT, Types.STRING),
+                               new MapType(InternalTypes.INT, 
InternalTypes.STRING));
+               testConvertCompare(new GenericTypeInfo<>(MyPojo.class), new 
GenericType<>(MyPojo.class));
+       }
+
+       private void testConvertToRowType(CompositeType typeInfo) {
+               RowType rowType = (RowType) 
createInternalTypeFromTypeInfo(typeInfo);
+               Assert.assertArrayEquals(
+                               new InternalType[] {InternalTypes.INT, 
InternalTypes.STRING},
+                               rowType.getFieldTypes());
+               Assert.assertArrayEquals(
+                               new String[] {"field1", "field2"},
+                               rowType.getFieldNames());
+       }
+
+       private void testConvertCompare(TypeInformation typeInfo, InternalType 
internalType) {
+               InternalType converted = 
createInternalTypeFromTypeInfo(typeInfo);
+               Assert.assertEquals(internalType, converted);
+               Assert.assertEquals(internalType.hashCode(), 
converted.hashCode());
+       }
+
+       /**
+        * Test pojo.
+        */
+       public static class MyPojo {
+               public int field1;
+               public String field2;
+       }
+}

Reply via email to