xiarixiaoyao commented on code in PR #7761: URL: https://github.com/apache/hudi/pull/7761#discussion_r1090126531
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java: ########## @@ -93,60 +93,62 @@ public InternalSchema getQuerySchema() { return querySchema; } - InternalSchema getFileSchema(String fileName) { + InternalSchema getMergeSchema(String fileName) { if (querySchema.isEmptySchema()) { return querySchema; } long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName)); - InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId( + InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId( commitInstantTime, tablePath, getHadoopConf(), validCommits); - if (querySchema.equals(fileSchemaUnmerged)) { + if (querySchema.equals(fileSchema)) { return InternalSchema.getEmptyInternalSchema(); } - return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema(); + return new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema(); } /** - * This method returns a mapping of columns that have type inconsistencies between the fileSchema and querySchema. + * This method returns a mapping of columns that have type inconsistencies between the mergeSchema and querySchema. * This is done by: * <li>1. Finding the columns with type changes</li> * <li>2. Get a map storing the index of these columns with type changes; Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema)</li> * <li>3. 
For each selectedField with type changes, build a castMap containing the cast/conversion details; * Map of -> (selectedPos, Cast([from] fileType, [to] queryType))</li> * - * @param fileSchema InternalSchema representation of the file's schema (acquired from commit/.schema metadata) + * @param mergeSchema InternalSchema representation of mergeSchema (prioritise use of fileSchemaType) that is used for reading base parquet files * @param queryFieldNames array containing the columns of a Hudi Flink table * @param queryFieldTypes array containing the field types of the columns of a Hudi Flink table * @param selectedFields array containing the index of the columns of interest required (indexes are based on queryFieldNames and queryFieldTypes) * @return a castMap containing the information of how to cast a selectedField from the fileType to queryType. * * @see CastMap */ - CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) { + CastMap getCastMap(InternalSchema mergeSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) { Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty"); - Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty"); + Preconditions.checkArgument(!mergeSchema.isEmptySchema(), "mergeSchema cannot be empty"); CastMap castMap = new CastMap(); // map storing the indexes of columns with type changes Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema) - Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames); + Map<Integer, Integer> posProxy = getPosProxy(mergeSchema, queryFieldNames); if (posProxy.isEmpty()) { // no type changes castMap.setFileFieldTypes(queryFieldTypes); return castMap; } List<Integer> selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList()); - List<DataType> fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType( - 
AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren(); + // mergeSchema is built with useColumnTypeFromFileSchema = true Review Comment: how about moving this comment to line 106? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org