This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1-lakehouse
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8d24b929fa515fd6fd14937806f81528c04a81e0
Author: wuwenchi <[email protected]>
AuthorDate: Fri Jan 24 11:10:21 2025 +0800

    [opt](iceberg)support iceberg in batch mode (#46398)
    
    1. use thread pool in planFiles.
    2. support batch mode.
    3. When executing `doAs`, if the exception type is
    `UndeclaredThrowableException`, we should extract the specific cause
    wrapped inside it. Because the message of an UndeclaredThrowableException
    is null, no useful message would otherwise be available when the
    exception is caught externally.
    4. Modify the default configuration:
         fetch_splits_max_wait_time_ms: from 4000 to 1000
         remote_split_source_batch_size: from 10240 to 1000
---
 be/src/common/config.cpp                           |   2 +-
 .../authentication/HadoopAuthenticator.java        |   7 +
 .../authentication/PreExecutionAuthenticator.java  |  48 ++-----
 .../org/apache/doris/common/ThreadPoolManager.java |  69 ++++++++++
 .../apache/doris/datasource/ExternalCatalog.java   |   6 +
 .../apache/doris/datasource/FileQueryScanNode.java |   8 +-
 .../apache/doris/datasource/SplitAssignment.java   |  30 ++++-
 .../apache/doris/datasource/SplitGenerator.java    |   2 +-
 .../datasource/iceberg/IcebergExternalCatalog.java |  15 +++
 .../iceberg/IcebergHMSExternalCatalog.java         |   7 -
 .../datasource/iceberg/IcebergMetadataOps.java     |  28 +++-
 .../datasource/iceberg/source/IcebergScanNode.java | 147 +++++++++++++++------
 .../datasource/iceberg/source/IcebergSplit.java    |   3 +-
 .../java/org/apache/doris/planner/ScanNode.java    |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   2 +-
 .../iceberg/test_iceberg_filter.groovy             |   2 +
 16 files changed, 280 insertions(+), 98 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f047071139e..25726a2f47c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -287,7 +287,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, 
[](const int config) -> b
     return true;
 });
 DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
-DEFINE_Int32(remote_split_source_batch_size, "10240");
+DEFINE_Int32(remote_split_source_batch_size, "1000");
 DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
 // number of olap scanner thread pool queue size
 DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
index c3cab5f410b..88d32a593e1 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.security.authentication;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 
 public interface HadoopAuthenticator {
@@ -31,6 +32,12 @@ public interface HadoopAuthenticator {
             return getUGI().doAs(action);
         } catch (InterruptedException e) {
             throw new IOException(e);
+        } catch (UndeclaredThrowableException e) {
+            if (e.getCause() instanceof RuntimeException) {
+                throw (RuntimeException) e.getCause();
+            } else {
+                throw new RuntimeException(e.getCause());
+            }
         }
     }
 
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
index 6260833b7db..a64dd4cf717 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.common.security.authentication;
 
