trushev commented on code in PR #5443: URL: https://github.com/apache/hudi/pull/5443#discussion_r874317316
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java: ########## @@ -390,4 +417,70 @@ private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(org.apache.h return null; } } + + private ActualFields prepareSchemaEvo(FileInputSplit fileSplit) { + if (!schemaEvoContext.isEnabled()) { + projection = null; + return new ActualFields(fullFieldNames, fullFieldTypes); + } + InternalSchema mergedSchema = getMergedSchema(fileSplit); + ActualFields actualFields = getActualFields(mergedSchema); + projection = getProjection(mergedSchema, actualFields); + return actualFields; + } + + private InternalSchema getMergedSchema(FileInputSplit fileSplit) { + long commitTime = Long.parseLong(FSUtils.getCommitTime(fileSplit.getPath().getPath())); + InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, schemaEvoContext.metaClient(), false); + InternalSchema querySchema = schemaEvoContext.querySchema(); + return new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema(); + } + + private ActualFields getActualFields(InternalSchema mergedSchema) { + int skipFields = HOODIE_META_COLUMNS.size(); + String[] actualFieldNames = mergedSchema.columns() + .stream() + .skip(skipFields) + .map(Types.Field::name) + .toArray(String[]::new); + Schema actualSchema = AvroInternalSchemaConverter.convert(mergedSchema, schemaEvoContext.tableName()); + DataType[] actualFieldTypes = AvroSchemaConverter.convertToDataType(actualSchema).getChildren() + .stream() + .skip(skipFields) + .toArray(DataType[]::new); + return new ActualFields(actualFieldNames, actualFieldTypes); + } + + private RowDataProjection getProjection(InternalSchema mergedSchema, ActualFields actualFields) { + CastMap castMap = CastMap.of(schemaEvoContext.tableName(), schemaEvoContext.querySchema(), mergedSchema); + if (castMap.containsAnyPos(selectedFields)) { + LogicalType[] types = 
Arrays.stream(actualFields.types()).map(DataType::getLogicalType).toArray(LogicalType[]::new); + LogicalType[] readType = new LogicalType[selectedFields.length]; + for (int i = 0; i < selectedFields.length; i++) { + readType[i] = types[selectedFields[i]]; + } + int[] pos = IntStream.range(0, selectedFields.length).toArray(); + return RowDataProjection.instance(RowType.of(readType), pos, castMap.rearrange(selectedFields, pos)); + } else { + return null; + } + } + + private static final class ActualFields { + private final String[] names; Review Comment: pojo removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org