This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch pull/11651/head in repository https://gitbox.apache.org/repos/asf/flink.git
commit f24fe42d98e90e7419aaba8c250f44f6e855e6a5 Author: Timo Walther <twal...@apache.org> AuthorDate: Fri Apr 10 11:48:13 2020 +0200 Improve terminology and consistency of JavaDocs --- .../table/connector/sink/DynamicTableSink.java | 2 +- .../table/connector/source/ScanTableSource.java | 2 +- .../org/apache/flink/table/data/ArrayData.java | 82 +++++++------ .../org/apache/flink/table/data/DecimalData.java | 61 +++++----- .../apache/flink/table/data/GenericArrayData.java | 33 +++-- .../apache/flink/table/data/GenericMapData.java | 20 ++- .../apache/flink/table/data/GenericRowData.java | 135 +++++++++++---------- .../java/org/apache/flink/table/data/MapData.java | 16 ++- .../org/apache/flink/table/data/RawValueData.java | 29 +++-- .../java/org/apache/flink/table/data/RowData.java | 132 +++++++++++--------- .../org/apache/flink/table/data/StringData.java | 21 ++-- .../org/apache/flink/table/data/TimestampData.java | 85 +++++-------- 12 files changed, 337 insertions(+), 281 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java index 4cdb639..49a016f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java @@ -86,7 +86,7 @@ public interface DynamicTableSink { * in other Flink modules. * * <p>Independent of the provider interface, the table runtime expects that a sink implementation - * accepts internal data structures (see {@link org.apache.flink.table.data} for more information). + * accepts internal data structures (see {@link org.apache.flink.table.data.RowData} for more information). 
* * <p>The given {@link Context} offers utilities by the planner for creating runtime implementation * with minimal dependencies to internal data structures. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java index 47e0917..8e11476 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java @@ -73,7 +73,7 @@ public interface ScanTableSource extends DynamicTableSource { * in other Flink modules. * * <p>Independent of the provider interface, the table runtime expects that a source implementation - * emits internal data structures (see {@link org.apache.flink.table.data} for more information). + * emits internal data structures (see {@link org.apache.flink.table.data.RowData} for more information). * * <p>The given {@link Context} offers utilities by the planner for creating runtime implementation * with minimal dependencies to internal data structures. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java index e96d039..965c252 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java @@ -22,8 +22,12 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.types.logical.ArrayType; /** - * {@link ArrayData} is an internal data structure representing data of {@link ArrayType} - * in Flink Table/SQL, which only contains elements of the internal data structures. 
+ * Base interface of an internal data structure representing data of {@link ArrayType}. + * + * <p>Note: All elements of this data structure must be internal data structures and must be of the + * same type. See {@link RowData} for more information about internal data structures. + * + * <p>Use {@link GenericArrayData} to construct instances of this interface from regular Java arrays. */ @PublicEvolving public interface ArrayData { @@ -34,85 +38,95 @@ public interface ArrayData { int size(); // ------------------------------------------------------------------------------------------ + // Read-only accessor methods + // ------------------------------------------------------------------------------------------ /** - * Returns true if the specific ordinal field is null. + * Returns true if the element is null at the given position. */ - boolean isNullAt(int ordinal); + boolean isNullAt(int pos); /** - * Gets boolean value from the specific ordinal. + * Returns the boolean value at the given position. */ - boolean getBoolean(int ordinal); + boolean getBoolean(int pos); /** - * Gets byte value from the specific ordinal. + * Returns the byte value at the given position. */ - byte getByte(int ordinal); + byte getByte(int pos); /** - * Gets short value from the specific ordinal. + * Returns the short value at the given position. */ - short getShort(int ordinal); + short getShort(int pos); /** - * Get int value from the specific ordinal. + * Returns the integer value at the given position. */ - int getInt(int ordinal); + int getInt(int pos); /** - * Get long value from the specific ordinal. + * Returns the long value at the given position. */ - long getLong(int ordinal); + long getLong(int pos); /** - * Get float value from the specific ordinal. + * Returns the float value at the given position. */ - float getFloat(int ordinal); + float getFloat(int pos); /** - * Get double value from the specific ordinal. + * Returns the double value at the given position. 
*/ - double getDouble(int ordinal); + double getDouble(int pos); /** - * Get string value from the specific ordinal. + * Returns the string value at the given position. */ - StringData getString(int ordinal); + StringData getString(int pos); /** - * Get decimal value from the specific ordinal. + * Returns the decimal value at the given position. + * + * <p>The precision and scale are required to determine whether the decimal value was stored in a + * compact representation (see {@link DecimalData}). */ - DecimalData getDecimal(int ordinal, int precision, int scale); + DecimalData getDecimal(int pos, int precision, int scale); /** - * Get timestamp value from the specific ordinal. + * Returns the timestamp value at the given position. + * + * <p>The precision is required to determine whether the timestamp value was stored in a compact + * representation (see {@link TimestampData}). */ - TimestampData getTimestamp(int ordinal, int precision); + TimestampData getTimestamp(int pos, int precision); /** - * Get raw value from the specific ordinal. + * Returns the raw value at the given position. */ - <T> RawValueData<T> getRawValue(int ordinal); + <T> RawValueData<T> getRawValue(int pos); /** - * Get binary value from the specific ordinal. + * Returns the binary value at the given position. */ - byte[] getBinary(int ordinal); + byte[] getBinary(int pos); /** - * Get array value from the specific ordinal. + * Returns the array value at the given position. */ - ArrayData getArray(int ordinal); + ArrayData getArray(int pos); /** - * Get map value from the specific ordinal. + * Returns the map value at the given position. */ - MapData getMap(int ordinal); + MapData getMap(int pos); /** - * Get row value from the specific ordinal. + * Returns the row value at the given position. + * + * <p>The number of fields is required to correctly extract the row. 
*/ - RowData getRow(int ordinal, int numFields); + RowData getRow(int pos, int numFields); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/DecimalData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/DecimalData.java index 246fbf8..5b922da 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/DecimalData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/DecimalData.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.types.logical.DecimalType; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.math.BigDecimal; import java.math.BigInteger; @@ -30,10 +31,10 @@ import java.math.RoundingMode; import static org.apache.flink.util.Preconditions.checkArgument; /** - * {@link DecimalData} is an internal data structure represents data of {@link DecimalType} - * in Flink Table/SQL. + * An internal data structure representing data of {@link DecimalType}. * - * <p>It is an immutable implementation which can hold a long if values are small enough. + * <p>This data structure is immutable and might store decimal values in a compact representation (as + * a long value) if values are small enough. */ @PublicEvolving public final class DecimalData implements Comparable<DecimalData> { @@ -92,22 +93,23 @@ public final class DecimalData implements Comparable<DecimalData> { // ------------------------------------------------------------------------------------------ /** - * Returns the <i>precision</i> of this {@code DecimalData}. - * The precision is the number of digits in the unscaled value. + * Returns the <i>precision</i> of this {@link DecimalData}. + * + * <p>The precision is the number of digits in the unscaled value. */ public int precision() { return precision; } /** - * Returns the <i>scale</i> of this {@code DecimalData}. 
+ * Returns the <i>scale</i> of this {@link DecimalData}. */ public int scale() { return scale; } /** - * Converts this {@code DecimalData} object into {@code BigDecimal}. + * Converts this {@link DecimalData} into an instance of {@link BigDecimal}. */ public BigDecimal toBigDecimal() { BigDecimal bd = decimalVal; @@ -118,10 +120,9 @@ public final class DecimalData implements Comparable<DecimalData> { } /** - * Returns a long whose value is the <i>unscaled value</i> of this {@code DecimalData}. + * Returns a long describing the <i>unscaled value</i> of this {@link DecimalData}. * - * @return the unscaled value of this {@code DecimalData}. - * @throws ArithmeticException if the value of {@code this} will not exactly fit in a {@code long}. + * @throws ArithmeticException if this {@link DecimalData} does not exactly fit in a long. */ public long toUnscaledLong() { if (isCompact()) { @@ -132,9 +133,9 @@ public final class DecimalData implements Comparable<DecimalData> { } /** - * Returns a byte array whose value is the <i>unscaled value</i> of this {@code DecimalData}. + * Returns a byte array describing the <i>unscaled value</i> of this {@link DecimalData}. * - * @return the unscaled byte array value of this {@code DecimalData}. + * @return the unscaled byte array of this {@link DecimalData}. */ public byte[] toUnscaledBytes() { if (!isCompact()) { @@ -152,14 +153,14 @@ public final class DecimalData implements Comparable<DecimalData> { } /** - * Returns whether the decimal data is small enough to be stored in a long. + * Returns whether the decimal value is small enough to be stored in a long. */ public boolean isCompact() { return precision <= MAX_COMPACT_PRECISION; } /** - * Returns a copy of this {@code DecimalData} object. + * Returns a copy of this {@link DecimalData} object. 
*/ public DecimalData copy() { return new DecimalData(precision, scale, longVal, decimalVal); @@ -196,22 +197,14 @@ public final class DecimalData implements Comparable<DecimalData> { // Constructor Utilities // ------------------------------------------------------------------------------------------ - // convert external BigDecimal to internal representation. - // first, value may be rounded to have the desired `scale` - // then `precision` is checked. if precision overflow, it will return `null` - /** - * Creates a {@code DecimalData} object from a {@code BigDecimal} object and - * the given precision and scale. + * Creates an instance of {@link DecimalData} from a {@link BigDecimal} and the given precision + * and scale. * - * <p>The return decimal value may be rounded to have the desired {@code scale}. - * The {@code precision} will be checked. If precision overflow, it will return null. - * - * @param bd the {@code BigDecimal} object. - * @param precision the specific precision - * @param scale the specific scale + * <p>The returned decimal value may be rounded to have the desired scale. The precision will be + * checked. If the precision overflows, null will be returned. */ - public static DecimalData fromBigDecimal(BigDecimal bd, int precision, int scale) { + public static @Nullable DecimalData fromBigDecimal(BigDecimal bd, int precision, int scale) { bd = bd.setScale(scale, RoundingMode.HALF_UP); if (bd.precision() > precision) { return null; @@ -225,8 +218,8 @@ public final class DecimalData implements Comparable<DecimalData> { } /** - * Creates a {@code DecimalData} object from an unscaled long value and - * the given precision and scale. + * Creates an instance of {@link DecimalData} from an unscaled long value and the given precision + * and scale. 
*/ public static DecimalData fromUnscaledLong(long unscaledLong, int precision, int scale) { checkArgument(precision > 0 && precision <= MAX_LONG_DIGITS); @@ -235,8 +228,8 @@ public final class DecimalData implements Comparable<DecimalData> { } /** - * Creates a {@code DecimalData} object from an unscaled byte array value and - * the given precision and scale. + * Creates an instance of {@link DecimalData} from an unscaled byte array value and the given precision + * and scale. */ public static DecimalData fromUnscaledBytes(byte[] unscaledBytes, int precision, int scale) { if (precision > MAX_COMPACT_PRECISION) { @@ -253,9 +246,11 @@ public final class DecimalData implements Comparable<DecimalData> { } /** - * Creates a {@code DecimalData} for zero value with the given precision and scale. + * Creates an instance of {@link DecimalData} for a zero value with the given precision and scale. + * + * <p>The precision will be checked. If the precision overflows, null will be returned. */ - public static DecimalData zero(int precision, int scale) { + public static @Nullable DecimalData zero(int precision, int scale) { if (precision <= MAX_COMPACT_PRECISION) { return new DecimalData(precision, scale, 0, null); } else { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java index 5c51d07..33186fb 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericArrayData.java @@ -19,13 +19,20 @@ package org.apache.flink.table.data; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.types.logical.ArrayType; import org.apache.commons.lang3.ArrayUtils; /** - * {@link GenericArrayData} is a generic implementation of {@link ArrayData} which - * wraps generic Java arrays. 
All the elements in the array should have the same type - * and should be in internal data structures. + * An internal data structure representing data of {@link ArrayType}. + * + * <p>{@link GenericArrayData} is a generic implementation of {@link ArrayData} which wraps regular + * Java arrays. + * + * <p>Note: All elements of this data structure must be internal data structures and must be of the + * same type. See {@link RowData} for more information about internal data structures. + * + * <p>For non-primitive arrays, elements can contain null for representing nullability. */ @PublicEvolving public final class GenericArrayData implements ArrayData { @@ -34,6 +41,11 @@ public final class GenericArrayData implements ArrayData { private final int size; private final boolean isPrimitiveArray; + /** + * Creates an instance of {@link GenericArrayData} using the given Java array. + * + * <p>Note: All elements of the array must be internal data structures. + */ public GenericArrayData(Object[] array) { this(array, array.length, false); } @@ -72,18 +84,19 @@ public final class GenericArrayData implements ArrayData { } /** - * Returns true if it is a primitive array. - * A primitive array is an array whose element is primitive type. + * Returns true if this is a primitive array. + * + * <p>A primitive array is an array whose elements are of primitive type. */ public boolean isPrimitiveArray() { return isPrimitiveArray; } /** - * Converts this {@code GenericArrayData} to an array of Java Object. - * It will convert primitive array into generic object array. - * But it will not convert internal data structures into external - * data structures (e.g. {@link StringData} to {@link String}). + * Converts this {@link GenericArrayData} into an array of Java {@link Object}. + * + * <p>The method will convert a primitive array into an object array. But it will not convert internal + * data structures into external data structures (e.g. {@link StringData} to {@link String}). 
*/ public Object[] toObjectArray() { if (isPrimitiveArray) { @@ -103,7 +116,7 @@ public final class GenericArrayData implements ArrayData { } else if (boolean[].class.equals(arrayClass)) { return ArrayUtils.toObject((boolean[]) array); } - throw new RuntimeException("This is not a primary array: " + arrayClass); + throw new RuntimeException("Unsupported primitive array: " + arrayClass); } else { return (Object[]) array; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java index 7a90a65..62521e8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericMapData.java @@ -19,13 +19,22 @@ package org.apache.flink.table.data; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; import java.util.Map; /** - * {@link GenericMapData} is a generic implementation of {@link MapData} which - * wraps generic Java {@link Map}. All the keys in the map should have the same type - * and should be in internal data structures, the same to values in the map. + * An internal data structure representing data of {@link MapType} or {@link MultisetType}. + * + * <p>{@link GenericMapData} is a generic implementation of {@link MapData} which wraps regular + * Java maps. + * + * <p>Note: All keys and values of this data structure must be internal data structures. All keys must + * be of the same type; same for values. See {@link RowData} for more information about internal data + * structures. + * + * <p>Both keys and values can contain null for representing nullability. 
*/ @PublicEvolving public final class GenericMapData implements MapData { @@ -33,10 +42,9 @@ public final class GenericMapData implements MapData { private final Map<?, ?> map; /** - * Creates a {@link GenericMapData} using the given Java Map. The key-value in the map must - * be internal data structures. + * Creates an instance of {@link GenericMapData} using the given Java map. * - * @see RowData for more information about internal data structures. + * <p>Note: All keys and values of the map must be internal data structures. */ public GenericMapData(Map<?, ?> map) { this.map = map; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java index bb9396b..5a3ac59 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java @@ -19,20 +19,26 @@ package org.apache.flink.table.data; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.StructuredType; import org.apache.flink.types.RowKind; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A {@link GenericRowData} can have arbitrary number of fields and contain a set of fields, - * which may all be different types. The fields in {@link GenericRowData} can be null. + * An internal data structure representing data of {@link RowType} and other (possibly nested) structured + * types such as {@link StructuredType}. * - * <p>The fields in the row can be accessed by position (zero-based) {@link #getInt}, - * and can be updated by {@link #setField(int, Object)}. All the fields should be in - * internal data structures. 
+ * <p>{@link GenericRowData} is a generic implementation of {@link RowData} which is backed by an + * array of Java {@link Object}. A {@link GenericRowData} can have an arbitrary number of fields of + * different types. The fields in a row can be accessed by position (0-based) using either the generic + * {@link #getField(int)} or type-specific getters (such as {@link #getInt(int)}). A field can be updated + * by the generic {@link #setField(int, Object)}. * - * @see RowData for more information about the mapping of Table/SQL data types - * and internal data structures. + * <p>Note: All fields of this data structure must be internal data structures. See {@link RowData} for + * more information about internal data structures. + * + * <p>The fields in {@link GenericRowData} can be null for representing nullability. */ @PublicEvolving public final class GenericRowData implements RowData { @@ -40,12 +46,18 @@ public final class GenericRowData implements RowData { /** The array to store the actual internal format values. */ private final Object[] fields; - /** The changelog kind of this row. */ + /** The kind of change that a row describes in a changelog. */ private RowKind kind; /** - * Create a new GenericRow instance. - * @param arity The number of fields in the GenericRow + * Creates an instance of {@link GenericRowData} with given number of fields. + * + * <p>Initially, all fields are set to null. By default, the row describes a {@link RowKind#INSERT} + * in a changelog. + * + * <p>Note: All fields of the row must be internal data structures. + * + * @param arity number of fields */ public GenericRowData(int arity) { this.fields = new Object[arity]; @@ -53,34 +65,28 @@ public final class GenericRowData implements RowData { } /** - * Sets the field at the specified ordinal. + * Sets the field value at the given position. 
* - * <p>Note: the given field value must in internal data structures, otherwise the - * {@link GenericRowData} is corrupted, and may throw exception when processing. - * See the description of {@link RowData} for more information about internal data structures. + * <p>Note: The given field value must be an internal data structure. Otherwise the {@link GenericRowData} + * is corrupted and may throw an exception when processing. See {@link RowData} for more information + * about internal data structures. * - * @param ordinal The ordinal of the field, 0-based. - * @param value The internal data value to be assigned to the field at the specified ordinal. - * @throws IndexOutOfBoundsException if the ordinal is negative, or equal to, or larger than - * the number of fields. + * <p>The field value can be null for representing nullability. */ - public void setField(int ordinal, Object value) { - this.fields[ordinal] = value; + public void setField(int pos, Object value) { + this.fields[pos] = value; } /** - * Gets the field at the specified ordinal. + * Returns the field value at the given position. * - * <p>Note: the returned value is in internal data structure. - * See the description of {@link RowData} for more information about internal data structures. + * <p>Note: The returned value is an internal data structure. See {@link RowData} for more information + * about internal data structures. * - * @param ordinal The ordinal of the field, 0-based. - * @return The field at the specified position. - * @throws IndexOutOfBoundsException if the ordinal is negative, or equal to, or larger than - * the number of fields. + * <p>The returned field value can be null for representing nullability.
*/ - public Object getField(int ordinal) { - return this.fields[ordinal]; + public Object getField(int pos) { + return this.fields[pos]; } @Override @@ -100,84 +106,84 @@ public final class GenericRowData implements RowData { } @Override - public boolean isNullAt(int ordinal) { - return this.fields[ordinal] == null; + public boolean isNullAt(int pos) { + return this.fields[pos] == null; } @Override - public boolean getBoolean(int ordinal) { - return (boolean) this.fields[ordinal]; + public boolean getBoolean(int pos) { + return (boolean) this.fields[pos]; } @Override - public byte getByte(int ordinal) { - return (byte) this.fields[ordinal]; + public byte getByte(int pos) { + return (byte) this.fields[pos]; } @Override - public short getShort(int ordinal) { - return (short) this.fields[ordinal]; + public short getShort(int pos) { + return (short) this.fields[pos]; } @Override - public int getInt(int ordinal) { - return (int) this.fields[ordinal]; + public int getInt(int pos) { + return (int) this.fields[pos]; } @Override - public long getLong(int ordinal) { - return (long) this.fields[ordinal]; + public long getLong(int pos) { + return (long) this.fields[pos]; } @Override - public float getFloat(int ordinal) { - return (float) this.fields[ordinal]; + public float getFloat(int pos) { + return (float) this.fields[pos]; } @Override - public double getDouble(int ordinal) { - return (double) this.fields[ordinal]; + public double getDouble(int pos) { + return (double) this.fields[pos]; } @Override - public StringData getString(int ordinal) { - return (StringData) this.fields[ordinal]; + public StringData getString(int pos) { + return (StringData) this.fields[pos]; } @Override - public DecimalData getDecimal(int ordinal, int precision, int scale) { - return (DecimalData) this.fields[ordinal]; + public DecimalData getDecimal(int pos, int precision, int scale) { + return (DecimalData) this.fields[pos]; } @Override - public TimestampData getTimestamp(int ordinal, int 
precision) { - return (TimestampData) this.fields[ordinal]; + public TimestampData getTimestamp(int pos, int precision) { + return (TimestampData) this.fields[pos]; } @SuppressWarnings("unchecked") @Override - public <T> RawValueData<T> getRawValue(int ordinal) { - return (RawValueData<T>) this.fields[ordinal]; + public <T> RawValueData<T> getRawValue(int pos) { + return (RawValueData<T>) this.fields[pos]; } @Override - public byte[] getBinary(int ordinal) { - return (byte[]) this.fields[ordinal]; + public byte[] getBinary(int pos) { + return (byte[]) this.fields[pos]; } @Override - public ArrayData getArray(int ordinal) { - return (ArrayData) this.fields[ordinal]; + public ArrayData getArray(int pos) { + return (ArrayData) this.fields[pos]; } @Override - public MapData getMap(int ordinal) { - return (MapData) this.fields[ordinal]; + public MapData getMap(int pos) { + return (MapData) this.fields[pos]; } @Override - public RowData getRow(int ordinal, int numFields) { - return (RowData) this.fields[ordinal]; + public RowData getRow(int pos, int numFields) { + return (RowData) this.fields[pos]; } // ---------------------------------------------------------------------------------------- @@ -185,10 +191,11 @@ public final class GenericRowData implements RowData { // ---------------------------------------------------------------------------------------- /** - * Creates a GenericRow with the given internal format values and a default - * {@link RowKind#INSERT} change kind. + * Creates an instance of {@link GenericRowData} with given field values. + * + * <p>By default, the row describes a {@link RowKind#INSERT} in a changelog. * - * @param values internal format values + * <p>Note: All fields of the row must be internal data structures. */ public static GenericRowData of(Object... 
values) { GenericRowData row = new GenericRowData(values.length); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/MapData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/MapData.java index 648005d..a583967 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/MapData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/MapData.java @@ -20,10 +20,16 @@ package org.apache.flink.table.data; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; /** - * {@link MapData} is an internal data structure representing data of {@link MapType} - * in Flink Table/SQL. + * Base interface of an internal data structure representing data of {@link MapType} or {@link MultisetType}. + * + * <p>Note: All keys and values of this data structure must be internal data structures. All keys must + * be of the same type; same for values. See {@link RowData} for more information about internal data + * structures. + * + * <p>Use {@link GenericMapData} to construct instances of this interface from regular Java maps. */ @PublicEvolving public interface MapData { @@ -35,13 +41,15 @@ public interface MapData { /** * Returns an array view of the keys contained in this map. - * A key-value pair has the same index in the {@code keyArray} and {@code valueArray}. + * + * <p>A key-value pair has the same index in the key array and value array. */ ArrayData keyArray(); /** * Returns an array view of the values contained in this map. - * A key-value pair has the same index in the {@code keyArray} and {@code valueArray}. + * + * <p>A key-value pair has the same index in the key array and value array. 
*/ ArrayData valueArray(); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java index 0553a9a..92d7b51 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RawValueData.java @@ -23,8 +23,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.types.logical.RawType; /** - * {@link RawValueData} is a data structure representing data of {@link RawType} - * in Flink Table/SQL. + * An internal data structure representing data of {@link RawType}. + * + * <p>This data structure is immutable. * * @param <T> originating class for the raw value */ @@ -32,27 +33,31 @@ import org.apache.flink.table.types.logical.RawType; public interface RawValueData<T> { /** - * Converts a {@link RawValueData} into a Java object, the {@code serializer} - * is required because the "raw value" might be in binary format which can be - * deserialized by the {@code serializer}. + * Converts this {@link RawValueData} into a Java object. + * + * <p>The given serializer is required because the "raw value" might be represented in a binary format + * and needs to be deserialized first. * - * <p>Note: the returned Java object may be reused. + * <p>Note: The returned Java object may be reused. */ T toObject(TypeSerializer<T> serializer); /** - * Converts a {@link RawValueData} into a byte array, the {@code serializer} - * is required because the "raw value" might be in Java object format which - * can be serialized by the {@code serializer}. + * Converts this {@link RawValueData} into a byte array. + * + * <p>The given serializer is required because the "raw value" might still be a Java object and + * needs to be serialized first. * - * <p>Note: the returned bytes may be reused.
+ * <p>Note: The returned byte array may be reused. */ byte[] toBytes(TypeSerializer<T> serializer); - // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------------------------ + // Constructor Utilities + // ------------------------------------------------------------------------------------------ /** - * Creates a {@link RawValueData} instance from a java object. + * Creates an instance of {@link RawValueData} from a Java object. */ static <T> RawValueData<T> fromObject(T javaObject) { // TODO diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java index 9c871c1..31213b6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java @@ -19,35 +19,47 @@ package org.apache.flink.table.data; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.StructuredType; import org.apache.flink.types.RowKind; /** - * {@link RowData} is an internal data structure representing data of {@link RowType} - * in Flink Table/SQL, which only contains columns of the internal data structures. + * Base interface for an internal data structure representing data of {@link RowType} and other + * (possibly nested) structured types such as {@link StructuredType} in the table ecosystem. * - * <p>A {@link RowData} also contains a {@link RowKind} which represents the kind of row in - * a changelog. The {@link RowKind} is just a metadata information of row, not a column. 
+ * <p>All top-level records that are travelling through Table API or SQL pipelines during runtime are + * instances of this interface. Each {@link RowData} contains a {@link RowKind} which represents the + * kind of change that a row describes in a changelog. The {@link RowKind} is just metadata information + * of a row and thus not part of the table's schema, i.e., not a dedicated field. * - * <p>{@link RowData} has different implementations which are designed for different scenarios. - * For example, the binary-orient implementation {@code BinaryRowData} is backed by - * {@code MemorySegment} instead of Java Object to reduce the serialization/deserialization cost. - * The object-orient implementation {@code GenericRowData} is backed by an array of Java Object - * which is easy to construct and efficient to update. + * <p>Note: All fields of this data structure must be internal data structures. * - * <p>The mappings from Flink Table/SQL data types to the internal data structures are listed - * in the following table. + * <p>The {@link RowData} interface has different implementations which are designed for different + * scenarios: + * <ul> + * <li>The binary-oriented implementation {@code BinaryRowData} is backed by references to {@link MemorySegment} + * instead of using Java objects to reduce the serialization/deserialization overhead.</li> + * <li>The object-oriented implementation {@link GenericRowData} is backed by an array of Java {@link Object} + * which is easy to construct and efficient to update.</li> + * </ul> + * + * <p>{@link GenericRowData} is intended for public use and has stable behavior. It is recommended to + * construct instances of {@link RowData} with this class if internal data structures are required. 
+ * + * <p>The mappings from Flink's Table API and SQL data types to the internal data structures are listed + * in the following table: * <pre> * +--------------------------------+-----------------------------------------+ * | SQL Data Types | Internal Data Structures | * +--------------------------------+-----------------------------------------+ * | BOOLEAN | boolean | * +--------------------------------+-----------------------------------------+ - * | CHAR / VARCHAR / STRING | StringData | + * | CHAR / VARCHAR / STRING | {@link StringData} | * +--------------------------------+-----------------------------------------+ * | BINARY / VARBINARY / BYTES | byte[] | * +--------------------------------+-----------------------------------------+ - * | DECIMAL | DecimalData | + * | DECIMAL | {@link DecimalData} | * +--------------------------------+-----------------------------------------+ * | TINYINT | byte | * +--------------------------------+-----------------------------------------+ @@ -65,36 +77,38 @@ import org.apache.flink.types.RowKind; * +--------------------------------+-----------------------------------------+ * | TIME | int (number of milliseconds of the day) | * +--------------------------------+-----------------------------------------+ - * | TIMESTAMP | TimestampData | + * | TIMESTAMP | {@link TimestampData} | * +--------------------------------+-----------------------------------------+ - * | TIMESTAMP WITH LOCAL TIME ZONE | TimestampData | + * | TIMESTAMP WITH LOCAL TIME ZONE | {@link TimestampData} | * +--------------------------------+-----------------------------------------+ * | INTERVAL YEAR TO MONTH | int (number of months) | * +--------------------------------+-----------------------------------------+ * | INTERVAL DAY TO SECOND | long (number of milliseconds) | * +--------------------------------+-----------------------------------------+ - * | ROW | RowData | + * | ROW / structured types | {@link RowData} | * 
+--------------------------------+-----------------------------------------+ - * | ARRAY | ArrayData | + * | ARRAY | {@link ArrayData} | * +--------------------------------+-----------------------------------------+ - * | MAP / MULTISET | MapData | + * | MAP / MULTISET | {@link MapData} | * +--------------------------------+-----------------------------------------+ - * | RAW | RawValueData | + * | RAW | {@link RawValueData} | * +--------------------------------+-----------------------------------------+ * </pre> + * + * <p>Nullability is always handled by the container data structure. */ @PublicEvolving public interface RowData { /** - * Get the number of fields in the RowData. + * Returns the number of fields in this row. * - * @return The number of fields in the RowData. + * <p>The number does not include {@link RowKind}. It is kept separately. */ int getArity(); /** - * Gets the kind of change that this row describes in a changelog. + * Returns the kind of change that this row describes in a changelog. * * @see RowKind */ @@ -108,84 +122,94 @@ public interface RowData { void setRowKind(RowKind kind); // ------------------------------------------------------------------------------------------ + // Read-only accessor methods + // ------------------------------------------------------------------------------------------ /** - * Returns true if the specific ordinal field is null. + * Returns true if the field is null at the given position. */ - boolean isNullAt(int ordinal); + boolean isNullAt(int pos); /** - * Gets boolean value from the specific ordinal. + * Returns the boolean value at the given position. */ - boolean getBoolean(int ordinal); + boolean getBoolean(int pos); /** - * Gets byte value from the specific ordinal. + * Returns the byte value at the given position. */ - byte getByte(int ordinal); + byte getByte(int pos); /** - * Gets short value from the specific ordinal. + * Returns the short value at the given position. 
*/ - short getShort(int ordinal); + short getShort(int pos); /** - * Get int value from the specific ordinal. + * Returns the integer value at the given position. */ - int getInt(int ordinal); + int getInt(int pos); /** - * Get long value from the specific ordinal. + * Returns the long value at the given position. */ - long getLong(int ordinal); + long getLong(int pos); /** - * Get float value from the specific ordinal. + * Returns the float value at the given position. */ - float getFloat(int ordinal); + float getFloat(int pos); /** - * Get double value from the specific ordinal. + * Returns the double value at the given position. */ - double getDouble(int ordinal); + double getDouble(int pos); /** - * Get string value from the specific ordinal. + * Returns the string value at the given position. */ - StringData getString(int ordinal); + StringData getString(int pos); /** - * Get decimal value from the specific ordinal. + * Returns the decimal value at the given position. + * + * <p>The precision and scale are required to determine whether the decimal value was stored in a + * compact representation (see {@link DecimalData}). */ - DecimalData getDecimal(int ordinal, int precision, int scale); + DecimalData getDecimal(int pos, int precision, int scale); /** - * Get timestamp value from the specific ordinal. + * Returns the timestamp value at the given position. + * + * <p>The precision is required to determine whether the timestamp value was stored in a compact + * representation (see {@link TimestampData}). */ - TimestampData getTimestamp(int ordinal, int precision); + TimestampData getTimestamp(int pos, int precision); /** - * Get raw value from the specific ordinal. + * Returns the raw value at the given position. */ - <T> RawValueData<T> getRawValue(int ordinal); + <T> RawValueData<T> getRawValue(int pos); /** - * Get binary value from the specific ordinal. + * Returns the binary value at the given position. 
*/ - byte[] getBinary(int ordinal); + byte[] getBinary(int pos); /** - * Get array value from the specific ordinal. + * Returns the array value at the given position. */ - ArrayData getArray(int ordinal); + ArrayData getArray(int pos); /** - * Get map value from the specific ordinal. + * Returns the map value at the given position. */ - MapData getMap(int ordinal); + MapData getMap(int pos); /** - * Get row value from the specific ordinal. + * Returns the row value at the given position. + * + * <p>The number of fields is required to correctly extract the row. */ - RowData getRow(int ordinal, int numFields); + RowData getRow(int pos, int numFields); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java index f0481c3..c2a18ba 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java @@ -23,27 +23,31 @@ import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.VarCharType; /** - * {@link StringData} is an internal data structure represents data of {@link VarCharType} - * and {@link CharType} in Flink Table/SQL. + * An internal data structure representing data of {@link CharType} and {@link VarCharType}. + * + * <p>This data structure is immutable. */ @PublicEvolving public interface StringData extends Comparable<StringData> { /** - * Converts this {@link StringData} object to a UTF-8 byte array and returns the byte array. - * The returned bytes may be reused. + * Converts this {@link StringData} object to a UTF-8 byte array. + * + * <p>Note: The returned byte array may be reused. */ byte[] toBytes(); /** - * Converts this {@link StringData} object to a {@link String} and returns the String. 
+ * Converts this {@link StringData} object to a {@link String}. */ String toString(); // ------------------------------------------------------------------------------------------ + // Construction Utilities + // ------------------------------------------------------------------------------------------ /** - * Creates a {@link StringData} from the given String. + * Creates an instance of {@link StringData} from the given {@link String}. */ static StringData fromString(String str) { // TODO @@ -51,7 +55,7 @@ public interface StringData extends Comparable<StringData> { } /** - * Creates a {@link StringData} from the given UTF-8 bytes. + * Creates an instance of {@link StringData} from the given UTF-8 byte array. */ static StringData fromBytes(byte[] bytes) { // TODO @@ -59,7 +63,8 @@ public interface StringData extends Comparable<StringData> { } /** - * Creates a {@link StringData} from the given UTF-8 bytes with offset and number of bytes. + * Creates an instance of {@link StringData} from the given UTF-8 byte array with offset and number + * of bytes. */ static StringData fromBytes(byte[] bytes, int offset, int numBytes) { // TODO diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/TimestampData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/TimestampData.java index dad5772..6f34675 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/TimestampData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/TimestampData.java @@ -30,17 +30,15 @@ import java.time.LocalDateTime; import java.time.LocalTime; /** - * {@link TimestampData} is an internal data structure represents data of {@link TimestampType} - * and {@link LocalZonedTimestampType} in Flink Table/SQL. + * An internal data structure representing data of {@link TimestampType} and {@link LocalZonedTimestampType}. 
* - * <p>It is an immutable implementation which is composite of a millisecond - * and nanoOfMillisecond since {@code 1970-01-01 00:00:00}. + * <p>This data structure is immutable and consists of milliseconds and nanos-of-millisecond since + * {@code 1970-01-01 00:00:00}. It might be stored in a compact representation (as a long value) if values + * are small enough. */ @PublicEvolving public final class TimestampData implements Comparable<TimestampData> { - private static final long serialVersionUID = 1L; - // the number of milliseconds in a day private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000 @@ -57,33 +55,30 @@ public final class TimestampData implements Comparable<TimestampData> { } /** - * Gets the number of milli-seconds since {@code 1970-01-01 00:00:00}. + * Returns the number of milliseconds since {@code 1970-01-01 00:00:00}. */ public long getMillisecond() { return millisecond; } /** - * Gets the number of nanoseconds the nanosecond within the millisecond. - * The value range is from 0 to 999,999. + * Returns the number of nanoseconds (the nanoseconds within the millisecond). + * + * <p>The value range is from 0 to 999,999. */ public int getNanoOfMillisecond() { return nanoOfMillisecond; } /** - * Converts this {@code TimestampData} object to a {@link Timestamp}. - * - * @return An instance of {@link Timestamp} + * Converts this {@link TimestampData} object to a {@link Timestamp}. */ public Timestamp toTimestamp() { return Timestamp.valueOf(toLocalDateTime()); } /** - * Convert this {@code TimestampData} object to a {@link LocalDateTime}. - * - * @return An instance of {@link LocalDateTime} + * Converts this {@link TimestampData} object to a {@link LocalDateTime}. */ public LocalDateTime toLocalDateTime() { int date = (int) (millisecond / MILLIS_PER_DAY); @@ -99,9 +94,7 @@ public final class TimestampData implements Comparable<TimestampData> { } /** - * Convert this {@code TimestampData} object to a {@link Instant}. 
- * - * @return an instance of {@link Instant} + * Converts this {@link TimestampData} object to a {@link Instant}. */ public Instant toInstant() { long epochSecond = millisecond / 1000; @@ -145,46 +138,36 @@ public final class TimestampData implements Comparable<TimestampData> { } // ------------------------------------------------------------------------------------------ - // Construct Utilities + // Constructor Utilities // ------------------------------------------------------------------------------------------ /** - * Obtains an instance of {@code TimestampData} from a millisecond. + * Creates an instance of {@link TimestampData} from milliseconds. * - * <p>This returns a {@code TimestampData} with the specified millisecond. - * The nanoOfMillisecond field will be set to zero. + * <p>The nanos-of-millisecond field will be set to zero. * - * @param millisecond the number of milliseconds since {@code 1970-01-01 00:00:00} - * A negative number is the number of milliseconds before - * {@code 1970-01-01 00:00:00} - * @return an instance of {@code TimestampData} + * @param milliseconds the number of milliseconds since {@code 1970-01-01 00:00:00}; a negative number + * is the number of milliseconds before {@code 1970-01-01 00:00:00} */ - public static TimestampData fromEpochMillis(long millisecond) { - return new TimestampData(millisecond, 0); + public static TimestampData fromEpochMillis(long milliseconds) { + return new TimestampData(milliseconds, 0); } /** - * Obtains an instance of {@code TimestampData} from a millisecond and a nanoOfMillisecond. + * Creates an instance of {@link TimestampData} from milliseconds and a nanos-of-millisecond. * - * <p>This returns a {@code TimestampData} with the specified millisecond and nanoOfMillisecond. 
- * - * @param millisecond the number of milliseconds since {@code 1970-01-01 00:00:00} - * A negative number is the number of milliseconds before - * {@code 1970-01-01 00:00:00} - * @param nanoOfMillisecond the nanosecond within the millisecond, from 0 to 999,999 - * @return an instance of {@code TimestampData} + * @param milliseconds the number of milliseconds since {@code 1970-01-01 00:00:00}; a negative number + * is the number of milliseconds before {@code 1970-01-01 00:00:00} + * @param nanosOfMillisecond the nanoseconds within the millisecond, from 0 to 999,999 */ - public static TimestampData fromEpochMillis(long millisecond, int nanoOfMillisecond) { - return new TimestampData(millisecond, nanoOfMillisecond); + public static TimestampData fromEpochMillis(long milliseconds, int nanosOfMillisecond) { + return new TimestampData(milliseconds, nanosOfMillisecond); } /** - * Obtains an instance of {@code TimestampData} from an instance of {@link LocalDateTime}. - * - * <p>This returns a {@code TimestampData} with the specified {@link LocalDateTime}. + * Creates an instance of {@link TimestampData} from an instance of {@link LocalDateTime}. * * @param dateTime an instance of {@link LocalDateTime} - * @return an instance of {@code TimestampData} */ public static TimestampData fromLocalDateTime(LocalDateTime dateTime) { long epochDay = dateTime.toLocalDate().toEpochDay(); @@ -197,24 +180,18 @@ public final class TimestampData implements Comparable<TimestampData> { } /** - * Obtains an instance of {@code TimestampData} from an instance of {@link Timestamp}. + * Creates an instance of {@link TimestampData} from an instance of {@link Timestamp}. * - * <p>This returns a {@code TimestampData} with the specified {@link Timestamp}. 
- * - * @param ts an instance of {@link Timestamp} - * @return an instance of {@code TimestampData} + * @param timestamp an instance of {@link Timestamp} */ - public static TimestampData fromTimestamp(Timestamp ts) { - return fromLocalDateTime(ts.toLocalDateTime()); + public static TimestampData fromTimestamp(Timestamp timestamp) { + return fromLocalDateTime(timestamp.toLocalDateTime()); } /** - * Obtains an instance of {@code TimestampData} from an instance of {@link Instant}. - * - * <p>This returns a {@code TimestampData} with the specified {@link Instant}. + * Creates an instance of {@link TimestampData} from an instance of {@link Instant}. * * @param instant an instance of {@link Instant} - * @return an instance of {@code TimestampData} */ public static TimestampData fromInstant(Instant instant) { long epochSecond = instant.getEpochSecond(); @@ -227,7 +204,7 @@ public final class TimestampData implements Comparable<TimestampData> { } /** - * Returns whether the timestamp data is small enough to be stored in a long of millisecond. + * Returns whether the timestamp data is small enough to be stored in a long of milliseconds. */ public static boolean isCompact(int precision) { return precision <= 3;