This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 17cb11b7c5 Flink: Add maxSleepTimeMs and retryPolicyName to 
ZkLockFactory to support multiple retry policies. (#14243)
17cb11b7c5 is described below

commit 17cb11b7c503fe39a9905a6142f829e0c8a80306
Author: slfan1989 <[email protected]>
AuthorDate: Tue Oct 21 21:04:24 2025 +0800

    Flink: Add maxSleepTimeMs and retryPolicyName to ZkLockFactory to support 
multiple retry policies. (#14243)
---
 .baseline/checkstyle/checkstyle-suppressions.xml   |  1 +
 docs/docs/flink-maintenance.md                     |  2 +
 .../iceberg/flink/maintenance/api/LockConfig.java  | 42 ++++++++++++
 .../flink/maintenance/api/ZKRetryPolicies.java}    | 40 ++++--------
 .../flink/maintenance/api/ZkLockFactory.java       | 50 +++++++++++++-
 .../maintenance/operator/LockFactoryBuilder.java   |  4 +-
 .../flink/maintenance/api/TestZkLockFactory.java   | 76 +++++++++++++++++++++-
 7 files changed, 182 insertions(+), 33 deletions(-)

diff --git a/.baseline/checkstyle/checkstyle-suppressions.xml 
b/.baseline/checkstyle/checkstyle-suppressions.xml
index 906251411a..1f180e40a1 100644
--- a/.baseline/checkstyle/checkstyle-suppressions.xml
+++ b/.baseline/checkstyle/checkstyle-suppressions.xml
@@ -51,6 +51,7 @@
 
     <!-- Allow using Flink's shaded Curator dependency -->
     <suppress files="org.apache.iceberg.flink.maintenance.api.ZkLockFactory" 
id="BanShadedClasses"/>
+    <suppress 
files="org.apache.iceberg.flink.maintenance.api.TestZkLockFactory" 
id="BanShadedClasses"/>
 
     <!-- Suppress checks for CometColumnReader -->
     <suppress 
files="org.apache.iceberg.spark.data.vectorized.CometColumnReader" 
checks="IllegalImport"/>
diff --git a/docs/docs/flink-maintenance.md b/docs/docs/flink-maintenance.md
index 5d97062c1f..1af18fbb2a 100644
--- a/docs/docs/flink-maintenance.md
+++ b/docs/docs/flink-maintenance.md
@@ -378,6 +378,8 @@ These keys are used in SQL (SET or table WITH options) and 
are applicable when w
 | `flink-maintenance.lock.zookeeper.connection-timeout-ms` | Connection 
timeout (ms) | `15000` |
 | `flink-maintenance.lock.zookeeper.max-retries` | Max retries | `3` |
 | `flink-maintenance.lock.zookeeper.base-sleep-ms` | Base sleep between 
retries (ms) | `3000` |
+| `flink-maintenance.lock.zookeeper.max-sleep-ms`          | Maximum sleep 
time (ms) between retries. Caps the exponential backoff delay.  | `10000` |
+| `flink-maintenance.lock.zookeeper.retry-policy`          | Retry policy name 
for ZooKeeper client. Supported values include: ONE_TIME, N_TIME, 
BOUNDED_EXPONENTIAL_BACKOFF, UNTIL_ELAPSED, EXPONENTIAL_BACKOFF.  | 
`EXPONENTIAL_BACKOFF` |
 
 ### Best Practices
 
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/LockConfig.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/LockConfig.java
index b28731f91c..9000abe3a9 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/LockConfig.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/LockConfig.java
@@ -94,6 +94,28 @@ public class LockConfig {
             .intType()
             .defaultValue(3)
             .withDescription("The maximum number of retries for the Zookeeper 
client.");
+
+    /**
+     * The maximum sleep time (in milliseconds) between retries for the 
Zookeeper client. Default:
+     * 10000 ms
+     */
+    public static final ConfigOption<Integer> ZK_MAX_SLEEP_MS_OPTION =
+        ConfigOptions.key(PREFIX + ZK + ".max-sleep-ms")
+            .intType()
+            .defaultValue(10000)
+            .withDescription(
+                "The maximum sleep time (in milliseconds) between retries for 
the Zookeeper client. (Default: 10000)");
+
+    /**
+     * The retry policy for the Zookeeper client. Supported values: ONE_TIME, N_TIME,
+     * EXPONENTIAL_BACKOFF, BOUNDED_EXPONENTIAL_BACKOFF, UNTIL_ELAPSED. Default: EXPONENTIAL_BACKOFF
+     */
+    public static final ConfigOption<ZKRetryPolicies> ZK_RETRY_POLICY_OPTION =
+        ConfigOptions.key(PREFIX + ZK + ".retry-policy")
+            .enumType(ZKRetryPolicies.class)
+            .defaultValue(ZKRetryPolicies.EXPONENTIAL_BACKOFF)
+            .withDescription(
+                "The retry policy for the Zookeeper client. (Default: 
EXPONENTIAL_BACKOFF)");
   }
 
   private final FlinkConfParser confParser;
@@ -202,6 +224,26 @@ public class LockConfig {
         .parse();
   }
 
+  /** Gets the Zookeeper maximum sleep time configuration (in milliseconds). */
+  public int zkMaxSleepMs() {
+    return confParser
+        .intConf()
+        .option(ZkLockConfig.ZK_MAX_SLEEP_MS_OPTION.key())
+        .flinkConfig(ZkLockConfig.ZK_MAX_SLEEP_MS_OPTION)
+        .defaultValue(ZkLockConfig.ZK_MAX_SLEEP_MS_OPTION.defaultValue())
+        .parse();
+  }
+
+  /** Gets the Zookeeper retry policy configuration. */
+  public ZKRetryPolicies zkRetryPolicy() {
+    return confParser
+        .enumConfParser(ZKRetryPolicies.class)
+        .option(ZkLockConfig.ZK_RETRY_POLICY_OPTION.key())
+        .flinkConfig(ZkLockConfig.ZK_RETRY_POLICY_OPTION)
+        .defaultValue(ZKRetryPolicies.EXPONENTIAL_BACKOFF)
+        .parse();
+  }
+
   public Map<String, String> properties() {
     Map<String, String> mergeConfig = Maps.newHashMap();
     mergeConfig.putAll(setProperties);
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZKRetryPolicies.java
similarity index 53%
copy from 
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
copy to 
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZKRetryPolicies.java
index f1313c89ae..f7daf74928 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZKRetryPolicies.java
@@ -18,37 +18,19 @@
  */
 package org.apache.iceberg.flink.maintenance.api;
 
-import java.io.IOException;
-import org.apache.curator.test.TestingServer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+public enum ZKRetryPolicies {
+  /** A retry policy that retries only once */
+  ONE_TIME,
 
-public class TestZkLockFactory extends TestLockFactoryBase {
+  /** A retry policy that retries a max number of times */
+  N_TIME,
 
-  private TestingServer zkTestServer;
+  /** A retry policy that retries a set number of times with increasing sleep 
time */
+  EXPONENTIAL_BACKOFF,
 
-  @Override
-  TriggerLockFactory lockFactory(String tableName) {
-    return new ZkLockFactory(zkTestServer.getConnectString(), tableName, 5000, 
3000, 1000, 3);
-  }
+  /** A retry policy that retries with exponential backoff up to a max sleep 
time */
+  BOUNDED_EXPONENTIAL_BACKOFF,
 
-  @BeforeEach
-  @Override
-  void before() {
-    try {
-      zkTestServer = new TestingServer();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    super.before();
-  }
-
-  @AfterEach
-  public void after() throws IOException {
-    super.after();
-    if (zkTestServer != null) {
-      zkTestServer.close();
-    }
-  }
+  /** A retry policy that retries until a given amount of time elapses */
+  UNTIL_ELAPSED;
 }
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java
index 539ba6b297..27c0a43a85 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java
@@ -20,11 +20,17 @@ package org.apache.iceberg.flink.maintenance.api;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import 
org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.shaded.curator5.org.apache.curator.RetryPolicy;
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory;
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.SharedCount;
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.VersionedValue;
+import 
org.apache.flink.shaded.curator5.org.apache.curator.retry.BoundedExponentialBackoffRetry;
 import 
org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryNTimes;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryOneTime;
+import 
org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryUntilElapsed;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +47,8 @@ public class ZkLockFactory implements TriggerLockFactory {
   private final int connectionTimeoutMs;
   private final int baseSleepTimeMs;
   private final int maxRetries;
+  private final ZKRetryPolicies retryPolicy;
+  private final int maxSleepTimeMs;
   private transient CuratorFramework client;
   private transient SharedCount taskSharedCount;
   private transient SharedCount recoverySharedCount;
@@ -55,6 +63,8 @@ public class ZkLockFactory implements TriggerLockFactory {
    * @param connectionTimeoutMs Connection timeout in milliseconds
    * @param baseSleepTimeMs Base sleep time in milliseconds
    * @param maxRetries Maximum number of retries
+   * @param retryPolicy The retry policy enum defining the Curator retry 
behavior.
+   * @param maxSleepTimeMs The maximum sleep time (ms) between retries.
    */
   public ZkLockFactory(
       String connectString,
@@ -62,7 +72,9 @@ public class ZkLockFactory implements TriggerLockFactory {
       int sessionTimeoutMs,
       int connectionTimeoutMs,
       int baseSleepTimeMs,
-      int maxRetries) {
+      int maxRetries,
+      ZKRetryPolicies retryPolicy,
+      int maxSleepTimeMs) {
     Preconditions.checkNotNull(connectString, "Zookeeper connection string 
cannot be null");
     Preconditions.checkNotNull(lockId, "Lock ID cannot be null");
     Preconditions.checkArgument(
@@ -75,12 +87,22 @@ public class ZkLockFactory implements TriggerLockFactory {
         baseSleepTimeMs >= 0, "Base sleep time must be positive, got: %s", 
baseSleepTimeMs);
     Preconditions.checkArgument(
         maxRetries >= 0, "Max retries must be non-negative, got: %s", 
maxRetries);
+    Preconditions.checkArgument(
+        maxSleepTimeMs >= 0, "Max sleep time must be positive, got: %s", 
maxSleepTimeMs);
+    Preconditions.checkArgument(
+        maxSleepTimeMs >= baseSleepTimeMs,
+        "Max sleep time (%s ms) must be greater than or equal to base sleep 
time (%s ms)",
+        maxSleepTimeMs,
+        baseSleepTimeMs);
+
     this.connectString = connectString;
     this.lockId = lockId;
     this.sessionTimeoutMs = sessionTimeoutMs;
     this.connectionTimeoutMs = connectionTimeoutMs;
     this.baseSleepTimeMs = baseSleepTimeMs;
     this.maxRetries = maxRetries;
+    this.retryPolicy = retryPolicy;
+    this.maxSleepTimeMs = maxSleepTimeMs;
   }
 
   @Override
@@ -95,7 +117,7 @@ public class ZkLockFactory implements TriggerLockFactory {
             .connectString(connectString)
             .sessionTimeoutMs(sessionTimeoutMs)
             .connectionTimeoutMs(connectionTimeoutMs)
-            .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, 
maxRetries))
+            .retryPolicy(createRetryPolicy())
             .build();
     client.start();
 
@@ -222,4 +244,28 @@ public class ZkLockFactory implements TriggerLockFactory {
       }
     }
   }
+
+  @VisibleForTesting
+  RetryPolicy createRetryPolicy() {
+    ZKRetryPolicies effectivePolicy =
+        (retryPolicy == null) ? ZKRetryPolicies.EXPONENTIAL_BACKOFF : 
retryPolicy;
+
+    switch (effectivePolicy) {
+      case ONE_TIME:
+        return new RetryOneTime(baseSleepTimeMs);
+
+      case N_TIME:
+        return new RetryNTimes(maxRetries, baseSleepTimeMs);
+
+      case BOUNDED_EXPONENTIAL_BACKOFF:
+        return new BoundedExponentialBackoffRetry(baseSleepTimeMs, 
maxSleepTimeMs, maxRetries);
+
+      case UNTIL_ELAPSED:
+        return new RetryUntilElapsed(maxSleepTimeMs, baseSleepTimeMs);
+
+      case EXPONENTIAL_BACKOFF:
+      default:
+        return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
+    }
+  }
 }
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockFactoryBuilder.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockFactoryBuilder.java
index ea91f13376..1c4f0f7a96 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockFactoryBuilder.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockFactoryBuilder.java
@@ -82,6 +82,8 @@ public class LockFactoryBuilder {
         lockConfig.zkSessionTimeoutMs(),
         lockConfig.zkConnectionTimeoutMs(),
         lockConfig.zkBaseSleepMs(),
-        lockConfig.zkMaxRetries());
+        lockConfig.zkMaxRetries(),
+        lockConfig.zkRetryPolicy(),
+        lockConfig.zkMaxSleepMs());
   }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
index f1313c89ae..b51aaddc24 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
@@ -18,10 +18,30 @@
  */
 package org.apache.iceberg.flink.maintenance.api;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
 import java.io.IOException;
+import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.curator5.org.apache.curator.RetryPolicy;
+import 
org.apache.flink.shaded.curator5.org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import 
org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryNTimes;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryOneTime;
+import 
org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryUntilElapsed;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.NullSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestZkLockFactory extends TestLockFactoryBase {
 
@@ -29,7 +49,15 @@ public class TestZkLockFactory extends TestLockFactoryBase {
 
   @Override
   TriggerLockFactory lockFactory(String tableName) {
-    return new ZkLockFactory(zkTestServer.getConnectString(), tableName, 5000, 
3000, 1000, 3);
+    return new ZkLockFactory(
+        zkTestServer.getConnectString(),
+        tableName,
+        5000,
+        3000,
+        1000,
+        3,
+        ZKRetryPolicies.EXPONENTIAL_BACKOFF,
+        2000);
   }
 
   @BeforeEach
@@ -51,4 +79,50 @@ public class TestZkLockFactory extends TestLockFactoryBase {
       zkTestServer.close();
     }
   }
+
+  private static Stream<Arguments> retryPolicyProvider() {
+    return Stream.of(
+        Arguments.of(ZKRetryPolicies.ONE_TIME, RetryOneTime.class),
+        Arguments.of(ZKRetryPolicies.N_TIME, RetryNTimes.class),
+        Arguments.of(ZKRetryPolicies.EXPONENTIAL_BACKOFF, 
ExponentialBackoffRetry.class),
+        Arguments.of(
+            ZKRetryPolicies.BOUNDED_EXPONENTIAL_BACKOFF, 
BoundedExponentialBackoffRetry.class),
+        Arguments.of(ZKRetryPolicies.UNTIL_ELAPSED, RetryUntilElapsed.class));
+  }
+
+  @ParameterizedTest(name = "{0} should create {1}")
+  @MethodSource("retryPolicyProvider")
+  @DisplayName(
+      "Verify ZkLockFactory creates correct Curator RetryPolicy for each 
ZKRetryPolicies enum")
+  void testRetryPolicyCreationAndType(
+      ZKRetryPolicies policy, Class<? extends RetryPolicy> expectedClass) {
+    ZkLockFactory factory =
+        new ZkLockFactory("localhost:2181", "test", 3000, 3000, 1000, 3, 
policy, 2000);
+
+    RetryPolicy retryPolicy = factory.createRetryPolicy();
+
+    assertThat(retryPolicy)
+        .as("RetryPolicy should not be null for policy %s", policy)
+        .isNotNull()
+        .as("Expected %s for policy %s", expectedClass.getSimpleName(), policy)
+        .isInstanceOf(expectedClass);
+  }
+
+  @ParameterizedTest
+  @NullSource
+  @ValueSource(strings = {"", "non_existing_policy"})
+  void testInvalidOrMissingRetryPolicyFallsBackToDefault(String 
retryPolicyConfig) {
+    Map<String, String> options = Maps.newHashMap();
+    if (retryPolicyConfig != null) {
+      options.put("iceberg.maintenance.lock.zookeeper.retry-policy", 
retryPolicyConfig);
+    }
+
+    LockConfig config = new LockConfig(mock(Table.class), options, new 
Configuration());
+
+    ZKRetryPolicies policy = config.zkRetryPolicy();
+
+    assertThat(policy)
+        .as("Invalid, empty, or missing retry-policy should fallback to 
EXPONENTIAL_BACKOFF")
+        .isEqualTo(ZKRetryPolicies.EXPONENTIAL_BACKOFF);
+  }
 }

Reply via email to