This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.1 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 61d30e5fcfc12bf3a1fc718aad02ffcf7d352171 Author: jerry <[email protected]> AuthorDate: Tue May 13 11:56:51 2025 +0800 [core] ro table support fallback branch (#5544) --- .../paimon/table/FallbackReadFileStoreTable.java | 31 +++++++++++++--------- .../paimon/table/system/ReadOptimizedTable.java | 10 ++++++- .../apache/paimon/flink/CatalogTableITCase.java | 30 ++++++++++++++++----- 3 files changed, 51 insertions(+), 20 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index dddc8ca439..0698f3b433 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -76,6 +76,10 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { Preconditions.checkArgument(!(fallback instanceof FallbackReadFileStoreTable)); } + public FileStoreTable fallback() { + return fallback; + } + @Override public FileStoreTable copy(Map<String, String> dynamicOptions) { return new FallbackReadFileStoreTable( @@ -170,7 +174,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { @Override public DataTableScan newScan() { validateSchema(); - return new Scan(); + return new FallbackReadScan(wrapped.newScan(), fallback.newScan()); } private void validateSchema() { @@ -229,46 +233,47 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { return true; } - private class Scan implements DataTableScan { + /** Scan implementation for {@link FallbackReadFileStoreTable}. */ + public static class FallbackReadScan implements DataTableScan { private final DataTableScan mainScan; private final DataTableScan fallbackScan; - private Scan() { - this.mainScan = wrapped.newScan(); - this.fallbackScan = fallback.newScan(); + public FallbackReadScan(DataTableScan mainScan, DataTableScan fallbackScan) { + this.mainScan = mainScan; + this.fallbackScan = fallbackScan; } @Override - public Scan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { + public FallbackReadScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { mainScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); fallbackScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); return this; } @Override - public Scan withFilter(Predicate predicate) { + public FallbackReadScan withFilter(Predicate predicate) { mainScan.withFilter(predicate); fallbackScan.withFilter(predicate); return this; } @Override - public Scan withLimit(int limit) { + public FallbackReadScan withLimit(int limit) { mainScan.withLimit(limit); fallbackScan.withLimit(limit); return this; } @Override - public Scan withPartitionFilter(Map<String, String> partitionSpec) { + public FallbackReadScan withPartitionFilter(Map<String, String> partitionSpec) { mainScan.withPartitionFilter(partitionSpec); fallbackScan.withPartitionFilter(partitionSpec); return this; } @Override - public Scan withPartitionFilter(List<BinaryRow> partitions) { + public FallbackReadScan withPartitionFilter(List<BinaryRow> partitions) { mainScan.withPartitionFilter(partitions); fallbackScan.withPartitionFilter(partitions); return this; @@ -282,21 +287,21 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { } @Override - public Scan withBucketFilter(Filter<Integer> bucketFilter) { + public FallbackReadScan withBucketFilter(Filter<Integer> bucketFilter) { mainScan.withBucketFilter(bucketFilter); fallbackScan.withBucketFilter(bucketFilter); return this; } @Override - public Scan withLevelFilter(Filter<Integer> levelFilter) { + public FallbackReadScan withLevelFilter(Filter<Integer> levelFilter) { mainScan.withLevelFilter(levelFilter); fallbackScan.withLevelFilter(levelFilter); return this; } @Override - public Scan withMetricsRegistry(MetricRegistry metricRegistry) { + public FallbackReadScan withMetricsRegistry(MetricRegistry metricRegistry) { mainScan.withMetricsRegistry(metricRegistry); fallbackScan.withMetricsRegistry(metricRegistry); return this; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 639b711c29..59c2c7b988 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -29,10 +29,12 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.operation.DefaultValueAssigner; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; +import org.apache.paimon.table.FallbackReadFileStoreTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.DataTableBatchScan; +import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.DataTableStreamScan; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.StreamDataTableScan; @@ -131,7 +133,13 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable { } @Override - public DataTableBatchScan newScan() { + public DataTableScan newScan() { + if (wrapped instanceof FallbackReadFileStoreTable) { + FallbackReadFileStoreTable table = (FallbackReadFileStoreTable) wrapped; + return (new FallbackReadFileStoreTable.FallbackReadScan( + table.wrapped().newScan(), table.fallback().newScan())) + .withLevelFilter(l -> l == coreOptions().numLevels() - 1); + } return new DataTableBatchScan( !wrapped.schema().primaryKeys().isEmpty(), coreOptions(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 3be5071e7a..73690fe9ab 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -1096,11 +1096,25 @@ public class CatalogTableITCase extends CatalogITCaseBase { @Test public void testReadOptimizedTable() { sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '1')"); - innerTestReadOptimizedTable(); + innerTestReadOptimizedTableAndCheckData("T"); sql("DROP TABLE T"); sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '-1')"); - innerTestReadOptimizedTable(); + innerTestReadOptimizedTableAndCheckData("T"); + } + + @Test + public void testReadOptimizedTableFallBack() { + sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '1')"); + sql("CALL sys.create_branch('default.T', 'stream')"); + sql("ALTER TABLE T SET ('scan.fallback-branch' = 'stream')"); + innerTestReadOptimizedTableAndCheckData("T$branch_stream"); + + sql("DROP TABLE T"); + sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '-1')"); + sql("CALL sys.create_branch('default.T', 'stream')"); + sql("ALTER TABLE T SET ('scan.fallback-branch' = 'stream')"); + innerTestReadOptimizedTableAndCheckData("T$branch_stream"); } @Test @@ -1169,21 +1183,25 @@ public class CatalogTableITCase extends CatalogITCaseBase { assertThat(row.getField(6)).isNotNull(); } - private void innerTestReadOptimizedTable() { + private void innerTestReadOptimizedTableAndCheckData(String insertTableName) { // full compaction will always be performed at the end of batch jobs, as long as // full-compaction.delta-commits is set, regardless of its value sql( - "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (1, 10), (2, 20)"); + String.format( + "INSERT INTO %s /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (1, 10), (2, 20)", + insertTableName)); List<Row> result = sql("SELECT k, v FROM T$ro ORDER BY k"); assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20)); // no compaction, so result of ro table does not change - sql("INSERT INTO T VALUES (1, 11), (3, 30)"); + sql(String.format("INSERT INTO %s VALUES (1, 11), (3, 30)", insertTableName)); result = sql("SELECT k, v FROM T$ro ORDER BY k"); assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20)); sql( - "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (2, 21), (3, 31)"); + String.format( + "INSERT INTO %s /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (2, 21), (3, 31)", + insertTableName)); result = sql("SELECT k, v FROM T$ro ORDER BY k"); assertThat(result).containsExactly(Row.of(1, 11), Row.of(2, 21), Row.of(3, 31)); }
