This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 02306b475bb [fix](iceberg)Use the correct schema for query (#50376)
02306b475bb is described below
commit 02306b475bb82df7cc496e9bb7d71fd1ccfad635
Author: wuwenchi <[email protected]>
AuthorDate: Tue Apr 29 13:42:14 2025 +0800
[fix](iceberg)Use the correct schema for query (#50376)
### What problem does this PR solve?
Follow-up to #49956
Problem Summary:
When a snapshot is specified in the query, the schema corresponding to
that snapshot should be used for parsing; otherwise the latest schema
should be used for parsing.
1. For HMS-type catalogs, the executor thread pool must also be
initialized.
2. Set the thread pool size to the number of available processors on
the current machine.
3. When no snapshot is specified, the latest schema is used.
4. When a snapshot is specified, the schema corresponding to that
snapshot is used.
5. When generating a scan node, save its schema information instead of
re-reading it from the cache, so that a cache refresh cannot change the schema used by the scan node.
6. When refreshing a table's schema, all cached schema entries of that
table (not just one) must be invalidated.
---
.../create_preinstalled_scripts/iceberg/run09.sql | 37 ++
.../java/org/apache/doris/catalog/TableIf.java | 11 -
.../apache/doris/datasource/ExternalCatalog.java | 2 +
.../doris/datasource/ExternalSchemaCache.java | 8 +-
.../apache/doris/datasource/FileQueryScanNode.java | 9 +-
.../org/apache/doris/datasource/FileScanNode.java | 2 +-
.../apache/doris/datasource/hive/HMSDlaTable.java | 9 +
.../doris/datasource/hive/HMSExternalCatalog.java | 6 +
.../doris/datasource/hive/HMSExternalTable.java | 25 +-
.../doris/datasource/hive/IcebergDlaTable.java | 147 ++++++++
.../datasource/iceberg/IcebergExternalCatalog.java | 1 -
.../datasource/iceberg/IcebergExternalTable.java | 350 ++-----------------
.../datasource/iceberg/IcebergMetadataCache.java | 17 +-
.../doris/datasource/iceberg/IcebergUtils.java | 375 ++++++++++++++++++++-
.../datasource/iceberg/source/IcebergScanNode.java | 14 +-
.../java/org/apache/doris/planner/ScanNode.java | 10 +-
.../iceberg/IcebergExternalTableTest.java | 38 +--
.../test_iceberg_schema_change_with_timetravel.out | Bin 0 -> 691 bytes
...st_iceberg_schema_change_with_timetravel.groovy | 87 +++++
19 files changed, 746 insertions(+), 402 deletions(-)
diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
new file mode 100644
index 00000000000..c5795883c4d
--- /dev/null
+++
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
@@ -0,0 +1,37 @@
+use demo.test_db;
+
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+
+alter table schema_change_with_time_travel add column c3 int;
+insert into schema_change_with_time_travel values (4,5,6);
+
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+
+alter table schema_change_with_time_travel add column c4 int;
+
+
+create table schema_change_with_time_travel_orc (c1 int) tblproperties
("write.format.default"="orc");
+insert into schema_change_with_time_travel_orc values (1);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (2,3);
+
+alter table schema_change_with_time_travel_orc add column c3 int;
+insert into schema_change_with_time_travel_orc values (4,5,6);
+
+alter table schema_change_with_time_travel_orc drop column c2;
+insert into schema_change_with_time_travel_orc values (7,8);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (9,10,11);
+
+alter table schema_change_with_time_travel_orc add column c4 int;
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 96fd8da54af..74da9a88309 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -140,17 +140,6 @@ public interface TableIf {
Column getColumn(String name);
- default int getBaseColumnIdxByName(String colName) {
- int i = 0;
- for (Column col : getBaseSchema()) {
- if (col.getName().equalsIgnoreCase(colName)) {
- return i;
- }
- ++i;
- }
- return -1;
- }
-
String getMysqlType();
String getEngine();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 52ac5a28bda..d9c620ee0bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -125,6 +125,8 @@ public abstract class ExternalCatalog
CREATE_TIME,
USE_META_CACHE);
+ protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM =
Runtime.getRuntime().availableProcessors();
+
// Unique id of this catalog, will be assigned after catalog is loaded.
@SerializedName(value = "id")
protected long id;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index de3eeff75d9..d0673da56fd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -96,11 +96,9 @@ public class ExternalSchemaCache {
}
public void invalidateTableCache(String dbName, String tblName) {
- SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
- schemaCache.invalidate(key);
- if (LOG.isDebugEnabled()) {
- LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName,
tblName, catalog.getName());
- }
+ schemaCache.asMap().keySet().stream()
+ .filter(key -> key.dbName.equals(dbName) &&
key.tblName.equals(tblName))
+ .forEach(schemaCache::invalidate);
}
public void invalidateDbCache(String dbName) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index c3351600fc6..6e852940cec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -251,7 +251,14 @@ public abstract class FileQueryScanNode extends
FileScanNode {
}
SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId());
String colName = slotDesc.getColumn().getName();
- int idx = tbl.getBaseColumnIdxByName(colName);
+ int idx = -1;
+ List<Column> columns = getColumns();
+ for (int i = 0; i < columns.size(); i++) {
+ if (columns.get(i).getName().equals(colName)) {
+ idx = i;
+ break;
+ }
+ }
if (idx == -1) {
throw new UserException("Column " + colName + " not found in
table " + tbl.getName());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 8d3aeaa6a26..d2c6230ba6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -193,7 +193,7 @@ public abstract class FileScanNode extends ExternalScanNode
{
TExpr tExpr = new TExpr();
tExpr.setNodes(Lists.newArrayList());
- for (Column column : tbl.getBaseSchema()) {
+ for (Column column : getColumns()) {
Expr expr;
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
index 1710646ce3d..d5316aa7fdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
@@ -76,4 +76,13 @@ public abstract class HMSDlaTable implements MTMVBaseTableIf
{
Env.getCurrentEnv().getRefreshManager()
.refreshTable(hmsTable.getCatalog().getName(),
hmsTable.getDbName(), hmsTable.getName(), true);
}
+
+ /**
+ * If the table is supported as related table.
+ * For example, an Iceberg table may become unsupported after partition
revolution.
+ * @return
+ */
+ protected boolean isValidRelatedTable() {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 0ec5153a415..9b506d917f7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -187,6 +187,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
String.valueOf(Config.hive_metastore_client_timeout_second));
}
HiveMetadataOps hiveOps =
ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+ threadPoolWithPreAuth =
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+ ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+ Integer.MAX_VALUE,
+ String.format("hms_iceberg_catalog_%s_executor_pool", name),
+ true,
+ preExecutionAuthenticator);
FileSystemProvider fileSystemProvider = new
FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
this.bindBrokerName(),
this.catalogProperty.getHadoopProperties());
this.fileSystemExecutor =
ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 1d520fc4178..cfd21534db0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -40,6 +40,8 @@ import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.iceberg.IcebergMvccSnapshot;
+import org.apache.doris.datasource.iceberg.IcebergSchemaCacheKey;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
@@ -211,7 +213,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
} else {
if (supportedIcebergTable()) {
dlaType = DLAType.ICEBERG;
- dlaTable = new HiveDlaTable(this);
+ dlaTable = new IcebergDlaTable(this);
} else if (supportedHoodieTable()) {
dlaType = DLAType.HUDI;
dlaTable = new HudiDlaTable(this);
@@ -315,6 +317,8 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
if (getDlaType() == DLAType.HUDI) {
return ((HudiDlaTable)
dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this))
.getSchema();
+ } else if (getDlaType() == DLAType.ICEBERG) {
+ return IcebergUtils.getIcebergSchema(this, getCatalog(),
getDbName(), getName());
}
Optional<SchemaCacheValue> schemaCacheValue =
cache.getSchemaValue(dbName, name);
return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
@@ -619,7 +623,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
makeSureInitialized();
if (dlaType.equals(DLAType.ICEBERG)) {
- return getIcebergSchema();
+ return getIcebergSchema(key);
} else if (dlaType.equals(DLAType.HUDI)) {
return getHudiSchema(key);
} else {
@@ -627,10 +631,8 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
}
- private Optional<SchemaCacheValue> getIcebergSchema() {
- List<Column> columns = IcebergUtils.getSchema(catalog, dbName, name);
- List<Column> partitionColumns = initPartitionColumns(columns);
- return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
+ private Optional<SchemaCacheValue> getIcebergSchema(SchemaCacheKey key) {
+ return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name,
((IcebergSchemaCacheKey) key).getSchemaId());
}
private Optional<SchemaCacheValue> getHudiSchema(SchemaCacheKey key) {
@@ -1085,8 +1087,12 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
if (getDlaType() == DLAType.HUDI) {
return new
HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
+ } else if (getDlaType() == DLAType.ICEBERG) {
+ return new IcebergMvccSnapshot(
+ IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot,
getCatalog(), getDbName(), getName()));
+ } else {
+ return new EmptyMvccSnapshot();
}
- return new EmptyMvccSnapshot();
}
public boolean firstColumnIsString() {
@@ -1107,4 +1113,9 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
getRemoteTable().getSd().getLocation(),
getCatalog().getConfiguration());
}
+
+ public boolean isValidRelatedTable() {
+ makeSureInitialized();
+ return dlaTable.isValidRelatedTable();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
new file mode 100644
index 00000000000..36b871282a9
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class IcebergDlaTable extends HMSDlaTable {
+
+ private boolean isValidRelatedTableCached = false;
+ private boolean isValidRelatedTable = false;
+
+ public IcebergDlaTable(HMSExternalTable table) {
+ super(table);
+ }
+
+ @Override
+ public Map<String, PartitionItem>
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+ return Maps.newHashMap(
+ IcebergUtils.getOrFetchSnapshotCacheValue(
+ snapshot, hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName())
+ .getPartitionInfo().getNameToPartitionItem());
+ }
+
+ @Override
+ public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+ return isValidRelatedTable() ? PartitionType.RANGE :
PartitionType.UNPARTITIONED;
+ }
+
+ @Override
+ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot>
snapshot) {
+ return
getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet());
+ }
+
+ @Override
+ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(
+ snapshot, hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName());
+ IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+ hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName(),
+ snapshotValue.getSnapshot().getSchemaId());
+ return schemaValue.getPartitionColumns();
+ }
+
+ @Override
+ public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context,
+ Optional<MvccSnapshot>
snapshot) throws AnalysisException {
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(
+ snapshot, hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName());
+ long latestSnapshotId =
snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
+ if (latestSnapshotId <= 0) {
+ throw new AnalysisException("can not find partition: " +
partitionName);
+ }
+ return new MTMVSnapshotIdSnapshot(latestSnapshotId);
+ }
+
+ @Override
+ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
+ throws AnalysisException {
+ hmsTable.makeSureInitialized();
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(
+ snapshot, hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName());
+ return new
MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
+ }
+
+ @Override
+ boolean isPartitionColumnAllowNull() {
+ return true;
+ }
+
+ @Override
+ protected boolean isValidRelatedTable() {
+ if (isValidRelatedTableCached) {
+ return isValidRelatedTable;
+ }
+ isValidRelatedTable = false;
+ Set<String> allFields = Sets.newHashSet();
+ Table table = IcebergUtils.getIcebergTable(
+ hmsTable.getCatalog(),
+ hmsTable.getDbName(),
+ hmsTable.getName()
+ );
+ for (PartitionSpec spec : table.specs().values()) {
+ if (spec == null) {
+ isValidRelatedTableCached = true;
+ return false;
+ }
+ List<PartitionField> fields = spec.fields();
+ if (fields.size() != 1) {
+ isValidRelatedTableCached = true;
+ return false;
+ }
+ PartitionField partitionField = spec.fields().get(0);
+ String transformName = partitionField.transform().toString();
+ if (!IcebergUtils.YEAR.equals(transformName)
+ && !IcebergUtils.MONTH.equals(transformName)
+ && !IcebergUtils.DAY.equals(transformName)
+ && !IcebergUtils.HOUR.equals(transformName)) {
+ isValidRelatedTableCached = true;
+ return false;
+ }
+
allFields.add(table.schema().findColumnName(partitionField.sourceId()));
+ }
+ isValidRelatedTableCached = true;
+ isValidRelatedTable = allFields.size() == 1;
+ return isValidRelatedTable;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 9bb7ca8ae08..225e14af420 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -47,7 +47,6 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
protected String icebergCatalogType;
protected Catalog catalog;
- private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;
public IcebergExternalCatalog(long catalogId, String name, String comment)
{
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index 16964ead1d2..2e8cdc63196 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -17,25 +17,18 @@
package org.apache.doris.datasource.iceberg;
-import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
-import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.CacheException;
-import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccTable;
-import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
@@ -50,32 +43,12 @@ import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
import com.google.common.collect.Sets;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionsTable;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructProjection;
-import java.io.IOException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.Month;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -85,15 +58,7 @@ import java.util.stream.Collectors;
public class IcebergExternalTable extends ExternalTable implements
MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
- public static final String YEAR = "year";
- public static final String MONTH = "month";
- public static final String DAY = "day";
- public static final String HOUR = "hour";
- public static final String IDENTITY = "identity";
- public static final int PARTITION_DATA_ID_START = 1000; //
org.apache.iceberg.PartitionSpec
-
private Table table;
- private List<Column> partitionColumns;
private boolean isValidRelatedTableCached = false;
private boolean isValidRelatedTable = false;
@@ -118,29 +83,9 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
this.table = table;
}
- @VisibleForTesting
- public void setPartitionColumns(List<Column> partitionColumns) {
- this.partitionColumns = partitionColumns;
- }
-
@Override
public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
- table = getIcebergTable();
- List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name,
- ((IcebergSchemaCacheKey) key).getSchemaId());
- List<Column> tmpColumns = Lists.newArrayList();
- PartitionSpec spec = table.spec();
- for (PartitionField field : spec.fields()) {
- Types.NestedField col = table.schema().findField(field.sourceId());
- for (Column c : schema) {
- if (c.getName().equalsIgnoreCase(col.name())) {
- tmpColumns.add(c);
- break;
- }
- }
- }
- partitionColumns = tmpColumns;
- return Optional.of(new IcebergSchemaCacheValue(schema,
partitionColumns));
+ return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name,
((IcebergSchemaCacheKey) key).getSchemaId());
}
@Override
@@ -178,23 +123,21 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
return IcebergUtils.getIcebergTable(getCatalog(), getDbName(),
getName());
}
- private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() {
- return
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
- .getSnapshotCache(catalog, dbName, name);
- }
-
@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
}
@Override
public Map<String, PartitionItem>
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
- return
Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem());
+ return Maps.newHashMap(
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(),
getDbName(), getName())
+ .getPartitionInfo().getNameToPartitionItem());
}
@Override
public Map<String, PartitionItem>
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
- return
getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
+ return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot,
getCatalog(), getDbName(), getName())
+ .getPartitionInfo().getNameToPartitionItem();
}
@Override
@@ -209,15 +152,18 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
- IcebergSnapshotCacheValue snapshotValue =
getOrFetchSnapshotCacheValue(snapshot);
- IcebergSchemaCacheValue schemaValue =
getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId());
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshot,
getCatalog(), getDbName(), getName());
+ IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+ catalog, getDbName(), getName(),
snapshotValue.getSnapshot().getSchemaId());
return schemaValue.getPartitionColumns();
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context,
Optional<MvccSnapshot>
snapshot) throws AnalysisException {
- IcebergSnapshotCacheValue snapshotValue =
getOrFetchSnapshotCacheValue(snapshot);
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshot,
getCatalog(), getDbName(), getName());
long latestSnapshotId =
snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
if (latestSnapshotId <= 0) {
throw new AnalysisException("can not find partition: " +
partitionName);
@@ -229,7 +175,8 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
throws AnalysisException {
makeSureInitialized();
- IcebergSnapshotCacheValue snapshotValue =
getOrFetchSnapshotCacheValue(snapshot);
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshot,
getCatalog(), getDbName(), getName());
return new
MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
}
@@ -264,10 +211,10 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
}
PartitionField partitionField = spec.fields().get(0);
String transformName = partitionField.transform().toString();
- if (!YEAR.equals(transformName)
- && !MONTH.equals(transformName)
- && !DAY.equals(transformName)
- && !HOUR.equals(transformName)) {
+ if (!IcebergUtils.YEAR.equals(transformName)
+ && !IcebergUtils.MONTH.equals(transformName)
+ && !IcebergUtils.DAY.equals(transformName)
+ && !IcebergUtils.HOUR.equals(transformName)) {
isValidRelatedTableCached = true;
return false;
}
@@ -280,27 +227,13 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
@Override
public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
- return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
- }
-
- public long getLatestSnapshotId() {
- table = getIcebergTable();
- Snapshot snapshot = table.currentSnapshot();
- return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID :
table.currentSnapshot().snapshotId();
- }
-
- public long getSchemaId(long snapshotId) {
- table = getIcebergTable();
- return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID
- ? IcebergUtils.UNKNOWN_SNAPSHOT_ID
- : table.snapshot(snapshotId).schemaId();
+ return new
IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue(
+ tableSnapshot, getCatalog(), getDbName(), getName()));
}
@Override
public List<Column> getFullSchema() {
- Optional<MvccSnapshot> snapshotFromContext =
MvccUtil.getSnapshotFromContext(this);
- IcebergSnapshotCacheValue cacheValue =
getOrFetchSnapshotCacheValue(snapshotFromContext);
- return
getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema();
+ return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(),
getName());
}
@Override
@@ -308,239 +241,6 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
return true;
}
- public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) {
- ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
- Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
- new IcebergSchemaCacheKey(dbName, name, schemaId));
- if (!schemaCacheValue.isPresent()) {
- throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
- null, catalog.getName(), dbName, name, schemaId);
- }
- return (IcebergSchemaCacheValue) schemaCacheValue.get();
- }
-
- public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws
AnalysisException {
- // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table,
haven't contained any snapshot yet.
- if (!isValidRelatedTable() || snapshotId ==
IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
- return IcebergPartitionInfo.empty();
- }
- List<IcebergPartition> icebergPartitions =
loadIcebergPartition(snapshotId);
- Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
- Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
- table = getIcebergTable();
- partitionColumns =
getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns();
- for (IcebergPartition partition : icebergPartitions) {
- nameToPartition.put(partition.getPartitionName(), partition);
- String transform =
table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
- Range<PartitionKey> partitionRange = getPartitionRange(
- partition.getPartitionValues().get(0), transform,
partitionColumns);
- PartitionItem item = new RangePartitionItem(partitionRange);
- nameToPartitionItem.put(partition.getPartitionName(), item);
- }
- Map<String, Set<String>> partitionNameMap =
mergeOverlapPartitions(nameToPartitionItem);
- return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition,
partitionNameMap);
- }
-
- public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
- PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
- .createMetadataTableInstance(table,
MetadataTableType.PARTITIONS);
- List<IcebergPartition> partitions = Lists.newArrayList();
- try (CloseableIterable<FileScanTask> tasks =
partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
- for (FileScanTask task : tasks) {
- CloseableIterable<StructLike> rows = task.asDataTask().rows();
- for (StructLike row : rows) {
- partitions.add(generateIcebergPartition(row));
- }
- }
- } catch (IOException e) {
- LOG.warn("Failed to get Iceberg table {} partition info.", name,
e);
- }
- return partitions;
- }
-
- public IcebergPartition generateIcebergPartition(StructLike row) {
- // row format :
- // 0. partitionData,
- // 1. spec_id,
- // 2. record_count,
- // 3. file_count,
- // 4. total_data_file_size_in_bytes,
- // 5. position_delete_record_count,
- // 6. position_delete_file_count,
- // 7. equality_delete_record_count,
- // 8. equality_delete_file_count,
- // 9. last_updated_at,
- // 10. last_updated_snapshot_id
- table = getIcebergTable();
- Preconditions.checkState(!table.spec().fields().isEmpty(),
table.name() + " is not a partition table.");
- int specId = row.get(1, Integer.class);
- PartitionSpec partitionSpec = table.specs().get(specId);
- StructProjection partitionData = row.get(0, StructProjection.class);
- StringBuilder sb = new StringBuilder();
- List<String> partitionValues = Lists.newArrayList();
- List<String> transforms = Lists.newArrayList();
- for (int i = 0; i < partitionSpec.fields().size(); ++i) {
- PartitionField partitionField = partitionSpec.fields().get(i);
- Class<?> fieldClass = partitionSpec.javaClasses()[i];
- int fieldId = partitionField.fieldId();
- // Iceberg partition field id starts at PARTITION_DATA_ID_START,
- // So we can get the field index in partitionData using fieldId -
PARTITION_DATA_ID_START
- int index = fieldId - PARTITION_DATA_ID_START;
- Object o = partitionData.get(index, fieldClass);
- String fieldValue = o == null ? null : o.toString();
- String fieldName = partitionField.name();
- sb.append(fieldName);
- sb.append("=");
- sb.append(fieldValue);
- sb.append("/");
- partitionValues.add(fieldValue);
- transforms.add(partitionField.transform().toString());
- }
- if (sb.length() > 0) {
- sb.delete(sb.length() - 1, sb.length());
- }
- String partitionName = sb.toString();
- long recordCount = row.get(2, Long.class);
- long fileCount = row.get(3, Integer.class);
- long fileSizeInBytes = row.get(4, Long.class);
- long lastUpdateTime = row.get(9, Long.class);
- long lastUpdateSnapShotId = row.get(10, Long.class);
- return new IcebergPartition(partitionName, specId, recordCount,
fileSizeInBytes, fileCount,
- lastUpdateTime, lastUpdateSnapShotId, partitionValues,
transforms);
- }
-
- @VisibleForTesting
- public Range<PartitionKey> getPartitionRange(String value, String
transform, List<Column> partitionColumns)
- throws AnalysisException {
- // For NULL value, create a minimum partition for it.
- if (value == null) {
- PartitionKey nullLowKey = PartitionKey.createPartitionKey(
- Lists.newArrayList(new PartitionValue("0000-01-01")),
partitionColumns);
- PartitionKey nullUpKey = nullLowKey.successor();
- return Range.closedOpen(nullLowKey, nullUpKey);
- }
- LocalDateTime epoch =
Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
- LocalDateTime target;
- LocalDateTime lower;
- LocalDateTime upper;
- long longValue = Long.parseLong(value);
- switch (transform) {
- case HOUR:
- target = epoch.plusHours(longValue);
- lower = LocalDateTime.of(target.getYear(), target.getMonth(),
target.getDayOfMonth(),
- target.getHour(), 0, 0);
- upper = lower.plusHours(1);
- break;
- case DAY:
- target = epoch.plusDays(longValue);
- lower = LocalDateTime.of(target.getYear(), target.getMonth(),
target.getDayOfMonth(), 0, 0, 0);
- upper = lower.plusDays(1);
- break;
- case MONTH:
- target = epoch.plusMonths(longValue);
- lower = LocalDateTime.of(target.getYear(), target.getMonth(),
1, 0, 0, 0);
- upper = lower.plusMonths(1);
- break;
- case YEAR:
- target = epoch.plusYears(longValue);
- lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1,
0, 0, 0);
- upper = lower.plusYears(1);
- break;
- default:
- throw new RuntimeException("Unsupported transform " +
transform);
- }
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss");
- Column c = partitionColumns.get(0);
- Preconditions.checkState(c.getDataType().isDateType(), "Only support
date type partition column");
- if (c.getType().isDate() || c.getType().isDateV2()) {
- formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- }
- PartitionValue lowerValue = new
PartitionValue(lower.format(formatter));
- PartitionValue upperValue = new
PartitionValue(upper.format(formatter));
- PartitionKey lowKey =
PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue),
partitionColumns);
- PartitionKey upperKey =
PartitionKey.createPartitionKey(Lists.newArrayList(upperValue),
partitionColumns);
- return Range.closedOpen(lowKey, upperKey);
- }
-
- /**
- * Merge overlapped iceberg partitions into one Doris partition.
- */
- public Map<String, Set<String>> mergeOverlapPartitions(Map<String,
PartitionItem> originPartitions) {
- List<Map.Entry<String, PartitionItem>> entries =
sortPartitionMap(originPartitions);
- Map<String, Set<String>> map = Maps.newHashMap();
- for (int i = 0; i < entries.size() - 1; i++) {
- Range<PartitionKey> firstValue =
entries.get(i).getValue().getItems();
- String firstKey = entries.get(i).getKey();
- Range<PartitionKey> secondValue = entries.get(i +
1).getValue().getItems();
- String secondKey = entries.get(i + 1).getKey();
- // If the first entry enclose the second one, remove the second
entry and keep a record in the return map.
- // So we can track the iceberg partitions those contained by one
Doris partition.
- while (i < entries.size() && firstValue.encloses(secondValue)) {
- originPartitions.remove(secondKey);
- map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
- String finalSecondKey = secondKey;
- map.computeIfPresent(firstKey, (key, value) -> {
- value.add(finalSecondKey);
- return value;
- });
- i++;
- if (i >= entries.size() - 1) {
- break;
- }
- secondValue = entries.get(i + 1).getValue().getItems();
- secondKey = entries.get(i + 1).getKey();
- }
- }
- return map;
- }
-
- /**
- * Sort the given map entries by PartitionItem Range(LOW, HIGH)
- * When comparing two ranges, the one with smaller LOW value is smaller
than the other one.
- * If two ranges have same values of LOW, the one with larger HIGH value
is smaller.
- *
- * For now, we only support year, month, day and hour,
- * so it is impossible to have two partially intersect partitions.
- * One range is either enclosed by another or has no intersection at all
with another.
- *
- *
- * For example, we have these 4 ranges:
- * [10, 20), [30, 40), [0, 30), [10, 15)
- *
- * After sort, they become:
- * [0, 30), [10, 20), [10, 15), [30, 40)
- */
- public List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String,
PartitionItem> originPartitions) {
- List<Map.Entry<String, PartitionItem>> entries = new
ArrayList<>(originPartitions.entrySet());
- entries.sort(new RangeComparator());
- return entries;
- }
-
- public static class RangeComparator implements
Comparator<Map.Entry<String, PartitionItem>> {
- @Override
- public int compare(Map.Entry<String, PartitionItem> p1,
Map.Entry<String, PartitionItem> p2) {
- PartitionItem value1 = p1.getValue();
- PartitionItem value2 = p2.getValue();
- if (value1 instanceof RangePartitionItem && value2 instanceof
RangePartitionItem) {
- Range<PartitionKey> items1 = value1.getItems();
- Range<PartitionKey> items2 = value2.getItems();
- if (!items1.hasLowerBound()) {
- return -1;
- }
- if (!items2.hasLowerBound()) {
- return 1;
- }
- PartitionKey upper1 = items1.upperEndpoint();
- PartitionKey lower1 = items1.lowerEndpoint();
- PartitionKey upper2 = items2.upperEndpoint();
- PartitionKey lower2 = items2.lowerEndpoint();
- int compareLow = lower1.compareTo(lower2);
- return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
- }
- return 0;
- }
- }
-
@VisibleForTesting
public boolean isValidRelatedTableCached() {
return isValidRelatedTableCached;
@@ -554,12 +254,4 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
public void setIsValidRelatedTableCached(boolean isCached) {
this.isValidRelatedTableCached = isCached;
}
-
- public IcebergSnapshotCacheValue
getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
- if (snapshot.isPresent()) {
- return ((IcebergMvccSnapshot)
snapshot.get()).getSnapshotCacheValue();
- } else {
- return getIcebergSnapshotCacheValue();
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index f3daf2d2795..57c5eb20c64 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.thrift.TIcebergMetadataParams;
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -136,12 +137,18 @@ public class IcebergMetadataCache {
@NotNull
private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey
key) throws AnalysisException {
- IcebergExternalTable table = (IcebergExternalTable)
key.catalog.getDbOrAnalysisException(key.dbName)
+ MTMVRelatedTableIf table = (MTMVRelatedTableIf)
key.catalog.getDbOrAnalysisException(key.dbName)
.getTableOrAnalysisException(key.tableName);
- long snapshotId = table.getLatestSnapshotId();
- long schemaId = table.getSchemaId(snapshotId);
- IcebergPartitionInfo icebergPartitionInfo =
table.loadPartitionInfo(snapshotId);
- return new IcebergSnapshotCacheValue(icebergPartitionInfo, new
IcebergSnapshot(snapshotId, schemaId));
+ IcebergSnapshot lastedIcebergSnapshot =
IcebergUtils.getLastedIcebergSnapshot(
+ (ExternalCatalog) key.catalog, key.dbName, key.tableName);
+ IcebergPartitionInfo icebergPartitionInfo;
+ if (!table.isValidRelatedTable()) {
+ icebergPartitionInfo = IcebergPartitionInfo.empty();
+ } else {
+ icebergPartitionInfo = IcebergUtils.loadPartitionInfo(
+ (ExternalCatalog) key.catalog, key.dbName, key.tableName,
lastedIcebergSnapshot.getSnapshotId());
+ }
+ return new IcebergSnapshotCacheValue(icebergPartitionInfo,
lastedIcebergSnapshot);
}
public void invalidateCatalogCache(long catalogId) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index a3cbfec688e..f1d6c4eb033 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -31,37 +31,58 @@ import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.Subquery;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.thrift.TExprOpcode;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.And;
@@ -77,13 +98,25 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.StructProjection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -114,7 +147,15 @@ public class IcebergUtils {
// nickname in spark
public static final String SPARK_SQL_COMPRESSION_CODEC =
"spark.sql.iceberg.compression-codec";
- public static final long UNKNOWN_SNAPSHOT_ID = -1;
+ public static final long UNKNOWN_SNAPSHOT_ID = -1; // means an empty table
+ public static final long NEWEST_SCHEMA_ID = -1;
+
+ public static final String YEAR = "year";
+ public static final String MONTH = "month";
+ public static final String DAY = "day";
+ public static final String HOUR = "hour";
+ public static final String IDENTITY = "identity";
+ public static final int PARTITION_DATA_ID_START = 1000; //
org.apache.iceberg.PartitionSpec
public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
if (expr == null) {
@@ -579,10 +620,6 @@ public class IcebergUtils {
: metadataCache.getIcebergTable(catalog, dbName, tblName);
}
- public static List<Column> getSchema(ExternalCatalog catalog, String
dbName, String name) {
- return getSchema(catalog, dbName, name, UNKNOWN_SNAPSHOT_ID);
- }
-
/**
* Get iceberg schema from catalog and convert them to doris schema
*/
@@ -591,7 +628,7 @@ public class IcebergUtils {
return catalog.getPreExecutionAuthenticator().execute(() -> {
org.apache.iceberg.Table icebergTable =
getIcebergTable(catalog, dbName, name);
Schema schema;
- if (schemaId == UNKNOWN_SNAPSHOT_ID ||
icebergTable.currentSnapshot() == null) {
+ if (schemaId == NEWEST_SCHEMA_ID ||
icebergTable.currentSnapshot() == null) {
schema = icebergTable.schema();
} else {
schema = icebergTable.schemas().get((int) schemaId);
@@ -744,4 +781,330 @@ public class IcebergUtils {
return matchingManifests;
}
+
+ // get snapshot id from query like 'for version/time as of'
+ public static long getQuerySpecSnapshot(Table table, TableSnapshot
queryTableSnapshot) {
+ TableSnapshot.VersionType type = queryTableSnapshot.getType();
+ if (type == TableSnapshot.VersionType.VERSION) {
+ return queryTableSnapshot.getVersion();
+ } else {
+ long timestamp =
TimeUtils.timeStringToLong(queryTableSnapshot.getTime(),
TimeUtils.getTimeZone());
+ if (timestamp < 0) {
+ throw new DateTimeException("can't parse time: " +
queryTableSnapshot.getTime());
+ }
+ return SnapshotUtil.snapshotIdAsOfTime(table, timestamp);
+ }
+ }
+
+ // read schema from external schema cache
+ public static IcebergSchemaCacheValue getSchemaCacheValue(
+ ExternalCatalog catalog, String dbName, String name, long
schemaId) {
+ ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+ Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+ new IcebergSchemaCacheKey(dbName, name, schemaId));
+ if (!schemaCacheValue.isPresent()) {
+ throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+ null, catalog.getName(), dbName, name, schemaId);
+ }
+ return (IcebergSchemaCacheValue) schemaCacheValue.get();
+ }
+
+ public static IcebergSnapshot getLastedIcebergSnapshot(ExternalCatalog
catalog, String dbName, String tbName) {
+ Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+ Snapshot snapshot = table.currentSnapshot();
+ long snapshotId = snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID
: snapshot.snapshotId();
+ return new IcebergSnapshot(snapshotId, table.schema().schemaId());
+ }
+
+ public static IcebergPartitionInfo loadPartitionInfo(
+ ExternalCatalog catalog, String dbName, String tbName, long
snapshotId) throws AnalysisException {
+ // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table,
haven't contained any snapshot yet.
+ if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
+ return IcebergPartitionInfo.empty();
+ }
+ Table table = getIcebergTable(catalog, dbName, tbName);
+ List<IcebergPartition> icebergPartitions = loadIcebergPartition(table,
snapshotId);
+ Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
+ Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+
+ List<Column> partitionColumns = IcebergUtils.getSchemaCacheValue(
+ catalog, dbName, tbName,
table.snapshot(snapshotId).schemaId()).getPartitionColumns();
+ for (IcebergPartition partition : icebergPartitions) {
+ nameToPartition.put(partition.getPartitionName(), partition);
+ String transform =
table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
+ Range<PartitionKey> partitionRange = getPartitionRange(
+ partition.getPartitionValues().get(0), transform,
partitionColumns);
+ PartitionItem item = new RangePartitionItem(partitionRange);
+ nameToPartitionItem.put(partition.getPartitionName(), item);
+ }
+ Map<String, Set<String>> partitionNameMap =
mergeOverlapPartitions(nameToPartitionItem);
+ return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition,
partitionNameMap);
+ }
+
+ private static List<IcebergPartition> loadIcebergPartition(Table table,
long snapshotId) {
+ PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
+ .createMetadataTableInstance(table,
MetadataTableType.PARTITIONS);
+ List<IcebergPartition> partitions = Lists.newArrayList();
+ try (CloseableIterable<FileScanTask> tasks =
partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
+ for (FileScanTask task : tasks) {
+ CloseableIterable<StructLike> rows = task.asDataTask().rows();
+ for (StructLike row : rows) {
+ partitions.add(generateIcebergPartition(table, row));
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to get Iceberg table {} partition info.",
table.name(), e);
+ }
+ return partitions;
+ }
+
+ private static IcebergPartition generateIcebergPartition(Table table,
StructLike row) {
+ // row format :
+ // 0. partitionData,
+ // 1. spec_id,
+ // 2. record_count,
+ // 3. file_count,
+ // 4. total_data_file_size_in_bytes,
+ // 5. position_delete_record_count,
+ // 6. position_delete_file_count,
+ // 7. equality_delete_record_count,
+ // 8. equality_delete_file_count,
+ // 9. last_updated_at,
+ // 10. last_updated_snapshot_id
+ Preconditions.checkState(!table.spec().fields().isEmpty(),
table.name() + " is not a partition table.");
+ int specId = row.get(1, Integer.class);
+ PartitionSpec partitionSpec = table.specs().get(specId);
+ StructProjection partitionData = row.get(0, StructProjection.class);
+ StringBuilder sb = new StringBuilder();
+ List<String> partitionValues = Lists.newArrayList();
+ List<String> transforms = Lists.newArrayList();
+ for (int i = 0; i < partitionSpec.fields().size(); ++i) {
+ PartitionField partitionField = partitionSpec.fields().get(i);
+ Class<?> fieldClass = partitionSpec.javaClasses()[i];
+ int fieldId = partitionField.fieldId();
+ // Iceberg partition field id starts at PARTITION_DATA_ID_START,
+ // So we can get the field index in partitionData using fieldId -
PARTITION_DATA_ID_START
+ int index = fieldId - PARTITION_DATA_ID_START;
+ Object o = partitionData.get(index, fieldClass);
+ String fieldValue = o == null ? null : o.toString();
+ String fieldName = partitionField.name();
+ sb.append(fieldName);
+ sb.append("=");
+ sb.append(fieldValue);
+ sb.append("/");
+ partitionValues.add(fieldValue);
+ transforms.add(partitionField.transform().toString());
+ }
+ if (sb.length() > 0) {
+ sb.delete(sb.length() - 1, sb.length());
+ }
+ String partitionName = sb.toString();
+ long recordCount = row.get(2, Long.class);
+ long fileCount = row.get(3, Integer.class);
+ long fileSizeInBytes = row.get(4, Long.class);
+ long lastUpdateTime = row.get(9, Long.class);
+ long lastUpdateSnapShotId = row.get(10, Long.class);
+ return new IcebergPartition(partitionName, specId, recordCount,
fileSizeInBytes, fileCount,
+ lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
+ }
+
+ @VisibleForTesting
+ public static Range<PartitionKey> getPartitionRange(String value, String
transform, List<Column> partitionColumns)
+ throws AnalysisException {
+ // For NULL value, create a minimum partition for it.
+ if (value == null) {
+ PartitionKey nullLowKey = PartitionKey.createPartitionKey(
+ Lists.newArrayList(new PartitionValue("0000-01-01")),
partitionColumns);
+ PartitionKey nullUpKey = nullLowKey.successor();
+ return Range.closedOpen(nullLowKey, nullUpKey);
+ }
+ LocalDateTime epoch =
Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
+ LocalDateTime target;
+ LocalDateTime lower;
+ LocalDateTime upper;
+ long longValue = Long.parseLong(value);
+ switch (transform) {
+ case HOUR:
+ target = epoch.plusHours(longValue);
+ lower = LocalDateTime.of(target.getYear(), target.getMonth(),
target.getDayOfMonth(),
+ target.getHour(), 0, 0);
+ upper = lower.plusHours(1);
+ break;
+ case DAY:
+ target = epoch.plusDays(longValue);
+ lower = LocalDateTime.of(target.getYear(), target.getMonth(),
target.getDayOfMonth(), 0, 0, 0);
+ upper = lower.plusDays(1);
+ break;
+ case MONTH:
+ target = epoch.plusMonths(longValue);
+ lower = LocalDateTime.of(target.getYear(), target.getMonth(),
1, 0, 0, 0);
+ upper = lower.plusMonths(1);
+ break;
+ case YEAR:
+ target = epoch.plusYears(longValue);
+ lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1,
0, 0, 0);
+ upper = lower.plusYears(1);
+ break;
+ default:
+ throw new RuntimeException("Unsupported transform " +
transform);
+ }
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss");
+ Column c = partitionColumns.get(0);
+ Preconditions.checkState(c.getDataType().isDateType(), "Only support
date type partition column");
+ if (c.getType().isDate() || c.getType().isDateV2()) {
+ formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ }
+ PartitionValue lowerValue = new
PartitionValue(lower.format(formatter));
+ PartitionValue upperValue = new
PartitionValue(upper.format(formatter));
+ PartitionKey lowKey =
PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue),
partitionColumns);
+ PartitionKey upperKey =
PartitionKey.createPartitionKey(Lists.newArrayList(upperValue),
partitionColumns);
+ return Range.closedOpen(lowKey, upperKey);
+ }
+
+ /**
+ * Merge overlapped iceberg partitions into one Doris partition.
+ */
+ @VisibleForTesting
+ public static Map<String, Set<String>> mergeOverlapPartitions(Map<String,
PartitionItem> originPartitions) {
+ List<Map.Entry<String, PartitionItem>> entries =
sortPartitionMap(originPartitions);
+ Map<String, Set<String>> map = Maps.newHashMap();
+ for (int i = 0; i < entries.size() - 1; i++) {
+ Range<PartitionKey> firstValue =
entries.get(i).getValue().getItems();
+ String firstKey = entries.get(i).getKey();
+ Range<PartitionKey> secondValue = entries.get(i +
1).getValue().getItems();
+ String secondKey = entries.get(i + 1).getKey();
+ // If the first entry enclose the second one, remove the second
entry and keep a record in the return map.
+ // So we can track the iceberg partitions those contained by one
Doris partition.
+ while (i < entries.size() && firstValue.encloses(secondValue)) {
+ originPartitions.remove(secondKey);
+ map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
+ String finalSecondKey = secondKey;
+ map.computeIfPresent(firstKey, (key, value) -> {
+ value.add(finalSecondKey);
+ return value;
+ });
+ i++;
+ if (i >= entries.size() - 1) {
+ break;
+ }
+ secondValue = entries.get(i + 1).getValue().getItems();
+ secondKey = entries.get(i + 1).getKey();
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Sort the given map entries by PartitionItem Range(LOW, HIGH)
+ * When comparing two ranges, the one with smaller LOW value is smaller
than the other one.
+ * If two ranges have same values of LOW, the one with larger HIGH value
is smaller.
+ *
+ * For now, we only support year, month, day and hour,
+ * so it is impossible to have two partially intersect partitions.
+ * One range is either enclosed by another or has no intersection at all
with another.
+ *
+ *
+ * For example, we have these 4 ranges:
+ * [10, 20), [30, 40), [0, 30), [10, 15)
+ *
+ * After sort, they become:
+ * [0, 30), [10, 20), [10, 15), [30, 40)
+ */
+ @VisibleForTesting
+ public static List<Map.Entry<String, PartitionItem>>
sortPartitionMap(Map<String, PartitionItem> originPartitions) {
+ List<Map.Entry<String, PartitionItem>> entries = new
ArrayList<>(originPartitions.entrySet());
+ entries.sort(new RangeComparator());
+ return entries;
+ }
+
+ public static class RangeComparator implements
Comparator<Map.Entry<String, PartitionItem>> {
+ @Override
+ public int compare(Map.Entry<String, PartitionItem> p1,
Map.Entry<String, PartitionItem> p2) {
+ PartitionItem value1 = p1.getValue();
+ PartitionItem value2 = p2.getValue();
+ if (value1 instanceof RangePartitionItem && value2 instanceof
RangePartitionItem) {
+ Range<PartitionKey> items1 = value1.getItems();
+ Range<PartitionKey> items2 = value2.getItems();
+ if (!items1.hasLowerBound()) {
+ return -1;
+ }
+ if (!items2.hasLowerBound()) {
+ return 1;
+ }
+ PartitionKey upper1 = items1.upperEndpoint();
+ PartitionKey lower1 = items1.lowerEndpoint();
+ PartitionKey upper2 = items2.upperEndpoint();
+ PartitionKey lower2 = items2.lowerEndpoint();
+ int compareLow = lower1.compareTo(lower2);
+ return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
+ }
+ return 0;
+ }
+ }
+
+ public static IcebergSnapshotCacheValue getIcebergSnapshotCacheValue(
+ Optional<TableSnapshot> tableSnapshot,
+ ExternalCatalog catalog,
+ String dbName,
+ String tbName) {
+ IcebergSnapshotCacheValue snapshotCache =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+ .getSnapshotCache(catalog, dbName, tbName);
+ if (tableSnapshot.isPresent()) {
+ // If a snapshot is specified,
+ // use the specified snapshot and the corresponding schema(not the
latest schema).
+ Table icebergTable = getIcebergTable(catalog, dbName, tbName);
+ TableSnapshot snapshot = tableSnapshot.get();
+ long querySpecSnapshot = getQuerySpecSnapshot(icebergTable,
snapshot);
+ return new IcebergSnapshotCacheValue(
+ IcebergPartitionInfo.empty(),
+ new IcebergSnapshot(querySpecSnapshot,
icebergTable.snapshot(querySpecSnapshot).schemaId()));
+ } else {
+ // Otherwise, use the latest snapshot and the latest schema.
+ return snapshotCache;
+ }
+ }
+
+ // load table schema from iceberg API to external schema cache.
+ public static Optional<SchemaCacheValue> loadSchemaCacheValue(
+ ExternalCatalog catalog, String dbName, String tbName, long
schemaId) {
+ Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+ List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName,
schemaId);
+ List<Column> tmpColumns = Lists.newArrayList();
+ PartitionSpec spec = table.spec();
+ for (PartitionField field : spec.fields()) {
+ Types.NestedField col = table.schema().findField(field.sourceId());
+ for (Column c : schema) {
+ if (c.getName().equalsIgnoreCase(col.name())) {
+ tmpColumns.add(c);
+ break;
+ }
+ }
+ }
+ return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
+ }
+
+ public static List<Column> getIcebergSchema(
+ TableIf tableIf,
+ ExternalCatalog catalog,
+ String dbName,
+ String tbName) {
+ Optional<MvccSnapshot> snapshotFromContext =
MvccUtil.getSnapshotFromContext(tableIf);
+ IcebergSnapshotCacheValue cacheValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshotFromContext,
catalog, dbName, tbName);
+ return IcebergUtils.getSchemaCacheValue(
+ catalog, dbName, tbName,
cacheValue.getSnapshot().getSchemaId())
+ .getSchema();
+ }
+
+ public static IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(
+ Optional<MvccSnapshot> snapshot,
+ ExternalCatalog catalog,
+ String dbName,
+ String tbName) {
+ if (snapshot.isPresent()) {
+ return ((IcebergMvccSnapshot)
snapshot.get()).getSnapshotCacheValue();
+ } else {
+ return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(),
catalog, dbName, tbName);
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 30a2380a272..a634aa6c91f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -27,7 +27,6 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
@@ -66,13 +65,11 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
-import java.time.DateTimeException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -392,16 +389,7 @@ public class IcebergScanNode extends FileQueryScanNode {
public Long getSpecifiedSnapshot() {
TableSnapshot tableSnapshot = getQueryTableSnapshot();
if (tableSnapshot != null) {
- TableSnapshot.VersionType type = tableSnapshot.getType();
- if (type == TableSnapshot.VersionType.VERSION) {
- return tableSnapshot.getVersion();
- } else {
- long timestamp =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
- if (timestamp < 0) {
- throw new DateTimeException("can't parse time: " +
tableSnapshot.getTime());
- }
- return SnapshotUtil.snapshotIdAsOfTime(icebergTable,
timestamp);
- }
+ return IcebergUtils.getQuerySpecSnapshot(icebergTable,
tableSnapshot);
}
return null;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 37488a001f6..1dde5f633ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -110,6 +110,7 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
protected TableSnapshot tableSnapshot;
+ protected List<Column> columns;
// Save the id of backends which this scan node will be executed on.
// This is also important for local shuffle logic.
@@ -141,6 +142,13 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
return result;
}
+    /**
+     * Returns the columns this scan node works with.
+     *
+     * <p>If {@code columns} is still unset (the field is protected, so subclasses may assign
+     * it), it is lazily filled from the table's base schema and then reused. When there is no
+     * table, the current value (possibly null) is returned unchanged.
+     */
+    protected List<Column> getColumns() {
+        if (columns != null || desc.getTable() == null) {
+            return columns;
+        }
+        columns = desc.getTable().getBaseSchema();
+        return columns;
+    }
+
/** Returns the tuple descriptor held in {@code desc}. */
public TupleDescriptor getTupleDesc() {
    return this.desc;
}
@@ -233,7 +241,7 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
// for load scan node, table is null
// partitionsInfo maybe null for other scan node, eg:
ExternalScanNode...
if (desc.getTable() != null) {
- computeColumnsFilter(desc.getTable().getBaseSchema(),
partitionsInfo);
+ computeColumnsFilter(getColumns(), partitionsInfo);
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 49144d874a4..f7305e6eb7c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -171,42 +171,39 @@ public class IcebergExternalTableTest {
@Test
public void testGetPartitionRange() throws AnalysisException {
- IcebergExternalDatabase database = new IcebergExternalDatabase(null,
1L, "2", "2");
- IcebergExternalTable table = new IcebergExternalTable(1, "1", "1",
null, database);
Column c = new Column("ts", PrimitiveType.DATETIMEV2);
List<Column> partitionColumns = Lists.newArrayList(c);
- table.setPartitionColumns(partitionColumns);
// Test null partition value
- Range<PartitionKey> nullRange = table.getPartitionRange(null, "hour",
partitionColumns);
+ Range<PartitionKey> nullRange = IcebergUtils.getPartitionRange(null,
"hour", partitionColumns);
Assertions.assertEquals("0000-01-01 00:00:00",
nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0));
Assertions.assertEquals("0000-01-01 00:00:01",
nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0));
// Test hour transform.
- Range<PartitionKey> hour = table.getPartitionRange("100", "hour",
partitionColumns);
+ Range<PartitionKey> hour = IcebergUtils.getPartitionRange("100",
"hour", partitionColumns);
PartitionKey lowKey = hour.lowerEndpoint();
PartitionKey upKey = hour.upperEndpoint();
Assertions.assertEquals("1970-01-05 04:00:00",
lowKey.getPartitionValuesAsStringList().get(0));
Assertions.assertEquals("1970-01-05 05:00:00",
upKey.getPartitionValuesAsStringList().get(0));
// Test day transform.
- Range<PartitionKey> day = table.getPartitionRange("100", "day",
partitionColumns);
+ Range<PartitionKey> day = IcebergUtils.getPartitionRange("100", "day",
partitionColumns);
lowKey = day.lowerEndpoint();
upKey = day.upperEndpoint();
Assertions.assertEquals("1970-04-11 00:00:00",
lowKey.getPartitionValuesAsStringList().get(0));
Assertions.assertEquals("1970-04-12 00:00:00",
upKey.getPartitionValuesAsStringList().get(0));
// Test month transform.
- Range<PartitionKey> month = table.getPartitionRange("100", "month",
partitionColumns);
+ Range<PartitionKey> month = IcebergUtils.getPartitionRange("100",
"month", partitionColumns);
lowKey = month.lowerEndpoint();
upKey = month.upperEndpoint();
Assertions.assertEquals("1978-05-01 00:00:00",
lowKey.getPartitionValuesAsStringList().get(0));
Assertions.assertEquals("1978-06-01 00:00:00",
upKey.getPartitionValuesAsStringList().get(0));
// Test year transform.
- Range<PartitionKey> year = table.getPartitionRange("100", "year",
partitionColumns);
+ Range<PartitionKey> year = IcebergUtils.getPartitionRange("100",
"year", partitionColumns);
lowKey = year.lowerEndpoint();
upKey = year.upperEndpoint();
Assertions.assertEquals("2070-01-01 00:00:00",
lowKey.getPartitionValuesAsStringList().get(0));
@@ -214,26 +211,23 @@ public class IcebergExternalTableTest {
// Test unsupported transform
Exception exception = Assertions.assertThrows(RuntimeException.class,
() -> {
- table.getPartitionRange("100", "bucket", partitionColumns);
+ IcebergUtils.getPartitionRange("100", "bucket", partitionColumns);
});
Assertions.assertEquals("Unsupported transform bucket",
exception.getMessage());
}
@Test
public void testSortRange() throws AnalysisException {
- IcebergExternalDatabase database = new IcebergExternalDatabase(null,
1L, "2", "2");
- IcebergExternalTable table = new IcebergExternalTable(1, "1", "1",
null, database);
Column c = new Column("c", PrimitiveType.DATETIMEV2);
ArrayList<Column> columns = Lists.newArrayList(c);
- table.setPartitionColumns(Lists.newArrayList(c));
- PartitionItem nullRange = new
RangePartitionItem(table.getPartitionRange(null, "hour", columns));
- PartitionItem year1970 = new
RangePartitionItem(table.getPartitionRange("0", "year", columns));
- PartitionItem year1971 = new
RangePartitionItem(table.getPartitionRange("1", "year", columns));
- PartitionItem month197002 = new
RangePartitionItem(table.getPartitionRange("1", "month", columns));
- PartitionItem month197103 = new
RangePartitionItem(table.getPartitionRange("14", "month", columns));
- PartitionItem month197204 = new
RangePartitionItem(table.getPartitionRange("27", "month", columns));
- PartitionItem day19700202 = new
RangePartitionItem(table.getPartitionRange("32", "day", columns));
- PartitionItem day19730101 = new
RangePartitionItem(table.getPartitionRange("1096", "day", columns));
+ PartitionItem nullRange = new
RangePartitionItem(IcebergUtils.getPartitionRange(null, "hour", columns));
+ PartitionItem year1970 = new
RangePartitionItem(IcebergUtils.getPartitionRange("0", "year", columns));
+ PartitionItem year1971 = new
RangePartitionItem(IcebergUtils.getPartitionRange("1", "year", columns));
+ PartitionItem month197002 = new
RangePartitionItem(IcebergUtils.getPartitionRange("1", "month", columns));
+ PartitionItem month197103 = new
RangePartitionItem(IcebergUtils.getPartitionRange("14", "month", columns));
+ PartitionItem month197204 = new
RangePartitionItem(IcebergUtils.getPartitionRange("27", "month", columns));
+ PartitionItem day19700202 = new
RangePartitionItem(IcebergUtils.getPartitionRange("32", "day", columns));
+ PartitionItem day19730101 = new
RangePartitionItem(IcebergUtils.getPartitionRange("1096", "day", columns));
Map<String, PartitionItem> map = Maps.newHashMap();
map.put("nullRange", nullRange);
map.put("year1970", year1970);
@@ -243,7 +237,7 @@ public class IcebergExternalTableTest {
map.put("month197204", month197204);
map.put("day19700202", day19700202);
map.put("day19730101", day19730101);
- List<Map.Entry<String, PartitionItem>> entries =
table.sortPartitionMap(map);
+ List<Map.Entry<String, PartitionItem>> entries =
IcebergUtils.sortPartitionMap(map);
Assertions.assertEquals(8, entries.size());
Assertions.assertEquals("nullRange", entries.get(0).getKey());
Assertions.assertEquals("year1970", entries.get(1).getKey());
@@ -254,7 +248,7 @@ public class IcebergExternalTableTest {
Assertions.assertEquals("month197204", entries.get(6).getKey());
Assertions.assertEquals("day19730101", entries.get(7).getKey());
- Map<String, Set<String>> stringSetMap =
table.mergeOverlapPartitions(map);
+ Map<String, Set<String>> stringSetMap =
IcebergUtils.mergeOverlapPartitions(map);
Assertions.assertEquals(2, stringSetMap.size());
Assertions.assertTrue(stringSetMap.containsKey("year1970"));
Assertions.assertTrue(stringSetMap.containsKey("year1971"));
diff --git
a/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
new file mode 100644
index 00000000000..27e9359b713
Binary files /dev/null and
b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
differ
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
new file mode 100644
index 00000000000..a376dfc210d
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+
+suite("iceberg_schema_change_with_timetravel", "p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_schema_change_with_timetravel"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """ use test_db;"""
+
+    // Describes the table, reads its latest state, then reads each of the first five snapshots
+    // (oldest first) by version so that every snapshot is parsed with its own schema.
+    def executeTimeTravelingQueries = { String tableName ->
+        def snapshots = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db.${tableName}", "query_type" = "snapshots") order by committed_at; """
+        // The expected output covers five snapshots; fail with a clear message rather than an
+        // IndexOutOfBoundsException if the fixture produced fewer.
+        assert snapshots.size() >= 5 : "expected at least 5 snapshots for ${tableName}, got ${snapshots.size()}"
+        def snapshotIds = [
+            s0: snapshots.get(0)[0],
+            s1: snapshots.get(1)[0],
+            s2: snapshots.get(2)[0],
+            s3: snapshots.get(3)[0],
+            s4: snapshots.get(4)[0]
+        ]
+
+        qt_q0 """ desc ${tableName} """
+        qt_q1 """ select * from ${tableName} order by c1 """
+        qt_q2 """ select * from ${tableName} for version as of ${snapshotIds.s0} order by c1 """
+        qt_q3 """ select * from ${tableName} for version as of ${snapshotIds.s1} order by c1 """
+        qt_q4 """ select * from ${tableName} for version as of ${snapshotIds.s2} order by c1 """
+        qt_q5 """ select * from ${tableName} for version as of ${snapshotIds.s3} order by c1 """
+        qt_q6 """ select * from ${tableName} for version as of ${snapshotIds.s4} order by c1 """
+    }
+
+    executeTimeTravelingQueries("schema_change_with_time_travel")
+    executeTimeTravelingQueries("schema_change_with_time_travel_orc")
+
+}
+
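+/*
+Fixture for schema_change_with_time_travel: each INSERT below commits one snapshot (s0..s4 in
+commit order), while each ALTER TABLE only changes the schema.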
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+alter table schema_change_with_time_travel add column c3 int;
+insert into schema_change_with_time_travel values (4,5,6);
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+alter table schema_change_with_time_travel add column c4 int;
+*/
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]