pratyakshsharma commented on a change in pull request #2127: URL: https://github.com/apache/hudi/pull/2127#discussion_r504234269
########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java ########## @@ -68,77 +71,158 @@ public void tearDown() throws IOException { cleanupResources(); } - @Test - public void testSchemaEvolutionOnUpdate() throws Exception { + private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException { // Create a bunch of records with a old version of schema final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt"); final HoodieSparkTable table = HoodieSparkTable.create(config, context); - final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> { - String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; - String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; - String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; List<HoodieRecord> insertRecords = new ArrayList<>(); - RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); - insertRecords - .add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1)); - RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); - insertRecords - .add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2)); - RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3); - insertRecords - .add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3)); - + for (String recordStr : recordsStrs) { + RawTripTestPayload rowChange = new RawTripTestPayload(recordStr); + insertRecords + .add(new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange)); + } Map<String, HoodieRecord> 
insertRecordMap = insertRecords.stream() .collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity())); HoodieCreateHandle createHandle = - new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecordMap, supplier); + new HoodieCreateHandle(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", insertRecordMap, supplier); createHandle.write(); return createHandle.close(); }).collect(); final Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100")); FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()).create(commitFile); + return statuses.get(0); + } - // Now try an update with an evolved schema - // Evolved schema does not have guarantee on preserving the original field ordering - final HoodieWriteConfig config2 = makeHoodieClientConfig("/exampleEvolvedSchema.txt"); - final WriteStatus insertResult = statuses.get(0); - String fileId = insertResult.getFileId(); - - final HoodieSparkTable table2 = HoodieSparkTable.create(config, context); - assertEquals(1, jsc.parallelize(Arrays.asList(1)).map(x -> { - // New content with values for the newly added field - String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}"; - List<HoodieRecord> updateRecords = new ArrayList<>(); - RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); - HoodieRecord record1 = - new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); - record1.unseal(); - record1.setCurrentLocation(new HoodieRecordLocation("100", fileId)); - record1.seal(); - updateRecords.add(record1); - - assertDoesNotThrow(() -> { - HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2, - updateRecords.iterator(), record1.getPartitionPath(), fileId, supplier); - Configuration conf = new Configuration(); - 
AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchemaWithMetafields()); - List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf, - new Path(config2.getBasePath() + "/" + insertResult.getStat().getPath())); + private List<String> generateMultiRecordsForExampleSchema() { + List<String> recordsStrs = new ArrayList<>(); + String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + recordsStrs.add(recordStr1); + recordsStrs.add(recordStr2); + recordsStrs.add(recordStr3); + return recordsStrs; + } + + private List<String> generateOneRecordForExampleSchema() { + List<String> recordsStrs = new ArrayList<>(); + String recordStr = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + recordsStrs.add(recordStr); + return recordsStrs; + } + + private void assertSchemaEvolutionOnUpdateResult(WriteStatus insertResult, HoodieSparkTable updateTable, + List<HoodieRecord> updateRecords, String assertMsg, boolean isAssertThrow, Class expectedExceptionType) { + jsc.parallelize(Arrays.asList(1)).map(x -> { + Executable executable = () -> { + HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable, + updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier); + AvroReadSupport.setAvroReadSchema(updateTable.getHadoopConf(), mergeHandle.getWriterSchemaWithMetafields()); + List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(updateTable.getHadoopConf(), + new Path(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath())); for 
(GenericRecord rec : oldRecords) { mergeHandle.write(rec); } mergeHandle.close(); - }, "UpdateFunction could not read records written with exampleSchema.txt using the " - + "exampleEvolvedSchema.txt"); - + }; + if (isAssertThrow) { + assertThrows(expectedExceptionType, executable, assertMsg); + } else { + assertDoesNotThrow(executable, assertMsg); + } return 1; - }).collect().size()); + }).collect(); + } + + private List<HoodieRecord> buildUpdateRecords(String recordStr, String insertFileId) throws IOException { + List<HoodieRecord> updateRecords = new ArrayList<>(); + RawTripTestPayload rowChange = new RawTripTestPayload(recordStr); + HoodieRecord record = + new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange); + record.setCurrentLocation(new HoodieRecordLocation("101", insertFileId)); + record.seal(); + updateRecords.add(record); + return updateRecords; + } + + @Test + public void testSchemaEvolutionOnUpdateSuccessWithAddColumnHaveDefault() throws Exception { + final WriteStatus insertResult = prepareFirstRecordCommit(generateMultiRecordsForExampleSchema()); + // Now try an update with an evolved schema + // Evolved schema does not have guarantee on preserving the original field ordering + final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchema.txt"); + final HoodieSparkTable table = HoodieSparkTable.create(config, context); + // New content with values for the newly added field + String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}"; + List<HoodieRecord> updateRecords = buildUpdateRecords(recordStr, insertResult.getFileId()); + String assertMsg = "UpdateFunction could not read records written with exampleSchema.txt using the " + + "exampleEvolvedSchema.txt"; + assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, false, null); + } + + @Test + public void 
testSchemaEvolutionOnUpdateSuccessWithChangeColumnOrder() throws Exception { + final WriteStatus insertResult = prepareFirstRecordCommit(generateMultiRecordsForExampleSchema()); + // Now try an update with an evolved schema + // Evolved schema does not have guarantee on preserving the original field ordering + final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaChangeOrder.txt"); + final HoodieSparkTable table = HoodieSparkTable.create(config, context); + String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"added_field\":1},\"number\":12"; + List<HoodieRecord> updateRecords = buildUpdateRecords(recordStr, insertResult.getFileId()); + String assertMsg = "UpdateFunction could not read records written with exampleSchema.txt using the " + + "exampleEvolvedSchemaChangeOrder.txt as column order change"; + assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, false, null); + } + + @Test + public void testSchemaEvolutionOnUpdateMisMatchWithDeleteColumn() throws Exception { + final WriteStatus insertResult = prepareFirstRecordCommit(generateOneRecordForExampleSchema()); + // Now try an update with an evolved schema + // Evolved schema does not have guarantee on preserving the original field ordering + final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaDeleteColumn.txt"); + final HoodieSparkTable table = HoodieSparkTable.create(config, context); + String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\"}"; + List<HoodieRecord> updateRecords = buildUpdateRecords(recordStr, insertResult.getFileId()); + String assertMsg = "UpdateFunction when delete column ,Parquet/Avro schema mismatch: Avro field 'xxx' not found"; + assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, true, InvalidRecordException.class); + } + + @Test + public void 
testSchemaEvolutionOnUpdateMisMatchWithAddColumnNotHaveDefault() throws Exception { + final WriteStatus insertResult = prepareFirstRecordCommit(generateOneRecordForExampleSchema()); + // Now try an update with an evolved schema + // Evolved schema does not have guarantee on preserving the original field ordering + final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaColumnRequire.txt"); + final HoodieSparkTable table = HoodieSparkTable.create(config, context); + String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}"; + List<HoodieRecord> updateRecords = buildUpdateRecords(recordStr, insertResult.getFileId()); + String assertMsg = "UpdateFunction could not read records written with exampleSchema.txt using the " + + "exampleEvolvedSchemaColumnRequire.txt ,because oldrecords do not have required column added_field"; Review comment: exampleEvolvedSchemaColumnRequire.txt ,because -> exampleEvolvedSchemaColumnRequire.txt, because ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org