This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.2 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 68e2692f48d8d21835d50bf329d9c24ef1c69f32 Author: Jingsong Lee <[email protected]> AuthorDate: Sat Jun 14 08:57:37 2025 +0800 [core] Limit max task number in ManifestReadThreadPool (#5737) --- .../org/apache/paimon/utils/ThreadPoolUtils.java | 16 +- .../paimon/utils/SemaphoredDelegatingExecutor.java | 184 +++++++++++++++++++++ .../paimon/utils/ManifestReadThreadPool.java | 27 ++- 3 files changed, 205 insertions(+), 22 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java b/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java index a4790583c5..7ad3425367 100644 --- a/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java +++ b/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java @@ -21,8 +21,6 @@ package org.apache.paimon.utils; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; -import javax.annotation.Nullable; - import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -36,8 +34,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -78,20 +74,12 @@ public class ThreadPoolUtils { return executor; } - public static ScheduledExecutorService createScheduledThreadPool( - int threadNum, String namePrefix) { - return new ScheduledThreadPoolExecutor(threadNum, newDaemonThreadFactory(namePrefix)); - } - /** This method aims to parallel process tasks with memory control and sequentially. */ public static <T, U> Iterable<T> sequentialBatchedExecute( - ThreadPoolExecutor executor, + ExecutorService executor, Function<U, List<T>> processor, List<U> input, - @Nullable Integer queueSize) { - if (queueSize == null) { - queueSize = executor.getMaximumPoolSize(); - } + int queueSize) { if (queueSize <= 0) { throw new NegativeArraySizeException("queue size should not be negative"); } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/SemaphoredDelegatingExecutor.java b/paimon-common/src/main/java/org/apache/paimon/utils/SemaphoredDelegatingExecutor.java new file mode 100644 index 0000000000..bdbb23796b --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/SemaphoredDelegatingExecutor.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.ForwardingExecutorService; +import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.Futures; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * A {@link ForwardingExecutorService} to delegate tasks to limit the number of tasks executed + * concurrently. + */ +public class SemaphoredDelegatingExecutor extends ForwardingExecutorService { + + private final Semaphore queueingPermits; + private final ExecutorService executorDelegated; + private final int permitCount; + + public SemaphoredDelegatingExecutor( + ExecutorService executorDelegated, int permitCount, boolean fair) { + this.permitCount = permitCount; + this.queueingPermits = new Semaphore(permitCount, fair); + this.executorDelegated = executorDelegated; + } + + @Override + protected ExecutorService delegate() { + return this.executorDelegated; + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> List<Future<T>> invokeAll( + Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + try { + this.queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedFuture(e); + } + + return super.submit(new CallableWithPermitRelease(task)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + try { + this.queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedFuture(e); + } + + return super.submit(new RunnableWithPermitRelease(task), result); + } + + @Override + public Future<?> submit(Runnable task) { + try { + this.queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedFuture(e); + } + + return super.submit(new RunnableWithPermitRelease(task)); + } + + @Override + public void execute(Runnable command) { + try { + this.queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + super.execute(new RunnableWithPermitRelease(command)); + } + + public int getAvailablePermits() { + return this.queueingPermits.availablePermits(); + } + + public int getWaitingCount() { + return this.queueingPermits.getQueueLength(); + } + + public int getPermitCount() { + return this.permitCount; + } + + @Override + public String toString() { + return "SemaphoredDelegatingExecutor{" + + "permitCount=" + + getPermitCount() + + ", available=" + + getAvailablePermits() + + ", waiting=" + + getWaitingCount() + + '}'; + } + + private class RunnableWithPermitRelease implements Runnable { + + private final Runnable delegated; + + RunnableWithPermitRelease(Runnable delegated) { + this.delegated = delegated; + } + + @Override + public void run() { + try { + this.delegated.run(); + } finally { + SemaphoredDelegatingExecutor.this.queueingPermits.release(); + } + } + } + + private class CallableWithPermitRelease<T> implements Callable<T> { + + private final Callable<T> delegated; + + CallableWithPermitRelease(Callable<T> delegated) { + this.delegated = delegated; + } + + @Override + public T call() throws Exception { + T result; + try { + result = this.delegated.call(); + } finally { + SemaphoredDelegatingExecutor.this.queueingPermits.release(); + } + + return result; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java index 49fcfc8bd9..16bd73ee80 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java @@ -22,6 +22,7 @@ import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Function; @@ -35,28 +36,38 @@ public class ManifestReadThreadPool { private static ThreadPoolExecutor executorService = createCachedThreadPool(Runtime.getRuntime().availableProcessors(), THREAD_NAME); - public static synchronized ThreadPoolExecutor getExecutorService(@Nullable Integer threadNum) { - if (threadNum == null || threadNum <= executorService.getMaximumPoolSize()) { + public static synchronized ExecutorService getExecutorService(@Nullable Integer threadNum) { + if (threadNum == null || threadNum == executorService.getMaximumPoolSize()) { return executorService; } - // we don't need to close previous pool - // it is just cached pool - executorService = createCachedThreadPool(threadNum, THREAD_NAME); + if (threadNum < executorService.getMaximumPoolSize()) { + return new SemaphoredDelegatingExecutor(executorService, threadNum, false); + } else { + // we don't need to close previous pool + // it is just cached pool + executorService = createCachedThreadPool(threadNum, THREAD_NAME); - return executorService; + return executorService; + } } /** This method aims to parallel process tasks with memory control and sequentially. */ public static <T, U> Iterable<T> sequentialBatchedExecute( Function<U, List<T>> processor, List<U> input, @Nullable Integer threadNum) { - ThreadPoolExecutor executor = getExecutorService(threadNum); + ExecutorService executor = getExecutorService(threadNum); + if (threadNum == null) { + threadNum = + executor instanceof ThreadPoolExecutor + ? ((ThreadPoolExecutor) executor).getMaximumPoolSize() + : ((SemaphoredDelegatingExecutor) executor).getPermitCount(); + } return ThreadPoolUtils.sequentialBatchedExecute(executor, processor, input, threadNum); } /** This method aims to parallel process tasks with randomly but return values sequentially. */ public static <T, U> Iterator<T> randomlyExecuteSequentialReturn( Function<U, List<T>> processor, List<U> input, @Nullable Integer threadNum) { - ThreadPoolExecutor executor = getExecutorService(threadNum); + ExecutorService executor = getExecutorService(threadNum); return ThreadPoolUtils.randomlyExecuteSequentialReturn(executor, processor, input); } }
