This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new faff952bf76 branch-3.1: [fix](iceberg)Use the correct schema for query
(#50376) (#52219)
faff952bf76 is described below
commit faff952bf761b8dcfe793eab4c4f19f315b69e15
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jul 1 11:38:32 2025 +0800
branch-3.1: [fix](iceberg)Use the correct schema for query (#50376)
(#52219)
bp #50376
Co-authored-by: wuwenchi <[email protected]>
---
.../create_preinstalled_scripts/iceberg/run09.sql | 37 ++
.../java/org/apache/doris/catalog/TableIf.java | 11 -
.../apache/doris/datasource/ExternalCatalog.java | 2 +
.../doris/datasource/ExternalSchemaCache.java | 8 +-
.../apache/doris/datasource/FileQueryScanNode.java | 9 +-
.../org/apache/doris/datasource/FileScanNode.java | 2 +-
.../apache/doris/datasource/hive/HMSDlaTable.java | 9 +
.../doris/datasource/hive/HMSExternalCatalog.java | 6 +
.../doris/datasource/hive/HMSExternalTable.java | 16 +-
.../doris/datasource/hive/IcebergDlaTable.java | 147 ++++++++
.../datasource/iceberg/IcebergExternalCatalog.java | 1 -
.../datasource/iceberg/IcebergExternalTable.java | 352 ++-----------------
.../datasource/iceberg/IcebergMetadataCache.java | 17 +-
.../datasource/iceberg/IcebergPartitionInfo.java | 6 +
.../doris/datasource/iceberg/IcebergUtils.java | 391 +++++++++++++++++++--
.../datasource/iceberg/source/IcebergScanNode.java | 14 +-
.../java/org/apache/doris/planner/ScanNode.java | 10 +-
.../iceberg/IcebergExternalTableTest.java | 114 ++++++
.../test_iceberg_schema_change_with_timetravel.out | Bin 0 -> 691 bytes
...st_iceberg_schema_change_with_timetravel.groovy | 87 +++++
20 files changed, 844 insertions(+), 395 deletions(-)
diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
new file mode 100644
index 00000000000..c5795883c4d
--- /dev/null
+++
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
@@ -0,0 +1,37 @@
+-- Fixture for the Iceberg schema-change + time-travel regression test
+-- (test_iceberg_schema_change_with_timetravel). Every INSERT below commits a new
+-- Iceberg snapshot and every ALTER produces a new schema version, so the ORDER of
+-- statements defines the snapshot/schema history the test reads back. Do not reorder.
+use demo.test_db;
+
+-- Table written with the default data file format (presumably Parquet, Iceberg's default).
+create table schema_change_with_time_travel (c1 int);
+-- rows written under schema (c1)
+insert into schema_change_with_time_travel values (1);
+
+alter table schema_change_with_time_travel add column c2 int;
+-- rows written under schema (c1, c2)
+insert into schema_change_with_time_travel values (2,3);
+
+alter table schema_change_with_time_travel add column c3 int;
+-- rows written under schema (c1, c2, c3)
+insert into schema_change_with_time_travel values (4,5,6);
+
+alter table schema_change_with_time_travel drop column c2;
+-- rows written under schema (c1, c3): 7 -> c1, 8 -> c3
+insert into schema_change_with_time_travel values (7,8);
+
+-- Re-adding c2 appends it after c3; Iceberg treats it as a new column, not the dropped one.
+alter table schema_change_with_time_travel add column c2 int;
+-- rows written under schema (c1, c3, c2): 9 -> c1, 10 -> c3, 11 -> c2
+insert into schema_change_with_time_travel values (9,10,11);
+
+-- No insert follows: the current schema now has c4 while the latest snapshot was written
+-- under (c1, c3, c2). Presumably this checks that reads use the snapshot's schema rather
+-- than the table's current schema -- TODO confirm against the .groovy test.
+alter table schema_change_with_time_travel add column c4 int;
+
+
+-- Same history as above, but data files are forced to ORC via write.format.default.
+create table schema_change_with_time_travel_orc (c1 int) tblproperties
("write.format.default"="orc");
+insert into schema_change_with_time_travel_orc values (1);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (2,3);
+
+alter table schema_change_with_time_travel_orc add column c3 int;
+insert into schema_change_with_time_travel_orc values (4,5,6);
+
+alter table schema_change_with_time_travel_orc drop column c2;
+insert into schema_change_with_time_travel_orc values (7,8);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (9,10,11);
+
+alter table schema_change_with_time_travel_orc add column c4 int;
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 78fbc77698d..7d95d959ba6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -143,17 +143,6 @@ public interface TableIf {
Column getColumn(String name);
- default int getBaseColumnIdxByName(String colName) {
- int i = 0;
- for (Column col : getBaseSchema()) {
- if (col.getName().equalsIgnoreCase(colName)) {
- return i;
- }
- ++i;
- }
- return -1;
- }
-
String getMysqlType();
String getEngine();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 25b74bb6eff..5c2857cc0b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -133,6 +133,8 @@ public abstract class ExternalCatalog
CREATE_TIME,
USE_META_CACHE);
+ protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM =
Runtime.getRuntime().availableProcessors();
+
// Unique id of this catalog, will be assigned after catalog is loaded.
@SerializedName(value = "id")
protected long id;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index 0fb66c4e3f9..da9a25d71be 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -100,11 +100,9 @@ public class ExternalSchemaCache {
}
public void invalidateTableCache(String dbName, String tblName) {
- SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
- schemaCache.invalidate(key);
- if (LOG.isDebugEnabled()) {
- LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName,
tblName, catalog.getName());
- }
+ schemaCache.asMap().keySet().stream()
+ .filter(key -> key.dbName.equals(dbName) &&
key.tblName.equals(tblName))
+ .forEach(schemaCache::invalidate);
}
public void invalidateDbCache(String dbName) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 56fecb24afc..fc1058105f1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -252,7 +252,14 @@ public abstract class FileQueryScanNode extends
FileScanNode {
}
SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId());
String colName = slotDesc.getColumn().getName();
- int idx = tbl.getBaseColumnIdxByName(colName);
+ int idx = -1;
+ List<Column> columns = getColumns();
+ for (int i = 0; i < columns.size(); i++) {
+ if (columns.get(i).getName().equals(colName)) {
+ idx = i;
+ break;
+ }
+ }
if (idx == -1) {
throw new UserException("Column " + colName + " not found in
table " + tbl.getName());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 8d3aeaa6a26..d2c6230ba6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -193,7 +193,7 @@ public abstract class FileScanNode extends ExternalScanNode
{
TExpr tExpr = new TExpr();
tExpr.setNodes(Lists.newArrayList());
- for (Column column : tbl.getBaseSchema()) {
+ for (Column column : getColumns()) {
Expr expr;
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
index 7894279b295..034ce1ae443 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
@@ -73,4 +73,13 @@ public abstract class HMSDlaTable implements MTMVBaseTableIf
{
@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
}
+
+ /**
+ * Whether this table can be used as the partition-related base table of an MTMV.
+ * Subclasses override this when support depends on table metadata; for example,
+ * an Iceberg table may become unsupported after partition evolution.
+ * @return true in this default implementation
+ */
+ protected boolean isValidRelatedTable() {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index f232c8a1a76..90636495335 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -180,6 +180,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
String.valueOf(Config.hive_metastore_client_timeout_second));
}
HiveMetadataOps hiveOps =
ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+ threadPoolWithPreAuth =
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+ ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+ Integer.MAX_VALUE,
+ String.format("hms_iceberg_catalog_%s_executor_pool", name),
+ true,
+ preExecutionAuthenticator);
FileSystemProvider fileSystemProvider = new
FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
this.bindBrokerName(),
this.catalogProperty.getHadoopProperties());
this.fileSystemExecutor =
ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 2a29937653d..fde8ddf3b4d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.iceberg.IcebergMvccSnapshot;
import org.apache.doris.datasource.iceberg.IcebergSchemaCacheKey;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot;
@@ -209,7 +210,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
} else {
if (supportedIcebergTable()) {
dlaType = DLAType.ICEBERG;
- dlaTable = new HiveDlaTable(this);
+ dlaTable = new IcebergDlaTable(this);
} else if (supportedHoodieTable()) {
dlaType = DLAType.HUDI;
dlaTable = new HudiDlaTable(this);
@@ -313,6 +314,8 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
if (getDlaType() == DLAType.HUDI) {
return ((HudiDlaTable)
dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this))
.getSchema();
+ } else if (getDlaType() == DLAType.ICEBERG) {
+ return IcebergUtils.getIcebergSchema(this, getCatalog(),
getDbName(), getName());
}
Optional<SchemaCacheValue> schemaCacheValue =
cache.getSchemaValue(dbName, name);
return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
@@ -1047,8 +1050,12 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
if (getDlaType() == DLAType.HUDI) {
return new
HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
+ } else if (getDlaType() == DLAType.ICEBERG) {
+ return new IcebergMvccSnapshot(
+ IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot,
getCatalog(), getDbName(), getName()));
+ } else {
+ return new EmptyMvccSnapshot();
}
- return new EmptyMvccSnapshot();
}
public boolean firstColumnIsString() {
@@ -1084,4 +1091,9 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return Lists.newArrayList();
}
}
+
+ public boolean isValidRelatedTable() {
+ makeSureInitialized();
+ return dlaTable.isValidRelatedTable();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
new file mode 100644
index 00000000000..36b871282a9
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class IcebergDlaTable extends HMSDlaTable {
+
+ private boolean isValidRelatedTableCached = false;
+ private boolean isValidRelatedTable = false;
+
+ public IcebergDlaTable(HMSExternalTable table) {
+ super(table);
+ }
+
+ @Override
+ public Map<String, PartitionItem>
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+ return Maps.newHashMap(
+ IcebergUtils.getOrFetchSnapshotCacheValue(
+ snapshot, hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName())
+ .getPartitionInfo().getNameToPartitionItem());
+ }
+
+ @Override
+ public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+ return isValidRelatedTable() ? PartitionType.RANGE :
PartitionType.UNPARTITIONED;
+ }
+
+ @Override
+ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot>
snapshot) {
+ return
getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet());
+ }
+
+ @Override
+ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(
+ snapshot, hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName());
+ IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+ hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName(),
+ snapshotValue.getSnapshot().getSchemaId());
+ return schemaValue.getPartitionColumns();
+ }
+
+ @Override
+ public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context,
+ Optional<MvccSnapshot>
snapshot) throws AnalysisException {
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(
+ snapshot, hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName());
+ long latestSnapshotId =
snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
+ if (latestSnapshotId <= 0) {
+ throw new AnalysisException("can not find partition: " +
partitionName);
+ }
+ return new MTMVSnapshotIdSnapshot(latestSnapshotId);
+ }
+
+ @Override
+ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
+ throws AnalysisException {
+ hmsTable.makeSureInitialized();
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(
+ snapshot, hmsTable.getCatalog(), hmsTable.getDbName(),
hmsTable.getName());
+ return new
MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
+ }
+
+ @Override
+ boolean isPartitionColumnAllowNull() {
+ return true;
+ }
+
+ @Override
+ protected boolean isValidRelatedTable() {
+ if (isValidRelatedTableCached) {
+ return isValidRelatedTable;
+ }
+ isValidRelatedTable = false;
+ Set<String> allFields = Sets.newHashSet();
+ Table table = IcebergUtils.getIcebergTable(
+ hmsTable.getCatalog(),
+ hmsTable.getDbName(),
+ hmsTable.getName()
+ );
+ for (PartitionSpec spec : table.specs().values()) {
+ if (spec == null) {
+ isValidRelatedTableCached = true;
+ return false;
+ }
+ List<PartitionField> fields = spec.fields();
+ if (fields.size() != 1) {
+ isValidRelatedTableCached = true;
+ return false;
+ }
+ PartitionField partitionField = spec.fields().get(0);
+ String transformName = partitionField.transform().toString();
+ if (!IcebergUtils.YEAR.equals(transformName)
+ && !IcebergUtils.MONTH.equals(transformName)
+ && !IcebergUtils.DAY.equals(transformName)
+ && !IcebergUtils.HOUR.equals(transformName)) {
+ isValidRelatedTableCached = true;
+ return false;
+ }
+
allFields.add(table.schema().findColumnName(partitionField.sourceId()));
+ }
+ isValidRelatedTableCached = true;
+ isValidRelatedTable = allFields.size() == 1;
+ return isValidRelatedTable;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index b695a268b0f..41149370097 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -45,7 +45,6 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
protected String icebergCatalogType;
protected Catalog catalog;
- private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;
public IcebergExternalCatalog(long catalogId, String name, String comment)
{
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index e9bffbf56e3..6990e8a7449 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -17,25 +17,18 @@
package org.apache.doris.datasource.iceberg;
-import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
-import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.CacheException;
-import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccTable;
-import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.systable.SupportedSysTables;
import org.apache.doris.datasource.systable.SysTable;
import org.apache.doris.mtmv.MTMVBaseTableIf;
@@ -52,32 +45,12 @@ import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
import com.google.common.collect.Sets;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionsTable;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructProjection;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.Month;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Comparator;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -87,15 +60,7 @@ import java.util.stream.Collectors;
public class IcebergExternalTable extends ExternalTable implements
MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
- public static final String YEAR = "year";
- public static final String MONTH = "month";
- public static final String DAY = "day";
- public static final String HOUR = "hour";
- public static final String IDENTITY = "identity";
- public static final int PARTITION_DATA_ID_START = 1000; //
org.apache.iceberg.PartitionSpec
-
private Table table;
- private List<Column> partitionColumns;
private boolean isValidRelatedTableCached = false;
private boolean isValidRelatedTable = false;
@@ -120,29 +85,9 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
this.table = table;
}
- @VisibleForTesting
- public void setPartitionColumns(List<Column> partitionColumns) {
- this.partitionColumns = partitionColumns;
- }
-
@Override
public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
- table = getIcebergTable();
- List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name,
- ((IcebergSchemaCacheKey) key).getSchemaId());
- List<Column> tmpColumns = Lists.newArrayList();
- PartitionSpec spec = table.spec();
- for (PartitionField field : spec.fields()) {
- Types.NestedField col = table.schema().findField(field.sourceId());
- for (Column c : schema) {
- if (c.getName().equalsIgnoreCase(col.name())) {
- tmpColumns.add(c);
- break;
- }
- }
- }
- partitionColumns = tmpColumns;
- return Optional.of(new IcebergSchemaCacheValue(schema,
partitionColumns));
+ return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name,
((IcebergSchemaCacheKey) key).getSchemaId());
}
@Override
@@ -180,23 +125,21 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
return IcebergUtils.getIcebergTable(getCatalog(), getDbName(),
getName());
}
- private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() {
- return
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
- .getSnapshotCache(catalog, dbName, name);
- }
-
@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
}
@Override
public Map<String, PartitionItem>
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
- return
Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem());
+ return Maps.newHashMap(
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(),
getDbName(), getName())
+ .getPartitionInfo().getNameToPartitionItem());
}
@Override
public Map<String, PartitionItem>
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
- return
getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
+ return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot,
getCatalog(), getDbName(), getName())
+ .getPartitionInfo().getNameToPartitionItem();
}
@Override
@@ -211,15 +154,18 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
- IcebergSnapshotCacheValue snapshotValue =
getOrFetchSnapshotCacheValue(snapshot);
- IcebergSchemaCacheValue schemaValue =
getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId());
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshot,
getCatalog(), getDbName(), getName());
+ IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+ catalog, getDbName(), getName(),
snapshotValue.getSnapshot().getSchemaId());
return schemaValue.getPartitionColumns();
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context,
Optional<MvccSnapshot>
snapshot) throws AnalysisException {
- IcebergSnapshotCacheValue snapshotValue =
getOrFetchSnapshotCacheValue(snapshot);
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshot,
getCatalog(), getDbName(), getName());
long latestSnapshotId =
snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
if (latestSnapshotId <= 0) {
throw new AnalysisException("can not find partition: " +
partitionName);
@@ -231,7 +177,8 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
throws AnalysisException {
makeSureInitialized();
- IcebergSnapshotCacheValue snapshotValue =
getOrFetchSnapshotCacheValue(snapshot);
+ IcebergSnapshotCacheValue snapshotValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshot,
getCatalog(), getDbName(), getName());
return new
MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
}
@@ -266,10 +213,10 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
}
PartitionField partitionField = spec.fields().get(0);
String transformName = partitionField.transform().toString();
- if (!YEAR.equals(transformName)
- && !MONTH.equals(transformName)
- && !DAY.equals(transformName)
- && !HOUR.equals(transformName)) {
+ if (!IcebergUtils.YEAR.equals(transformName)
+ && !IcebergUtils.MONTH.equals(transformName)
+ && !IcebergUtils.DAY.equals(transformName)
+ && !IcebergUtils.HOUR.equals(transformName)) {
isValidRelatedTableCached = true;
return false;
}
@@ -282,27 +229,13 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
@Override
public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
- return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
- }
-
- public long getLatestSnapshotId() {
- table = getIcebergTable();
- Snapshot snapshot = table.currentSnapshot();
- return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID :
table.currentSnapshot().snapshotId();
- }
-
- public long getSchemaId(long snapshotId) {
- table = getIcebergTable();
- return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID
- ? IcebergUtils.UNKNOWN_SNAPSHOT_ID
- : table.snapshot(snapshotId).schemaId();
+ return new
IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue(
+ tableSnapshot, getCatalog(), getDbName(), getName()));
}
@Override
public List<Column> getFullSchema() {
- Optional<MvccSnapshot> snapshotFromContext =
MvccUtil.getSnapshotFromContext(this);
- IcebergSnapshotCacheValue cacheValue =
getOrFetchSnapshotCacheValue(snapshotFromContext);
- return
getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema();
+ return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(),
getName());
}
@Override
@@ -310,239 +243,6 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
return true;
}
- public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) {
- ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
- Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
- new IcebergSchemaCacheKey(dbName, name, schemaId));
- if (!schemaCacheValue.isPresent()) {
- throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
- null, catalog.getName(), dbName, name, schemaId);
- }
- return (IcebergSchemaCacheValue) schemaCacheValue.get();
- }
-
- public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws
AnalysisException {
- // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table,
haven't contained any snapshot yet.
- if (!isValidRelatedTable() || snapshotId ==
IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
- return new IcebergPartitionInfo();
- }
- List<IcebergPartition> icebergPartitions =
loadIcebergPartition(snapshotId);
- Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
- Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
- table = getIcebergTable();
- partitionColumns =
getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns();
- for (IcebergPartition partition : icebergPartitions) {
- nameToPartition.put(partition.getPartitionName(), partition);
- String transform =
table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
- Range<PartitionKey> partitionRange = getPartitionRange(
- partition.getPartitionValues().get(0), transform,
partitionColumns);
- PartitionItem item = new RangePartitionItem(partitionRange);
- nameToPartitionItem.put(partition.getPartitionName(), item);
- }
- Map<String, Set<String>> partitionNameMap =
mergeOverlapPartitions(nameToPartitionItem);
- return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition,
partitionNameMap);
- }
-
- public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
- PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
- .createMetadataTableInstance(table,
MetadataTableType.PARTITIONS);
- List<IcebergPartition> partitions = Lists.newArrayList();
- try (CloseableIterable<FileScanTask> tasks =
partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
- for (FileScanTask task : tasks) {
- CloseableIterable<StructLike> rows = task.asDataTask().rows();
- for (StructLike row : rows) {
- partitions.add(generateIcebergPartition(row));
- }
- }
- } catch (IOException e) {
- LOG.warn("Failed to get Iceberg table {} partition info.", name,
e);
- }
- return partitions;
- }
-
- public IcebergPartition generateIcebergPartition(StructLike row) {
- // row format :
- // 0. partitionData,
- // 1. spec_id,
- // 2. record_count,
- // 3. file_count,
- // 4. total_data_file_size_in_bytes,
- // 5. position_delete_record_count,
- // 6. position_delete_file_count,
- // 7. equality_delete_record_count,
- // 8. equality_delete_file_count,
- // 9. last_updated_at,
- // 10. last_updated_snapshot_id
- table = getIcebergTable();
- Preconditions.checkState(!table.spec().fields().isEmpty(),
table.name() + " is not a partition table.");
- int specId = row.get(1, Integer.class);
- PartitionSpec partitionSpec = table.specs().get(specId);
- StructProjection partitionData = row.get(0, StructProjection.class);
- StringBuilder sb = new StringBuilder();
- List<String> partitionValues = Lists.newArrayList();
- List<String> transforms = Lists.newArrayList();
- for (int i = 0; i < partitionSpec.fields().size(); ++i) {
- PartitionField partitionField = partitionSpec.fields().get(i);
- Class<?> fieldClass = partitionSpec.javaClasses()[i];
- int fieldId = partitionField.fieldId();
- // Iceberg partition field id starts at PARTITION_DATA_ID_START,
- // So we can get the field index in partitionData using fieldId -
PARTITION_DATA_ID_START
- int index = fieldId - PARTITION_DATA_ID_START;
- Object o = partitionData.get(index, fieldClass);
- String fieldValue = o == null ? null : o.toString();
- String fieldName = partitionField.name();
- sb.append(fieldName);
- sb.append("=");
- sb.append(fieldValue);
- sb.append("/");
- partitionValues.add(fieldValue);
- transforms.add(partitionField.transform().toString());
- }
- if (sb.length() > 0) {
- sb.delete(sb.length() - 1, sb.length());
- }
- String partitionName = sb.toString();
- long recordCount = row.get(2, Long.class);
- long fileCount = row.get(3, Integer.class);
- long fileSizeInBytes = row.get(4, Long.class);
- long lastUpdateTime = row.get(9, Long.class);
- long lastUpdateSnapShotId = row.get(10, Long.class);
- return new IcebergPartition(partitionName, specId, recordCount,
fileSizeInBytes, fileCount,
- lastUpdateTime, lastUpdateSnapShotId, partitionValues,
transforms);
- }
-
- @VisibleForTesting
- public Range<PartitionKey> getPartitionRange(String value, String
transform, List<Column> partitionColumns)
- throws AnalysisException {
- // For NULL value, create a minimum partition for it.
- if (value == null) {
- PartitionKey nullLowKey = PartitionKey.createPartitionKey(
- Lists.newArrayList(new PartitionValue("0000-01-01")),
partitionColumns);
- PartitionKey nullUpKey = nullLowKey.successor();
- return Range.closedOpen(nullLowKey, nullUpKey);
- }
- LocalDateTime epoch =
Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
- LocalDateTime target;
- LocalDateTime lower;
- LocalDateTime upper;
- long longValue = Long.parseLong(value);
- switch (transform) {
- case HOUR:
- target = epoch.plusHours(longValue);
- lower = LocalDateTime.of(target.getYear(), target.getMonth(),
target.getDayOfMonth(),
- target.getHour(), 0, 0);
- upper = lower.plusHours(1);
- break;
- case DAY:
- target = epoch.plusDays(longValue);
- lower = LocalDateTime.of(target.getYear(), target.getMonth(),
target.getDayOfMonth(), 0, 0, 0);
- upper = lower.plusDays(1);
- break;
- case MONTH:
- target = epoch.plusMonths(longValue);
- lower = LocalDateTime.of(target.getYear(), target.getMonth(),
1, 0, 0, 0);
- upper = lower.plusMonths(1);
- break;
- case YEAR:
- target = epoch.plusYears(longValue);
- lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1,
0, 0, 0);
- upper = lower.plusYears(1);
- break;
- default:
- throw new RuntimeException("Unsupported transform " +
transform);
- }
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss");
- Column c = partitionColumns.get(0);
- Preconditions.checkState(c.getDataType().isDateType(), "Only support
date type partition column");
- if (c.getType().isDate() || c.getType().isDateV2()) {
- formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- }
- PartitionValue lowerValue = new
PartitionValue(lower.format(formatter));
- PartitionValue upperValue = new
PartitionValue(upper.format(formatter));
- PartitionKey lowKey =
PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue),
partitionColumns);
- PartitionKey upperKey =
PartitionKey.createPartitionKey(Lists.newArrayList(upperValue),
partitionColumns);
- return Range.closedOpen(lowKey, upperKey);
- }
-
- /**
- * Merge overlapped iceberg partitions into one Doris partition.
- */
- public Map<String, Set<String>> mergeOverlapPartitions(Map<String,
PartitionItem> originPartitions) {
- List<Map.Entry<String, PartitionItem>> entries =
sortPartitionMap(originPartitions);
- Map<String, Set<String>> map = Maps.newHashMap();
- for (int i = 0; i < entries.size() - 1; i++) {
- Range<PartitionKey> firstValue =
entries.get(i).getValue().getItems();
- String firstKey = entries.get(i).getKey();
- Range<PartitionKey> secondValue = entries.get(i +
1).getValue().getItems();
- String secondKey = entries.get(i + 1).getKey();
- // If the first entry enclose the second one, remove the second
entry and keep a record in the return map.
- // So we can track the iceberg partitions those contained by one
Doris partition.
- while (i < entries.size() && firstValue.encloses(secondValue)) {
- originPartitions.remove(secondKey);
- map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
- String finalSecondKey = secondKey;
- map.computeIfPresent(firstKey, (key, value) -> {
- value.add(finalSecondKey);
- return value;
- });
- i++;
- if (i >= entries.size() - 1) {
- break;
- }
- secondValue = entries.get(i + 1).getValue().getItems();
- secondKey = entries.get(i + 1).getKey();
- }
- }
- return map;
- }
-
- /**
- * Sort the given map entries by PartitionItem Range(LOW, HIGH)
- * When comparing two ranges, the one with smaller LOW value is smaller
than the other one.
- * If two ranges have same values of LOW, the one with larger HIGH value
is smaller.
- *
- * For now, we only support year, month, day and hour,
- * so it is impossible to have two partially intersect partitions.
- * One range is either enclosed by another or has no intersection at all
with another.
- *
- *
- * For example, we have these 4 ranges:
- * [10, 20), [30, 40), [0, 30), [10, 15)
- *
- * After sort, they become:
- * [0, 30), [10, 20), [10, 15), [30, 40)
- */
- public List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String,
PartitionItem> originPartitions) {
- List<Map.Entry<String, PartitionItem>> entries = new
ArrayList<>(originPartitions.entrySet());
- entries.sort(new RangeComparator());
- return entries;
- }
-
- public static class RangeComparator implements
Comparator<Map.Entry<String, PartitionItem>> {
- @Override
- public int compare(Map.Entry<String, PartitionItem> p1,
Map.Entry<String, PartitionItem> p2) {
- PartitionItem value1 = p1.getValue();
- PartitionItem value2 = p2.getValue();
- if (value1 instanceof RangePartitionItem && value2 instanceof
RangePartitionItem) {
- Range<PartitionKey> items1 = value1.getItems();
- Range<PartitionKey> items2 = value2.getItems();
- if (!items1.hasLowerBound()) {
- return -1;
- }
- if (!items2.hasLowerBound()) {
- return 1;
- }
- PartitionKey upper1 = items1.upperEndpoint();
- PartitionKey lower1 = items1.lowerEndpoint();
- PartitionKey upper2 = items2.upperEndpoint();
- PartitionKey lower2 = items2.lowerEndpoint();
- int compareLow = lower1.compareTo(lower2);
- return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
- }
- return 0;
- }
- }
-
@VisibleForTesting
public boolean isValidRelatedTableCached() {
return isValidRelatedTableCached;
@@ -557,14 +257,6 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
this.isValidRelatedTableCached = isCached;
}
- public IcebergSnapshotCacheValue
getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
- if (snapshot.isPresent()) {
- return ((IcebergMvccSnapshot)
snapshot.get()).getSnapshotCacheValue();
- } else {
- return getIcebergSnapshotCacheValue();
- }
- }
-
@Override
public List<SysTable> getSupportedSysTables() {
makeSureInitialized();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index b776a8c3a47..f99b652b42d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -24,6 +24,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Iterables;
@@ -123,12 +124,18 @@ public class IcebergMetadataCache {
@NotNull
private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey
key) throws AnalysisException {
- IcebergExternalTable table = (IcebergExternalTable)
key.catalog.getDbOrAnalysisException(key.dbName)
+ MTMVRelatedTableIf table = (MTMVRelatedTableIf)
key.catalog.getDbOrAnalysisException(key.dbName)
.getTableOrAnalysisException(key.tableName);
- long snapshotId = table.getLatestSnapshotId();
- long schemaId = table.getSchemaId(snapshotId);
- IcebergPartitionInfo icebergPartitionInfo =
table.loadPartitionInfo(snapshotId);
- return new IcebergSnapshotCacheValue(icebergPartitionInfo, new
IcebergSnapshot(snapshotId, schemaId));
+ IcebergSnapshot lastedIcebergSnapshot =
IcebergUtils.getLastedIcebergSnapshot(
+ (ExternalCatalog) key.catalog, key.dbName, key.tableName);
+ IcebergPartitionInfo icebergPartitionInfo;
+ if (!table.isValidRelatedTable()) {
+ icebergPartitionInfo = IcebergPartitionInfo.empty();
+ } else {
+ icebergPartitionInfo = IcebergUtils.loadPartitionInfo(
+ (ExternalCatalog) key.catalog, key.dbName, key.tableName,
lastedIcebergSnapshot.getSnapshotId());
+ }
+ return new IcebergSnapshotCacheValue(icebergPartitionInfo,
lastedIcebergSnapshot);
}
public void invalidateCatalogCache(long catalogId) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
index 9edb2137f4f..bb46eb21223 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java
@@ -29,12 +29,18 @@ public class IcebergPartitionInfo {
private final Map<String, IcebergPartition> nameToIcebergPartition;
private final Map<String, Set<String>> nameToIcebergPartitionNames;
+ private static final IcebergPartitionInfo EMPTY = new
IcebergPartitionInfo();
+
public IcebergPartitionInfo() {
this.nameToPartitionItem = Maps.newHashMap();
this.nameToIcebergPartition = Maps.newHashMap();
this.nameToIcebergPartitionNames = Maps.newHashMap();
}
+ static IcebergPartitionInfo empty() {
+ return EMPTY;
+ }
+
public IcebergPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
Map<String, IcebergPartition>
nameToIcebergPartition,
Map<String, Set<String>>
nameToIcebergPartitionNames) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index f61c226ed75..ab91d9c3d90 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -31,39 +31,58 @@ import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.Subquery;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.thrift.TExprOpcode;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.And;
@@ -79,14 +98,25 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.StructProjection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -117,7 +147,15 @@ public class IcebergUtils {
// nickname in spark
public static final String SPARK_SQL_COMPRESSION_CODEC =
"spark.sql.iceberg.compression-codec";
- public static final long UNKNOWN_SNAPSHOT_ID = -1;
+ public static final long UNKNOWN_SNAPSHOT_ID = -1; // means an empty table
+ public static final long NEWEST_SCHEMA_ID = -1;
+
+ public static final String YEAR = "year";
+ public static final String MONTH = "month";
+ public static final String DAY = "day";
+ public static final String HOUR = "hour";
+ public static final String IDENTITY = "identity";
+ public static final int PARTITION_DATA_ID_START = 1000; //
org.apache.iceberg.PartitionSpec
public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
if (expr == null) {
@@ -582,10 +620,6 @@ public class IcebergUtils {
: metadataCache.getIcebergTable(catalog, dbName, tblName);
}
- public static List<Column> getSchema(ExternalCatalog catalog, String
dbName, String name) {
- return getSchema(catalog, dbName, name, UNKNOWN_SNAPSHOT_ID);
- }
-
/**
* Get iceberg schema from catalog and convert them to doris schema
*/
@@ -594,7 +628,7 @@ public class IcebergUtils {
return catalog.getPreExecutionAuthenticator().execute(() -> {
org.apache.iceberg.Table icebergTable =
getIcebergTable(catalog, dbName, name);
Schema schema;
- if (schemaId == UNKNOWN_SNAPSHOT_ID ||
icebergTable.currentSnapshot() == null) {
+ if (schemaId == NEWEST_SCHEMA_ID ||
icebergTable.currentSnapshot() == null) {
schema = icebergTable.schema();
} else {
schema = icebergTable.schemas().get((int) schemaId);
@@ -725,25 +759,6 @@ public class IcebergUtils {
return hiveCatalog;
}
- // load table schema from iceberg API to external schema cache.
- public static Optional<SchemaCacheValue> loadSchemaCacheValue(
- ExternalCatalog catalog, String dbName, String tbName, long
schemaId) {
- Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
- List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName,
schemaId);
- List<Column> tmpColumns = Lists.newArrayList();
- PartitionSpec spec = table.spec();
- for (PartitionField field : spec.fields()) {
- Types.NestedField col = table.schema().findField(field.sourceId());
- for (Column c : schema) {
- if (c.getName().equalsIgnoreCase(col.name())) {
- tmpColumns.add(c);
- break;
- }
- }
- }
- return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
- }
-
// Retrieve the manifest files that match the query based on partitions in
filter
public static CloseableIterable<ManifestFile> getMatchingManifest(
List<ManifestFile> dataManifests,
@@ -772,4 +787,330 @@ public class IcebergUtils {
return matchingManifests;
}
+
+ // get snapshot id from query like 'for version/time as of'
+ public static long getQuerySpecSnapshot(Table table, TableSnapshot
queryTableSnapshot) {
+ TableSnapshot.VersionType type = queryTableSnapshot.getType();
+ if (type == TableSnapshot.VersionType.VERSION) {
+ return queryTableSnapshot.getVersion();
+ } else {
+ long timestamp =
TimeUtils.timeStringToLong(queryTableSnapshot.getTime(),
TimeUtils.getTimeZone());
+ if (timestamp < 0) {
+ throw new DateTimeException("can't parse time: " +
queryTableSnapshot.getTime());
+ }
+ return SnapshotUtil.snapshotIdAsOfTime(table, timestamp);
+ }
+ }
+
+ // read schema from external schema cache
+ public static IcebergSchemaCacheValue getSchemaCacheValue(
+ ExternalCatalog catalog, String dbName, String name, long
schemaId) {
+ ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+ Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+ new IcebergSchemaCacheKey(dbName, name, schemaId));
+ if (!schemaCacheValue.isPresent()) {
+ throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+ null, catalog.getName(), dbName, name, schemaId);
+ }
+ return (IcebergSchemaCacheValue) schemaCacheValue.get();
+ }
+
+ public static IcebergSnapshot getLastedIcebergSnapshot(ExternalCatalog
catalog, String dbName, String tbName) {
+ Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+ Snapshot snapshot = table.currentSnapshot();
+ long snapshotId = snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID
: snapshot.snapshotId();
+ return new IcebergSnapshot(snapshotId, table.schema().schemaId());
+ }
+
+ public static IcebergPartitionInfo loadPartitionInfo(
+ ExternalCatalog catalog, String dbName, String tbName, long
snapshotId) throws AnalysisException {
+ // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table,
haven't contained any snapshot yet.
+ if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
+ return IcebergPartitionInfo.empty();
+ }
+ Table table = getIcebergTable(catalog, dbName, tbName);
+ List<IcebergPartition> icebergPartitions = loadIcebergPartition(table,
snapshotId);
+ Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
+ Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+
+ List<Column> partitionColumns = IcebergUtils.getSchemaCacheValue(
+ catalog, dbName, tbName,
table.snapshot(snapshotId).schemaId()).getPartitionColumns();
+ for (IcebergPartition partition : icebergPartitions) {
+ nameToPartition.put(partition.getPartitionName(), partition);
+ String transform =
table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
+ Range<PartitionKey> partitionRange = getPartitionRange(
+ partition.getPartitionValues().get(0), transform,
partitionColumns);
+ PartitionItem item = new RangePartitionItem(partitionRange);
+ nameToPartitionItem.put(partition.getPartitionName(), item);
+ }
+ Map<String, Set<String>> partitionNameMap =
mergeOverlapPartitions(nameToPartitionItem);
+ return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition,
partitionNameMap);
+ }
+
+ private static List<IcebergPartition> loadIcebergPartition(Table table,
long snapshotId) {
+ PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
+ .createMetadataTableInstance(table,
MetadataTableType.PARTITIONS);
+ List<IcebergPartition> partitions = Lists.newArrayList();
+ try (CloseableIterable<FileScanTask> tasks =
partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
+ for (FileScanTask task : tasks) {
+ CloseableIterable<StructLike> rows = task.asDataTask().rows();
+ for (StructLike row : rows) {
+ partitions.add(generateIcebergPartition(table, row));
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to get Iceberg table {} partition info.",
table.name(), e);
+ }
+ return partitions;
+ }
+
+ private static IcebergPartition generateIcebergPartition(Table table,
StructLike row) {
+ // row format :
+ // 0. partitionData,
+ // 1. spec_id,
+ // 2. record_count,
+ // 3. file_count,
+ // 4. total_data_file_size_in_bytes,
+ // 5. position_delete_record_count,
+ // 6. position_delete_file_count,
+ // 7. equality_delete_record_count,
+ // 8. equality_delete_file_count,
+ // 9. last_updated_at,
+ // 10. last_updated_snapshot_id
+ Preconditions.checkState(!table.spec().fields().isEmpty(),
table.name() + " is not a partition table.");
+ int specId = row.get(1, Integer.class);
+ PartitionSpec partitionSpec = table.specs().get(specId);
+ StructProjection partitionData = row.get(0, StructProjection.class);
+ StringBuilder sb = new StringBuilder();
+ List<String> partitionValues = Lists.newArrayList();
+ List<String> transforms = Lists.newArrayList();
+ for (int i = 0; i < partitionSpec.fields().size(); ++i) {
+ PartitionField partitionField = partitionSpec.fields().get(i);
+ Class<?> fieldClass = partitionSpec.javaClasses()[i];
+ int fieldId = partitionField.fieldId();
+ // Iceberg partition field id starts at PARTITION_DATA_ID_START,
+ // So we can get the field index in partitionData using fieldId -
PARTITION_DATA_ID_START
+ int index = fieldId - PARTITION_DATA_ID_START;
+ Object o = partitionData.get(index, fieldClass);
+ String fieldValue = o == null ? null : o.toString();
+ String fieldName = partitionField.name();
+ sb.append(fieldName);
+ sb.append("=");
+ sb.append(fieldValue);
+ sb.append("/");
+ partitionValues.add(fieldValue);
+ transforms.add(partitionField.transform().toString());
+ }
+ if (sb.length() > 0) {
+ sb.delete(sb.length() - 1, sb.length());
+ }
+ String partitionName = sb.toString();
+ long recordCount = row.get(2, Long.class);
+ long fileCount = row.get(3, Integer.class);
+ long fileSizeInBytes = row.get(4, Long.class);
+ long lastUpdateTime = row.get(9, Long.class);
+ long lastUpdateSnapShotId = row.get(10, Long.class);
+ return new IcebergPartition(partitionName, specId, recordCount,
fileSizeInBytes, fileCount,
+ lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
+ }
+
+ @VisibleForTesting
+ public static Range<PartitionKey> getPartitionRange(String value, String
transform, List<Column> partitionColumns)
+ throws AnalysisException {
+ // For NULL value, create a minimum partition for it.
+ if (value == null) {
+ PartitionKey nullLowKey = PartitionKey.createPartitionKey(
+ Lists.newArrayList(new PartitionValue("0000-01-01")),
partitionColumns);
+ PartitionKey nullUpKey = nullLowKey.successor();
+ return Range.closedOpen(nullLowKey, nullUpKey);
+ }
+ LocalDateTime epoch =
Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
+ LocalDateTime target;
+ LocalDateTime lower;
+ LocalDateTime upper;
+ long longValue = Long.parseLong(value);
+ switch (transform) {
+ case HOUR:
+ target = epoch.plusHours(longValue);
+ lower = LocalDateTime.of(target.getYear(), target.getMonth(),
target.getDayOfMonth(),
+ target.getHour(), 0, 0);
+ upper = lower.plusHours(1);
+ break;
+ case DAY:
+ target = epoch.plusDays(longValue);
+ lower = LocalDateTime.of(target.getYear(), target.getMonth(),
target.getDayOfMonth(), 0, 0, 0);
+ upper = lower.plusDays(1);
+ break;
+ case MONTH:
+ target = epoch.plusMonths(longValue);
+ lower = LocalDateTime.of(target.getYear(), target.getMonth(),
1, 0, 0, 0);
+ upper = lower.plusMonths(1);
+ break;
+ case YEAR:
+ target = epoch.plusYears(longValue);
+ lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1,
0, 0, 0);
+ upper = lower.plusYears(1);
+ break;
+ default:
+ throw new RuntimeException("Unsupported transform " +
transform);
+ }
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss");
+ Column c = partitionColumns.get(0);
+ Preconditions.checkState(c.getDataType().isDateType(), "Only support
date type partition column");
+ if (c.getType().isDate() || c.getType().isDateV2()) {
+ formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ }
+ PartitionValue lowerValue = new
PartitionValue(lower.format(formatter));
+ PartitionValue upperValue = new
PartitionValue(upper.format(formatter));
+ PartitionKey lowKey =
PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue),
partitionColumns);
+ PartitionKey upperKey =
PartitionKey.createPartitionKey(Lists.newArrayList(upperValue),
partitionColumns);
+ return Range.closedOpen(lowKey, upperKey);
+ }
+
+ /**
+ * Merge overlapped iceberg partitions into one Doris partition.
+ */
+ @VisibleForTesting
+ public static Map<String, Set<String>> mergeOverlapPartitions(Map<String,
PartitionItem> originPartitions) {
+ List<Map.Entry<String, PartitionItem>> entries =
sortPartitionMap(originPartitions);
+ Map<String, Set<String>> map = Maps.newHashMap();
+ for (int i = 0; i < entries.size() - 1; i++) {
+ Range<PartitionKey> firstValue =
entries.get(i).getValue().getItems();
+ String firstKey = entries.get(i).getKey();
+ Range<PartitionKey> secondValue = entries.get(i +
1).getValue().getItems();
+ String secondKey = entries.get(i + 1).getKey();
+ // If the first entry enclose the second one, remove the second
entry and keep a record in the return map.
+ // So we can track the iceberg partitions those contained by one
Doris partition.
+ while (i < entries.size() && firstValue.encloses(secondValue)) {
+ originPartitions.remove(secondKey);
+ map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
+ String finalSecondKey = secondKey;
+ map.computeIfPresent(firstKey, (key, value) -> {
+ value.add(finalSecondKey);
+ return value;
+ });
+ i++;
+ if (i >= entries.size() - 1) {
+ break;
+ }
+ secondValue = entries.get(i + 1).getValue().getItems();
+ secondKey = entries.get(i + 1).getKey();
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Sort the given map entries by PartitionItem Range(LOW, HIGH)
+ * When comparing two ranges, the one with smaller LOW value is smaller
than the other one.
+ * If two ranges have same values of LOW, the one with larger HIGH value
is smaller.
+ *
+ * For now, we only support year, month, day and hour,
+ * so it is impossible to have two partially intersect partitions.
+ * One range is either enclosed by another or has no intersection at all
with another.
+ *
+ *
+ * For example, we have these 4 ranges:
+ * [10, 20), [30, 40), [0, 30), [10, 15)
+ *
+ * After sort, they become:
+ * [0, 30), [10, 20), [10, 15), [30, 40)
+ */
+ @VisibleForTesting
+ public static List<Map.Entry<String, PartitionItem>>
sortPartitionMap(Map<String, PartitionItem> originPartitions) {
+ List<Map.Entry<String, PartitionItem>> entries = new
ArrayList<>(originPartitions.entrySet());
+ entries.sort(new RangeComparator());
+ return entries;
+ }
+
+ public static class RangeComparator implements
Comparator<Map.Entry<String, PartitionItem>> {
+ @Override
+ public int compare(Map.Entry<String, PartitionItem> p1,
Map.Entry<String, PartitionItem> p2) {
+ PartitionItem value1 = p1.getValue();
+ PartitionItem value2 = p2.getValue();
+ if (value1 instanceof RangePartitionItem && value2 instanceof
RangePartitionItem) {
+ Range<PartitionKey> items1 = value1.getItems();
+ Range<PartitionKey> items2 = value2.getItems();
+ if (!items1.hasLowerBound()) {
+ return -1;
+ }
+ if (!items2.hasLowerBound()) {
+ return 1;
+ }
+ PartitionKey upper1 = items1.upperEndpoint();
+ PartitionKey lower1 = items1.lowerEndpoint();
+ PartitionKey upper2 = items2.upperEndpoint();
+ PartitionKey lower2 = items2.lowerEndpoint();
+ int compareLow = lower1.compareTo(lower2);
+ return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
+ }
+ return 0;
+ }
+ }
+
+ public static IcebergSnapshotCacheValue getIcebergSnapshotCacheValue(
+ Optional<TableSnapshot> tableSnapshot,
+ ExternalCatalog catalog,
+ String dbName,
+ String tbName) {
+ IcebergSnapshotCacheValue snapshotCache =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+ .getSnapshotCache(catalog, dbName, tbName);
+ if (tableSnapshot.isPresent()) {
+ // If a snapshot is specified,
+ // use the specified snapshot and the corresponding schema(not the
latest schema).
+ Table icebergTable = getIcebergTable(catalog, dbName, tbName);
+ TableSnapshot snapshot = tableSnapshot.get();
+ long querySpecSnapshot = getQuerySpecSnapshot(icebergTable,
snapshot);
+ return new IcebergSnapshotCacheValue(
+ IcebergPartitionInfo.empty(),
+ new IcebergSnapshot(querySpecSnapshot,
icebergTable.snapshot(querySpecSnapshot).schemaId()));
+ } else {
+ // Otherwise, use the latest snapshot and the latest schema.
+ return snapshotCache;
+ }
+ }
+
+ // load table schema from iceberg API to external schema cache.
+ public static Optional<SchemaCacheValue> loadSchemaCacheValue(
+ ExternalCatalog catalog, String dbName, String tbName, long
schemaId) {
+ Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+ List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName,
schemaId);
+ List<Column> tmpColumns = Lists.newArrayList();
+ PartitionSpec spec = table.spec();
+ for (PartitionField field : spec.fields()) {
+ Types.NestedField col = table.schema().findField(field.sourceId());
+ for (Column c : schema) {
+ if (c.getName().equalsIgnoreCase(col.name())) {
+ tmpColumns.add(c);
+ break;
+ }
+ }
+ }
+ return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
+ }
+
+ public static List<Column> getIcebergSchema(
+ TableIf tableIf,
+ ExternalCatalog catalog,
+ String dbName,
+ String tbName) {
+ Optional<MvccSnapshot> snapshotFromContext =
MvccUtil.getSnapshotFromContext(tableIf);
+ IcebergSnapshotCacheValue cacheValue =
+ IcebergUtils.getOrFetchSnapshotCacheValue(snapshotFromContext,
catalog, dbName, tbName);
+ return IcebergUtils.getSchemaCacheValue(
+ catalog, dbName, tbName,
cacheValue.getSnapshot().getSchemaId())
+ .getSchema();
+ }
+
+ public static IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(
+ Optional<MvccSnapshot> snapshot,
+ ExternalCatalog catalog,
+ String dbName,
+ String tbName) {
+ if (snapshot.isPresent()) {
+ return ((IcebergMvccSnapshot)
snapshot.get()).getSnapshotCacheValue();
+ } else {
+ return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(),
catalog, dbName, tbName);
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 34efa59a459..d2f31214b0b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
@@ -68,13 +67,11 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
-import java.time.DateTimeException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -446,16 +443,7 @@ public class IcebergScanNode extends FileQueryScanNode {
public Long getSpecifiedSnapshot() {
TableSnapshot tableSnapshot = getQueryTableSnapshot();
if (tableSnapshot != null) {
- TableSnapshot.VersionType type = tableSnapshot.getType();
- if (type == TableSnapshot.VersionType.VERSION) {
- return tableSnapshot.getVersion();
- } else {
- long timestamp =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
- if (timestamp < 0) {
- throw new DateTimeException("can't parse time: " +
tableSnapshot.getTime());
- }
- return SnapshotUtil.snapshotIdAsOfTime(icebergTable,
timestamp);
- }
+ return IcebergUtils.getQuerySpecSnapshot(icebergTable,
tableSnapshot);
}
return null;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index e68ab2476d9..edc54e230be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -110,6 +110,7 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
protected TableSnapshot tableSnapshot;
+ protected List<Column> columns;
// Save the id of backends which this scan node will be executed on.
// This is also important for local shuffle logic.
@@ -141,6 +142,13 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
return result;
}
+ protected List<Column> getColumns() {
+ if (columns == null && desc.getTable() != null) {
+ columns = desc.getTable().getBaseSchema();
+ }
+ return columns;
+ }
+
public TupleDescriptor getTupleDesc() {
return desc;
}
@@ -233,7 +241,7 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
// for load scan node, table is null
// partitionsInfo maybe null for other scan node, eg:
ExternalScanNode...
if (desc.getTable() != null) {
- computeColumnsFilter(desc.getTable().getBaseSchema(),
partitionsInfo);
+ computeColumnsFilter(getColumns(), partitionsInfo);
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 251f4d8f6b6..7408898ab74 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -17,8 +17,16 @@
package org.apache.doris.datasource.iceberg;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.RangePartitionItem;
+import org.apache.doris.common.AnalysisException;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -32,8 +40,10 @@ import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class IcebergExternalTableTest {
@@ -136,4 +146,108 @@ public class IcebergExternalTableTest {
Assertions.assertTrue(spyTable.isValidRelatedTableCached());
Assertions.assertTrue(spyTable.validRelatedTableCache());
}
+
+ @Test
+ public void testGetPartitionRange() throws AnalysisException {
+ Column c = new Column("ts", PrimitiveType.DATETIMEV2);
+ List<Column> partitionColumns = Lists.newArrayList(c);
+
+ // Test null partition value
+ Range<PartitionKey> nullRange = IcebergUtils.getPartitionRange(null,
"hour", partitionColumns);
+ Assertions.assertEquals("0000-01-01 00:00:00",
+
nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0));
+ Assertions.assertEquals("0000-01-01 00:00:01",
+
nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0));
+
+ // Test hour transform.
+ Range<PartitionKey> hour = IcebergUtils.getPartitionRange("100",
"hour", partitionColumns);
+ PartitionKey lowKey = hour.lowerEndpoint();
+ PartitionKey upKey = hour.upperEndpoint();
+ Assertions.assertEquals("1970-01-05 04:00:00",
lowKey.getPartitionValuesAsStringList().get(0));
+ Assertions.assertEquals("1970-01-05 05:00:00",
upKey.getPartitionValuesAsStringList().get(0));
+
+ // Test day transform.
+ Range<PartitionKey> day = IcebergUtils.getPartitionRange("100", "day",
partitionColumns);
+ lowKey = day.lowerEndpoint();
+ upKey = day.upperEndpoint();
+ Assertions.assertEquals("1970-04-11 00:00:00",
lowKey.getPartitionValuesAsStringList().get(0));
+ Assertions.assertEquals("1970-04-12 00:00:00",
upKey.getPartitionValuesAsStringList().get(0));
+
+ // Test month transform.
+ Range<PartitionKey> month = IcebergUtils.getPartitionRange("100",
"month", partitionColumns);
+ lowKey = month.lowerEndpoint();
+ upKey = month.upperEndpoint();
+ Assertions.assertEquals("1978-05-01 00:00:00",
lowKey.getPartitionValuesAsStringList().get(0));
+ Assertions.assertEquals("1978-06-01 00:00:00",
upKey.getPartitionValuesAsStringList().get(0));
+
+ // Test year transform.
+ Range<PartitionKey> year = IcebergUtils.getPartitionRange("100",
"year", partitionColumns);
+ lowKey = year.lowerEndpoint();
+ upKey = year.upperEndpoint();
+ Assertions.assertEquals("2070-01-01 00:00:00",
lowKey.getPartitionValuesAsStringList().get(0));
+ Assertions.assertEquals("2071-01-01 00:00:00",
upKey.getPartitionValuesAsStringList().get(0));
+
+ // Test unsupported transform
+ Exception exception = Assertions.assertThrows(RuntimeException.class,
() -> {
+ IcebergUtils.getPartitionRange("100", "bucket", partitionColumns);
+ });
+ Assertions.assertEquals("Unsupported transform bucket",
exception.getMessage());
+ }
+
+ @Test
+ public void testSortRange() throws AnalysisException {
+ Column c = new Column("c", PrimitiveType.DATETIMEV2);
+ ArrayList<Column> columns = Lists.newArrayList(c);
+ PartitionItem nullRange = new
RangePartitionItem(IcebergUtils.getPartitionRange(null, "hour", columns));
+ PartitionItem year1970 = new
RangePartitionItem(IcebergUtils.getPartitionRange("0", "year", columns));
+ PartitionItem year1971 = new
RangePartitionItem(IcebergUtils.getPartitionRange("1", "year", columns));
+ PartitionItem month197002 = new
RangePartitionItem(IcebergUtils.getPartitionRange("1", "month", columns));
+ PartitionItem month197103 = new
RangePartitionItem(IcebergUtils.getPartitionRange("14", "month", columns));
+ PartitionItem month197204 = new
RangePartitionItem(IcebergUtils.getPartitionRange("27", "month", columns));
+ PartitionItem day19700202 = new
RangePartitionItem(IcebergUtils.getPartitionRange("32", "day", columns));
+ PartitionItem day19730101 = new
RangePartitionItem(IcebergUtils.getPartitionRange("1096", "day", columns));
+ Map<String, PartitionItem> map = Maps.newHashMap();
+ map.put("nullRange", nullRange);
+ map.put("year1970", year1970);
+ map.put("year1971", year1971);
+ map.put("month197002", month197002);
+ map.put("month197103", month197103);
+ map.put("month197204", month197204);
+ map.put("day19700202", day19700202);
+ map.put("day19730101", day19730101);
+ List<Map.Entry<String, PartitionItem>> entries =
IcebergUtils.sortPartitionMap(map);
+ Assertions.assertEquals(8, entries.size());
+ Assertions.assertEquals("nullRange", entries.get(0).getKey());
+ Assertions.assertEquals("year1970", entries.get(1).getKey());
+ Assertions.assertEquals("month197002", entries.get(2).getKey());
+ Assertions.assertEquals("day19700202", entries.get(3).getKey());
+ Assertions.assertEquals("year1971", entries.get(4).getKey());
+ Assertions.assertEquals("month197103", entries.get(5).getKey());
+ Assertions.assertEquals("month197204", entries.get(6).getKey());
+ Assertions.assertEquals("day19730101", entries.get(7).getKey());
+
+ Map<String, Set<String>> stringSetMap =
IcebergUtils.mergeOverlapPartitions(map);
+ Assertions.assertEquals(2, stringSetMap.size());
+ Assertions.assertTrue(stringSetMap.containsKey("year1970"));
+ Assertions.assertTrue(stringSetMap.containsKey("year1971"));
+
+ Set<String> names1970 = stringSetMap.get("year1970");
+ Assertions.assertEquals(3, names1970.size());
+ Assertions.assertTrue(names1970.contains("year1970"));
+ Assertions.assertTrue(names1970.contains("month197002"));
+ Assertions.assertTrue(names1970.contains("day19700202"));
+
+ Set<String> names1971 = stringSetMap.get("year1971");
+ Assertions.assertEquals(2, names1971.size());
+ Assertions.assertTrue(names1971.contains("year1971"));
+ Assertions.assertTrue(names1971.contains("month197103"));
+
+ Assertions.assertEquals(5, map.size());
+ Assertions.assertTrue(map.containsKey("nullRange"));
+ Assertions.assertTrue(map.containsKey("year1970"));
+ Assertions.assertTrue(map.containsKey("year1971"));
+ Assertions.assertTrue(map.containsKey("month197204"));
+ Assertions.assertTrue(map.containsKey("day19730101"));
+ }
}
+
diff --git
a/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
new file mode 100644
index 00000000000..27e9359b713
Binary files /dev/null and
b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
differ
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
new file mode 100644
index 00000000000..a376dfc210d
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+
+suite("iceberg_schema_change_with_timetravel",
"p0,external,doris,external_docker,external_docker_doris") {
+
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "iceberg_schema_change_with_timetravel"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch ${catalog_name};"""
+ logger.info("switched to catalog " + catalog_name)
+ sql """ use test_db;"""
+
+ def executeTimeTravelingQueries = { String tableName ->
+ def snapshots = sql """ select snapshot_id from iceberg_meta("table" =
"${catalog_name}.test_db.${tableName}", "query_type" = "snapshots") order by
committed_at; """
+ def snapshotIds = [
+ s0: snapshots.get(0)[0],
+ s1: snapshots.get(1)[0],
+ s2: snapshots.get(2)[0],
+ s3: snapshots.get(3)[0],
+ s4: snapshots.get(4)[0]
+ ]
+
+ qt_q0 """ desc ${tableName} """
+ qt_q1 """ select * from ${tableName} order by c1 """
+ qt_q2 """ select * from ${tableName} for version as of
${snapshotIds.s0} order by c1 """
+ qt_q3 """ select * from ${tableName} for version as of
${snapshotIds.s1} order by c1 """
+ qt_q4 """ select * from ${tableName} for version as of
${snapshotIds.s2} order by c1 """
+ qt_q5 """ select * from ${tableName} for version as of
${snapshotIds.s3} order by c1 """
+ qt_q6 """ select * from ${tableName} for version as of
${snapshotIds.s4} order by c1 """
+ }
+
+ executeTimeTravelingQueries("schema_change_with_time_travel")
+ executeTimeTravelingQueries("schema_change_with_time_travel_orc")
+
+}
+
+/*
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+alter table schema_change_with_time_travel add column c3 int;
+insert into schema_change_with_time_travel values (4,5,6);
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+alter table schema_change_with_time_travel add column c4 int;
+*/
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]