This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1-lakehouse
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 320e3b27baba947370276d0a0a03f744ff929604
Author: wuwenchi <[email protected]>
AuthorDate: Sat Feb 1 22:34:18 2025 +0800

    [opt](paimon)Upgrade the Paimon version to 1.0.0 and Iceberg to 1.6.1 
(#46990)
    
    Problem Summary:
    
    Upgrade the Paimon version to 1.0.0
    
    By default, paimon uses a caching catalog to cache some data to improve
    read performance.
    FYI:
    
https://paimon.apache.org/docs/1.0/maintenance/configurations/#catalogoptions
    
    If you do not want to use this caching catalog, you can set the
    configuration `paimon.cache-enabled` to `false` to turn it off:
    ```
    CREATE CATALOG `c1` PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "xxx",
    "paimon.cache-enabled" = "false",
    "warehouse" = "xxx"
    );
    ```
    If you want to modify cache-related parameters, you can add the
    `paimon.` prefix to the parameters supported by paimon, such as:
    ```
    CREATE CATALOG `c1` PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "xxx",
    "warehouse" = "xxx",
    "paimon.cache.expiration-interval" = "20 min",
    "paimon.cache.manifest.small-file-memory"="10 mb"
    );
    ```
    
    Note:
    During the doris upgrade process, this error may occur:
    
    
![image](https://github.com/user-attachments/assets/47ec8216-9e3f-4d8e-95ef-17cce6b7c486)
    
    This is because Doris upgrades the BE first and then the FE. During
    this process, the version of Paimon on the BE may be higher than the
    one on the FE. This is normal: bucket-key validation was newly added
    in the higher version of Paimon and does not exist in the lower
    version. Once the FE upgrade completes, the error no longer occurs.
---
 .../datasource/iceberg/source/IcebergScanNode.java | 84 ++++++++++++----------
 .../datasource/paimon/PaimonExternalCatalog.java   | 11 ++-
 .../datasource/paimon/PaimonExternalTable.java     |  2 +-
 .../datasource/paimon/PaimonMetadataCache.java     |  2 +-
 .../apache/doris/datasource/paimon/PaimonUtil.java |  2 +-
 .../datasource/paimon/source/PaimonScanNode.java   |  7 +-
 fe/pom.xml                                         |  7 +-
 7 files changed, 70 insertions(+), 45 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 756c9024cdc..87392d0ff38 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -196,7 +196,12 @@ public class IcebergScanNode extends FileQueryScanNode {
         try {
             return preExecutionAuthenticator.execute(() -> 
doGetSplits(numBackends));
         } catch (Exception e) {
-            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), 
e);
+            Optional<NotSupportedException> opt = 
checkNotSupportedException(e);
+            if (opt.isPresent()) {
+                throw opt.get();
+            } else {
+                throw new 
RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
+            }
         }
     }
 
@@ -232,7 +237,12 @@ public class IcebergScanNode extends FileQueryScanNode {
                 );
                 splitAssignment.finishSchedule();
             } catch (Exception e) {
-                splitAssignment.setException(new UserException(e.getMessage(), 
e));
+                Optional<NotSupportedException> opt = 
checkNotSupportedException(e);
+                if (opt.isPresent()) {
+                    splitAssignment.setException(new 
UserException(opt.get().getMessage(), opt.get()));
+                } else {
+                    splitAssignment.setException(new 
UserException(e.getMessage(), e));
+                }
             }
         });
     }
@@ -266,40 +276,7 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
         long targetSplitSize = getRealFileSplitSize(0);
