trushev commented on code in PR #5443:
URL: https://github.com/apache/hudi/pull/5443#discussion_r874317316


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java:
##########
@@ -390,4 +417,70 @@ private InflaterInputStreamFactory<?> 
getInflaterInputStreamFactory(org.apache.h
       return null;
     }
   }
+
+  private ActualFields prepareSchemaEvo(FileInputSplit fileSplit) {
+    if (!schemaEvoContext.isEnabled()) {
+      projection = null;
+      return new ActualFields(fullFieldNames, fullFieldTypes);
+    }
+    InternalSchema mergedSchema = getMergedSchema(fileSplit);
+    ActualFields actualFields = getActualFields(mergedSchema);
+    projection = getProjection(mergedSchema, actualFields);
+    return actualFields;
+  }
+
+  private InternalSchema getMergedSchema(FileInputSplit fileSplit) {
+    long commitTime = 
Long.parseLong(FSUtils.getCommitTime(fileSplit.getPath().getPath()));
+    InternalSchema fileSchema = 
InternalSchemaCache.searchSchemaAndCache(commitTime, 
schemaEvoContext.metaClient(), false);
+    InternalSchema querySchema = schemaEvoContext.querySchema();
+    return new InternalSchemaMerger(fileSchema, querySchema, true, 
true).mergeSchema();
+  }
+
+  private ActualFields getActualFields(InternalSchema mergedSchema) {
+    int skipFields = HOODIE_META_COLUMNS.size();
+    String[] actualFieldNames = mergedSchema.columns()
+        .stream()
+        .skip(skipFields)
+        .map(Types.Field::name)
+        .toArray(String[]::new);
+    Schema actualSchema = AvroInternalSchemaConverter.convert(mergedSchema, 
schemaEvoContext.tableName());
+    DataType[] actualFieldTypes = 
AvroSchemaConverter.convertToDataType(actualSchema).getChildren()
+        .stream()
+        .skip(skipFields)
+        .toArray(DataType[]::new);
+    return new ActualFields(actualFieldNames, actualFieldTypes);
+  }
+
+  private RowDataProjection getProjection(InternalSchema mergedSchema, 
ActualFields actualFields) {
+    CastMap castMap = CastMap.of(schemaEvoContext.tableName(), 
schemaEvoContext.querySchema(), mergedSchema);
+    if (castMap.containsAnyPos(selectedFields)) {
+      LogicalType[] types = 
Arrays.stream(actualFields.types()).map(DataType::getLogicalType).toArray(LogicalType[]::new);
+      LogicalType[] readType = new LogicalType[selectedFields.length];
+      for (int i = 0; i < selectedFields.length; i++) {
+        readType[i] = types[selectedFields[i]];
+      }
+      int[] pos = IntStream.range(0, selectedFields.length).toArray();
+      return RowDataProjection.instance(RowType.of(readType), pos, 
castMap.rearrange(selectedFields, pos));
+    } else {
+      return null;
+    }
+  }
+
+  private static final class ActualFields {
+    private final String[] names;

Review Comment:
   The `ActualFields` POJO has been removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to