This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff01ca799 [feature-wip](multi-catalog) support Iceberg time travel in external table (#15418)
3ff01ca799 is described below

commit 3ff01ca799de96f459f79520d92518c3604140b4
Author: slothever <[email protected]>
AuthorDate: Fri Dec 30 00:25:21 2022 +0800

    [feature-wip](multi-catalog) support Iceberg time travel in external table (#15418)
    
    For example:
    SELECT * FROM tbl FOR VERSION AS OF 10963874102873;
    SELECT * FROM tbl FOR TIME AS OF '1986-10-26 01:21:00';
---
 fe/fe-core/src/main/cup/sql_parser.cup             | 33 +++++++++-
 .../org/apache/doris/analysis/BaseTableRef.java    |  1 +
 .../java/org/apache/doris/analysis/TableRef.java   | 37 ++++++++++++
 .../org/apache/doris/analysis/TableSnapshot.java   | 70 ++++++++++++++++++++++
 .../java/org/apache/doris/common/ErrorCode.java    |  5 +-
 .../org/apache/doris/common/util/TimeUtils.java    |  2 +-
 .../planner/external/IcebergScanProvider.java      | 40 +++++++++++++
 fe/fe-core/src/main/jflex/sql_scanner.flex         |  2 +
 .../iceberg/test_external_catalog_icebergv2.out    | 22 ++++++-
 .../iceberg/test_external_catalog_icebergv2.groovy |  8 +++
 10 files changed, 215 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index b6bba6a5d7..8f6aaa63b8 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -457,6 +457,7 @@ terminal String
     KW_NULL,
     KW_NULLS,
     KW_OBSERVER,
+    KW_OF,
     KW_OFFSET,
     KW_ON,
     KW_ONLY,
@@ -593,6 +594,7 @@ terminal String
     KW_VARCHAR,
     KW_VARIABLES,
     KW_VERBOSE,
+    KW_VERSION,
     KW_VIEW,
     KW_WARNINGS,
     KW_WEEK,
@@ -685,6 +687,7 @@ nonterminal ArrayList<String> ident_list;
 nonterminal PartitionNames opt_partition_names, partition_names;
 nonterminal ArrayList<Long> opt_tablet_list, tablet_list;
 nonterminal TableSample opt_table_sample, table_sample;
+nonterminal TableSnapshot opt_table_snapshot, table_snapshot;
 nonterminal ClusterName cluster_name;
 nonterminal ClusterName des_cluster_name;
 nonterminal TableName table_name, opt_table_name;
@@ -5100,9 +5103,31 @@ base_table_ref_list ::=
   ;
 
 base_table_ref ::=
-    table_name:name opt_partition_names:partitionNames 
opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample 
opt_common_hints:commonHints
+    table_name:name opt_table_snapshot:tableSnapshot 
opt_partition_names:partitionNames opt_tablet_list:tabletIds 
opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints
     {:
-        RESULT = new TableRef(name, alias, partitionNames, tabletIds, 
tableSample, commonHints);
+        RESULT = new TableRef(name, alias, partitionNames, tabletIds, 
tableSample, commonHints, tableSnapshot);
+    :}
+    ;
+
+opt_table_snapshot ::=
+    /* empty */
+    {:
+        RESULT = null;
+    :}
+    | table_snapshot:tableSnapshot
+    {:
+        RESULT = tableSnapshot;
+    :}
+    ;
+
+table_snapshot ::=
+    KW_FOR KW_VERSION KW_AS KW_OF INTEGER_LITERAL:version
+    {:
+        RESULT = new TableSnapshot(version);
+    :}
+    | KW_FOR KW_TIME KW_AS KW_OF STRING_LITERAL:time
+    {:
+        RESULT = new TableSnapshot(time);
     :}
     ;
 
@@ -6693,6 +6718,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_NULLS:id
     {: RESULT = id; :}
+    | KW_OF:id
+    {: RESULT = id; :}
     | KW_OFFSET:id
     {: RESULT = id; :}
     | KW_ONLY:id
@@ -6817,6 +6844,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_VERBOSE:id
     {: RESULT = id; :}
+    | KW_VERSION:id
+    {: RESULT = id; :}
     | KW_VIEW:id
     {: RESULT = id; :}
     | KW_WARNINGS:id
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java
index ccfc508cd1..05c7b9c588 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java
@@ -70,6 +70,7 @@ public class BaseTableRef extends TableRef {
         name.analyze(analyzer);
         desc = analyzer.registerTableRef(this);
         isAnalyzed = true;  // true that we have assigned desc
+        analyzeTableSnapshot(analyzer);
         analyzeLateralViewRef(analyzer);
         analyzeJoin(analyzer);
         analyzeSortHints();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 3b5577fd6b..bf43f552ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -22,6 +22,7 @@ package org.apache.doris.analysis;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -29,6 +30,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.ExprRewriter.ClauseType;
 
@@ -48,6 +50,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.regex.Matcher;
 
 /**
  * Superclass of all table references, including references to views, base 
tables
@@ -129,6 +132,8 @@ public class TableRef implements ParseNode, Writable {
     private boolean isPartitionJoin;
     private String sortColumn = null;
 
+    private TableSnapshot tableSnapshot;
+
     // END: Members that need to be reset()
     // ///////////////////////////////////////
 
@@ -153,6 +158,11 @@ public class TableRef implements ParseNode, Writable {
      */
     public TableRef(TableName name, String alias, PartitionNames 
partitionNames, ArrayList<Long> sampleTabletIds,
                     TableSample tableSample, ArrayList<String> commonHints) {
+        this(name, alias, partitionNames, sampleTabletIds, tableSample, 
commonHints, null);
+    }
+
+    public TableRef(TableName name, String alias, PartitionNames 
partitionNames, ArrayList<Long> sampleTabletIds,
+                    TableSample tableSample, ArrayList<String> commonHints, 
TableSnapshot tableSnapshot) {
         this.name = name;
         if (alias != null) {
             if (Env.isStoredTableNamesLowerCase()) {
@@ -167,6 +177,7 @@ public class TableRef implements ParseNode, Writable {
         this.sampleTabletIds = sampleTabletIds;
         this.tableSample = tableSample;
         this.commonHints = commonHints;
+        this.tableSnapshot = tableSnapshot;
         isAnalyzed = false;
     }
 
@@ -186,6 +197,7 @@ public class TableRef implements ParseNode, Writable {
                 (other.sortHints != null) ? 
Lists.newArrayList(other.sortHints) : null;
         onClause = (other.onClause != null) ? other.onClause.clone().reset() : 
null;
         partitionNames = (other.partitionNames != null) ? new 
PartitionNames(other.partitionNames) : null;
+        tableSnapshot = (other.tableSnapshot != null) ? new 
TableSnapshot(other.tableSnapshot) : null;
         tableSample = (other.tableSample != null) ? new 
TableSample(other.tableSample) : null;
         commonHints = other.commonHints;
 
@@ -302,6 +314,10 @@ public class TableRef implements ParseNode, Writable {
         return tableSample;
     }
 
+    public TableSnapshot getTableSnapshot() {
+        return tableSnapshot;
+    }
+
     /**
      * This method should only be called after the TableRef has been analyzed.
      */
@@ -499,6 +515,27 @@ public class TableRef implements ParseNode, Writable {
         }
     }
 
+    protected void analyzeTableSnapshot(Analyzer analyzer) throws 
AnalysisException {
+        if (tableSnapshot == null) {
+            return;
+        }
+        TableIf.TableType tableType = this.getTable().getType();
+        if (tableType != TableIf.TableType.HMS_EXTERNAL_TABLE) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE);
+        }
+        HMSExternalTable extTable = (HMSExternalTable) this.getTable();
+        if (extTable.getDlaType() != HMSExternalTable.DLAType.ICEBERG) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE);
+        }
+        if (tableSnapshot.getType() == TableSnapshot.VersionType.TIME) {
+            String asOfTime = tableSnapshot.getTime();
+            Matcher matcher = TimeUtils.DATETIME_FORMAT_REG.matcher(asOfTime);
+            if (!matcher.matches()) {
+                throw new AnalysisException("Invalid datetime string: " + 
asOfTime);
+            }
+        }
+    }
+
     /**
      * Analyze the join clause.
      * The join clause can only be analyzed after the left table has been 
analyzed
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java
new file mode 100644
index 0000000000..e5d43b6d0b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+/**
+ * Snapshot read for time travel
+ * the version in 2022.12.28 just supports external iceberg table
+ */
+public class TableSnapshot {
+
+    public enum VersionType {
+        TIME, VERSION
+    }
+
+    private final VersionType type;
+    private String time;
+    private long version;
+
+    public TableSnapshot(long version) {
+        this.version = version;
+        this.type = VersionType.VERSION;
+    }
+
+    public TableSnapshot(String time) {
+        this.time = time;
+        this.type = VersionType.TIME;
+    }
+
+    public TableSnapshot(TableSnapshot other) {
+        this.type = other.type;
+        this.time = other.time;
+        this.version = other.version;
+    }
+
+    public VersionType getType() {
+        return type;
+    }
+
+    public String getTime() {
+        return time;
+    }
+
+    public long getVersion() {
+        return version;
+    }
+
+    @Override
+    public String toString() {
+        if (this.type == VersionType.VERSION) {
+            return " FOR VERSION AS OF " + version;
+        } else {
+            return " FOR TIME AS OF '" + time + "'";
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index a6a78d054b..20948c039f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1699,7 +1699,10 @@ public enum ErrorCode {
     ERR_NONSUPPORT_HMS_TABLE(5088, new byte[]{'4', '2', '0', '0', '0'},
             "Nonsupport hive metastore table named '%s' in database '%s' with 
catalog '%s'."),
     ERR_TABLE_NAME_LENGTH_LIMIT(5089, new byte[]{'4', '2', '0', '0', '0'}, 
"Table name length exceeds limit, "
-     + "the length of table name '%s' is %d which is greater than the 
configuration 'table_name_length_limit' (%d).");
+     + "the length of table name '%s' is %d which is greater than the 
configuration 'table_name_length_limit' (%d)."),
+
+    ERR_NONSUPPORT_TIME_TRAVEL_TABLE(5090, new byte[]{'4', '2', '0', '0', 
'0'}, "Only iceberg external"
+     + " table supports time travel in current version");
 
     // This is error code
     private final int code;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
index b5dd620f25..9d73d0b368 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
@@ -68,7 +68,7 @@ public class TimeUtils {
     private static final SimpleDateFormat DATETIME_FORMAT;
     private static final SimpleDateFormat TIME_FORMAT;
 
-    private static final Pattern DATETIME_FORMAT_REG =
+    public static final Pattern DATETIME_FORMAT_REG =
             
Pattern.compile("^((\\d{2}(([02468][048])|([13579][26]))[\\-\\/\\s]?((((0?[13578])|(1[02]))[\\-\\/\\s]?"
                     + 
"((0?[1-9])|([1-2][0-9])|(3[01])))|(((0?[469])|(11))[\\-\\/\\s]?"
                     + 
"((0?[1-9])|([1-2][0-9])|(30)))|(0?2[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])))))|("
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index e51b39144f..850cc14d42 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TableRef;
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.HMSResource;
@@ -35,6 +36,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileFormatType;
@@ -54,14 +56,17 @@ import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.types.Conversions;
 
 import java.nio.ByteBuffer;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -168,6 +173,20 @@ public class IcebergScanProvider extends HiveScanProvider {
 
         org.apache.iceberg.Table table = getIcebergTable();
         TableScan scan = table.newScan();
+        TableSnapshot tableSnapshot = desc.getRef().getTableSnapshot();
+        if (tableSnapshot != null) {
+            TableSnapshot.VersionType type = tableSnapshot.getType();
+            try {
+                if (type == TableSnapshot.VersionType.VERSION) {
+                    scan = scan.useSnapshot(tableSnapshot.getVersion());
+                } else {
+                    long snapshotId = 
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
+                    scan = 
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
+                }
+            } catch (IllegalArgumentException e) {
+                throw new UserException(e);
+            }
+        }
         for (Expression predicate : expressions) {
             scan = scan.filter(predicate);
         }
@@ -199,6 +218,27 @@ public class IcebergScanProvider extends HiveScanProvider {
         return splits;
     }
 
+    public static long getSnapshotIdAsOfTime(List<HistoryEntry> 
historyEntries, long asOfTimestamp) {
+        // find history at or before asOfTimestamp
+        HistoryEntry latestHistory = null;
+        for (HistoryEntry entry : historyEntries) {
+            if (entry.timestampMillis() <= asOfTimestamp) {
+                if (latestHistory == null) {
+                    latestHistory = entry;
+                    continue;
+                }
+                if (entry.timestampMillis() > latestHistory.timestampMillis()) 
{
+                    latestHistory = entry;
+                }
+            }
+        }
+        if (latestHistory == null) {
+            throw new NotFoundException("No version history at or before "
+                + Instant.ofEpochMilli(asOfTimestamp));
+        }
+        return latestHistory.snapshotId();
+    }
+
     private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask 
spitTask) {
         List<IcebergDeleteFileFilter> filters = new ArrayList<>();
         for (DeleteFile delete : spitTask.deletes()) {
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex 
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 730e6affb3..e481039c35 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -317,6 +317,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("null", new Integer(SqlParserSymbols.KW_NULL));
         keywordMap.put("nulls", new Integer(SqlParserSymbols.KW_NULLS));
         keywordMap.put("observer", new Integer(SqlParserSymbols.KW_OBSERVER));
+        keywordMap.put("of", new Integer(SqlParserSymbols.KW_OF));
         keywordMap.put("offset", new Integer(SqlParserSymbols.KW_OFFSET));
         keywordMap.put("on", new Integer(SqlParserSymbols.KW_ON));
         keywordMap.put("only", new Integer(SqlParserSymbols.KW_ONLY));
@@ -455,6 +456,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("varchar", new Integer(SqlParserSymbols.KW_VARCHAR));
         keywordMap.put("variables", new 
Integer(SqlParserSymbols.KW_VARIABLES));
         keywordMap.put("verbose", new Integer(SqlParserSymbols.KW_VERBOSE));
+        keywordMap.put("version", new Integer(SqlParserSymbols.KW_VERSION));
         keywordMap.put("view", new Integer(SqlParserSymbols.KW_VIEW));
         keywordMap.put("warnings", new Integer(SqlParserSymbols.KW_WARNINGS));
         keywordMap.put("week", new Integer(SqlParserSymbols.KW_WEEK));
diff --git 
a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out
 
b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out
index 4d3a4b9176..7956396740 100644
--- 
a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out
+++ 
b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out
@@ -30,4 +30,24 @@
 2736865
 
 -- !q08 --
-1499999990
\ No newline at end of file
+1499999990
+
+-- !q09 --
+1
+2
+3
+
+-- !q10 --
+2
+3
+4
+
+-- !q11 --
+1
+2
+3
+
+-- !q12 --
+2
+3
+4
diff --git 
a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy
 
b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy
index 816e75b85e..a3a0511613 100644
--- 
a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy
+++ 
b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy
@@ -42,7 +42,15 @@ suite("test_external_catalog_icebergv2", "p2") {
             qt_q07 """ select o_orderkey from orders where o_custkey < 3357 
limit 3"""
             qt_q08 """ select count(1) as c from customer;"""
         }
+        // test time travel stmt
+        def q02 = {
+            qt_q09 """ select c_custkey from customer for time as of 
'2022-12-27 10:21:36' limit 3 """
+            qt_q10 """ select c_custkey from customer for time as of 
'2022-12-28 10:21:36' limit 3 """
+            qt_q11 """ select c_custkey from customer for version as of 
906874575350293177  limit 3 """
+            qt_q12 """ select c_custkey from customer for version as of 
6352416983354893547  limit 3 """
+        }
         sql """ use `tpch_1000_icebergv2`; """
         q01()
+        q02()
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to