This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3681e850ad01b973e0f2fbc3db70afd96bdfbec6
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Wed Sep 29 17:45:13 2021 +0200

    [FLINK-24399][table-common] Refactor ResolvedSchema#to[...]RowDataType() methods and add new ResolvedSchema#getPrimaryKeyIndexes
    
    Signed-off-by: slinkydeveloper <francescogu...@gmail.com>
---
 .../apache/flink/table/catalog/ResolvedSchema.java | 42 ++++++++++++----------
 1 file changed, 24 insertions(+), 18 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
index 2baaeec..ff7ffb2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -35,6 +36,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -163,6 +165,15 @@ public final class ResolvedSchema {
         return Optional.ofNullable(primaryKey);
     }
 
+    /** Returns the primary key indexes, if any, otherwise returns an empty array. */
+    public int[] getPrimaryKeyIndexes() {
+        final List<String> columns = getColumnNames();
+        return getPrimaryKey()
+                .map(UniqueConstraint::getColumns)
+                .map(pkColumns -> pkColumns.stream().mapToInt(columns::indexOf).toArray())
+                .orElseGet(() -> new int[] {});
+    }
+
     /**
      * Converts all columns of this schema into a (possibly nested) row data type.
      *
@@ -177,10 +188,7 @@ public final class ResolvedSchema {
      * @see #toSinkRowDataType()
      */
     public DataType toSourceRowDataType() {
-        final DataTypes.Field[] fields =
-                columns.stream().map(ResolvedSchema::columnToField).toArray(DataTypes.Field[]::new);
-        // the row should never be null
-        return ROW(fields).notNull();
+        return toRowDataType(c -> true);
     }
 
     /**
@@ -194,13 +202,7 @@ public final class ResolvedSchema {
      * @see #toSinkRowDataType()
      */
     public DataType toPhysicalRowDataType() {
-        final DataTypes.Field[] fields =
-                columns.stream()
-                        .filter(Column::isPhysical)
-                        .map(ResolvedSchema::columnToField)
-                        .toArray(DataTypes.Field[]::new);
-        // the row should never be null
-        return ROW(fields).notNull();
+        return toRowDataType(Column::isPhysical);
     }
 
     /**
@@ -217,13 +219,7 @@ public final class ResolvedSchema {
      * @see #toPhysicalRowDataType()
      */
     public DataType toSinkRowDataType() {
-        final DataTypes.Field[] fields =
-                columns.stream()
-                        .filter(Column::isPersisted)
-                        .map(ResolvedSchema::columnToField)
-                        .toArray(DataTypes.Field[]::new);
-        // the row should never be null
-        return ROW(fields).notNull();
+        return toRowDataType(Column::isPersisted);
     }
 
     @Override
@@ -261,6 +257,16 @@ public final class ResolvedSchema {
 
     // --------------------------------------------------------------------------------------------
 
+    private FieldsDataType toRowDataType(Predicate<Column> columnPredicate) {
+        final DataTypes.Field[] fields =
+                columns.stream()
+                        .filter(columnPredicate)
+                        .map(ResolvedSchema::columnToField)
+                        .toArray(DataTypes.Field[]::new);
+        // the row should never be null
+        return (FieldsDataType) ROW(fields).notNull();
+    }
+
     private static DataTypes.Field columnToField(Column column) {
         return FIELD(
                 column.getName(),

Reply via email to