JingsongLi commented on code in PR #396:
URL: https://github.com/apache/flink-table-store/pull/396#discussion_r1031267835


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java:
##########
@@ -62,4 +72,193 @@ public static int[] createIndexMapping(
         }
         return null;
     }
+
+    /**
+     * Create index mapping from table projection to underlying data projection.
+     *
+     * @param tableProjection the table projection
+     * @param tableFields the fields in table
+     * @param dataProjection the underlying data projection
+     * @param dataFields the fields in underlying data
+     * @return the index mapping
+     */
+    @Nullable
+    public static int[] createIndexMapping(
+            int[] tableProjection,
+            List<DataField> tableFields,
+            int[] dataProjection,
+            List<DataField> dataFields) {
+        return createIndexMapping(
+                tableProjection,
+                tableProjection.length,
+                tableFields,
+                Collections.emptyList(),
+                dataProjection,
+                dataProjection.length,
+                dataFields,
+                Collections.emptyList());
+    }
+
+    /**
+     * Create index mapping from table projection to underlying data projection for key value.
+     *
+     * @param tableProjection the table projection
+     * @param tableKeyCount the key count in table
+     * @param tableKeyFields the key fields in table
+     * @param tableValueFields the value fields in table
+     * @param dataProjection the data projection
+     * @param dataKeyCount the data key count
+     * @param dataKeyFields the fields in underlying data
+     * @param dataValueFields the fields in underlying data
+     * @return the index mapping
+     */
+    @Nullable
+    public static int[] createIndexMapping(
+            int[] tableProjection,
+            int tableKeyCount,
+            List<DataField> tableKeyFields,
+            List<DataField> tableValueFields,
+            int[] dataProjection,
+            int dataKeyCount,
+            List<DataField> dataKeyFields,
+            List<DataField> dataValueFields) {
+        List<Integer> tableKeyFieldIdList =
+                
tableKeyFields.stream().map(DataField::id).collect(Collectors.toList());
+        List<Integer> dataKeyFieldIdList =
+                
dataKeyFields.stream().map(DataField::id).collect(Collectors.toList());
+        int[] indexMapping = new int[tableProjection.length];
+
+        int[] dataKeyProjection = Arrays.copyOf(dataProjection, dataKeyCount);
+        for (int i = 0; i < tableKeyCount; i++) {
+            int fieldId = tableKeyFieldIdList.get(tableProjection[i]);
+            int dataFieldIndex = dataKeyFieldIdList.indexOf(fieldId);
+            indexMapping[i] = Ints.indexOf(dataKeyProjection, dataFieldIndex);
+        }
+        if (tableProjection.length >= tableKeyCount + 2) {
+            // seq and value kind
+            for (int i = tableKeyCount; i < tableKeyCount + 2; i++) {
+                indexMapping[i] = i + dataKeyCount - tableKeyCount;
+            }
+
+            int[] dataValueProjection =
+                    Arrays.copyOfRange(dataProjection, dataKeyCount + 2, 
dataProjection.length);
+            for (int i = 0; i < dataValueProjection.length; i++) {
+                dataValueProjection[i] = dataValueProjection[i] - 
dataKeyFields.size() - 2;
+            }
+            List<Integer> tableValueFieldIdList =
+                    
tableValueFields.stream().map(DataField::id).collect(Collectors.toList());
+            List<Integer> dataValueFieldIdList =
+                    
dataValueFields.stream().map(DataField::id).collect(Collectors.toList());
+            for (int i = tableKeyCount + 2; i < tableProjection.length; i++) {
+                int fieldId =
+                        tableValueFieldIdList.get(tableProjection[i] - 
tableKeyFields.size() - 2);
+                int dataFieldIndex = dataValueFieldIdList.indexOf(fieldId);
+                int dataValueIndex = Ints.indexOf(dataValueProjection, 
dataFieldIndex);
+                indexMapping[i] =
+                        dataValueIndex < 0 ? dataValueIndex : dataValueIndex + 
dataKeyCount + 2;
+            }
+        }
+
+        for (int i = 0; i < indexMapping.length; i++) {
+            if (indexMapping[i] != i) {
+                return indexMapping;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Create data projection from table projection.
+     *
+     * @param tableFields the fields of table
+     * @param dataFields the fields of underlying data
+     * @param tableProjection the projection of table
+     * @return the projection of data
+     */
+    public static int[][] createDataProjection(
+            List<DataField> tableFields, List<DataField> dataFields, int[][] 
tableProjection) {
+        List<Integer> tableFieldIdList =
+                
tableFields.stream().map(DataField::id).collect(Collectors.toList());
+        List<Integer> dataFieldIdList =
+                
dataFields.stream().map(DataField::id).collect(Collectors.toList());
+        return Arrays.stream(tableProjection)
+                .map(p -> Arrays.copyOf(p, p.length))
+                .peek(
+                        p -> {
+                            int fieldId = tableFieldIdList.get(p[0]);
+                            p[0] = dataFieldIdList.indexOf(fieldId);
+                        })
+                .filter(p -> p[0] >= 0)
+                .toArray(int[][]::new);
+    }
+
+    /**
+     * Create predicate list from data fields.
+     *
+     * @param tableFields the table fields
+     * @param dataFields the underlying data fields
+     * @param filters the filters
+     * @return the data filters
+     */
+    public static List<Predicate> createDataFilters(
+            List<DataField> tableFields, List<DataField> dataFields, 
List<Predicate> filters) {
+        if (filters == null) {
+            return null;
+        }
+
+        List<Predicate> dataFilters = new ArrayList<>(filters.size());
+        for (Predicate predicate : filters) {
+            dataFilters.add(createDataPredicate(tableFields, dataFields, 
predicate));
+        }
+        return dataFilters;
+    }
+
+    @Nullable
+    private static Predicate createDataPredicate(
+            List<DataField> tableFields, List<DataField> dataFields, Predicate 
predicate) {
+        if (predicate instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
+            List<Predicate> children = compoundPredicate.children();
+            List<Predicate> dataChildren = new ArrayList<>(children.size());
+            for (Predicate child : children) {
+                Predicate dataPredicate = createDataPredicate(tableFields, 
dataFields, child);
+                if (dataPredicate != null) {
+                    dataChildren.add(dataPredicate);
+                }
+            }
+            return new CompoundPredicate(compoundPredicate.function(), 
dataChildren);
+        } else if (predicate instanceof LeafPredicate) {
+            LeafPredicate leafPredicate = (LeafPredicate) predicate;
+            List<DataField> predicateTableFields =
+                    tableFields.stream()
+                            .filter(f -> 
f.name().equals(leafPredicate.fieldName()))
+                            .collect(Collectors.toList());
+            if (predicateTableFields.size() != 1) {
+                throw new IllegalArgumentException(
+                        String.format("Find none or multiple fields %s", 
predicateTableFields));
+            }
+            DataField tableField = predicateTableFields.get(0);
+            List<DataField> predicateDataFields =
+                    dataFields.stream()
+                            .filter(f -> f.id() == tableField.id())
+                            .collect(Collectors.toList());
+            if (predicateDataFields.isEmpty()) {
+                return null;
+            } else if (predicateDataFields.size() > 1) {
+                throw new IllegalArgumentException(
+                        String.format("Find none or multiple fields %s", 
predicateTableFields));
+            }
+            DataField dataField = predicateDataFields.get(0);
+            return new LeafPredicate(
+                    leafPredicate.function(),
+                    leafPredicate.type(),
+                    dataFields.indexOf(dataField),
+                    dataField.name(),
+                    leafPredicate.literals());

Review Comment:
   We can add a TODO here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to