This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new b300728913a [HUDI-7086] Fix the default for gcp pub sub max sync time to 1min (#10171) b300728913a is described below commit b300728913af24db626c6e8da8bde6c28b3a5004 Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com> AuthorDate: Tue Nov 28 22:31:12 2023 -0800 [HUDI-7086] Fix the default for gcp pub sub max sync time to 1min (#10171) Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local> --- .../org/apache/hudi/utilities/config/CloudSourceConfig.java | 8 ++++---- .../org/apache/hudi/utilities/sources/GcsEventsSource.java | 4 ++-- .../sources/helpers/gcs/PubsubMessagesFetcher.java | 13 +++++++------ 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java index 81533d940a8..54be9cabef9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java @@ -147,12 +147,12 @@ public class CloudSourceConfig extends HoodieConfig { .sinceVersion("0.14.1") .withDocumentation("specify this value in bytes, to coalesce partitions of source dataset not greater than specified limit"); - public static final ConfigProperty<Integer> MAX_FETCH_TIME_PER_SYNC_MS = ConfigProperty - .key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.max.fetch.time.per.sync.ms") - .defaultValue(1) + public static final ConfigProperty<Integer> MAX_FETCH_TIME_PER_SYNC_SECS = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.max.fetch.time.per.sync.secs") + .defaultValue(60) .markAdvanced() .sinceVersion("0.14.1") - .withDocumentation("Max time in millis to consume " + MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event queues like SQS, " + .withDocumentation("Max time in secs to consume " + MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event queues like SQS, " + "PubSub can return empty responses even when messages are available the queue, this config ensures we don't wait forever " + "to consume MAX_MESSAGES_CONF messages, but time out and move on further."); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java index 897771168ed..fdd3c8f49f3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java @@ -49,7 +49,7 @@ import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static org.apache.hudi.utilities.config.CloudSourceConfig.ACK_MESSAGES; import static org.apache.hudi.utilities.config.CloudSourceConfig.BATCH_SIZE_CONF; -import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_MS; +import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_SECS; import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_NUM_MESSAGES_PER_SYNC; import static org.apache.hudi.utilities.config.GCSEventsSourceConfig.GOOGLE_PROJECT_ID; import static org.apache.hudi.utilities.config.GCSEventsSourceConfig.PUBSUB_SUBSCRIPTION_ID; @@ -121,7 +121,7 @@ public class GcsEventsSource extends RowSource { getStringWithAltKeys(props, PUBSUB_SUBSCRIPTION_ID), getIntWithAltKeys(props, BATCH_SIZE_CONF), getIntWithAltKeys(props, MAX_NUM_MESSAGES_PER_SYNC), - getIntWithAltKeys(props, MAX_FETCH_TIME_PER_SYNC_MS)) + getIntWithAltKeys(props, MAX_FETCH_TIME_PER_SYNC_SECS)) ); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java index 3b574045d7a..506e312608d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java @@ -56,7 +56,7 @@ public class PubsubMessagesFetcher { private final int batchSize; private final int maxMessagesPerSync; - private final long maxFetchTimePerSync; + private final long maxFetchTimePerSyncSecs; private final SubscriberStubSettings subscriberStubSettings; private final PubsubQueueClient pubsubQueueClient; @@ -64,13 +64,13 @@ public class PubsubMessagesFetcher { public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize, int maxMessagesPerSync, - long maxFetchTimePerSync, + long maxFetchTimePerSyncSecs, PubsubQueueClient pubsubQueueClient) { this.googleProjectId = googleProjectId; this.pubsubSubscriptionId = pubsubSubscriptionId; this.batchSize = batchSize; this.maxMessagesPerSync = maxMessagesPerSync; - this.maxFetchTimePerSync = maxFetchTimePerSync; + this.maxFetchTimePerSyncSecs = maxFetchTimePerSyncSecs; try { /** For details of timeout and retry configs, @@ -94,13 +94,13 @@ public class PubsubMessagesFetcher { String pubsubSubscriptionId, int batchSize, int maxMessagesPerSync, - long maxFetchTimePerSync) { + long maxFetchTimePerSyncSecs) { this( googleProjectId, pubsubSubscriptionId, batchSize, maxMessagesPerSync, - maxFetchTimePerSync, + maxFetchTimePerSyncSecs, new PubsubQueueClient() ); } @@ -112,7 +112,8 @@ public class PubsubMessagesFetcher { long startTime = System.currentTimeMillis(); long unAckedMessages = pubsubQueueClient.getNumUnAckedMessages(this.pubsubSubscriptionId); LOG.info("Found unacked messages " + unAckedMessages); - while (messageList.size() < unAckedMessages && messageList.size() < maxMessagesPerSync && (System.currentTimeMillis() - startTime < maxFetchTimePerSync)) { + while (messageList.size() < unAckedMessages && messageList.size() < maxMessagesPerSync + && ((System.currentTimeMillis() - startTime) < (maxFetchTimePerSyncSecs * 1000))) { PullResponse pullResponse = pubsubQueueClient.makePullRequest(subscriber, subscriptionName, batchSize); messageList.addAll(pullResponse.getReceivedMessagesList()); }