hililiwei commented on code in PR #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r793274140
##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java:
##########
@@ -90,12 +94,8 @@ private IcebergTableSource(TableLoader loader, TableSchema
schema, Map<String, S
@Override
public void applyProjection(int[][] projectFields) {
- this.projectedFields = new int[projectFields.length];
- for (int i = 0; i < projectFields.length; i++) {
- Preconditions.checkArgument(projectFields[i].length == 1,
- "Don't support nested projection in iceberg source now.");
- this.projectedFields[i] = projectFields[i][0];
- }
+ this.projectedFields = projectFields;
+ this.topProjectedFields = Arrays.stream(projectFields).mapToInt(array ->
array[0]).distinct().toArray();
Review Comment:
`topProjectedFields` is used to read data from the original file. At this
stage, nested fields in the data keep their original structure.
`projectedFields` is then used to flatten that data. At this stage, the
requested nested fields are extracted.
##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converting the original fields
obtained from the file into the Flink output.
+ *
+ * <p>For example, given original data with fields {@code
[id:Int,st:Struct[a:String,b:String] ...]} and output
+ * projection fields {@code [id,st.a]} with paths [[0],[1,0]], the field {@code a} of the nested {@code
st} field is
+ * projected by taking it out of the nested struct as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+ private final FieldGetter[] getters;
+ private final boolean nestFlat;
+ private final int[][] projectedFields;
+ private RowData rowData;
+
+ private RowDataNestProjection(RowType rowType, // Builds one field getter per path in projectedFields.
+ Types.StructType schema, // struct searched by field id to obtain the original field
+ Types.StructType rowStruct, // struct indexed by the first element of each path
+ Types.StructType projectType, // struct handed to createFieldGetter as the projected field list
+ int[][] projectedFields, // one index path per projected field
+ boolean nestFlat) {
+ this.projectedFields = projectedFields;
+ this.nestFlat = nestFlat;
+
+ this.getters = new FieldGetter[projectedFields.length]; // one getter per projected path
+ for (int i = 0; i < getters.length; i++) {
+ int[] projectedFieldOne = projectedFields[i];
+ int fieldIndex = projectedFieldOne[0]; // first path element: position within rowStruct
+
+ Types.NestedField projectField = rowStruct.fields().get(fieldIndex);
+ Types.NestedField rowField = schema.field(projectField.fieldId()); // same field id, looked up in schema
+
+ getters[i] = createFieldGetter(rowType, rowField, fieldIndex,
projectType, projectField, i, projectedFields,
+ projectedFieldOne);
+ }
+ }
+
+ private RowDataNestProjection(RowType rowType, // Builds a getter only for the field addressed by the first path.
+ Types.StructType schema, // struct searched by field id to obtain the original field
+ Types.StructType projectType, // struct whose field count sizes the getter array
+ int[][] projectedFields, // only projectedFields[0] is read by this constructor
+ boolean nestFlat) {
+ this.projectedFields = projectedFields;
+ this.nestFlat = nestFlat;
+
+ this.getters = new FieldGetter[projectType.fields().size()]; // one slot per field of projectType
+ for (int i = 0; i < getters.length; i++) {
+ int[] projectedFieldOne = projectedFields[0]; // always the first path; does not depend on i
+ int fieldIndex = projectedFieldOne[0];
+ if (i == fieldIndex) { // every other slot of getters is left null
+ Types.NestedField projectField = projectType.fields().get(i);
+ Types.NestedField rowField = schema.field(projectField.fieldId());
+
+ getters[i] = createFieldGetter(rowType, rowField, i, projectType,
projectField, i, projectedFields,
+ projectedFieldOne);
+ }
+ }
+ }
+
+ private static FieldGetter createFieldGetter(RowType rowType, // Chooses a getter based on the projected field's type id.
+ Types.NestedField rowField, // original field; its type is compared with projectField's for MAP and LIST
+ int rowTypePosition, // position passed to rowType.getTypeAt
+ Types.StructType
projectFieldsType,
+ Types.NestedField projectField,
+ int projectFieldPosition, // index into projectedFields for this field
+ int[][] projectedFields, // may be null, which selects the plain getter for STRUCT
+ int[] projectedFieldOne) { // path for this field; elements after the first go to the nested projection
+ switch (projectField.type().typeId()) {
+ case STRUCT:
+ if (projectedFields == null ||
projectedFields[projectFieldPosition].length <= 1) { // no deeper path: plain field getter
+ return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+ projectFieldsType.fields().indexOf(projectField));
+ }
+ RowType nestedRowType = (RowType) rowType.getTypeAt(rowTypePosition);
+ return row -> { // evaluated each time a field is read from a row
+ int[] target = new int[projectedFieldOne.length - 1];
+ System.arraycopy(projectedFieldOne, 1, target, 0, target.length); // drop the first path element
+ int[][] temp = {target}; // the nested projection receives a single remaining path
+ int rowIndex = projectFieldsType.fields().indexOf(projectField); // a negative index is handled just below
+ RowData nestedRow = rowIndex < 0 ? null : row.getRow(rowIndex,
nestedRowType.getFieldCount());
+
+ return RowDataNestProjection
+ .create(nestedRowType, rowField.type().asStructType(),
projectField.type().asStructType(), temp, true)
+ .wrap(nestedRow); // nestedRow is null when rowIndex is negative
+ };
+ case MAP:
+ Types.MapType projectedMap = projectField.type().asMapType();
+ Types.MapType originalMap = rowField.type().asMapType();
+
+ boolean keyProjectable = !projectedMap.keyType().isNestedType() ||
+ projectedMap.keyType().equals(originalMap.keyType());
+ boolean valueProjectable = !projectedMap.valueType().isNestedType() ||
+ projectedMap.valueType().equals(originalMap.valueType());
+ Preconditions.checkArgument(keyProjectable && valueProjectable, // nested key/value types must equal the original's
+ "Cannot project a partial map key or value with non-primitive
type. Trying to project <%s> out of <%s>",
+ projectField, rowField);
+
+ return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+ projectFieldsType.fields().indexOf(projectField));
+
+ case LIST:
+ Types.ListType projectedList = projectField.type().asListType();
+ Types.ListType originalList = rowField.type().asListType();
+
+ boolean elementProjectable =
!projectedList.elementType().isNestedType() ||
+ projectedList.elementType().equals(originalList.elementType());
+ Preconditions.checkArgument(elementProjectable, // a nested element type must equal the original's
+ "Cannot project a partial list element with non-primitive type.
Trying to project <%s> out of <%s>",
+ projectField, rowField);
+
+ return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+ projectFieldsType.fields().indexOf(projectField));
+ default: // every other type id uses the same plain getter
+ return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+ projectFieldsType.fields().indexOf(projectField));
+ }
+ }
+
+ public static RowDataNestProjection create(Schema schema, Schema
projectedSchema, int[][] projectedFields) {
+ return RowDataNestProjection.create(FlinkSchemaUtil.convert(schema),
schema.asStruct(), schema.asStruct(),
+ projectedSchema.asStruct(),
+ projectedFields, true); // schema's struct is passed as both schema and rowStructType; nestFlat is true
+ }
+
+ public static RowDataNestProjection create(RowType rowType, // Factory for the six-argument private constructor.
+ Types.StructType schema,
+ Types.StructType rowStructType,
+ Types.StructType projectedSchema,
+ int[][] projectedFields,
+ boolean nestFlat) {
+ return new RowDataNestProjection(rowType, schema, rowStructType,
projectedSchema, projectedFields, nestFlat);
+ }
+
+ public static RowDataNestProjection create(RowType rowType, Types.StructType
schema, Types.StructType projectedSchema,
+ int[][] projectedFields, boolean
nestFlat) { // Factory for the five-argument private constructor (no separate row struct).
+ return new RowDataNestProjection(rowType, schema, projectedSchema,
projectedFields, nestFlat);
+ }
+
+ public RowData wrap(RowData row) { // Points this projection at the given row and returns this same instance.
+ this.rowData = row; // replaces any previously wrapped row; no copy is made
+ return this;
+ }
+
+ private Object getValue(int pos) {
+ Object fieldValue = getters[pos].getFieldOrNull(rowData);
+ while (nestFlat && fieldValue != null &&
fieldValue.getClass().equals(RowDataNestProjection.class)) {
+ RowDataNestProjection rowDataNest = (RowDataNestProjection) fieldValue;
+ fieldValue =
rowDataNest.getters[rowDataNest.projectedFields[0][0]].getFieldOrNull(rowDataNest);
+ }
+ return fieldValue;
Review Comment:
Here, the projection drills down through the nested fields to extract the
fields that need to be obtained.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]