This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 48ea35b464f [opt](paimon)Use the API instead of reading from the meta
table (#47544)
48ea35b464f is described below
commit 48ea35b464feff1f8f38d5c5326db95fc5de311b
Author: wuwenchi <[email protected]>
AuthorDate: Thu Feb 20 11:07:45 2025 +0800
[opt](paimon)Use the API instead of reading from the meta table (#47544)
### What problem does this PR solve?
1. Use `latestSnapshotId` to get the latest snapshot ID.
2. If there are no additional options, skip the copy operation on the
table.
3. Use `DataTable.schemaManager().schema()` to get the schema.
4. Use `Catalog.listPartitions` to get the partitions.
---
.../datasource/paimon/PaimonExternalCatalog.java | 19 ++++
.../datasource/paimon/PaimonExternalTable.java | 42 ++------
.../datasource/paimon/PaimonMetadataCache.java | 46 ++++-----
.../datasource/paimon/PaimonPartitionInfo.java | 11 ++-
.../doris/datasource/paimon/PaimonSchema.java | 46 ---------
.../doris/datasource/paimon/PaimonSnapshot.java | 11 ++-
.../apache/doris/datasource/paimon/PaimonUtil.java | 106 +++++----------------
.../datasource/paimon/source/PaimonScanNode.java | 5 +
.../java/org/apache/doris/mtmv/PaimonUtilTest.java | 71 --------------
9 files changed, 91 insertions(+), 266 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index e87994ecdd3..fd332f8216f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -40,6 +40,7 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
import java.io.IOException;
import java.util.ArrayList;
@@ -137,6 +138,24 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
}
}
+ public List<Partition> getPaimonPartitions(String dbName, String tblName) {
+ makeSureInitialized();
+ try {
+ return hadoopAuthenticator.doAs(() -> {
+ List<Partition> partitions = new ArrayList<>();
+ try {
+ partitions =
catalog.listPartitions(Identifier.create(dbName, tblName));
+ } catch (Catalog.TableNotExistException e) {
+ LOG.warn("TableNotExistException", e);
+ }
+ return partitions;
+ });
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to get Paimon table
partitions:" + getName() + "."
+ + dbName + "." + tblName + ", because " + e.getMessage(), e);
+ }
+ }
+
protected String getPaimonCatalogType(String catalogType) {
if (PAIMON_HMS.equalsIgnoreCase(catalogType)) {
return PaimonProperties.PAIMON_HMS_CATALOG;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 992f469906e..7cf965f4eef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -50,17 +50,13 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.partition.Partition;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.system.SchemasTable;
import org.apache.paimon.types.DataField;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -93,9 +89,7 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
}
public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
- return paimonTable.copy(
- Collections.singletonMap(CoreOptions.SCAN_VERSION.key(),
-
String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId())));
+ return getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable();
}
public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) {
@@ -194,12 +188,12 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
throws AnalysisException {
- PaimonPartition paimonPartition =
getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition()
+ Partition paimonPartition =
getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition()
.get(partitionName);
if (paimonPartition == null) {
throw new AnalysisException("can not find partition: " +
partitionName);
}
- return new MTMVTimestampSnapshot(paimonPartition.getLastUpdateTime());
+ return new
MTMVTimestampSnapshot(paimonPartition.lastFileCreationTime());
}
@Override
@@ -244,10 +238,11 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
makeSureInitialized();
PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key;
try {
- PaimonSchema schema =
loadPaimonSchemaBySchemaId(paimonSchemaCacheKey);
- List<DataField> columns = schema.getFields();
+ Table table = ((PaimonExternalCatalog)
getCatalog()).getPaimonTable(key.getDbName(), name);
+ TableSchema tableSchema = ((DataTable)
table).schemaManager().schema(paimonSchemaCacheKey.getSchemaId());
+ List<DataField> columns = tableSchema.fields();
List<Column> dorisColumns =
Lists.newArrayListWithCapacity(columns.size());
- Set<String> partitionColumnNames =
Sets.newHashSet(schema.getPartitionKeys());
+ Set<String> partitionColumnNames =
Sets.newHashSet(tableSchema.partitionKeys());
List<Column> partitionColumns = Lists.newArrayList();
for (DataField field : columns) {
Column column = new Column(field.name().toLowerCase(),
@@ -267,23 +262,6 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
}
- private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key)
throws IOException {
- Table table = ((PaimonExternalCatalog)
getCatalog()).getPaimonTable(key.getDbName(),
- name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS);
- PredicateBuilder builder = new PredicateBuilder(table.rowType());
- Predicate predicate = builder.equal(0, key.getSchemaId());
- // Adding predicates will also return excess data
- List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1, 2},
predicate);
- for (InternalRow row : rows) {
- PaimonSchema schema = PaimonUtil.rowToSchema(row);
- if (schema.getSchemaId() == key.getSchemaId()) {
- return schema;
- }
- }
- throw new CacheException("failed to initSchema for: %s.%s.%s.%s",
- null, getCatalog().getName(), key.getDbName(),
key.getTblName(), key.getSchemaId());
- }
-
private PaimonSchemaCacheValue
getPaimonSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
PaimonSnapshotCacheValue snapshotCacheValue =
getOrFetchSnapshotCacheValue(snapshot);
return
getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
index 109394fabde..e6023743f3b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -26,17 +26,15 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.system.PartitionsTable;
-import org.apache.paimon.table.system.SnapshotsTable;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
@@ -75,39 +73,27 @@ public class PaimonMetadataCache {
private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key,
List<Column> partitionColumns)
throws IOException, AnalysisException {
if (CollectionUtils.isEmpty(partitionColumns)) {
- return new PaimonPartitionInfo();
+ return PaimonPartitionInfo.EMPTY;
}
- List<PaimonPartition> paimonPartitions = loadPartitions(key);
+ List<Partition> paimonPartitions = ((PaimonExternalCatalog)
key.getCatalog())
+ .getPaimonPartitions(key.getDbName(), key.getTableName());
return PaimonUtil.generatePartitionInfo(partitionColumns,
paimonPartitions);
}
- private List<PaimonPartition> loadPartitions(PaimonSnapshotCacheKey key)
- throws IOException {
- Table table = ((PaimonExternalCatalog)
key.getCatalog()).getPaimonTable(key.getDbName(),
- key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER +
PartitionsTable.PARTITIONS);
- List<InternalRow> rows = PaimonUtil.read(table, null, null);
- List<PaimonPartition> res =
Lists.newArrayListWithCapacity(rows.size());
- for (InternalRow row : rows) {
- res.add(PaimonUtil.rowToPartition(row));
- }
- return res;
- }
-
private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key)
throws IOException {
- Table table = ((PaimonExternalCatalog)
key.getCatalog()).getPaimonTable(key.getDbName(),
- key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER +
SnapshotsTable.SNAPSHOTS);
+ Table table = ((PaimonExternalCatalog)
key.getCatalog()).getPaimonTable(key.getDbName(), key.getTableName());
+ Table snapshotTable = table;
// snapshotId and schemaId
- List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1},
null);
- long latestSnapshotId = 0L;
+ Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
long latestSchemaId = 0L;
- for (InternalRow row : rows) {
- long snapshotId = row.getLong(0);
- if (snapshotId > latestSnapshotId) {
- latestSnapshotId = snapshotId;
- latestSchemaId = row.getLong(1);
- }
+ OptionalLong optionalSnapshotId = table.latestSnapshotId();
+ if (optionalSnapshotId.isPresent()) {
+ latestSnapshotId = optionalSnapshotId.getAsLong();
+ latestSchemaId = table.snapshot(latestSnapshotId).schemaId();
+ snapshotTable =
+
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(),
latestSnapshotId.toString()));
}
- return new PaimonSnapshot(latestSnapshotId, latestSchemaId);
+ return new PaimonSnapshot(latestSnapshotId, latestSchemaId,
snapshotTable);
}
public void invalidateCatalogCache(long catalogId) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
index 88515a2510d..a6339ef5155 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
@@ -20,20 +20,23 @@ package org.apache.doris.datasource.paimon;
import org.apache.doris.catalog.PartitionItem;
import com.google.common.collect.Maps;
+import org.apache.paimon.partition.Partition;
import java.util.Map;
public class PaimonPartitionInfo {
+ public static final PaimonPartitionInfo EMPTY = new PaimonPartitionInfo();
+
private final Map<String, PartitionItem> nameToPartitionItem;
- private final Map<String, PaimonPartition> nameToPartition;
+ private final Map<String, Partition> nameToPartition;
- public PaimonPartitionInfo() {
+ private PaimonPartitionInfo() {
this.nameToPartitionItem = Maps.newHashMap();
this.nameToPartition = Maps.newHashMap();
}
public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
- Map<String, PaimonPartition> nameToPartition) {
+ Map<String, Partition> nameToPartition) {
this.nameToPartitionItem = nameToPartitionItem;
this.nameToPartition = nameToPartition;
}
@@ -42,7 +45,7 @@ public class PaimonPartitionInfo {
return nameToPartitionItem;
}
- public Map<String, PaimonPartition> getNameToPartition() {
+ public Map<String, Partition> getNameToPartition() {
return nameToPartition;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
deleted file mode 100644
index ef26e1ed208..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.paimon;
-
-import org.apache.paimon.types.DataField;
-
-import java.util.List;
-
-public class PaimonSchema {
- private final long schemaId;
- private final List<DataField> fields;
- private final List<String> partitionKeys;
-
- public PaimonSchema(long schemaId, List<DataField> fields, List<String>
partitionKeys) {
- this.schemaId = schemaId;
- this.fields = fields;
- this.partitionKeys = partitionKeys;
- }
-
- public long getSchemaId() {
- return schemaId;
- }
-
- public List<DataField> getFields() {
- return fields;
- }
-
- public List<String> getPartitionKeys() {
- return partitionKeys;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
index 4a536dd72cc..96f32370d99 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
@@ -17,13 +17,18 @@
package org.apache.doris.datasource.paimon;
+import org.apache.paimon.table.Table;
+
public class PaimonSnapshot {
+ public static long INVALID_SNAPSHOT_ID = -1;
private final long snapshotId;
private final long schemaId;
+ private final Table table;
- public PaimonSnapshot(long snapshotId, long schemaId) {
+ public PaimonSnapshot(long snapshotId, long schemaId, Table table) {
this.snapshotId = snapshotId;
this.schemaId = schemaId;
+ this.table = table;
}
public long getSnapshotId() {
@@ -33,4 +38,8 @@ public class PaimonSnapshot {
public long getSchemaId() {
return schemaId;
}
+
+ public Table getTable() {
+ return table;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index bbb1eaf5096..4119f978d24 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -36,9 +36,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
-import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.ArrayType;
@@ -46,7 +46,6 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Projection;
@@ -69,7 +68,9 @@ public class PaimonUtil {
for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
options.put(pair.getKey().key(), pair.getValue());
}
- table = table.copy(options);
+ if (!options.isEmpty()) {
+ table = table.copy(options);
+ }
ReadBuilder readBuilder = table.newReadBuilder();
if (projection != null) {
readBuilder.withProjection(projection);
@@ -89,71 +90,40 @@ public class PaimonUtil {
return rows;
}
+ public static PaimonPartitionInfo generatePartitionInfo(List<Column>
partitionColumns,
+ List<Partition>
paimonPartitions) {
- /*
-
https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
-
+---------------+----------------+--------------------+--------------------+------------------------+
- | partition | record_count | file_size_in_bytes|
file_count| last_update_time|
-
+---------------+----------------+--------------------+--------------------+------------------------+
- | [1] | 1 | 645 | 1
| 2024-06-24 10:25:57.400|
-
+---------------+----------------+--------------------+--------------------+------------------------+
- org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE
- public static final RowType TABLE_TYPE =
- new RowType(
- Arrays.asList(
- new DataField(0, "partition",
SerializationUtils.newStringType(true)),
- new DataField(1, "record_count", new
BigIntType(false)),
- new DataField(2, "file_size_in_bytes", new
BigIntType(false)),
- new DataField(3, "file_count", new
BigIntType(false)),
- new DataField(4, "last_update_time",
DataTypes.TIMESTAMP_MILLIS())));
- */
- public static PaimonPartition rowToPartition(InternalRow row) {
- String partition = row.getString(0).toString();
- long recordCount = row.getLong(1);
- long fileSizeInBytes = row.getLong(2);
- long fileCount = row.getLong(3);
- long lastUpdateTime = row.getTimestamp(4, 3).getMillisecond();
- return new PaimonPartition(partition, recordCount, fileSizeInBytes,
fileCount, lastUpdateTime);
- }
+ if (CollectionUtils.isEmpty(partitionColumns) ||
paimonPartitions.isEmpty()) {
+ return PaimonPartitionInfo.EMPTY;
+ }
- public static PaimonPartitionInfo generatePartitionInfo(List<Column>
partitionColumns,
- List<PaimonPartition> paimonPartitions) {
Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
- Map<String, PaimonPartition> nameToPartition = Maps.newHashMap();
+ Map<String, Partition> nameToPartition = Maps.newHashMap();
PaimonPartitionInfo partitionInfo = new
PaimonPartitionInfo(nameToPartitionItem, nameToPartition);
- if (CollectionUtils.isEmpty(partitionColumns)) {
- return partitionInfo;
- }
- for (PaimonPartition paimonPartition : paimonPartitions) {
- String partitionName = getPartitionName(partitionColumns,
paimonPartition.getPartitionValues());
- nameToPartition.put(partitionName, paimonPartition);
+
+ for (Partition partition : paimonPartitions) {
+ Map<String, String> spec = partition.spec();
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, String> entry : spec.entrySet()) {
+
sb.append(entry.getKey()).append("=").append(entry.getValue()).append("/");
+ }
+ if (sb.length() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ String partitionName = sb.toString();
+ nameToPartition.put(partitionName, partition);
try {
// partition values return by paimon api, may have problem,
// to avoid affecting the query, we catch exceptions here
nameToPartitionItem.put(partitionName,
toListPartitionItem(partitionName, partitionColumns));
} catch (Exception e) {
- LOG.warn("toListPartitionItem failed, partitionColumns: {},
partitionValues: {}", partitionColumns,
- paimonPartition.getPartitionValues(), e);
+ LOG.warn("toListPartitionItem failed, partitionColumns: {},
partitionValues: {}",
+ partitionColumns, partition.spec(), e);
}
}
return partitionInfo;
}
- private static String getPartitionName(List<Column> partitionColumns,
String partitionValueStr) {
- Preconditions.checkNotNull(partitionValueStr);
- String[] partitionValues = partitionValueStr.replace("[",
"").replace("]", "")
- .split(",");
- Preconditions.checkState(partitionColumns.size() ==
partitionValues.length);
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < partitionColumns.size(); ++i) {
- if (i != 0) {
- sb.append("/");
- }
-
sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]);
- }
- return sb.toString();
- }
-
public static ListPartitionItem toListPartitionItem(String partitionName,
List<Column> partitionColumns)
throws AnalysisException {
List<Type> types = partitionColumns.stream()
@@ -251,32 +221,4 @@ public class PaimonUtil {
public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType
type) {
return paimonPrimitiveTypeToDorisType(type);
}
-
- /**
- *
https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table
- * demo:
- * 0
- * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"},
- * {"id":1,"name":"item_id","type":"BIGINT"},
- * {"id":2,"name":"behavior","type":"STRING"},
- * {"id":3,"name":"dt","type":"STRING NOT NULL"},
- * {"id":4,"name":"hh","type":"STRING NOT NULL"}]
- * ["dt"]
- * ["dt","hh","user_id"]
- * {"owner":"hadoop","provider":"paimon"}
- * 2024-12-03 15:38:14.734
- *
- * @param row
- * @return
- */
- public static PaimonSchema rowToSchema(InternalRow row) {
- long schemaId = row.getLong(0);
- String fieldsStr = row.getString(1).toString();
- String partitionKeysStr = row.getString(2).toString();
- List<DataField> fields = JsonSerdeUtil.fromJson(fieldsStr, new
TypeReference<List<DataField>>() {
- });
- List<String> partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr,
new TypeReference<List<String>>() {
- });
- return new PaimonSchema(schemaId, fields, partitionKeys);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 0e9a8042a65..59e7eed5d42 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -44,6 +44,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
@@ -207,6 +208,10 @@ public class PaimonScanNode extends FileQueryScanNode {
SessionVariable.IgnoreSplitType ignoreSplitType =
SessionVariable.IgnoreSplitType
.valueOf(sessionVariable.getIgnoreSplitType());
List<Split> splits = new ArrayList<>();
+ if
(!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key()))
{
+ // an empty table in PaimonSnapshotCacheValue
+ return splits;
+ }
int[] projected = desc.getSlots().stream().mapToInt(
slot -> source.getPaimonTable().rowType()
.getFieldNames()
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java
deleted file mode 100644
index 789af7bf835..00000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.mtmv;
-
-import org.apache.doris.analysis.LiteralExpr;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PartitionKey;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.datasource.paimon.PaimonPartition;
-import org.apache.doris.datasource.paimon.PaimonPartitionInfo;
-import org.apache.doris.datasource.paimon.PaimonUtil;
-
-import com.google.common.collect.Lists;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.Timestamp;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-public class PaimonUtilTest {
-
- @Test
- public void testGeneratePartitionInfo() throws AnalysisException {
- Column k1 = new Column("k1", PrimitiveType.INT);
- Column k2 = new Column("k2", PrimitiveType.VARCHAR);
- List<Column> partitionColumns = Lists.newArrayList(k1, k2);
- PaimonPartition p1 = new PaimonPartition("[1,aa]", 2, 3, 4, 5);
- List<PaimonPartition> paimonPartitions = Lists.newArrayList(p1);
- PaimonPartitionInfo partitionInfo =
PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions);
- String expectPartitionName = "k1=1/k2=aa";
-
Assert.assertTrue(partitionInfo.getNameToPartitionItem().containsKey(expectPartitionName));
- PartitionItem partitionItem =
partitionInfo.getNameToPartitionItem().get(expectPartitionName);
- List<PartitionKey> keys = partitionItem.getItems();
- Assert.assertEquals(1, keys.size());
- PartitionKey partitionKey = keys.get(0);
- List<LiteralExpr> exprs = partitionKey.getKeys();
- Assert.assertEquals(2, exprs.size());
- Assert.assertEquals(1, exprs.get(0).getLongValue());
- Assert.assertEquals("aa", exprs.get(1).getStringValue());
- }
-
- @Test
- public void testRowToPartition() {
- GenericRow row = GenericRow.of(BinaryString.fromString("[1,b]"), 2L,
3L, 4L, Timestamp.fromEpochMillis(5L));
- PaimonPartition paimonPartition = PaimonUtil.rowToPartition(row);
- Assert.assertEquals("[1,b]", paimonPartition.getPartitionValues());
- Assert.assertEquals(2L, paimonPartition.getRecordCount());
- Assert.assertEquals(3L, paimonPartition.getFileSizeInBytes());
- Assert.assertEquals(4L, paimonPartition.getFileCount());
- Assert.assertEquals(5L, paimonPartition.getLastUpdateTime());
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]