stream2000 commented on code in PR #7826:
URL: https://github.com/apache/hudi/pull/7826#discussion_r1196010098


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -545,6 +546,120 @@ public void 
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     assertTrue(validInstants.containsAll(completedInstants));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ", 
"COPY_ON_WRITE"})
+  public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType 
tableType) throws Exception {
+    // create inserts X 1
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      setUpMORTestTable();
+    }
+    // Disabling embedded timeline server, it doesn't work with multiwriter
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
+                    .withAutoClean(false)
+                    .withAsyncClean(true)
+                    
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                    .withInlineCompaction(false)
+                    .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+            .withEmbeddedTimelineServerEnabled(false)
+            // Timeline-server-based markers are not used for multi-writer 
tests
+            .withMarkersType(MarkerType.DIRECT.name())
+            
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
+                    FileSystemViewStorageType.MEMORY).build())
+            
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+            // Set the config so that heartbeat will expire in 1 second 
without update
+            
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+                    
.build()).withAutoCommit(false).withProperties(lockProperties);
+    Set<String> validInstants = new HashSet<>();
+    // Create the first commit with inserts
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    createCommitWithInserts(cfg, client, "000", "001", 200, true);
+    validInstants.add("001");
+
+    // Three clients running actions in parallel
+    final int threadCount = 3;
+    final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final String commitTime2 = "002";
+    final String commitTime3 = "003";
+    AtomicReference<Object> writeStatus1 = new AtomicReference<>(null);
+    AtomicReference<Object> writeStatus2 = new AtomicReference<>(null);
+
+    Future future1 = executor.submit(() -> {
+      final int numRecords = 100;
+      assertDoesNotThrow(() -> {
+        writeStatus1.set(createCommitWithInserts(cfg, client1, "001", 
commitTime2, numRecords, false));
+      });
+    });
+    Future future2 = executor.submit(() -> {
+      final int numRecords = 100;
+      assertDoesNotThrow(() -> {
+        writeStatus2.set(createCommitWithInserts(cfg, client2, "001", 
commitTime3, numRecords, false));
+      });
+    });
+
+    future1.get();
+    future2.get();
+
+    final CountDownLatch commitCountDownLatch = new CountDownLatch(2);
+    HoodieTableMetaClient tableMetaClient = 
client.getTableServiceClient().createMetaClient(true);
+
+    // Get inflight instant stream before commit
+    List<HoodieInstant> inflightInstants = client
+            .getTableServiceClient()
+            .getInflightTimelineExcludeCompactionAndClustering(tableMetaClient)
+            .getReverseOrderedInstants()
+            .collect(Collectors.toList());
+
+    // Commit the instants and get instants to rollback in parallel
+    future1 = executor.submit(() -> {
+      long start = System.currentTimeMillis();
+      LOG.info(String.format("Start to commit instant %s", commitTime2));
+      client1.commit(commitTime2, writeStatus1.get());
+      commitCountDownLatch.countDown();
+      validInstants.add(commitTime2);
+      LOG.info(String.format("commit the instant cost %d ms", 
System.currentTimeMillis() - start));

Review Comment:
   for debug, removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to