This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e2579e3  [FLINK-11827][table-runtime-blink] Introduce 
DataFormatConverters to convert between internal data format and java format
e2579e3 is described below

commit e2579e39602ab7d3e906a185353dd413aca58317
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Tue Mar 5 16:36:10 2019 +0800

    [FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to 
convert between internal data format and java format
    
    This closes #7904
---
 .../apache/flink/table/dataformat/BinaryArray.java |  43 +
 .../table/dataformat/DataFormatConverters.java     | 979 +++++++++++++++++++++
 .../table/runtime/functions/DateTimeUtils.java     |  96 ++
 .../table/dataformat/DataFormatConvertersTest.java | 180 ++++
 4 files changed, 1298 insertions(+)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
index 57ce04b..de9d6ee 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.dataformat;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.InternalTypes;
 import org.apache.flink.table.util.SegmentsUtil;
 
 import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
@@ -40,6 +42,7 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
        private static final int BYTE_ARRAY_BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
        private static final int BOOLEAN_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(boolean[].class);
        private static final int SHORT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(short[].class);
+       private static final int CHAR_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(char[].class);
        private static final int INT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(int[].class);
        private static final int LONG_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(long[].class);
        private static final int FLOAT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(float[].class);
@@ -49,6 +52,34 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
                return 4 + ((numFields + 31) / 32) * 4;
        }
 
+       /**
+        * It store real value when type is primitive.
+        * It store the length and offset of variable-length part when type is 
string, map, etc.
+        */
+       public static int calculateFixLengthPartSize(InternalType type) {
+               if (type.equals(InternalTypes.BOOLEAN)) {
+                       return 1;
+               } else if (type.equals(InternalTypes.BYTE)) {
+                       return 1;
+               } else if (type.equals(InternalTypes.SHORT)) {
+                       return 2;
+               } else if (type.equals(InternalTypes.INT)) {
+                       return 4;
+               } else if (type.equals(InternalTypes.FLOAT)) {
+                       return 4;
+               } else if (type.equals(InternalTypes.CHAR)) {
+                       return 2;
+               } else if (type.equals(InternalTypes.DATE)) {
+                       return 4;
+               } else if (type.equals(InternalTypes.TIME)) {
+                       return 4;
+               } else {
+                       // long, double is 8 bytes.
+                       // It store the length and offset of variable-length 
part when type is string, map, etc.
+                       return 8;
+               }
+       }
+
        // The number of elements in this array
        private int numElements;
 
@@ -315,6 +346,14 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
                return values;
        }
 
+       public char[] toCharArray() {
+               checkNoNull();
+               char[] values = new char[numElements];
+               SegmentsUtil.copyToUnsafe(
+                               segments, elementOffset, values, 
CHAR_ARRAY_OFFSET, numElements * 2);
+               return values;
+       }
+
        public int[] toIntArray() {
                checkNoNull();
                int[] values = new int[numElements];
@@ -393,6 +432,10 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
                return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 
2);
        }
 
+       public static BinaryArray fromPrimitiveArray(char[] arr) {
+               return fromPrimitiveArray(arr, CHAR_ARRAY_OFFSET, arr.length, 
2);
+       }
+
        public static BinaryArray fromPrimitiveArray(int[] arr) {
                return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
        }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
