This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 96a6337 [FLINK-17668][table] Fix shortcomings in new data structures 96a6337 is described below commit 96a633725f1b4616e40576b97473308b289a8028 Author: Timo Walther <twal...@apache.org> AuthorDate: Wed May 13 15:25:36 2020 +0200 [FLINK-17668][table] Fix shortcomings in new data structures This fixes the following shortcomings in the new data structures: - Some data structures do not provide hashCode/equals implementations for testing. - RawValueData cannot be created from bytes. - Accessing elements requires dealing with logical types during runtime. - Null checks are performed multiple times during runtime even for types that are declared as NOT NULL. This closes #12130. --- .../org/apache/flink/table/data/ArrayData.java | 110 +++++++++++++++++++++ .../apache/flink/table/data/GenericArrayData.java | 24 +++++ .../apache/flink/table/data/GenericMapData.java | 48 +++++++++ .../apache/flink/table/data/GenericRowData.java | 16 ++- .../org/apache/flink/table/data/RawValueData.java | 9 +- .../java/org/apache/flink/table/data/RowData.java | 110 +++++++++++++++++++++ .../flink/table/data/binary/BinaryArrayData.java | 33 ++++++- .../table/data/binary/BinaryRawValueData.java | 31 ++++++ .../flink/table/data/binary/BinaryStringData.java | 10 +- .../flink/table/data/writer/BinaryArrayWriter.java | 69 +++++++++++++ .../flink/table/data/writer/BinaryWriter.java | 84 ++++++++++++++++ .../table/runtime/types/InternalSerializers.java | 9 +- 12 files changed, 538 insertions(+), 15 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java index 8f8a965..59c6b2f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java @@ -21,11 +21,20 @@ 
package org.apache.flink.table.data; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; +import javax.annotation.Nullable; + +import java.io.Serializable; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + /** * Base interface of an internal data structure representing data of {@link ArrayType}. * @@ -163,7 +172,9 @@ public interface ArrayData { * @param pos position of the element to return * @param elementType the element type of the array * @return the element object at the specified position in this array data + * @deprecated Use {@link #createElementGetter(LogicalType)} for avoiding logical types during runtime. */ + @Deprecated static Object get(ArrayData array, int pos, LogicalType elementType) { if (array.isNullAt(pos)) { return null; @@ -215,4 +226,103 @@ public interface ArrayData { throw new UnsupportedOperationException("Unsupported type: " + elementType); } } + + /** + * Creates an accessor for getting elements in an internal array data structure at the + * given position. 
+ * + * @param elementType the element type of the array + */ + static ElementGetter createElementGetter(LogicalType elementType) { + final ElementGetter elementGetter; + // ordered by type root definition + switch (elementType.getTypeRoot()) { + case CHAR: + case VARCHAR: + elementGetter = ArrayData::getString; + break; + case BOOLEAN: + elementGetter = ArrayData::getBoolean; + break; + case BINARY: + case VARBINARY: + elementGetter = ArrayData::getBinary; + break; + case DECIMAL: + final int decimalPrecision = getPrecision(elementType); + final int decimalScale = getScale(elementType); + elementGetter = (array, pos) -> array.getDecimal(pos, decimalPrecision, decimalScale); + break; + case TINYINT: + elementGetter = ArrayData::getByte; + break; + case SMALLINT: + elementGetter = ArrayData::getShort; + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case INTERVAL_YEAR_MONTH: + elementGetter = ArrayData::getInt; + break; + case BIGINT: + case INTERVAL_DAY_TIME: + elementGetter = ArrayData::getLong; + break; + case FLOAT: + elementGetter = ArrayData::getFloat; + break; + case DOUBLE: + elementGetter = ArrayData::getDouble; + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = getPrecision(elementType); + elementGetter = (array, pos) -> array.getTimestamp(pos, timestampPrecision); + break; + case TIMESTAMP_WITH_TIME_ZONE: + throw new UnsupportedOperationException(); + case ARRAY: + elementGetter = ArrayData::getArray; + break; + case MULTISET: + case MAP: + elementGetter = ArrayData::getMap; + break; + case ROW: + case STRUCTURED_TYPE: + final int rowFieldCount = getFieldCount(elementType); + elementGetter = (array, pos) -> array.getRow(pos, rowFieldCount); + break; + case DISTINCT_TYPE: + elementGetter = createElementGetter(((DistinctType) elementType).getSourceType()); + break; + case RAW: + elementGetter = ArrayData::getRawValue; + break; + case NULL: + case SYMBOL: + case 
UNRESOLVED: + default: + throw new IllegalArgumentException(); + } + if (!elementType.isNullable()) { + return elementGetter; + } + return (array, pos) -> { + if (array.isNullAt(pos)) { + return null; + } + return elementGetter.getElementOrNull(array, pos); + }; + } + + /** + * Accessor for getting the elements of an array during runtime. + * + * @see #createElementGetter(LogicalType) + */ + interface ElementGetter extends Serializable { + @Nullable Object getElementOrNull(ArrayData array, int pos); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java index 37babf5..0809012 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java @@ -23,6 +23,9 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.commons.lang3.ArrayUtils; +import java.util.Arrays; +import java.util.Objects; + /** * An internal data structure representing data of {@link ArrayType}. 
* @@ -132,6 +135,27 @@ public final class GenericArrayData implements ArrayData { return !isPrimitiveArray && ((Object[]) array)[pos] == null; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GenericArrayData that = (GenericArrayData) o; + return size == that.size && + isPrimitiveArray == that.isPrimitiveArray && + Objects.deepEquals(array, that.array); + } + + @Override + public int hashCode() { + int result = Objects.hash(size, isPrimitiveArray); + result = 31 * result + Arrays.deepHashCode(new Object[]{array}); + return result; + } + // ------------------------------------------------------------------------------------------ // Read-only accessor methods // ------------------------------------------------------------------------------------------ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java index 2e942e8..381e993 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java @@ -23,6 +23,7 @@ import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.MultisetType; import java.util.Map; +import java.util.Objects; /** * An internal data structure representing data of {@link MapType} or {@link MultisetType}. 
@@ -74,5 +75,52 @@ public final class GenericMapData implements MapData { Object[] values = map.values().toArray(); return new GenericArrayData(values); } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof GenericMapData)) { + return false; + } + // deepEquals for values of byte[] + return deepEquals(map, ((GenericMapData) o).map); + } + + private static <K, V> boolean deepEquals(Map<K, V> m1, Map<?, ?> m2) { + // adapted from AbstractMap.equals (inherited by HashMap) but with deepEquals comparison + if (m1.size() != m2.size()) { + return false; + } + try { + for (Map.Entry<K, V> e : m1.entrySet()) { + K key = e.getKey(); + V value = e.getValue(); + if (value == null) { + if (!(m2.get(key) == null && m2.containsKey(key))) { + return false; + } + } else { + if (!Objects.deepEquals(value, m2.get(key))) { + return false; + } + } + } + } catch (ClassCastException | NullPointerException unused) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int result = 0; + for (Object key : map.keySet()) { + // only include key because values can contain byte[] + result += 31 * key.hashCode(); + } + return result; + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java index 157a790..411e65e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java @@ -25,6 +25,7 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.StringUtils; import java.util.Arrays; +import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -191,17 +192,22 @@ public final class GenericRowData implements RowData { @Override public boolean equals(Object o) { - if (o instanceof GenericRowData) { - 
GenericRowData other = (GenericRowData) o; - return kind == other.kind && Arrays.equals(fields, other.fields); - } else { + if (this == o) { + return true; + } + if (!(o instanceof GenericRowData)) { return false; } + GenericRowData that = (GenericRowData) o; + return kind == that.kind && + Arrays.deepEquals(fields, that.fields); } @Override public int hashCode() { - return 31 * kind.hashCode() + Arrays.hashCode(fields); + int result = Objects.hash(kind); + result = 31 * result + Arrays.deepHashCode(fields); + return result; } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java index fb98dc4..10ee835 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java @@ -61,7 +61,14 @@ public interface RawValueData<T> { * Creates an instance of {@link RawValueData} from a Java object. */ static <T> RawValueData<T> fromObject(T javaObject) { - return new BinaryRawValueData<>(javaObject); + return BinaryRawValueData.fromObject(javaObject); + } + + /** + * Creates an instance of {@link RawValueData} from the given byte array. 
+ */ + static <T> RawValueData<T> fromBytes(byte[] bytes) { + return BinaryRawValueData.fromBytes(bytes); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java index 929d47b..e4f2c5a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java @@ -21,6 +21,7 @@ package org.apache.flink.table.data; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; @@ -28,6 +29,14 @@ import org.apache.flink.table.types.logical.StructuredType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.types.RowKind; +import javax.annotation.Nullable; + +import java.io.Serializable; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + /** * Base interface for an internal data structure representing data of {@link RowType} and other * (possibly nested) structured types such as {@link StructuredType} in the table ecosystem. @@ -228,6 +237,7 @@ public interface RowData { * @param pos position of the field to return * @param fieldType the field type * @return the field object at the specified position in this row data. + * @deprecated Use {@link #createFieldGetter(LogicalType, int)} for avoiding logical types during runtime. 
*/ static Object get(RowData row, int pos, LogicalType fieldType) { if (row.isNullAt(pos)) { @@ -280,4 +290,104 @@ public interface RowData { throw new UnsupportedOperationException("Unsupported type: " + fieldType); } } + + /** + * Creates an accessor for getting a field of an internal row data structure at the + * given position. + * + * @param fieldType the logical type of the field + * @param fieldPos the position of the field within the row + */ + static FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) { + final FieldGetter fieldGetter; + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = row -> row.getString(fieldPos); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = getPrecision(fieldType); + final int decimalScale = getScale(fieldType); + fieldGetter = row -> row.getDecimal(fieldPos, decimalPrecision, decimalScale); + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case INTERVAL_YEAR_MONTH: + fieldGetter = row -> row.getInt(fieldPos); + break; + case BIGINT: + case INTERVAL_DAY_TIME: + fieldGetter = row -> row.getLong(fieldPos); + break; + case FLOAT: + fieldGetter = row -> row.getFloat(fieldPos); + break; + case DOUBLE: + fieldGetter = row -> row.getDouble(fieldPos); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = getPrecision(fieldType); + fieldGetter = row -> row.getTimestamp(fieldPos, timestampPrecision); + break; + case TIMESTAMP_WITH_TIME_ZONE: + throw new UnsupportedOperationException(); + case ARRAY: + fieldGetter = row -> row.getArray(fieldPos); + break; + case MULTISET: + case MAP: 
+ fieldGetter = row -> row.getMap(fieldPos); + break; + case ROW: + case STRUCTURED_TYPE: + final int rowFieldCount = getFieldCount(fieldType); + fieldGetter = row -> row.getRow(fieldPos, rowFieldCount); + break; + case DISTINCT_TYPE: + fieldGetter = createFieldGetter(((DistinctType) fieldType).getSourceType(), fieldPos); + break; + case RAW: + fieldGetter = row -> row.getRawValue(fieldPos); + break; + case NULL: + case SYMBOL: + case UNRESOLVED: + default: + throw new IllegalArgumentException(); + } + if (!fieldType.isNullable()) { + return fieldGetter; + } + return row -> { + if (row.isNullAt(fieldPos)) { + return null; + } + return fieldGetter.getFieldOrNull(row); + }; + } + + /** + * Accessor for getting the field of a row during runtime. + * + * @see #createFieldGetter(LogicalType, int) + */ + interface FieldGetter extends Serializable { + @Nullable Object getFieldOrNull(RowData row); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java index 30b94e9..40526d5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java @@ -28,6 +28,7 @@ import org.apache.flink.table.data.RawValueData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; @@ -70,10 +71,33 @@ public final class BinaryArrayData extends BinarySection implements ArrayData, T * It store the length and offset of variable-length part when type is string, map, etc. 
*/ public static int calculateFixLengthPartSize(LogicalType type) { + // ordered by type root definition switch (type.getTypeRoot()) { case BOOLEAN: case TINYINT: return 1; + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + case DECIMAL: + case BIGINT: + case DOUBLE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case INTERVAL_DAY_TIME: + case ARRAY: + case MULTISET: + case MAP: + case ROW: + case STRUCTURED_TYPE: + case RAW: + // long and double are 8 bytes; + // otherwise it stores the length and offset of the variable-length part for types + // such as string, map, etc. + return 8; + case TIMESTAMP_WITH_TIME_ZONE: + throw new UnsupportedOperationException(); case SMALLINT: return 2; case INTEGER: @@ -82,10 +106,13 @@ public final class BinaryArrayData extends BinarySection implements ArrayData, T case TIME_WITHOUT_TIME_ZONE: case INTERVAL_YEAR_MONTH: return 4; + case DISTINCT_TYPE: + return calculateFixLengthPartSize(((DistinctType) type).getSourceType()); + case NULL: + case SYMBOL: + case UNRESOLVED: default: - // long, double is 8 bytes. - // It store the length and offset of variable-length part when type is string, map, etc. 
- return 8; + throw new IllegalArgumentException(); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java index ff5921b..b87b417 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java @@ -104,4 +104,35 @@ public final class BinaryRawValueData<T> extends LazyBinaryFormat<T> implements throw new RuntimeException(e); } } + + // ------------------------------------------------------------------------------------------ + // Construction Utilities + // ------------------------------------------------------------------------------------------ + + /** + * Creates a {@link BinaryRawValueData} instance from the given Java object. + */ + public static <T> BinaryRawValueData<T> fromObject(T javaObject) { + if (javaObject == null) { + return null; + } + return new BinaryRawValueData<>(javaObject); + } + + /** + * Creates a {@link BinaryRawValueData} instance from the given bytes. + */ + public static <T> BinaryRawValueData<T> fromBytes(byte[] bytes) { + return fromBytes(bytes, 0, bytes.length); + } + + /** + * Creates a {@link BinaryRawValueData} instance from the given bytes with offset and number of bytes. 
+ */ + public static <T> BinaryRawValueData<T> fromBytes(byte[] bytes, int offset, int numBytes) { + return new BinaryRawValueData<>( + new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, + offset, + numBytes); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java index 5e85554..bfcf855 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java @@ -63,7 +63,7 @@ public final class BinaryStringData extends LazyBinaryFormat<String> implements // ------------------------------------------------------------------------------------------ /** - * Creates an BinaryStringData from given address (base and offset) and length. + * Creates a {@link BinaryStringData} instance from the given address (base and offset) and length. */ public static BinaryStringData fromAddress( MemorySegment[] segments, @@ -73,7 +73,7 @@ public final class BinaryStringData extends LazyBinaryFormat<String> implements } /** - * Creates an BinaryStringData from given java String. + * Creates a {@link BinaryStringData} instance from the given Java string. */ public static BinaryStringData fromString(String str) { if (str == null) { @@ -84,14 +84,14 @@ public final class BinaryStringData extends LazyBinaryFormat<String> implements } /** - * Creates an BinaryStringData from given UTF-8 bytes. + * Creates a {@link BinaryStringData} instance from the given UTF-8 bytes. */ public static BinaryStringData fromBytes(byte[] bytes) { return fromBytes(bytes, 0, bytes.length); } /** - * Creates an BinaryStringData from given UTF-8 bytes with offset and number of bytes. + * Creates a {@link BinaryStringData} instance from the given UTF-8 bytes with offset and number of bytes. 
*/ public static BinaryStringData fromBytes(byte[] bytes, int offset, int numBytes) { return new BinaryStringData( @@ -101,7 +101,7 @@ public final class BinaryStringData extends LazyBinaryFormat<String> implements } /** - * Creates an BinaryStringData that contains `length` spaces. + * Creates a {@link BinaryStringData} instance that contains `length` spaces. */ public static BinaryStringData blankString(int length) { byte[] spaces = new byte[length]; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java index 12f3948..2ef6aa3 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java @@ -21,8 +21,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.table.data.binary.BinaryArrayData; import org.apache.flink.table.data.binary.BinarySegmentUtils; +import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.LogicalType; +import java.io.Serializable; + /** * Writer for binary array. See {@link BinaryArrayData}. */ @@ -110,6 +113,10 @@ public final class BinaryArrayWriter extends AbstractBinaryWriter { setNullLong(ordinal); } + /** + * @deprecated Use {@link #createNullSetter(LogicalType)} for avoiding logical types during runtime. 
+ */ + @Deprecated public void setNullAt(int pos, LogicalType type) { switch (type.getTypeRoot()) { case BOOLEAN: @@ -216,4 +223,66 @@ public final class BinaryArrayWriter extends AbstractBinaryWriter { public int getNumElements() { return numElements; } + + // -------------------------------------------------------------------------------------------- + + /** + * Creates an accessor for setting the elements of an array writer to {@code null} during runtime. + * + * @param elementType the element type of the array + */ + public static NullSetter createNullSetter(LogicalType elementType) { + // ordered by type root definition + switch (elementType.getTypeRoot()) { + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + case DECIMAL: + case BIGINT: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case INTERVAL_DAY_TIME: + case ARRAY: + case MULTISET: + case MAP: + case ROW: + case STRUCTURED_TYPE: + case RAW: + return BinaryArrayWriter::setNullLong; + case BOOLEAN: + return BinaryArrayWriter::setNullBoolean; + case TINYINT: + return BinaryArrayWriter::setNullByte; + case SMALLINT: + return BinaryArrayWriter::setNullShort; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case INTERVAL_YEAR_MONTH: + return BinaryArrayWriter::setNullInt; + case FLOAT: + return BinaryArrayWriter::setNullFloat; + case DOUBLE: + return BinaryArrayWriter::setNullDouble; + case TIMESTAMP_WITH_TIME_ZONE: + throw new UnsupportedOperationException(); + case DISTINCT_TYPE: + return createNullSetter(((DistinctType) elementType).getSourceType()); + case NULL: + case SYMBOL: + case UNRESOLVED: + default: + throw new IllegalArgumentException(); + } + } + + /** + * Accessor for setting the elements of an array writer to {@code null} during runtime. 
+ * + * @see #createNullSetter(LogicalType) + */ + public interface NullSetter extends Serializable { + void setNull(BinaryArrayWriter writer, int pos); + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java index 77f55ed..24d903c 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java @@ -26,15 +26,21 @@ import org.apache.flink.table.data.RawValueData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.runtime.types.InternalSerializers; import org.apache.flink.table.runtime.typeutils.ArrayDataSerializer; import org.apache.flink.table.runtime.typeutils.MapDataSerializer; import org.apache.flink.table.runtime.typeutils.RawValueDataSerializer; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.TimestampType; +import java.io.Serializable; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; + /** * Writer to write a composite data format, like row, array. * 1. Invoke {@link #reset()}. @@ -89,6 +95,12 @@ public interface BinaryWriter { */ void complete(); + // -------------------------------------------------------------------------------------------- + + /** + * @deprecated Use {@link #createValueSetter(LogicalType)} for avoiding logical types during runtime. 
+ */ + @Deprecated static void write( BinaryWriter writer, int pos, Object o, LogicalType type, TypeSerializer<?> serializer) { switch (type.getTypeRoot()) { @@ -154,4 +166,76 @@ public interface BinaryWriter { throw new UnsupportedOperationException("Not support type: " + type); } } + + /** + * Creates an accessor for setting the elements of an array writer during runtime. + * + * @param elementType the element type of the array + */ + static ValueSetter createValueSetter(LogicalType elementType) { + // ordered by type root definition + switch (elementType.getTypeRoot()) { + case CHAR: + case VARCHAR: + return (writer, pos, value) -> writer.writeString(pos, (StringData) value); + case BOOLEAN: + return (writer, pos, value) -> writer.writeBoolean(pos, (boolean) value); + case BINARY: + case VARBINARY: + return (writer, pos, value) -> writer.writeBinary(pos, (byte[]) value); + case DECIMAL: + final int decimalPrecision = getPrecision(elementType); + return (writer, pos, value) -> writer.writeDecimal(pos, (DecimalData) value, decimalPrecision); + case TINYINT: + return (writer, pos, value) -> writer.writeByte(pos, (byte) value); + case SMALLINT: + return (writer, pos, value) -> writer.writeShort(pos, (short) value); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case INTERVAL_YEAR_MONTH: + return (writer, pos, value) -> writer.writeInt(pos, (int) value); + case BIGINT: + case INTERVAL_DAY_TIME: + return (writer, pos, value) -> writer.writeLong(pos, (long) value); + case FLOAT: + return (writer, pos, value) -> writer.writeFloat(pos, (float) value); + case DOUBLE: + return (writer, pos, value) -> writer.writeDouble(pos, (double) value); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = getPrecision(elementType); + return (writer, pos, value) -> writer.writeTimestamp(pos, (TimestampData) value, timestampPrecision); + case TIMESTAMP_WITH_TIME_ZONE: + throw new UnsupportedOperationException(); + case 
ARRAY: + final ArrayDataSerializer arraySerializer = (ArrayDataSerializer) InternalSerializers.create(elementType); + return (writer, pos, value) -> writer.writeArray(pos, (ArrayData) value, arraySerializer); + case MULTISET: + case MAP: + final MapDataSerializer mapSerializer = (MapDataSerializer) InternalSerializers.create(elementType); + return (writer, pos, value) -> writer.writeMap(pos, (MapData) value, mapSerializer); + case ROW: + case STRUCTURED_TYPE: + final RowDataSerializer rowSerializer = (RowDataSerializer) InternalSerializers.create(elementType); + return (writer, pos, value) -> writer.writeRow(pos, (RowData) value, rowSerializer); + case DISTINCT_TYPE: + return createValueSetter(((DistinctType) elementType).getSourceType()); + case RAW: + final RawValueDataSerializer<?> rawSerializer = (RawValueDataSerializer<?>) InternalSerializers.create(elementType); + return (writer, pos, value) -> writer.writeRawValue(pos, (RawValueData<?>) value, rawSerializer); + case NULL: + case SYMBOL: + case UNRESOLVED: + default: + throw new IllegalArgumentException(); + } + } + + /** + * Accessor for setting the elements of an array writer during runtime. 
+ */ + interface ValueSetter extends Serializable { + void setValue(BinaryArrayWriter writer, int pos, Object value); + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java index bd00eac..462a1ae 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java @@ -53,7 +53,14 @@ import org.apache.flink.table.types.logical.TypeInformationRawType; public class InternalSerializers { /** - * Get {@link TypeSerializer} for internal data structure from the given {@link LogicalType}. + * Creates a {@link TypeSerializer} for internal data structures of the given {@link LogicalType}. + */ + public static TypeSerializer<?> create(LogicalType type) { + return create(type, new ExecutionConfig()); + } + + /** + * Creates a {@link TypeSerializer} for internal data structures of the given {@link LogicalType}. */ public static TypeSerializer create(LogicalType type, ExecutionConfig config) { switch (type.getTypeRoot()) {