-import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
 
 /**
@@ -54,14 +53,26 @@ public class PreExecutionAuthenticator {
     public <T> T execute(Callable<T> task) throws Exception {
         if (hadoopAuthenticator != null) {
             // Adapts Callable to PrivilegedExceptionAction for use with 
Hadoop authentication
-            PrivilegedExceptionAction<T> action = new 
CallableToPrivilegedExceptionActionAdapter<>(task);
-            return hadoopAuthenticator.doAs(action);
+            return hadoopAuthenticator.doAs(task::call);
         } else {
             // Executes the task directly if no authentication is needed
             return task.call();
         }
     }
 
+    public void execute(Runnable task) throws Exception {
+        if (hadoopAuthenticator != null) {
+            // Adapts Runnable to PrivilegedExceptionAction for use with 
Hadoop authentication
+            hadoopAuthenticator.doAs(() -> {
+                task.run();
+                return null;
+            });
+        } else {
+            // Executes the task directly if no authentication is needed
+            task.run();
+        }
+    }
+
     /**
      * Retrieves the current HadoopAuthenticator.
      * <p>This allows checking if a HadoopAuthenticator is configured or
@@ -82,35 +93,4 @@ public class PreExecutionAuthenticator {
     public void setHadoopAuthenticator(HadoopAuthenticator 
hadoopAuthenticator) {
         this.hadoopAuthenticator = hadoopAuthenticator;
     }
-
-    /**
-     * Adapter class to convert a Callable into a PrivilegedExceptionAction.
-     * <p>This is necessary to run the task within a privileged context,
-     * particularly for Hadoop operations with Kerberos.
-     *
-     * @param <T> The type of result returned by the action
-     */
-    public class CallableToPrivilegedExceptionActionAdapter<T> implements 
PrivilegedExceptionAction<T> {
-        private final Callable<T> callable;
-
-        /**
-         * Constructs an adapter that wraps a Callable into a 
PrivilegedExceptionAction.
-         *
-         * @param callable The Callable to be adapted
-         */
-        public CallableToPrivilegedExceptionActionAdapter(Callable<T> 
callable) {
-            this.callable = callable;
-        }
-
-        /**
-         * Executes the wrapped Callable as a PrivilegedExceptionAction.
-         *
-         * @return The result of the callable's call method
-         * @throws Exception If an exception occurs during callable execution
-         */
-        @Override
-        public T run() throws Exception {
-            return callable.call();
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 3be5af8ac54..3bebd4145a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -18,6 +18,7 @@
 package org.apache.doris.common;
 
 
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.metric.MetricLabel;
@@ -33,6 +34,7 @@ import java.util.Comparator;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -69,6 +71,7 @@ import java.util.function.Supplier;
 
 public class ThreadPoolManager {
 
+    private static final Logger LOG = 
LogManager.getLogger(ThreadPoolManager.class);
     private static Map<String, ThreadPoolExecutor> nameToThreadPoolMap = 
Maps.newConcurrentMap();
 
     private static String[] poolMetricTypes = {"pool_size", 
"active_thread_num", "task_in_queue"};
@@ -133,6 +136,17 @@ public class ThreadPoolManager {
                 poolName, needRegisterMetric);
     }
 
+    public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth(
+            int numThread,
+            int queueSize,
+            String poolName,
+            boolean needRegisterMetric,
+            PreExecutionAuthenticator preAuth) {
+        return newDaemonThreadPoolWithPreAuth(numThread, numThread, 
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName, 
60),
+            poolName, needRegisterMetric, preAuth);
+    }
+
     public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, 
int queueSize,
                                                               String poolName, 
int timeoutSeconds,
                                                               boolean 
needRegisterMetric) {
@@ -222,6 +236,40 @@ public class ThreadPoolManager {
         return new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(poolName + "-%d").build();
     }
 
+
+    public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth(
+            int corePoolSize,
+            int maximumPoolSize,
+            long keepAliveTime,
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue,
+            RejectedExecutionHandler handler,
+            String poolName,
+            boolean needRegisterMetric,
+            PreExecutionAuthenticator preAuth) {
+        ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName, 
preAuth);
+        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, 
maximumPoolSize,
+                keepAliveTime, unit, workQueue, threadFactory, handler);
+        if (needRegisterMetric) {
+            nameToThreadPoolMap.put(poolName, threadPool);
+        }
+        return threadPool;
+    }
+
+    private static ThreadFactory namedThreadFactoryWithPreAuth(String 
poolName, PreExecutionAuthenticator preAuth) {
+        return new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(poolName + "-%d")
+            .setThreadFactory(runnable -> new Thread(() -> {
+                try {
+                    preAuth.execute(runnable);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }))
+            .build();
+    }
+
     private static class PriorityThreadPoolExecutor<T> extends 
ThreadPoolExecutor {
 
         private final Comparator<T> comparator;
@@ -377,4 +425,25 @@ public class ThreadPoolManager {
             }
         }
     }
