[ https://issues.apache.org/jira/browse/HUDI-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17376619#comment-17376619 ]
ASF GitHub Bot commented on HUDI-1483:
--------------------------------------

nsivabalan commented on a change in pull request #3142:
URL: https://github.com/apache/hudi/pull/3142#discussion_r665443646

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
##########
@@ -165,4 +176,51 @@ private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
   public boolean isRunInDaemonMode() {
     return runInDaemonMode;
   }
+
+  /**
+   * Wait till outstanding pending compaction/clustering reduces to the passed in value.
+   *
+   * @param numPending Maximum pending compactions/clustering allowed
+   * @throws InterruptedException
+   */
+  public void waitTillPendingActionReducesTo(int numPending) throws InterruptedException {
+    try {
+      queueLock.lock();
+      while (!isShutdown() && (pendingInstants.size() > numPending)) {
+        consumed.await();
+      }
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  /**
+   * Enqueues new pending clustering instant.
+   * @param instant {@link HoodieInstant} to enqueue.
+   */
+  public void enqueuePendingAction(HoodieInstant instant) {
+    LOG.info("Enqueuing new pending clustering instant: " + instant.getTimestamp());
+    pendingInstants.add(instant);
+  }
+
+  /**
+   * Fetch next pending compaction/clustering instant if available.
+   *
+   * @return {@link HoodieInstant} corresponding to the next pending compaction/clustering.
+   * @throws InterruptedException
+   */
+  HoodieInstant fetchNextActionInstant() throws InterruptedException {

Review comment:
nit.
fetchNextAsyncServiceInstant

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -296,6 +297,11 @@ public static void main(String[] args) throws IOException {
             + "outstanding compactions is less than this number")
     public Integer maxPendingCompactions = 5;
 
+    @Parameter(names = {"--max-pending-clustering"},

Review comment:
@pratyakshsharma ^

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
##########
@@ -207,12 +209,26 @@ class TestStructuredStreaming extends HoodieClientTestBase {
         metaClient.reloadActiveTimeline()
         assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
       }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true,
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false,
       HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
   }
 
   @Test
-  def testStructuredStreamingWithoutInlineClustering(): Unit = {
+  def testStructuredStreamingWithAsyncClustering(): Unit = {

Review comment:
Not sure if this will be too hard to achieve. Is there a way to simulate resource unavailability? That is, when async clustering is scheduled, there are no resources to run it right away, but once the test opens up resources, async clustering should get triggered. Basically, this would validate that a pending async clustering gets triggered when resources become available and does not get cancelled.

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -473,6 +473,11 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .withDocumentation("")
 
+  val ASYNC_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.clustering.async.enable")
+    .defaultValue("false")

Review comment:
Can we set the min version as well?
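The `queueLock`/`consumed` handshake in the `HoodieAsyncService` hunk is a bounded producer/consumer wait: the writer blocks in `waitTillPendingActionReducesTo` until the async worker has drained the pending queue far enough. Below is a minimal, self-contained sketch of that pattern, not Hudi's implementation. It assumes instants are plain `String` timestamps instead of `HoodieInstant`, a `stopped` flag in place of `isShutdown()`, and uses the reviewer's suggested name `fetchNextAsyncServiceInstant`. The consumer body (poll with a timeout, then `signalAll()` on `consumed`) is also an assumption, because the hunk ends before that method's body. The `main` method mirrors the test idea from the review: the writer stays blocked while nothing consumes ("no resources"), and it is released once the worker takes a pending instant.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of the pending-instant queue in the HoodieAsyncService hunk.
// Instants are String timestamps here, not HoodieInstant.
public class PendingActionQueueSketch {
  private final BlockingQueue<String> pendingInstants = new LinkedBlockingQueue<>();
  private final ReentrantLock queueLock = new ReentrantLock();
  private final Condition consumed = queueLock.newCondition();
  private volatile boolean stopped = false;

  /** Writer side: block until at most numPending instants remain queued, or the service stops. */
  public void waitTillPendingActionReducesTo(int numPending) throws InterruptedException {
    queueLock.lock(); // acquire before try, so finally never unlocks a lock this thread does not hold
    try {
      while (!stopped && pendingInstants.size() > numPending) {
        consumed.await(); // atomically releases queueLock while waiting
      }
    } finally {
      queueLock.unlock();
    }
  }

  /** Scheduler side: queue a newly scheduled compaction/clustering instant. */
  public void enqueuePendingAction(String instant) {
    pendingInstants.add(instant);
  }

  /** Worker side: take the next instant (null on timeout) and wake any blocked writers. */
  public String fetchNextAsyncServiceInstant() throws InterruptedException {
    String instant = pendingInstants.poll(100, TimeUnit.MILLISECONDS);
    if (instant != null) {
      queueLock.lock();
      try {
        consumed.signalAll();
      } finally {
        queueLock.unlock();
      }
    }
    return instant;
  }

  /** Stop the service and release any writer that is still waiting. */
  public void shutdown() {
    queueLock.lock();
    try {
      stopped = true;
      consumed.signalAll();
    } finally {
      queueLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PendingActionQueueSketch service = new PendingActionQueueSketch();
    service.enqueuePendingAction("20210707000000");
    service.enqueuePendingAction("20210707000100");

    // The writer may only proceed once at most one action is pending.
    Thread writer = new Thread(() -> {
      try {
        service.waitTillPendingActionReducesTo(1);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    writer.start();

    // "No resources yet": nothing consumes, so the writer stays blocked.
    writer.join(200);
    System.out.println("writer blocked with 2 pending: " + writer.isAlive());

    // "Resources become available": the worker takes one instant, which releases the writer.
    String taken = service.fetchNextAsyncServiceInstant();
    writer.join(1000);
    System.out.println("took " + taken + ", writer released: " + !writer.isAlive());
  }
}
```

The sketch calls `queueLock.lock()` before entering `try`, which is the usual `ReentrantLock` idiom. The hunk instead acquires the lock inside `try`. The consumer polls outside the lock and takes `queueLock` only to signal, so a slow poll never holds up writers. A wakeup cannot be lost: the writer checks the size and calls `await()` while holding the lock, so the consumer's `signalAll()` cannot run between that check and the wait.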
-- 
This is an automated message from the Apache Git Service.

> async clustering for deltastreamer
> ----------------------------------
>
>                 Key: HUDI-1483
>                 URL: https://issues.apache.org/jira/browse/HUDI-1483
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: liwei
>            Assignee: liwei
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.9.0
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)