new file mode 100644
index 0000000..7fbd9ba
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -0,0 +1,979 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.table.runtime.functions.DateTimeUtils;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.TypeConverters;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.table.typeutils.BinaryArrayTypeInfo;
+import org.apache.flink.table.typeutils.BinaryMapTypeInfo;
+import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
+import org.apache.flink.types.Row;
+
+import java.lang.reflect.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import scala.Product;
+
+/**
+ * Converters between internal data format and java format.
+ *
+ * <p>The following scenarios will use converter for java format to internal 
data format:
+ * In source, data from user define source to internal sql engine.
+ * In udx return value, User outputs java format data to the SQL engine.
+ *
+ * <p>The following scenarios will use converter for internal data format to 
java format:
+ * In udx method parameters, data from internal sql engine need to be provided 
to user udx.
+ * In sink, data from internal sql engine need to be provided to user define 
sink.
+ */
+public class DataFormatConverters {
+
+       private static final Map<TypeInformation, DataFormatConverter> 
TYPE_INFO_TO_CONVERTER;
+       static {
+               Map<TypeInformation, DataFormatConverter> t2C = new HashMap<>();
+               t2C.put(BasicTypeInfo.STRING_TYPE_INFO, 
StringConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
BooleanConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.INT_TYPE_INFO, IntConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.LONG_TYPE_INFO, LongConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.FLOAT_TYPE_INFO, FloatConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.DOUBLE_TYPE_INFO, 
DoubleConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.SHORT_TYPE_INFO, ShortConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.BYTE_TYPE_INFO, ByteConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.CHAR_TYPE_INFO, CharConverter.INSTANCE);
+
+               
t2C.put(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveBooleanArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveIntArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveLongArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveFloatArrayConverter.INSTANCE);
+               
t2C.put(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveDoubleArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveShortArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveByteArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveCharArrayConverter.INSTANCE);
+
+               t2C.put(SqlTimeTypeInfo.DATE, DateConverter.INSTANCE);
+               t2C.put(SqlTimeTypeInfo.TIME, TimeConverter.INSTANCE);
+               t2C.put(SqlTimeTypeInfo.TIMESTAMP, TimestampConverter.INSTANCE);
+
+               t2C.put(BinaryStringTypeInfo.INSTANCE, 
BinaryStringConverter.INSTANCE);
+
+               TYPE_INFO_TO_CONVERTER = Collections.unmodifiableMap(t2C);
+       }
+
+       /**
+        * Get {@link DataFormatConverter} for {@link TypeInformation}.
+        *
+        * @param typeInfo DataFormatConverter is oriented to Java format, 
while InternalType has
+        *                   lost its specific Java format. Only 
TypeInformation retains all its
+        *                   Java format information.
+        */
+       public static DataFormatConverter 
getConverterForTypeInfo(TypeInformation typeInfo) {
+               DataFormatConverter converter = 
TYPE_INFO_TO_CONVERTER.get(typeInfo);
+               if (converter != null) {
+                       return converter;
+               }
+
+               if (typeInfo instanceof BasicArrayTypeInfo) {
+                       return new ObjectArrayConverter(((BasicArrayTypeInfo) 
typeInfo).getComponentInfo());
+               } else if (typeInfo instanceof ObjectArrayTypeInfo) {
+                       return new ObjectArrayConverter(((ObjectArrayTypeInfo) 
typeInfo).getComponentInfo());
+               } else if (typeInfo instanceof MapTypeInfo) {
+                       MapTypeInfo mapType = (MapTypeInfo) typeInfo;
+                       return new MapConverter(mapType.getKeyTypeInfo(), 
mapType.getValueTypeInfo());
+               } else if (typeInfo instanceof RowTypeInfo) {
+                       return new RowConverter((RowTypeInfo) typeInfo);
+               } else if (typeInfo instanceof PojoTypeInfo) {
+                       return new PojoConverter((PojoTypeInfo) typeInfo);
+               } else if (typeInfo instanceof TupleTypeInfo) {
+                       return new TupleConverter((TupleTypeInfo) typeInfo);
+               } else if (typeInfo instanceof TupleTypeInfoBase && 
Product.class.isAssignableFrom(typeInfo.getTypeClass())) {
+                       return new CaseClassConverter((TupleTypeInfoBase) 
typeInfo);
+               } else if (typeInfo instanceof BinaryArrayTypeInfo) {
+                       return BinaryArrayConverter.INSTANCE;
+               } else if (typeInfo instanceof BinaryMapTypeInfo) {
+                       return BinaryMapConverter.INSTANCE;
+               } else if (typeInfo instanceof BaseRowTypeInfo) {
+                       return BaseRowConverter.INSTANCE;
+               } else {
+                       throw new RuntimeException("Not support generic yet: " 
+ typeInfo);
+               }
+       }
+
+       /**
+        * Converter between internal data format and java format.
+        *
+        * @param <Internal> Internal data format.
+        * @param <External> External data format.
+        */
+       public abstract static class DataFormatConverter<Internal, External> {
+
+               /**
+                * Converts a external(Java) data format to its internal 
equivalent while automatically handling nulls.
+                */
+               public final Internal toInternal(External value) {
+                       return value == null ? null : toInternalImpl(value);
+               }
+
+               /**
+                * Converts a non-null external(Java) data format to its 
internal equivalent.
+                */
+               abstract Internal toInternalImpl(External value);
+
+               /**
+                * Convert a internal data format to its external(Java) 
equivalent while automatically handling nulls.
+                */
+               public final External toExternal(Internal value) {
+                       return value == null ? null : toExternalImpl(value);
+               }
+
+               /**
+                * Convert a non-null internal data format to its 
external(Java) equivalent.
+                */
+               abstract External toExternalImpl(Internal value);
+
+               /**
+                * Given a internalType row, convert the value at column 
`column` to its external(Java) equivalent.
+                * This method will only be called on non-null columns.
+                */
+               abstract External toExternalImpl(BaseRow row, int column);
+
+               /**
+                * Given a internalType row, convert the value at column 
`column` to its external(Java) equivalent.
+                */
+               public final External toExternal(BaseRow row, int column) {
+                       return row.isNullAt(column) ? null : 
toExternalImpl(row, column);
+               }
+       }
+
+       /**
+        * Identity converter.
+        */
+       public abstract static class IdentityConverter<T> extends 
DataFormatConverter<T, T> {
+
+               @Override
+               T toInternalImpl(T value) {
+                       return value;
+               }
+
+               @Override
+               T toExternalImpl(T value) {
+                       return value;
+               }
+       }
+
+       /**
+        * Converter for boolean.
+        */
+       public static class BooleanConverter extends IdentityConverter<Boolean> 
{
+
+               public static final BooleanConverter INSTANCE = new 
BooleanConverter();
+
+               private BooleanConverter() {}
+
+               @Override
+               Boolean toExternalImpl(BaseRow row, int column) {
+                       return row.getBoolean(column);
+               }
+       }
+
+       /**
+        * Converter for byte.
+        */
+       public static class ByteConverter extends IdentityConverter<Byte> {
+
+               public static final ByteConverter INSTANCE = new 
ByteConverter();
+
+               private ByteConverter() {}
+
+               @Override
+               Byte toExternalImpl(BaseRow row, int column) {
+                       return row.getByte(column);
+               }
+       }
+
+       /**
+        * Converter for short.
+        */
+       public static class ShortConverter extends IdentityConverter<Short> {
+
+               public static final ShortConverter INSTANCE = new 
ShortConverter();
+
+               private ShortConverter() {}
+
+               @Override
+               Short toExternalImpl(BaseRow row, int column) {
+                       return row.getShort(column);
+               }
+       }
+
+       /**
+        * Converter for int.
+        */
+       public static class IntConverter extends IdentityConverter<Integer> {
+
+               public static final IntConverter INSTANCE = new IntConverter();
+
+               private IntConverter() {}
+
+               @Override
+               Integer toExternalImpl(BaseRow row, int column) {
+                       return row.getInt(column);
+               }
+       }
+
+       /**
+        * Converter for long.
+        */
+       public static class LongConverter extends IdentityConverter<Long> {
+
+               public static final LongConverter INSTANCE = new 
LongConverter();
+
+               private LongConverter() {}
+
+               @Override
+               Long toExternalImpl(BaseRow row, int column) {
+                       return row.getLong(column);
+               }
+       }
+
+       /**
+        * Converter for float.
+        */
+       public static class FloatConverter extends IdentityConverter<Float> {
+
+               public static final FloatConverter INSTANCE = new 
FloatConverter();
+
+               private FloatConverter() {}
+
+               @Override
+               Float toExternalImpl(BaseRow row, int column) {
+                       return row.getFloat(column);
+               }
+       }
+
+       /**
+        * Converter for double.
+        */
+       public static class DoubleConverter extends IdentityConverter<Double> {
+
+               public static final DoubleConverter INSTANCE = new 
DoubleConverter();
+
+               private DoubleConverter() {}
+
+               @Override
+               Double toExternalImpl(BaseRow row, int column) {
+                       return row.getDouble(column);
+               }
+       }
+
+       /**
+        * Converter for char.
+        */
+       public static class CharConverter extends IdentityConverter<Character> {
+
+               public static final CharConverter INSTANCE = new 
CharConverter();
+
+               private CharConverter() {}
+
+               @Override
+               Character toExternalImpl(BaseRow row, int column) {
+                       return row.getChar(column);
+               }
+       }
+
+       /**
+        * Converter for BinaryString.
+        */
+       public static class BinaryStringConverter extends 
IdentityConverter<BinaryString> {
+
+               public static final BinaryStringConverter INSTANCE = new 
BinaryStringConverter();
+
+               private BinaryStringConverter() {}
+
+               @Override
+               BinaryString toExternalImpl(BaseRow row, int column) {
+                       return row.getString(column);
+               }
+       }
+
+       /**
+        * Converter for BinaryArray.
+        */
+       public static class BinaryArrayConverter extends 
IdentityConverter<BinaryArray> {
+
+               public static final BinaryArrayConverter INSTANCE = new 
BinaryArrayConverter();
+
+               private BinaryArrayConverter() {}
+
+               @Override
+               BinaryArray toExternalImpl(BaseRow row, int column) {
+                       return row.getArray(column);
+               }
+       }
+
+       /**
+        * Converter for BinaryMap.
+        */
+       public static class BinaryMapConverter extends 
IdentityConverter<BinaryMap> {
+
+               public static final BinaryMapConverter INSTANCE = new 
BinaryMapConverter();
+
+               private BinaryMapConverter() {}
+
+               @Override
+               BinaryMap toExternalImpl(BaseRow row, int column) {
+                       return row.getMap(column);
+               }
+       }
+
+       /**
+        * Converter for String.
+        */
+       public static class StringConverter extends 
DataFormatConverter<BinaryString, String> {
+
+               public static final StringConverter INSTANCE = new 
StringConverter();
+
+               private StringConverter() {}
+
+               @Override
+               BinaryString toInternalImpl(String value) {
+                       return BinaryString.fromString(value);
+               }
+
+               @Override
+               String toExternalImpl(BinaryString value) {
+                       return value.toString();
+               }
+
+               @Override
+               String toExternalImpl(BaseRow row, int column) {
+                       return row.getString(column).toString();
+               }
+       }
+
+       /**
+        * Converter for date.
+        */
+       public static class DateConverter extends DataFormatConverter<Integer, 
Date> {
+
+               public static final DateConverter INSTANCE = new 
DateConverter();
+
+               private DateConverter() {}
+
+               @Override
+               Integer toInternalImpl(Date value) {
+                       return DateTimeUtils.toInt(value);
+               }
+
+               @Override
+               Date toExternalImpl(Integer value) {
+                       return DateTimeUtils.internalToDate(value);
+               }
+
+               @Override
+               Date toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getInt(column));
+               }
+       }
+
+       /**
+        * Converter for time.
+        */
+       public static class TimeConverter extends DataFormatConverter<Integer, 
Time> {
+
+               public static final TimeConverter INSTANCE = new 
TimeConverter();
+
+               private TimeConverter() {}
+
+               @Override
+               Integer toInternalImpl(Time value) {
+                       return DateTimeUtils.toInt(value);
+               }
+
+               @Override
+               Time toExternalImpl(Integer value) {
+                       return DateTimeUtils.internalToTime(value);
+               }
+
+               @Override
+               Time toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getInt(column));
+               }
+       }
+
+       /**
+        * Converter for timestamp.
+        */
+       public static class TimestampConverter extends 
DataFormatConverter<Long, Timestamp> {
+
+               public static final TimestampConverter INSTANCE = new 
TimestampConverter();
+
+               private TimestampConverter() {}
+
+               @Override
+               Long toInternalImpl(Timestamp value) {
+                       return DateTimeUtils.toLong(value);
+               }
+
+               @Override
+               Timestamp toExternalImpl(Long value) {
+                       return DateTimeUtils.internalToTimestamp(value);
+               }
+
+               @Override
+               Timestamp toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getLong(column));
+               }
+       }
+
+       /**
+        * Converter for primitive int array.
+        */
+       public static class PrimitiveIntArrayConverter extends 
DataFormatConverter<BinaryArray, int[]> {
+
+               public static final PrimitiveIntArrayConverter INSTANCE = new 
PrimitiveIntArrayConverter();
+
+               private PrimitiveIntArrayConverter() {}
+
+               @Override
+               BinaryArray toInternalImpl(int[] value) {
+                       return BinaryArray.fromPrimitiveArray(value);
+               }
+
+               @Override
+               int[] toExternalImpl(BinaryArray value) {
+                       return value.toIntArray();
+               }
+
+               @Override
+               int[] toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getArray(column));
+               }
+       }
+
+       /**
+        * Converter for primitive boolean array.
+        */
+       public static class PrimitiveBooleanArrayConverter extends 
DataFormatConverter<BinaryArray, boolean[]> {
+
+               public static final PrimitiveBooleanArrayConverter INSTANCE = 
new PrimitiveBooleanArrayConverter();
+
+               private PrimitiveBooleanArrayConverter() {}
+
+               @Override
+               BinaryArray toInternalImpl(boolean[] value) {
+                       return BinaryArray.fromPrimitiveArray(value);
+               }
+
+               @Override
+               boolean[] toExternalImpl(BinaryArray value) {
+                       return value.toBooleanArray();
+               }
+
+               @Override
+               boolean[] toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getArray(column));
+               }
+       }
+
+       /**
+        * Converter for primitive byte array.
+        */
+       public static class PrimitiveByteArrayConverter extends 
DataFormatConverter<BinaryArray, byte[]> {
+
+               public static final PrimitiveByteArrayConverter INSTANCE = new 
PrimitiveByteArrayConverter();
+
+               private PrimitiveByteArrayConverter() {}
+
+               @Override
+               BinaryArray toInternalImpl(byte[] value) {
+                       return BinaryArray.fromPrimitiveArray(value);
+               }
+
+               @Override
+               byte[] toExternalImpl(BinaryArray value) {
+                       return value.toByteArray();
+               }
+
+               @Override
+               byte[] toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getArray(column));
+               }
+       }
+
+       /**
+        * Converter for primitive short array.
+        */
+       public static class PrimitiveShortArrayConverter extends 
DataFormatConverter<BinaryArray, short[]> {
+
+               public static final PrimitiveShortArrayConverter INSTANCE = new 
PrimitiveShortArrayConverter();
+
+               private PrimitiveShortArrayConverter() {}
+
+               @Override
+               BinaryArray toInternalImpl(short[] value) {
+                       return BinaryArray.fromPrimitiveArray(value);
+               }
+
+               @Override
+               short[] toExternalImpl(BinaryArray value) {
+                       return value.toShortArray();
+               }
+
+               @Override
+               short[] toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getArray(column));
+               }
+       }
+
+       /**
+        * Converter for primitive long array.
+        */
+       public static class PrimitiveLongArrayConverter extends 
DataFormatConverter<BinaryArray, long[]> {
+
+               public static final PrimitiveLongArrayConverter INSTANCE = new 
PrimitiveLongArrayConverter();
+
+               private PrimitiveLongArrayConverter() {}
+
+               @Override
+               BinaryArray toInternalImpl(long[] value) {
+                       return BinaryArray.fromPrimitiveArray(value);
+               }
+
+               @Override
+               long[] toExternalImpl(BinaryArray value) {
+                       return value.toLongArray();
+               }
+
+               @Override
+               long[] toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getArray(column));
+               }
+       }
+
+       /**
+        * Converter for primitive float array.
+        */
+       public static class PrimitiveFloatArrayConverter extends 
DataFormatConverter<BinaryArray, float[]> {
+
+               public static final PrimitiveFloatArrayConverter INSTANCE = new 
PrimitiveFloatArrayConverter();
+
+               private PrimitiveFloatArrayConverter() {}
+
+               @Override
+               BinaryArray toInternalImpl(float[] value) {
+                       return BinaryArray.fromPrimitiveArray(value);
+               }
+
+               @Override
+               float[] toExternalImpl(BinaryArray value) {
+                       return value.toFloatArray();
+               }
+
+               @Override
+               float[] toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getArray(column));
+               }
+       }
+
+       /**
+        * Converter for primitive double array.
+        */
+       public static class PrimitiveDoubleArrayConverter extends 
DataFormatConverter<BinaryArray, double[]> {
+
+               public static final PrimitiveDoubleArrayConverter INSTANCE = 
new PrimitiveDoubleArrayConverter();
+
+               private PrimitiveDoubleArrayConverter() {}
+
+               @Override
+               BinaryArray toInternalImpl(double[] value) {
+                       return BinaryArray.fromPrimitiveArray(value);
+               }
+
+               @Override
+               double[] toExternalImpl(BinaryArray value) {
+                       return value.toDoubleArray();
+               }
+
+               @Override
+               double[] toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getArray(column));
+               }
+       }
+
+       /**
+        * Converter for primitive char array.
+        */
+       public static class PrimitiveCharArrayConverter extends 
DataFormatConverter<BinaryArray, char[]> {
+
+               public static final PrimitiveCharArrayConverter INSTANCE = new 
PrimitiveCharArrayConverter();
+
+               private PrimitiveCharArrayConverter() {}
+
+               @Override
+               BinaryArray toInternalImpl(char[] value) {
+                       return BinaryArray.fromPrimitiveArray(value);
+               }
+
+               @Override
+               char[] toExternalImpl(BinaryArray value) {
+                       return value.toCharArray();
+               }
+
+               @Override
+               char[] toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getArray(column));
+               }
+       }
+
+       /**
+        * Converter for object array.
+        */
+       public static class ObjectArrayConverter<T> extends 
DataFormatConverter<BinaryArray, T[]> {
+
+               private final InternalType elementType;
+               private final DataFormatConverter<Object, T> elementConverter;
+               private final Class<T> componentClass;
+               private final int elementSize;
+
+               public ObjectArrayConverter(TypeInformation<T> elementTypeInfo) 
{
+                       this.elementType = 
TypeConverters.createInternalTypeFromTypeInfo(elementTypeInfo);
+                       this.elementConverter = 
DataFormatConverters.getConverterForTypeInfo(elementTypeInfo);
+                       this.componentClass = elementTypeInfo.getTypeClass();
+                       this.elementSize = 
BinaryArray.calculateFixLengthPartSize(elementType);
+               }
+
+               @Override
+               BinaryArray toInternalImpl(T[] value) {
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
value.length, elementSize);
+                       for (int i = 0; i < value.length; i++) {
+                               Object field = value[i];
+                               if (field == null) {
+                                       // if we reuse BinaryArrayWriter, we 
need invoke setNullInt.
+                                       writer.setNullAt(i);
+                               } else {
+                                       BinaryWriter.write(writer, i, 
elementConverter.toInternalImpl(value[i]), elementType);
+                               }
+                       }
+                       writer.complete();
+                       return array;
+               }
+
+               @Override
+               T[] toExternalImpl(BinaryArray value) {
+                       return binaryArrayToJavaArray(value, elementType, 
componentClass, elementConverter);
+               }
+
+               @Override
+               T[] toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getArray(column));
+               }
+       }
+
+       private static  <T> T[] binaryArrayToJavaArray(BinaryArray value, 
InternalType elementType,
+                       Class<T> componentClass, DataFormatConverter<Object, T> 
elementConverter) {
+               int size = value.numElements();
+               T[] values = (T[]) Array.newInstance(componentClass, size);
+               for (int i = 0; i < size; i++) {
+                       if (value.isNullAt(i)) {
+                               values[i] = null;
+                       } else {
+                               values[i] = elementConverter.toExternalImpl(
+                                               TypeGetterSetters.get(value, i, 
elementType));
+                       }
+               }
+               return values;
+       }
+
+       /**
+        * Converter for map.
+        */
+       public static class MapConverter extends DataFormatConverter<BinaryMap, 
Map> {
+
+               private final InternalType keyType;
+               private final InternalType valueType;
+
+               private final DataFormatConverter keyConverter;
+               private final DataFormatConverter valueConverter;
+
+               private final int keyElementSize;
+               private final int valueElementSize;
+
+               private final Class keyComponentClass;
+               private final Class valueComponentClass;
+
+               public MapConverter(TypeInformation keyTypeInfo, 
TypeInformation valueTypeInfo) {
+                       this.keyType = 
TypeConverters.createInternalTypeFromTypeInfo(keyTypeInfo);
+                       this.valueType = 
TypeConverters.createInternalTypeFromTypeInfo(valueTypeInfo);
+                       this.keyConverter = 
DataFormatConverters.getConverterForTypeInfo(keyTypeInfo);
+                       this.valueConverter = 
DataFormatConverters.getConverterForTypeInfo(valueTypeInfo);
+                       this.keyElementSize = 
BinaryArray.calculateFixLengthPartSize(keyType);
+                       this.valueElementSize = 
BinaryArray.calculateFixLengthPartSize(valueType);
+                       this.keyComponentClass = keyTypeInfo.getTypeClass();
+                       this.valueComponentClass = valueTypeInfo.getTypeClass();
+               }
+
+               @Override
+               BinaryMap toInternalImpl(Map value) {
+                       BinaryArray keyArray = new BinaryArray();
+                       BinaryArrayWriter keyWriter = new 
BinaryArrayWriter(keyArray, value.size(), keyElementSize);
+
+                       BinaryArray valueArray = new BinaryArray();
+                       BinaryArrayWriter valueWriter = new 
BinaryArrayWriter(valueArray, value.size(), valueElementSize);
+
+                       int i = 0;
+                       for (Map.Entry<Object, Object> entry : ((Map<Object, 
Object>) value).entrySet()) {
+                               // if we reuse BinaryArrayWriter, we need 
invoke setNullInt.
+                               if (entry.getKey() == null) {
+                                       keyWriter.setNullAt(i);
+                               } else {
+                                       BinaryWriter.write(keyWriter, i, 
keyConverter.toInternalImpl(entry.getKey()), keyType);
+                               }
+                               if (entry.getValue() == null) {
+                                       valueWriter.setNullAt(i);
+                               } else {
+                                       BinaryWriter.write(valueWriter, i, 
valueConverter.toInternalImpl(entry.getValue()), valueType);
+                               }
+                               i++;
+                       }
+
+                       keyWriter.complete();
+                       valueWriter.complete();
+                       return BinaryMap.valueOf(keyArray, valueArray);
+               }
+
+               @Override
+               Map toExternalImpl(BinaryMap value) {
+                       Map<Object, Object> map = new HashMap<>();
+                       Object[] keys = 
binaryArrayToJavaArray(value.keyArray(), keyType, keyComponentClass, 
keyConverter);
+                       Object[] values = 
binaryArrayToJavaArray(value.valueArray(), valueType, valueComponentClass, 
valueConverter);
+                       for (int i = 0; i < value.numElements(); i++) {
+                               map.put(keys[i], values[i]);
+                       }
+                       return map;
+               }
+
+               @Override
+               Map toExternalImpl(BaseRow row, int column) {
+                       return toExternalImpl(row.getMap(column));
+               }
+       }
+
+       /**
+        * Abstract converter for internal base row.
+        */
+       public abstract static class AbstractBaseRowConverter<I extends 
BaseRow, E> extends DataFormatConverter<I, E> {
+
+               protected final DataFormatConverter[] converters;
+
+               public AbstractBaseRowConverter(CompositeType t) {
+                       converters = new DataFormatConverter[t.getArity()];
+                       for (int i = 0; i < t.getArity(); i++) {
+                               converters[i] = 
getConverterForTypeInfo(t.getTypeAt(i));
+                       }
+               }
+
+               @Override
+               E toExternalImpl(BaseRow row, int column) {
+                       throw new RuntimeException("Not support yet!");
+               }
+       }
+
+       /**
+        * Converter for base row.
+        */
+       public static class BaseRowConverter extends IdentityConverter<BaseRow> 
{
+
+               public static final BaseRowConverter INSTANCE = new 
BaseRowConverter();
+
+               private BaseRowConverter() {}
+
+               @Override
+               BaseRow toExternalImpl(BaseRow row, int column) {
+                       throw new RuntimeException("Not support yet!");
+               }
+       }
+
+       /**
+        * Converter for pojo.
+        */
+       public static class PojoConverter<T> extends 
AbstractBaseRowConverter<GenericRow, T> {
+
+               private final PojoTypeInfo<T> t;
+               private final PojoField[] fields;
+
+               public PojoConverter(PojoTypeInfo<T> t) {
+                       super(t);
+                       this.fields = new PojoField[t.getArity()];
+                       for (int i = 0; i < t.getArity(); i++) {
+                               fields[i] = t.getPojoFieldAt(i);
+                               fields[i].getField().setAccessible(true);
+                       }
+                       this.t = t;
+               }
+
+               @Override
+               GenericRow toInternalImpl(T value) {
+                       GenericRow genericRow = new GenericRow(t.getArity());
+                       for (int i = 0; i < t.getArity(); i++) {
+                               try {
+                                       genericRow.setField(i, 
converters[i].toInternal(
+                                                       
fields[i].getField().get(value)));
+                               } catch (IllegalAccessException e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }
+                       return genericRow;
+               }
+
+               @Override
+               T toExternalImpl(GenericRow value) {
+                       try {
+                               T pojo = t.getTypeClass().newInstance();
+                               for (int i = 0; i < t.getArity(); i++) {
+                                       fields[i].getField().set(pojo, 
converters[i].toExternal(value, i));
+                               }
+                               return pojo;
+                       } catch (InstantiationException | 
IllegalAccessException e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+       }
+
+       /**
+        * Converter for row.
+        */
+       public static class RowConverter extends 
AbstractBaseRowConverter<GenericRow, Row> {
+
+               private final RowTypeInfo t;
+
+               public RowConverter(RowTypeInfo t) {
+                       super(t);
+                       this.t = t;
+               }
+
+               @Override
+               GenericRow toInternalImpl(Row value) {
+                       GenericRow genericRow = new GenericRow(t.getArity());
+                       for (int i = 0; i < t.getArity(); i++) {
+                               genericRow.setField(i, 
converters[i].toInternal(value.getField(i)));
+                       }
+                       return genericRow;
+               }
+
+               @Override
+               Row toExternalImpl(GenericRow value) {
+                       Row row = new Row(t.getArity());
+                       for (int i = 0; i < t.getArity(); i++) {
+                               row.setField(i, converters[i].toExternal(value, 
i));
+                       }
+                       return row;
+               }
+       }
+
+       /**
+        * Converter for flink tuple.
+        */
+       public static class TupleConverter extends 
AbstractBaseRowConverter<GenericRow, Tuple> {
+
+               private final TupleTypeInfo t;
+
+               public TupleConverter(TupleTypeInfo t) {
+                       super(t);
+                       this.t = t;
+               }
+
+               @Override
+               GenericRow toInternalImpl(Tuple value) {
+                       GenericRow genericRow = new GenericRow(t.getArity());
+                       for (int i = 0; i < t.getArity(); i++) {
+                               genericRow.setField(i, 
converters[i].toInternal(value.getField(i)));
+                       }
+                       return genericRow;
+               }
+
+               @Override
+               Tuple toExternalImpl(GenericRow value) {
+                       try {
+                               Tuple tuple = (Tuple) 
t.getTypeClass().newInstance();
+                               for (int i = 0; i < t.getArity(); i++) {
+                                       
tuple.setField(converters[i].toExternal(value, i), i);
+                               }
+                               return tuple;
+                       } catch (InstantiationException | 
IllegalAccessException e) {
+                               throw new RuntimeException(e);
+                       }
+
+               }
+       }
+
+       /**
+        * Converter for case class.
+        */
+       public static class CaseClassConverter extends 
AbstractBaseRowConverter<GenericRow, Product> {
+
+               private final TupleTypeInfoBase t;
+               private final TupleSerializerBase serializer;
+
+               public CaseClassConverter(TupleTypeInfoBase t) {
+                       super(t);
+                       this.t = t;
+                       this.serializer = (TupleSerializerBase) 
t.createSerializer(new ExecutionConfig());
+               }
+
+               @Override
+               GenericRow toInternalImpl(Product value) {
+                       GenericRow genericRow = new GenericRow(t.getArity());
+                       for (int i = 0; i < t.getArity(); i++) {
+                               genericRow.setField(i, 
converters[i].toInternal(value.productElement(i)));
+                       }
+                       return genericRow;
+               }
+
+               @Override
+               Product toExternalImpl(GenericRow value) {
+                       Object[] fields = new Object[t.getArity()];
+                       for (int i = 0; i < t.getArity(); i++) {
+                               fields[i] = converters[i].toExternal(value, i);
+                       }
+                       return (Product) serializer.createInstance(fields);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/DateTimeUtils.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/DateTimeUtils.java
new file mode 100644
index 0000000..c86f821
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/DateTimeUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import java.sql.Timestamp;
+import java.util.TimeZone;
+
+/**
+ * Utility functions for datetime types: date, time, timestamp.
+ * Currently, it is a bit messy putting date time functions in various classes 
because
+ * the runtime module does not depend on calcite..
+ */
+public class DateTimeUtils {
+
+       public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+       /**
+        * The number of milliseconds in a day.
+        *
+        * <p>This is the modulo 'mask' used when converting
+        * TIMESTAMP values to DATE and TIME values.
+        */
+       public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 
* 1000
+
+       /** Converts the Java type used for UDF parameters of SQL TIME type
+        * ({@link java.sql.Time}) to internal representation (int).
+        *
+        * <p>Converse of {@link #internalToTime(int)}. */
+       public static int toInt(java.sql.Time v) {
+               return (int) (toLong(v) % MILLIS_PER_DAY);
+       }
+
+       /** Converts the Java type used for UDF parameters of SQL DATE type
+        * ({@link java.sql.Date}) to internal representation (int).
+        *
+        * <p>Converse of {@link #internalToDate(int)}. */
+       public static int toInt(java.util.Date v) {
+               return toInt(v, LOCAL_TZ);
+       }
+
+       public static int toInt(java.util.Date v, TimeZone timeZone) {
+               return (int) (toLong(v, timeZone) / MILLIS_PER_DAY);
+       }
+
+       public static long toLong(java.util.Date v) {
+               return toLong(v, LOCAL_TZ);
+       }
+
+       /** Converts the Java type used for UDF parameters of SQL TIMESTAMP type
+        * ({@link java.sql.Timestamp}) to internal representation (long).
+        *
+        * <p>Converse of {@link #internalToTimestamp(long)}. */
+       public static long toLong(Timestamp v) {
+               return toLong(v, LOCAL_TZ);
+       }
+
+       public static long toLong(java.util.Date v, TimeZone timeZone) {
+               long time = v.getTime();
+               return time + (long) timeZone.getOffset(time);
+       }
+
+       /** Converts the internal representation of a SQL DATE (int) to the Java
+        * type used for UDF parameters ({@link java.sql.Date}). */
+       public static java.sql.Date internalToDate(int v) {
+               final long t = v * MILLIS_PER_DAY;
+               return new java.sql.Date(t - LOCAL_TZ.getOffset(t));
+       }
+
+       /** Converts the internal representation of a SQL TIME (int) to the Java
+        * type used for UDF parameters ({@link java.sql.Time}). */
+       public static java.sql.Time internalToTime(int v) {
+               return new java.sql.Time(v - LOCAL_TZ.getOffset(v));
+       }
+
+       /** Converts the internal representation of a SQL TIMESTAMP (long) to 
the Java
+        * type used for UDF parameters ({@link java.sql.Timestamp}). */
+       public static java.sql.Timestamp internalToTimestamp(long v) {
+               return new java.sql.Timestamp(v - LOCAL_TZ.getOffset(v));
+       }
+
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
new file mode 100644
index 0000000..cfd407c
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import 
org.apache.flink.table.dataformat.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.runtime.functions.DateTimeUtils;
+import org.apache.flink.table.type.InternalTypes;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.table.typeutils.BinaryArrayTypeInfo;
+import org.apache.flink.table.typeutils.BinaryMapTypeInfo;
+import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static 
org.apache.flink.table.dataformat.DataFormatConverters.getConverterForTypeInfo;
+
+/**
+ * Test for {@link DataFormatConverters}.
+ */
+public class DataFormatConvertersTest {
+
+       private TypeInformation[] simpleTypes = new TypeInformation[] {
+               BasicTypeInfo.STRING_TYPE_INFO,
+               BasicTypeInfo.BOOLEAN_TYPE_INFO,
+               BasicTypeInfo.INT_TYPE_INFO,
+               BasicTypeInfo.LONG_TYPE_INFO,
+               BasicTypeInfo.FLOAT_TYPE_INFO,
+               BasicTypeInfo.DOUBLE_TYPE_INFO,
+               BasicTypeInfo.SHORT_TYPE_INFO,
+               BasicTypeInfo.BYTE_TYPE_INFO,
+               BasicTypeInfo.CHAR_TYPE_INFO,
+
+               PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO,
+               PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+               PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO,
+               PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO,
+               PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
+               PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO,
+               PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+               PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO,
+
+               SqlTimeTypeInfo.DATE,
+               SqlTimeTypeInfo.TIME,
+               SqlTimeTypeInfo.TIMESTAMP,
+
+               BinaryStringTypeInfo.INSTANCE
+       };
+
+       private Object[] simpleValues = new Object[] {
+                       "haha",
+                       true,
+                       22,
+                       1111L,
+                       0.5f,
+                       0.5d,
+                       (short) 1,
+                       (byte) 1,
+                       (char) 1,
+
+                       new boolean[] {true, false},
+                       new int[] {5, 1},
+                       new long[] {5, 1},
+                       new float[] {5, 1},
+                       new double[] {5, 1},
+                       new short[] {5, 1},
+                       new byte[] {5, 1},
+                       new char[] {5, 1},
+
+                       DateTimeUtils.internalToDate(5),
+                       new Time(11),
+                       new Timestamp(11),
+
+                       BinaryString.fromString("hahah")
+       };
+
+       private static void test(TypeInformation typeInfo, Object value) {
+               DataFormatConverter converter = 
getConverterForTypeInfo(typeInfo);
+               Assert.assertTrue(Arrays.deepEquals(
+                               new Object[] 
{converter.toExternal(converter.toInternal(value))}, new Object[] {value}));
+       }
+
+       @Test
+       public void testTypes() {
+               for (int i = 0; i < simpleTypes.length; i++) {
+                       test(simpleTypes[i], simpleValues[i]);
+               }
+               test(new RowTypeInfo(simpleTypes), new Row(simpleTypes.length));
+               test(new RowTypeInfo(simpleTypes), Row.of(simpleValues));
+               test(new BaseRowTypeInfo(InternalTypes.STRING, 
InternalTypes.INT),
+                               GenericRow.of(BinaryString.fromString("hehe"), 
111));
+               test(new BaseRowTypeInfo(InternalTypes.STRING, 
InternalTypes.INT), GenericRow.of(null, null));
+               test(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, new Double[] 
{1D, 5D});
+               test(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, new Double[] 
{null, null});
+               test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] 
{null, null});
+               test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] 
{"haha", "hehe"});
+               test(new MapTypeInfo<>(Types.STRING, Types.INT), null);
+
+               HashMap<String, Integer> map = new HashMap<>();
+               map.put("haha", 1);
+               map.put("hah1", 5);
+               map.put(null, null);
+               test(new MapTypeInfo<>(Types.STRING, Types.INT), map);
+
+               Tuple2 tuple2 = new Tuple2<>(5, 10);
+               TupleTypeInfo tupleTypeInfo = new 
TupleTypeInfo<>(tuple2.getClass(), Types.INT, Types.INT);
+               test(tupleTypeInfo, tuple2);
+
+               test(TypeExtractor.createTypeInfo(MyPojo.class), new MyPojo(1, 
3));
+
+               test(new BinaryArrayTypeInfo(InternalTypes.INT), 
BinaryArray.fromPrimitiveArray(new int[]{1, 5}));
+               test(new BinaryMapTypeInfo(InternalTypes.INT, 
InternalTypes.INT),
+                               BinaryMap.valueOf(
+                                               
BinaryArray.fromPrimitiveArray(new int[]{1, 5}),
+                                               
BinaryArray.fromPrimitiveArray(new int[]{6, 7})));
+       }
+
+       /**
+        * Test pojo.
+        */
+       public static class MyPojo {
+               public int f1 = 0;
+               public int f2 = 0;
+
+               public MyPojo() {}
+
+               public MyPojo(int f1, int f2) {
+                       this.f1 = f1;
+                       this.f2 = f2;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       MyPojo myPojo = (MyPojo) o;
+
+                       return f1 == myPojo.f1 && f2 == myPojo.f2;
+               }
+       }
+}

Reply via email to