stream2000 commented on code in PR #7826: URL: https://github.com/apache/hudi/pull/7826#discussion_r1187063352
########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java: ########## @@ -545,6 +546,116 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta assertTrue(validInstants.containsAll(completedInstants)); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ"}) + public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType tableType) throws Exception { + // create inserts X 1 + if (tableType == HoodieTableType.MERGE_ON_READ) { + setUpMORTestTable(); + } + // Disabling embedded timeline server, it doesn't work with multiwriter + HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withAutoClean(false) + .withAsyncClean(true) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withInlineCompaction(false) + .withMaxNumDeltaCommitsBeforeCompaction(2).build()) + .withEmbeddedTimelineServerEnabled(false) + // Timeline-server-based markers are not used for multi-writer tests + .withMarkersType(MarkerType.DIRECT.name()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType( + FileSystemViewStorageType.MEMORY).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + // Set the config so that heartbeat will expire in 1 second without update + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) + .build()).withAutoCommit(false).withProperties(lockProperties); + Set<String> validInstants = new HashSet<>(); + // Create the first commit with inserts + HoodieWriteConfig cfg = writeConfigBuilder.build(); + SparkRDDWriteClient client = getHoodieWriteClient(cfg); + createCommitWithInserts(cfg, client, "000", "001", 200, true); + validInstants.add("001"); + + // Three clients running actions in 
parallel + final int threadCount = 3; + final ExecutorService executor = Executors.newFixedThreadPool(threadCount); + + final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); + final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); + final String commitTime2 = "002"; + final String commitTime3 = "003"; + AtomicReference<Object> writeStatus1 = new AtomicReference<>(null); + AtomicReference<Object> writeStatus2 = new AtomicReference<>(null); + + // Create upserts, schedule cleaning, schedule compaction in parallel + Future future1 = executor.submit(() -> { + final int numRecords = 100; + assertDoesNotThrow(() -> { + writeStatus1.set(createCommitWithInserts(cfg, client1, "001", commitTime2, numRecords, false)); + }); + }); + // Create upserts, schedule cleaning, schedule compaction in parallel + Future future2 = executor.submit(() -> { + final int numRecords = 100; + assertDoesNotThrow(() -> { + writeStatus2.set(createCommitWithInserts(cfg, client2, "001", commitTime3, numRecords, false)); + }); + }); + + future1.get(); + future2.get(); + + HoodieTableMetaClient tableMetaClient = client.getTableServiceClient().createMetaClient(true); + + // Get inflight instant stream before commit + List<HoodieInstant> inflightInstants = client + .getTableServiceClient() + .getInflightTimelineExcludeCompactionAndClustering(tableMetaClient) + .getReverseOrderedInstants() + .collect(Collectors.toList()); + + // Commit the instants and get instants to rollback in parallel + future1 = executor.submit(() -> { + long start = System.currentTimeMillis(); + LOG.info(String.format("Start to commit instant %s", commitTime2)); + client1.commit(commitTime2, writeStatus1.get()); + validInstants.add(commitTime2); + LOG.info(String.format("commit the instant cost %d ms", System.currentTimeMillis() - start)); + }); + + future2 = executor.submit(() -> { + long start = System.currentTimeMillis(); + LOG.info(String.format("Start to commit instant %s", commitTime3)); + 
client2.commit(commitTime3, writeStatus2.get()); + validInstants.add(commitTime3); + LOG.info(String.format("commit the instant %s cost %d ms", commitTime3, System.currentTimeMillis() - start)); + }); + + Future future3 = executor.submit(() -> { + // Sleep to let the writer complete the instants + assertDoesNotThrow(() -> Thread.sleep(6000)); Review Comment: Changed, thanks for your suggestion! In fact, we could do these operations sequentially (without introducing multiple threads). But I think a multi-threaded test demonstrates the case we want to fix more clearly, so we can keep using multiple threads. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org