codope commented on a change in pull request #3695:
URL: https://github.com/apache/hudi/pull/3695#discussion_r713734228



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -681,145 +323,68 @@ public void testManualRollbacks(HoodieTableType tableType) throws Exception {
   @Disabled
   public void testSync(HoodieTableType tableType) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    String newCommitTime;
-    List<HoodieRecord> records;
-    List<WriteStatus> writeStatuses;
-
     // Initial commits without metadata table enabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-    }
-
+    writeConfig = getWriteConfigBuilder(true, false, false).build();
+    testTable.doWriteOperation("001", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("002", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), 1);
     // Enable metadata table so it is initialized by listing from the file system
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      validateMetadata(client);
-      assertTrue(metadata(client).isInSync());
-    }
-
+    testTable.doWriteOperation("003", WriteOperationType.INSERT, Arrays.asList("p1", "p2"), 1);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, true);
     // Various table operations without metadata table enabled
-    String restoreToInstant;
-    String inflightActionTimestamp;
-    String beforeInflightActionTimestamp;
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Savepoint
-      restoreToInstant = newCommitTime;
-      if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        client.savepoint("hoodie", "metadata test");
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Record a timestamp for creating an inflight instance for sync testing
-      inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
-      beforeInflightActionTimestamp = newCommitTime;
-
-      // Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 5);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    testTable.doWriteOperation("004", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
 
-      // insert overwrite to test replacecommit
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
-      writeStatuses = replaceResult.getWriteStatuses().collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("006", Arrays.asList("p1", "p2"));
+      syncAndValidate(testTable);
     }
 
-    // If there is an incomplete operation, the Metadata Table is not updated beyond that operations but the
-    // in-memory merge should consider all the completed operations.
-    Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
-    fs.create(inflightCleanPath).close();
+    // trigger an upsert
+    testTable.doWriteOperation("007", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, false, true);
 
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
-      client.syncTableMetadata();
-
-      // Table should sync only before the inflightActionTimestamp
-      HoodieBackedTableMetadataWriter writer =
-          (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp);
+    // savepoint
+    if (COPY_ON_WRITE.equals(tableType)) {
+      testTable.doSavepoint("007");
+      syncTableMetadata(writeConfig);
+      assertTrue(metadata(writeConfig, context).isInSync());
+    }
 
-      // Reader should sync to all the completed instants
-      HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime);
+    // trigger delete
+    testTable.doWriteOperation("008", WriteOperationType.DELETE, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
 
-      // Remove the inflight instance holding back table sync
-      fs.delete(inflightCleanPath, false);
-      client.syncTableMetadata();
+    // trigger clean
+    testTable.doClean("009", Arrays.asList("001", "002"));
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);
 
-      writer =
-          (HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
+    // trigger another upsert
+    testTable.doWriteOperation("010", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);
 
-      // Reader should sync to all the completed instants
-      metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
-    }
+    // trigger clustering
+    testTable.doCluster("011", new HashMap<>());
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
 
-    // Enable metadata table and ensure it is synced
+    // If there is an inflight operation, the Metadata Table is not updated beyond that operation, but the
+    // in-memory merge should consider all the completed operations.
+    HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("012", WriteOperationType.UPSERT, Collections.emptyList(),

Review comment:
       That's right. Made "007" inflight.
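
       For context on "made '007' inflight": the removed code in this hunk simulated an incomplete operation by writing an inflight clean marker directly into the timeline folder and later deleting it. Below is a minimal sketch of that pattern, reusing only helpers already visible in the removed lines and assuming the test class's `fs` and `metaClient` fields are wired up as usual; it is not the PR's implementation.

```java
// Sketch only: simulate an incomplete (inflight) operation on the timeline,
// mirroring the removed code in this hunk. Assumes the test class's `fs` and
// `metaClient` fields plus the existing HoodieActiveTimeline / HoodieTimeline imports.
String inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
Path inflightCleanPath = new Path(metaClient.getMetaPath(),
    HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
fs.create(inflightCleanPath).close();

// Metadata table sync should stop short of this inflight instant, while the
// in-memory reader still merges all completed instants. Removing the marker
// lets a subsequent sync catch up to the latest completed commit.
fs.delete(inflightCleanPath, false);
```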