+
+    public static void shutdownExecutorService(ExecutorService 
executorService) {
+        // Disable new tasks from being submitted
+        executorService.shutdown();
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                // Cancel currently executing tasks
+                executorService.shutdownNow();
+                // Wait a while for tasks to respond to being cancelled
+                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                    LOG.warn("ExecutorService did not terminate");
+                }
+            }
+        } catch (InterruptedException e) {
+            // (Re-)Cancel if current thread also interrupted
+            executorService.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 778c110df0b..b8ccdee3a87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.io.Text;
@@ -90,6 +91,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
 /**
@@ -151,6 +153,7 @@ public abstract class ExternalCatalog
     protected Optional<Boolean> useMetaCache = Optional.empty();
     protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
     protected PreExecutionAuthenticator preExecutionAuthenticator;
+    protected ThreadPoolExecutor threadPoolWithPreAuth;
 
     private volatile Configuration cachedConf = null;
     private byte[] confLock = new byte[0];
@@ -715,6 +718,9 @@ public abstract class ExternalCatalog
     @Override
     public void onClose() {
         removeAccessController();
+        if (threadPoolWithPreAuth != null) {
+            ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth);
+        }
         CatalogIf.super.onClose();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 457ee88fec5..1b252d0ae3d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -332,17 +332,19 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             if (splitAssignment.getSampleSplit() == null && 
!isFileStreamType()) {
                 return;
             }
-            selectedSplitNum = numApproximateSplits();
-
             FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
             TFileType locationType = fileSplit.getLocationType();
+            selectedSplitNum = numApproximateSplits();
+            if (selectedSplitNum < 0) {
+                throw new UserException("Approximate split number should not 
be negative");
+            }
             totalFileSize = fileSplit.getLength() * selectedSplitNum;
             long maxWaitTime = sessionVariable.getFetchSplitsMaxWaitTime();
             // Not accurate, only used to estimate concurrency.
             // Here, we must take the max of 1, because
             // in the case of multiple BEs, `numApproximateSplits() / 
backendPolicy.numBackends()` may be 0,
             // and finally numSplitsPerBE is 0, resulting in no data being 
queried.
-            int numSplitsPerBE = Math.max(numApproximateSplits() / 
backendPolicy.numBackends(), 1);
+            int numSplitsPerBE = Math.max(selectedSplitNum / 
backendPolicy.numBackends(), 1);
             for (Backend backend : backendPolicy.getBackends()) {
                 SplitSource splitSource = new SplitSource(backend, 
splitAssignment, maxWaitTime);
                 splitSources.add(splitSource);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index a26abc7fc5e..470f4092d21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -23,7 +23,10 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.collect.Multimap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -40,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * `SplitGenerator` provides the file splits, and `FederationBackendPolicy` 
assigns these splits to backends.
  */
 public class SplitAssignment {
+    private static final Logger LOG = 
LogManager.getLogger(SplitAssignment.class);
     private final Set<Long> sources = new HashSet<>();
     private final FederationBackendPolicy backendPolicy;
     private final SplitGenerator splitGenerator;
@@ -50,10 +54,11 @@ public class SplitAssignment {
     private final List<String> pathPartitionKeys;
     private final Object assignLock = new Object();
     private Split sampleSplit = null;
-    private final AtomicBoolean isStop = new AtomicBoolean(false);
+    private final AtomicBoolean isStopped = new AtomicBoolean(false);
     private final AtomicBoolean scheduleFinished = new AtomicBoolean(false);
 
     private UserException exception = null;
+    private final List<Closeable> closeableResources = new ArrayList<>();
 
     public SplitAssignment(
             FederationBackendPolicy backendPolicy,
@@ -85,7 +90,7 @@ public class SplitAssignment {
     }
 
     private boolean waitFirstSplit() {
-        return !scheduleFinished.get() && !isStop.get() && exception == null;
+        return !scheduleFinished.get() && !isStopped.get() && exception == 
null;
     }
 
     private void appendBatch(Multimap<Backend, Split> batch) throws 
UserException {
@@ -150,7 +155,7 @@ public class SplitAssignment {
         }
         BlockingQueue<Collection<TScanRangeLocations>> splits = 
assignment.computeIfAbsent(backend,
                 be -> new LinkedBlockingQueue<>());
-        if (scheduleFinished.get() && splits.isEmpty() || isStop.get()) {
+        if (scheduleFinished.get() && splits.isEmpty() || isStopped.get()) {
             return null;
         }
         return splits;
@@ -167,11 +172,26 @@ public class SplitAssignment {
     }
 
     public void stop() {
-        isStop.set(true);
+        if (isStop()) {
+            return;
+        }
+        isStopped.set(true);
+        closeableResources.forEach((closeable) -> {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                LOG.warn("close resource error:{}", e.getMessage(), e);
+                // ignore
+            }
+        });
         notifyAssignment();
     }
 
     public boolean isStop() {
-        return isStop.get();
+        return isStopped.get();
+    }
+
+    public void addCloseable(Closeable resource) {
+        closeableResources.add(resource);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index 34ff3911445..391552a5106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -52,7 +52,7 @@ public interface SplitGenerator {
         return -1;
     }
 
-    default void startSplit(int numBackends) {
+    default void startSplit(int numBackends) throws UserException {
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 0fa69825a01..6a3265388f3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -43,6 +46,7 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     protected String icebergCatalogType;
     protected Catalog catalog;
+    private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;
 
     public IcebergExternalCatalog(long catalogId, String name, String comment) 
{
         super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
@@ -54,9 +58,20 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     @Override
     protected void initLocalObjectsImpl() {
         preExecutionAuthenticator = new PreExecutionAuthenticator();
+        // TODO If the storage environment does not support Kerberos (such as 
s3),
+        //      there is no need to generate a simple authentication 
information anymore.
+        AuthenticationConfig config = 
AuthenticationConfig.getKerberosConfig(getConfiguration());
+        HadoopAuthenticator authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(config);
+        preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
         initCatalog();
         IcebergMetadataOps ops = 
ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
         transactionManager = 
TransactionManagerFactory.createIcebergTransactionManager(ops);
+        threadPoolWithPreAuth = 
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+            ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+            Integer.MAX_VALUE,
+            String.format("iceberg_catalog_%s_executor_pool", name),
+            true,
+            preExecutionAuthenticator);
         metadataOps = ops;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index c5a99c157ce..51d39357b81 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.property.PropertyConverter;
 
@@ -37,11 +35,6 @@ public class IcebergHMSExternalCatalog extends 
IcebergExternalCatalog {
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HMS;
         catalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
-        if (preExecutionAuthenticator.getHadoopAuthenticator() == null) {
-            AuthenticationConfig config = 
AuthenticationConfig.getKerberosConfig(getConfiguration());
-            HadoopAuthenticator authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(config);
-            preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
-        }
     }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index b5322e292ee..58a68f59e27 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -90,11 +90,19 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
 
     @Override
     public boolean tableExist(String dbName, String tblName) {
-        return catalog.tableExists(getTableIdentifier(dbName, tblName));
+        try {
+            return preExecutionAuthenticator.execute(() -> 
catalog.tableExists(getTableIdentifier(dbName, tblName)));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check table exist, error 
message is:" + e.getMessage(), e);
+        }
     }
 
     public boolean databaseExist(String dbName) {
-        return nsCatalog.namespaceExists(getNamespace(dbName));
+        try {
+            return preExecutionAuthenticator.execute(() -> 
nsCatalog.namespaceExists(getNamespace(dbName)));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check database exist, error 
message is:" + e.getMessage(), e);
+        }
     }
 
     public List<String> listDatabaseNames() {
@@ -110,8 +118,14 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
 
     @Override
     public List<String> listTableNames(String dbName) {
-        List<TableIdentifier> tableIdentifiers = 
catalog.listTables(getNamespace(dbName));
-        return 
tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
+        try {
+            return preExecutionAuthenticator.execute(() -> {
+                List<TableIdentifier> tableIdentifiers = 
catalog.listTables(getNamespace(dbName));
+                return 
tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list table names, error 
message is:" + e.getMessage(), e);
+        }
     }
 
     @Override
@@ -270,7 +284,11 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
 
     @Override
     public Table loadTable(String dbName, String tblName) {
-        return catalog.loadTable(getTableIdentifier(dbName, tblName));
+        try {
+            return preExecutionAuthenticator.execute(() -> 
catalog.loadTable(getTableIdentifier(dbName, tblName)));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load table, error message 
is:" + e.getMessage(), e);
+        }
     }
 
     private TableIdentifier getTableIdentifier(String dbName, String tblName) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 29c07be8192..756c9024cdc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalTable;
@@ -52,7 +53,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
@@ -72,11 +72,12 @@ import org.apache.logging.log4j.Logger;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class IcebergScanNode extends FileQueryScanNode {
 
@@ -94,6 +95,11 @@ public class IcebergScanNode extends FileQueryScanNode {
     // And for split level count push down opt, the flag is set in each split.
     private boolean tableLevelPushDownCount = false;
     private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
+    private long targetSplitSize;
+    private ConcurrentHashMap.KeySetView<Object, Boolean> partitionPathSet;
+    private boolean isPartitionedTable;
+    private int formatVersion;
+    private PreExecutionAuthenticator preExecutionAuthenticator;
 
     /**
      * External file scan node for Query iceberg table
@@ -128,6 +134,11 @@ public class IcebergScanNode extends FileQueryScanNode {
     @Override
     protected void doInitialize() throws UserException {
         icebergTable = source.getIcebergTable();
+        targetSplitSize = getRealFileSplitSize(0);
+        partitionPathSet = ConcurrentHashMap.newKeySet();
+        isPartitionedTable = icebergTable.spec().isPartitioned();
+        formatVersion = ((BaseTable) 
icebergTable).operations().current().formatVersion();
+        preExecutionAuthenticator = 
source.getCatalog().getPreExecutionAuthenticator();
         super.doInitialize();
     }
 
@@ -142,7 +153,6 @@ public class IcebergScanNode extends FileQueryScanNode {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
-        int formatVersion = icebergSplit.getFormatVersion();
         fileDesc.setFormatVersion(formatVersion);
         fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
         if (tableLevelPushDownCount) {
@@ -184,14 +194,50 @@ public class IcebergScanNode extends FileQueryScanNode {
     @Override
     public List<Split> getSplits(int numBackends) throws UserException {
         try {
-            return 
source.getCatalog().getPreExecutionAuthenticator().execute(() -> 
doGetSplits(numBackends));
+            return preExecutionAuthenticator.execute(() -> 
doGetSplits(numBackends));
         } catch (Exception e) {
             throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), 
e);
         }
+    }
 
+    @Override
+    public void startSplit(int numBackends) throws UserException {
+        try {
+            preExecutionAuthenticator.execute(() -> {
+                doStartSplit();
+                return null;
+            });
+        } catch (Exception e) {
+            throw new UserException(e.getMessage(), e);
+        }
     }
 
-    private List<Split> doGetSplits(int numBackends) throws UserException {
+    public void doStartSplit() throws UserException {
+        TableScan scan = createTableScan();
+        CompletableFuture.runAsync(() -> {
+            try {
+                preExecutionAuthenticator.execute(
+                        () -> {
+                            CloseableIterable<FileScanTask> fileScanTasks = 
planFileScanTask(scan);
+
+                            // 1. this task should stop when all splits are 
assigned
+                            // 2. if we want to stop this plan, we can close 
the fileScanTasks to stop
+                            splitAssignment.addCloseable(fileScanTasks);
+
+                            fileScanTasks.forEach(fileScanTask ->
+                                    
splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(fileScanTask))));
+
+                            return null;
+                        }
+                );
+                splitAssignment.finishSchedule();
+            } catch (Exception e) {
+                splitAssignment.setException(new UserException(e.getMessage(), 
e));
+            }
+        });
+    }
+
+    private TableScan createTableScan() throws UserException {
         TableScan scan = icebergTable.newScan();
 
         // set snapshot
@@ -213,16 +259,16 @@ public class IcebergScanNode extends FileQueryScanNode {
             this.pushdownIcebergPredicates.add(predicate.toString());
         }
 
-        // get splits
-        List<Split> splits = new ArrayList<>();
-        int formatVersion = ((BaseTable) 
icebergTable).operations().current().formatVersion();
-        HashSet<String> partitionPathSet = new HashSet<>();
-        boolean isPartitionedTable = icebergTable.spec().isPartitioned();
+        scan = scan.planWith(source.getCatalog().getThreadPoolWithPreAuth());
+
+        return scan;
+    }
 
-        long realFileSplitSize = getRealFileSplitSize(0);
-        CloseableIterable<FileScanTask> fileScanTasks = null;
+    private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+        long targetSplitSize = getRealFileSplitSize(0);
+        CloseableIterable<FileScanTask> splitFiles;
         try {
-            fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), 
realFileSplitSize);
+            splitFiles = TableScanUtil.splitFiles(scan.planFiles(), 
targetSplitSize);
         } catch (NullPointerException e) {
             /*
         Caused by: java.lang.NullPointerException: Type cannot be null
@@ -253,33 +299,45 @@ public class IcebergScanNode extends FileQueryScanNode {
             LOG.warn("Iceberg TableScanUtil.splitFiles throw 
NullPointerException. Cause : ", e);
             throw new NotSupportedException("Unable to read Iceberg table with 
dropped old partition column.");
         }
-        try (CloseableIterable<CombinedScanTask> combinedScanTasks =
-                     TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 
1, 0)) {
-            combinedScanTasks.forEach(taskGrp -> 
taskGrp.files().forEach(splitTask -> {
-                if (isPartitionedTable) {
-                    StructLike structLike = splitTask.file().partition();
-                    // Counts the number of partitions read
-                    partitionPathSet.add(structLike.toString());
-                }
-                String originalPath = splitTask.file().path().toString();
-                LocationPath locationPath = new LocationPath(originalPath, 
source.getCatalog().getProperties());
-                IcebergSplit split = new IcebergSplit(
-                        locationPath,
-                        splitTask.start(),
-                        splitTask.length(),
-                        splitTask.file().fileSizeInBytes(),
-                        new String[0],
-                        formatVersion,
-                        source.getCatalog().getProperties(),
-                        new ArrayList<>(),
-                        originalPath);
-                split.setTargetSplitSize(realFileSplitSize);
-                if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
-                    
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
-                }
-                split.setTableFormatType(TableFormatType.ICEBERG);
+        return splitFiles;
+    }
+
+    private Split createIcebergSplit(FileScanTask fileScanTask) {
+        if (isPartitionedTable) {
+            StructLike structLike = fileScanTask.file().partition();
+            // Counts the number of partitions read
+            partitionPathSet.add(structLike.toString());
+        }
+        String originalPath = fileScanTask.file().path().toString();
+        LocationPath locationPath = new LocationPath(originalPath, 
source.getCatalog().getProperties());
+        IcebergSplit split = new IcebergSplit(
+                locationPath,
+                fileScanTask.start(),
+                fileScanTask.length(),
+                fileScanTask.file().fileSizeInBytes(),
+                new String[0],
+                formatVersion,
+                source.getCatalog().getProperties(),
+                new ArrayList<>(),
+                originalPath);
+        if (!fileScanTask.deletes().isEmpty()) {
+            split.setDeleteFileFilters(getDeleteFileFilters(fileScanTask));
+        }
+        split.setTableFormatType(TableFormatType.ICEBERG);
+        split.setTargetSplitSize(targetSplitSize);
+        return split;
+    }
+
+    private List<Split> doGetSplits(int numBackends) throws UserException {
+
+        TableScan scan = createTableScan();
+        List<Split> splits = new ArrayList<>();
+
+        try (CloseableIterable<FileScanTask> fileScanTasks = 
planFileScanTask(scan)) {
+            fileScanTasks.forEach(taskGrp ->  {
+                Split split = createIcebergSplit(taskGrp);
                 splits.add(split);
-            }));
+            });
         } catch (IOException e) {
             throw new UserException(e.getMessage(), e.getCause());
         }
@@ -308,6 +366,12 @@ public class IcebergScanNode extends FileQueryScanNode {
         return splits;
     }
 
+    @Override
+    public boolean isBatchMode() {
+        // TODO Use a better judgment method to decide whether to use batch 
mode.
+        return sessionVariable.getNumPartitionsInBatchMode() > 1024;
+    }
+
     public Long getSpecifiedSnapshot() throws UserException {
         TableSnapshot tableSnapshot = getQueryTableSnapshot();
         if (tableSnapshot != null) {
@@ -460,4 +524,9 @@ public class IcebergScanNode extends FileQueryScanNode {
         }
         ((IcebergSplit) splits.get(size - 
1)).setTableLevelRowCount(countPerSplit + totalCount % size);
     }
+
+    @Override
+    public int numApproximateSplits() {
+        return NUM_SPLITS_PER_PARTITION * partitionPathSet.size() > 0 ? 
partitionPathSet.size() : 1;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 0520612935a..e31ec5c3fad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -22,6 +22,7 @@ import org.apache.doris.datasource.FileSplit;
 
 import lombok.Data;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -35,7 +36,7 @@ public class IcebergSplit extends FileSplit {
     // but the original datafile path must be used.
     private final String originalPath;
     private Integer formatVersion;
-    private List<IcebergDeleteFileFilter> deleteFileFilters;
+    private List<IcebergDeleteFileFilter> deleteFileFilters = new 
ArrayList<>();
     private Map<String, String> config;
     // tableLevelRowCount will be set only table-level count push down opt is 
available.
     private long tableLevelRowCount = -1;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 23c2c91be6e..0e2a5ee4d03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -96,7 +96,7 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
     protected SplitAssignment splitAssignment = null;
 
     protected long selectedPartitionNum = 0;
-    protected long selectedSplitNum = 0;
+    protected int selectedSplitNum = 0;
 
     // create a mapping between output slot's id and project expr
     Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index f68fd1423c3..ea4921bef85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1660,7 +1660,7 @@ public class SessionVariable implements Serializable, 
Writable {
             description = {"batch方式中BE获取splits的最大等待时间",
                     "The max wait time of getting splits in batch mode."},
             needForward = true)
-    public long fetchSplitsMaxWaitTime = 4000;
+    public long fetchSplitsMaxWaitTime = 1000;
 
     @VariableMgr.VarAttr(
             name = ENABLE_PARQUET_LAZY_MAT,
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
index 7e654175f9c..e74d3b07f93 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
@@ -19,6 +19,7 @@ suite("test_iceberg_filter", 
"p0,external,doris,external_docker,external_docker_
     String enabled = context.config.otherConfigs.get("enableIcebergTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         try {
+            sql """set num_partitions_in_batch_mode=0"""
             String rest_port = 
context.config.otherConfigs.get("iceberg_rest_uri_port")
             String minio_port = 
context.config.otherConfigs.get("iceberg_minio_port")
             String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
@@ -91,6 +92,7 @@ suite("test_iceberg_filter", 
"p0,external,doris,external_docker,external_docker_
             }
 
         } finally {
+            sql """set num_partitions_in_batch_mode=1024"""
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to