This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3681e850ad01b973e0f2fbc3db70afd96bdfbec6
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Wed Sep 29 17:45:13 2021 +0200

    [FLINK-24399][table-common] Refactor ResolvedSchema#to[...]RowDataType() methods and add new ResolvedSchema#getPrimaryKeyIndexes

    Signed-off-by: slinkydeveloper <francescogu...@gmail.com>
---
 .../apache/flink/table/catalog/ResolvedSchema.java | 42 ++++++++++++----------
 1 file changed, 24 insertions(+), 18 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
index 2baaeec..ff7ffb2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -35,6 +36,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -163,6 +165,15 @@ public final class ResolvedSchema {
         return Optional.ofNullable(primaryKey);
     }
 
+    /** Returns the primary key indexes, if any, otherwise returns an empty array. */
+    public int[] getPrimaryKeyIndexes() {
+        final List<String> columns = getColumnNames();
+        return getPrimaryKey()
+                .map(UniqueConstraint::getColumns)
+                .map(pkColumns -> pkColumns.stream().mapToInt(columns::indexOf).toArray())
+                .orElseGet(() -> new int[] {});
+    }
+
     /**
      * Converts all columns of this schema into a (possibly nested) row data type.
      *
@@ -177,10 +188,7 @@ public final class ResolvedSchema {
      * @see #toSinkRowDataType()
      */
     public DataType toSourceRowDataType() {
-        final DataTypes.Field[] fields =
-                columns.stream().map(ResolvedSchema::columnToField).toArray(DataTypes.Field[]::new);
-        // the row should never be null
-        return ROW(fields).notNull();
+        return toRowDataType(c -> true);
     }
 
     /**
@@ -194,13 +202,7 @@ public final class ResolvedSchema {
      * @see #toSinkRowDataType()
      */
     public DataType toPhysicalRowDataType() {
-        final DataTypes.Field[] fields =
-                columns.stream()
-                        .filter(Column::isPhysical)
-                        .map(ResolvedSchema::columnToField)
-                        .toArray(DataTypes.Field[]::new);
-        // the row should never be null
-        return ROW(fields).notNull();
+        return toRowDataType(Column::isPhysical);
     }
 
     /**
@@ -217,13 +219,7 @@ public final class ResolvedSchema {
      * @see #toPhysicalRowDataType()
      */
     public DataType toSinkRowDataType() {
-        final DataTypes.Field[] fields =
-                columns.stream()
-                        .filter(Column::isPersisted)
-                        .map(ResolvedSchema::columnToField)
-                        .toArray(DataTypes.Field[]::new);
-        // the row should never be null
-        return ROW(fields).notNull();
+        return toRowDataType(Column::isPersisted);
     }
 
     @Override
@@ -261,6 +257,16 @@ public final class ResolvedSchema {
 
     // --------------------------------------------------------------------------------------------
 
+    private FieldsDataType toRowDataType(Predicate<Column> columnPredicate) {
+        final DataTypes.Field[] fields =
+                columns.stream()
+                        .filter(columnPredicate)
+                        .map(ResolvedSchema::columnToField)
+                        .toArray(DataTypes.Field[]::new);
+        // the row should never be null
+        return (FieldsDataType) ROW(fields).notNull();
+    }
+
     private static DataTypes.Field columnToField(Column column) {
         return FIELD(
                 column.getName(),