This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 96a6337  [FLINK-17668][table] Fix shortcomings in new data structures
96a6337 is described below

commit 96a633725f1b4616e40576b97473308b289a8028
Author: Timo Walther <twal...@apache.org>
AuthorDate: Wed May 13 15:25:36 2020 +0200

    [FLINK-17668][table] Fix shortcomings in new data structures
    
    This fixes the following shortcomings in the new data structures:
    
    - Some data structures do not provide a hashCode/equals for testing.
    - RawValueData cannot be created from bytes.
    - Accessing elements requires dealing with logical types during runtime.
    - Null checks are performed multiple times during runtime even for types 
that are declared as NOT NULL.
    
    This closes #12130.
---
 .../org/apache/flink/table/data/ArrayData.java     | 110 +++++++++++++++++++++
 .../apache/flink/table/data/GenericArrayData.java  |  24 +++++
 .../apache/flink/table/data/GenericMapData.java    |  48 +++++++++
 .../apache/flink/table/data/GenericRowData.java    |  16 ++-
 .../org/apache/flink/table/data/RawValueData.java  |   9 +-
 .../java/org/apache/flink/table/data/RowData.java  | 110 +++++++++++++++++++++
 .../flink/table/data/binary/BinaryArrayData.java   |  33 ++++++-
 .../table/data/binary/BinaryRawValueData.java      |  31 ++++++
 .../flink/table/data/binary/BinaryStringData.java  |  10 +-
 .../flink/table/data/writer/BinaryArrayWriter.java |  69 +++++++++++++
 .../flink/table/data/writer/BinaryWriter.java      |  84 ++++++++++++++++
 .../table/runtime/types/InternalSerializers.java   |   9 +-
 12 files changed, 538 insertions(+), 15 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
index 8f8a965..59c6b2f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
@@ -21,11 +21,20 @@ package org.apache.flink.table.data;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
 /**
  * Base interface of an internal data structure representing data of {@link 
ArrayType}.
  *
@@ -163,7 +172,9 @@ public interface ArrayData {
         * @param pos position of the element to return
         * @param elementType the element type of the array
         * @return the element object at the specified position in this array 
data
+        * @deprecated Use {@link #createElementGetter(LogicalType)} for 
avoiding logical types during runtime.
         */
