This is an automated email from the ASF dual-hosted git repository. wujimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push: new 509be4e [SCB-1753]accessor problem fix: LinkedBlockingQueueEx queue features 509be4e is described below commit 509be4ee98eedacef43fb8739ea6636e36478873 Author: liubao <bi...@qq.com> AuthorDate: Mon Aug 3 20:55:10 2020 +0800 [SCB-1753]accessor problem fix: LinkedBlockingQueueEx queue features --- .../servicecomb/core/executor/GroupExecutor.java | 2 +- .../core/executor/LinkedBlockingQueueEx.java | 63 ++++++---------------- .../core/executor/ThreadPoolExecutorEx.java | 13 ++++- .../core/executor/TestThreadPoolExecutorEx.java | 14 +++-- .../it/edge/handler/ExceptionConvertHandler.java | 2 - 5 files changed, 40 insertions(+), 54 deletions(-) diff --git a/core/src/main/java/org/apache/servicecomb/core/executor/GroupExecutor.java b/core/src/main/java/org/apache/servicecomb/core/executor/GroupExecutor.java index 41371b4..535b8a2 100644 --- a/core/src/main/java/org/apache/servicecomb/core/executor/GroupExecutor.java +++ b/core/src/main/java/org/apache/servicecomb/core/executor/GroupExecutor.java @@ -87,7 +87,7 @@ public class GroupExecutor implements Executor, Closeable { maxThreads, maxIdleInSecond, TimeUnit.SECONDS, - new LinkedBlockingQueueEx<>(maxQueueSize), + new LinkedBlockingQueueEx(maxQueueSize), factory); executorList.add(executor); } diff --git a/core/src/main/java/org/apache/servicecomb/core/executor/LinkedBlockingQueueEx.java b/core/src/main/java/org/apache/servicecomb/core/executor/LinkedBlockingQueueEx.java index 57b8450..aba525f 100644 --- a/core/src/main/java/org/apache/servicecomb/core/executor/LinkedBlockingQueueEx.java +++ b/core/src/main/java/org/apache/servicecomb/core/executor/LinkedBlockingQueueEx.java @@ -16,81 +16,50 @@ */ package org.apache.servicecomb.core.executor; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.RejectedExecutionException; -public class LinkedBlockingQueueEx<E> extends LinkedBlockingQueue<E> { +public class LinkedBlockingQueueEx extends LinkedBlockingQueue<Runnable> { private static final long serialVersionUID = -1L; - private static final int COUNT_BITS = Integer.SIZE - 3; - - private static final int CAPACITY = (1 << COUNT_BITS) - 1; - - private static int workerCountOf(int c) { - return c & CAPACITY; - } - - private static Method addWrokerMethod; - private transient volatile ThreadPoolExecutorEx owner = null; - private AtomicInteger ctl; - public LinkedBlockingQueueEx(int capacity) { super(capacity); } public void setOwner(ThreadPoolExecutorEx owner) { this.owner = owner; - try { - addWrokerMethod = ThreadPoolExecutor.class.getDeclaredMethod("addWorker", Runnable.class, boolean.class); - addWrokerMethod.setAccessible(true); - - Field field = ThreadPoolExecutor.class.getDeclaredField("ctl"); - field.setAccessible(true); - ctl = (AtomicInteger) field.get(owner); - } catch (Throwable e) { - throw new IllegalStateException("failed to init queue.", e); - } } @Override - public boolean offer(E runnable) { + public boolean offer(Runnable runnable) { // task can come before owner available if (owner == null) { return super.offer(runnable); } - // can not create more thread, just queue the task - if (workerCountOf(ctl.get()) == owner.getMaximumPoolSize()) { + if (owner.getPoolSize() == owner.getMaximumPoolSize()) { return super.offer(runnable); } // no need to create more thread, just queue the task - if (owner.getNotFinished() <= workerCountOf(ctl.get())) { + if (owner.getNotFinished() <= owner.getPoolSize()) { return super.offer(runnable); } // all threads are busy, and can create new thread, not queue the task - if (workerCountOf(ctl.get()) < owner.getMaximumPoolSize()) { - try { - // low frequency event, reflect is no problem - if (!(Boolean) addWrokerMethod.invoke(owner, runnable, false)) { - // failed to create new thread, queue the task - // if failed to queue the task, owner will try to addWorker again, - // if still failed, the will reject the task - return super.offer(runnable); - } - - // create new thread successfully, treat it as queue success - return true; - } catch (Throwable e) { - // reflection exception, should never happened - return super.offer(runnable); - } + if (owner.getPoolSize() < owner.getMaximumPoolSize()) { + return false; } + return super.offer(runnable); + } + /* + * when task is rejected (thread pool if full), force the item onto queue. + */ + public boolean force(Runnable runnable) { + if (owner == null || owner.isShutdown()) { + throw new RejectedExecutionException("queue is not running."); + } return super.offer(runnable); } } diff --git a/core/src/main/java/org/apache/servicecomb/core/executor/ThreadPoolExecutorEx.java b/core/src/main/java/org/apache/servicecomb/core/executor/ThreadPoolExecutorEx.java index bbd7f23..c8e5b11 100644 --- a/core/src/main/java/org/apache/servicecomb/core/executor/ThreadPoolExecutorEx.java +++ b/core/src/main/java/org/apache/servicecomb/core/executor/ThreadPoolExecutorEx.java @@ -42,7 +42,18 @@ public class ThreadPoolExecutorEx extends ThreadPoolExecutor { @Override public void execute(Runnable command) { submittedCount.incrementAndGet(); - super.execute(command); + try { + super.execute(command); + } catch (RejectedExecutionException e) { + if (getQueue() instanceof LinkedBlockingQueueEx) { + final LinkedBlockingQueueEx queue = (LinkedBlockingQueueEx) getQueue(); + if (!queue.force(command)) { + throw new RejectedExecutionException("thread pool queue is full"); + } + } else { + throw e; + } + } } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { diff --git a/core/src/test/java/org/apache/servicecomb/core/executor/TestThreadPoolExecutorEx.java b/core/src/test/java/org/apache/servicecomb/core/executor/TestThreadPoolExecutorEx.java index 12f421a..eaf4133 100644 --- a/core/src/test/java/org/apache/servicecomb/core/executor/TestThreadPoolExecutorEx.java +++ b/core/src/test/java/org/apache/servicecomb/core/executor/TestThreadPoolExecutorEx.java @@ -52,8 +52,8 @@ public class TestThreadPoolExecutorEx { } } - ThreadPoolExecutorEx executorEx = new ThreadPoolExecutorEx(2, 4, 60, TimeUnit.SECONDS, - new LinkedBlockingQueueEx<>(2), Executors.defaultThreadFactory()); + ThreadPoolExecutorEx executorEx = new ThreadPoolExecutorEx(2, 4, 2, TimeUnit.SECONDS, + new LinkedBlockingQueueEx(2), Executors.defaultThreadFactory()); public TestTask submitTask() { TestTask task = new TestTask(); @@ -139,11 +139,18 @@ public class TestThreadPoolExecutorEx { t4.quit(); t5.quit(); t6.quit(); + waitForResult(2, executorEx::getPoolSize); executorEx.shutdown(); } private void waitForResult(int expect, IntSupplier supplier) { + long max = 30000; + long waited = 0; + for (; ; ) { + if (waited > max) { + throw new IllegalStateException("timed out waiting."); + } int actual = supplier.getAsInt(); if (expect == actual) { return; @@ -151,7 +158,8 @@ public class TestThreadPoolExecutorEx { LOGGER.info("waiting for thread result, expect:{}, actual: {}.", expect, actual); try { - TimeUnit.MILLISECONDS.sleep(100); + TimeUnit.MILLISECONDS.sleep(200); + waited += 200; } catch (InterruptedException e) { throw new IllegalStateException(e); } diff --git a/integration-tests/it-edge/src/main/java/org/apache/servicecomb/it/edge/handler/ExceptionConvertHandler.java b/integration-tests/it-edge/src/main/java/org/apache/servicecomb/it/edge/handler/ExceptionConvertHandler.java index c03fc65..bc9ac39 100644 --- a/integration-tests/it-edge/src/main/java/org/apache/servicecomb/it/edge/handler/ExceptionConvertHandler.java +++ b/integration-tests/it-edge/src/main/java/org/apache/servicecomb/it/edge/handler/ExceptionConvertHandler.java @@ -17,8 +17,6 @@ package org.apache.servicecomb.it.edge.handler; -import java.util.concurrent.TimeoutException; - import javax.ws.rs.core.Response.Status; import org.apache.servicecomb.core.Handler;