This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b300728913a [HUDI-7086] Fix the default for gcp pub sub max sync time to 1min (#10171)
b300728913a is described below

commit b300728913af24db626c6e8da8bde6c28b3a5004
Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com>
AuthorDate: Tue Nov 28 22:31:12 2023 -0800

    [HUDI-7086] Fix the default for gcp pub sub max sync time to 1min (#10171)
    
    Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
---
 .../org/apache/hudi/utilities/config/CloudSourceConfig.java |  8 ++++----
 .../org/apache/hudi/utilities/sources/GcsEventsSource.java  |  4 ++--
 .../sources/helpers/gcs/PubsubMessagesFetcher.java          | 13 +++++++------
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index 81533d940a8..54be9cabef9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -147,12 +147,12 @@ public class CloudSourceConfig extends HoodieConfig {
       .sinceVersion("0.14.1")
       .withDocumentation("specify this value in bytes, to coalesce partitions 
of source dataset not greater than specified limit");
 
-  public static final ConfigProperty<Integer> MAX_FETCH_TIME_PER_SYNC_MS = 
ConfigProperty
-      .key(STREAMER_CONFIG_PREFIX + 
"source.cloud.meta.max.fetch.time.per.sync.ms")
-      .defaultValue(1)
+  public static final ConfigProperty<Integer> MAX_FETCH_TIME_PER_SYNC_SECS = 
ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + 
"source.cloud.meta.max.fetch.time.per.sync.secs")
+      .defaultValue(60)
       .markAdvanced()
       .sinceVersion("0.14.1")
-      .withDocumentation("Max time in millis to consume " + 
MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event 
queues like SQS, "
+      .withDocumentation("Max time in secs to consume " + 
MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event 
queues like SQS, "
           + "PubSub can return empty responses even when messages are 
available the queue, this config ensures we don't wait forever "
           + "to consume MAX_MESSAGES_CONF messages, but time out and move on 
further.");
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
index 897771168ed..fdd3c8f49f3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
@@ -49,7 +49,7 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.ACK_MESSAGES;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.BATCH_SIZE_CONF;
-import static 
org.apache.hudi.utilities.config.CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_MS;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_SECS;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.MAX_NUM_MESSAGES_PER_SYNC;
 import static 
org.apache.hudi.utilities.config.GCSEventsSourceConfig.GOOGLE_PROJECT_ID;
 import static 
org.apache.hudi.utilities.config.GCSEventsSourceConfig.PUBSUB_SUBSCRIPTION_ID;
@@ -121,7 +121,7 @@ public class GcsEventsSource extends RowSource {
                 getStringWithAltKeys(props, PUBSUB_SUBSCRIPTION_ID),
                 getIntWithAltKeys(props, BATCH_SIZE_CONF),
                 getIntWithAltKeys(props, MAX_NUM_MESSAGES_PER_SYNC),
-                getIntWithAltKeys(props, MAX_FETCH_TIME_PER_SYNC_MS))
+                getIntWithAltKeys(props, MAX_FETCH_TIME_PER_SYNC_SECS))
     );
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java
index 3b574045d7a..506e312608d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java
@@ -56,7 +56,7 @@ public class PubsubMessagesFetcher {
 
   private final int batchSize;
   private final int maxMessagesPerSync;
-  private final long maxFetchTimePerSync;
+  private final long maxFetchTimePerSyncSecs;
   private final SubscriberStubSettings subscriberStubSettings;
   private final PubsubQueueClient pubsubQueueClient;
 
@@ -64,13 +64,13 @@ public class PubsubMessagesFetcher {
 
   public PubsubMessagesFetcher(String googleProjectId, String 
pubsubSubscriptionId, int batchSize,
                                int maxMessagesPerSync,
-                               long maxFetchTimePerSync,
+                               long maxFetchTimePerSyncSecs,
                                PubsubQueueClient pubsubQueueClient) {
     this.googleProjectId = googleProjectId;
     this.pubsubSubscriptionId = pubsubSubscriptionId;
     this.batchSize = batchSize;
     this.maxMessagesPerSync = maxMessagesPerSync;
-    this.maxFetchTimePerSync = maxFetchTimePerSync;
+    this.maxFetchTimePerSyncSecs = maxFetchTimePerSyncSecs;
 
     try {
       /** For details of timeout and retry configs,
@@ -94,13 +94,13 @@ public class PubsubMessagesFetcher {
       String pubsubSubscriptionId,
       int batchSize,
       int maxMessagesPerSync,
-      long maxFetchTimePerSync) {
+      long maxFetchTimePerSyncSecs) {
     this(
         googleProjectId,
         pubsubSubscriptionId,
         batchSize,
         maxMessagesPerSync,
-        maxFetchTimePerSync,
+        maxFetchTimePerSyncSecs,
         new PubsubQueueClient()
     );
   }
@@ -112,7 +112,8 @@ public class PubsubMessagesFetcher {
       long startTime = System.currentTimeMillis();
       long unAckedMessages = 
pubsubQueueClient.getNumUnAckedMessages(this.pubsubSubscriptionId);
       LOG.info("Found unacked messages " + unAckedMessages);
-      while (messageList.size() < unAckedMessages && messageList.size() < 
maxMessagesPerSync && (System.currentTimeMillis() - startTime < 
maxFetchTimePerSync)) {
+      while (messageList.size() < unAckedMessages && messageList.size() < 
maxMessagesPerSync
+          && ((System.currentTimeMillis() - startTime) < 
(maxFetchTimePerSyncSecs * 1000))) {
         PullResponse pullResponse = 
pubsubQueueClient.makePullRequest(subscriber, subscriptionName, batchSize);
         messageList.addAll(pullResponse.getReceivedMessagesList());
       }

Reply via email to