+       @Deprecated
        static Object get(ArrayData array, int pos, LogicalType elementType) {
                if (array.isNullAt(pos)) {
                        return null;
@@ -215,4 +226,103 @@ public interface ArrayData {
                                throw new 
UnsupportedOperationException("Unsupported type: " + elementType);
                }
        }
+
+       /**
+        * Creates an accessor for getting elements in an internal array data 
structure at the
+        * given position.
+        *
+        * @param elementType the element type of the array
+        */
+       static ElementGetter createElementGetter(LogicalType elementType) {
+               final ElementGetter elementGetter;
+               // ordered by type root definition
+               switch (elementType.getTypeRoot()) {
+                       case CHAR:
+                       case VARCHAR:
+                               elementGetter = ArrayData::getString;
+                               break;
+                       case BOOLEAN:
+                               elementGetter = ArrayData::getBoolean;
+                               break;
+                       case BINARY:
+                       case VARBINARY:
+                               elementGetter = ArrayData::getBinary;
+                               break;
+                       case DECIMAL:
+                               final int decimalPrecision = 
getPrecision(elementType);
+                               final int decimalScale = getScale(elementType);
+                               elementGetter = (array, pos) -> 
array.getDecimal(pos, decimalPrecision, decimalScale);
+                               break;
+                       case TINYINT:
+                               elementGetter = ArrayData::getByte;
+                               break;
+                       case SMALLINT:
+                               elementGetter = ArrayData::getShort;
+                               break;
+                       case INTEGER:
+                       case DATE:
+                       case TIME_WITHOUT_TIME_ZONE:
+                       case INTERVAL_YEAR_MONTH:
+                               elementGetter = ArrayData::getInt;
+                               break;
+                       case BIGINT:
+                       case INTERVAL_DAY_TIME:
+                               elementGetter = ArrayData::getLong;
+                               break;
+                       case FLOAT:
+                               elementGetter = ArrayData::getFloat;
+                               break;
+                       case DOUBLE:
+                               elementGetter = ArrayData::getDouble;
+                               break;
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                               final int timestampPrecision = 
getPrecision(elementType);
+                               elementGetter = (array, pos) -> 
array.getTimestamp(pos, timestampPrecision);
+                               break;
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                               throw new UnsupportedOperationException();
+                       case ARRAY:
+                               elementGetter = ArrayData::getArray;
+                               break;
+                       case MULTISET:
+                       case MAP:
+                               elementGetter = ArrayData::getMap;
+                               break;
+                       case ROW:
+                       case STRUCTURED_TYPE:
+                               final int rowFieldCount = 
getFieldCount(elementType);
+                               elementGetter = (array, pos) -> 
array.getRow(pos, rowFieldCount);
+                               break;
+                       case DISTINCT_TYPE:
+                               elementGetter = 
createElementGetter(((DistinctType) elementType).getSourceType());
+                               break;
+                       case RAW:
+                               elementGetter = ArrayData::getRawValue;
+                               break;
+                       case NULL:
+                       case SYMBOL:
+                       case UNRESOLVED:
+                       default:
+                               throw new IllegalArgumentException();
+               }
+               if (!elementType.isNullable()) {
+                       return elementGetter;
+               }
+               return (array, pos) -> {
+                       if (array.isNullAt(pos)) {
+                               return null;
+                       }
+                       return elementGetter.getElementOrNull(array, pos);
+               };
+       }
+
+       /**
+        * Accessor for getting the elements of an array during runtime.
+        *
+        * @see #createElementGetter(LogicalType)
+        */
+       interface ElementGetter extends Serializable {
+               @Nullable Object getElementOrNull(ArrayData array, int pos);
+       }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java
index 37babf5..0809012 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java
@@ -23,6 +23,9 @@ import org.apache.flink.table.types.logical.ArrayType;
 
 import org.apache.commons.lang3.ArrayUtils;
 
+import java.util.Arrays;
+import java.util.Objects;
+
 /**
  * An internal data structure representing data of {@link ArrayType}.
  *
@@ -132,6 +135,27 @@ public final class GenericArrayData implements ArrayData {
                return !isPrimitiveArray && ((Object[]) array)[pos] == null;
        }
 
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               GenericArrayData that = (GenericArrayData) o;
+               return size == that.size &&
+                       isPrimitiveArray == that.isPrimitiveArray &&
+                       Objects.deepEquals(array, that.array);
+       }
+
+       @Override
+       public int hashCode() {
+               int result = Objects.hash(size, isPrimitiveArray);
+               result = 31 * result + Arrays.deepHashCode(new Object[]{array});
+               return result;
+       }
+
        // 
------------------------------------------------------------------------------------------
        // Read-only accessor methods
        // 
------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java
index 2e942e8..381e993 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * An internal data structure representing data of {@link MapType} or {@link 
MultisetType}.
@@ -74,5 +75,52 @@ public final class GenericMapData implements MapData {
                Object[] values = map.values().toArray();
                return new GenericArrayData(values);
        }
+
+       @Override
+       public boolean equals(Object o) {
+               if (o == this) {
+                       return true;
+               }
+               if (!(o instanceof GenericMapData)) {
+                       return false;
+               }
+               // deepEquals for values of byte[]
+               return deepEquals(map, ((GenericMapData) o).map);
+       }
+
+       private static <K, V> boolean deepEquals(Map<K, V> m1, Map<?, ?> m2) {
+               // copied from HashMap.equals but with deepEquals comparison
+               if (m1.size() != m2.size()) {
+                       return false;
+               }
+               try {
+                       for (Map.Entry<K, V> e : m1.entrySet()) {
+                               K key = e.getKey();
+                               V value = e.getValue();
+                               if (value == null) {
+                                       if (!(m2.get(key) == null && 
m2.containsKey(key))) {
+                                               return false;
+                                       }
+                               } else {
+                                       if (!Objects.deepEquals(value, 
m2.get(key))) {
+                                               return false;
+                                       }
+                               }
+                       }
+               } catch (ClassCastException | NullPointerException unused) {
+                       return false;
+               }
+               return true;
+       }
+
+       @Override
+       public int hashCode() {
+               int result = 0;
+               for (Object key : map.keySet()) {
+                       // only include key because values can contain byte[]
+                       result += 31 * key.hashCode();
+               }
+               return result;
+       }
 }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
index 157a790..411e65e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
@@ -25,6 +25,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Arrays;
+import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -191,17 +192,22 @@ public final class GenericRowData implements RowData {
 
        @Override
        public boolean equals(Object o) {
-               if (o instanceof GenericRowData) {
-                       GenericRowData other = (GenericRowData) o;
-                       return kind == other.kind && Arrays.equals(fields, 
other.fields);
-               } else {
+               if (this == o) {
+                       return true;
+               }
+               if (!(o instanceof GenericRowData)) {
                        return false;
                }
+               GenericRowData that = (GenericRowData) o;
+               return kind == that.kind &&
+                       Arrays.deepEquals(fields, that.fields);
        }
 
        @Override
        public int hashCode() {
-               return 31 * kind.hashCode() + Arrays.hashCode(fields);
+               int result = Objects.hash(kind);
+               result = 31 * result + Arrays.deepHashCode(fields);
+               return result;
        }
 
        @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java
index fb98dc4..10ee835 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java
@@ -61,7 +61,14 @@ public interface RawValueData<T> {
         * Creates an instance of {@link RawValueData} from a Java object.
         */
        static <T> RawValueData<T> fromObject(T javaObject) {
-               return new BinaryRawValueData<>(javaObject);
+               return BinaryRawValueData.fromObject(javaObject);
+       }
+
+       /**
+        * Creates an instance of {@link RawValueData} from the given byte 
array.
+        */
+       static <T> RawValueData<T> fromBytes(byte[] bytes) {
+               return BinaryRawValueData.fromBytes(bytes);
        }
 
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
index 929d47b..e4f2c5a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.data;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -28,6 +29,14 @@ import org.apache.flink.table.types.logical.StructuredType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.types.RowKind;
 
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
 /**
  * Base interface for an internal data structure representing data of {@link 
RowType} and other
  * (possibly nested) structured types such as {@link StructuredType} in the 
table ecosystem.
@@ -228,6 +237,7 @@ public interface RowData {
         * @param pos position of the field to return
         * @param fieldType the field type
         * @return the field object at the specified position in this row data.
+        * @deprecated Use {@link #createFieldGetter(LogicalType, int)} for 
avoiding logical types during runtime.
         */
        static Object get(RowData row, int pos, LogicalType fieldType) {
                if (row.isNullAt(pos)) {
@@ -280,4 +290,104 @@ public interface RowData {
                                throw new 
UnsupportedOperationException("Unsupported type: " + fieldType);
                }
        }
+
+       /**
+        * Creates an accessor for getting elements in an internal row data 
structure at the
+        * given position.
+        *
+        * @param fieldType the type of the field
+        * @param fieldPos the position of the field in the row
+        */
+       static FieldGetter createFieldGetter(LogicalType fieldType, int 
fieldPos) {
+               final FieldGetter fieldGetter;
+               // ordered by type root definition
+               switch (fieldType.getTypeRoot()) {
+                       case CHAR:
+                       case VARCHAR:
+                               fieldGetter = row -> row.getString(fieldPos);
+                               break;
+                       case BOOLEAN:
+                               fieldGetter = row -> row.getBoolean(fieldPos);
+                               break;
+                       case BINARY:
+                       case VARBINARY:
+                               fieldGetter = row -> row.getBinary(fieldPos);
+                               break;
+                       case DECIMAL:
+                               final int decimalPrecision = 
getPrecision(fieldType);
+                               final int decimalScale = getScale(fieldType);
+                               fieldGetter = row -> row.getDecimal(fieldPos, 
decimalPrecision, decimalScale);
+                               break;
+                       case TINYINT:
+                               fieldGetter = row -> row.getByte(fieldPos);
+                               break;
+                       case SMALLINT:
+                               fieldGetter = row -> row.getShort(fieldPos);
+                               break;
+                       case INTEGER:
+                       case DATE:
+                       case TIME_WITHOUT_TIME_ZONE:
+                       case INTERVAL_YEAR_MONTH:
+                               fieldGetter = row -> row.getInt(fieldPos);
+                               break;
+                       case BIGINT:
+                       case INTERVAL_DAY_TIME:
+                               fieldGetter = row -> row.getLong(fieldPos);
+                               break;
+                       case FLOAT:
+                               fieldGetter = row -> row.getFloat(fieldPos);
+                               break;
+                       case DOUBLE:
+                               fieldGetter = row -> row.getDouble(fieldPos);
+                               break;
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                               final int timestampPrecision = 
getPrecision(fieldType);
+                               fieldGetter = row -> row.getTimestamp(fieldPos, 
timestampPrecision);
+                               break;
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                               throw new UnsupportedOperationException();
+                       case ARRAY:
+                               fieldGetter = row -> row.getArray(fieldPos);
+                               break;
+                       case MULTISET:
+                       case MAP:
+                               fieldGetter = row -> row.getMap(fieldPos);
+                               break;
+                       case ROW:
+                       case STRUCTURED_TYPE:
+                               final int rowFieldCount = 
getFieldCount(fieldType);
+                               fieldGetter = row -> row.getRow(fieldPos, 
rowFieldCount);
+                               break;
+                       case DISTINCT_TYPE:
+                               fieldGetter = createFieldGetter(((DistinctType) 
fieldType).getSourceType(), fieldPos);
+                               break;
+                       case RAW:
+                               fieldGetter = row -> row.getRawValue(fieldPos);
+                               break;
+                       case NULL:
+                       case SYMBOL:
+                       case UNRESOLVED:
+                       default:
+                               throw new IllegalArgumentException();
+               }
+               if (!fieldType.isNullable()) {
+                       return fieldGetter;
+               }
+               return row -> {
+                       if (row.isNullAt(fieldPos)) {
+                               return null;
+                       }
+                       return fieldGetter.getFieldOrNull(row);
+               };
+       }
+
+       /**
+        * Accessor for getting the field of a row during runtime.
+        *
+        * @see #createFieldGetter(LogicalType, int)
+        */
+       interface FieldGetter extends Serializable {
+               @Nullable Object getFieldOrNull(RowData row);
+       }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
index 30b94e9..40526d5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.RawValueData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 
@@ -70,10 +71,33 @@ public final class BinaryArrayData extends BinarySection 
implements ArrayData, T
         * It store the length and offset of variable-length part when type is 
string, map, etc.
         */
        public static int calculateFixLengthPartSize(LogicalType type) {
+               // ordered by type root definition
                switch (type.getTypeRoot()) {
                        case BOOLEAN:
                        case TINYINT:
                                return 1;
+                       case CHAR:
+                       case VARCHAR:
+                       case BINARY:
+                       case VARBINARY:
+                       case DECIMAL:
+                       case BIGINT:
+                       case DOUBLE:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                       case INTERVAL_DAY_TIME:
+                       case ARRAY:
+                       case MULTISET:
+                       case MAP:
+                       case ROW:
+                       case STRUCTURED_TYPE:
+                       case RAW:
+                               // long and double are 8 bytes;
+                               // otherwise it stores the length and offset of 
the variable-length part for types
+                               // such as string, map, etc.
+                               return 8;
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                               throw new UnsupportedOperationException();
                        case SMALLINT:
                                return 2;
                        case INTEGER:
@@ -82,10 +106,13 @@ public final class BinaryArrayData extends BinarySection 
implements ArrayData, T
                        case TIME_WITHOUT_TIME_ZONE:
                        case INTERVAL_YEAR_MONTH:
                                return 4;
+                       case DISTINCT_TYPE:
+                               return 
calculateFixLengthPartSize(((DistinctType) type).getSourceType());
+                       case NULL:
+                       case SYMBOL:
+                       case UNRESOLVED:
                        default:
-                               // long, double is 8 bytes.
-                               // It store the length and offset of 
variable-length part when type is string, map, etc.
-                               return 8;
+                               throw new IllegalArgumentException();
                }
        }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
index ff5921b..b87b417 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
@@ -104,4 +104,35 @@ public final class BinaryRawValueData<T> extends 
LazyBinaryFormat<T> implements
                        throw new RuntimeException(e);
                }
        }
