hililiwei commented on a change in pull request #3991: URL: https://github.com/apache/iceberg/pull/3991#discussion_r793275020
########## File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; + +/** + * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output. 
+ * + * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output + * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is + * projected by taking it out of the nested body as an independent field. + */ +public class RowDataNestProjection implements RowData { + private final FieldGetter[] getters; + private final boolean nestFlat; + private final int[][] projectedFields; + private RowData rowData; + + private RowDataNestProjection(RowType rowType, + Types.StructType schema, + Types.StructType rowStruct, + Types.StructType projectType, + int[][] projectedFields, + boolean nestFlat) { + this.projectedFields = projectedFields; + this.nestFlat = nestFlat; + + this.getters = new FieldGetter[projectedFields.length]; + for (int i = 0; i < getters.length; i++) { + int[] projectedFieldOne = projectedFields[i]; + int fieldIndex = projectedFieldOne[0]; + + Types.NestedField projectField = rowStruct.fields().get(fieldIndex); + Types.NestedField rowField = schema.field(projectField.fieldId()); + + getters[i] = createFieldGetter(rowType, rowField, fieldIndex, projectType, projectField, i, projectedFields, + projectedFieldOne); + } + } + + private RowDataNestProjection(RowType rowType, + Types.StructType schema, + Types.StructType projectType, + int[][] projectedFields, + boolean nestFlat) { + this.projectedFields = projectedFields; + this.nestFlat = nestFlat; + + this.getters = new FieldGetter[projectType.fields().size()]; + for (int i = 0; i < getters.length; i++) { + int[] projectedFieldOne = projectedFields[0]; + int fieldIndex = projectedFieldOne[0]; + if (i == fieldIndex) { + Types.NestedField projectField = projectType.fields().get(i); + Types.NestedField rowField = schema.field(projectField.fieldId()); + + getters[i] = createFieldGetter(rowType, rowField, i, projectType, projectField, i, projectedFields, + projectedFieldOne); + } + } + } + + private static FieldGetter 
createFieldGetter(RowType rowType, + Types.NestedField rowField, + int rowTypePosition, + Types.StructType projectFieldsType, + Types.NestedField projectField, + int projectFieldPosition, + int[][] projectedFields, + int[] projectedFieldOne) { + switch (projectField.type().typeId()) { + case STRUCT: + if (projectedFields == null || projectedFields[projectFieldPosition].length <= 1) { + return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition), + projectFieldsType.fields().indexOf(projectField)); + } + RowType nestedRowType = (RowType) rowType.getTypeAt(rowTypePosition); + return row -> { + int[] target = new int[projectedFieldOne.length - 1]; + System.arraycopy(projectedFieldOne, 1, target, 0, target.length); + int[][] temp = {target}; + int rowIndex = projectFieldsType.fields().indexOf(projectField); + RowData nestedRow = rowIndex < 0 ? null : row.getRow(rowIndex, nestedRowType.getFieldCount()); + + return RowDataNestProjection + .create(nestedRowType, rowField.type().asStructType(), projectField.type().asStructType(), temp, true) + .wrap(nestedRow); + }; + case MAP: + Types.MapType projectedMap = projectField.type().asMapType(); + Types.MapType originalMap = rowField.type().asMapType(); + + boolean keyProjectable = !projectedMap.keyType().isNestedType() || + projectedMap.keyType().equals(originalMap.keyType()); + boolean valueProjectable = !projectedMap.valueType().isNestedType() || + projectedMap.valueType().equals(originalMap.valueType()); + Preconditions.checkArgument(keyProjectable && valueProjectable, + "Cannot project a partial map key or value with non-primitive type. 
Trying to project <%s> out of <%s>", + projectField, rowField); + + return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition), + projectFieldsType.fields().indexOf(projectField)); + + case LIST: + Types.ListType projectedList = projectField.type().asListType(); + Types.ListType originalList = rowField.type().asListType(); + + boolean elementProjectable = !projectedList.elementType().isNestedType() || + projectedList.elementType().equals(originalList.elementType()); + Preconditions.checkArgument(elementProjectable, + "Cannot project a partial list element with non-primitive type. Trying to project <%s> out of <%s>", + projectField, rowField); + + return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition), + projectFieldsType.fields().indexOf(projectField)); + default: + return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition), + projectFieldsType.fields().indexOf(projectField)); + } + } + + public static RowDataNestProjection create(Schema schema, Schema projectedSchema, int[][] projectedFields) { + return RowDataNestProjection.create(FlinkSchemaUtil.convert(schema), schema.asStruct(), schema.asStruct(), + projectedSchema.asStruct(), + projectedFields, true); + } + + public static RowDataNestProjection create(RowType rowType, + Types.StructType schema, + Types.StructType rowStructType, + Types.StructType projectedSchema, + int[][] projectedFields, + boolean nestFlat) { + return new RowDataNestProjection(rowType, schema, rowStructType, projectedSchema, projectedFields, nestFlat); + } + + public static RowDataNestProjection create(RowType rowType, Types.StructType schema, Types.StructType projectedSchema, + int[][] projectedFields, boolean nestFlat) { + return new RowDataNestProjection(rowType, schema, projectedSchema, projectedFields, nestFlat); + } + + public RowData wrap(RowData row) { + this.rowData = row; + return this; + } + + private Object getValue(int pos) { + Object fieldValue = getters[pos].getFieldOrNull(rowData); + while 
(nestFlat && fieldValue != null && fieldValue.getClass().equals(RowDataNestProjection.class)) { + RowDataNestProjection rowDataNest = (RowDataNestProjection) fieldValue; + fieldValue = rowDataNest.getters[rowDataNest.projectedFields[0][0]].getFieldOrNull(rowDataNest); + } + return fieldValue; Review comment: Here, the loop drills down through the nested fields, level by level, to extract the field that was actually requested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
