wuchong commented on code in PR #2610:
URL: https://github.com/apache/fluss/pull/2610#discussion_r2785515741


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -401,6 +401,38 @@ void testPkTableReadWithKvSnapshotLease() throws Exception {
                 });
     }
 
+    @Test
+    void testReadWithKvSnapshotLeaseNoCheckpoint() throws Exception {
+        tEnv.executeSql(
+                "create table pk_table_with_kv_snapshot_lease2 (a int not null primary key not enforced, b varchar)");
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "pk_table_with_kv_snapshot_lease2");
+
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+
+        // write records
+        writeRows(conn, tablePath, rows, false);
+
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
+        List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]");
+        org.apache.flink.util.CloseableIterator<Row> rowIter =
+                tEnv.executeSql(
+                                "select * from pk_table_with_kv_snapshot_lease2 /*+ OPTIONS('scan.kv.snapshot.lease.id' = 'test-consumer-2') */")
+                        .collect();
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+
+        // check lease will be dropped after job finished.
+        ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    assertThat(zkClient.getKvSnapshotLeaseMetadata("test-consumer-2"))
+                            .isNotPresent();
+                    assertThat(zkClient.getKvSnapshotLeasesList().contains("test-consumer-2"))

Review Comment:
   Can we now assert the result with `getKvSnapshotLeasesList().isEmpty()`?
Also, please remove the `scan.kv.snapshot.lease.id` dynamic option.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -1053,24 +1083,43 @@ public void close() throws IOException {
         try {
             closed = true;
 
-            if (!streaming
-                    && hasPrimaryKey
-                    && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) {
-                // drop the kv snapshot lease for the batch mode.
-                flussAdmin
-                        .createKvSnapshotLease(
-                                leaseContext.getKvSnapshotLeaseId(),
-                                leaseContext.getKvSnapshotLeaseDurationMs())
-                        .dropLease()
-                        .get();
-            }
-
             if (workerExecutor != null) {
                 workerExecutor.close();
             }
+
             if (flussAdmin != null) {
+                if (!streaming

Review Comment:
   Extract the drop-lease logic into a private method
`maybeDropKvSnapshotLease()` and call it before the `closed = true;` line,
since dropping the lease is an action taken before close.
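
   A minimal sketch of the ordering suggested here, not the actual enumerator. `EnumeratorCloseSketch`, its `steps` list, and the `leaseShouldBeDropped` flag are hypothetical stand-ins; only the method name `maybeDropKvSnapshotLease()` and the `closed = true;` line come from the comment and the diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the enumerator; it models only the ordering inside close(). */
class EnumeratorCloseSketch implements AutoCloseable {
    /** Records each step so the resulting order can be inspected. */
    final List<String> steps = new ArrayList<>();

    // Assumption: stands in for the real lease condition (mode, primary key, offsets initializer).
    private final boolean leaseShouldBeDropped;
    private boolean closed;

    EnumeratorCloseSketch(boolean leaseShouldBeDropped) {
        this.leaseShouldBeDropped = leaseShouldBeDropped;
    }

    @Override
    public void close() {
        // Pre-close action: runs while the enumerator still counts as open.
        maybeDropKvSnapshotLease();

        closed = true;
        steps.add("closed=true");
        steps.add("close workerExecutor");
        steps.add("close flussAdmin");
    }

    private void maybeDropKvSnapshotLease() {
        if (leaseShouldBeDropped) {
            // The real method would issue the dropLease() call from the diff at this point.
            steps.add("drop lease (closed=" + closed + ")");
        }
    }
}
```

   With this shape the lease drop is always the first recorded step, and `close()` itself only contains teardown.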



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -1053,24 +1083,43 @@ public void close() throws IOException {
         try {
             closed = true;
 
-            if (!streaming
-                    && hasPrimaryKey
-                    && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) {
-                // drop the kv snapshot lease for the batch mode.
-                flussAdmin
-                        .createKvSnapshotLease(
-                                leaseContext.getKvSnapshotLeaseId(),
-                                leaseContext.getKvSnapshotLeaseDurationMs())
-                        .dropLease()
-                        .get();
-            }
-
             if (workerExecutor != null) {
                 workerExecutor.close();
             }
+
             if (flussAdmin != null) {
+                if (!streaming
+                        && hasPrimaryKey
+                        && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) {
+                    // Drop the kv snapshot lease for the batch mode.
+                    LOG.info(
+                            "Dropping kv snapshot lease {} for batch mode.",
+                            leaseContext.getKvSnapshotLeaseId());
+                    flussAdmin
+                            .createKvSnapshotLease(
+                                    leaseContext.getKvSnapshotLeaseId(),
+                                    leaseContext.getKvSnapshotLeaseDurationMs())
+                            .dropLease()
+                            .get();
+                } else if (streaming && !checkpointTriggeredBefore) {

Review Comment:
   We can merge these two conditions into `hasPrimaryKey &&
startingOffsetsInitializer instanceof SnapshotOffsetsInitializer &&
!checkpointTriggeredBefore`.
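
   A sketch of the merged check as a standalone predicate. `LeaseDropCondition`, `shouldDropLease`, and the `startsFromSnapshot` parameter (standing in for the `instanceof SnapshotOffsetsInitializer` test) are hypothetical names for illustration. Note that the merged form no longer looks at `streaming`; it would cover the batch branch only if `checkpointTriggeredBefore` stays `false` in batch mode, which is an assumption here and not something the diff shows.

```java
/** Hypothetical helper; models only the boolean logic of the merged condition. */
final class LeaseDropCondition {
    private LeaseDropCondition() {}

    /**
     * @param hasPrimaryKey whether the table is a primary-key table
     * @param startsFromSnapshot stands in for
     *     {@code startingOffsetsInitializer instanceof SnapshotOffsetsInitializer}
     * @param checkpointTriggeredBefore whether any checkpoint was triggered before close
     */
    static boolean shouldDropLease(
            boolean hasPrimaryKey,
            boolean startsFromSnapshot,
            boolean checkpointTriggeredBefore) {
        return hasPrimaryKey && startsFromSnapshot && !checkpointTriggeredBefore;
    }
}
```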



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
