This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0be92626ea7 [FLINK-32473] Introduce base interfaces for time travel (#22928) 0be92626ea7 is described below commit 0be92626ea741c681619896ab86696e1d0bde665 Author: Feng Jin <jinfeng1...@gmail.com> AuthorDate: Tue Jul 4 09:30:30 2023 +0800 [FLINK-32473] Introduce base interfaces for time travel (#22928) --- .../org/apache/flink/table/catalog/Catalog.java | 18 ++++++++++++++++ .../apache/flink/table/catalog/CatalogTable.java | 24 ++++++++++++++++++++++ .../flink/table/catalog/DefaultCatalogTable.java | 21 +++++++++++++++++-- .../flink/table/catalog/ResolvedCatalogTable.java | 5 +++++ 4 files changed, 66 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index 575790e676d..8e96bdb650a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -244,6 +244,24 @@ public interface Catalog { */ CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException; + /** + * Returns a {@link CatalogTable} or {@link CatalogView} at a specific time identified by the + * given {@link ObjectPath}. The framework will resolve the metadata objects when necessary. + * + * @param tablePath Path of the table or view + * @param timestamp Timestamp of the table snapshot, which is milliseconds since 1970-01-01 + * 00:00:00 UTC + * @return The requested table or view + * @throws TableNotExistException if the target does not exist + * @throws CatalogException in case of any runtime exception + */ + default CatalogBaseTable getTable(ObjectPath tablePath, long timestamp) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format( + "getTable(ObjectPath, long) is not implemented for %s.", this.getClass())); + } + /** * Check if a table or view exists in this catalog. * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java index 3f17f30072b..a2e28731005 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java @@ -27,6 +27,7 @@ import javax.annotation.Nullable; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; /** * Represents the unresolved metadata of a table in a {@link Catalog}. @@ -66,6 +67,24 @@ public interface CatalogTable extends CatalogBaseTable { return new DefaultCatalogTable(schema, comment, partitionKeys, options); } + /** + * Creates an instance of {@link CatalogTable} with a specific snapshot. + * + * @param schema unresolved schema + * @param comment optional comment + * @param partitionKeys list of partition keys or an empty list if not partitioned + * @param options options to configure the connector + * @param snapshot table snapshot of the table + */ + static CatalogTable of( + Schema schema, + @Nullable String comment, + List<String> partitionKeys, + Map<String, String> options, + @Nullable Long snapshot) { + return new DefaultCatalogTable(schema, comment, partitionKeys, options, snapshot); + } + /** * Creates an instance of {@link CatalogTable} from a map of string properties that were * previously created with {@link ResolvedCatalogTable#toProperties()}. @@ -120,4 +139,9 @@ public interface CatalogTable extends CatalogBaseTable { default Map<String, String> toProperties() { return Collections.emptyMap(); } + + /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */ + default Optional<Long> getSnapshot() { + return Optional.empty(); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java index 8cbf850adbc..ae60ee19e35 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java @@ -40,15 +40,27 @@ public class DefaultCatalogTable implements CatalogTable { private final List<String> partitionKeys; private final Map<String, String> options; + private final @Nullable Long snapshot; + protected DefaultCatalogTable( Schema schema, @Nullable String comment, List<String> partitionKeys, Map<String, String> options) { + this(schema, comment, partitionKeys, options, null); + } + + protected DefaultCatalogTable( + Schema schema, + @Nullable String comment, + List<String> partitionKeys, + Map<String, String> options, + @Nullable Long snapshot) { this.schema = checkNotNull(schema, "Schema must not be null."); this.comment = comment; this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must not be null."); this.options = checkNotNull(options, "Options must not be null."); + this.snapshot = snapshot; checkArgument( options.entrySet().stream() @@ -83,12 +95,12 @@ public class DefaultCatalogTable implements CatalogTable { @Override public CatalogBaseTable copy() { - return new DefaultCatalogTable(schema, comment, partitionKeys, options); + return new DefaultCatalogTable(schema, comment, partitionKeys, options, snapshot); } @Override public CatalogTable copy(Map<String, String> options) { - return new DefaultCatalogTable(schema, comment, partitionKeys, options); + return new DefaultCatalogTable(schema, comment, partitionKeys, options, snapshot); } @Override @@ -141,4 +153,9 @@ public class DefaultCatalogTable implements CatalogTable { + options + '}'; } + + @Override + public Optional<Long> getSnapshot() { + return Optional.ofNullable(snapshot); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java index 72be99186d9..a8c46f57d4e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java @@ -120,6 +120,11 @@ public final class ResolvedCatalogTable return origin.getPartitionKeys(); } + @Override + public Optional<Long> getSnapshot() { + return origin.getSnapshot(); + } + @Override public ResolvedCatalogTable copy(Map<String, String> options) { return new ResolvedCatalogTable(origin.copy(options), resolvedSchema);