[ https://issues.apache.org/jira/browse/HUDI-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17376619#comment-17376619 ]

ASF GitHub Bot commented on HUDI-1483:
--------------------------------------

nsivabalan commented on a change in pull request #3142:
URL: https://github.com/apache/hudi/pull/3142#discussion_r665443646



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
##########
@@ -165,4 +176,51 @@ private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
   public boolean isRunInDaemonMode() {
     return runInDaemonMode;
   }
+
+  /**
+   * Wait till outstanding pending compaction/clustering reduces to the passed in value.
+   *
+   * @param numPending Maximum pending compactions/clustering allowed
+   * @throws InterruptedException
+   */
+  public void waitTillPendingActionReducesTo(int numPending) throws InterruptedException {
+    try {
+      queueLock.lock();
+      while (!isShutdown() && (pendingInstants.size() > numPending)) {
+        consumed.await();
+      }
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  /**
+   * Enqueues new pending clustering instant.
+   * @param instant {@link HoodieInstant} to enqueue.
+   */
+  public void enqueuePendingAction(HoodieInstant instant) {
+    LOG.info("Enqueuing new pending clustering instant: " + instant.getTimestamp());
+    pendingInstants.add(instant);
+  }
+
+  /**
+   * Fetch next pending compaction/clustering instant if available.
+   *
+   * @return {@link HoodieInstant} corresponding to the next pending compaction/clustering.
+   * @throws InterruptedException
+   */
+  HoodieInstant fetchNextActionInstant() throws InterruptedException {

Review comment:
       nit: consider renaming to fetchNextAsyncServiceInstant.
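The hunk above pairs a `ReentrantLock` with a `consumed` condition so a producer can block until the pending queue drains to a threshold. Below is a minimal sketch of that handshake, assuming plain `String` instant timestamps in place of `HoodieInstant`, an `ArrayDeque` guarded by the lock (the hunk does not show the type of `pendingInstants`), and a hypothetical `PendingInstantQueue` class; it is not Hudi's actual `HoodieAsyncService`. The hunk also does not show how `fetchNextActionInstant` waits, so the second `enqueued` condition is an assumption. The fetch method uses the name suggested in the nit. The sketch calls `lock()` just before `try` rather than inside it, the usual idiom, so `finally` never calls `unlock()` on a lock that was not acquired (which would throw `IllegalMonitorStateException`).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the pending-instant bookkeeping in the hunk above.
class PendingInstantQueue {
  private final Deque<String> pendingInstants = new ArrayDeque<>();
  private final ReentrantLock queueLock = new ReentrantLock();
  // Signalled whenever a consumer takes an instant off the queue.
  private final Condition consumed = queueLock.newCondition();
  // Signalled whenever a producer adds an instant (assumption, see lead-in).
  private final Condition enqueued = queueLock.newCondition();
  // Guarded by queueLock.
  private boolean shutdown = false;

  void enqueuePendingAction(String instantTime) {
    queueLock.lock();
    try {
      pendingInstants.addLast(instantTime);
      enqueued.signalAll();
    } finally {
      queueLock.unlock();
    }
  }

  // Blocks until at most numPending instants remain, or the service shuts down.
  void waitTillPendingActionReducesTo(int numPending) throws InterruptedException {
    queueLock.lock();
    try {
      while (!shutdown && pendingInstants.size() > numPending) {
        consumed.await();
      }
    } finally {
      queueLock.unlock();
    }
  }

  // Blocks until an instant is available (FIFO order); returns null if the
  // service shuts down while the queue is empty.
  String fetchNextAsyncServiceInstant() throws InterruptedException {
    queueLock.lock();
    try {
      while (!shutdown && pendingInstants.isEmpty()) {
        enqueued.await();
      }
      if (pendingInstants.isEmpty()) {
        return null;
      }
      String next = pendingInstants.pollFirst();
      consumed.signalAll(); // wake producers waiting for the queue to drain
      return next;
    } finally {
      queueLock.unlock();
    }
  }

  void shutdown() {
    queueLock.lock();
    try {
      shutdown = true;
      // Wake every waiter so it can observe the shutdown flag.
      consumed.signalAll();
      enqueued.signalAll();
    } finally {
      queueLock.unlock();
    }
  }

  int size() {
    queueLock.lock();
    try {
      return pendingInstants.size();
    } finally {
      queueLock.unlock();
    }
  }
}
```

With three instants enqueued and a consumer thread taking two, `waitTillPendingActionReducesTo(1)` returns once one instant is left, which is the back-pressure the ingestion side relies on.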

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -296,6 +297,11 @@ public static void main(String[] args) throws IOException {
         + "outstanding compactions is less than this number")
     public Integer maxPendingCompactions = 5;
 
+    @Parameter(names = {"--max-pending-clustering"},

Review comment:
       @pratyakshsharma ^ 
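The new `--max-pending-clustering` flag sits next to `maxPendingCompactions`, whose description (partly visible in the hunk) says sync proceeds only while outstanding compactions are below the limit. Assuming the clustering flag follows the same rule, the gate reduces to a comparison. The sketch below is hypothetical (`IngestionGate` and `canRunDeltaSync` are illustrative names, not Hudi APIs) and takes the limit as a constructor argument, because the flag's default and documentation are cut off in the diff.

```java
// Hypothetical gate: the next ingestion round may run only while the number of
// outstanding (requested or inflight) clustering instants is below the maximum.
final class IngestionGate {
  private final int maxPendingClustering;

  IngestionGate(int maxPendingClustering) {
    // A limit below 1 would block ingestion forever, so reject it up front.
    if (maxPendingClustering < 1) {
      throw new IllegalArgumentException("maxPendingClustering must be >= 1");
    }
    this.maxPendingClustering = maxPendingClustering;
  }

  // True when ingestion may proceed given the current outstanding count.
  boolean canRunDeltaSync(int outstandingClustering) {
    return outstandingClustering < maxPendingClustering;
  }
}
```

Keeping the comparison strict (`<`) matches the "less than this number" wording of the compaction flag.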

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
##########
@@ -207,12 +209,26 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       metaClient.reloadActiveTimeline()
      assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true,
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false,
       HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
   }
 
   @Test
-  def testStructuredStreamingWithoutInlineClustering(): Unit = {
+  def testStructuredStreamingWithAsyncClustering(): Unit = {

Review comment:
       Not sure if this will be too hard to achieve, but is there a way to simulate resource unavailability? That is, when async clustering is scheduled, there are no resources to run it right away; once the test frees up resources, async clustering should get triggered. Basically, the goal is to validate that a pending async clustering gets triggered when resources become available and does not get cancelled.
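One way to express the scenario the comment asks for is to model "resources" as semaphore permits: with zero permits, a scheduled clustering instant cannot start and must stay pending; after permits are released, it should complete. The sketch below is a hypothetical Java harness (`ResourceGatedClusteringRunner` and its methods are illustrative, not Hudi or Spark APIs) that only shows the shape of the assertions; the actual test is in Scala and would need its own hook for withholding Spark resources, which the comment leaves open.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical harness: resources are semaphore permits, starting at zero, so a
// scheduled instant cannot execute until the test frees a slot.
class ResourceGatedClusteringRunner {
  private final Semaphore executorSlots = new Semaphore(0);
  final Set<String> pending = ConcurrentHashMap.newKeySet();
  final Set<String> completed = ConcurrentHashMap.newKeySet();
  // Counts down when the first scheduled instant finishes (single-instant sketch).
  private final CountDownLatch done = new CountDownLatch(1);

  // Marks the instant pending and starts a worker that blocks until a slot frees up.
  void schedule(String instantTime) {
    pending.add(instantTime);
    Thread worker = new Thread(() -> {
      try {
        executorSlots.acquire();     // waits while no resources are available
        completed.add(instantTime);  // stands in for running the clustering
        pending.remove(instantTime);
        done.countDown();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  // Simulates resources becoming available to the async service.
  void freeResources(int slots) {
    executorSlots.release(slots);
  }

  boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
    return done.await(timeout, unit);
  }
}
```

The key checks are that the instant is still in `pending` (not dropped) while no permits exist, and that it moves to `completed` only after `freeResources` is called.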

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -473,6 +473,11 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .withDocumentation("")
 
+  val ASYNC_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.clustering.async.enable")
+    .defaultValue("false")

Review comment:
       Can we set the min version as well?
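The review asks that the new option record the release in which it first appears. As a hypothetical illustration only (this is not Hudi's `ConfigProperty` API, whose builder methods are not shown in the hunk), a config descriptor can carry that metadata next to the key and default. The key and the `"false"` default come from the diff; using `0.9.0` as the since-version is an assumption based on the issue's Fix For field.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical config descriptor that carries the "since" version the review asks for.
final class ConfigOption {
  final String key;
  final String defaultValue;
  final String sinceVersion;

  private ConfigOption(String key, String defaultValue, String sinceVersion) {
    this.key = Objects.requireNonNull(key);
    this.defaultValue = defaultValue;
    this.sinceVersion = sinceVersion;
  }

  static ConfigOption of(String key, String defaultValue, String sinceVersion) {
    return new ConfigOption(key, defaultValue, sinceVersion);
  }

  // Returns the user-supplied value if present, otherwise the default.
  String valueIn(Map<String, String> props) {
    return props.getOrDefault(key, defaultValue);
  }
}

final class AsyncClusteringOptions {
  // Key and default taken from the diff; "0.9.0" is an assumed since-version.
  static final ConfigOption ASYNC_CLUSTERING_ENABLE =
      ConfigOption.of("hoodie.datasource.clustering.async.enable", "false", "0.9.0");

  static boolean isAsyncClusteringEnabled(Map<String, String> props) {
    return Boolean.parseBoolean(ASYNC_CLUSTERING_ENABLE.valueIn(props));
  }
}
```

Storing the version on the descriptor lets generated docs or compatibility checks read it without a separate table.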




-- 
This is an automated message from the Apache Git Service.


> async clustering for deltastreamer
> ----------------------------------
>
>                 Key: HUDI-1483
>                 URL: https://issues.apache.org/jira/browse/HUDI-1483
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: liwei
>            Assignee: liwei
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
