This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0be92626ea7 [FLINK-32473] Introduce base interfaces for time travel (#22928)
0be92626ea7 is described below

commit 0be92626ea741c681619896ab86696e1d0bde665
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Tue Jul 4 09:30:30 2023 +0800

    [FLINK-32473] Introduce base interfaces for time travel (#22928)
---
 .../org/apache/flink/table/catalog/Catalog.java    | 18 ++++++++++++++++
 .../apache/flink/table/catalog/CatalogTable.java   | 24 ++++++++++++++++++++++
 .../flink/table/catalog/DefaultCatalogTable.java   | 21 +++++++++++++++++--
 .../flink/table/catalog/ResolvedCatalogTable.java  |  5 +++++
 4 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 575790e676d..8e96bdb650a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -244,6 +244,24 @@ public interface Catalog {
      */
     CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
 
+    /**
+     * Returns a {@link CatalogTable} or {@link CatalogView} at a specific time identified by the
+     * given {@link ObjectPath}. The framework will resolve the metadata objects when necessary.
+     *
+     * @param tablePath Path of the table or view
+     * @param timestamp Timestamp of the table snapshot, which is milliseconds since 1970-01-01
+     *     00:00:00 UTC
+     * @return The requested table or view
+     * @throws TableNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "getTable(ObjectPath, long) is not implemented for %s.", this.getClass()));
+    }
+
     /**
      * Check if a table or view exists in this catalog.
      *
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
index 3f17f30072b..a2e28731005 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Represents the unresolved metadata of a table in a {@link Catalog}.
@@ -66,6 +67,24 @@ public interface CatalogTable extends CatalogBaseTable {
         return new DefaultCatalogTable(schema, comment, partitionKeys, options);
     }
 
+    /**
+     * Creates an instance of {@link CatalogTable} with a specific snapshot.
+     *
+     * @param schema unresolved schema
+     * @param comment optional comment
+     * @param partitionKeys list of partition keys or an empty list if not partitioned
+     * @param options options to configure the connector
+     * @param snapshot table snapshot of the table
+     */
+    static CatalogTable of(
+            Schema schema,
+            @Nullable String comment,
+            List<String> partitionKeys,
+            Map<String, String> options,
+            @Nullable Long snapshot) {
+        return new DefaultCatalogTable(schema, comment, partitionKeys, options, snapshot);
+    }
+
     /**
      * Creates an instance of {@link CatalogTable} from a map of string properties that were
      * previously created with {@link ResolvedCatalogTable#toProperties()}.
@@ -120,4 +139,9 @@ public interface CatalogTable extends CatalogBaseTable {
     default Map<String, String> toProperties() {
         return Collections.emptyMap();
     }
+
+    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
+    default Optional<Long> getSnapshot() {
+        return Optional.empty();
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
index 8cbf850adbc..ae60ee19e35 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
@@ -40,15 +40,27 @@ public class DefaultCatalogTable implements CatalogTable {
     private final List<String> partitionKeys;
     private final Map<String, String> options;
 
+    private final @Nullable Long snapshot;
+
     protected DefaultCatalogTable(
             Schema schema,
             @Nullable String comment,
             List<String> partitionKeys,
             Map<String, String> options) {
+        this(schema, comment, partitionKeys, options, null);
+    }
+
+    protected DefaultCatalogTable(
+            Schema schema,
+            @Nullable String comment,
+            List<String> partitionKeys,
+            Map<String, String> options,
+            @Nullable Long snapshot) {
         this.schema = checkNotNull(schema, "Schema must not be null.");
         this.comment = comment;
         this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must not be null.");
         this.options = checkNotNull(options, "Options must not be null.");
+        this.snapshot = snapshot;
 
         checkArgument(
                 options.entrySet().stream()
@@ -83,12 +95,12 @@ public class DefaultCatalogTable implements CatalogTable {
 
     @Override
     public CatalogBaseTable copy() {
-        return new DefaultCatalogTable(schema, comment, partitionKeys, options);
+        return new DefaultCatalogTable(schema, comment, partitionKeys, options, snapshot);
     }
 
     @Override
     public CatalogTable copy(Map<String, String> options) {
-        return new DefaultCatalogTable(schema, comment, partitionKeys, options);
+        return new DefaultCatalogTable(schema, comment, partitionKeys, options, snapshot);
     }
 
     @Override
@@ -141,4 +153,9 @@ public class DefaultCatalogTable implements CatalogTable {
                 + options
                 + '}';
     }
+
+    @Override
+    public Optional<Long> getSnapshot() {
+        return Optional.ofNullable(snapshot);
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java
index 72be99186d9..a8c46f57d4e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java
@@ -120,6 +120,11 @@ public final class ResolvedCatalogTable
         return origin.getPartitionKeys();
     }
 
+    @Override
+    public Optional<Long> getSnapshot() {
+        return origin.getSnapshot();
+    }
+
     @Override
     public ResolvedCatalogTable copy(Map<String, String> options) {
         return new ResolvedCatalogTable(origin.copy(options), resolvedSchema);

Reply via email to