This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new a58513e4 [FLINK-29760] Introduce snapshots metadata table a58513e4 is described below commit a58513e40eb0ffb50fb6eee357c4449a9d8483cd Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Thu Oct 27 12:00:35 2022 +0800 [FLINK-29760] Introduce snapshots metadata table This closes #333 --- docs/content/docs/development/query-table.md | 19 ++ .../store/connector/AbstractTableStoreFactory.java | 6 + .../flink/table/store/connector/FlinkCatalog.java | 22 ++- .../store/connector/MetadataCatalogTable.java | 90 ++++++++++ .../source/ContinuousFileSplitEnumerator.java | 5 +- .../store/connector/source/FileStoreSource.java | 45 +---- .../connector/source/FileStoreSourceSplit.java | 10 +- .../source/FileStoreSourceSplitGenerator.java | 7 +- .../source/FileStoreSourceSplitSerializer.java | 22 ++- .../table/store/connector/source/FlinkSource.java | 85 +++++++++ .../store/connector/source/FlinkTableSource.java | 90 ++++++++++ .../store/connector/source/MetadataSource.java | 60 +++++++ .../connector/source/MetadataTableSource.java | 67 +++++++ .../store/connector/source/TableStoreSource.java | 56 +----- .../table/store/connector/CatalogTableITCase.java | 33 ++-- .../connector/TableStoreManagedFactoryTest.java | 3 +- .../source/FileStoreSourceSplitGeneratorTest.java | 19 +- .../table/store/file/catalog/AbstractCatalog.java | 26 +++ .../flink/table/store/file/catalog/Catalog.java | 8 +- .../store/file/utils/IteratorRecordReader.java | 66 +++++++ .../table/store/file/utils/SnapshotManager.java | 11 ++ .../metadata/MetadataTableLoader.java} | 29 ++- .../table/store/table/metadata/SnapshotsTable.java | 196 +++++++++++++++++++++ .../flink/table/store/spark/SparkReadITCase.java | 10 +- 24 files changed, 826 insertions(+), 159 deletions(-) diff --git a/docs/content/docs/development/query-table.md b/docs/content/docs/development/query-table.md index 520e1d4a..da890546 100644 --- 
a/docs/content/docs/development/query-table.md +++ b/docs/content/docs/development/query-table.md @@ -90,3 +90,22 @@ SELECT * FROM orders WHERE order_id=29495; SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495; ``` + +## Snapshots Table + +You can query a table's snapshot history through Flink SQL by appending `$snapshots` to the table name. + +```sql +SELECT * FROM MyTable$snapshots; + ++--------------+------------+-----------------+-------------------+--------------+-------------------------+ +| snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | ++--------------+------------+-----------------+-------------------+--------------+-------------------------+ +| 2 | 0 | 7ca4cd28-98e... | 2 | APPEND | 2022-10-26 11:44:15.600 | +| 1 | 0 | 870062aa-3e9... | 1 | APPEND | 2022-10-26 11:44:15.148 | ++--------------+------------+-----------------+-------------------+--------------+-------------------------+ +2 rows in set +``` + +By querying a table's snapshots table, you can see its commit and expiration +history, which also tells you which snapshots are available for time travel.
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java index 7c24cc42..cb09a06d 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableFactory; @@ -34,6 +35,7 @@ import org.apache.flink.table.store.CoreOptions.LogChangelogMode; import org.apache.flink.table.store.CoreOptions.LogConsistency; import org.apache.flink.table.store.CoreOptions.LogStartupMode; import org.apache.flink.table.store.connector.sink.TableStoreSink; +import org.apache.flink.table.store.connector.source.MetadataTableSource; import org.apache.flink.table.store.connector.source.TableStoreSource; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.file.schema.UpdateSchema; @@ -63,6 +65,10 @@ public abstract class AbstractTableStoreFactory @Override public DynamicTableSource createDynamicTableSource(Context context) { + CatalogTable origin = context.getCatalogTable().getOrigin(); + if (origin instanceof MetadataCatalogTable) { + return new MetadataTableSource(((MetadataCatalogTable) origin).table()); + } return new TableStoreSource( context.getObjectIdentifier(), buildFileStoreTable(context), diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java index e4dcefb7..63e29b06 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java @@ -42,8 +42,9 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.store.file.catalog.Catalog; import org.apache.flink.table.store.file.schema.SchemaChange; -import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.file.schema.UpdateSchema; +import org.apache.flink.table.store.table.FileStoreTable; +import org.apache.flink.table.store.table.Table; import java.util.ArrayList; import java.util.Collections; @@ -137,17 +138,24 @@ public class FlinkCatalog extends AbstractCatalog { @Override public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { - TableSchema schema; + Table table; try { - schema = catalog.getTableSchema(tablePath); + table = catalog.getTable(tablePath); } catch (Catalog.TableNotExistException e) { throw new TableNotExistException(getName(), e.tablePath()); } - CatalogTable table = schema.toUpdateSchema().toCatalogTable(); - // add path to source and sink - table.getOptions().put(PATH.key(), catalog.getTableLocation(tablePath).toString()); - return table; + if (table instanceof FileStoreTable) { + CatalogTable catalogTable = + ((FileStoreTable) table).schema().toUpdateSchema().toCatalogTable(); + // add path to source and sink + catalogTable + .getOptions() + .put(PATH.key(), catalog.getTableLocation(tablePath).toString()); + return catalogTable; + } else { + return new MetadataCatalogTable(table); + } } @Override diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java new file mode 100644 index 00000000..a17e4203 --- /dev/null +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.store.table.Table; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** A {@link CatalogTable} to represent metadata table. 
*/ +public class MetadataCatalogTable implements CatalogTable { + + private final Table table; + + public MetadataCatalogTable(Table table) { + this.table = table; + } + + public Table table() { + return table; + } + + @Override + public Schema getUnresolvedSchema() { + return Schema.newBuilder() + .fromRowDataType(TypeConversions.fromLogicalToDataType(table.rowType())) + .build(); + } + + @Override + public boolean isPartitioned() { + return false; + } + + @Override + public List<String> getPartitionKeys() { + return Collections.emptyList(); + } + + @Override + public CatalogTable copy(Map<String, String> map) { + return copy(); + } + + @Override + public Map<String, String> getOptions() { + return Collections.emptyMap(); + } + + @Override + public String getComment() { + return ""; + } + + @Override + public CatalogTable copy() { + return new MetadataCatalogTable(table); + } + + @Override + public Optional<String> getDescription() { + return Optional.empty(); + } + + @Override + public Optional<String> getDetailedDescription() { + return Optional.empty(); + } +} diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java index 14e84dd2..e2614bc7 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.core.fs.Path; import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.table.source.DataSplit; import org.apache.flink.table.store.table.source.DataTableScan; import 
org.apache.flink.table.store.table.source.SnapshotEnumerator; import org.apache.flink.table.store.table.source.SnapshotEnumerator.EnumeratorResult; @@ -92,7 +93,9 @@ public class ContinuousFileSplitEnumerator } private void addSplit(FileStoreSourceSplit split) { - bucketSplits.computeIfAbsent(split.split().bucket(), i -> new LinkedList<>()).add(split); + bucketSplits + .computeIfAbsent(((DataSplit) split.split()).bucket(), i -> new LinkedList<>()) + .add(split); } @Override diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java index 2aff07c2..72165041 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java @@ -20,17 +20,13 @@ package org.apache.flink.table.store.connector.source; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; -import org.apache.flink.api.connector.source.SourceReader; -import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.store.file.Snapshot; import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.utils.SnapshotManager; import org.apache.flink.table.store.table.FileStoreTable; import org.apache.flink.table.store.table.source.DataTableScan; -import org.apache.flink.table.store.table.source.TableRead; import javax.annotation.Nullable; @@ -41,8 +37,7 @@ import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpo import static 
org.apache.flink.util.Preconditions.checkArgument; /** {@link Source} of file store. */ -public class FileStoreSource - implements Source<RowData, FileStoreSourceSplit, PendingSplitsCheckpoint> { +public class FileStoreSource extends FlinkSource { private static final long serialVersionUID = 1L; @@ -54,12 +49,6 @@ public class FileStoreSource private final boolean latestContinuous; - @Nullable private final int[][] projectedFields; - - @Nullable private final Predicate predicate; - - @Nullable private final Long limit; - public FileStoreSource( FileStoreTable table, boolean isContinuous, @@ -68,13 +57,11 @@ public class FileStoreSource @Nullable int[][] projectedFields, @Nullable Predicate predicate, @Nullable Long limit) { + super(table, projectedFields, predicate, limit); this.table = table; this.isContinuous = isContinuous; this.discoveryInterval = discoveryInterval; this.latestContinuous = latestContinuous; - this.projectedFields = projectedFields; - this.predicate = predicate; - this.limit = limit; } @Override @@ -82,24 +69,6 @@ public class FileStoreSource return isContinuous ? 
Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED; } - @Override - public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) { - TableRead read = table.newRead(); - if (projectedFields != null) { - read.withProjection(projectedFields); - } - if (predicate != null) { - read.withFilter(predicate); - } - return new FileStoreSourceReader(context, read, limit); - } - - @Override - public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator( - SplitEnumeratorContext<FileStoreSourceSplit> context) { - return restoreEnumerator(context, null); - } - @Override public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator( SplitEnumeratorContext<FileStoreSourceSplit> context, @@ -150,14 +119,4 @@ public class FileStoreSource return new StaticFileStoreSplitEnumerator(context, snapshot, splits); } } - - @Override - public FileStoreSourceSplitSerializer getSplitSerializer() { - return new FileStoreSourceSplitSerializer(); - } - - @Override - public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() { - return new PendingSplitsCheckpointSerializer(getSplitSerializer()); - } } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java index 2356bab8..4ff027ea 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java @@ -19,7 +19,7 @@ package org.apache.flink.table.store.connector.source; import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.table.store.table.source.DataSplit; +import org.apache.flink.table.store.table.source.Split; import java.util.Objects; @@ -29,21 +29,21 @@ 
public class FileStoreSourceSplit implements SourceSplit { /** The unique ID of the split. Unique within the scope of this source. */ private final String id; - private final DataSplit split; + private final Split split; private final long recordsToSkip; - public FileStoreSourceSplit(String id, DataSplit split) { + public FileStoreSourceSplit(String id, Split split) { this(id, split, 0); } - public FileStoreSourceSplit(String id, DataSplit split, long recordsToSkip) { + public FileStoreSourceSplit(String id, Split split, long recordsToSkip) { this.id = id; this.split = split; this.recordsToSkip = recordsToSkip; } - public DataSplit split() { + public Split split() { return split; } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java index 5ffc6725..92ae8ceb 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java @@ -18,8 +18,7 @@ package org.apache.flink.table.store.connector.source; -import org.apache.flink.table.store.table.source.DataSplit; -import org.apache.flink.table.store.table.source.DataTableScan; +import org.apache.flink.table.store.table.source.TableScan; import java.util.List; import java.util.stream.Collectors; @@ -36,9 +35,9 @@ public class FileStoreSourceSplitGenerator { */ private final char[] currentId = "0000000000".toCharArray(); - public List<FileStoreSourceSplit> createSplits(DataTableScan.Plan plan) { + public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) { return plan.splits().stream() - .map(s -> new FileStoreSourceSplit(getNextId(), (DataSplit) s)) + .map(s -> new FileStoreSourceSplit(getNextId(), s)) 
.collect(Collectors.toList()); } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java index 4c0d6d2f..58b718b3 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java @@ -19,10 +19,12 @@ package org.apache.flink.table.store.connector.source; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.table.store.table.source.DataSplit; +import org.apache.flink.table.store.table.source.Split; +import org.apache.flink.util.InstantiationUtil; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -40,15 +42,23 @@ public class FileStoreSourceSplitSerializer ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); view.writeUTF(split.splitId()); - split.split().serialize(view); + InstantiationUtil.serializeObject(view, split.split()); view.writeLong(split.recordsToSkip()); return out.toByteArray(); } @Override public FileStoreSourceSplit deserialize(int version, byte[] serialized) throws IOException { - DataInputDeserializer view = new DataInputDeserializer(serialized); - return new FileStoreSourceSplit( - view.readUTF(), DataSplit.deserialize(view), view.readLong()); + ByteArrayInputStream in = new ByteArrayInputStream(serialized); + DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in); + String splitId = 
view.readUTF(); + Split split; + try { + split = InstantiationUtil.deserializeObject(in, getClass().getClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + long recordsToSkip = view.readLong(); + return new FileStoreSourceSplit(splitId, split, recordsToSkip); } } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSource.java new file mode 100644 index 00000000..9d80ec6d --- /dev/null +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSource.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.predicate.Predicate; +import org.apache.flink.table.store.table.Table; +import org.apache.flink.table.store.table.source.TableRead; + +import javax.annotation.Nullable; + +/** A Flink {@link Source} for table store. */ +public abstract class FlinkSource + implements Source<RowData, FileStoreSourceSplit, PendingSplitsCheckpoint> { + + private static final long serialVersionUID = 1L; + + protected final Table table; + + @Nullable protected final int[][] projectedFields; + + @Nullable protected final Predicate predicate; + + @Nullable protected final Long limit; + + public FlinkSource( + Table table, + @Nullable int[][] projectedFields, + @Nullable Predicate predicate, + @Nullable Long limit) { + this.table = table; + this.projectedFields = projectedFields; + this.predicate = predicate; + this.limit = limit; + } + + @Override + public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) { + TableRead read = table.newRead(); + if (projectedFields != null) { + read.withProjection(projectedFields); + } + if (predicate != null) { + read.withFilter(predicate); + } + return new FileStoreSourceReader(context, read, limit); + } + + @Override + public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator( + SplitEnumeratorContext<FileStoreSourceSplit> context) throws Exception { + return restoreEnumerator(context, null); + } + + @Override + public FileStoreSourceSplitSerializer getSplitSerializer() { + return new FileStoreSourceSplitSerializer(); + } + + 
@Override + public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() { + return new PendingSplitsCheckpointSerializer(getSplitSerializer()); + } +} diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkTableSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkTableSource.java new file mode 100644 index 00000000..77d03a14 --- /dev/null +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkTableSource.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.store.file.predicate.Predicate; +import org.apache.flink.table.store.file.predicate.PredicateBuilder; +import org.apache.flink.table.store.file.predicate.PredicateConverter; +import org.apache.flink.table.store.table.Table; +import org.apache.flink.table.types.logical.RowType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** A Flink {@link ScanTableSource} for table store. */ +public abstract class FlinkTableSource + implements ScanTableSource, + SupportsFilterPushDown, + SupportsProjectionPushDown, + SupportsLimitPushDown { + + private final Table table; + + @Nullable protected Predicate predicate; + @Nullable protected int[][] projectFields; + @Nullable protected Long limit; + + public FlinkTableSource(Table table) { + this(table, null, null, null); + } + + public FlinkTableSource( + Table table, + @Nullable Predicate predicate, + @Nullable int[][] projectFields, + @Nullable Long limit) { + this.table = table; + this.predicate = predicate; + this.projectFields = projectFields; + this.limit = limit; + } + + @Override + public Result applyFilters(List<ResolvedExpression> filters) { + List<Predicate> converted = new ArrayList<>(); + RowType rowType = table.rowType(); + for (ResolvedExpression filter : filters) { + PredicateConverter.convert(rowType, filter).ifPresent(converted::add); + } + predicate = converted.isEmpty() ? 
null : PredicateBuilder.and(converted); + return Result.of(filters, filters); + } + + @Override + public boolean supportsNestedProjection() { + return false; + } + + @Override + public void applyProjection(int[][] projectedFields) { + this.projectFields = projectedFields; + } + + @Override + public void applyLimit(long limit) { + this.limit = limit; + } +} diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java new file mode 100644 index 00000000..9155c27c --- /dev/null +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.table.store.file.predicate.Predicate; +import org.apache.flink.table.store.table.Table; +import org.apache.flink.table.store.table.source.TableScan; + +import javax.annotation.Nullable; + +/** A {@link FlinkSource} for metadata table. */ +public class MetadataSource extends FlinkSource { + + private static final long serialVersionUID = 1L; + + public MetadataSource( + Table table, + @Nullable int[][] projectedFields, + @Nullable Predicate predicate, + @Nullable Long limit) { + super(table, projectedFields, predicate, limit); + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator( + SplitEnumeratorContext<FileStoreSourceSplit> context, + PendingSplitsCheckpoint checkpoint) { + TableScan scan = table.newScan(); + if (predicate != null) { + scan.withFilter(predicate); + } + + return new StaticFileStoreSplitEnumerator( + context, null, new FileStoreSourceSplitGenerator().createSplits(scan.plan())); + } +} diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java new file mode 100644 index 00000000..63941196 --- /dev/null +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.store.file.predicate.Predicate; +import org.apache.flink.table.store.table.Table; + +import javax.annotation.Nullable; + +/** A {@link FlinkTableSource} for metadata table. 
*/ +public class MetadataTableSource extends FlinkTableSource { + + private final Table table; + + public MetadataTableSource(Table table) { + super(table); + this.table = table; + } + + public MetadataTableSource( + Table table, + @Nullable Predicate predicate, + @Nullable int[][] projectFields, + @Nullable Long limit) { + super(table, predicate, projectFields, limit); + this.table = table; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + return SourceProvider.of(new MetadataSource(table, projectFields, predicate, limit)); + } + + @Override + public DynamicTableSource copy() { + return new MetadataTableSource(table, predicate, projectFields, limit); + } + + @Override + public String asSummaryString() { + return "TableStore-MetadataSource"; + } +} diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java index 68a976b0..503f1009 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java @@ -24,14 +24,9 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.LookupTableSource; -import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.TableFunctionProvider; -import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; -import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; -import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.store.CoreOptions.ChangelogProducer; import org.apache.flink.table.store.CoreOptions.LogChangelogMode; @@ -40,8 +35,6 @@ import org.apache.flink.table.store.connector.FlinkConnectorOptions; import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider; import org.apache.flink.table.store.connector.lookup.FileStoreLookupFunction; import org.apache.flink.table.store.file.predicate.Predicate; -import org.apache.flink.table.store.file.predicate.PredicateBuilder; -import org.apache.flink.table.store.file.predicate.PredicateConverter; import org.apache.flink.table.store.log.LogSourceProvider; import org.apache.flink.table.store.log.LogStoreTableFactory; import org.apache.flink.table.store.table.AppendOnlyFileStoreTable; @@ -49,12 +42,9 @@ import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable; import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable; import org.apache.flink.table.store.table.FileStoreTable; import org.apache.flink.table.store.utils.Projection; -import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.List; import java.util.stream.IntStream; import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER; @@ -68,13 +58,8 @@ import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource} and kafka * log source created by {@link LogSourceProvider}. 
*/ -public class TableStoreSource - implements ScanTableSource, - LookupTableSource, - SupportsFilterPushDown, - SupportsProjectionPushDown, - SupportsLimitPushDown, - SupportsWatermarkPushDown { +public class TableStoreSource extends FlinkTableSource + implements LookupTableSource, SupportsWatermarkPushDown { private final ObjectIdentifier tableIdentifier; private final FileStoreTable table; @@ -82,10 +67,6 @@ public class TableStoreSource private final DynamicTableFactory.Context context; @Nullable private final LogStoreTableFactory logStoreTableFactory; - @Nullable private Predicate predicate; - @Nullable private int[][] projectFields; - @Nullable private Long limit; - @Nullable private WatermarkStrategy<RowData> watermarkStrategy; public TableStoreSource( @@ -116,6 +97,7 @@ public class TableStoreSource @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy<RowData> watermarkStrategy) { + super(table, predicate, projectFields, limit); this.tableIdentifier = tableIdentifier; this.table = table; this.streaming = streaming; @@ -158,7 +140,9 @@ public class TableStoreSource : ChangelogMode.upsert(); } else { throw new UnsupportedOperationException( - "Unknown FileStoreTable subclass " + table.getClass().getName()); + "Unsupported Table subclass " + + table.getClass().getName() + + " for streaming mode."); } } @@ -202,28 +186,7 @@ public class TableStoreSource @Override public String asSummaryString() { - return "TableStoreSource"; - } - - @Override - public Result applyFilters(List<ResolvedExpression> filters) { - List<Predicate> converted = new ArrayList<>(); - RowType rowType = table.schema().logicalRowType(); - for (ResolvedExpression filter : filters) { - PredicateConverter.convert(rowType, filter).ifPresent(converted::add); - } - predicate = converted.isEmpty() ? 
null : PredicateBuilder.and(converted); - return Result.of(filters, filters); - } - - @Override - public boolean supportsNestedProjection() { - return false; - } - - @Override - public void applyProjection(int[][] projectedFields) { - this.projectFields = projectedFields; + return "TableStore-DataSource"; } @Override @@ -231,11 +194,6 @@ public class TableStoreSource this.watermarkStrategy = watermarkStrategy; } - @Override - public void applyLimit(long limit) { - this.limit = limit; - } - @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { if (limit != null) { diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java similarity index 50% copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java copy to flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java index c3e2dd59..baaf8f00 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,24 +16,27 @@ * limitations under the License. 
*/ -package org.apache.flink.table.store.file.catalog; +package org.apache.flink.table.store.connector; -import org.apache.flink.core.fs.Path; -import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.types.Row; -/** Common implementation of {@link Catalog}. */ -public abstract class AbstractCatalog implements Catalog { +import org.junit.Test; - protected static final String DB_SUFFIX = ".db"; +import java.util.List; - @Override - public Path getTableLocation(ObjectPath tablePath) { - return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName()); - } +import static org.assertj.core.api.Assertions.assertThat; - protected Path databasePath(String database) { - return new Path(warehouse(), database + DB_SUFFIX); - } +/** ITCase for catalog tables. */ +public class CatalogTableITCase extends CatalogITCaseBase { - protected abstract String warehouse(); + @Test + public void testMetadataTable() throws Exception { + sql("CREATE TABLE T (a INT, b INT)"); + sql("INSERT INTO T VALUES (1, 2)"); + sql("INSERT INTO T VALUES (3, 4)"); + + List<Row> result = sql("SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots"); + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); + } } diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java index 4cedbbc8..fa39d08c 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java @@ -204,7 +204,8 @@ public class TableStoreManagedFactoryTest { context = createEnrichedContext(TABLE_IDENTIFIER, catalogTable); if (expectedResult.success) { 
tableStoreManagedFactory.onCreateTable(context, false); - FileStoreTable table = AbstractTableStoreFactory.buildFileStoreTable(context); + FileStoreTable table = + (FileStoreTable) AbstractTableStoreFactory.buildFileStoreTable(context); assertThat(table.schema().partitionKeys().size() > 0) .isEqualTo(catalogTable.isPartitioned()); assertThat(table.schema().primaryKeys().size()) diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java index 4e645045..a55a9183 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java @@ -85,8 +85,12 @@ public class FileStoreSourceSplitGeneratorTest { assertThat(splits.size()).isEqualTo(12); splits.sort( Comparator.comparingInt( - o -> ((FileStoreSourceSplit) o).split().partition().getInt(0)) - .thenComparing(o -> ((FileStoreSourceSplit) o).split().bucket())); + o -> + ((DataSplit) ((FileStoreSourceSplit) o).split()) + .partition() + .getInt(0)) + .thenComparing( + o -> ((DataSplit) ((FileStoreSourceSplit) o).split()).bucket())); assertSplit(splits.get(0), "0000000007", 1, 0, Arrays.asList("f0", "f1")); assertSplit(splits.get(1), "0000000008", 1, 1, Collections.singletonList("f2")); assertSplit(splits.get(2), "0000000003", 2, 0, Arrays.asList("f3", "f4", "f5")); @@ -104,12 +108,13 @@ public class FileStoreSourceSplitGeneratorTest { private void assertSplit( FileStoreSourceSplit split, String splitId, int part, int bucket, List<String> files) { assertThat(split.splitId()).isEqualTo(splitId); - assertThat(split.split().partition().getInt(0)).isEqualTo(part); - assertThat(split.split().bucket()).isEqualTo(bucket); + 
assertThat(((DataSplit) split.split()).partition().getInt(0)).isEqualTo(part); + assertThat(((DataSplit) split.split()).bucket()).isEqualTo(bucket); assertThat( - split.split().files().stream() - .map(DataFileMeta::fileName) - .collect(Collectors.toList())) + ((DataSplit) split.split()) + .files().stream() + .map(DataFileMeta::fileName) + .collect(Collectors.toList())) .isEqualTo(files); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java index c3e2dd59..fc705332 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java @@ -20,6 +20,12 @@ package org.apache.flink.table.store.file.catalog; import org.apache.flink.core.fs.Path; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.store.file.schema.TableSchema; +import org.apache.flink.table.store.table.FileStoreTableFactory; +import org.apache.flink.table.store.table.Table; +import org.apache.flink.table.store.table.metadata.MetadataTableLoader; + +import org.apache.commons.lang3.StringUtils; /** Common implementation of {@link Catalog}. 
*/ public abstract class AbstractCatalog implements Catalog { @@ -31,6 +37,26 @@ public abstract class AbstractCatalog implements Catalog { return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName()); } + @Override + public Table getTable(ObjectPath tablePath) throws TableNotExistException { + String inputTableName = tablePath.getObjectName(); + if (inputTableName.contains(METADATA_TABLE_SPLITTER)) { + String[] splits = StringUtils.split(inputTableName, METADATA_TABLE_SPLITTER); + if (splits.length != 2) { + throw new IllegalArgumentException( + "Metadata table can only contain one '$' separator, but this is: " + + inputTableName); + } + String table = splits[0]; + String metadata = splits[1]; + Path location = getTableLocation(new ObjectPath(tablePath.getDatabaseName(), table)); + return MetadataTableLoader.load(metadata, location); + } else { + TableSchema tableSchema = getTableSchema(tablePath); + return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema); + } + } + protected Path databasePath(String database) { return new Path(warehouse(), database + DB_SUFFIX); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java index a84bb22f..a29977cf 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java @@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.store.file.schema.SchemaChange; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.file.schema.UpdateSchema; -import org.apache.flink.table.store.table.FileStoreTableFactory; import org.apache.flink.table.store.table.Table; import java.util.List; @@ -37,6 +36,8 @@ public interface Catalog extends 
AutoCloseable { String DEFAULT_DATABASE = "default"; + String METADATA_TABLE_SPLITTER = "$"; + /** * Get lock factory from catalog. Lock is used to support multiple concurrent writes on the * object store. @@ -116,10 +117,7 @@ public interface Catalog extends AutoCloseable { * @return The requested table * @throws TableNotExistException if the target does not exist */ - default Table getTable(ObjectPath tablePath) throws TableNotExistException { - TableSchema tableSchema = getTableSchema(tablePath); - return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema); - } + Table getTable(ObjectPath tablePath) throws TableNotExistException; /** * Check if a table exists in this catalog. diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/IteratorRecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/IteratorRecordReader.java new file mode 100644 index 00000000..5dad460d --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/IteratorRecordReader.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.file.utils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Iterator; + +/** Wrap a {@link Iterator} as an {@link RecordReader}. */ +public class IteratorRecordReader<T> implements RecordReader<T> { + + private final Iterator<T> iterator; + + private boolean read = false; + + public IteratorRecordReader(Iterator<T> iterator) { + this.iterator = iterator; + } + + @Nullable + @Override + public RecordIterator<T> readBatch() throws IOException { + if (read) { + return null; + } + + read = true; + return new RecordIterator<T>() { + @Override + public T next() { + return iterator.hasNext() ? iterator.next() : null; + } + + @Override + public void releaseBatch() {} + }; + } + + @Override + public void close() throws IOException { + if (iterator instanceof AutoCloseable) { + try { + ((AutoCloseable) iterator).close(); + } catch (Exception e) { + throw new IOException(e); + } + } + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java index 86c0e22c..1e16e729 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; +import java.util.Iterator; import java.util.UUID; import java.util.function.BinaryOperator; @@ -79,6 +80,16 @@ public class SnapshotManager { } } + public long snapshotCount() throws IOException { + return listVersionedFiles(snapshotDirectory(), SNAPSHOT_PREFIX).count(); + } + + public Iterator<Snapshot> snapshots() throws IOException { + return listVersionedFiles(snapshotDirectory(), SNAPSHOT_PREFIX) + .map(this::snapshot) + .iterator(); + } + public Long 
findLatest() throws IOException { Path snapshotDir = snapshotDirectory(); FileSystem fs = snapshotDir.getFileSystem(); diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java similarity index 54% copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java index c3e2dd59..782bb1f4 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,24 +16,23 @@ * limitations under the License. */ -package org.apache.flink.table.store.file.catalog; +package org.apache.flink.table.store.table.metadata; import org.apache.flink.core.fs.Path; -import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.store.table.Table; -/** Common implementation of {@link Catalog}. */ -public abstract class AbstractCatalog implements Catalog { +import static org.apache.flink.table.store.table.metadata.SnapshotsTable.SNAPSHOTS; - protected static final String DB_SUFFIX = ".db"; +/** Loader to load metadata {@link Table}s. 
*/ +public class MetadataTableLoader { - @Override - public Path getTableLocation(ObjectPath tablePath) { - return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName()); + public static Table load(String metadata, Path location) { + switch (metadata.toLowerCase()) { + case SNAPSHOTS: + return new SnapshotsTable(location); + default: + throw new UnsupportedOperationException( + "Unsupported metadata table type: " + metadata); + } } - - protected Path databasePath(String database) { - return new Path(warehouse(), database + DB_SUFFIX); - } - - protected abstract String warehouse(); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java new file mode 100644 index 00000000..b0d9b143 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.table.metadata; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.store.file.Snapshot; +import org.apache.flink.table.store.file.predicate.Predicate; +import org.apache.flink.table.store.file.utils.IteratorRecordReader; +import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.store.file.utils.SerializationUtils; +import org.apache.flink.table.store.file.utils.SnapshotManager; +import org.apache.flink.table.store.table.Table; +import org.apache.flink.table.store.table.source.Split; +import org.apache.flink.table.store.table.source.TableRead; +import org.apache.flink.table.store.table.source.TableScan; +import org.apache.flink.table.store.utils.ProjectedRowData; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators; + +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.Objects; + +import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER; + +/** A {@link Table} for showing committing snapshots of table. 
*/ +public class SnapshotsTable implements Table { + + private static final long serialVersionUID = 1L; + + public static final String SNAPSHOTS = "snapshots"; + + public static final RowType TABLE_TYPE = + new RowType( + Arrays.asList( + new RowType.RowField("snapshot_id", new BigIntType(false)), + new RowType.RowField("schema_id", new BigIntType(false)), + new RowType.RowField( + "commit_user", SerializationUtils.newStringType(false)), + new RowType.RowField( + "commit_identifier", SerializationUtils.newStringType(false)), + new RowType.RowField( + "commit_kind", SerializationUtils.newStringType(false)), + new RowType.RowField("commit_time", new TimestampType(false, 3)))); + + private final Path location; + + public SnapshotsTable(Path location) { + this.location = location; + } + + @Override + public String name() { + return location.getName() + METADATA_TABLE_SPLITTER + SNAPSHOTS; + } + + @Override + public RowType rowType() { + return TABLE_TYPE; + } + + @Override + public TableScan newScan() { + return new SnapshotsScan(); + } + + @Override + public TableRead newRead() { + return new SnapshotsRead(); + } + + private class SnapshotsScan implements TableScan { + + @Override + public TableScan withFilter(Predicate predicate) { + // TODO + return this; + } + + @Override + public Plan plan() { + return () -> Collections.singletonList(new SnapshotsSplit(location)); + } + } + + private static class SnapshotsSplit implements Split { + + private static final long serialVersionUID = 1L; + + private final Path location; + + private SnapshotsSplit(Path location) { + this.location = location; + } + + @Override + public long rowCount() { + try { + return new SnapshotManager(location).snapshotCount(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SnapshotsSplit that = (SnapshotsSplit) o; + 
return Objects.equals(location, that.location); + } + + @Override + public int hashCode() { + return Objects.hash(location); + } + } + + private static class SnapshotsRead implements TableRead { + + private int[][] projection; + + @Override + public TableRead withFilter(Predicate predicate) { + // TODO + return this; + } + + @Override + public TableRead withProjection(int[][] projection) { + this.projection = projection; + return this; + } + + @Override + public RecordReader<RowData> createReader(Split split) throws IOException { + if (!(split instanceof SnapshotsSplit)) { + throw new IllegalArgumentException("Unsupported split: " + split.getClass()); + } + Path location = ((SnapshotsSplit) split).location; + Iterator<Snapshot> snapshots = new SnapshotManager(location).snapshots(); + Iterator<RowData> rows = Iterators.transform(snapshots, this::toRow); + if (projection != null) { + rows = + Iterators.transform( + rows, row -> ProjectedRowData.from(projection).replaceRow(row)); + } + return new IteratorRecordReader<>(rows); + } + + private RowData toRow(Snapshot snapshot) { + return GenericRowData.of( + snapshot.id(), + snapshot.schemaId(), + StringData.fromString(snapshot.commitUser()), + StringData.fromString(snapshot.commitIdentifier()), + StringData.fromString(snapshot.commitKind().toString()), + TimestampData.fromLocalDateTime( + LocalDateTime.ofInstant( + Instant.ofEpochMilli(snapshot.timeMillis()), + ZoneId.systemDefault()))); + } + } +} diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java index 522723b4..c32ab2eb 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java @@ -221,10 +221,18 @@ public class SparkReadITCase { @Test public void testCatalogNormal() { 
innerTestSimpleType(spark.table("tablestore.default.t1")); - innerTestNestedType(spark.table("tablestore.default.t2")); } + @Test + public void testMetadataTable() { + List<Row> rows = + spark.table("tablestore.default.`t1$snapshots`") + .select("snapshot_id", "schema_id", "commit_user", "commit_kind") + .collectAsList(); + assertThat(rows.toString()).isEqualTo("[[1,0,user,APPEND]]"); + } + @Test public void testCatalogFilterPushDown() { innerTestSimpleTypeFilterPushDown(spark.table("tablestore.default.t1"));