This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a58513e4 [FLINK-29760] Introduce snapshots metadata table
a58513e4 is described below

commit a58513e40eb0ffb50fb6eee357c4449a9d8483cd
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Thu Oct 27 12:00:35 2022 +0800

    [FLINK-29760] Introduce snapshots metadata table
    
    This closes #333
---
 docs/content/docs/development/query-table.md       |  19 ++
 .../store/connector/AbstractTableStoreFactory.java |   6 +
 .../flink/table/store/connector/FlinkCatalog.java  |  22 ++-
 .../store/connector/MetadataCatalogTable.java      |  90 ++++++++++
 .../source/ContinuousFileSplitEnumerator.java      |   5 +-
 .../store/connector/source/FileStoreSource.java    |  45 +----
 .../connector/source/FileStoreSourceSplit.java     |  10 +-
 .../source/FileStoreSourceSplitGenerator.java      |   7 +-
 .../source/FileStoreSourceSplitSerializer.java     |  22 ++-
 .../table/store/connector/source/FlinkSource.java  |  85 +++++++++
 .../store/connector/source/FlinkTableSource.java   |  90 ++++++++++
 .../store/connector/source/MetadataSource.java     |  60 +++++++
 .../connector/source/MetadataTableSource.java      |  67 +++++++
 .../store/connector/source/TableStoreSource.java   |  56 +-----
 .../table/store/connector/CatalogTableITCase.java  |  33 ++--
 .../connector/TableStoreManagedFactoryTest.java    |   3 +-
 .../source/FileStoreSourceSplitGeneratorTest.java  |  19 +-
 .../table/store/file/catalog/AbstractCatalog.java  |  26 +++
 .../flink/table/store/file/catalog/Catalog.java    |   8 +-
 .../store/file/utils/IteratorRecordReader.java     |  66 +++++++
 .../table/store/file/utils/SnapshotManager.java    |  11 ++
 .../metadata/MetadataTableLoader.java}             |  29 ++-
 .../table/store/table/metadata/SnapshotsTable.java | 196 +++++++++++++++++++++
 .../flink/table/store/spark/SparkReadITCase.java   |  10 +-
 24 files changed, 826 insertions(+), 159 deletions(-)

diff --git a/docs/content/docs/development/query-table.md b/docs/content/docs/development/query-table.md
index 520e1d4a..da890546 100644
--- a/docs/content/docs/development/query-table.md
+++ b/docs/content/docs/development/query-table.md
@@ -90,3 +90,22 @@ SELECT * FROM orders WHERE order_id=29495;
 
 SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
 ```
+
+## Snapshots Table
+
+You can query a table's snapshot history through Flink SQL.
+
+```sql
+SELECT * FROM MyTable$snapshots;
+
++--------------+------------+-----------------+-------------------+--------------+-------------------------+
+|  snapshot_id |  schema_id |     commit_user | commit_identifier |  commit_kind |             commit_time |
++--------------+------------+-----------------+-------------------+--------------+-------------------------+
+|            2 |          0 | 7ca4cd28-98e... |                 2 |       APPEND | 2022-10-26 11:44:15.600 |
+|            1 |          0 | 870062aa-3e9... |                 1 |       APPEND | 2022-10-26 11:44:15.148 |
++--------------+------------+-----------------+-------------------+--------------+-------------------------+
+2 rows in set
+```
+
+By querying a table's snapshots table, you can inspect the commit and expiration
+history of that table and use that information to time travel through the data.
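+
+For example, the following sketch (the timestamp literal is purely illustrative)
+finds the last snapshot committed before a given point in time, which can then
+serve as a time travel target:
+
+```sql
+-- Hypothetical example: pick the latest snapshot committed before a given time.
+SELECT MAX(snapshot_id) AS snapshot_id
+FROM MyTable$snapshots
+WHERE commit_time <= TIMESTAMP '2022-10-26 11:44:15.500';
+```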
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index 7c24cc42..cb09a06d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
@@ -34,6 +35,7 @@ import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.CoreOptions.LogStartupMode;
 import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.connector.source.MetadataTableSource;
 import org.apache.flink.table.store.connector.source.TableStoreSource;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -63,6 +65,10 @@ public abstract class AbstractTableStoreFactory
 
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
+        CatalogTable origin = context.getCatalogTable().getOrigin();
+        if (origin instanceof MetadataCatalogTable) {
+            return new MetadataTableSource(((MetadataCatalogTable) origin).table());
+        }
         return new TableStoreSource(
                 context.getObjectIdentifier(),
                 buildFileStoreTable(context),
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index e4dcefb7..63e29b06 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -42,8 +42,9 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.schema.SchemaChange;
-import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -137,17 +138,24 @@ public class FlinkCatalog extends AbstractCatalog {
     @Override
     public CatalogTable getTable(ObjectPath tablePath)
             throws TableNotExistException, CatalogException {
-        TableSchema schema;
+        Table table;
         try {
-            schema = catalog.getTableSchema(tablePath);
+            table = catalog.getTable(tablePath);
         } catch (Catalog.TableNotExistException e) {
             throw new TableNotExistException(getName(), e.tablePath());
         }
 
-        CatalogTable table = schema.toUpdateSchema().toCatalogTable();
-        // add path to source and sink
-        table.getOptions().put(PATH.key(), catalog.getTableLocation(tablePath).toString());
-        return table;
+        if (table instanceof FileStoreTable) {
+            CatalogTable catalogTable =
+                    ((FileStoreTable) table).schema().toUpdateSchema().toCatalogTable();
+            // add path to source and sink
+            catalogTable
+                    .getOptions()
+                    .put(PATH.key(), catalog.getTableLocation(tablePath).toString());
+            return catalogTable;
+        } else {
+            return new MetadataCatalogTable(table);
+        }
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java
new file mode 100644
index 00000000..a17e4203
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** A {@link CatalogTable} representing a metadata table. */
+public class MetadataCatalogTable implements CatalogTable {
+
+    private final Table table;
+
+    public MetadataCatalogTable(Table table) {
+        this.table = table;
+    }
+
+    public Table table() {
+        return table;
+    }
+
+    @Override
+    public Schema getUnresolvedSchema() {
+        return Schema.newBuilder()
+                .fromRowDataType(TypeConversions.fromLogicalToDataType(table.rowType()))
+                .build();
+    }
+
+    @Override
+    public boolean isPartitioned() {
+        return false;
+    }
+
+    @Override
+    public List<String> getPartitionKeys() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogTable copy(Map<String, String> map) {
+        return copy();
+    }
+
+    @Override
+    public Map<String, String> getOptions() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public String getComment() {
+        return "";
+    }
+
+    @Override
+    public CatalogTable copy() {
+        return new MetadataCatalogTable(table);
+    }
+
+    @Override
+    public Optional<String> getDescription() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<String> getDetailedDescription() {
+        return Optional.empty();
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 14e84dd2..e2614bc7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.SnapshotEnumerator;
 import org.apache.flink.table.store.table.source.SnapshotEnumerator.EnumeratorResult;
@@ -92,7 +93,9 @@ public class ContinuousFileSplitEnumerator
     }
 
     private void addSplit(FileStoreSourceSplit split) {
-        bucketSplits.computeIfAbsent(split.split().bucket(), i -> new LinkedList<>()).add(split);
+        bucketSplits
+                .computeIfAbsent(((DataSplit) split.split()).bucket(), i -> new LinkedList<>())
+                .add(split);
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 2aff07c2..72165041 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -20,17 +20,13 @@ package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.TableRead;
 
 import javax.annotation.Nullable;
 
@@ -41,8 +37,7 @@ import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpo
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** {@link Source} of file store. */
-public class FileStoreSource
-        implements Source<RowData, FileStoreSourceSplit, PendingSplitsCheckpoint> {
+public class FileStoreSource extends FlinkSource {
 
     private static final long serialVersionUID = 1L;
 
@@ -54,12 +49,6 @@ public class FileStoreSource
 
     private final boolean latestContinuous;
 
-    @Nullable private final int[][] projectedFields;
-
-    @Nullable private final Predicate predicate;
-
-    @Nullable private final Long limit;
-
     public FileStoreSource(
             FileStoreTable table,
             boolean isContinuous,
@@ -68,13 +57,11 @@ public class FileStoreSource
             @Nullable int[][] projectedFields,
             @Nullable Predicate predicate,
             @Nullable Long limit) {
+        super(table, projectedFields, predicate, limit);
         this.table = table;
         this.isContinuous = isContinuous;
         this.discoveryInterval = discoveryInterval;
         this.latestContinuous = latestContinuous;
-        this.projectedFields = projectedFields;
-        this.predicate = predicate;
-        this.limit = limit;
     }
 
     @Override
@@ -82,24 +69,6 @@ public class FileStoreSource
         return isContinuous ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
     }
 
-    @Override
-    public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
-        TableRead read = table.newRead();
-        if (projectedFields != null) {
-            read.withProjection(projectedFields);
-        }
-        if (predicate != null) {
-            read.withFilter(predicate);
-        }
-        return new FileStoreSourceReader(context, read, limit);
-    }
-
-    @Override
-    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
-            SplitEnumeratorContext<FileStoreSourceSplit> context) {
-        return restoreEnumerator(context, null);
-    }
-
     @Override
     public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
@@ -150,14 +119,4 @@ public class FileStoreSource
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
-
-    @Override
-    public FileStoreSourceSplitSerializer getSplitSerializer() {
-        return new FileStoreSourceSplitSerializer();
-    }
-
-    @Override
-    public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
-        return new PendingSplitsCheckpointSerializer(getSplitSerializer());
-    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
index 2356bab8..4ff027ea 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.Split;
 
 import java.util.Objects;
 
@@ -29,21 +29,21 @@ public class FileStoreSourceSplit implements SourceSplit {
     /** The unique ID of the split. Unique within the scope of this source. */
     private final String id;
 
-    private final DataSplit split;
+    private final Split split;
 
     private final long recordsToSkip;
 
-    public FileStoreSourceSplit(String id, DataSplit split) {
+    public FileStoreSourceSplit(String id, Split split) {
         this(id, split, 0);
     }
 
-    public FileStoreSourceSplit(String id, DataSplit split, long recordsToSkip) {
+    public FileStoreSourceSplit(String id, Split split, long recordsToSkip) {
         this.id = id;
         this.split = split;
         this.recordsToSkip = recordsToSkip;
     }
 
-    public DataSplit split() {
+    public Split split() {
         return split;
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
index 5ffc6725..92ae8ceb 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.table.store.connector.source;
 
-import org.apache.flink.table.store.table.source.DataSplit;
-import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableScan;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -36,9 +35,9 @@ public class FileStoreSourceSplitGenerator {
      */
     private final char[] currentId = "0000000000".toCharArray();
 
-    public List<FileStoreSourceSplit> createSplits(DataTableScan.Plan plan) {
+    public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) {
         return plan.splits().stream()
-                .map(s -> new FileStoreSourceSplit(getNextId(), (DataSplit) s))
+                .map(s -> new FileStoreSourceSplit(getNextId(), s))
                 .collect(Collectors.toList());
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
index 4c0d6d2f..58b718b3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
@@ -19,10 +19,12 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.util.InstantiationUtil;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
@@ -40,15 +42,23 @@ public class FileStoreSourceSplitSerializer
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
         view.writeUTF(split.splitId());
-        split.split().serialize(view);
+        InstantiationUtil.serializeObject(view, split.split());
         view.writeLong(split.recordsToSkip());
         return out.toByteArray();
     }
 
     @Override
     public FileStoreSourceSplit deserialize(int version, byte[] serialized) throws IOException {
-        DataInputDeserializer view = new DataInputDeserializer(serialized);
-        return new FileStoreSourceSplit(
-                view.readUTF(), DataSplit.deserialize(view), view.readLong());
+        ByteArrayInputStream in = new ByteArrayInputStream(serialized);
+        DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+        String splitId = view.readUTF();
+        Split split;
+        try {
+            split = InstantiationUtil.deserializeObject(in, getClass().getClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+        long recordsToSkip = view.readLong();
+        return new FileStoreSourceSplit(splitId, split, recordsToSkip);
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSource.java
new file mode 100644
index 00000000..9d80ec6d
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.source.TableRead;
+
+import javax.annotation.Nullable;
+
+/** A Flink {@link Source} for table store. */
+public abstract class FlinkSource
+        implements Source<RowData, FileStoreSourceSplit, PendingSplitsCheckpoint> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final Table table;
+
+    @Nullable protected final int[][] projectedFields;
+
+    @Nullable protected final Predicate predicate;
+
+    @Nullable protected final Long limit;
+
+    public FlinkSource(
+            Table table,
+            @Nullable int[][] projectedFields,
+            @Nullable Predicate predicate,
+            @Nullable Long limit) {
+        this.table = table;
+        this.projectedFields = projectedFields;
+        this.predicate = predicate;
+        this.limit = limit;
+    }
+
+    @Override
+    public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
+        TableRead read = table.newRead();
+        if (projectedFields != null) {
+            read.withProjection(projectedFields);
+        }
+        if (predicate != null) {
+            read.withFilter(predicate);
+        }
+        return new FileStoreSourceReader(context, read, limit);
+    }
+
+    @Override
+    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context) throws Exception {
+        return restoreEnumerator(context, null);
+    }
+
+    @Override
+    public FileStoreSourceSplitSerializer getSplitSerializer() {
+        return new FileStoreSourceSplitSerializer();
+    }
+
+    @Override
+    public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
+        return new PendingSplitsCheckpointSerializer(getSplitSerializer());
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkTableSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkTableSource.java
new file mode 100644
index 00000000..77d03a14
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkTableSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A Flink {@link ScanTableSource} for table store. */
+public abstract class FlinkTableSource
+        implements ScanTableSource,
+                SupportsFilterPushDown,
+                SupportsProjectionPushDown,
+                SupportsLimitPushDown {
+
+    private final Table table;
+
+    @Nullable protected Predicate predicate;
+    @Nullable protected int[][] projectFields;
+    @Nullable protected Long limit;
+
+    public FlinkTableSource(Table table) {
+        this(table, null, null, null);
+    }
+
+    public FlinkTableSource(
+            Table table,
+            @Nullable Predicate predicate,
+            @Nullable int[][] projectFields,
+            @Nullable Long limit) {
+        this.table = table;
+        this.predicate = predicate;
+        this.projectFields = projectFields;
+        this.limit = limit;
+    }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        List<Predicate> converted = new ArrayList<>();
+        RowType rowType = table.rowType();
+        for (ResolvedExpression filter : filters) {
+            PredicateConverter.convert(rowType, filter).ifPresent(converted::add);
+        }
+        predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
+        return Result.of(filters, filters);
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        return false;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields) {
+        this.projectFields = projectedFields;
+    }
+
+    @Override
+    public void applyLimit(long limit) {
+        this.limit = limit;
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java
new file mode 100644
index 00000000..9155c27c
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.source.TableScan;
+
+import javax.annotation.Nullable;
+
+/** A {@link FlinkSource} for metadata tables. */
+public class MetadataSource extends FlinkSource {
+
+    private static final long serialVersionUID = 1L;
+
+    public MetadataSource(
+            Table table,
+            @Nullable int[][] projectedFields,
+            @Nullable Predicate predicate,
+            @Nullable Long limit) {
+        super(table, projectedFields, predicate, limit);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            PendingSplitsCheckpoint checkpoint) {
+        TableScan scan = table.newScan();
+        if (predicate != null) {
+            scan.withFilter(predicate);
+        }
+
+        return new StaticFileStoreSplitEnumerator(
+                context, null, new FileStoreSourceSplitGenerator().createSplits(scan.plan()));
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java
new file mode 100644
index 00000000..63941196
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.Table;
+
+import javax.annotation.Nullable;
+
+/** A {@link FlinkTableSource} for metadata tables. */
+public class MetadataTableSource extends FlinkTableSource {
+
+    private final Table table;
+
+    public MetadataTableSource(Table table) {
+        super(table);
+        this.table = table;
+    }
+
+    public MetadataTableSource(
+            Table table,
+            @Nullable Predicate predicate,
+            @Nullable int[][] projectFields,
+            @Nullable Long limit) {
+        super(table, predicate, projectFields, limit);
+        this.table = table;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        return SourceProvider.of(new MetadataSource(table, projectFields, predicate, limit));
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new MetadataTableSource(table, predicate, projectFields, limit);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TableStore-MetadataSource";
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 68a976b0..503f1009 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -24,14 +24,9 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
-import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
@@ -40,8 +35,6 @@ import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
 import org.apache.flink.table.store.connector.lookup.FileStoreLookupFunction;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.predicate.PredicateBuilder;
-import org.apache.flink.table.store.file.predicate.PredicateConverter;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -49,12 +42,9 @@ import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
 import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.utils.Projection;
-import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
@@ -68,13 +58,8 @@ import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE
  * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource} and kafka
  * log source created by {@link LogSourceProvider}.
  */
-public class TableStoreSource
-        implements ScanTableSource,
-                LookupTableSource,
-                SupportsFilterPushDown,
-                SupportsProjectionPushDown,
-                SupportsLimitPushDown,
-                SupportsWatermarkPushDown {
+public class TableStoreSource extends FlinkTableSource
+        implements LookupTableSource, SupportsWatermarkPushDown {
 
     private final ObjectIdentifier tableIdentifier;
     private final FileStoreTable table;
@@ -82,10 +67,6 @@ public class TableStoreSource
     private final DynamicTableFactory.Context context;
     @Nullable private final LogStoreTableFactory logStoreTableFactory;
 
-    @Nullable private Predicate predicate;
-    @Nullable private int[][] projectFields;
-    @Nullable private Long limit;
-
     @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
 
     public TableStoreSource(
@@ -116,6 +97,7 @@ public class TableStoreSource
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
+        super(table, predicate, projectFields, limit);
         this.tableIdentifier = tableIdentifier;
         this.table = table;
         this.streaming = streaming;
@@ -158,7 +140,9 @@ public class TableStoreSource
                     : ChangelogMode.upsert();
         } else {
             throw new UnsupportedOperationException(
-                    "Unknown FileStoreTable subclass " + 
table.getClass().getName());
+                    "Unsupported Table subclass "
+                            + table.getClass().getName()
+                            + " for streaming mode.");
         }
     }
 
@@ -202,28 +186,7 @@ public class TableStoreSource
 
     @Override
     public String asSummaryString() {
-        return "TableStoreSource";
-    }
-
-    @Override
-    public Result applyFilters(List<ResolvedExpression> filters) {
-        List<Predicate> converted = new ArrayList<>();
-        RowType rowType = table.schema().logicalRowType();
-        for (ResolvedExpression filter : filters) {
-            PredicateConverter.convert(rowType, filter).ifPresent(converted::add);
-        }
-        predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
-        return Result.of(filters, filters);
-    }
-
-    @Override
-    public boolean supportsNestedProjection() {
-        return false;
-    }
-
-    @Override
-    public void applyProjection(int[][] projectedFields) {
-        this.projectFields = projectedFields;
+        return "TableStore-DataSource";
     }
 
     @Override
@@ -231,11 +194,6 @@ public class TableStoreSource
         this.watermarkStrategy = watermarkStrategy;
     }
 
-    @Override
-    public void applyLimit(long limit) {
-        this.limit = limit;
-    }
-
     @Override
     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
         if (limit != null) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
similarity index 50%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
copy to flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
index c3e2dd59..baaf8f00 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,24 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.catalog;
+package org.apache.flink.table.store.connector;
 
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.types.Row;
 
-/** Common implementation of {@link Catalog}. */
-public abstract class AbstractCatalog implements Catalog {
+import org.junit.Test;
 
-    protected static final String DB_SUFFIX = ".db";
+import java.util.List;
 
-    @Override
-    public Path getTableLocation(ObjectPath tablePath) {
-        return new Path(databasePath(tablePath.getDatabaseName()), 
tablePath.getObjectName());
-    }
+import static org.assertj.core.api.Assertions.assertThat;
 
-    protected Path databasePath(String database) {
-        return new Path(warehouse(), database + DB_SUFFIX);
-    }
+/** ITCase for catalog tables. */
+public class CatalogTableITCase extends CatalogITCaseBase {
 
-    protected abstract String warehouse();
+    @Test
+    public void testMetadataTable() throws Exception {
+        sql("CREATE TABLE T (a INT, b INT)");
+        sql("INSERT INTO T VALUES (1, 2)");
+        sql("INSERT INTO T VALUES (3, 4)");
+
+        List<Row> result = sql("SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots");
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND"));
+    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index 4cedbbc8..fa39d08c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -204,7 +204,8 @@ public class TableStoreManagedFactoryTest {
         context = createEnrichedContext(TABLE_IDENTIFIER, catalogTable);
         if (expectedResult.success) {
             tableStoreManagedFactory.onCreateTable(context, false);
-            FileStoreTable table = AbstractTableStoreFactory.buildFileStoreTable(context);
+            FileStoreTable table =
+                    (FileStoreTable) AbstractTableStoreFactory.buildFileStoreTable(context);
             assertThat(table.schema().partitionKeys().size() > 0)
                     .isEqualTo(catalogTable.isPartitioned());
             assertThat(table.schema().primaryKeys().size())
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 4e645045..a55a9183 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -85,8 +85,12 @@ public class FileStoreSourceSplitGeneratorTest {
         assertThat(splits.size()).isEqualTo(12);
         splits.sort(
                 Comparator.comparingInt(
-                                o -> ((FileStoreSourceSplit) o).split().partition().getInt(0))
-                        .thenComparing(o -> ((FileStoreSourceSplit) o).split().bucket()));
+                                o ->
+                                        ((DataSplit) ((FileStoreSourceSplit) o).split())
+                                                .partition()
+                                                .getInt(0))
+                        .thenComparing(
+                                o -> ((DataSplit) ((FileStoreSourceSplit) o).split()).bucket()));
         assertSplit(splits.get(0), "0000000007", 1, 0, Arrays.asList("f0", "f1"));
         assertSplit(splits.get(1), "0000000008", 1, 1, Collections.singletonList("f2"));
         assertSplit(splits.get(2), "0000000003", 2, 0, Arrays.asList("f3", "f4", "f5"));
@@ -104,12 +108,13 @@ public class FileStoreSourceSplitGeneratorTest {
     private void assertSplit(
             FileStoreSourceSplit split, String splitId, int part, int bucket, List<String> files) {
         assertThat(split.splitId()).isEqualTo(splitId);
-        assertThat(split.split().partition().getInt(0)).isEqualTo(part);
-        assertThat(split.split().bucket()).isEqualTo(bucket);
+        assertThat(((DataSplit) split.split()).partition().getInt(0)).isEqualTo(part);
+        assertThat(((DataSplit) split.split()).bucket()).isEqualTo(bucket);
         assertThat(
-                        split.split().files().stream()
-                                .map(DataFileMeta::fileName)
-                                .collect(Collectors.toList()))
+                        ((DataSplit) split.split())
+                                .files().stream()
+                                        .map(DataFileMeta::fileName)
+                                        .collect(Collectors.toList()))
                 .isEqualTo(files);
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
index c3e2dd59..fc705332 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
@@ -20,6 +20,12 @@ package org.apache.flink.table.store.file.catalog;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.metadata.MetadataTableLoader;
+
+import org.apache.commons.lang3.StringUtils;
 
 /** Common implementation of {@link Catalog}. */
 public abstract class AbstractCatalog implements Catalog {
@@ -31,6 +37,26 @@ public abstract class AbstractCatalog implements Catalog {
         return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName());
     }
 
+    @Override
+    public Table getTable(ObjectPath tablePath) throws TableNotExistException {
+        String inputTableName = tablePath.getObjectName();
+        if (inputTableName.contains(METADATA_TABLE_SPLITTER)) {
+            String[] splits = StringUtils.split(inputTableName, METADATA_TABLE_SPLITTER);
+            if (splits.length != 2) {
+                throw new IllegalArgumentException(
+                        "Metadata table can only contain one '$' separator, 
but this is: "
+                                + inputTableName);
+            }
+            String table = splits[0];
+            String metadata = splits[1];
+            Path location = getTableLocation(new ObjectPath(tablePath.getDatabaseName(), table));
+            return MetadataTableLoader.load(metadata, location);
+        } else {
+            TableSchema tableSchema = getTableSchema(tablePath);
+            return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema);
+        }
+    }
+
     protected Path databasePath(String database) {
         return new Path(warehouse(), database + DB_SUFFIX);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index a84bb22f..a29977cf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.table.Table;
 
 import java.util.List;
@@ -37,6 +36,8 @@ public interface Catalog extends AutoCloseable {
 
     String DEFAULT_DATABASE = "default";
 
+    String METADATA_TABLE_SPLITTER = "$";
+
     /**
      * Get lock factory from catalog. Lock is used to support multiple concurrent writes on the
      * object store.
@@ -116,10 +117,7 @@ public interface Catalog extends AutoCloseable {
      * @return The requested table
      * @throws TableNotExistException if the target does not exist
      */
-    default Table getTable(ObjectPath tablePath) throws TableNotExistException {
-        TableSchema tableSchema = getTableSchema(tablePath);
-        return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema);
-    }
+    Table getTable(ObjectPath tablePath) throws TableNotExistException;
 
     /**
      * Check if a table exists in this catalog.
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/IteratorRecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/IteratorRecordReader.java
new file mode 100644
index 00000000..5dad460d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/IteratorRecordReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/** Wraps an {@link Iterator} as a {@link RecordReader}. */
+public class IteratorRecordReader<T> implements RecordReader<T> {
+
+    private final Iterator<T> iterator;
+
+    private boolean read = false;
+
+    public IteratorRecordReader(Iterator<T> iterator) {
+        this.iterator = iterator;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+        if (read) {
+            return null;
+        }
+
+        read = true;
+        return new RecordIterator<T>() {
+            @Override
+            public T next() {
+                return iterator.hasNext() ? iterator.next() : null;
+            }
+
+            @Override
+            public void releaseBatch() {}
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (iterator instanceof AutoCloseable) {
+            try {
+                ((AutoCloseable) iterator).close();
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
index 86c0e22c..1e16e729 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.UUID;
 import java.util.function.BinaryOperator;
 
@@ -79,6 +80,16 @@ public class SnapshotManager {
         }
     }
 
+    public long snapshotCount() throws IOException {
+        return listVersionedFiles(snapshotDirectory(), SNAPSHOT_PREFIX).count();
+    }
+
+    public Iterator<Snapshot> snapshots() throws IOException {
+        return listVersionedFiles(snapshotDirectory(), SNAPSHOT_PREFIX)
+                .map(this::snapshot)
+                .iterator();
+    }
+
     public Long findLatest() throws IOException {
         Path snapshotDir = snapshotDirectory();
         FileSystem fs = snapshotDir.getFileSystem();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java
similarity index 54%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java
index c3e2dd59..782bb1f4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,24 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.catalog;
+package org.apache.flink.table.store.table.metadata;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.table.Table;
 
-/** Common implementation of {@link Catalog}. */
-public abstract class AbstractCatalog implements Catalog {
+import static org.apache.flink.table.store.table.metadata.SnapshotsTable.SNAPSHOTS;
 
-    protected static final String DB_SUFFIX = ".db";
+/** Loader to load metadata {@link Table}s. */
+public class MetadataTableLoader {
 
-    @Override
-    public Path getTableLocation(ObjectPath tablePath) {
-        return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName());
+    public static Table load(String metadata, Path location) {
+        switch (metadata.toLowerCase()) {
+            case SNAPSHOTS:
+                return new SnapshotsTable(location);
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported metadata table type: " + metadata);
+        }
     }
-
-    protected Path databasePath(String database) {
-        return new Path(warehouse(), database + DB_SUFFIX);
-    }
-
-    protected abstract String warehouse();
 }
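
For illustration, a minimal sketch of the loader (not from the patch itself);
the location is a placeholder, and "snapshots" is the only metadata name the
switch accepts so far:

    Path location = new Path("/warehouse/default.db/MyTable"); // placeholder
    Table snapshotsTable = MetadataTableLoader.load("snapshots", location);
    // Any other name, e.g. MetadataTableLoader.load("files", location),
    // currently throws UnsupportedOperationException.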
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
new file mode 100644
index 00000000..b0d9b143
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.metadata;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.utils.IteratorRecordReader;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.SerializationUtils;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.utils.ProjectedRowData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Objects;
+
+import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
+
+/** A {@link Table} for showing the committed snapshots of a table. */
+public class SnapshotsTable implements Table {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String SNAPSHOTS = "snapshots";
+
+    public static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new RowType.RowField("snapshot_id", new BigIntType(false)),
+                            new RowType.RowField("schema_id", new BigIntType(false)),
+                            new RowType.RowField(
+                                    "commit_user", SerializationUtils.newStringType(false)),
+                            new RowType.RowField(
+                                    "commit_identifier", SerializationUtils.newStringType(false)),
+                            new RowType.RowField(
+                                    "commit_kind", SerializationUtils.newStringType(false)),
+                            new RowType.RowField("commit_time", new TimestampType(false, 3))));
+
+    private final Path location;
+
+    public SnapshotsTable(Path location) {
+        this.location = location;
+    }
+
+    @Override
+    public String name() {
+        return location.getName() + METADATA_TABLE_SPLITTER + SNAPSHOTS;
+    }
+
+    @Override
+    public RowType rowType() {
+        return TABLE_TYPE;
+    }
+
+    @Override
+    public TableScan newScan() {
+        return new SnapshotsScan();
+    }
+
+    @Override
+    public TableRead newRead() {
+        return new SnapshotsRead();
+    }
+
+    private class SnapshotsScan implements TableScan {
+
+        @Override
+        public TableScan withFilter(Predicate predicate) {
+            // TODO
+            return this;
+        }
+
+        @Override
+        public Plan plan() {
+            return () -> Collections.singletonList(new SnapshotsSplit(location));
+        }
+    }
+
+    private static class SnapshotsSplit implements Split {
+
+        private static final long serialVersionUID = 1L;
+
+        private final Path location;
+
+        private SnapshotsSplit(Path location) {
+            this.location = location;
+        }
+
+        @Override
+        public long rowCount() {
+            try {
+                return new SnapshotManager(location).snapshotCount();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SnapshotsSplit that = (SnapshotsSplit) o;
+            return Objects.equals(location, that.location);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(location);
+        }
+    }
+
+    private static class SnapshotsRead implements TableRead {
+
+        private int[][] projection;
+
+        @Override
+        public TableRead withFilter(Predicate predicate) {
+            // TODO
+            return this;
+        }
+
+        @Override
+        public TableRead withProjection(int[][] projection) {
+            this.projection = projection;
+            return this;
+        }
+
+        @Override
+        public RecordReader<RowData> createReader(Split split) throws IOException {
+            if (!(split instanceof SnapshotsSplit)) {
+                throw new IllegalArgumentException("Unsupported split: " + split.getClass());
+            }
+            Path location = ((SnapshotsSplit) split).location;
+            Iterator<Snapshot> snapshots = new SnapshotManager(location).snapshots();
+            Iterator<RowData> rows = Iterators.transform(snapshots, this::toRow);
+            if (projection != null) {
+                rows =
+                        Iterators.transform(
+                                rows, row -> ProjectedRowData.from(projection).replaceRow(row));
+            }
+            return new IteratorRecordReader<>(rows);
+        }
+
+        private RowData toRow(Snapshot snapshot) {
+            return GenericRowData.of(
+                    snapshot.id(),
+                    snapshot.schemaId(),
+                    StringData.fromString(snapshot.commitUser()),
+                    StringData.fromString(snapshot.commitIdentifier()),
+                    StringData.fromString(snapshot.commitKind().toString()),
+                    TimestampData.fromLocalDateTime(
+                            LocalDateTime.ofInstant(
+                                    Instant.ofEpochMilli(snapshot.timeMillis()),
+                                    ZoneId.systemDefault())));
+        }
+    }
+}
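
For illustration, a minimal sketch of reading this table through the scan/read
pair defined above (not from the patch itself); the path is a placeholder and
Plan#splits() is assumed from the lambda-shaped Plan in SnapshotsScan:

    Table table = new SnapshotsTable(new Path("/warehouse/default.db/MyTable"));
    List<Split> splits = table.newScan().plan().splits();
    RecordReader<RowData> reader =
            table.newRead()
                    .withProjection(new int[][] {{0}, {4}}) // snapshot_id, commit_kind
                    .createReader(splits.get(0));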
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 522723b4..c32ab2eb 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -221,10 +221,18 @@ public class SparkReadITCase {
     @Test
     public void testCatalogNormal() {
         innerTestSimpleType(spark.table("tablestore.default.t1"));
-
         innerTestNestedType(spark.table("tablestore.default.t2"));
     }
 
+    @Test
+    public void testMetadataTable() {
+        List<Row> rows =
+                spark.table("tablestore.default.`t1$snapshots`")
+                        .select("snapshot_id", "schema_id", "commit_user", "commit_kind")
+                        .collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,0,user,APPEND]]");
+    }
+
     @Test
     public void testCatalogFilterPushDown() {
         innerTestSimpleTypeFilterPushDown(spark.table("tablestore.default.t1"));
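
The same metadata table can also be queried with plain Spark SQL; a minimal
sketch using the identifier from the test above:

    List<Row> snapshots =
            spark.sql("SELECT snapshot_id, commit_kind FROM tablestore.default.`t1$snapshots`")
                    .collectAsList();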
