wuchong commented on code in PR #2179:
URL: https://github.com/apache/fluss/pull/2179#discussion_r2779161277
##########
fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java:
##########
@@ -1060,6 +1062,119 @@ void testRebalanceDuringConcurrentTableCreation() throws Exception {
}
}
+ // ------------------------------------------------------------------------
+ // KV Snapshot Lease Authorization Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ void testAcquireKvSnapshotLease() throws Exception {
+ TableInfo tableInfo = rootAdmin.getTableInfo(DATA1_TABLE_PATH_PK).get();
+ long tableId = tableInfo.getTableId();
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+ KvSnapshotLease kvSnapshotLease =
+ guestAdmin.createKvSnapshotLease(
+ "test-acquire-lease", Duration.ofDays(1).toMillis());
+ Map<TableBucket, Long> snapshotIds = new HashMap<>();
+ snapshotIds.put(new TableBucket(tableId, 0), 0L);
+
+ // test acquireKvSnapshotLease without READ permission on table resource
+ assertThatThrownBy(() -> kvSnapshotLease.acquireSnapshots(snapshotIds).get())
+ .rootCause()
+ .isInstanceOf(AuthorizationException.class)
+ .hasMessageContaining(
+ String.format(
+ "Principal %s have no authorization to operate READ on resource Resource{type=TABLE, name='%s'}",
+ guestPrincipal, DATA1_TABLE_PATH_PK));
+
+ // add READ permission to guest user on table resource
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.table(DATA1_TABLE_PATH_PK),
+ new AccessControlEntry(
+ guestPrincipal, "*", READ,
PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
+
+ // test acquireKvSnapshotLease with READ permission should succeed
+ // (no AuthorizationException should be thrown)
+ kvSnapshotLease.acquireSnapshots(snapshotIds).get();
+
+ // cleanup: drop the lease using root admin
+ rootAdmin
Review Comment:
use `guestAdmin`
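A possible shape for that cleanup, reusing the lease handle the guest admin already created earlier in the test (sketch only):

```java
// cleanup: drop the lease via the guest admin's lease handle instead of rootAdmin
kvSnapshotLease.dropLease().get();
```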
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -531,22 +540,78 @@ private List<SourceSplitBase> initPrimaryKeyTablePartitionSplits(
List<SourceSplitBase> splits = new ArrayList<>();
for (Partition partition : newPartitions) {
String partitionName = partition.getPartitionName();
- // get the table snapshot info
- final KvSnapshots kvSnapshots;
- try {
- kvSnapshots = flussAdmin.getLatestKvSnapshots(tablePath, partitionName).get();
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- String.format(
- "Failed to get table snapshot for table %s and partition %s",
- tablePath, partitionName),
- ExceptionUtils.stripCompletionException(e));
- }
- splits.addAll(getSnapshotAndLogSplits(kvSnapshots, partitionName));
+ splits.addAll(
+ getSnapshotAndLogSplits(
+ getLatestKvSnapshotsAndRegister(partitionName), partitionName));
}
return splits;
}
+ private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String partitionName) {
+ long tableId;
+ Long partitionId;
+ Map<Integer, Long> snapshotIds = new HashMap<>();
+ Map<Integer, Long> logOffsets = new HashMap<>();
+
+ // Get the latest kv snapshots and acquire kvSnapshot lease.
+ try {
+ KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
+
+ tableId = kvSnapshots.getTableId();
+ partitionId = kvSnapshots.getPartitionId();
+
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TableBucket tb : kvSnapshots.getTableBuckets()) {
+ int bucket = tb.getBucket();
+ OptionalLong snapshotIdOpt = kvSnapshots.getSnapshotId(bucket);
+ OptionalLong logOffsetOpt = kvSnapshots.getLogOffset(bucket);
+ if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) {
+ bucketsToLease.put(tb, snapshotIdOpt.getAsLong());
+ }
+
+ snapshotIds.put(
+ bucket, snapshotIdOpt.isPresent() ? snapshotIdOpt.getAsLong() : null);
+ logOffsets.put(bucket, logOffsetOpt.isPresent() ? logOffsetOpt.getAsLong() : null);
+ }
+
+ if (!bucketsToLease.isEmpty()) {
+ String kvSnapshotLeaseId = leaseContext.getKvSnapshotLeaseId();
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for table {}",
+ kvSnapshotLeaseId,
+ PhysicalTablePath.of(tablePath, partitionName));
+ long kvSnapshotLeaseDurationMs = leaseContext.getKvSnapshotLeaseDurationMs();
+ checkNotNull(kvSnapshotLeaseDurationMs, "kv snapshot lease duration is null.");
+ Set<TableBucket> unavailableTableBucketSet =
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId, kvSnapshotLeaseDurationMs)
+ .acquireSnapshots(bucketsToLease)
+ .get()
+ .getUnavailableTableBucketSet();
+ if (!unavailableTableBucketSet.isEmpty()) {
+ LOG.info(
Review Comment:
use `LOG.warn` or `LOG.error`
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java:
##########
@@ -135,55 +154,101 @@ CompletedSnapshot addSnapshotAndSubsumeOldestOne(
// Remove completed snapshot from queue and snapshotStateHandleStore, not
// discard.
- Optional<CompletedSnapshot> subsume =
- subsume(
- completedSnapshots,
- maxNumberOfSnapshotsToRetain,
- completedSnapshot -> {
- remove(
- completedSnapshot.getTableBucket(),
- completedSnapshot.getSnapshotID());
- snapshotsCleaner.addSubsumedSnapshot(completedSnapshot);
- });
-
+ subsume(
+ completedSnapshots,
+ maxNumberOfSnapshotsToRetain,
+ completedSnapshot -> {
+ remove(
+ completedSnapshot.getTableBucket(),
+ completedSnapshot.getSnapshotID());
+ snapshotsCleaner.addSubsumedSnapshot(completedSnapshot);
+ },
+ subsumptionChecker);
+
+ // Move leased snapshots that should have been subsumed but couldn't
+ // (protected by a lease) from completedSnapshots to stillInUseSnapshots.
+ // This ensures the effective lowestSnapshotID is computed from retained
+ // (non-leased) snapshots only, allowing SST files from non-leased
+ // subsumed snapshots to be cleaned up properly.
+ CompletedSnapshot latest = completedSnapshots.peekLast();
+ Iterator<CompletedSnapshot> leaseIt = completedSnapshots.iterator();
+ while (leaseIt.hasNext()) {
+ CompletedSnapshot next = leaseIt.next();
+ if (next != latest
+ && !subsumptionChecker.canSubsume(
+ new TableBucketSnapshot(
+ next.getTableBucket(), next.getSnapshotID()))) {
+ leaseIt.remove();
+ stillInUseSnapshots.put(next.getSnapshotID(), next);
+ LOG.debug(
+ "Moved leased snapshot {} to stillInUseSnapshots",
+ next.getSnapshotID());
+ }
+ }
+
+ // Check if any previously still-in-use snapshots can now be released
+ // (lease expired).
+ Iterator<Map.Entry<Long, CompletedSnapshot>> stillInUseIter =
+ stillInUseSnapshots.entrySet().iterator();
+ while (stillInUseIter.hasNext()) {
+ Map.Entry<Long, CompletedSnapshot> entry = stillInUseIter.next();
+ CompletedSnapshot s = entry.getValue();
+ if (subsumptionChecker.canSubsume(
+ new TableBucketSnapshot(s.getTableBucket(), s.getSnapshotID()))) {
+ stillInUseIter.remove();
+ try {
+ remove(s.getTableBucket(), s.getSnapshotID());
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to remove released snapshot {} from store",
+ s.getSnapshotID(),
+ e);
+ }
+ snapshotsCleaner.addSubsumedSnapshot(s);
+ LOG.debug(
+ "Released snapshot {} from stillInUseSnapshots (lease expired)",
+ s.getSnapshotID());
+ }
+ }
+
+ // SST file cleanup: compute effective lowest from retained (non-leased)
+ // snapshots only, and protect files referenced by still-in-use snapshots.
+ Set<Long> stillInUseIds = new HashSet<>(stillInUseSnapshots.keySet());
findLowest(completedSnapshots)
.ifPresent(
- id -> {
- // unregister the unused kv file, which will then cause the
- // kv file
- // deletion
- sharedKvFileRegistry.unregisterUnusedKvFile(id);
- snapshotsCleaner.cleanSubsumedSnapshots(
- id,
- Collections.emptySet(),
- postCleanup,
- ioExecutor);
- });
- return subsume.orElse(null);
+ id ->
+ sharedKvFileRegistry.unregisterUnusedKvFile(
+ id, stillInUseIds));
+
+ // Snapshot metadata/private files cleanup: use the latest snapshot
+ // ID + 1 so subsumed snapshots can be cleaned even when a lower
+ // snapshot has a lease. This is safe because
+ // KvSnapshotHandle.discard() only deletes private files and
+ // metadata, not shared SST files registered in SharedKvFileRegistry.
+ snapshotsCleaner.cleanSubsumedSnapshots(
+ snapshot.getSnapshotID() + 1, stillInUseIds, postCleanup, ioExecutor);
+ return null;
Review Comment:
The return value seems unused; change the method signature to return `void`?
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/KvSnapshotLeaseImpl.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.admin;
+
+import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
+import org.apache.fluss.client.utils.ClientRpcMessageUtils;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.rpc.gateway.AdminGateway;
+import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
+import org.apache.fluss.rpc.messages.DropKvSnapshotLeaseRequest;
+import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAcquireKvSnapshotLeaseRequest;
+
+/** The default implementation of KvSnapshotLease. */
+public class KvSnapshotLeaseImpl implements KvSnapshotLease {
+ private final String leaseId;
+ private final long leaseDurationMs;
+ private final AdminGateway gateway;
+
+ public KvSnapshotLeaseImpl(String leaseId, long leaseDurationMs, AdminGateway gateway) {
+ this.leaseId = leaseId;
+ this.leaseDurationMs = leaseDurationMs;
+ this.gateway = gateway;
+ }
+
+ @Override
+ public String leaseId() {
+ return leaseId;
+ }
+
+ @Override
+ public long leaseDurationMs() {
+ return leaseDurationMs;
+ }
+
+ @Override
+ public CompletableFuture<AcquireKvSnapshotLeaseResult> acquireSnapshots(
+ Map<TableBucket, Long> snapshotIds) {
+ if (snapshotIds.isEmpty()) {
+ throw new IllegalArgumentException(
+ "The snapshotIds to acquire kv snapshot lease is empty");
+ }
+
+ return gateway.acquireKvSnapshotLease(
+ makeAcquireKvSnapshotLeaseRequest(leaseId, snapshotIds, leaseDurationMs))
+ .thenApply(ClientRpcMessageUtils::toAcquireKvSnapshotLeaseResult);
+ }
+
+ @Override
+ public CompletableFuture<Void> renew() {
+ AcquireKvSnapshotLeaseRequest request =
+ new AcquireKvSnapshotLeaseRequest()
+ .setLeaseId(leaseId)
+ .setLeaseDurationMs(leaseDurationMs);
+ return gateway.acquireKvSnapshotLease(request).thenApply(r -> null);
+ }
+
+ @Override
+ public CompletableFuture<Void> releaseSnapshots(Set<TableBucket> bucketsToRelease) {
+ ReleaseKvSnapshotLeaseRequest request =
+ new ReleaseKvSnapshotLeaseRequest().setLeaseId(leaseId);
Review Comment:
`bucketsToRelease` is not passed into the request.
Use `makeReleaseKvSnapshotLeaseRequest(leaseId, bucketsToRelease)` instead.
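A sketch of what the fixed method might look like, using the helper named above (the early return for an empty set is my own assumption, mirroring the guard in `acquireSnapshots`):

```java
@Override
public CompletableFuture<Void> releaseSnapshots(Set<TableBucket> bucketsToRelease) {
    if (bucketsToRelease.isEmpty()) {
        // assumption: treat an empty set as a no-op rather than an error
        return CompletableFuture.completedFuture(null);
    }
    return gateway.releaseKvSnapshotLease(
                    ClientRpcMessageUtils.makeReleaseKvSnapshotLeaseRequest(
                            leaseId, bucketsToRelease))
            .thenApply(r -> null);
}
```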
##########
fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java:
##########
@@ -1060,6 +1062,119 @@ void testRebalanceDuringConcurrentTableCreation() throws Exception {
}
}
+ // ------------------------------------------------------------------------
+ // KV Snapshot Lease Authorization Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ void testAcquireKvSnapshotLease() throws Exception {
+ TableInfo tableInfo = rootAdmin.getTableInfo(DATA1_TABLE_PATH_PK).get();
+ long tableId = tableInfo.getTableId();
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+ KvSnapshotLease kvSnapshotLease =
+ guestAdmin.createKvSnapshotLease(
+ "test-acquire-lease", Duration.ofDays(1).toMillis());
+ Map<TableBucket, Long> snapshotIds = new HashMap<>();
+ snapshotIds.put(new TableBucket(tableId, 0), 0L);
+
+ // test acquireKvSnapshotLease without READ permission on table resource
+ assertThatThrownBy(() -> kvSnapshotLease.acquireSnapshots(snapshotIds).get())
+ .rootCause()
+ .isInstanceOf(AuthorizationException.class)
+ .hasMessageContaining(
+ String.format(
+ "Principal %s have no authorization to operate READ on resource Resource{type=TABLE, name='%s'}",
+ guestPrincipal, DATA1_TABLE_PATH_PK));
+
+ // add READ permission to guest user on table resource
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.table(DATA1_TABLE_PATH_PK),
+ new AccessControlEntry(
+ guestPrincipal, "*", READ,
PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
+
+ // test acquireKvSnapshotLease with READ permission should succeed
+ // (no AuthorizationException should be thrown)
+ kvSnapshotLease.acquireSnapshots(snapshotIds).get();
+
+ // cleanup: drop the lease using root admin
+ rootAdmin
+ .createKvSnapshotLease("test-acquire-lease",
Duration.ofDays(1).toMillis())
+ .dropLease()
+ .get();
+ }
+
+ @Test
+ void testReleaseKvSnapshotLease() throws Exception {
+ TableInfo tableInfo = rootAdmin.getTableInfo(DATA1_TABLE_PATH_PK).get();
+ long tableId = tableInfo.getTableId();
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+ AdminGateway guestGateway = ((FlussAdmin) guestAdmin).getAdminGateway();
+ ReleaseKvSnapshotLeaseRequest request =
+ ClientRpcMessageUtils.makeReleaseKvSnapshotLeaseRequest(
+ "test-release-lease", Collections.singleton(new TableBucket(tableId, 0)));
+
+ // test releaseKvSnapshotLease without READ permission on table resource
+ assertThatThrownBy(() -> guestGateway.releaseKvSnapshotLease(request).get())
+ .rootCause()
+ .isInstanceOf(AuthorizationException.class)
+ .hasMessageContaining(
+ String.format(
+ "Principal %s have no authorization to operate READ on resource Resource{type=TABLE, name='%s'}",
+ guestPrincipal, DATA1_TABLE_PATH_PK));
+
+ // add READ permission to guest user on table resource
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.table(DATA1_TABLE_PATH_PK),
+ new AccessControlEntry(
+ guestPrincipal, "*", READ,
PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
+
+ // test releaseKvSnapshotLease with READ permission should succeed
+ // (the lease doesn't exist, but no AuthorizationException should be thrown)
+ guestGateway.releaseKvSnapshotLease(request).get();
+ }
+
+ @Test
+ void testDropKvSnapshotLease() throws Exception {
+ KvSnapshotLease kvSnapshotLease =
+ guestAdmin.createKvSnapshotLease("test-drop-lease",
Duration.ofDays(1).toMillis());
+
+ // test dropKvSnapshotLease without WRITE permission on cluster resource
+ assertThatThrownBy(() -> kvSnapshotLease.dropLease().get())
+ .rootCause()
+ .isInstanceOf(AuthorizationException.class)
+ .hasMessageContaining(
+ String.format(
+ "Principal %s have no authorization to operate
WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
+ guestPrincipal));
+
+ // add WRITE permission to guest user on cluster resource
Review Comment:
Should use the `READ` permission. Note that the kv snapshot lease is used for reading
kv snapshots, so most users will only have read permission on the table, and we
should make this common case work.
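A sketch of how this test could look once the server-side check is switched to table-level `READ`, mirroring the ACL setup in the two tests above (the expected error message in the first assertion would need to change accordingly):

```java
// add READ permission to the guest user on the table resource
// (instead of WRITE on the cluster), matching the acquire/release tests above
List<AclBinding> aclBindings =
        Collections.singletonList(
                new AclBinding(
                        Resource.table(DATA1_TABLE_PATH_PK),
                        new AccessControlEntry(
                                guestPrincipal, "*", READ, PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);

// dropping the lease with only table-level READ should now succeed
kvSnapshotLease.dropLease().get();
```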
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java:
##########
@@ -260,4 +287,17 @@ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
return null;
}
}
+
+ private void serializeLeaseContext(final DataOutputSerializer out, SourceEnumeratorState state)
Review Comment:
rename method to `serializeLeaseId()`.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -923,15 +1000,72 @@ public void addReader(int subtaskId) {
public SourceEnumeratorState snapshotState(long checkpointId) {
final SourceEnumeratorState enumeratorState =
new SourceEnumeratorState(
- assignedTableBuckets, assignedPartitions, pendingHybridLakeFlussSplits);
+ assignedTableBuckets,
+ assignedPartitions,
+ pendingHybridLakeFlussSplits,
+ leaseContext.getKvSnapshotLeaseId());
LOG.debug("Source Checkpoint is {}", enumeratorState);
return enumeratorState;
}
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // lower than this checkpoint id.
+ Set<TableBucket> consumedKvSnapshots = getAndRemoveConsumedBucketsUpTo(checkpointId);
+
+ LOG.info(
+ "kv snapshot has already consumed and try to release kv snapshot lease for: {}, checkpoint id: {}",
+ consumedKvSnapshots,
+ checkpointId);
+
+ // send request to fluss to unregister the kv snapshot lease.
+ try {
+ flussAdmin
+ .createKvSnapshotLease(
+ leaseContext.getKvSnapshotLeaseId(),
+ leaseContext.getKvSnapshotLeaseDurationMs())
+ .releaseSnapshots(consumedKvSnapshots)
+ .get();
+ } catch (Exception e) {
+ LOG.error("Failed to release kv snapshot lease. These snapshot
need to re-enqueue", e);
+ // use the current checkpoint id to re-enqueue the buckets
+ consumedKvSnapshots.forEach(
+ tableBucket -> addConsumedBucket(checkpointId,
tableBucket));
+ }
+ }
+
+ /** Add bucket who has been consumed kv snapshot to the consumedKvSnapshotMap. */
+ public void addConsumedBucket(long checkpointId, TableBucket tableBucket) {
+ consumedKvSnapshotMap.computeIfAbsent(checkpointId, k -> new HashSet<>()).add(tableBucket);
+ }
+
+ /** Get and remove the buckets who have been consumed kv snapshot up to the checkpoint id. */
+ public Set<TableBucket> getAndRemoveConsumedBucketsUpTo(long checkpointId) {
+ NavigableMap<Long, Set<TableBucket>> toRemove =
+ consumedKvSnapshotMap.headMap(checkpointId, false);
+ Set<TableBucket> result = new HashSet<>();
+ for (Set<TableBucket> snapshots : toRemove.values()) {
+ result.addAll(snapshots);
+ }
+ toRemove.clear();
+ return result;
+ }
+
@Override
public void close() throws IOException {
try {
closed = true;
+
+ if (!isStreaming) {
Review Comment:
Change the condition to check that it's a pk table reading from snapshot, to avoid unnecessary IO:
```java
if (!isStreaming
&& hasPrimaryKey
&& startingOffsetsInitializer instanceof SnapshotOffsetsInitializer)
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -856,6 +872,95 @@ public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
return response;
}
+ @Override
+ public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease(
+ AcquireKvSnapshotLeaseRequest request) {
+ // Authorization: require WRITE permission on all tables in the request
+ if (authorizer != null) {
+ for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
+ request.getSnapshotsToLeasesList()) {
+ long tableId = kvSnapshotLeaseForTable.getTableId();
+ authorizeTable(OperationType.READ, tableId);
+ }
+ }
+
+ String leaseId = request.getLeaseId();
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return makeAcquireKvSnapshotLeaseResponse(
+ kvSnapshotLeaseManager.acquireLease(
+ leaseId,
+ request.getLeaseDurationMs(),
+ getAcquireKvSnapshotLeaseData(request)));
+ } catch (ApiException e) {
+ // Re-throw ApiExceptions as-is to preserve exception type for client
+ throw e;
+ } catch (Exception e) {
+ throw new UnknownServerException(
+ "Failed to acquire kv snapshot lease for" + leaseId, e);
+ }
+ },
+ ioExecutor);
+ }
+
+ @Override
+ public CompletableFuture<ReleaseKvSnapshotLeaseResponse> releaseKvSnapshotLease(
+ ReleaseKvSnapshotLeaseRequest request) {
+ // Authorization: require WRITE permission on all tables in the request
+ if (authorizer != null) {
+ for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
+ long tableId = tableBucket.getTableId();
+ authorizeTable(OperationType.READ, tableId);
+ }
+ }
+
+ String leaseId = request.getLeaseId();
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ ReleaseKvSnapshotLeaseResponse response =
+ new ReleaseKvSnapshotLeaseResponse();
+ kvSnapshotLeaseManager.release(
+ leaseId, getReleaseKvSnapshotLeaseData(request));
+ return response;
+ } catch (ApiException e) {
+ // Re-throw ApiExceptions as-is to preserve exception type for client
+ throw e;
+ } catch (Exception e) {
+ throw new UnknownServerException(
+ "Failed to release kv snapshot lease for" + leaseId, e);
+ }
+ },
+ ioExecutor);
+ }
+
+ @Override
+ public CompletableFuture<DropKvSnapshotLeaseResponse> dropKvSnapshotLease(
+ DropKvSnapshotLeaseRequest request) {
+ // Authorization: require WRITE permission on the cluster
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster());
Review Comment:
We should use **`READ` permission on the table** instead. Requiring
**admin-level cluster permissions** just to read a primary key table is
unreasonable and violates the principle of least privilege. Table-level `READ`
access should be sufficient for such operations.
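One possible shape for that check (sketch only; `getLeasedTableIds` is a hypothetical lookup on the lease manager, needed because `DropKvSnapshotLeaseRequest` carries only the lease id):

```java
// Authorization: require READ permission on every table covered by the lease,
// rather than WRITE on the whole cluster
if (authorizer != null) {
    // hypothetical helper: resolve the table ids currently held by this lease
    for (long tableId : kvSnapshotLeaseManager.getLeasedTableIds(request.getLeaseId())) {
        authorizeTable(OperationType.READ, tableId);
    }
}
```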
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -378,6 +378,29 @@ message GetKvSnapshotMetadataResponse {
repeated PbRemotePathAndLocalFile snapshot_files = 2;
}
+message AcquireKvSnapshotLeaseRequest {
+ required string lease_id = 1;
+ required int64 lease_duration_ms = 2;
+ repeated PbKvSnapshotLeaseForTable snapshots_to_lease = 3;
+}
+
+message AcquireKvSnapshotLeaseResponse {
+ repeated PbKvSnapshotLeaseForTable unavailable_snapshots = 3;
Review Comment:
```suggestion
repeated PbKvSnapshotLeaseForTable unavailable_snapshots = 1;
```
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java:
##########
@@ -113,7 +114,8 @@ void testPkTableNoSnapshotSplits() throws Throwable {
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
null,
- null);
+ null,
+ new LeaseContext("kv_snapshot_lease",
Duration.ofDays(1).toMillis()));
Review Comment:
Update all the `LeaseContext` parameters in this class to use `LeaseContext.DEFAULT`.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.lease;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableBucketSnapshot;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+
+ private final KvSnapshotLeaseMetadataManager metadataManager;
+ private final Clock clock;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final long leaseExpirationCheckInterval;
+
+ private final ReadWriteLock managerLock = new ReentrantReadWriteLock();
+
+ /** lease id to kv snapshot lease. */
+ @GuardedBy("managerLock")
+ private final ConcurrentHashMap<String, KvSnapshotLeaseHandler> kvSnapshotLeaseMap =
+ MapUtils.newConcurrentHashMap();
+
+ /**
+ * KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has
+ * been leased by how many lease id.
+ */
+ @GuardedBy("managerLock")
+ private final Map<TableBucketSnapshot, AtomicInteger> refCount =
+ MapUtils.newConcurrentHashMap();
+
+ /** For metrics. */
+ private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+ public KvSnapshotLeaseManager(
+ long leaseExpirationCheckInterval,
+ ZooKeeperClient zkClient,
+ String remoteDataDir,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this(
+ leaseExpirationCheckInterval,
+ zkClient,
+ remoteDataDir,
+ Executors.newScheduledThreadPool(
+ 1, new ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+ clock,
+ coordinatorMetricGroup);
+ }
+
+ @VisibleForTesting
+ public KvSnapshotLeaseManager(
+ long leaseExpirationCheckInterval,
+ ZooKeeperClient zkClient,
+ String remoteDataDir,
+ ScheduledExecutorService scheduledExecutor,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this.metadataManager = new KvSnapshotLeaseMetadataManager(zkClient, remoteDataDir);
+ this.leaseExpirationCheckInterval = leaseExpirationCheckInterval;
+ this.scheduledExecutor = scheduledExecutor;
+ this.clock = clock;
+
+ registerMetrics(coordinatorMetricGroup);
+ }
+
+ public void start() {
+ LOG.info("kv snapshot lease manager has been started.");
+
+ List<String> leasesList = new ArrayList<>();
+ try {
+ leasesList = metadataManager.getLeasesList();
+ } catch (Exception e) {
+ LOG.error("Failed to get leases list from zookeeper.", e);
+ }
+
+ for (String leaseId : leasesList) {
+ Optional<KvSnapshotLeaseHandler> kvSnapshotLeaseOpt = Optional.empty();
+ try {
+ kvSnapshotLeaseOpt = metadataManager.getLease(leaseId);
+ } catch (Exception e) {
+ LOG.error("Failed to get kv snapshot lease from zookeeper.",
e);
+ }
+
+ if (kvSnapshotLeaseOpt.isPresent()) {
+ KvSnapshotLeaseHandler kvSnapshotLeasehandle = kvSnapshotLeaseOpt.get();
+ this.kvSnapshotLeaseMap.put(leaseId, kvSnapshotLeasehandle);
+
+ initializeRefCount(kvSnapshotLeasehandle);
+
+ leasedBucketCount.addAndGet(kvSnapshotLeasehandle.getLeasedSnapshotCount());
+ }
+ }
+
+ scheduledExecutor.scheduleWithFixedDelay(
+ this::expireLeases, 0L, leaseExpirationCheckInterval, TimeUnit.MILLISECONDS);
+ }
+
+ public boolean snapshotLeaseNotExist(TableBucketSnapshot tableBucketSnapshot) {
+ return inReadLock(
+ managerLock,
+ () -> {
+ AtomicInteger count = refCount.get(tableBucketSnapshot);
+ return count == null || count.get() <= 0;
+ });
+ }
+
+ /**
+ * Acquire kv snapshot lease.
+ *
+ * @param leaseId the lease id
+ * @param leaseDuration the lease duration
+ * @param tableIdToLeaseBucket the table id to lease bucket
+ * @return the map of unavailable snapshots that failed to be leased
+ */
+ public Map<TableBucket, Long> acquireLease(
Review Comment:
It seems this comment
https://github.com/apache/fluss/pull/2179#discussion_r2777746406 has not been
addressed, so I'm posting it here again. I think we can create an issue to track
this and fix it in the future.
------
The current use of `leaseLocks` feels like a hack. If thread safety is
required for `KvSnapshotLease`, then the class itself should be made inherently
thread-safe, which would also significantly simplify the surrounding code.
To achieve atomicity and thread safety, we should refactor the existing
`KvSnapshotLease#acquireBucket` method into a single, cohesive operation:
```java
KvSnapshotLease#acquireSnapshots(
long leaseDurationMs,
Map<Long, List<KvSnapshotLeaseForBucket>> tableIdToLeaseBucket,
ConcurrentHashMap<KvSnapshotLeaseForBucket, AtomicInteger> refCount
)
```
This method should:
- Perform all necessary validations and state updates **atomically**.
- Update reference counts internally as part of the acquisition logic.
Once this method completes successfully, the ZooKeeper metadata update
should be performed **outside the lock**, but with **version-based
consistency** to ensure correctness in concurrent scenarios. Specifically:
- Use conditional ZK writes based on the current version of the lease node.
- Follow the pattern established in `ProducerOffsetsStore#xxxVersion()`
methods, which use versioned ZK operations to avoid race conditions during
metadata updates.
This approach eliminates the need for external locking, improves code
clarity, and ensures both memory and ZK state remain consistent under
concurrency.
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java:
##########
@@ -82,6 +85,55 @@ void after() {
tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
}
+ @Test
+ void testPkTableBatchReadWithKvSnapshotLease() throws Exception {
Review Comment:
Actually, this doesn't read the kv snapshot, because the limit is pushed down. I
checked the code again and realized that we don't allow reading kv snapshots in
batch mode. So I think we can remove this test (and the `close()` logic in the
source enumerator as well).
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -196,6 +208,18 @@ protected void startServices() throws Exception {
Executors.newFixedThreadPool(
conf.get(ConfigOptions.SERVER_IO_POOL_SIZE),
new ExecutorThreadFactory("coordinator-io"));
+
+ // Initialize and start the kv snapshot lease manager
+ this.kvSnapshotLeaseManager =
Review Comment:
Close `kvSnapshotLeaseManager` (including the thread pool in it) when shutting down the server.
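For example (a sketch; a `close()` method on `KvSnapshotLeaseManager` that stops its `kv-snapshot-lease-cleaner` executor is assumed here, not existing API):

```java
// in the server's stop/close path, next to the other services being shut down
if (kvSnapshotLeaseManager != null) {
    // assumed to shut down the scheduled lease-expiration thread pool
    kvSnapshotLeaseManager.close();
}
```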
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -531,22 +540,78 @@ private List<SourceSplitBase> initPrimaryKeyTablePartitionSplits(
List<SourceSplitBase> splits = new ArrayList<>();
for (Partition partition : newPartitions) {
String partitionName = partition.getPartitionName();
- // get the table snapshot info
- final KvSnapshots kvSnapshots;
- try {
- kvSnapshots = flussAdmin.getLatestKvSnapshots(tablePath, partitionName).get();
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- String.format(
- "Failed to get table snapshot for table %s and partition %s",
- tablePath, partitionName),
- ExceptionUtils.stripCompletionException(e));
- }
- splits.addAll(getSnapshotAndLogSplits(kvSnapshots, partitionName));
+ splits.addAll(
+ getSnapshotAndLogSplits(
+ getLatestKvSnapshotsAndRegister(partitionName), partitionName));
}
return splits;
}
+ private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String partitionName) {
+ long tableId;
+ Long partitionId;
+ Map<Integer, Long> snapshotIds = new HashMap<>();
+ Map<Integer, Long> logOffsets = new HashMap<>();
+
+ // Get the latest kv snapshots and acquire kvSnapshot lease.
+ try {
+ KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
+
+ tableId = kvSnapshots.getTableId();
+ partitionId = kvSnapshots.getPartitionId();
+
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TableBucket tb : kvSnapshots.getTableBuckets()) {
+ int bucket = tb.getBucket();
+ OptionalLong snapshotIdOpt = kvSnapshots.getSnapshotId(bucket);
+ OptionalLong logOffsetOpt = kvSnapshots.getLogOffset(bucket);
+ if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) {
+ bucketsToLease.put(tb, snapshotIdOpt.getAsLong());
+ }
+
+ snapshotIds.put(
+ bucket, snapshotIdOpt.isPresent() ? snapshotIdOpt.getAsLong() : null);
+ logOffsets.put(bucket, logOffsetOpt.isPresent() ? logOffsetOpt.getAsLong() : null);
+ }
+
+ if (!bucketsToLease.isEmpty()) {
+ String kvSnapshotLeaseId = leaseContext.getKvSnapshotLeaseId();
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for table {}",
+ kvSnapshotLeaseId,
+ PhysicalTablePath.of(tablePath, partitionName));
+ long kvSnapshotLeaseDurationMs = leaseContext.getKvSnapshotLeaseDurationMs();
+ checkNotNull(kvSnapshotLeaseDurationMs, "kv snapshot lease duration is null.");
Review Comment:
No need to check not-null for a primitive type.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -243,6 +259,8 @@ public FlinkSourceEnumerator(
streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest();
this.lakeSource = lakeSource;
this.workerExecutor = workerExecutor;
+ this.leaseContext = leaseContext;
+ this.isStreaming = streaming;
Review Comment:
We already have `streaming` as a member variable; no need to introduce a new one.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseHandler.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.lease;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+/** handler of kv snapshot lease. */
+@NotThreadSafe
+public class KvSnapshotLeaseHandler {
+ private long expirationTime;
+
+ /** A map from table id to kv snapshot lease for one table. */
+ private final Map<Long, KvSnapshotTableLease> tableIdToTableLease;
+
+ public KvSnapshotLeaseHandler(long expirationTime) {
+ this(expirationTime, MapUtils.newConcurrentHashMap());
+ }
+
+ public KvSnapshotLeaseHandler(
+ long expirationTime, Map<Long, KvSnapshotTableLease> tableIdToTableLease) {
+ this.expirationTime = expirationTime;
+ this.tableIdToTableLease = tableIdToTableLease;
+ }
+
+ public void setExpirationTime(long expirationTime) {
+ this.expirationTime = expirationTime;
+ }
+
+ public long getExpirationTime() {
+ return expirationTime;
+ }
+
+ public Map<Long, KvSnapshotTableLease> getTableIdToTableLease() {
+ return tableIdToTableLease;
+ }
+
+ /**
+ * Acquire a bucket to the lease id. If the bucket array already exists but is too small to
+ * accommodate the given bucket id, the array will be dynamically expanded to {@code bucketId +
+ * 1}.
+ *
+ * @param tableBucket table bucket
+ * @param snapshotId snapshot id
+ * @return the original registered snapshotId. if -1 means the bucket is new registered
+ */
+ public long acquireBucket(TableBucket tableBucket, long snapshotId) {
+ Long[] bucketSnapshot;
+ Long partitionId = tableBucket.getPartitionId();
+ long tableId = tableBucket.getTableId();
+ int bucketId = tableBucket.getBucket();
+ if (partitionId == null) {
+ // For none-partitioned table.
+ KvSnapshotTableLease tableLease =
+ tableIdToTableLease.computeIfAbsent(
+ tableId,
+ k -> {
+ Long[] array = new Long[bucketId + 1];
+ Arrays.fill(array, -1L);
+ return new KvSnapshotTableLease(tableId, array);
+ });
+ bucketSnapshot = tableLease.getBucketSnapshots();
+ // Dynamically expand the array if the bucket id exceeds the current array size.
+ // This can happen when new buckets are added to an existing table.
+ if (bucketSnapshot != null && bucketId >= bucketSnapshot.length) {
+ bucketSnapshot = expandArray(bucketSnapshot, bucketId + 1);
Review Comment:
Currently, we iterate over requested buckets sequentially. If the bucket IDs
range from 0 to 10,000, this causes the internal `Long[]` array to be resized
(expanded) up to 10,000 times—an extremely inefficient process. Worse, this
happens while holding locks, which amplifies performance degradation and
increases contention.
My earlier suggestion was to refactor `KvSnapshotLeaseHandler` to use a
**batched API** that accepts the full `Map<Long, List<TableBucketSnapshot>>
tableIdToLeaseBucket` upfront, enabling more efficient memory allocation and
processing.
If you plan to implement that refactoring later, **at a minimum**, this PR
should include an initialization step that pre-sizes the snapshot `Long[]` array
based on the **maximum bucket ID** in the request. This avoids repeated array
reallocations and significantly reduces lock-holding time during lease
acquisition.
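A minimal sketch of that mitigation, based on the `computeIfAbsent` shown in the diff (`requestedBuckets` here stands for all buckets of one table in the request, which the current per-bucket API does not pass in yet):

```java
// pre-size the per-table snapshot array once from the maximum requested
// bucket id, instead of expanding it bucket by bucket under the lock
int maxBucketId =
        requestedBuckets.stream().mapToInt(TableBucket::getBucket).max().orElse(-1);
KvSnapshotTableLease tableLease =
        tableIdToTableLease.computeIfAbsent(
                tableId,
                k -> {
                    Long[] array = new Long[maxBucketId + 1];
                    Arrays.fill(array, -1L);
                    return new KvSnapshotTableLease(tableId, array);
                });
```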
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -352,6 +354,46 @@ void testPkTableReadMixSnapshotAndLog() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
+ @Test
+ void testPkTableReadWithKvSnapshotLease() throws Exception {
+ tEnv.executeSql(
+ "create table pk_table_with_kv_snapshot_lease (a int not null
primary key not enforced, b varchar)");
+ TablePath tablePath = TablePath.of(DEFAULT_DB,
"pk_table_with_kv_snapshot_lease");
+
+ List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"),
row(3, "v3"));
+
+ // write records
+ writeRows(conn, tablePath, rows, false);
+
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
+ // enable checkpoint to make sure the kv snapshot lease will be cleared.
+ execEnv.enableCheckpointing(100);
Review Comment:
I think we should add another test case covering scenarios where
**checkpointing is disabled** (e.g., debug mode or jobs without checkpointing
enabled). In such cases, leases should still be released promptly when the job
finishes or is canceled.
One possible approach is to drop the lease in
`FlinkSourceEnumerator#close()` under the following conditions:
- A checkpoint has already been successfully completed (i.e.,
`snapshotState(cp)` was called), **and**
- The enumerator was **not restored** from a prior state (i.e., it wasn’t
created via `FlinkSource#restoreEnumerator()`).
This would mean: if **no checkpoint ever completed** and the job terminates
(either normally or via cancellation), there’s no need to retain the lease—so
we can safely release it immediately in `close()`.
That said, this is an optimization that can be addressed later. I’m fine
with creating a follow-up issue to track this improvement.
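A rough sketch of that optimization in `FlinkSourceEnumerator#close()` (the `checkpointCompleted` and `restoredFromState` flags are assumptions, not existing fields):

```java
@Override
public void close() throws IOException {
    closed = true;
    // no checkpoint ever completed and we were not restored from prior state:
    // nothing can still reference this lease, so drop it eagerly
    if (!checkpointCompleted && !restoredFromState) {
        try {
            flussAdmin
                    .createKvSnapshotLease(
                            leaseContext.getKvSnapshotLeaseId(),
                            leaseContext.getKvSnapshotLeaseDurationMs())
                    .dropLease()
                    .get();
        } catch (Exception e) {
            LOG.warn("Failed to drop kv snapshot lease on close; it will expire on its own.", e);
        }
    }
    // ... existing close logic ...
}
```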
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -856,6 +872,95 @@ public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
return response;
}
+ @Override
+ public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease(
+ AcquireKvSnapshotLeaseRequest request) {
+ // Authorization: require WRITE permission on all tables in the request
Review Comment:
```suggestion
// Authorization: require READ permission on all tables in the request
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java:
##########
@@ -260,4 +287,17 @@ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
return null;
}
}
+
+ private void serializeLeaseContext(final DataOutputSerializer out, SourceEnumeratorState state)
+ throws IOException {
+ String leaseId = state.getLeaseId();
+ out.writeUTF(leaseId);
+ }
+
+ private LeaseContext deserializeLeaseContext(final DataInputDeserializer in)
Review Comment:
Rename the method to `String deserializeLeaseId()`, as only the lease id is deserialized.
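A sketch of the renamed pair, keeping the `writeUTF`/`readUTF` symmetry implied by the diff (the `readUTF` body is an assumption, since the original deserializer body is truncated above):

```java
private void serializeLeaseId(final DataOutputSerializer out, SourceEnumeratorState state)
        throws IOException {
    out.writeUTF(state.getLeaseId());
}

private String deserializeLeaseId(final DataInputDeserializer in) throws IOException {
    return in.readUTF();
}
```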
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -856,6 +872,95 @@ public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
return response;
}
+ @Override
+ public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease(
+ AcquireKvSnapshotLeaseRequest request) {
+ // Authorization: require WRITE permission on all tables in the request
+ if (authorizer != null) {
+ for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
+ request.getSnapshotsToLeasesList()) {
+ long tableId = kvSnapshotLeaseForTable.getTableId();
+ authorizeTable(OperationType.READ, tableId);
+ }
+ }
+
+ String leaseId = request.getLeaseId();
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return makeAcquireKvSnapshotLeaseResponse(
+ kvSnapshotLeaseManager.acquireLease(
+ leaseId,
+ request.getLeaseDurationMs(),
+ getAcquireKvSnapshotLeaseData(request)));
+ } catch (ApiException e) {
+ // Re-throw ApiExceptions as-is to preserve exception type for client
+ throw e;
+ } catch (Exception e) {
+ throw new UnknownServerException(
+ "Failed to acquire kv snapshot lease for" + leaseId, e);
+ }
+ },
+ ioExecutor);
+ }
+
+ @Override
+ public CompletableFuture<ReleaseKvSnapshotLeaseResponse> releaseKvSnapshotLease(
+ ReleaseKvSnapshotLeaseRequest request) {
+ // Authorization: require WRITE permission on all tables in the request
Review Comment:
```suggestion
// Authorization: require READ permission on all tables in the request
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+
+ private final KvSnapshotLeaseMetadataManager metadataManager;
+ private final CoordinatorContext coordinatorContext;
+ private final Clock clock;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final Configuration conf;
+
+ private final Map<String, ReadWriteLock> leaseLocks = MapUtils.newConcurrentHashMap();
+ /** lease id to kv snapshot lease. */
+ @GuardedBy("leaseLocks")
+ private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+ private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+ /**
+ * KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has
+ * been leased by how many lease id.
+ */
+ private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+ MapUtils.newConcurrentHashMap();
+
+ /** For metrics. */
+ private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataManager,
+ CoordinatorContext coordinatorContext,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this(
+ conf,
+ metadataManager,
+ coordinatorContext,
+ Executors.newScheduledThreadPool(
+ 1, new ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
Review Comment:
Could you create an issue to improve this in the future?