stream2000 commented on code in PR #7826:
URL: https://github.com/apache/hudi/pull/7826#discussion_r1187063352


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -545,6 +546,116 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     assertTrue(validInstants.containsAll(completedInstants));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ"})
+  public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType tableType) throws Exception {
+    // create inserts X 1
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      setUpMORTestTable();
+    }
+    // Disabling embedded timeline server, it doesn't work with multiwriter
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
+                    .withAutoClean(false)
+                    .withAsyncClean(true)
+                    .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                    .withInlineCompaction(false)
+                    .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+            .withEmbeddedTimelineServerEnabled(false)
+            // Timeline-server-based markers are not used for multi-writer tests
+            .withMarkersType(MarkerType.DIRECT.name())
+            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
+                    FileSystemViewStorageType.MEMORY).build())
+            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+            // Set the config so that heartbeat will expire in 1 second without update
+            .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+                    .build()).withAutoCommit(false).withProperties(lockProperties);
+    Set<String> validInstants = new HashSet<>();
+    // Create the first commit with inserts
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    createCommitWithInserts(cfg, client, "000", "001", 200, true);
+    validInstants.add("001");
+
+    // Three clients running actions in parallel
+    final int threadCount = 3;
+    final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final String commitTime2 = "002";
+    final String commitTime3 = "003";
+    AtomicReference<Object> writeStatus1 = new AtomicReference<>(null);
+    AtomicReference<Object> writeStatus2 = new AtomicReference<>(null);
+
+    // Create upserts, schedule cleaning, schedule compaction in parallel
+    Future future1 = executor.submit(() -> {
+      final int numRecords = 100;
+      assertDoesNotThrow(() -> {
+        writeStatus1.set(createCommitWithInserts(cfg, client1, "001", commitTime2, numRecords, false));
+      });
+    });
+    // Create upserts, schedule cleaning, schedule compaction in parallel
+    Future future2 = executor.submit(() -> {
+      final int numRecords = 100;
+      assertDoesNotThrow(() -> {
+        writeStatus2.set(createCommitWithInserts(cfg, client2, "001", commitTime3, numRecords, false));
+      });
+    });
+
+    future1.get();
+    future2.get();
+
+    HoodieTableMetaClient tableMetaClient = client.getTableServiceClient().createMetaClient(true);
+
+    // Get inflight instant stream before commit
+    List<HoodieInstant> inflightInstants = client
+            .getTableServiceClient()
+            .getInflightTimelineExcludeCompactionAndClustering(tableMetaClient)
+            .getReverseOrderedInstants()
+            .collect(Collectors.toList());
+
+    // Commit the instants and get instants to rollback in parallel
+    future1 = executor.submit(() -> {
+      long start = System.currentTimeMillis();
+      LOG.info(String.format("Start to commit instant %s", commitTime2));
+      client1.commit(commitTime2, writeStatus1.get());
+      validInstants.add(commitTime2);
+      LOG.info(String.format("commit the instant cost %d ms", 
System.currentTimeMillis() - start));
+    });
+
+    future2 = executor.submit(() -> {
+      long start = System.currentTimeMillis();
+      LOG.info(String.format("Start to commit instant %s", commitTime3));
+      client2.commit(commitTime3, writeStatus2.get());
+      validInstants.add(commitTime3);
+      LOG.info(String.format("commit the instant %s cost %d ms", commitTime3, 
System.currentTimeMillis() - start));
+    });
+
+    Future future3 = executor.submit(() -> {
+      // Sleep to let the writer complete the instants
+      assertDoesNotThrow(() -> Thread.sleep(6000));

Review Comment:
   Changed, thanks for your suggestion! In fact, we could do these operations sequentially, without introducing multiple threads. But I think a multi-threaded test demonstrates the case we want to fix more clearly, so I would keep the multi-threaded version.
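
   For comparison, here is a rough, untested sketch of what a sequential version could look like inside this test method. It reuses the helpers already used above (`getHoodieWriteClient`, `createCommitWithInserts`) and assumes `createCommitWithInserts` returns the `JavaRDD<WriteStatus>` that `commit` expects; the rollback/clean checks later in the test would then follow as before.

   ```java
   // Sketch of a sequential variant (no executor).
   // Assumption (not shown in the diff): createCommitWithInserts returns JavaRDD<WriteStatus>.
   HoodieWriteConfig cfg = writeConfigBuilder.build();

   // Base commit 001 with inserts, auto-committed.
   SparkRDDWriteClient client = getHoodieWriteClient(cfg);
   createCommitWithInserts(cfg, client, "000", "001", 200, true);

   // Two writers prepare overlapping writes without committing (withAutoCommit(false)).
   SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
   SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
   JavaRDD<WriteStatus> writeStatus2 = createCommitWithInserts(cfg, client1, "001", "002", 100, false);
   JavaRDD<WriteStatus> writeStatus3 = createCommitWithInserts(cfg, client2, "001", "003", 100, false);

   // Commit back to back on a single thread. With LAZY failed-writes cleaning and
   // async clean enabled, rollback of a failed instant is deferred to the cleaner
   // instead of happening inline.
   client1.commit("002", writeStatus2);
   client2.commit("003", writeStatus3);
   ```

   The drawback is that this no longer exercises the two commits racing each other, which is exactly the interleaving the multi-threaded version is meant to cover.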


