This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new e2579e3 [FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to convert between internal data format and java format e2579e3 is described below commit e2579e39602ab7d3e906a185353dd413aca58317 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Tue Mar 5 16:36:10 2019 +0800 [FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to convert between internal data format and java format This closes #7904 --- .../apache/flink/table/dataformat/BinaryArray.java | 43 + .../table/dataformat/DataFormatConverters.java | 979 +++++++++++++++++++++ .../table/runtime/functions/DateTimeUtils.java | 96 ++ .../table/dataformat/DataFormatConvertersTest.java | 180 ++++ 4 files changed, 1298 insertions(+) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java index 57ce04b..de9d6ee 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java @@ -20,6 +20,8 @@ package org.apache.flink.table.dataformat; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.type.InternalTypes; import org.apache.flink.table.util.SegmentsUtil; import static org.apache.flink.core.memory.MemoryUtils.UNSAFE; @@ -40,6 +42,7 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters { private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class); private static final int SHORT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(short[].class); + private static final int CHAR_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(char[].class); private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class); private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class); private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class); @@ -49,6 +52,34 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters { return 4 + ((numFields + 31) / 32) * 4; } + /** + * It stores the real value when the type is primitive. + * It stores the length and offset of the variable-length part when the type is string, map, etc. + */ + public static int calculateFixLengthPartSize(InternalType type) { + if (type.equals(InternalTypes.BOOLEAN)) { + return 1; + } else if (type.equals(InternalTypes.BYTE)) { + return 1; + } else if (type.equals(InternalTypes.SHORT)) { + return 2; + } else if (type.equals(InternalTypes.INT)) { + return 4; + } else if (type.equals(InternalTypes.FLOAT)) { + return 4; + } else if (type.equals(InternalTypes.CHAR)) { + return 2; + } else if (type.equals(InternalTypes.DATE)) { + return 4; + } else if (type.equals(InternalTypes.TIME)) { + return 4; + } else { + // long and double are 8 bytes. + // It stores the length and offset of the variable-length part when the type is string, map, etc. 
+ return 8; + } + } + // The number of elements in this array private int numElements; @@ -315,6 +346,14 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters { return values; } + public char[] toCharArray() { + checkNoNull(); + char[] values = new char[numElements]; + SegmentsUtil.copyToUnsafe( + segments, elementOffset, values, CHAR_ARRAY_OFFSET, numElements * 2); + return values; + } + public int[] toIntArray() { checkNoNull(); int[] values = new int[numElements]; @@ -393,6 +432,10 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters { return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2); } + public static BinaryArray fromPrimitiveArray(char[] arr) { + return fromPrimitiveArray(arr, CHAR_ARRAY_OFFSET, arr.length, 2); + } + public static BinaryArray fromPrimitiveArray(int[] arr) { return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java new file mode 100644 index 0000000..7fbd9ba --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java @@ -0,0 +1,979 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; +import org.apache.flink.table.runtime.functions.DateTimeUtils; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.type.TypeConverters; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.table.typeutils.BinaryArrayTypeInfo; +import org.apache.flink.table.typeutils.BinaryMapTypeInfo; +import org.apache.flink.table.typeutils.BinaryStringTypeInfo; +import org.apache.flink.types.Row; + +import java.lang.reflect.Array; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; 
+import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import scala.Product; + +/** + * Converters between internal data format and java format. + * + * <p>The following scenarios will use a converter from java format to internal data format: + * In a source, data flows from the user-defined source to the internal SQL engine. + * In a udx return value, the user outputs java format data to the SQL engine. + * + * <p>The following scenarios will use a converter from internal data format to java format: + * In udx method parameters, data from the internal SQL engine needs to be provided to the user udx. + * In a sink, data from the internal SQL engine needs to be provided to the user-defined sink. + */ +public class DataFormatConverters { + + private static final Map<TypeInformation, DataFormatConverter> TYPE_INFO_TO_CONVERTER; + static { + Map<TypeInformation, DataFormatConverter> t2C = new HashMap<>(); + t2C.put(BasicTypeInfo.STRING_TYPE_INFO, StringConverter.INSTANCE); + t2C.put(BasicTypeInfo.BOOLEAN_TYPE_INFO, BooleanConverter.INSTANCE); + t2C.put(BasicTypeInfo.INT_TYPE_INFO, IntConverter.INSTANCE); + t2C.put(BasicTypeInfo.LONG_TYPE_INFO, LongConverter.INSTANCE); + t2C.put(BasicTypeInfo.FLOAT_TYPE_INFO, FloatConverter.INSTANCE); + t2C.put(BasicTypeInfo.DOUBLE_TYPE_INFO, DoubleConverter.INSTANCE); + t2C.put(BasicTypeInfo.SHORT_TYPE_INFO, ShortConverter.INSTANCE); + t2C.put(BasicTypeInfo.BYTE_TYPE_INFO, ByteConverter.INSTANCE); + t2C.put(BasicTypeInfo.CHAR_TYPE_INFO, CharConverter.INSTANCE); + + t2C.put(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveBooleanArrayConverter.INSTANCE); + t2C.put(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveIntArrayConverter.INSTANCE); + t2C.put(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveLongArrayConverter.INSTANCE); + t2C.put(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveFloatArrayConverter.INSTANCE); + t2C.put(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveDoubleArrayConverter.INSTANCE); + t2C.put(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveShortArrayConverter.INSTANCE); + t2C.put(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveByteArrayConverter.INSTANCE); + t2C.put(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveCharArrayConverter.INSTANCE); + + t2C.put(SqlTimeTypeInfo.DATE, DateConverter.INSTANCE); + t2C.put(SqlTimeTypeInfo.TIME, TimeConverter.INSTANCE); + t2C.put(SqlTimeTypeInfo.TIMESTAMP, TimestampConverter.INSTANCE); + + t2C.put(BinaryStringTypeInfo.INSTANCE, BinaryStringConverter.INSTANCE); + + TYPE_INFO_TO_CONVERTER = Collections.unmodifiableMap(t2C); + } + + /** + * Get {@link DataFormatConverter} for {@link TypeInformation}. + * + * @param typeInfo DataFormatConverter is oriented to Java format, while InternalType has + * lost its specific Java format. Only TypeInformation retains all its + * Java format information. + */ + public static DataFormatConverter getConverterForTypeInfo(TypeInformation typeInfo) { + DataFormatConverter converter = TYPE_INFO_TO_CONVERTER.get(typeInfo); + if (converter != null) { + return converter; + } + + if (typeInfo instanceof BasicArrayTypeInfo) { + return new ObjectArrayConverter(((BasicArrayTypeInfo) typeInfo).getComponentInfo()); + } else if (typeInfo instanceof ObjectArrayTypeInfo) { + return new ObjectArrayConverter(((ObjectArrayTypeInfo) typeInfo).getComponentInfo()); + } else if (typeInfo instanceof MapTypeInfo) { + MapTypeInfo mapType = (MapTypeInfo) typeInfo; + return new MapConverter(mapType.getKeyTypeInfo(), mapType.getValueTypeInfo()); + } else if (typeInfo instanceof RowTypeInfo) { + return new RowConverter((RowTypeInfo) typeInfo); + } else if (typeInfo instanceof PojoTypeInfo) { + return new PojoConverter((PojoTypeInfo) typeInfo); + } else if (typeInfo instanceof TupleTypeInfo) { + return new TupleConverter((TupleTypeInfo) typeInfo); + } else if (typeInfo instanceof TupleTypeInfoBase && 
Product.class.isAssignableFrom(typeInfo.getTypeClass())) { + return new CaseClassConverter((TupleTypeInfoBase) typeInfo); + } else if (typeInfo instanceof BinaryArrayTypeInfo) { + return BinaryArrayConverter.INSTANCE; + } else if (typeInfo instanceof BinaryMapTypeInfo) { + return BinaryMapConverter.INSTANCE; + } else if (typeInfo instanceof BaseRowTypeInfo) { + return BaseRowConverter.INSTANCE; + } else { + throw new RuntimeException("Not support generic yet: " + typeInfo); + } + } + + /** + * Converter between internal data format and java format. + * + * @param <Internal> Internal data format. + * @param <External> External data format. + */ + public abstract static class DataFormatConverter<Internal, External> { + + /** + * Converts an external (Java) data format to its internal equivalent while automatically handling nulls. + */ + public final Internal toInternal(External value) { + return value == null ? null : toInternalImpl(value); + } + + /** + * Converts a non-null external (Java) data format to its internal equivalent. + */ + abstract Internal toInternalImpl(External value); + + /** + * Converts an internal data format to its external (Java) equivalent while automatically handling nulls. + */ + public final External toExternal(Internal value) { + return value == null ? null : toExternalImpl(value); + } + + /** + * Converts a non-null internal data format to its external (Java) equivalent. + */ + abstract External toExternalImpl(Internal value); + + /** + * Given an internalType row, converts the value at column `column` to its external (Java) equivalent. + * This method will only be called on non-null columns. + */ + abstract External toExternalImpl(BaseRow row, int column); + + /** + * Given an internalType row, converts the value at column `column` to its external (Java) equivalent. + */ + public final External toExternal(BaseRow row, int column) { + return row.isNullAt(column) ? null : toExternalImpl(row, column); + } + } + + /** + * Identity converter. 
+ */ + public abstract static class IdentityConverter<T> extends DataFormatConverter<T, T> { + + @Override + T toInternalImpl(T value) { + return value; + } + + @Override + T toExternalImpl(T value) { + return value; + } + } + + /** + * Converter for boolean. + */ + public static class BooleanConverter extends IdentityConverter<Boolean> { + + public static final BooleanConverter INSTANCE = new BooleanConverter(); + + private BooleanConverter() {} + + @Override + Boolean toExternalImpl(BaseRow row, int column) { + return row.getBoolean(column); + } + } + + /** + * Converter for byte. + */ + public static class ByteConverter extends IdentityConverter<Byte> { + + public static final ByteConverter INSTANCE = new ByteConverter(); + + private ByteConverter() {} + + @Override + Byte toExternalImpl(BaseRow row, int column) { + return row.getByte(column); + } + } + + /** + * Converter for short. + */ + public static class ShortConverter extends IdentityConverter<Short> { + + public static final ShortConverter INSTANCE = new ShortConverter(); + + private ShortConverter() {} + + @Override + Short toExternalImpl(BaseRow row, int column) { + return row.getShort(column); + } + } + + /** + * Converter for int. + */ + public static class IntConverter extends IdentityConverter<Integer> { + + public static final IntConverter INSTANCE = new IntConverter(); + + private IntConverter() {} + + @Override + Integer toExternalImpl(BaseRow row, int column) { + return row.getInt(column); + } + } + + /** + * Converter for long. + */ + public static class LongConverter extends IdentityConverter<Long> { + + public static final LongConverter INSTANCE = new LongConverter(); + + private LongConverter() {} + + @Override + Long toExternalImpl(BaseRow row, int column) { + return row.getLong(column); + } + } + + /** + * Converter for float. 
+ */ + public static class FloatConverter extends IdentityConverter<Float> { + + public static final FloatConverter INSTANCE = new FloatConverter(); + + private FloatConverter() {} + + @Override + Float toExternalImpl(BaseRow row, int column) { + return row.getFloat(column); + } + } + + /** + * Converter for double. + */ + public static class DoubleConverter extends IdentityConverter<Double> { + + public static final DoubleConverter INSTANCE = new DoubleConverter(); + + private DoubleConverter() {} + + @Override + Double toExternalImpl(BaseRow row, int column) { + return row.getDouble(column); + } + } + + /** + * Converter for char. + */ + public static class CharConverter extends IdentityConverter<Character> { + + public static final CharConverter INSTANCE = new CharConverter(); + + private CharConverter() {} + + @Override + Character toExternalImpl(BaseRow row, int column) { + return row.getChar(column); + } + } + + /** + * Converter for BinaryString. + */ + public static class BinaryStringConverter extends IdentityConverter<BinaryString> { + + public static final BinaryStringConverter INSTANCE = new BinaryStringConverter(); + + private BinaryStringConverter() {} + + @Override + BinaryString toExternalImpl(BaseRow row, int column) { + return row.getString(column); + } + } + + /** + * Converter for BinaryArray. + */ + public static class BinaryArrayConverter extends IdentityConverter<BinaryArray> { + + public static final BinaryArrayConverter INSTANCE = new BinaryArrayConverter(); + + private BinaryArrayConverter() {} + + @Override + BinaryArray toExternalImpl(BaseRow row, int column) { + return row.getArray(column); + } + } + + /** + * Converter for BinaryMap. 
+ */ + public static class BinaryMapConverter extends IdentityConverter<BinaryMap> { + + public static final BinaryMapConverter INSTANCE = new BinaryMapConverter(); + + private BinaryMapConverter() {} + + @Override + BinaryMap toExternalImpl(BaseRow row, int column) { + return row.getMap(column); + } + } + + /** + * Converter for String. + */ + public static class StringConverter extends DataFormatConverter<BinaryString, String> { + + public static final StringConverter INSTANCE = new StringConverter(); + + private StringConverter() {} + + @Override + BinaryString toInternalImpl(String value) { + return BinaryString.fromString(value); + } + + @Override + String toExternalImpl(BinaryString value) { + return value.toString(); + } + + @Override + String toExternalImpl(BaseRow row, int column) { + return row.getString(column).toString(); + } + } + + /** + * Converter for date. + */ + public static class DateConverter extends DataFormatConverter<Integer, Date> { + + public static final DateConverter INSTANCE = new DateConverter(); + + private DateConverter() {} + + @Override + Integer toInternalImpl(Date value) { + return DateTimeUtils.toInt(value); + } + + @Override + Date toExternalImpl(Integer value) { + return DateTimeUtils.internalToDate(value); + } + + @Override + Date toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getInt(column)); + } + } + + /** + * Converter for time. + */ + public static class TimeConverter extends DataFormatConverter<Integer, Time> { + + public static final TimeConverter INSTANCE = new TimeConverter(); + + private TimeConverter() {} + + @Override + Integer toInternalImpl(Time value) { + return DateTimeUtils.toInt(value); + } + + @Override + Time toExternalImpl(Integer value) { + return DateTimeUtils.internalToTime(value); + } + + @Override + Time toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getInt(column)); + } + } + + /** + * Converter for timestamp. 
+ */ + public static class TimestampConverter extends DataFormatConverter<Long, Timestamp> { + + public static final TimestampConverter INSTANCE = new TimestampConverter(); + + private TimestampConverter() {} + + @Override + Long toInternalImpl(Timestamp value) { + return DateTimeUtils.toLong(value); + } + + @Override + Timestamp toExternalImpl(Long value) { + return DateTimeUtils.internalToTimestamp(value); + } + + @Override + Timestamp toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getLong(column)); + } + } + + /** + * Converter for primitive int array. + */ + public static class PrimitiveIntArrayConverter extends DataFormatConverter<BinaryArray, int[]> { + + public static final PrimitiveIntArrayConverter INSTANCE = new PrimitiveIntArrayConverter(); + + private PrimitiveIntArrayConverter() {} + + @Override + BinaryArray toInternalImpl(int[] value) { + return BinaryArray.fromPrimitiveArray(value); + } + + @Override + int[] toExternalImpl(BinaryArray value) { + return value.toIntArray(); + } + + @Override + int[] toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getArray(column)); + } + } + + /** + * Converter for primitive boolean array. + */ + public static class PrimitiveBooleanArrayConverter extends DataFormatConverter<BinaryArray, boolean[]> { + + public static final PrimitiveBooleanArrayConverter INSTANCE = new PrimitiveBooleanArrayConverter(); + + private PrimitiveBooleanArrayConverter() {} + + @Override + BinaryArray toInternalImpl(boolean[] value) { + return BinaryArray.fromPrimitiveArray(value); + } + + @Override + boolean[] toExternalImpl(BinaryArray value) { + return value.toBooleanArray(); + } + + @Override + boolean[] toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getArray(column)); + } + } + + /** + * Converter for primitive byte array. 
+ */ + public static class PrimitiveByteArrayConverter extends DataFormatConverter<BinaryArray, byte[]> { + + public static final PrimitiveByteArrayConverter INSTANCE = new PrimitiveByteArrayConverter(); + + private PrimitiveByteArrayConverter() {} + + @Override + BinaryArray toInternalImpl(byte[] value) { + return BinaryArray.fromPrimitiveArray(value); + } + + @Override + byte[] toExternalImpl(BinaryArray value) { + return value.toByteArray(); + } + + @Override + byte[] toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getArray(column)); + } + } + + /** + * Converter for primitive short array. + */ + public static class PrimitiveShortArrayConverter extends DataFormatConverter<BinaryArray, short[]> { + + public static final PrimitiveShortArrayConverter INSTANCE = new PrimitiveShortArrayConverter(); + + private PrimitiveShortArrayConverter() {} + + @Override + BinaryArray toInternalImpl(short[] value) { + return BinaryArray.fromPrimitiveArray(value); + } + + @Override + short[] toExternalImpl(BinaryArray value) { + return value.toShortArray(); + } + + @Override + short[] toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getArray(column)); + } + } + + /** + * Converter for primitive long array. + */ + public static class PrimitiveLongArrayConverter extends DataFormatConverter<BinaryArray, long[]> { + + public static final PrimitiveLongArrayConverter INSTANCE = new PrimitiveLongArrayConverter(); + + private PrimitiveLongArrayConverter() {} + + @Override + BinaryArray toInternalImpl(long[] value) { + return BinaryArray.fromPrimitiveArray(value); + } + + @Override + long[] toExternalImpl(BinaryArray value) { + return value.toLongArray(); + } + + @Override + long[] toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getArray(column)); + } + } + + /** + * Converter for primitive float array. 
+ */ + public static class PrimitiveFloatArrayConverter extends DataFormatConverter<BinaryArray, float[]> { + + public static final PrimitiveFloatArrayConverter INSTANCE = new PrimitiveFloatArrayConverter(); + + private PrimitiveFloatArrayConverter() {} + + @Override + BinaryArray toInternalImpl(float[] value) { + return BinaryArray.fromPrimitiveArray(value); + } + + @Override + float[] toExternalImpl(BinaryArray value) { + return value.toFloatArray(); + } + + @Override + float[] toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getArray(column)); + } + } + + /** + * Converter for primitive double array. + */ + public static class PrimitiveDoubleArrayConverter extends DataFormatConverter<BinaryArray, double[]> { + + public static final PrimitiveDoubleArrayConverter INSTANCE = new PrimitiveDoubleArrayConverter(); + + private PrimitiveDoubleArrayConverter() {} + + @Override + BinaryArray toInternalImpl(double[] value) { + return BinaryArray.fromPrimitiveArray(value); + } + + @Override + double[] toExternalImpl(BinaryArray value) { + return value.toDoubleArray(); + } + + @Override + double[] toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getArray(column)); + } + } + + /** + * Converter for primitive char array. + */ + public static class PrimitiveCharArrayConverter extends DataFormatConverter<BinaryArray, char[]> { + + public static final PrimitiveCharArrayConverter INSTANCE = new PrimitiveCharArrayConverter(); + + private PrimitiveCharArrayConverter() {} + + @Override + BinaryArray toInternalImpl(char[] value) { + return BinaryArray.fromPrimitiveArray(value); + } + + @Override + char[] toExternalImpl(BinaryArray value) { + return value.toCharArray(); + } + + @Override + char[] toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getArray(column)); + } + } + + /** + * Converter for object array. 
+ */ + public static class ObjectArrayConverter<T> extends DataFormatConverter<BinaryArray, T[]> { + + private final InternalType elementType; + private final DataFormatConverter<Object, T> elementConverter; + private final Class<T> componentClass; + private final int elementSize; + + public ObjectArrayConverter(TypeInformation<T> elementTypeInfo) { + this.elementType = TypeConverters.createInternalTypeFromTypeInfo(elementTypeInfo); + this.elementConverter = DataFormatConverters.getConverterForTypeInfo(elementTypeInfo); + this.componentClass = elementTypeInfo.getTypeClass(); + this.elementSize = BinaryArray.calculateFixLengthPartSize(elementType); + } + + @Override + BinaryArray toInternalImpl(T[] value) { + BinaryArray array = new BinaryArray(); + BinaryArrayWriter writer = new BinaryArrayWriter(array, value.length, elementSize); + for (int i = 0; i < value.length; i++) { + Object field = value[i]; + if (field == null) { + // if we reuse BinaryArrayWriter, we need invoke setNullInt. + writer.setNullAt(i); + } else { + BinaryWriter.write(writer, i, elementConverter.toInternalImpl(value[i]), elementType); + } + } + writer.complete(); + return array; + } + + @Override + T[] toExternalImpl(BinaryArray value) { + return binaryArrayToJavaArray(value, elementType, componentClass, elementConverter); + } + + @Override + T[] toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getArray(column)); + } + } + + private static <T> T[] binaryArrayToJavaArray(BinaryArray value, InternalType elementType, + Class<T> componentClass, DataFormatConverter<Object, T> elementConverter) { + int size = value.numElements(); + T[] values = (T[]) Array.newInstance(componentClass, size); + for (int i = 0; i < size; i++) { + if (value.isNullAt(i)) { + values[i] = null; + } else { + values[i] = elementConverter.toExternalImpl( + TypeGetterSetters.get(value, i, elementType)); + } + } + return values; + } + + /** + * Converter for map. 
+ */ + public static class MapConverter extends DataFormatConverter<BinaryMap, Map> { + + private final InternalType keyType; + private final InternalType valueType; + + private final DataFormatConverter keyConverter; + private final DataFormatConverter valueConverter; + + private final int keyElementSize; + private final int valueElementSize; + + private final Class keyComponentClass; + private final Class valueComponentClass; + + public MapConverter(TypeInformation keyTypeInfo, TypeInformation valueTypeInfo) { + this.keyType = TypeConverters.createInternalTypeFromTypeInfo(keyTypeInfo); + this.valueType = TypeConverters.createInternalTypeFromTypeInfo(valueTypeInfo); + this.keyConverter = DataFormatConverters.getConverterForTypeInfo(keyTypeInfo); + this.valueConverter = DataFormatConverters.getConverterForTypeInfo(valueTypeInfo); + this.keyElementSize = BinaryArray.calculateFixLengthPartSize(keyType); + this.valueElementSize = BinaryArray.calculateFixLengthPartSize(valueType); + this.keyComponentClass = keyTypeInfo.getTypeClass(); + this.valueComponentClass = valueTypeInfo.getTypeClass(); + } + + @Override + BinaryMap toInternalImpl(Map value) { + BinaryArray keyArray = new BinaryArray(); + BinaryArrayWriter keyWriter = new BinaryArrayWriter(keyArray, value.size(), keyElementSize); + + BinaryArray valueArray = new BinaryArray(); + BinaryArrayWriter valueWriter = new BinaryArrayWriter(valueArray, value.size(), valueElementSize); + + int i = 0; + for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) value).entrySet()) { + // if we reuse BinaryArrayWriter, we need invoke setNullInt. 
+ if (entry.getKey() == null) { + keyWriter.setNullAt(i); + } else { + BinaryWriter.write(keyWriter, i, keyConverter.toInternalImpl(entry.getKey()), keyType); + } + if (entry.getValue() == null) { + valueWriter.setNullAt(i); + } else { + BinaryWriter.write(valueWriter, i, valueConverter.toInternalImpl(entry.getValue()), valueType); + } + i++; + } + + keyWriter.complete(); + valueWriter.complete(); + return BinaryMap.valueOf(keyArray, valueArray); + } + + @Override + Map toExternalImpl(BinaryMap value) { + Map<Object, Object> map = new HashMap<>(); + Object[] keys = binaryArrayToJavaArray(value.keyArray(), keyType, keyComponentClass, keyConverter); + Object[] values = binaryArrayToJavaArray(value.valueArray(), valueType, valueComponentClass, valueConverter); + for (int i = 0; i < value.numElements(); i++) { + map.put(keys[i], values[i]); + } + return map; + } + + @Override + Map toExternalImpl(BaseRow row, int column) { + return toExternalImpl(row.getMap(column)); + } + } + + /** + * Abstract converter for internal base row. + */ + public abstract static class AbstractBaseRowConverter<I extends BaseRow, E> extends DataFormatConverter<I, E> { + + protected final DataFormatConverter[] converters; + + public AbstractBaseRowConverter(CompositeType t) { + converters = new DataFormatConverter[t.getArity()]; + for (int i = 0; i < t.getArity(); i++) { + converters[i] = getConverterForTypeInfo(t.getTypeAt(i)); + } + } + + @Override + E toExternalImpl(BaseRow row, int column) { + throw new RuntimeException("Not support yet!"); + } + } + + /** + * Converter for base row. + */ + public static class BaseRowConverter extends IdentityConverter<BaseRow> { + + public static final BaseRowConverter INSTANCE = new BaseRowConverter(); + + private BaseRowConverter() {} + + @Override + BaseRow toExternalImpl(BaseRow row, int column) { + throw new RuntimeException("Not support yet!"); + } + } + + /** + * Converter for pojo. 
+ */ + public static class PojoConverter<T> extends AbstractBaseRowConverter<GenericRow, T> { + + private final PojoTypeInfo<T> t; + private final PojoField[] fields; + + public PojoConverter(PojoTypeInfo<T> t) { + super(t); + this.fields = new PojoField[t.getArity()]; + for (int i = 0; i < t.getArity(); i++) { + fields[i] = t.getPojoFieldAt(i); + fields[i].getField().setAccessible(true); + } + this.t = t; + } + + @Override + GenericRow toInternalImpl(T value) { + GenericRow genericRow = new GenericRow(t.getArity()); + for (int i = 0; i < t.getArity(); i++) { + try { + genericRow.setField(i, converters[i].toInternal( + fields[i].getField().get(value))); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + return genericRow; + } + + @Override + T toExternalImpl(GenericRow value) { + try { + T pojo = t.getTypeClass().newInstance(); + for (int i = 0; i < t.getArity(); i++) { + fields[i].getField().set(pojo, converters[i].toExternal(value, i)); + } + return pojo; + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Converter for row. + */ + public static class RowConverter extends AbstractBaseRowConverter<GenericRow, Row> { + + private final RowTypeInfo t; + + public RowConverter(RowTypeInfo t) { + super(t); + this.t = t; + } + + @Override + GenericRow toInternalImpl(Row value) { + GenericRow genericRow = new GenericRow(t.getArity()); + for (int i = 0; i < t.getArity(); i++) { + genericRow.setField(i, converters[i].toInternal(value.getField(i))); + } + return genericRow; + } + + @Override + Row toExternalImpl(GenericRow value) { + Row row = new Row(t.getArity()); + for (int i = 0; i < t.getArity(); i++) { + row.setField(i, converters[i].toExternal(value, i)); + } + return row; + } + } + + /** + * Converter for flink tuple. 
+ */ + public static class TupleConverter extends AbstractBaseRowConverter<GenericRow, Tuple> { + + private final TupleTypeInfo t; + + public TupleConverter(TupleTypeInfo t) { + super(t); + this.t = t; + } + + @Override + GenericRow toInternalImpl(Tuple value) { + GenericRow genericRow = new GenericRow(t.getArity()); + for (int i = 0; i < t.getArity(); i++) { + genericRow.setField(i, converters[i].toInternal(value.getField(i))); + } + return genericRow; + } + + @Override + Tuple toExternalImpl(GenericRow value) { + try { + Tuple tuple = (Tuple) t.getTypeClass().newInstance(); + for (int i = 0; i < t.getArity(); i++) { + tuple.setField(converters[i].toExternal(value, i), i); + } + return tuple; + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e); + } + + } + } + + /** + * Converter for case class. + */ + public static class CaseClassConverter extends AbstractBaseRowConverter<GenericRow, Product> { + + private final TupleTypeInfoBase t; + private final TupleSerializerBase serializer; + + public CaseClassConverter(TupleTypeInfoBase t) { + super(t); + this.t = t; + this.serializer = (TupleSerializerBase) t.createSerializer(new ExecutionConfig()); + } + + @Override + GenericRow toInternalImpl(Product value) { + GenericRow genericRow = new GenericRow(t.getArity()); + for (int i = 0; i < t.getArity(); i++) { + genericRow.setField(i, converters[i].toInternal(value.productElement(i))); + } + return genericRow; + } + + @Override + Product toExternalImpl(GenericRow value) { + Object[] fields = new Object[t.getArity()]; + for (int i = 0; i < t.getArity(); i++) { + fields[i] = converters[i].toExternal(value, i); + } + return (Product) serializer.createInstance(fields); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/DateTimeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/DateTimeUtils.java new file mode 100644 
index 0000000..c86f821 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/DateTimeUtils.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions; + +import java.sql.Timestamp; +import java.util.TimeZone; + +/** + * Utility functions for datetime types: date, time, timestamp. + * Currently, it is a bit messy putting date time functions in various classes because + * the runtime module does not depend on Calcite. + */ +public class DateTimeUtils { + /** The JVM default time zone as returned by TimeZone.getDefault() when this class is initialized; used by every overload that takes no explicit zone. */ + public static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + /** + * The number of milliseconds in a day. + * + * <p>This is the modulo 'mask' used when converting + * TIMESTAMP values to DATE and TIME values. + */ + public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000 + + /** Converts the Java type used for UDF parameters of SQL TIME type + * ({@link java.sql.Time}) to internal representation (int). + * + * <p>Converse of {@link #internalToTime(int)}. 
*/ + public static int toInt(java.sql.Time v) { /* NOTE(review): Java's % keeps the sign of the dividend, so a zone-shifted value below zero yields a negative result; confirm callers accept that */ + return (int) (toLong(v) % MILLIS_PER_DAY); + } + + /** Converts the Java type used for UDF parameters of SQL DATE type + * ({@link java.sql.Date}) to internal representation (int). + * + * <p>Converse of {@link #internalToDate(int)}. Delegates to {@link #toInt(java.util.Date, TimeZone)} with {@link #LOCAL_TZ}. */ + public static int toInt(java.util.Date v) { + return toInt(v, LOCAL_TZ); + } + /** Returns {@link #toLong(java.util.Date, TimeZone)} divided by {@link #MILLIS_PER_DAY} (integer division, truncating toward zero), narrowed to int. */ + public static int toInt(java.util.Date v, TimeZone timeZone) { + return (int) (toLong(v, timeZone) / MILLIS_PER_DAY); + } + /** Same as {@link #toLong(java.util.Date, TimeZone)} with {@link #LOCAL_TZ} as the zone. */ + public static long toLong(java.util.Date v) { + return toLong(v, LOCAL_TZ); + } + + /** Converts the Java type used for UDF parameters of SQL TIMESTAMP type + * ({@link java.sql.Timestamp}) to internal representation (long). + * + * <p>Converse of {@link #internalToTimestamp(long)}. */ + public static long toLong(Timestamp v) { + return toLong(v, LOCAL_TZ); + } + /** Returns {@code v.getTime()} plus the offset that {@code timeZone} reports for that instant. */ + public static long toLong(java.util.Date v, TimeZone timeZone) { + long time = v.getTime(); + return time + (long) timeZone.getOffset(time); + } + + /** Converts the internal representation of a SQL DATE (int) to the Java + * type used for UDF parameters ({@link java.sql.Date}). NOTE(review): LOCAL_TZ.getOffset is queried with the zone-shifted value, not with the resulting UTC instant; close to an offset transition the two can map to different offsets -- confirm this is acceptable. */ + public static java.sql.Date internalToDate(int v) { /* multiplied as long because MILLIS_PER_DAY is a long */ + final long t = v * MILLIS_PER_DAY; + return new java.sql.Date(t - LOCAL_TZ.getOffset(t)); + } + + /** Converts the internal representation of a SQL TIME (int) to the Java + * type used for UDF parameters ({@link java.sql.Time}). Subtracts the offset LOCAL_TZ reports for {@code v} milliseconds after the epoch. */ + public static java.sql.Time internalToTime(int v) { + return new java.sql.Time(v - LOCAL_TZ.getOffset(v)); + } + + /** Converts the internal representation of a SQL TIMESTAMP (long) to the Java + * type used for UDF parameters ({@link java.sql.Timestamp}). NOTE(review): LOCAL_TZ.getOffset is queried with the zone-shifted value {@code v}, whereas toLong used the offset at the UTC instant; the round trip can differ close to an offset transition -- confirm. 
*/ + public static java.sql.Timestamp internalToTimestamp(long v) { + return new java.sql.Timestamp(v - LOCAL_TZ.getOffset(v)); + } + +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java new file mode 100644 index 0000000..cfd407c --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.table.dataformat.DataFormatConverters.DataFormatConverter; +import org.apache.flink.table.runtime.functions.DateTimeUtils; +import org.apache.flink.table.type.InternalTypes; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.table.typeutils.BinaryArrayTypeInfo; +import org.apache.flink.table.typeutils.BinaryMapTypeInfo; +import org.apache.flink.table.typeutils.BinaryStringTypeInfo; +import org.apache.flink.types.Row; + +import org.junit.Assert; +import org.junit.Test; + +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.HashMap; + +import static org.apache.flink.table.dataformat.DataFormatConverters.getConverterForTypeInfo; + +/** + * Test for {@link DataFormatConverters}. 
+ */ +public class DataFormatConvertersTest { + /** Type infos of the simple round-trip cases; entry i is paired with simpleValues[i] in testTypes. */ + private TypeInformation[] simpleTypes = new TypeInformation[] { + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.FLOAT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.BYTE_TYPE_INFO, + BasicTypeInfo.CHAR_TYPE_INFO, + + PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, + PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, + PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, + PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, + PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, + PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, + + SqlTimeTypeInfo.DATE, + SqlTimeTypeInfo.TIME, + SqlTimeTypeInfo.TIMESTAMP, + + BinaryStringTypeInfo.INSTANCE + }; + /** Sample values, positionally aligned with simpleTypes; the order of both arrays must be kept in sync. */ + private Object[] simpleValues = new Object[] { + "haha", + true, + 22, + 1111L, + 0.5f, + 0.5d, + (short) 1, + (byte) 1, + (char) 1, + + new boolean[] {true, false}, + new int[] {5, 1}, + new long[] {5, 1}, + new float[] {5, 1}, + new double[] {5, 1}, + new short[] {5, 1}, + new byte[] {5, 1}, + new char[] {5, 1}, + + DateTimeUtils.internalToDate(5), + new Time(11), + new Timestamp(11), + + BinaryString.fromString("hahah") + }; + /** Round-trips value through the converter for typeInfo (toInternal, then toExternal) and asserts the result equals the input; both sides are wrapped in one-element arrays so that Arrays.deepEquals compares array values by content. */ + private static void test(TypeInformation typeInfo, Object value) { + DataFormatConverter converter = getConverterForTypeInfo(typeInfo); + Assert.assertTrue(Arrays.deepEquals( + new Object[] {converter.toExternal(converter.toInternal(value))}, new Object[] {value})); + } + /** Runs the round-trip check for each simple type/value pair, then for rows, internal rows, object arrays, maps, a tuple, a pojo and the binary array and map formats. */ + @Test + public void testTypes() { + for (int i = 0; i < simpleTypes.length; i++) { + test(simpleTypes[i], simpleValues[i]); + } + test(new RowTypeInfo(simpleTypes), new Row(simpleTypes.length)); + test(new RowTypeInfo(simpleTypes), Row.of(simpleValues)); + test(new 
BaseRowTypeInfo(InternalTypes.STRING, InternalTypes.INT), + GenericRow.of(BinaryString.fromString("hehe"), 111)); + test(new BaseRowTypeInfo(InternalTypes.STRING, InternalTypes.INT), GenericRow.of(null, null)); + test(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, new Double[] {1D, 5D}); + test(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, new Double[] {null, null}); + test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {null, null}); + test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] {"haha", "hehe"}); + test(new MapTypeInfo<>(Types.STRING, Types.INT), null); /* a null map itself must round-trip */ + + HashMap<String, Integer> map = new HashMap<>(); + map.put("haha", 1); + map.put("hah1", 5); + map.put(null, null); /* also covers a null key mapped to a null value */ + test(new MapTypeInfo<>(Types.STRING, Types.INT), map); + + Tuple2 tuple2 = new Tuple2<>(5, 10); + TupleTypeInfo tupleTypeInfo = new TupleTypeInfo<>(tuple2.getClass(), Types.INT, Types.INT); + test(tupleTypeInfo, tuple2); + + test(TypeExtractor.createTypeInfo(MyPojo.class), new MyPojo(1, 3)); + + test(new BinaryArrayTypeInfo(InternalTypes.INT), BinaryArray.fromPrimitiveArray(new int[]{1, 5})); + test(new BinaryMapTypeInfo(InternalTypes.INT, InternalTypes.INT), + BinaryMap.valueOf( + BinaryArray.fromPrimitiveArray(new int[]{1, 5}), + BinaryArray.fromPrimitiveArray(new int[]{6, 7}))); + } + + /** + * Test pojo. Two instances are equal when their class and both fields match. NOTE(review): equals is overridden without hashCode; harmless for the equality-only check in test(), but instances should not be used as hash keys. + */ + public static class MyPojo { + public int f1 = 0; + public int f2 = 0; + + public MyPojo() {} + + public MyPojo(int f1, int f2) { + this.f1 = f1; + this.f2 = f2; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MyPojo myPojo = (MyPojo) o; + + return f1 == myPojo.f1 && f2 == myPojo.f2; + } + } +}