lw309637554 commented on a change in pull request #2275: URL: https://github.com/apache/hudi/pull/2275#discussion_r539378301
########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java ########## @@ -456,4 +479,95 @@ public void testBulkInsertRecords(String bulkInsertMode) throws Exception { public void testBulkInsertRecordsWithGlobalSort(String bulkInsertMode) throws Exception { testBulkInsertRecords(bulkInsertMode); } + + protected HoodieInstant createRequestedReplaceInstant(HoodieTableMetaClient metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws IOException { + HoodieClusteringPlan clusteringPlan = + ClusteringUtils.createClusteringPlan(CLUSTERING_STRATEGY_CLASS, STRATEGY_PARAMS, fileSlices, Collections.emptyMap()); + + HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime); + HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() + .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build(); + metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata)); + return clusteringInstant; + } + + @Test + public void testUpdateRejectForClustering() throws Exception { + Properties properties = new Properties(); + // set max bytes small can easy generate multi file group + properties.setProperty(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, "1024"); + HoodieWriteConfig config = makeHoodieClientConfig(properties); + String firstCommitTime = makeNewCommitTime(); + SparkRDDWriteClient writeClient = getHoodieWriteClient(config); + writeClient.startCommitWithTime(firstCommitTime); + metaClient = HoodieTableMetaClient.reload(metaClient); + String partitionPath = "2016/01/31"; + + // 1. insert three record with two filegroup + HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient); + // Get some records belong to the same partition (2016/01/31) + String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + + List<HoodieRecord> records = new ArrayList<>(); + RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); + records.add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1)); + RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); + records.add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2)); + RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3); + records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3)); + + writeClient.insert(jsc.parallelize(records, 1), firstCommitTime); + List<String> firstInsertFileGroupIds = table.getFileSystemView().getAllFileGroups(partitionPath) + .map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList()); + List<List<FileSlice>> firstInsertFileSlicesList = table.getFileSystemView().getAllFileGroups(partitionPath) + .map(fileGroup -> fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList()); + assertEquals(2, firstInsertFileGroupIds.size()); + List<FileSlice>[] fileSlices = (List<FileSlice>[])firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]); + + // 2. generate clustering plan the filegroups + String clusterTime1 = "1"; + createRequestedReplaceInstant(this.metaClient, clusterTime1, fileSlices); + + // 3. insert one record with no updating reject exception + String insertRecordStr4 = "{\"_row_key\":\"8eb5b87d-1fej-4edd-87b4-6ec96dc40pp0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":51}"; + RawTripTestPayload rowChange4 = new RawTripTestPayload(insertRecordStr4); + HoodieRecord insertedRecord1 = + new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); + List<HoodieRecord> insertRecords = Arrays.asList(insertedRecord1); + Thread.sleep(1000); + String secondCommitTime = makeNewCommitTime(); + metaClient = HoodieTableMetaClient.reload(metaClient); + writeClient.startCommitWithTime(secondCommitTime); + List<WriteStatus> statuses = writeClient.upsert(jsc.parallelize(insertRecords), secondCommitTime).collect(); + assertEquals(1, statuses.size()); + + // 4. insert one record and update one record + String insertRecordStr5 = "{\"_row_key\":\"8eb5b87d-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":51}"; + // We update the 1st record & add a new record + String updateRecordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + RawTripTestPayload updateRowChanges1 = new RawTripTestPayload(updateRecordStr1); + HoodieRecord updatedRecord1 = new HoodieRecord( + new HoodieKey(updateRowChanges1.getRowKey(), updateRowChanges1.getPartitionPath()), updateRowChanges1); + RawTripTestPayload rowChange5 = new RawTripTestPayload(insertRecordStr5); + HoodieRecord insertedRecord2 = + new HoodieRecord(new HoodieKey(rowChange5.getRowKey(), rowChange5.getPartitionPath()), rowChange5); + List<HoodieRecord> updatedRecords = Arrays.asList(updatedRecord1, insertedRecord2); + Thread.sleep(1000); Review comment: okay ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org