xiarixiaoyao commented on code in PR #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r1016055346


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -78,12 +90,41 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood

     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+
+    Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
+    boolean needToReWriteRecord = false;
+    // TODO support bootstrap
+    if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
+      // check implicitly add columns, and position reorder(spark sql may change cols order)
+      InternalSchema querySchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(readSchema, querySchemaOpt.get(), true);
+      long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
+      InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
+      if (writeInternalSchema.isEmptySchema()) {
+        throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime));
+      }
+      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
+      List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
+      List<String> sameCols = colNamesFromWriteSchema.stream()
+              .filter(f -> colNamesFromQuerySchema.contains(f)
+                      && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
+                      && writeInternalSchema.findIdByName(f) != -1
+                      && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
+      readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+      Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());

Review Comment:
   good question!
   question1:
   S4 is not the same as S1. S4 is the actual schema read from the parquet file; if we perform many DDL operations on the current table, S4 and S1 may differ greatly.
   e.g.:
   tableA: a int, b string, c double, and there are three files in this table: f1, f2, f3
   1) drop column c from tableA and add a new column d, then update tableA, but only f2 and f3 are rewritten; f1 is not touched.
   Now the schemas are:
   ```
   tableA: a int, b string, d long
   S1:  a int, b string, d long
   S4 from f1 is: a int, b string, c double
   ```
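   
   To make the divergence concrete, here is a purely illustrative sketch (not code from this PR) that builds S1 and S4 as Avro schemas with Avro's SchemaBuilder, using the field names and types from the example above:
   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.SchemaBuilder;

   public class SchemaDivergenceExample {
     public static void main(String[] args) {
       // S1: the current table schema after the DDL (column c dropped, column d added).
       Schema s1 = SchemaBuilder.record("tableA").fields()
           .optionalInt("a")
           .optionalString("b")
           .optionalLong("d")
           .endRecord();

       // S4: the schema still stored in the untouched base file f1 (written before the DDL).
       Schema s4FromF1 = SchemaBuilder.record("tableA").fields()
           .optionalInt("a")
           .optionalString("b")
           .optionalDouble("c")
           .endRecord();

       // Both records describe "tableA", yet the column sets no longer match.
       System.out.println("S1        : " + s1);
       System.out.println("S4 from f1: " + s4FromF1);
     }
   }
   ```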
   question2:
   No, we do support deletion of columns. Let's use the above example to illustrate:
   At line 112, we merge S3 and S4 to get the final read schema:
   ```
   tableA: a int, b string, d long
   S3:  a int, b string, d long
   S4 from f1 is: a int, b string, c double
   merge of S3 and S4: a int, b string, d long       (column c is dropped)
   ```
   The values read from parquet file f1 will be:
   ```
   a   b       d
   1  'test'  null
   ```
   d is null, since f1 does not contain column d. Column c is dropped, since the current table no longer contains column c.
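   
   For readability, the merge being referred to is the same call that appears in the diff above, split out here with a note on which variable plays which role (a sketch reusing the names from runMerge, not new code):
   ```java
   // writeInternalSchema is S4 (the file's schema: a, b, c);
   // querySchema is S3 (the current query/table schema: a, b, d).
   InternalSchema mergedSchema =
       new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema();

   // Convert back to an Avro schema for the reader: the merged schema keeps a, b, d,
   // so the dropped column c is ignored and d reads back as null from f1.
   readSchema = AvroInternalSchemaConverter.convert(mergedSchema, readSchema.getName());
   ```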
   
   


