pratyakshsharma commented on a change in pull request #2127:
URL: https://github.com/apache/hudi/pull/2127#discussion_r504234269



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
##########
@@ -68,77 +71,158 @@ public void tearDown() throws IOException {
     cleanupResources();
   }
 
-  @Test
-  public void testSchemaEvolutionOnUpdate() throws Exception {
+  private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
     // Create a bunch of records with a old version of schema
     final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt");
     final HoodieSparkTable table = HoodieSparkTable.create(config, context);
-
     final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
-          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
-      String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
-          + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
-      String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
-          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
       List<HoodieRecord> insertRecords = new ArrayList<>();
-      RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
-      insertRecords
-          .add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
-      RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
-      insertRecords
-          .add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
-      RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
-      insertRecords
-          .add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
-
+      for (String recordStr : recordsStrs) {
+        RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+        insertRecords
+            .add(new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange));
+      }
       Map<String, HoodieRecord> insertRecordMap = insertRecords.stream()
           .collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity()));
       HoodieCreateHandle createHandle =
-          new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecordMap, supplier);
+          new HoodieCreateHandle(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", insertRecordMap, supplier);
       createHandle.write();
       return createHandle.close();
     }).collect();
 
     final Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100"));
     FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()).create(commitFile);
+    return statuses.get(0);
+  }
 
-    // Now try an update with an evolved schema
-    // Evolved schema does not have guarantee on preserving the original field ordering
-    final HoodieWriteConfig config2 = makeHoodieClientConfig("/exampleEvolvedSchema.txt");
-    final WriteStatus insertResult = statuses.get(0);
-    String fileId = insertResult.getFileId();
-
-    final HoodieSparkTable table2 = HoodieSparkTable.create(config, context);
-    assertEquals(1, jsc.parallelize(Arrays.asList(1)).map(x -> {
-      // New content with values for the newly added field
-      String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
-          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
-      List<HoodieRecord> updateRecords = new ArrayList<>();
-      RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
-      HoodieRecord record1 =
-          new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
-      record1.unseal();
-      record1.setCurrentLocation(new HoodieRecordLocation("100", fileId));
-      record1.seal();
-      updateRecords.add(record1);
-
-      assertDoesNotThrow(() -> {
-        HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2,
-            updateRecords.iterator(), record1.getPartitionPath(), fileId, supplier);
-        Configuration conf = new Configuration();
-        AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchemaWithMetafields());
-        List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
-            new Path(config2.getBasePath() + "/" + insertResult.getStat().getPath()));
+  private List<String> generateMultiRecordsForExampleSchema() {
+    List<String> recordsStrs = new ArrayList<>();
+    String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+    String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+    String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+    recordsStrs.add(recordStr1);
+    recordsStrs.add(recordStr2);
+    recordsStrs.add(recordStr3);
+    return recordsStrs;
+  }
+
+  private List<String> generateOneRecordForExampleSchema() {
+    List<String> recordsStrs = new ArrayList<>();
+    String recordStr = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+    recordsStrs.add(recordStr);
+    return recordsStrs;
+  }
+
+  private void assertSchemaEvolutionOnUpdateResult(WriteStatus insertResult, HoodieSparkTable updateTable,
+                                                   List<HoodieRecord> updateRecords, String assertMsg, boolean isAssertThrow, Class expectedExceptionType) {
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
+      Executable executable = () -> {
+        HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable,
+            updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier);
+        AvroReadSupport.setAvroReadSchema(updateTable.getHadoopConf(), mergeHandle.getWriterSchemaWithMetafields());
+        List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(updateTable.getHadoopConf(),
+            new Path(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath()));
         for (GenericRecord rec : oldRecords) {
           mergeHandle.write(rec);
         }
         mergeHandle.close();
-      }, "UpdateFunction could not read records written with exampleSchema.txt using the "
-          + "exampleEvolvedSchema.txt");
-
+      };
+      if (isAssertThrow) {
+        assertThrows(expectedExceptionType, executable, assertMsg);
+      } else {
+        assertDoesNotThrow(executable, assertMsg);
+      }
       return 1;
-    }).collect().size());
+    }).collect();
+  }
+
+  private List<HoodieRecord> buildUpdateRecords(String recordStr, String insertFileId) throws IOException {
+    List<HoodieRecord> updateRecords = new ArrayList<>();
+    RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+    HoodieRecord record =
+        new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange);
+    record.setCurrentLocation(new HoodieRecordLocation("101", insertFileId));
+    record.seal();
+    updateRecords.add(record);
+    return updateRecords;
+  }
+
+  @Test
+  public void testSchemaEvolutionOnUpdateSuccessWithAddColumnHaveDefault() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateMultiRecordsForExampleSchema());
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchema.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    // New content with values for the newly added field
+    String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
+    List<HoodieRecord> updateRecords = buildUpdateRecords(recordStr, insertResult.getFileId());
+    String assertMsg = "UpdateFunction could not read records written with exampleSchema.txt using the "
+        + "exampleEvolvedSchema.txt";
+    assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, false, null);
+  }
+
+  @Test
+  public void testSchemaEvolutionOnUpdateSuccessWithChangeColumnOrder() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateMultiRecordsForExampleSchema());
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaChangeOrder.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"added_field\":1,\"number\":12}";
+    List<HoodieRecord> updateRecords = buildUpdateRecords(recordStr, insertResult.getFileId());
+    String assertMsg = "UpdateFunction could not read records written with exampleSchema.txt using the "
+        + "exampleEvolvedSchemaChangeOrder.txt as column order change";
+    assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, false, null);
+  }
+
+  @Test
+  public void testSchemaEvolutionOnUpdateMisMatchWithDeleteColumn() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateOneRecordForExampleSchema());
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaDeleteColumn.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\"}";
+    List<HoodieRecord> updateRecords = buildUpdateRecords(recordStr, insertResult.getFileId());
+    String assertMsg = "UpdateFunction when delete column ,Parquet/Avro schema mismatch: Avro field 'xxx' not found";
+    assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, true, InvalidRecordException.class);
+  }
+
+  @Test
+  public void testSchemaEvolutionOnUpdateMisMatchWithAddColumnNotHaveDefault() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateOneRecordForExampleSchema());
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaColumnRequire.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
+    List<HoodieRecord> updateRecords = buildUpdateRecords(recordStr, insertResult.getFileId());
+    String assertMsg = "UpdateFunction could not read records written with exampleSchema.txt using the "
+        + "exampleEvolvedSchemaColumnRequire.txt ,because oldrecords do not have required column added_field";

Review comment:
       exampleEvolvedSchemaColumnRequire.txt ,because -> exampleEvolvedSchemaColumnRequire.txt, because
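
       i.e. with that punctuation fix applied, the message above would read something like:

           String assertMsg = "UpdateFunction could not read records written with exampleSchema.txt using the "
               + "exampleEvolvedSchemaColumnRequire.txt, because oldrecords do not have required column added_field";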




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

