tsreaper commented on code in PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#discussion_r1015169082


##########
flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java:
##########
@@ -296,6 +299,52 @@ public void testWriteWithoutCompaction() throws Exception {
         }
     }
 
+    @Test
+    public void testReadCompactedSnapshot() throws Exception {
+        writeCompactData();
+        FileStoreTable table =
+                createFileStoreTable(conf -> conf.set(CoreOptions.READ_COMPACTED, true));
+
+        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        Snapshot compactedSnapshot = table.snapshotManager().snapshot(plan.snapshotId);
+        Iterator<Snapshot> snapshotIterator = table.snapshotManager().snapshots();
+        while (snapshotIterator.hasNext()) {
+            Snapshot snapshot = snapshotIterator.next();
+            if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                assertThat(snapshot.id()).isLessThanOrEqualTo(compactedSnapshot.id());
+            }
+        }
+
+        assertThat(compactedSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        List<Split> splits = plan.splits();
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);

Review Comment:
   The latest snapshot in this test is already a compacted snapshot, so how do 
you make sure your implementation works (instead of just reading the latest 
snapshot)? Please change this test.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java:
##########
@@ -75,7 +76,8 @@ public AbstractFileStoreScan(
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            CoreOptions.ChangelogProducer changelogProducer) {
+            CoreOptions.ChangelogProducer changelogProducer,
+            boolean readCompacted) {

Review Comment:
   I prefer adding a `withReadCompacted(boolean)` method to the scan interface, 
just like `withIncrement` and other methods. Then we don't need to pass the 
parameter all the way through.
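
A minimal sketch of the fluent-setter shape suggested above. The `Scan` interface, `SimpleScan` class, and `describe()` method are hypothetical stand-ins, not the real table store scan classes; only the method names `withReadCompacted` and `withIncrement` come from the comment, and their signatures here are assumptions.

```java
final class ScanOptionSketch {

    /** Hypothetical, trimmed-down scan interface; not the real scan API. */
    interface Scan {
        Scan withIncrement(boolean increment);

        Scan withReadCompacted(boolean readCompacted);

        String describe();
    }

    /** Keeps each option as mutable state instead of a constructor argument. */
    static final class SimpleScan implements Scan {
        private boolean increment;
        private boolean readCompacted;

        @Override
        public Scan withIncrement(boolean increment) {
            this.increment = increment;
            return this;
        }

        @Override
        public Scan withReadCompacted(boolean readCompacted) {
            this.readCompacted = readCompacted;
            return this;
        }

        @Override
        public String describe() {
            return "increment=" + increment + ", readCompacted=" + readCompacted;
        }
    }

    public static void main(String[] args) {
        // Callers opt in where they build the scan; constructors stay unchanged.
        Scan scan = new SimpleScan().withReadCompacted(true);
        System.out.println(scan.describe()); // prints increment=false, readCompacted=true
    }
}
```

With this shape, adding another option later only touches the interface and its implementations, not every call site that constructs a scan.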



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java:
##########
@@ -87,6 +89,7 @@ public AbstractFileStoreScan(
         this.numOfBuckets = numOfBuckets;
         this.checkNumOfBuckets = checkNumOfBuckets;
         this.changelogProducer = changelogProducer;
+        this.readCompacted = readCompacted;

Review Comment:
   What happens if a user sets the changelog producer to `INPUT` and at the 
same time uses `readCompacted`? It seems that we'll scan nothing, which is 
unfriendly to the user.
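
One possible way to address this, sketched under assumptions: reject the combination up front with a clear message rather than silently returning an empty scan. The review does not prescribe this fix; the `validate` helper and the trimmed `ChangelogProducer` enum below are hypothetical and only mirror the names used in the diff.

```java
final class ReadCompactedValidation {

    /** Hypothetical stand-in for CoreOptions.ChangelogProducer. */
    enum ChangelogProducer {
        NONE,
        INPUT
    }

    /** Fails fast when the two options are combined (assumed to be incompatible). */
    static void validate(ChangelogProducer producer, boolean readCompacted) {
        if (readCompacted && producer == ChangelogProducer.INPUT) {
            throw new IllegalArgumentException(
                    "readCompacted cannot be combined with changelog producer INPUT");
        }
    }

    public static void main(String[] args) {
        validate(ChangelogProducer.NONE, true); // accepted
        try {
            validate(ChangelogProducer.INPUT, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```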



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,25 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long latestCompactedSnapshotId() {
+        try {
+            Iterator<Snapshot> iterator = snapshots();
+            Long maxCompactedSnapshotId = null;
+            while (iterator.hasNext()) {
+                Snapshot snapshot = iterator.next();
+                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                    if (maxCompactedSnapshotId == null || snapshot.id() > maxCompactedSnapshotId) {
+                        maxCompactedSnapshotId = snapshot.id();
+                    }
+                }
+            }
+
+            return maxCompactedSnapshotId;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find latest compacted snapshot id", e);
+        }

Review Comment:
   This implementation is very inefficient when there are thousands of 
snapshots. Try iterating backwards from the latest snapshot. As soon as you 
spot a COMPACT snapshot you can return its id directly.
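
A minimal sketch of the backward scan described above, assuming snapshot ids are contiguous longs between an earliest and a latest id. The `Map` stands in for loading a snapshot by id, and the `Kind` enum and the method signature are hypothetical; the real `SnapshotManager` API may differ.

```java
import java.util.Map;
import java.util.TreeMap;

final class BackwardCompactedLookup {

    /** Hypothetical stand-in for Snapshot.CommitKind. */
    enum Kind {
        APPEND,
        COMPACT
    }

    /**
     * Walks ids from latest down to earliest and returns the first COMPACT id,
     * or null if there is none. Ids missing from the map are skipped.
     */
    static Long latestCompactedSnapshotId(
            Map<Long, Kind> snapshots, long earliestId, long latestId) {
        for (long id = latestId; id >= earliestId; id--) {
            if (snapshots.get(id) == Kind.COMPACT) {
                return id; // newest COMPACT snapshot found, stop here
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<Long, Kind> snapshots = new TreeMap<>();
        snapshots.put(1L, Kind.APPEND);
        snapshots.put(2L, Kind.COMPACT);
        snapshots.put(3L, Kind.APPEND);
        snapshots.put(4L, Kind.COMPACT);
        snapshots.put(5L, Kind.APPEND); // latest snapshot is not compacted
        System.out.println(latestCompactedSnapshotId(snapshots, 1L, 5L)); // prints 4
    }
}
```

In the common case where a compaction happened recently, this reads only a handful of snapshots instead of all of them. The example data also ends with a non-COMPACT snapshot, so the result differs from simply taking the latest id.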



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.