This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 1e68a266c [KV] Avoid TabletServer to discard already commited kv
snapshot (#1738)
1e68a266c is described below
commit 1e68a266c37200b1d5b11921555cc1d42c4d7b6c
Author: Yang Wang <[email protected]>
AuthorDate: Sun Sep 28 21:05:43 2025 +0800
[KV] Avoid TabletServer to discard already commited kv snapshot (#1738)
---
.../server/kv/snapshot/KvTabletSnapshotTarget.java | 105 +++++++++-
.../kv/snapshot/RocksIncrementalSnapshot.java | 3 +-
.../org/apache/fluss/server/replica/Replica.java | 1 +
.../kv/snapshot/KvTabletSnapshotTargetTest.java | 227 ++++++++++++++++++++-
4 files changed, 322 insertions(+), 14 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
index d982d486a..c84a26b70 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
@@ -23,6 +23,8 @@ import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.SequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketSnapshot;
import org.apache.fluss.utils.CloseableRegistry;
import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.FlussPaths;
@@ -30,6 +32,7 @@ import org.apache.fluss.utils.FlussPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
@@ -55,6 +58,8 @@ public class KvTabletSnapshotTarget implements
PeriodicSnapshotManager.SnapshotT
private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter;
+ private final ZooKeeperClient zooKeeperClient;
+
private final RocksIncrementalSnapshot rocksIncrementalSnapshot;
private final FsPath remoteKvTabletDir;
private final FsPath remoteSnapshotSharedDir;
@@ -82,6 +87,7 @@ public class KvTabletSnapshotTarget implements
PeriodicSnapshotManager.SnapshotT
KvTabletSnapshotTarget(
TableBucket tableBucket,
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
+ ZooKeeperClient zooKeeperClient,
RocksIncrementalSnapshot rocksIncrementalSnapshot,
FsPath remoteKvTabletDir,
Executor ioExecutor,
@@ -97,6 +103,7 @@ public class KvTabletSnapshotTarget implements
PeriodicSnapshotManager.SnapshotT
this(
tableBucket,
completedKvSnapshotCommitter,
+ zooKeeperClient,
rocksIncrementalSnapshot,
remoteKvTabletDir,
(int)
ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE.defaultValue().getBytes(),
@@ -114,6 +121,7 @@ public class KvTabletSnapshotTarget implements
PeriodicSnapshotManager.SnapshotT
public KvTabletSnapshotTarget(
TableBucket tableBucket,
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
+ @Nonnull ZooKeeperClient zooKeeperClient,
RocksIncrementalSnapshot rocksIncrementalSnapshot,
FsPath remoteKvTabletDir,
int snapshotWriteBufferSize,
@@ -129,6 +137,7 @@ public class KvTabletSnapshotTarget implements
PeriodicSnapshotManager.SnapshotT
throws IOException {
this.tableBucket = tableBucket;
this.completedKvSnapshotCommitter = completedKvSnapshotCommitter;
+ this.zooKeeperClient = zooKeeperClient;
this.rocksIncrementalSnapshot = rocksIncrementalSnapshot;
this.remoteKvTabletDir = remoteKvTabletDir;
this.remoteSnapshotSharedDir =
FlussPaths.remoteKvSharedDir(remoteKvTabletDir);
@@ -211,18 +220,13 @@ public class KvTabletSnapshotTarget implements
PeriodicSnapshotManager.SnapshotT
// commit the completed snapshot
completedKvSnapshotCommitter.commitKvSnapshot(
completedSnapshot, coordinatorEpoch, bucketLeaderEpoch);
- // notify the snapshot complete
- rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
- logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
- snapshotSize = snapshotResult.getSnapshotSize();
- // update LogTablet to notify the lowest offset that should be
retained
- updateMinRetainOffset.accept(snapshotResult.getLogOffset());
+ // update local state after successful commit
+ updateStateOnCommitSuccess(snapshotId, snapshotResult);
} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
- snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {},
ioExecutor);
- handleSnapshotFailure(snapshotId, snapshotLocation, t);
- // throw the exception to make PeriodicSnapshotManager can catch
the exception
- throw t;
+            // handle the exception with an idempotent check
+ handleSnapshotCommitException(
+ snapshotId, snapshotResult, completedSnapshot,
snapshotLocation, t);
}
}
@@ -249,6 +253,87 @@ public class KvTabletSnapshotTarget implements
PeriodicSnapshotManager.SnapshotT
return rocksIncrementalSnapshot;
}
+ /**
+ * Update local state after successful snapshot completion. This includes
notifying RocksDB
+ * about completion, updating latest snapshot offset/size, and notifying
LogTablet about the
+ * minimum offset to retain.
+ */
+ private void updateStateOnCommitSuccess(long snapshotId, SnapshotResult
snapshotResult) {
+ // notify the snapshot complete
+ rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
+ logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
+ snapshotSize = snapshotResult.getSnapshotSize();
+ // update LogTablet to notify the lowest offset that should be retained
+ updateMinRetainOffset.accept(snapshotResult.getLogOffset());
+ }
+
+ /**
+ * Handle snapshot commit exception with idempotent check. This method
implements the fix for
+ * issue #1304 by double-checking ZooKeeper to verify if the snapshot
actually exists before
+ * cleanup.
+ */
+ private void handleSnapshotCommitException(
+ long snapshotId,
+ SnapshotResult snapshotResult,
+ CompletedSnapshot completedSnapshot,
+ SnapshotLocation snapshotLocation,
+ Throwable t)
+ throws Throwable {
+
+ // Fix for issue: https://github.com/apache/fluss/issues/1304
+        // Tablet server tries to commit kv snapshot to coordinator server,
+        // coordinator server commits the kv snapshot to zk, then fails over.
+        // Tablet server will get an exception from coordinator server, but
mistakes it as a failed
+        // commit although coordinator server has committed to zk, then
discards the already committed kv
+        // snapshot.
+ //
+        // Idempotent check: Double-check ZK to verify whether the snapshot
actually exists before
+        // cleanup
+ try {
+ Optional<BucketSnapshot> zkSnapshot =
+ zooKeeperClient.getTableBucketSnapshot(tableBucket,
snapshotId);
+ if (zkSnapshot.isPresent()) {
+ // Snapshot exists in ZK, indicating the commit was actually
successful,
+ // just response was lost
+ LOG.warn(
+ "Snapshot {} for TableBucket {} already exists in ZK. "
+ + "The commit was successful but response was
lost due to coordinator failover. "
+ + "Skipping cleanup and treating as
successful.",
+ snapshotId,
+ tableBucket);
+
+ // Update local state as if the commit was successful
+ updateStateOnCommitSuccess(snapshotId, snapshotResult);
+ return; // Snapshot commit succeeded, return directly
+ } else {
+ // Snapshot does not exist in ZK, indicating the commit truly
failed
+ LOG.warn(
+ "Snapshot {} for TableBucket {} does not exist in ZK. "
+ + "The commit truly failed, proceeding with
cleanup.",
+ snapshotId,
+ tableBucket);
+ snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {},
ioExecutor);
+ handleSnapshotFailure(snapshotId, snapshotLocation, t);
+ }
+ } catch (Exception zkException) {
+ LOG.warn(
+ "Failed to query ZK for snapshot {} of TableBucket {}. "
+ + "Cannot determine actual snapshot status,
keeping snapshot in current state "
+ + "to avoid potential data loss.",
+ snapshotId,
+ tableBucket,
+ zkException);
+ // When ZK query fails, we cannot determine the actual status.
+ // The snapshot might have succeeded or failed on the ZK side.
+ // Therefore, we must not clean up the snapshot files and not
update local state.
+ // This avoids the risk of discarding a successfully committed
snapshot that
+ // connectors may already be reading, which would cause data loss
or job failure.
+ }
+
+        // throw the exception so that PeriodicSnapshotManager can catch the
exception
+ throw t;
+ }
+
private SnapshotRunner createSnapshotRunner(CloseableRegistry
cancelStreamRegistry) {
return new SnapshotRunner(rocksIncrementalSnapshot,
cancelStreamRegistry);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
index 0723fd488..fbfa42e1f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
@@ -341,7 +341,8 @@ public class RocksIncrementalSnapshot implements
AutoCloseable {
@Nonnull private final Map<String, KvFileHandle> confirmedSstFiles;
- private PreviousSnapshot(@Nullable
Collection<KvFileHandleAndLocalPath> confirmedSstFiles) {
+ protected PreviousSnapshot(
+ @Nullable Collection<KvFileHandleAndLocalPath>
confirmedSstFiles) {
this.confirmedSstFiles =
confirmedSstFiles != null
? confirmedSstFiles.stream()
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 3b9b11c47..3b7ac5b7f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -849,6 +849,7 @@ public final class Replica {
new KvTabletSnapshotTarget(
tableBucket,
completedKvSnapshotCommitter,
+ snapshotContext.getZooKeeperClient(),
rocksIncrementalSnapshot,
remoteKvTabletDir,
snapshotContext.getSnapshotFsWriteBufferSize(),
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 9b5a3b161..9f9ea078d 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -29,9 +29,12 @@ import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.apache.fluss.server.testutils.KvTestUtils;
import org.apache.fluss.server.utils.ResourceGuard;
+import org.apache.fluss.server.zk.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.fluss.server.zk.NOPErrorHandler;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.BucketSnapshot;
+import
org.apache.fluss.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.fluss.testutils.common.AllCallbackWrapper;
import
org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
import org.apache.fluss.utils.CloseableRegistry;
@@ -55,15 +58,19 @@ import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
+import static org.apache.fluss.server.zk.ZooKeeperUtils.startZookeeperClient;
+import static
org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory.Builder;
+import static
org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory.builder;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
@@ -297,6 +304,150 @@ class KvTabletSnapshotTargetTest {
assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(1L);
}
+ @Test
+ void testIdempotentCheckWhenSnapshotExistsInZK(@TempDir Path kvTabletDir)
throws Exception {
+ // Test case: coordinator commits to ZK successfully but response is
lost due to failover
+ // The tablet server should detect the snapshot exists in ZK and skip
cleanup
+ CompletedSnapshotHandleStore completedSnapshotHandleStore =
+ new ZooKeeperCompletedSnapshotHandleStore(zooKeeperClient);
+ FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile());
+
+ // Create a committer that commits to ZK first, then throws exception
+ // This simulates coordinator failover after ZK commit but before
response
+ CompletedKvSnapshotCommitter failingAfterZkCommitCommitter =
+ (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> {
+ // Always commit to ZK first
+ CompletedSnapshotHandle handle =
+ new CompletedSnapshotHandle(
+ snapshot.getSnapshotID(),
+ snapshot.getSnapshotLocation(),
+ snapshot.getLogOffset());
+ completedSnapshotHandleStore.add(
+ snapshot.getTableBucket(),
snapshot.getSnapshotID(), handle);
+
+ // Then throw exception - simulating coordinator failover
after ZK commit
+ throw new FlussException("Coordinator failover after ZK
commit");
+ };
+
+ KvTabletSnapshotTarget kvTabletSnapshotTarget =
+ createSnapshotTargetWithCustomCommitter(
+ remoteKvTabletDir, failingAfterZkCommitCommitter);
+
+ periodicSnapshotManager =
createSnapshotManager(kvTabletSnapshotTarget);
+ periodicSnapshotManager.start();
+
+ RocksDB rocksDB = rocksDBExtension.getRocksDb();
+ rocksDB.put("key1".getBytes(), "val1".getBytes());
+
+ // Trigger snapshot - will commit to ZK but throw exception
+ periodicSnapshotManager.triggerSnapshot();
+ long snapshotId1 = 1;
+
+ TestRocksIncrementalSnapshot rocksIncrementalSnapshot =
+ (TestRocksIncrementalSnapshot)
kvTabletSnapshotTarget.getRocksIncrementalSnapshot();
+
+ // The snapshot should be treated as successful due to idempotent check
+ // Even though commit threw exception, idempotent check should find it
in ZK
+ retry(
+ Duration.ofMinutes(1),
+ () ->
+ assertThat(rocksIncrementalSnapshot.completedSnapshots)
+ .contains(snapshotId1));
+
+ // Verify snapshot was not cleaned up and state was updated correctly
+ FsPath snapshotPath1 =
FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1);
+
assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isTrue();
+ assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(1L);
+ }
+
+ @Test
+ void testIdempotentCheckWhenSnapshotNotExistsInZK(@TempDir Path
kvTabletDir) throws Exception {
+ // Test case: genuine commit failure - snapshot should not exist in ZK
and cleanup should
+ // occur
+ FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile());
+
+ // Create a committer that always fails - simulating genuine
coordinator failure
+ CompletedKvSnapshotCommitter alwaysFailingCommitter =
+ (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> {
+ throw new FlussException(
+ "Genuine coordinator failure - snapshot not
committed to ZK");
+ };
+
+ KvTabletSnapshotTarget kvTabletSnapshotTarget =
+ createSnapshotTargetWithCustomCommitter(remoteKvTabletDir,
alwaysFailingCommitter);
+
+ periodicSnapshotManager =
createSnapshotManager(kvTabletSnapshotTarget);
+ periodicSnapshotManager.start();
+
+ RocksDB rocksDB = rocksDBExtension.getRocksDb();
+ rocksDB.put("key1".getBytes(), "val1".getBytes());
+ periodicSnapshotManager.triggerSnapshot();
+
+ long snapshotId1 = 1;
+ TestRocksIncrementalSnapshot rocksIncrementalSnapshot =
+ (TestRocksIncrementalSnapshot)
kvTabletSnapshotTarget.getRocksIncrementalSnapshot();
+
+ // The snapshot should be aborted since it genuinely failed
+ retry(
+ Duration.ofMinutes(1),
+ () ->
assertThat(rocksIncrementalSnapshot.abortedSnapshots).contains(snapshotId1));
+
+ // Verify cleanup occurred
+ FsPath snapshotPath1 =
FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1);
+
assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isFalse();
+
assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(Long.MAX_VALUE);
+ }
+
+ @Test
+ void testIdempotentCheckWhenZKQueryFails(@TempDir Path kvTabletDir) throws
Exception {
+ // Test case: ZK query fails - should keep snapshot in current state
to avoid data loss
+ FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile());
+
+ // Create a failing ZK client that throws exception to simulate ZK
query failure
+ ZooKeeperClient failingZkClient = createFailingZooKeeperClient();
+
+ CompletedKvSnapshotCommitter failingCommitter =
+ (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> {
+ throw new FlussException("Commit failed");
+ };
+
+ KvTabletSnapshotTarget kvTabletSnapshotTarget =
+ createSnapshotTargetWithCustomZkAndCommitter(
+ remoteKvTabletDir, failingZkClient, failingCommitter);
+
+ periodicSnapshotManager =
createSnapshotManager(kvTabletSnapshotTarget);
+ periodicSnapshotManager.start();
+
+ RocksDB rocksDB = rocksDBExtension.getRocksDb();
+ rocksDB.put("key1".getBytes(), "val1".getBytes());
+ periodicSnapshotManager.triggerSnapshot();
+
+ long snapshotId1 = 1;
+ TestRocksIncrementalSnapshot rocksIncrementalSnapshot =
+ (TestRocksIncrementalSnapshot)
kvTabletSnapshotTarget.getRocksIncrementalSnapshot();
+
+ // Wait for snapshot processing to complete
+ // The snapshot should be created but commit will fail, then ZK query
will fail
+ // In this case, the new logic should preserve the snapshot files (no
cleanup)
+ retry(
+ Duration.ofMinutes(1),
+ () -> {
+ // Verify that snapshot creation happened but neither
completion nor abortion
+ // occurred
+ // Since both commit and ZK query failed, snapshot should
remain in limbo state
+ FsPath snapshotPath1 =
+ FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir,
snapshotId1);
+
assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isTrue();
+ assertThat(rocksIncrementalSnapshot.abortedSnapshots)
+ .doesNotContain(snapshotId1);
+ assertThat(rocksIncrementalSnapshot.completedSnapshots)
+ .doesNotContain(snapshotId1);
+ });
+
+ // Verify local state was not updated (remain unchanged)
+
assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(Long.MAX_VALUE);
+ }
+
private PeriodicSnapshotManager createSnapshotManager(
PeriodicSnapshotManager.SnapshotTarget target) {
return new PeriodicSnapshotManager(
@@ -343,6 +494,45 @@ class KvTabletSnapshotTargetTest {
return new KvTabletSnapshotTarget(
tableBucket,
new
TestingStoreCompletedKvSnapshotCommitter(completedSnapshotStore),
+ zooKeeperClient,
+ rocksIncrementalSnapshot,
+ remoteKvTabletDir,
+ executor,
+ cancelStreamRegistry,
+ testingSnapshotIdCounter,
+ logOffsetGenerator::get,
+ updateMinRetainOffsetConsumer::set,
+ bucketLeaderEpochSupplier,
+ coordinatorEpochSupplier,
+ 0,
+ 0L);
+ }
+
+ private KvTabletSnapshotTarget createSnapshotTargetWithCustomCommitter(
+ FsPath remoteKvTabletDir, CompletedKvSnapshotCommitter
customCommitter)
+ throws IOException {
+ return createSnapshotTargetWithCustomZkAndCommitter(
+ remoteKvTabletDir, zooKeeperClient, customCommitter);
+ }
+
+ private KvTabletSnapshotTarget
createSnapshotTargetWithCustomZkAndCommitter(
+ FsPath remoteKvTabletDir,
+ ZooKeeperClient zkClient,
+ CompletedKvSnapshotCommitter customCommitter)
+ throws IOException {
+ TableBucket tableBucket = new TableBucket(1, 1);
+ Executor executor = Executors.directExecutor();
+ RocksIncrementalSnapshot rocksIncrementalSnapshot =
+ createIncrementalSnapshot(SnapshotFailType.NONE);
+ CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
+ TestingSnapshotIDCounter testingSnapshotIdCounter = new
TestingSnapshotIDCounter();
+ Supplier<Integer> bucketLeaderEpochSupplier = () -> 0;
+ Supplier<Integer> coordinatorEpochSupplier = () -> 0;
+
+ return new KvTabletSnapshotTarget(
+ tableBucket,
+ customCommitter,
+ zkClient,
rocksIncrementalSnapshot,
remoteKvTabletDir,
executor,
@@ -380,10 +570,41 @@ class KvTabletSnapshotTargetTest {
snapshotFailType);
}
+ private ZooKeeperClient createFailingZooKeeperClient() {
+ // Create a ZooKeeperClient that throws exception on
getTableBucketSnapshot
+ return new FailingZooKeeperClient();
+ }
+
+ private static class FailingZooKeeperClient extends ZooKeeperClient {
+
+ public FailingZooKeeperClient() {
+ // Create a new ZooKeeperClient using
ZooKeeperUtils.startZookeeperClient
+ super(createCuratorFrameworkWrapper(), new Configuration());
+ }
+
+ private static CuratorFrameworkWithUnhandledErrorListener
createCuratorFrameworkWrapper() {
+ Builder builder =
+ builder()
+ .connectString(
+ ZOO_KEEPER_EXTENSION_WRAPPER
+ .getCustomExtension()
+ .getConnectString())
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3));
+
+ return startZookeeperClient(builder, NOPErrorHandler.INSTANCE);
+ }
+
+ @Override
+ public Optional<BucketSnapshot> getTableBucketSnapshot(
+ TableBucket tableBucket, long snapshotId) throws Exception {
+ throw new Exception("ZK query failed");
+ }
+ }
+
private static final class TestRocksIncrementalSnapshot extends
RocksIncrementalSnapshot {
- private final Set<Long> abortedSnapshots = new HashSet<>();
- private final Set<Long> completedSnapshots = new HashSet<>();
+ private final Set<Long> abortedSnapshots =
ConcurrentHashMap.newKeySet();
+ private final Set<Long> completedSnapshots =
ConcurrentHashMap.newKeySet();
private final SnapshotFailType snapshotFailType;
public TestRocksIncrementalSnapshot(