tsreaper commented on code in PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#discussion_r1015169082
##########
flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java:
##########
@@ -296,6 +299,52 @@ public void testWriteWithoutCompaction() throws Exception {
         }
     }

+    @Test
+    public void testReadCompactedSnapshot() throws Exception {
+        writeCompactData();
+        FileStoreTable table =
+                createFileStoreTable(conf -> conf.set(CoreOptions.READ_COMPACTED, true));
+
+        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        Snapshot compactedSnapshot = table.snapshotManager().snapshot(plan.snapshotId);
+        Iterator<Snapshot> snapshotIterator = table.snapshotManager().snapshots();
+        while (snapshotIterator.hasNext()) {
+            Snapshot snapshot = snapshotIterator.next();
+            if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                assertThat(snapshot.id()).isLessThanOrEqualTo(compactedSnapshot.id());
+            }
+        }
+
+        assertThat(compactedSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        List<Split> splits = plan.splits();
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);

Review Comment:
   The latest snapshot here is itself a compacted snapshot, so how do you make sure your implementation works (instead of just reading the latest snapshot)? Change this test.

##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java:
##########
@@ -75,7 +76,8 @@ public AbstractFileStoreScan(
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            CoreOptions.ChangelogProducer changelogProducer) {
+            CoreOptions.ChangelogProducer changelogProducer,
+            boolean readCompacted) {

Review Comment:
   I prefer adding a `withReadCompacted(boolean)` method to the scan interface, just like `withIncrement` and the other methods. We don't need to pass the parameter all the way through.
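For illustration, the fluent style the reviewer suggests could be sketched as below. This is a minimal, self-contained stand-in, not the real `AbstractFileStoreScan`: the class, field, and `plan()` method here are simplified placeholders, and only the chaining pattern (a `with*` setter that returns `this`, so no constructor needs an extra parameter) is the point.

```java
// Hypothetical sketch of a fluent with* setter on a scan, in the style the
// reviewer suggests. Names are simplified stand-ins for the real interface.
public class ScanSketch {

    static class FileStoreScan {
        private Long snapshotId;        // set by withSnapshot, mirroring the real fluent style
        private boolean readCompacted;  // the new flag, defaulting to false

        FileStoreScan withSnapshot(long snapshotId) {
            this.snapshotId = snapshotId;
            return this;
        }

        // A chainable setter: callers opt in without every constructor in the
        // hierarchy gaining another pass-through parameter.
        FileStoreScan withReadCompacted(boolean readCompacted) {
            this.readCompacted = readCompacted;
            return this;
        }

        // Placeholder for plan(); just reports the configured state.
        String plan() {
            return "snapshot=" + snapshotId + " readCompacted=" + readCompacted;
        }
    }

    public static void main(String[] args) {
        String plan = new FileStoreScan().withSnapshot(5).withReadCompacted(true).plan();
        System.out.println(plan); // prints: snapshot=5 readCompacted=true
    }
}
```

The design choice being argued for is locality: the flag is consumed where the scan plans its reading, so exposing it as a builder-style method keeps the constructors of every subclass unchanged.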
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java:
##########
@@ -87,6 +89,7 @@ public AbstractFileStoreScan(
         this.numOfBuckets = numOfBuckets;
         this.checkNumOfBuckets = checkNumOfBuckets;
         this.changelogProducer = changelogProducer;
+        this.readCompacted = readCompacted;

Review Comment:
   What happens if the user sets the changelog producer to `INPUT` and at the same time uses `readCompacted`? It seems that we'll scan nothing, which is unfriendly to the user.

##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,25 @@ public boolean snapshotExists(long snapshotId) {
         }
     }

+    public @Nullable Long latestCompactedSnapshotId() {
+        try {
+            Iterator<Snapshot> iterator = snapshots();
+            Long maxCompactedSnapshotId = null;
+            while (iterator.hasNext()) {
+                Snapshot snapshot = iterator.next();
+                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                    if (maxCompactedSnapshotId == null || snapshot.id() > maxCompactedSnapshotId) {
+                        maxCompactedSnapshotId = snapshot.id();
+                    }
+                }
+            }
+
+            return maxCompactedSnapshotId;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find latest compacted snapshot id", e);
+        }

Review Comment:
   This implementation is very inefficient with thousands of snapshots. Try iterating backwards from the latest snapshot; as soon as you spot a COMPACT snapshot you can return directly.
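The backward iteration the reviewer suggests could look roughly like the sketch below. This is a self-contained illustration, not the real `SnapshotManager`: a `Map` stands in for the snapshot files the manager actually reads, the `earliestSnapshotId`/`latestSnapshotId` bounds are assumed to come from the manager's existing hints, and snapshot ids are assumed contiguous between them.

```java
import java.util.Map;

// Sketch of finding the latest COMPACT snapshot by walking ids from latest
// down to earliest, returning on the first hit instead of scanning all
// snapshots. CommitKind mirrors Snapshot.CommitKind; the Map is a stand-in
// for reading each snapshot's metadata file.
public class LatestCompactedSketch {

    enum CommitKind { APPEND, COMPACT }

    // Returns the id of the newest COMPACT snapshot, or null if none exists.
    static Long latestCompactedSnapshotId(
            long earliestSnapshotId, long latestSnapshotId, Map<Long, CommitKind> kinds) {
        for (long id = latestSnapshotId; id >= earliestSnapshotId; id--) {
            if (kinds.get(id) == CommitKind.COMPACT) {
                return id; // first hit going backwards is the latest compacted snapshot
            }
        }
        return null; // no compaction has happened yet
    }

    public static void main(String[] args) {
        Map<Long, CommitKind> kinds = Map.of(
                1L, CommitKind.APPEND,
                2L, CommitKind.COMPACT,
                3L, CommitKind.APPEND,
                4L, CommitKind.COMPACT,
                5L, CommitKind.APPEND);
        System.out.println(latestCompactedSnapshotId(1L, 5L, kinds)); // prints: 4
    }
}
```

Since compactions are typically frequent relative to retained history, the backward walk usually stops after a handful of reads, versus reading every snapshot in the forward version.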