codope commented on a change in pull request #3695:
URL: https://github.com/apache/hudi/pull/3695#discussion_r713734228
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -681,145 +323,68 @@ public void testManualRollbacks(HoodieTableType tableType) throws Exception {
   @Disabled
   public void testSync(HoodieTableType tableType) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    String newCommitTime;
-    List<HoodieRecord> records;
-    List<WriteStatus> writeStatuses;
-    // Initial commits without metadata table enabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-    }
-
+    writeConfig = getWriteConfigBuilder(true, false, false).build();
+    testTable.doWriteOperation("001", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("002", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), 1);
     // Enable metadata table so it initialized by listing from file system
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      validateMetadata(client);
-      assertTrue(metadata(client).isInSync());
-    }
-
+    testTable.doWriteOperation("003", WriteOperationType.INSERT, Arrays.asList("p1", "p2"), 1);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, true);
     // Various table operations without metadata table enabled
-    String restoreToInstant;
-    String inflightActionTimestamp;
-    String beforeInflightActionTimestamp;
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Savepoint
-      restoreToInstant = newCommitTime;
-      if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        client.savepoint("hoodie", "metadata test");
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Record a timestamp for creating an inflight instance for sync testing
-      inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
-      beforeInflightActionTimestamp = newCommitTime;
-
-      // Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 5);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    testTable.doWriteOperation("004", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);

-      // insert overwrite to test replacecommit
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
-      writeStatuses = replaceResult.getWriteStatuses().collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("006", Arrays.asList("p1", "p2"));
+      syncAndValidate(testTable);
     }

-    // If there is an incomplete operation, the Metadata Table is not updated beyond that operations but the
-    // in-memory merge should consider all the completed operations.
-    Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
-    fs.create(inflightCleanPath).close();
+    // trigger an upsert
+    testTable.doWriteOperation("007", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, false, true);

-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
-      client.syncTableMetadata();
-
-      // Table should sync only before the inflightActionTimestamp
-      HoodieBackedTableMetadataWriter writer =
-          (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp);
+    // savepoint
+    if (COPY_ON_WRITE.equals(tableType)) {
+      testTable.doSavepoint("007");
+      syncTableMetadata(writeConfig);
+      assertTrue(metadata(writeConfig, context).isInSync());
+    }

-      // Reader should sync to all the completed instants
-      HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime);
+    // trigger delete
+    testTable.doWriteOperation("008", WriteOperationType.DELETE, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);

-      // Remove the inflight instance holding back table sync
-      fs.delete(inflightCleanPath, false);
-      client.syncTableMetadata();
+    // trigger clean
+    testTable.doClean("009", Arrays.asList("001", "002"));
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);

-      writer =
-          (HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
+    // trigger another upsert
+    testTable.doWriteOperation("010", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);

-      // Reader should sync to all the completed instants
-      metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
-    }
+    // trigger clustering
+    testTable.doCluster("011", new HashMap<>());
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);

-    // Enable metadata table and ensure it is synced
+    // If there is an inflight operation, the Metadata Table is not updated beyond that operations but the
+    // in-memory merge should consider all the completed operations.
+    HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("012", WriteOperationType.UPSERT, Collections.emptyList(),

Review comment:
   That's right. Made "007" inflight.
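
For readers following the thread: the point of leaving "007" inflight is the invariant spelled out in the new comment above -- table sync must stop at the last instant completed before an inflight operation, while an in-memory merge should still see every completed instant. The removed code asserted exactly this through the metadata writer's reader and a standalone HoodieTableMetadata reader. Below is a minimal sketch of that style of check, not the PR's actual assertions; it assumes the test class's existing fields (hadoopConf, writeConfig, context) and a static import of JUnit's assertEquals, and the instant ids are hypothetical placeholders.

    // Sketch only (assumed setup, not the PR's code): with one instant left inflight,
    // the synced metadata table should not advance past the last completed instant before it.
    String instantBeforeInflight = "006"; // hypothetical: last instant completed before the inflight one
    HoodieBackedTableMetadataWriter metadataWriter =
        (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context);
    assertEquals(instantBeforeInflight, metadataWriter.getMetadataReader().getUpdateTime().get());

    // An in-memory reader, by contrast, merges past the inflight instant and should
    // reflect the latest completed instant on the timeline.
    HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig(),
        writeConfig.getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
    assertEquals("011", ((HoodieBackedTableMetadata) tableMetadata).getReaderTime().get()); // "011" is likewise hypothetical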