+
+       // 
------------------------------------------------------------------------------------------
+       // Construction Utilities
+       // 
------------------------------------------------------------------------------------------
+
+       /**
+        * Creates a {@link BinaryRawValueData} instance from the given Java 
object.
+        */
+       public static <T> BinaryRawValueData<T> fromObject(T javaObject) {
+               if (javaObject == null) {
+                       return null;
+               }
+               return new BinaryRawValueData<>(javaObject);
+       }
+
+       /**
+        * Creates a {@link BinaryRawValueData} instance from the given bytes.
+        */
+       public static <T> BinaryRawValueData<T> fromBytes(byte[] bytes) {
+               return fromBytes(bytes, 0, bytes.length);
+       }
+
+       /**
+        * Creates a {@link BinaryRawValueData} instance from the given bytes 
with offset and number of bytes.
+        */
+       public static <T> BinaryRawValueData<T> fromBytes(byte[] bytes, int 
offset, int numBytes) {
+               return new BinaryRawValueData<>(
+                       new MemorySegment[] {MemorySegmentFactory.wrap(bytes)},
+                       offset,
+                       numBytes);
+       }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
index 5e85554..bfcf855 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
@@ -63,7 +63,7 @@ public final class BinaryStringData extends 
LazyBinaryFormat<String> implements
        // 
