This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1-lakehouse in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8d24b929fa515fd6fd14937806f81528c04a81e0 Author: wuwenchi <[email protected]> AuthorDate: Fri Jan 24 11:10:21 2025 +0800 [opt](iceberg)support iceberg in batch mode (#46398) 1. use thread pool in planFiles. 2. support batch mode. 3. When executing `doAs`, if the exception is an `UndeclaredThrowableException`, unwrap it and rethrow its cause: the message of `UndeclaredThrowableException` itself is null, so a caller that catches it would otherwise get no error message. 4. Modify the default configuration: fetch_splits_max_wait_time_ms: from 4000 to 1000 remote_split_source_batch_size: from 10240 to 1000 --- be/src/common/config.cpp | 2 +- .../authentication/HadoopAuthenticator.java | 7 + .../authentication/PreExecutionAuthenticator.java | 48 ++----- .../org/apache/doris/common/ThreadPoolManager.java | 69 ++++++++++ .../apache/doris/datasource/ExternalCatalog.java | 6 + .../apache/doris/datasource/FileQueryScanNode.java | 8 +- .../apache/doris/datasource/SplitAssignment.java | 30 ++++- .../apache/doris/datasource/SplitGenerator.java | 2 +- .../datasource/iceberg/IcebergExternalCatalog.java | 15 +++ .../iceberg/IcebergHMSExternalCatalog.java | 7 - .../datasource/iceberg/IcebergMetadataOps.java | 28 +++- .../datasource/iceberg/source/IcebergScanNode.java | 147 +++++++++++++++------ .../datasource/iceberg/source/IcebergSplit.java | 3 +- .../java/org/apache/doris/planner/ScanNode.java | 2 +- .../java/org/apache/doris/qe/SessionVariable.java | 2 +- .../iceberg/test_iceberg_filter.groovy | 2 + 16 files changed, 280 insertions(+), 98 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f047071139e..25726a2f47c 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -287,7 +287,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b return true; }); DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8"); -DEFINE_Int32(remote_split_source_batch_size, "10240"); 
+DEFINE_Int32(remote_split_source_batch_size, "1000"); DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1"); // number of olap scanner thread pool queue size DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400"); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java index c3cab5f410b..88d32a593e1 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java @@ -20,6 +20,7 @@ package org.apache.doris.common.security.authentication; import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; public interface HadoopAuthenticator { @@ -31,6 +32,12 @@ public interface HadoopAuthenticator { return getUGI().doAs(action); } catch (InterruptedException e) { throw new IOException(e); + } catch (UndeclaredThrowableException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java index 6260833b7db..a64dd4cf717 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java @@ -17,7 +17,6 @@ package org.apache.doris.common.security.authentication; -import java.security.PrivilegedExceptionAction; import 
java.util.concurrent.Callable; /** @@ -54,14 +53,26 @@ public class PreExecutionAuthenticator { public <T> T execute(Callable<T> task) throws Exception { if (hadoopAuthenticator != null) { // Adapts Callable to PrivilegedExceptionAction for use with Hadoop authentication - PrivilegedExceptionAction<T> action = new CallableToPrivilegedExceptionActionAdapter<>(task); - return hadoopAuthenticator.doAs(action); + return hadoopAuthenticator.doAs(task::call); } else { // Executes the task directly if no authentication is needed return task.call(); } } + public void execute(Runnable task) throws Exception { + if (hadoopAuthenticator != null) { + // Adapts Runnable to PrivilegedExceptionAction for use with Hadoop authentication + hadoopAuthenticator.doAs(() -> { + task.run(); + return null; + }); + } else { + // Executes the task directly if no authentication is needed + task.run(); + } + } + /** * Retrieves the current HadoopAuthenticator. * <p>This allows checking if a HadoopAuthenticator is configured or @@ -82,35 +93,4 @@ public class PreExecutionAuthenticator { public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) { this.hadoopAuthenticator = hadoopAuthenticator; } - - /** - * Adapter class to convert a Callable into a PrivilegedExceptionAction. - * <p>This is necessary to run the task within a privileged context, - * particularly for Hadoop operations with Kerberos. - * - * @param <T> The type of result returned by the action - */ - public class CallableToPrivilegedExceptionActionAdapter<T> implements PrivilegedExceptionAction<T> { - private final Callable<T> callable; - - /** - * Constructs an adapter that wraps a Callable into a PrivilegedExceptionAction. - * - * @param callable The Callable to be adapted - */ - public CallableToPrivilegedExceptionActionAdapter(Callable<T> callable) { - this.callable = callable; - } - - /** - * Executes the wrapped Callable as a PrivilegedExceptionAction. 
- * - * @return The result of the callable's call method - * @throws Exception If an exception occurs during callable execution - */ - @Override - public T run() throws Exception { - return callable.call(); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 3be5af8ac54..3bebd4145a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -18,6 +18,7 @@ package org.apache.doris.common; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.metric.Metric; import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricLabel; @@ -33,6 +34,7 @@ import java.util.Comparator; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -69,6 +71,7 @@ import java.util.function.Supplier; public class ThreadPoolManager { + private static final Logger LOG = LogManager.getLogger(ThreadPoolManager.class); private static Map<String, ThreadPoolExecutor> nameToThreadPoolMap = Maps.newConcurrentMap(); private static String[] poolMetricTypes = {"pool_size", "active_thread_num", "task_in_queue"}; @@ -133,6 +136,17 @@ public class ThreadPoolManager { poolName, needRegisterMetric); } + public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth( + int numThread, + int queueSize, + String poolName, + boolean needRegisterMetric, + PreExecutionAuthenticator preAuth) { + return newDaemonThreadPoolWithPreAuth(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName, 60), + poolName, 
needRegisterMetric, preAuth); + } + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, int timeoutSeconds, boolean needRegisterMetric) { @@ -222,6 +236,40 @@ public class ThreadPoolManager { return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(poolName + "-%d").build(); } + + public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth( + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue, + RejectedExecutionHandler handler, + String poolName, + boolean needRegisterMetric, + PreExecutionAuthenticator preAuth) { + ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName, preAuth); + ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, + keepAliveTime, unit, workQueue, threadFactory, handler); + if (needRegisterMetric) { + nameToThreadPoolMap.put(poolName, threadPool); + } + return threadPool; + } + + private static ThreadFactory namedThreadFactoryWithPreAuth(String poolName, PreExecutionAuthenticator preAuth) { + return new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(poolName + "-%d") + .setThreadFactory(runnable -> new Thread(() -> { + try { + preAuth.execute(runnable); + } catch (Exception e) { + throw new RuntimeException(e); + } + })) + .build(); + } + private static class PriorityThreadPoolExecutor<T> extends ThreadPoolExecutor { private final Comparator<T> comparator; @@ -377,4 +425,25 @@ public class ThreadPoolManager { } } } + + public static void shutdownExecutorService(ExecutorService executorService) { + // Disable new tasks from being submitted + executorService.shutdown(); + try { + // Wait a while for existing tasks to terminate + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + // Cancel currently executing tasks + executorService.shutdownNow(); + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(60, 
TimeUnit.SECONDS)) { + LOG.warn("ExecutorService did not terminate"); + } + } + } catch (InterruptedException e) { + // (Re-)Cancel if current thread also interrupted + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 778c110df0b..b8ccdee3a87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -35,6 +35,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.io.Text; @@ -90,6 +91,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; /** @@ -151,6 +153,7 @@ public abstract class ExternalCatalog protected Optional<Boolean> useMetaCache = Optional.empty(); protected MetaCache<ExternalDatabase<? 
extends ExternalTable>> metaCache; protected PreExecutionAuthenticator preExecutionAuthenticator; + protected ThreadPoolExecutor threadPoolWithPreAuth; private volatile Configuration cachedConf = null; private byte[] confLock = new byte[0]; @@ -715,6 +718,9 @@ public abstract class ExternalCatalog @Override public void onClose() { removeAccessController(); + if (threadPoolWithPreAuth != null) { + ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth); + } CatalogIf.super.onClose(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 457ee88fec5..1b252d0ae3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -332,17 +332,19 @@ public abstract class FileQueryScanNode extends FileScanNode { if (splitAssignment.getSampleSplit() == null && !isFileStreamType()) { return; } - selectedSplitNum = numApproximateSplits(); - FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit(); TFileType locationType = fileSplit.getLocationType(); + selectedSplitNum = numApproximateSplits(); + if (selectedSplitNum < 0) { + throw new UserException("Approximate split number should not be negative"); + } totalFileSize = fileSplit.getLength() * selectedSplitNum; long maxWaitTime = sessionVariable.getFetchSplitsMaxWaitTime(); // Not accurate, only used to estimate concurrency. // Here, we must take the max of 1, because // in the case of multiple BEs, `numApproximateSplits() / backendPolicy.numBackends()` may be 0, // and finally numSplitsPerBE is 0, resulting in no data being queried. 
- int numSplitsPerBE = Math.max(numApproximateSplits() / backendPolicy.numBackends(), 1); + int numSplitsPerBE = Math.max(selectedSplitNum / backendPolicy.numBackends(), 1); for (Backend backend : backendPolicy.getBackends()) { SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime); splitSources.add(splitSource); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java index a26abc7fc5e..470f4092d21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java @@ -23,7 +23,10 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.collect.Multimap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -40,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * `SplitGenerator` provides the file splits, and `FederationBackendPolicy` assigns these splits to backends. 
*/ public class SplitAssignment { + private static final Logger LOG = LogManager.getLogger(SplitAssignment.class); private final Set<Long> sources = new HashSet<>(); private final FederationBackendPolicy backendPolicy; private final SplitGenerator splitGenerator; @@ -50,10 +54,11 @@ public class SplitAssignment { private final List<String> pathPartitionKeys; private final Object assignLock = new Object(); private Split sampleSplit = null; - private final AtomicBoolean isStop = new AtomicBoolean(false); + private final AtomicBoolean isStopped = new AtomicBoolean(false); private final AtomicBoolean scheduleFinished = new AtomicBoolean(false); private UserException exception = null; + private final List<Closeable> closeableResources = new ArrayList<>(); public SplitAssignment( FederationBackendPolicy backendPolicy, @@ -85,7 +90,7 @@ public class SplitAssignment { } private boolean waitFirstSplit() { - return !scheduleFinished.get() && !isStop.get() && exception == null; + return !scheduleFinished.get() && !isStopped.get() && exception == null; } private void appendBatch(Multimap<Backend, Split> batch) throws UserException { @@ -150,7 +155,7 @@ public class SplitAssignment { } BlockingQueue<Collection<TScanRangeLocations>> splits = assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>()); - if (scheduleFinished.get() && splits.isEmpty() || isStop.get()) { + if (scheduleFinished.get() && splits.isEmpty() || isStopped.get()) { return null; } return splits; @@ -167,11 +172,26 @@ public class SplitAssignment { } public void stop() { - isStop.set(true); + if (isStop()) { + return; + } + isStopped.set(true); + closeableResources.forEach((closeable) -> { + try { + closeable.close(); + } catch (Exception e) { + LOG.warn("close resource error:{}", e.getMessage(), e); + // ignore + } + }); notifyAssignment(); } public boolean isStop() { - return isStop.get(); + return isStopped.get(); + } + + public void addCloseable(Closeable resource) { + 
closeableResources.add(resource); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java index 34ff3911445..391552a5106 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java @@ -52,7 +52,7 @@ public interface SplitGenerator { return -1; } - default void startSplit(int numBackends) { + default void startSplit(int numBackends) throws UserException { } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 0fa69825a01..6a3265388f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -17,6 +17,9 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; @@ -43,6 +46,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name"; protected String icebergCatalogType; protected Catalog catalog; + private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16; public IcebergExternalCatalog(long catalogId, String name, String comment) { super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment); @@ -54,9 +58,20 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { @Override protected 
void initLocalObjectsImpl() { preExecutionAuthenticator = new PreExecutionAuthenticator(); + // TODO If the storage environment does not support Kerberos (such as s3), + // there is no need to generate a simple authentication information anymore. + AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); + HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); + preExecutionAuthenticator.setHadoopAuthenticator(authenticator); initCatalog(); IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog); transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops); + threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth( + ICEBERG_CATALOG_EXECUTOR_THREAD_NUM, + Integer.MAX_VALUE, + String.format("iceberg_catalog_%s_executor_pool", name), + true, + preExecutionAuthenticator); metadataOps = ops; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java index c5a99c157ce..51d39357b81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java @@ -17,8 +17,6 @@ package org.apache.doris.datasource.iceberg; -import org.apache.doris.common.security.authentication.AuthenticationConfig; -import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.property.PropertyConverter; @@ -37,11 +35,6 @@ public class IcebergHMSExternalCatalog extends IcebergExternalCatalog { protected void initCatalog() { icebergCatalogType = ICEBERG_HMS; catalog = IcebergUtils.createIcebergHiveCatalog(this, getName()); - if (preExecutionAuthenticator.getHadoopAuthenticator() == null) 
{ - AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); - HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); - preExecutionAuthenticator.setHadoopAuthenticator(authenticator); - } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index b5322e292ee..58a68f59e27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -90,11 +90,19 @@ public class IcebergMetadataOps implements ExternalMetadataOps { @Override public boolean tableExist(String dbName, String tblName) { - return catalog.tableExists(getTableIdentifier(dbName, tblName)); + try { + return preExecutionAuthenticator.execute(() -> catalog.tableExists(getTableIdentifier(dbName, tblName))); + } catch (Exception e) { + throw new RuntimeException("Failed to check table exist, error message is:" + e.getMessage(), e); + } } public boolean databaseExist(String dbName) { - return nsCatalog.namespaceExists(getNamespace(dbName)); + try { + return preExecutionAuthenticator.execute(() -> nsCatalog.namespaceExists(getNamespace(dbName))); + } catch (Exception e) { + throw new RuntimeException("Failed to check database exist, error message is:" + e.getMessage(), e); + } } public List<String> listDatabaseNames() { @@ -110,8 +118,14 @@ public class IcebergMetadataOps implements ExternalMetadataOps { @Override public List<String> listTableNames(String dbName) { - List<TableIdentifier> tableIdentifiers = catalog.listTables(getNamespace(dbName)); - return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList()); + try { + return preExecutionAuthenticator.execute(() -> { + List<TableIdentifier> tableIdentifiers = catalog.listTables(getNamespace(dbName)); + return 
tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList()); + }); + } catch (Exception e) { + throw new RuntimeException("Failed to list table names, error message is:" + e.getMessage(), e); + } } @Override @@ -270,7 +284,11 @@ public class IcebergMetadataOps implements ExternalMetadataOps { @Override public Table loadTable(String dbName, String tblName) { - return catalog.loadTable(getTableIdentifier(dbName, tblName)); + try { + return preExecutionAuthenticator.execute(() -> catalog.loadTable(getTableIdentifier(dbName, tblName))); + } catch (Exception e) { + throw new RuntimeException("Failed to load table, error message is:" + e.getMessage(), e); + } } private TableIdentifier getTableIdentifier(String dbName, String tblName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 29c07be8192..756c9024cdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalTable; @@ -52,7 +53,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.iceberg.BaseTable; -import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; @@ -72,11 +72,12 @@ import 
org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; public class IcebergScanNode extends FileQueryScanNode { @@ -94,6 +95,11 @@ public class IcebergScanNode extends FileQueryScanNode { // And for split level count push down opt, the flag is set in each split. private boolean tableLevelPushDownCount = false; private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; + private long targetSplitSize; + private ConcurrentHashMap.KeySetView<Object, Boolean> partitionPathSet; + private boolean isPartitionedTable; + private int formatVersion; + private PreExecutionAuthenticator preExecutionAuthenticator; /** * External file scan node for Query iceberg table @@ -128,6 +134,11 @@ public class IcebergScanNode extends FileQueryScanNode { @Override protected void doInitialize() throws UserException { icebergTable = source.getIcebergTable(); + targetSplitSize = getRealFileSplitSize(0); + partitionPathSet = ConcurrentHashMap.newKeySet(); + isPartitionedTable = icebergTable.spec().isPartitioned(); + formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); + preExecutionAuthenticator = source.getCatalog().getPreExecutionAuthenticator(); super.doInitialize(); } @@ -142,7 +153,6 @@ public class IcebergScanNode extends FileQueryScanNode { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value()); TIcebergFileDesc fileDesc = new TIcebergFileDesc(); - int formatVersion = icebergSplit.getFormatVersion(); fileDesc.setFormatVersion(formatVersion); fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath()); if (tableLevelPushDownCount) { @@ -184,14 +194,50 @@ public class 
IcebergScanNode extends FileQueryScanNode { @Override public List<Split> getSplits(int numBackends) throws UserException { try { - return source.getCatalog().getPreExecutionAuthenticator().execute(() -> doGetSplits(numBackends)); + return preExecutionAuthenticator.execute(() -> doGetSplits(numBackends)); } catch (Exception e) { throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e); } + } + @Override + public void startSplit(int numBackends) throws UserException { + try { + preExecutionAuthenticator.execute(() -> { + doStartSplit(); + return null; + }); + } catch (Exception e) { + throw new UserException(e.getMessage(), e); + } } - private List<Split> doGetSplits(int numBackends) throws UserException { + public void doStartSplit() throws UserException { + TableScan scan = createTableScan(); + CompletableFuture.runAsync(() -> { + try { + preExecutionAuthenticator.execute( + () -> { + CloseableIterable<FileScanTask> fileScanTasks = planFileScanTask(scan); + + // 1. this task should stop when all splits are assigned + // 2. 
if we want to stop this plan, we can close the fileScanTasks to stop + splitAssignment.addCloseable(fileScanTasks); + + fileScanTasks.forEach(fileScanTask -> + splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(fileScanTask)))); + + return null; + } + ); + splitAssignment.finishSchedule(); + } catch (Exception e) { + splitAssignment.setException(new UserException(e.getMessage(), e)); + } + }); + } + + private TableScan createTableScan() throws UserException { TableScan scan = icebergTable.newScan(); // set snapshot @@ -213,16 +259,16 @@ public class IcebergScanNode extends FileQueryScanNode { this.pushdownIcebergPredicates.add(predicate.toString()); } - // get splits - List<Split> splits = new ArrayList<>(); - int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); - HashSet<String> partitionPathSet = new HashSet<>(); - boolean isPartitionedTable = icebergTable.spec().isPartitioned(); + scan = scan.planWith(source.getCatalog().getThreadPoolWithPreAuth()); + + return scan; + } - long realFileSplitSize = getRealFileSplitSize(0); - CloseableIterable<FileScanTask> fileScanTasks = null; + private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { + long targetSplitSize = getRealFileSplitSize(0); + CloseableIterable<FileScanTask> splitFiles; try { - fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize); + splitFiles = TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); } catch (NullPointerException e) { /* Caused by: java.lang.NullPointerException: Type cannot be null @@ -253,33 +299,45 @@ public class IcebergScanNode extends FileQueryScanNode { LOG.warn("Iceberg TableScanUtil.splitFiles throw NullPointerException. 
Cause : ", e); throw new NotSupportedException("Unable to read Iceberg table with dropped old partition column."); } - try (CloseableIterable<CombinedScanTask> combinedScanTasks = - TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 0)) { - combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { - if (isPartitionedTable) { - StructLike structLike = splitTask.file().partition(); - // Counts the number of partitions read - partitionPathSet.add(structLike.toString()); - } - String originalPath = splitTask.file().path().toString(); - LocationPath locationPath = new LocationPath(originalPath, source.getCatalog().getProperties()); - IcebergSplit split = new IcebergSplit( - locationPath, - splitTask.start(), - splitTask.length(), - splitTask.file().fileSizeInBytes(), - new String[0], - formatVersion, - source.getCatalog().getProperties(), - new ArrayList<>(), - originalPath); - split.setTargetSplitSize(realFileSplitSize); - if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { - split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); - } - split.setTableFormatType(TableFormatType.ICEBERG); + return splitFiles; + } + + private Split createIcebergSplit(FileScanTask fileScanTask) { + if (isPartitionedTable) { + StructLike structLike = fileScanTask.file().partition(); + // Counts the number of partitions read + partitionPathSet.add(structLike.toString()); + } + String originalPath = fileScanTask.file().path().toString(); + LocationPath locationPath = new LocationPath(originalPath, source.getCatalog().getProperties()); + IcebergSplit split = new IcebergSplit( + locationPath, + fileScanTask.start(), + fileScanTask.length(), + fileScanTask.file().fileSizeInBytes(), + new String[0], + formatVersion, + source.getCatalog().getProperties(), + new ArrayList<>(), + originalPath); + if (!fileScanTask.deletes().isEmpty()) { + split.setDeleteFileFilters(getDeleteFileFilters(fileScanTask)); + } + split.setTableFormatType(TableFormatType.ICEBERG); + 
split.setTargetSplitSize(targetSplitSize); + return split; + } + + private List<Split> doGetSplits(int numBackends) throws UserException { + + TableScan scan = createTableScan(); + List<Split> splits = new ArrayList<>(); + + try (CloseableIterable<FileScanTask> fileScanTasks = planFileScanTask(scan)) { + fileScanTasks.forEach(taskGrp -> { + Split split = createIcebergSplit(taskGrp); splits.add(split); - })); + }); } catch (IOException e) { throw new UserException(e.getMessage(), e.getCause()); } @@ -308,6 +366,12 @@ public class IcebergScanNode extends FileQueryScanNode { return splits; } + @Override + public boolean isBatchMode() { + // TODO Use a better judgment method to decide whether to use batch mode. + return sessionVariable.getNumPartitionsInBatchMode() > 1024; + } + public Long getSpecifiedSnapshot() throws UserException { TableSnapshot tableSnapshot = getQueryTableSnapshot(); if (tableSnapshot != null) { @@ -460,4 +524,9 @@ public class IcebergScanNode extends FileQueryScanNode { } ((IcebergSplit) splits.get(size - 1)).setTableLevelRowCount(countPerSplit + totalCount % size); } + + @Override + public int numApproximateSplits() { + return NUM_SPLITS_PER_PARTITION * partitionPathSet.size() > 0 ? partitionPathSet.size() : 1; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index 0520612935a..e31ec5c3fad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -22,6 +22,7 @@ import org.apache.doris.datasource.FileSplit; import lombok.Data; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -35,7 +36,7 @@ public class IcebergSplit extends FileSplit { // but the original datafile path must be used. 
private final String originalPath; private Integer formatVersion; - private List<IcebergDeleteFileFilter> deleteFileFilters; + private List<IcebergDeleteFileFilter> deleteFileFilters = new ArrayList<>(); private Map<String, String> config; // tableLevelRowCount will be set only table-level count push down opt is available. private long tableLevelRowCount = -1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 23c2c91be6e..0e2a5ee4d03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -96,7 +96,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { protected SplitAssignment splitAssignment = null; protected long selectedPartitionNum = 0; - protected long selectedSplitNum = 0; + protected int selectedSplitNum = 0; // create a mapping between output slot's id and project expr Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index f68fd1423c3..ea4921bef85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1660,7 +1660,7 @@ public class SessionVariable implements Serializable, Writable { description = {"batch方式中BE获取splits的最大等待时间", "The max wait time of getting splits in batch mode."}, needForward = true) - public long fetchSplitsMaxWaitTime = 4000; + public long fetchSplitsMaxWaitTime = 1000; @VariableMgr.VarAttr( name = ENABLE_PARQUET_LAZY_MAT, diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy index 7e654175f9c..e74d3b07f93 100644 --- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy @@ -19,6 +19,7 @@ suite("test_iceberg_filter", "p0,external,doris,external_docker,external_docker_ String enabled = context.config.otherConfigs.get("enableIcebergTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { try { + sql """set num_partitions_in_batch_mode=0""" String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") String minio_port = context.config.otherConfigs.get("iceberg_minio_port") String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") @@ -91,6 +92,7 @@ suite("test_iceberg_filter", "p0,external,doris,external_docker,external_docker_ } } finally { + sql """set num_partitions_in_batch_mode=1024""" } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
