xiarixiaoyao commented on code in PR #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r1014978749


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -78,12 +90,41 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
 
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+
+    Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
+    boolean needToReWriteRecord = false;
+    // TODO support bootstrap
+    if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
+      // check implicitly add columns, and position reorder(spark sql may change cols order)
+      InternalSchema querySchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(readSchema, querySchemaOpt.get(), true);
+      long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
+      InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
+      if (writeInternalSchema.isEmptySchema()) {
+        throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime));
+      }
+      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
+      List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
+      List<String> sameCols = colNamesFromWriteSchema.stream()
+              .filter(f -> colNamesFromQuerySchema.contains(f)
+                      && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
+                      && writeInternalSchema.findIdByName(f) != -1
+                      && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
+      readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+      Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());

Review Comment:
   line 112: we need to convert the InternalSchema to an Avro schema and assign it to
   readSchema; HoodieAvroUtils.rewriteRecordWithNewSchema will then use readSchema
   to get the correct GenericRecord from the parquet file.

   e.g. the old parquet schema is: a int, b double, and genericRecord1 is the data
   read from the old parquet file,
   but the incoming schema is now: a long, c int, b string, and genericRecord2
   is the incoming data.
   We cannot merge genericRecord1 and genericRecord2 as they are, so we need to
   rewrite genericRecord1 with the new schema: a long, c int, b string.

   line 113 is only used to check the SchemaCompatibilityType.
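
   To make the example above concrete, here is a minimal sketch in plain Java
   (no Hudi or Avro dependency). It is not the implementation of
   HoodieAvroUtils.rewriteRecordWithNewSchema: the class RewriteRecordSketch,
   its rewrite and convert helpers, and the map-based "schema" are all
   hypothetical, and the sketch matches columns by name for simplicity, whereas
   the diff above compares column ids.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RewriteRecordSketch {

    // Hypothetical helper: rewrites a record (field name -> value) so that it
    // follows newSchema (field name -> type name, in target column order).
    // Columns missing from the old record are filled with null.
    static Map<String, Object> rewrite(Map<String, Object> oldRecord, Map<String, String> newSchema) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : newSchema.entrySet()) {
            Object value = oldRecord.get(field.getKey());
            result.put(field.getKey(), value == null ? null : convert(value, field.getValue()));
        }
        return result;
    }

    // Hypothetical type promotion covering only the types used in the example.
    static Object convert(Object value, String targetType) {
        switch (targetType) {
            case "long":
                return ((Number) value).longValue();
            case "int":
                return ((Number) value).intValue();
            case "string":
                return String.valueOf(value);
            default:
                throw new IllegalArgumentException("unsupported type in this sketch: " + targetType);
        }
    }

    public static void main(String[] args) {
        // genericRecord1: read from the old parquet file with schema (a int, b double)
        Map<String, Object> genericRecord1 = new LinkedHashMap<>();
        genericRecord1.put("a", 1);
        genericRecord1.put("b", 2.5d);

        // incoming schema: a long, c int, b string
        Map<String, String> newSchema = new LinkedHashMap<>();
        newSchema.put("a", "long");
        newSchema.put("c", "int");
        newSchema.put("b", "string");

        System.out.println(rewrite(genericRecord1, newSchema));
    }
}
```

   After the rewrite, genericRecord1 has the same column order and types as
   genericRecord2 (a as long, c as null, b as string), so the two records can be
   merged.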
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