------------------------------------------------------------------------------------------
 
        /**
-        * Creates an BinaryStringData from given address (base and offset) and 
length.
+        * Creates a {@link BinaryStringData} instance from the given address 
(base and offset) and length.
         */
        public static BinaryStringData fromAddress(
                        MemorySegment[] segments,
@@ -73,7 +73,7 @@ public final class BinaryStringData extends 
LazyBinaryFormat<String> implements
        }
 
        /**
-        * Creates an BinaryStringData from given java String.
+        * Creates a {@link BinaryStringData} instance from the given Java 
string.
         */
        public static BinaryStringData fromString(String str) {
                if (str == null) {
@@ -84,14 +84,14 @@ public final class BinaryStringData extends 
LazyBinaryFormat<String> implements
        }
 
        /**
-        * Creates an BinaryStringData from given UTF-8 bytes.
+        * Creates a {@link BinaryStringData} instance from the given UTF-8 
bytes.
         */
        public static BinaryStringData fromBytes(byte[] bytes) {
                return fromBytes(bytes, 0, bytes.length);
        }
 
        /**
-        * Creates an BinaryStringData from given UTF-8 bytes with offset and 
number of bytes.
+        * Creates a {@link BinaryStringData} instance from the given UTF-8 
bytes with offset and number of bytes.
         */
        public static BinaryStringData fromBytes(byte[] bytes, int offset, int 
numBytes) {
                return new BinaryStringData(
@@ -101,7 +101,7 @@ public final class BinaryStringData extends 
LazyBinaryFormat<String> implements
        }
 
        /**
-        * Creates an BinaryStringData that contains `length` spaces.
+        * Creates a {@link BinaryStringData} instance that contains `length` 
spaces.
         */
        public static BinaryStringData blankString(int length) {
                byte[] spaces = new byte[length];
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
index 12f3948..2ef6aa3 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
@@ -21,8 +21,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.data.binary.BinaryArrayData;
 import org.apache.flink.table.data.binary.BinarySegmentUtils;
+import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LogicalType;
 
+import java.io.Serializable;
+
 /**
  * Writer for binary array. See {@link BinaryArrayData}.
  */
@@ -110,6 +113,10 @@ public final class BinaryArrayWriter extends 
AbstractBinaryWriter {
                setNullLong(ordinal);
        }
 
+       /**
+        * @deprecated Use {@link #createNullSetter(LogicalType)} for avoiding 
logical types during runtime.
+        */
+       @Deprecated
        public void setNullAt(int pos, LogicalType type) {
                switch (type.getTypeRoot()) {
                        case BOOLEAN:
@@ -216,4 +223,66 @@ public final class BinaryArrayWriter extends 
AbstractBinaryWriter {
        public int getNumElements() {
                return numElements;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Creates an accessor for setting the elements of an array writer to 
{@code null} during runtime.
+        *
+        * @param elementType the element type of the array
+        */
+       public static NullSetter createNullSetter(LogicalType elementType) {
+               // ordered by type root definition
+               switch (elementType.getTypeRoot()) {
+                       case CHAR:
+                       case VARCHAR:
+                       case BINARY:
+                       case VARBINARY:
+                       case DECIMAL:
+                       case BIGINT:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                       case INTERVAL_DAY_TIME:
+                       case ARRAY:
+                       case MULTISET:
+                       case MAP:
+                       case ROW:
+                       case STRUCTURED_TYPE:
+                       case RAW:
+                               return BinaryArrayWriter::setNullLong;
+                       case BOOLEAN:
+                               return BinaryArrayWriter::setNullBoolean;
+                       case TINYINT:
+                               return BinaryArrayWriter::setNullByte;
+                       case SMALLINT:
+                               return BinaryArrayWriter::setNullShort;
+                       case INTEGER:
+                       case DATE:
+                       case TIME_WITHOUT_TIME_ZONE:
+                       case INTERVAL_YEAR_MONTH:
+                               return BinaryArrayWriter::setNullInt;
+                       case FLOAT:
+                               return BinaryArrayWriter::setNullFloat;
+                       case DOUBLE:
+                               return BinaryArrayWriter::setNullDouble;
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                               throw new UnsupportedOperationException();
+                       case DISTINCT_TYPE:
+                               return createNullSetter(((DistinctType) 
elementType).getSourceType());
+                       case NULL:
+                       case SYMBOL:
+                       case UNRESOLVED:
+                       default:
+                               throw new IllegalArgumentException();
+               }
+       }
+
+       /**
+        * Accessor for setting the elements of an array writer to {@code null} 
during runtime.
+        *
+        * @see #createNullSetter(LogicalType)
+        */
+       public interface NullSetter extends Serializable {
+               void setNull(BinaryArrayWriter writer, int pos);
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
index 77f55ed..24d903c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryWriter.java
@@ -26,15 +26,21 @@ import org.apache.flink.table.data.RawValueData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.types.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.ArrayDataSerializer;
 import org.apache.flink.table.runtime.typeutils.MapDataSerializer;
 import org.apache.flink.table.runtime.typeutils.RawValueDataSerializer;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.TimestampType;
 
+import java.io.Serializable;
+
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+
 /**
  * Writer to write a composite data format, like row, array.
  * 1. Invoke {@link #reset()}.
@@ -89,6 +95,12 @@ public interface BinaryWriter {
         */
        void complete();
 
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * @deprecated Use {@link #createValueSetter(LogicalType)} for avoiding 
logical types during runtime.
+        */
+       @Deprecated
        static void write(
                        BinaryWriter writer, int pos, Object o, LogicalType 
type, TypeSerializer<?> serializer) {
                switch (type.getTypeRoot()) {
@@ -154,4 +166,76 @@ public interface BinaryWriter {
                                throw new UnsupportedOperationException("Not 
support type: " + type);
                }
        }
+
+       /**
+        * Creates an accessor for setting the elements of an array writer 
during runtime.
+        *
+        * @param elementType the element type of the array
+        */
+       static ValueSetter createValueSetter(LogicalType elementType) {
+               // ordered by type root definition
+               switch (elementType.getTypeRoot()) {
+                       case CHAR:
+                       case VARCHAR:
+                               return (writer, pos, value) -> 
writer.writeString(pos, (StringData) value);
+                       case BOOLEAN:
+                               return (writer, pos, value) -> 
writer.writeBoolean(pos, (boolean) value);
+                       case BINARY:
+                       case VARBINARY:
+                               return (writer, pos, value) -> 
writer.writeBinary(pos, (byte[]) value);
+                       case DECIMAL:
+                               final int decimalPrecision = 
getPrecision(elementType);
+                               return (writer, pos, value) -> 
writer.writeDecimal(pos, (DecimalData) value, decimalPrecision);
+                       case TINYINT:
+                               return (writer, pos, value) -> 
writer.writeByte(pos, (byte) value);
+                       case SMALLINT:
+                               return (writer, pos, value) -> 
writer.writeShort(pos, (short) value);
+                       case INTEGER:
+                       case DATE:
+                       case TIME_WITHOUT_TIME_ZONE:
+                       case INTERVAL_YEAR_MONTH:
+                               return (writer, pos, value) -> 
writer.writeInt(pos, (int) value);
+                       case BIGINT:
+                       case INTERVAL_DAY_TIME:
+                               return (writer, pos, value) -> 
writer.writeLong(pos, (long) value);
+                       case FLOAT:
+                               return (writer, pos, value) -> 
writer.writeFloat(pos, (float) value);
+                       case DOUBLE:
+                               return (writer, pos, value) -> 
writer.writeDouble(pos, (double) value);
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                               final int timestampPrecision = 
getPrecision(elementType);
+                               return (writer, pos, value) -> 
writer.writeTimestamp(pos, (TimestampData) value, timestampPrecision);
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                               throw new UnsupportedOperationException();
+                       case ARRAY:
+                               final ArrayDataSerializer arraySerializer = 
(ArrayDataSerializer) InternalSerializers.create(elementType);
+                               return (writer, pos, value) -> 
writer.writeArray(pos, (ArrayData) value, arraySerializer);
+                       case MULTISET:
+                       case MAP:
+                               final MapDataSerializer mapSerializer = 
(MapDataSerializer) InternalSerializers.create(elementType);
+                               return (writer, pos, value) -> 
writer.writeMap(pos, (MapData) value, mapSerializer);
+                       case ROW:
+                       case STRUCTURED_TYPE:
+                               final RowDataSerializer rowSerializer = 
(RowDataSerializer) InternalSerializers.create(elementType);
+                               return (writer, pos, value) -> 
writer.writeRow(pos, (RowData) value, rowSerializer);
+                       case DISTINCT_TYPE:
+                               return createValueSetter(((DistinctType) 
elementType).getSourceType());
+                       case RAW:
+                               final RawValueDataSerializer<?> rawSerializer = 
(RawValueDataSerializer<?>) InternalSerializers.create(elementType);
+                               return (writer, pos, value) -> 
writer.writeRawValue(pos, (RawValueData<?>) value, rawSerializer);
+                       case NULL:
+                       case SYMBOL:
+                       case UNRESOLVED:
+                       default:
+                               throw new IllegalArgumentException();
+               }
+       }
+
+       /**
+        * Accessor for setting the elements of an array writer during runtime.
+        */
+       interface ValueSetter extends Serializable {
+               void setValue(BinaryArrayWriter writer, int pos, Object value);
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
index bd00eac..462a1ae 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/InternalSerializers.java
@@ -53,7 +53,14 @@ import 
org.apache.flink.table.types.logical.TypeInformationRawType;
 public class InternalSerializers {
 
        /**
-        * Get {@link TypeSerializer} for internal data structure from the 
given {@link LogicalType}.
+        * Creates a {@link TypeSerializer} for internal data structures of the 
given {@link LogicalType}.
+        */
+       public static TypeSerializer<?> create(LogicalType type) {
+               return create(type, new ExecutionConfig());
+       }
+
+       /**
+        * Creates a {@link TypeSerializer} for internal data structures of the 
given {@link LogicalType}.
         */
        public static TypeSerializer create(LogicalType type, ExecutionConfig 
config) {
                switch (type.getTypeRoot()) {

Reply via email to