-        CloseableIterable<FileScanTask> splitFiles;
-        try {
-            splitFiles = TableScanUtil.splitFiles(scan.planFiles(), 
targetSplitSize);
-        } catch (NullPointerException e) {
-            /*
-        Caused by: java.lang.NullPointerException: Type cannot be null
-            at 
org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull
-                (Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?]
-            at 
org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447) 
~[iceberg-api-1.4.3.jar:?]
-            at 
org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) 
~[iceberg-api-1.4.3.jar:?]
-            at 
org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) 
~[iceberg-api-1.4.3.jar:?]
-            at 
org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) 
~[iceberg-core-1.4.3.jar:?]
-            at 
org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach
-                (RegularImmutableMap.java:297) 
~[iceberg-bundled-guava-1.4.3.jar:?]
-            at 
org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:97) 
~[iceberg-core-1.4.3.jar:?]
-            at 
org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:71) 
~[iceberg-core-1.4.3.jar:?]
-            at 
org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) 
~[iceberg-core-1.4.3.jar:?]
-            at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) 
~[iceberg-core-1.4.3.jar:?]
-            at 
org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) 
~[iceberg-core-1.4.3.jar:?]
-            at 
org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) 
~[iceberg-core-1.4.3.jar:?]
-            at 
org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) 
~[iceberg-core-1.4.3.jar:?]
-            at 
org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits
-                (IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT]
-        EXAMPLE:
-             CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG 
PARTITIONED BY (bucket(10,col2));
-             INSERT INTO iceberg_tb VALUES( ... );
-             ALTER  TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2);
-             ALTER TABLE iceberg_tb DROP COLUMNS col2 STRING;
-        Link: https://github.com/apache/iceberg/pull/10755
-        */
-            LOG.warn("Iceberg TableScanUtil.splitFiles throw 
NullPointerException. Cause : ", e);
-            throw new NotSupportedException("Unable to read Iceberg table with 
dropped old partition column.");
-        }
-        return splitFiles;
+        return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
     }
 
     private Split createIcebergSplit(FileScanTask fileScanTask) {
@@ -529,4 +506,39 @@ public class IcebergScanNode extends FileQueryScanNode {
     public int numApproximateSplits() {
         return NUM_SPLITS_PER_PARTITION * partitionPathSet.size() > 0 ? 
partitionPathSet.size() : 1;
     }
+
+    private Optional<NotSupportedException> 
checkNotSupportedException(Exception e) {
+        if (e instanceof NullPointerException) {
+            /*
+        Caused by: java.lang.NullPointerException: Type cannot be null
+            at 
org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull
+                (Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?]
+            at 
org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447) 
~[iceberg-api-1.4.3.jar:?]
+            at 
org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) 
~[iceberg-api-1.4.3.jar:?]
+            at 
org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) 
~[iceberg-api-1.4.3.jar:?]
+            at 
org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) 
~[iceberg-core-1.4.3.jar:?]
+            at 
org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach
+                (RegularImmutableMap.java:297) 
~[iceberg-bundled-guava-1.4.3.jar:?]
+            at 
org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:97) 
~[iceberg-core-1.4.3.jar:?]
+            at 
org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:71) 
~[iceberg-core-1.4.3.jar:?]
+            at 
org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) 
~[iceberg-core-1.4.3.jar:?]
+            at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) 
~[iceberg-core-1.4.3.jar:?]
+            at 
org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) 
~[iceberg-core-1.4.3.jar:?]
+            at 
org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) 
~[iceberg-core-1.4.3.jar:?]
+            at 
org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) 
~[iceberg-core-1.4.3.jar:?]
+            at 
org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits
+                (IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT]
+        EXAMPLE:
+             CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG 
PARTITIONED BY (bucket(10,col2));
+             INSERT INTO iceberg_tb VALUES( ... );
+             ALTER  TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2);
+             ALTER TABLE iceberg_tb DROP COLUMNS col2 STRING;
+        Link: https://github.com/apache/iceberg/pull/10755
+        */
+            LOG.warn("Iceberg TableScanUtil.splitFiles throw 
NullPointerException. Cause : ", e);
+            return Optional.of(
+                new NotSupportedException("Unable to read Iceberg table with 
dropped old partition column."));
+        }
+        return Optional.empty();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index eb25336ab0b..7d857524f2b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
@@ -94,7 +95,15 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
     public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
         makeSureInitialized();
         try {
-            return hadoopAuthenticator.doAs(() -> 
catalog.tableExists(Identifier.create(dbName, tblName)));
+            return hadoopAuthenticator.doAs(() -> {
+                try {
+                    catalog.getTable(Identifier.create(dbName, tblName));
+                    return true;
+                } catch (TableNotExistException e) {
+                    return false;
+                }
+            });
+
         } catch (IOException e) {
             throw new RuntimeException("Failed to check table existence, 
catalog name: " + getName(), e);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index efd8e6bb7a7..27cca2aecbd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -208,7 +208,7 @@ public class PaimonExternalTable extends ExternalTable 
implements MvccTable {
         PredicateBuilder builder = new PredicateBuilder(table.rowType());
         Predicate predicate = builder.equal(0, key.getSchemaId());
         // Adding predicates will also return excess data
-        List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}, 
{2}}, predicate);
+        List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1, 2}, 
predicate);
         for (InternalRow row : rows) {
             PaimonSchema schema = PaimonUtil.rowToSchema(row);
             if (schema.getSchemaId() == key.getSchemaId()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
index 5b711e07066..109394fabde 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -97,7 +97,7 @@ public class PaimonMetadataCache {
         Table table = ((PaimonExternalCatalog) 
key.getCatalog()).getPaimonTable(key.getDbName(),
                 key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + 
SnapshotsTable.SNAPSHOTS);
         // snapshotId and schemaId
-        List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, 
{1}}, null);
+        List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1}, 
null);
         long latestSnapshotId = 0L;
         long latestSchemaId = 0L;
         for (InternalRow row : rows) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index b3df41bc5ce..bbb1eaf5096 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -62,7 +62,7 @@ public class PaimonUtil {
     private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);
 
     public static List<InternalRow> read(
-            Table table, @Nullable int[][] projection, @Nullable Predicate 
predicate,
+            Table table, @Nullable int[] projection, @Nullable Predicate 
predicate,
             Pair<ConfigOption<?>, String>... dynamicOptions)
             throws IOException {
         Map<String, String> options = new HashMap<>();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 28efbc58f51..0e9a8042a65 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -208,7 +208,12 @@ public class PaimonScanNode extends FileQueryScanNode {
                 .valueOf(sessionVariable.getIgnoreSplitType());
         List<Split> splits = new ArrayList<>();
         int[] projected = desc.getSlots().stream().mapToInt(
-                slot -> 
(source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
+                slot -> source.getPaimonTable().rowType()
+                        .getFieldNames()
+                        .stream()
+                        .map(String::toLowerCase)
+                        .collect(Collectors.toList())
+                        .indexOf(slot.getColumn().getName()))
                 .toArray();
         ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
         List<org.apache.paimon.table.source.Split> paimonSplits = 
readBuilder.withFilter(predicates)
diff --git a/fe/pom.xml b/fe/pom.xml
index f7ea65607d1..e5de5673536 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -222,8 +222,7 @@ under the License.
         <module>be-java-extensions</module>
     </modules>
     <properties>
-        <!--suppress UnresolvedMavenProperty -->
-        
<doris.hive.catalog.shade.version>2.1.1</doris.hive.catalog.shade.version>
+        
<doris.hive.catalog.shade.version>2.1.3</doris.hive.catalog.shade.version>
         <avro.version>1.11.4</avro.version>
         <parquet.version>1.13.1</parquet.version>
         <spark.version>3.4.3</spark.version>
@@ -318,7 +317,7 @@ under the License.
         <!-- ATTN: avro version must be consistent with Iceberg version -->
         <!-- Please modify iceberg.version and avro.version together,
          you can find avro version info in iceberg mvn repository -->
-        <iceberg.version>1.4.3</iceberg.version>
+        <iceberg.version>1.6.1</iceberg.version>
         <maxcompute.version>0.49.0-public</maxcompute.version>
         <arrow.version>17.0.0</arrow.version>
 
@@ -363,7 +362,7 @@ under the License.
         <quartz.version>2.3.2</quartz.version>
         <aircompressor.version>0.27</aircompressor.version>
         <!-- paimon -->
-        <paimon.version>0.8.1</paimon.version>
+        <paimon.version>1.0.0</paimon.version>
         <disruptor.version>3.4.4</disruptor.version>
         <!-- arrow flight sql -->
         
<arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to