This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b270551b Core: Add timeout and retry logic to Azure token fetch (#3113)
5b270551b is described below

commit 5b270551b7b1453ab962b2bd7c753ae2bc6da953
Author: fivetran-rahulprakash <[email protected]>
AuthorDate: Mon Dec 8 12:38:13 2025 +0530

    Core: Add timeout and retry logic to Azure token fetch (#3113)
---
 .../polaris/core/config/FeatureConfiguration.java  | 39 +++++++++
 .../azure/AzureCredentialsStorageIntegration.java  | 97 +++++++++++++++++++++-
 2 files changed, 133 insertions(+), 3 deletions(-)

diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 1772f4725..b843fea58 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -438,4 +438,43 @@ public class FeatureConfiguration<T> extends PolarisConfiguration<T> {
               "If set to true (default), allow credential vending for external 
catalogs. Note this requires ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING to be 
true first.")
           .defaultValue(true)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Integer> AZURE_TIMEOUT_MILLIS =
+      PolarisConfiguration.<Integer>builder()
+          .key("AZURE_TIMEOUT_MILLIS")
+          .description(
+              "Timeout in milliseconds for Azure API requests. "
+                  + "Prevents indefinite blocking when Azure endpoints are 
slow or unresponsive. "
+                  + "Used internally by Azure storage integration for 
credential vending and other operations.")
+          .defaultValue(15000)
+          .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Integer> AZURE_RETRY_COUNT =
+      PolarisConfiguration.<Integer>builder()
+          .key("AZURE_RETRY_COUNT")
+          .description(
+              "Number of retry attempts for Azure API requests. "
+                  + "Uses exponential backoff with jitter to handle transient 
failures.")
+          .defaultValue(3)
+          .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Integer> AZURE_RETRY_DELAY_MILLIS =
+      PolarisConfiguration.<Integer>builder()
+          .key("AZURE_RETRY_DELAY_MILLIS")
+          .description(
+              "Initial delay in milliseconds before first retry for Azure API 
requests. "
+                  + "Delay doubles with each retry (exponential backoff).")
+          .defaultValue(2000)
+          .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Double> AZURE_RETRY_JITTER_FACTOR =
+      PolarisConfiguration.<Double>builder()
+          .key("AZURE_RETRY_JITTER_FACTOR")
+          .description(
+              "Jitter factor (0.0 to 1.0) applied to retry delays for Azure 
API requests. "
+                  + "The jitter is applied as a random percentage of the 
computed exponential backoff delay. "
+                  + "For example, 0.5 means up to 50%% random jitter will be 
added to each retry delay. "
+                  + "Helps prevent thundering herd when multiple requests fail 
simultaneously.")
+          .defaultValue(0.5)
+          .buildFeatureConfiguration();
 }
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
index 0b189b311..7763178b9 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
@@ -18,6 +18,10 @@
  */
 package org.apache.polaris.core.storage.azure;
 
+import static 
org.apache.polaris.core.config.FeatureConfiguration.AZURE_RETRY_COUNT;
+import static 
org.apache.polaris.core.config.FeatureConfiguration.AZURE_RETRY_DELAY_MILLIS;
+import static 
org.apache.polaris.core.config.FeatureConfiguration.AZURE_RETRY_JITTER_FACTOR;
+import static 
org.apache.polaris.core.config.FeatureConfiguration.AZURE_TIMEOUT_MILLIS;
 import static 
org.apache.polaris.core.config.FeatureConfiguration.STORAGE_CREDENTIAL_DURATION_SECONDS;
 
 import com.azure.core.credential.AccessToken;
@@ -39,6 +43,7 @@ import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues;
 import com.azure.storage.file.datalake.sas.PathSasPermission;
 import com.google.common.annotations.VisibleForTesting;
 import jakarta.annotation.Nonnull;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.OffsetDateTime;
 import java.time.Period;
@@ -55,6 +60,7 @@ import org.apache.polaris.core.storage.StorageAccessProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 /** Azure credential vendor that supports generating SAS token */
 public class AzureCredentialsStorageIntegration
@@ -120,7 +126,7 @@ public class AzureCredentialsStorageIntegration
         OffsetDateTime.ofInstant(
             start.plusSeconds(3600), ZoneOffset.UTC); // 1 hr to sync with AWS 
and GCP Access token
 
-    AccessToken accessToken = getAccessToken(config().getTenantId());
+    AccessToken accessToken = getAccessToken(realmConfig, 
config().getTenantId());
     // Get user delegation key.
     // Set the new generated user delegation key expiry to 7 days and minute 1 
min
     // Azure strictly requires the end time to be <= 7 days from the current 
time, -1 min to avoid
@@ -312,16 +318,101 @@ public class AzureCredentialsStorageIntegration
         });
   }
 
-  private AccessToken getAccessToken(String tenantId) {
+  /**
+   * Fetches an Azure AD access token with timeout and retry logic to handle 
transient failures.
+   *
+   * <p>This access token is used internally to obtain a user delegation key 
from Azure Storage,
+   * which is then used to generate SAS tokens for client credential vending.
+   *
+   * <p>This method implements a defensive strategy against slow or failing 
cloud provider requests:
+   *
+   * <ul>
+   *   <li>Per-attempt timeout (configurable via AZURE_TIMEOUT_MILLIS, default 
15000ms)
+   *   <li>Exponential backoff retry (configurable count and initial delay via 
AZURE_RETRY_COUNT and
+   *       AZURE_RETRY_DELAY_MILLIS, defaults: 3 attempts starting at 2000ms)
+   *   <li>Jitter to prevent thundering herd (configurable via 
AZURE_RETRY_JITTER_FACTOR, default
+   *       0.5 = 50%%)
+   * </ul>
+   *
+   * @param realmConfig the realm configuration to get timeout and retry 
settings
+   * @param tenantId the Azure tenant ID
+   * @return the access token
+   * @throws RuntimeException if token fetch fails after all retries or times 
out
+   */
+  private AccessToken getAccessToken(RealmConfig realmConfig, String tenantId) 
{
+    int timeoutMillis = realmConfig.getConfig(AZURE_TIMEOUT_MILLIS);
+    int retryCount = realmConfig.getConfig(AZURE_RETRY_COUNT);
+    int initialDelayMillis = realmConfig.getConfig(AZURE_RETRY_DELAY_MILLIS);
+    double jitter = realmConfig.getConfig(AZURE_RETRY_JITTER_FACTOR);
+    int maxAttempts = retryCount + 1;
+
     String scope = "https://storage.azure.com/.default";;
     AccessToken accessToken =
         defaultAzureCredential
             .getToken(new 
TokenRequestContext().addScopes(scope).setTenantId(tenantId))
+            .timeout(Duration.ofMillis(timeoutMillis))
+            .doOnError(
+                error ->
+                    LOGGER.warn("Error fetching Azure access token for tenant 
{}", tenantId, error))
+            .retryWhen(
+                Retry.backoff(retryCount, 
Duration.ofMillis(initialDelayMillis))
+                    .jitter(jitter) // Apply jitter factor to computed delay
+                    .filter(this::isRetriableAzureException)
+                    .doBeforeRetry(
+                        retrySignal ->
+                            LOGGER.info(
+                                "Retrying Azure token fetch for tenant {} 
(attempt {}/{})",
+                                tenantId,
+                                retrySignal.totalRetries() + 1,
+                                maxAttempts))
+                    .onRetryExhaustedThrow(
+                        (retryBackoffSpec, retrySignal) ->
+                            new RuntimeException(
+                                String.format(
+                                    "Azure token fetch exhausted after %d 
attempts for tenant %s",
+                                    retrySignal.totalRetries(), tenantId),
+                                retrySignal.failure())))
             .blockOptional()
             .orElse(null);
+
     if (accessToken == null) {
-      throw new RuntimeException("No access token fetched!");
+      throw new RuntimeException(
+          String.format("Failed to fetch Azure access token for tenant %s", 
tenantId));
     }
     return accessToken;
   }
+
+  /**
+   * Determines if an exception is retriable for Azure token requests.
+   *
+   * <p>Retries are attempted for:
+   *
+   * <ul>
+   *   <li>TimeoutException - per-attempt timeout exceeded
+   *   <li>AADSTS50058 - Token endpoint timeout
+   *   <li>AADSTS50078 - Service temporarily unavailable
+   *   <li>AADSTS700084 - Token refresh required
+   *   <li>503 - Service unavailable
+   *   <li>429 - Too many requests (rate limited)
+   * </ul>
+   *
+   * @param throwable the exception to check
+   * @return true if the exception should trigger a retry
+   */
+  private boolean isRetriableAzureException(Throwable throwable) {
+    // Retry on timeout exceptions
+    if (throwable instanceof java.util.concurrent.TimeoutException) {
+      return true;
+    }
+    // Retry on common transient Azure credential exceptions
+    String message = throwable.getMessage();
+    if (message != null) {
+      return message.contains("AADSTS50058") // Token endpoint timeout
+          || message.contains("AADSTS50078") // Service temporarily unavailable
+          || message.contains("AADSTS700084") // Token refresh required
+          || message.contains("503") // Service unavailable
+          || message.contains("429"); // Too many requests
+    }
+    return false;
+  }
 }

Reply via email to