This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 17cb11b7c5 Flink: Add maxSleepTimeMs and retryPolicyName to
ZkLockFactory to support multiple retry policies. (#14243)
17cb11b7c5 is described below
commit 17cb11b7c503fe39a9905a6142f829e0c8a80306
Author: slfan1989 <[email protected]>
AuthorDate: Tue Oct 21 21:04:24 2025 +0800
Flink: Add maxSleepTimeMs and retryPolicyName to ZkLockFactory to support
multiple retry policies. (#14243)
---
.baseline/checkstyle/checkstyle-suppressions.xml | 1 +
docs/docs/flink-maintenance.md | 2 +
.../iceberg/flink/maintenance/api/LockConfig.java | 42 ++++++++++++
.../flink/maintenance/api/ZKRetryPolicies.java} | 40 ++++--------
.../flink/maintenance/api/ZkLockFactory.java | 50 +++++++++++++-
.../maintenance/operator/LockFactoryBuilder.java | 4 +-
.../flink/maintenance/api/TestZkLockFactory.java | 76 +++++++++++++++++++++-
7 files changed, 182 insertions(+), 33 deletions(-)
diff --git a/.baseline/checkstyle/checkstyle-suppressions.xml
b/.baseline/checkstyle/checkstyle-suppressions.xml
index 906251411a..1f180e40a1 100644
--- a/.baseline/checkstyle/checkstyle-suppressions.xml
+++ b/.baseline/checkstyle/checkstyle-suppressions.xml
@@ -51,6 +51,7 @@
<!-- Allow using Flink's shaded Curator dependency -->
<suppress files="org.apache.iceberg.flink.maintenance.api.ZkLockFactory"
id="BanShadedClasses"/>
+ <suppress
files="org.apache.iceberg.flink.maintenance.api.TestZkLockFactory"
id="BanShadedClasses"/>
<!-- Suppress checks for CometColumnReader -->
<suppress
files="org.apache.iceberg.spark.data.vectorized.CometColumnReader"
checks="IllegalImport"/>
diff --git a/docs/docs/flink-maintenance.md b/docs/docs/flink-maintenance.md
index 5d97062c1f..1af18fbb2a 100644
--- a/docs/docs/flink-maintenance.md
+++ b/docs/docs/flink-maintenance.md
@@ -378,6 +378,8 @@ These keys are used in SQL (SET or table WITH options) and
are applicable when w
| `flink-maintenance.lock.zookeeper.connection-timeout-ms` | Connection
timeout (ms) | `15000` |
| `flink-maintenance.lock.zookeeper.max-retries` | Max retries | `3` |
| `flink-maintenance.lock.zookeeper.base-sleep-ms` | Base sleep between
retries (ms) | `3000` |
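+| `flink-maintenance.lock.zookeeper.max-sleep-ms` | Maximum sleep
time (ms); must be greater than or equal to `base-sleep-ms`. Used only by `BOUNDED_EXPONENTIAL_BACKOFF` (caps the backoff delay between retries) and `UNTIL_ELAPSED` (maximum total time spent retrying); ignored by the other retry policies. | `10000` |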
+| `flink-maintenance.lock.zookeeper.retry-policy` | Retry policy name
for ZooKeeper client. Supported values include: ONE_TIME, N_TIME,
BOUNDED_EXPONENTIAL_BACKOFF, UNTIL_ELAPSED, EXPONENTIAL_BACKOFF. |
`EXPONENTIAL_BACKOFF` |
### Best Practices
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/LockConfig.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/LockConfig.java
index b28731f91c..9000abe3a9 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/LockConfig.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/LockConfig.java
@@ -94,6 +94,28 @@ public class LockConfig {
.intType()
.defaultValue(3)
.withDescription("The maximum number of retries for the Zookeeper
client.");
+
+ /**
+ * The maximum sleep time (in milliseconds) between retries for the
Zookeeper client. Default:
+ * 10000 ms
+ */
+ public static final ConfigOption<Integer> ZK_MAX_SLEEP_MS_OPTION =
+ ConfigOptions.key(PREFIX + ZK + ".max-sleep-ms")
+ .intType()
+ .defaultValue(10000)
+ .withDescription(
+ "The maximum sleep time (in milliseconds) between retries for
the Zookeeper client. (Default: 10000)");
+
+ /**
+ * The retry policy for the Zookeeper client. Supported values: ONE_TIME, N_TIME,
+ * EXPONENTIAL_BACKOFF, BOUNDED_EXPONENTIAL_BACKOFF, UNTIL_ELAPSED. Default: EXPONENTIAL_BACKOFF
+ */
+ public static final ConfigOption<ZKRetryPolicies> ZK_RETRY_POLICY_OPTION =
+ ConfigOptions.key(PREFIX + ZK + ".retry-policy")
+ .enumType(ZKRetryPolicies.class)
+ .defaultValue(ZKRetryPolicies.EXPONENTIAL_BACKOFF)
+ .withDescription(
+ "The retry policy for the Zookeeper client. (Default:
EXPONENTIAL_BACKOFF)");
}
private final FlinkConfParser confParser;
@@ -202,6 +224,26 @@ public class LockConfig {
.parse();
}
+ /** Gets the Zookeeper maximum sleep time configuration (in milliseconds). */
+ public int zkMaxSleepMs() {
+ return confParser
+ .intConf()
+ .option(ZkLockConfig.ZK_MAX_SLEEP_MS_OPTION.key())
+ .flinkConfig(ZkLockConfig.ZK_MAX_SLEEP_MS_OPTION)
+ .defaultValue(ZkLockConfig.ZK_MAX_SLEEP_MS_OPTION.defaultValue())
+ .parse();
+ }
+
+ /** Gets the Zookeeper retry policy configuration. */
+ public ZKRetryPolicies zkRetryPolicy() {
+ return confParser
+ .enumConfParser(ZKRetryPolicies.class)
+ .option(ZkLockConfig.ZK_RETRY_POLICY_OPTION.key())
+ .flinkConfig(ZkLockConfig.ZK_RETRY_POLICY_OPTION)
+ .defaultValue(ZKRetryPolicies.EXPONENTIAL_BACKOFF)
+ .parse();
+ }
+
public Map<String, String> properties() {
Map<String, String> mergeConfig = Maps.newHashMap();
mergeConfig.putAll(setProperties);
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZKRetryPolicies.java
similarity index 53%
copy from
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
copy to
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZKRetryPolicies.java
index f1313c89ae..f7daf74928 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZKRetryPolicies.java
@@ -18,37 +18,19 @@
*/
package org.apache.iceberg.flink.maintenance.api;
-import java.io.IOException;
-import org.apache.curator.test.TestingServer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+public enum ZKRetryPolicies {
+ /** A retry policy that retries only once */
+ ONE_TIME,
-public class TestZkLockFactory extends TestLockFactoryBase {
+ /** A retry policy that retries a max number of times */
+ N_TIME,
- private TestingServer zkTestServer;
+ /** A retry policy that retries a set number of times with increasing sleep
time */
+ EXPONENTIAL_BACKOFF,
- @Override
- TriggerLockFactory lockFactory(String tableName) {
- return new ZkLockFactory(zkTestServer.getConnectString(), tableName, 5000,
3000, 1000, 3);
- }
+ /** A retry policy that retries with exponential backoff up to a max sleep
time */
+ BOUNDED_EXPONENTIAL_BACKOFF,
- @BeforeEach
- @Override
- void before() {
- try {
- zkTestServer = new TestingServer();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- super.before();
- }
-
- @AfterEach
- public void after() throws IOException {
- super.after();
- if (zkTestServer != null) {
- zkTestServer.close();
- }
- }
+ /** A retry policy that retries until a given amount of time elapses */
+ UNTIL_ELAPSED;
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java
index 539ba6b297..27c0a43a85 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java
@@ -20,11 +20,17 @@ package org.apache.iceberg.flink.maintenance.api;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import
org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.shaded.curator5.org.apache.curator.RetryPolicy;
import
org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import
org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory;
import
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.SharedCount;
import
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.VersionedValue;
+import
org.apache.flink.shaded.curator5.org.apache.curator.retry.BoundedExponentialBackoffRetry;
import
org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryNTimes;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryOneTime;
+import
org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryUntilElapsed;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +47,8 @@ public class ZkLockFactory implements TriggerLockFactory {
private final int connectionTimeoutMs;
private final int baseSleepTimeMs;
private final int maxRetries;
+ private final ZKRetryPolicies retryPolicy;
+ private final int maxSleepTimeMs;
private transient CuratorFramework client;
private transient SharedCount taskSharedCount;
private transient SharedCount recoverySharedCount;
@@ -55,6 +63,8 @@ public class ZkLockFactory implements TriggerLockFactory {
* @param connectionTimeoutMs Connection timeout in milliseconds
* @param baseSleepTimeMs Base sleep time in milliseconds
* @param maxRetries Maximum number of retries
+ * @param retryPolicy The retry policy enum defining the Curator retry
behavior.
+ * @param maxSleepTimeMs The maximum sleep time (ms) between retries.
*/
public ZkLockFactory(
String connectString,
@@ -62,7 +72,9 @@ public class ZkLockFactory implements TriggerLockFactory {
int sessionTimeoutMs,
int connectionTimeoutMs,
int baseSleepTimeMs,
- int maxRetries) {
+ int maxRetries,
+ ZKRetryPolicies retryPolicy,
+ int maxSleepTimeMs) {
Preconditions.checkNotNull(connectString, "Zookeeper connection string
cannot be null");
Preconditions.checkNotNull(lockId, "Lock ID cannot be null");
Preconditions.checkArgument(
@@ -75,12 +87,22 @@ public class ZkLockFactory implements TriggerLockFactory {
baseSleepTimeMs >= 0, "Base sleep time must be positive, got: %s",
baseSleepTimeMs);
Preconditions.checkArgument(
maxRetries >= 0, "Max retries must be non-negative, got: %s",
maxRetries);
+ Preconditions.checkArgument(
+ maxSleepTimeMs >= 0, "Max sleep time must be positive, got: %s",
maxSleepTimeMs);
+ Preconditions.checkArgument(
+ maxSleepTimeMs >= baseSleepTimeMs,
+ "Max sleep time (%s ms) must be greater than or equal to base sleep
time (%s ms)",
+ maxSleepTimeMs,
+ baseSleepTimeMs);
+
this.connectString = connectString;
this.lockId = lockId;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.baseSleepTimeMs = baseSleepTimeMs;
this.maxRetries = maxRetries;
+ this.retryPolicy = retryPolicy;
+ this.maxSleepTimeMs = maxSleepTimeMs;
}
@Override
@@ -95,7 +117,7 @@ public class ZkLockFactory implements TriggerLockFactory {
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
- .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs,
maxRetries))
+ .retryPolicy(createRetryPolicy())
.build();
client.start();
@@ -222,4 +244,28 @@ public class ZkLockFactory implements TriggerLockFactory {
}
}
}
+
+ @VisibleForTesting
+ RetryPolicy createRetryPolicy() {
+ ZKRetryPolicies effectivePolicy =
+ (retryPolicy == null) ? ZKRetryPolicies.EXPONENTIAL_BACKOFF :
retryPolicy;
+
+ switch (effectivePolicy) {
+ case ONE_TIME:
+ return new RetryOneTime(baseSleepTimeMs);
+
+ case N_TIME:
+ return new RetryNTimes(maxRetries, baseSleepTimeMs);
+
+ case BOUNDED_EXPONENTIAL_BACKOFF:
+ return new BoundedExponentialBackoffRetry(baseSleepTimeMs,
maxSleepTimeMs, maxRetries);
+
+ case UNTIL_ELAPSED:
+ return new RetryUntilElapsed(maxSleepTimeMs, baseSleepTimeMs);
+
+ case EXPONENTIAL_BACKOFF:
+ default:
+ return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
+ }
+ }
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockFactoryBuilder.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockFactoryBuilder.java
index ea91f13376..1c4f0f7a96 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockFactoryBuilder.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockFactoryBuilder.java
@@ -82,6 +82,8 @@ public class LockFactoryBuilder {
lockConfig.zkSessionTimeoutMs(),
lockConfig.zkConnectionTimeoutMs(),
lockConfig.zkBaseSleepMs(),
- lockConfig.zkMaxRetries());
+ lockConfig.zkMaxRetries(),
+ lockConfig.zkRetryPolicy(),
+ lockConfig.zkMaxSleepMs());
}
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
index f1313c89ae..b51aaddc24 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java
@@ -18,10 +18,30 @@
*/
package org.apache.iceberg.flink.maintenance.api;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
import java.io.IOException;
+import java.util.Map;
+import java.util.stream.Stream;
import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.curator5.org.apache.curator.RetryPolicy;
+import
org.apache.flink.shaded.curator5.org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import
org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryNTimes;
+import org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryOneTime;
+import
org.apache.flink.shaded.curator5.org.apache.curator.retry.RetryUntilElapsed;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.NullSource;
+import org.junit.jupiter.params.provider.ValueSource;
public class TestZkLockFactory extends TestLockFactoryBase {
@@ -29,7 +49,15 @@ public class TestZkLockFactory extends TestLockFactoryBase {
@Override
TriggerLockFactory lockFactory(String tableName) {
- return new ZkLockFactory(zkTestServer.getConnectString(), tableName, 5000,
3000, 1000, 3);
+ return new ZkLockFactory(
+ zkTestServer.getConnectString(),
+ tableName,
+ 5000,
+ 3000,
+ 1000,
+ 3,
+ ZKRetryPolicies.EXPONENTIAL_BACKOFF,
+ 2000);
}
@BeforeEach
@@ -51,4 +79,50 @@ public class TestZkLockFactory extends TestLockFactoryBase {
zkTestServer.close();
}
}
+
+ private static Stream<Arguments> retryPolicyProvider() {
+ return Stream.of(
+ Arguments.of(ZKRetryPolicies.ONE_TIME, RetryOneTime.class),
+ Arguments.of(ZKRetryPolicies.N_TIME, RetryNTimes.class),
+ Arguments.of(ZKRetryPolicies.EXPONENTIAL_BACKOFF,
ExponentialBackoffRetry.class),
+ Arguments.of(
+ ZKRetryPolicies.BOUNDED_EXPONENTIAL_BACKOFF,
BoundedExponentialBackoffRetry.class),
+ Arguments.of(ZKRetryPolicies.UNTIL_ELAPSED, RetryUntilElapsed.class));
+ }
+
+ @ParameterizedTest(name = "{0} should create {1}")
+ @MethodSource("retryPolicyProvider")
+ @DisplayName(
+ "Verify ZkLockFactory creates correct Curator RetryPolicy for each
ZKRetryPolicies enum")
+ void testRetryPolicyCreationAndType(
+ ZKRetryPolicies policy, Class<? extends RetryPolicy> expectedClass) {
+ ZkLockFactory factory =
+ new ZkLockFactory("localhost:2181", "test", 3000, 3000, 1000, 3,
policy, 2000);
+
+ RetryPolicy retryPolicy = factory.createRetryPolicy();
+
+ assertThat(retryPolicy)
+ .as("RetryPolicy should not be null for policy %s", policy)
+ .isNotNull()
+ .as("Expected %s for policy %s", expectedClass.getSimpleName(), policy)
+ .isInstanceOf(expectedClass);
+ }
+
+ @ParameterizedTest
+ @NullSource
+ @ValueSource(strings = {"", "non_existing_policy"})
+ void testInvalidOrMissingRetryPolicyFallsBackToDefault(String
retryPolicyConfig) {
+ Map<String, String> options = Maps.newHashMap();
+ if (retryPolicyConfig != null) {
+ options.put("iceberg.maintenance.lock.zookeeper.retry-policy",
retryPolicyConfig);
+ }
+
+ LockConfig config = new LockConfig(mock(Table.class), options, new
Configuration());
+
+ ZKRetryPolicies policy = config.zkRetryPolicy();
+
+ assertThat(policy)
+ .as("Invalid, empty, or missing retry-policy should fallback to
EXPONENTIAL_BACKOFF")
+ .isEqualTo(ZKRetryPolicies.EXPONENTIAL_BACKOFF);
+ }
}