This is an automated email from the ASF dual-hosted git repository.

elecharny pushed a commit to branch 2.1.X in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.1.X by this push: new 79059ca Patch for DIRMINA-1110 applied. I suspect it'll fix DIRMINA-1113... 79059ca is described below commit 79059ca54b1fe6b8e98790b0f8aad12e82fa6f93 Author: emmanuel lecharny <elecha...@apache.org> AuthorDate: Fri May 24 15:42:53 2019 +0200 Patch for DIRMINA-1110 applied. I suspect it'll fix DIRMINA-1113... --- .../filter/executor/OrderedThreadPoolExecutor.java | 25 +- .../executor/PriorityThreadPoolExecutor.java | 71 ++-- .../executor/PriorityThreadPoolExecutorTest.java | 359 ++++++++++----------- 3 files changed, 211 insertions(+), 244 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java index 65e97a0..aeada71 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java @@ -448,9 +448,6 @@ public class OrderedThreadPoolExecutor extends ThreadPoolExecutor { // Get the session's queue of events SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session); - Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; - - boolean offerSession; // propose the new event to the event queue handler. 
If we // use a throttle queue handler, the message may be rejected @@ -459,30 +456,22 @@ public class OrderedThreadPoolExecutor extends ThreadPoolExecutor { if (offerEvent) { // Ok, the message has been accepted - synchronized (tasksQueue) { + synchronized (sessionTasksQueue.tasksQueue) { // Inject the event into the executor taskQueue - tasksQueue.offer(event); + sessionTasksQueue.tasksQueue.offer(event); if (sessionTasksQueue.processingCompleted) { sessionTasksQueue.processingCompleted = false; - offerSession = true; - } else { - offerSession = false; + // Processing of the tasks queue of this session is currently not + // scheduled or underway. As new tasks have now been added, the + // session needs to be offered for processing. + waitingSessions.offer(session); } if (LOGGER.isDebugEnabled()) { - print(tasksQueue, event); + print(sessionTasksQueue.tasksQueue, event); } } - } else { - offerSession = false; - } - - if (offerSession) { - // As the tasksQueue was empty, the task has been executed - // immediately, so we can move the session to the queue - // of sessions waiting for completion. - waitingSessions.offer(session); } addWorkerIfNecessary(); diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java index 721005c..bd3ad65 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java @@ -205,28 +205,20 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { /** * Creates a new instance of a PrioritisedOrderedThreadPoolExecutor. 
* - * @param corePoolSize - * The initial pool sizePoolSize - * @param maximumPoolSize - * The maximum pool size - * @param keepAliveTime - * Default duration for a thread - * @param unit - * Time unit used for the keepAlive value - * @param threadFactory - * The factory used to create threads - * @param eventQueueHandler - * The queue used to store events + * @param corePoolSize The initial pool sizePoolSize + * @param maximumPoolSize The maximum pool size + * @param keepAliveTime Default duration for a thread + * @param unit Time unit used for the keepAlive value + * @param threadFactory The factory used to create threads + * @param eventQueueHandler The queue used to store events */ public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler, Comparator<IoSession> comparator) { - // We have to initialize the pool with default values (0 and 1) in order - // to - // handle the exception in a better way. We can't add a try {} catch() - // {} + // We have to initialise the pool with default values (0 and 1) in order + // to handle the exception in a better way. We can't add a try {} catch() {} // around the super() call. - super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, - new AbortPolicy()); + super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), + threadFactory, new AbortPolicy()); if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) { throw new IllegalArgumentException("corePoolSize: " + corePoolSize); @@ -260,12 +252,12 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { /** * Get the session's tasks queue. 
*/ - private SessionQueue getSessionTasksQueue(IoSession session) { - SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE); + private SessionTasksQueue getSessionTasksQueue(IoSession session) { + SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE); if (queue == null) { - queue = new SessionQueue(); - SessionQueue oldQueue = (SessionQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue); + queue = new SessionTasksQueue(); + SessionTasksQueue oldQueue = (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue); if (oldQueue != null) { queue = oldQueue; @@ -436,7 +428,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { continue; } - SessionQueue sessionTasksQueue = (SessionQueue) entry.getSession().getAttribute(TASKS_QUEUE); + SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) entry.getSession().getAttribute(TASKS_QUEUE); synchronized (sessionTasksQueue.tasksQueue) { @@ -496,10 +488,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { IoSession session = event.getSession(); // Get the session's queue of events - SessionQueue sessionTasksQueue = getSessionTasksQueue(session); - Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; - - boolean offerSession; + SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session); // propose the new event to the event queue handler. 
If we // use a throttle queue handler, the message may be rejected @@ -508,30 +497,22 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { if (offerEvent) { // Ok, the message has been accepted - synchronized (tasksQueue) { + synchronized (sessionTasksQueue.tasksQueue) { // Inject the event into the executor taskQueue - tasksQueue.offer(event); + sessionTasksQueue.tasksQueue.offer(event); if (sessionTasksQueue.processingCompleted) { sessionTasksQueue.processingCompleted = false; - offerSession = true; - } else { - offerSession = false; + // Processing of the tasks queue of this session is currently not + // scheduled or underway. As new tasks have now been added, the + // session needs to be offered for processing. + waitingSessions.offer(new SessionEntry(session, comparator)); } if (LOGGER.isDebugEnabled()) { - print(tasksQueue, event); + print(sessionTasksQueue.tasksQueue, event); } } - } else { - offerSession = false; - } - - if (offerSession) { - // As the tasksQueue was empty, the task has been executed - // immediately, so we can move the session to the queue - // of sessions waiting for completion. 
- waitingSessions.offer(new SessionEntry(session, comparator)); } addWorkerIfNecessary(); @@ -666,7 +647,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { checkTaskType(task); IoEvent event = (IoEvent) task; IoSession session = event.getSession(); - SessionQueue sessionTasksQueue = (SessionQueue) session.getAttribute(TASKS_QUEUE); + SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE); if (sessionTasksQueue == null) { return false; @@ -791,7 +772,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { return null; } - private void runTasks(SessionQueue sessionTasksQueue) { + private void runTasks(SessionTasksQueue sessionTasksQueue) { for (;;) { Runnable task; Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; @@ -832,7 +813,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { * A class used to store the ordered list of events to be processed by the * session, and the current task state. */ - private class SessionQueue { + private class SessionTasksQueue { /** A queue of ordered event waiting to be processed */ private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>(); diff --git a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java index fac0478..fe49857 100644 --- a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java +++ b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java @@ -53,16 +53,16 @@ public class PriorityThreadPoolExecutorTest { */ @Test public void fifoEntryTestNoComparatorSameSession() throws Exception { - // Set up fixture. 
- final IoSession session = new DummySession(); - final PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, null); - final PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, null); - - // Execute system under test. - final int result = first.compareTo(last); - - // Verify results. - assertEquals("Without a comparator, entries of the same session are expected to be equal.", 0, result); + // Set up fixture. + IoSession session = new DummySession(); + PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, null); + PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, null); + + // Execute system under test. + int result = first.compareTo(last); + + // Verify results. + assertEquals("Without a comparator, entries of the same session are expected to be equal.", 0, result); } /** @@ -75,16 +75,16 @@ public class PriorityThreadPoolExecutorTest { */ @Test public void fifoEntryTestNoComparatorDifferentSession() throws Exception { - // Set up fixture (the order in which the entries are created is - // relevant here!) - final PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null); - final PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null); - - // Execute system under test. - final int result = first.compareTo(last); - - // Verify results. - assertTrue("Without a comparator, the first entry created should be the first entry out. Expected a negative result, instead, got: " + result, result < 0); + // Set up fixture (the order in which the entries are created is + // relevant here!) 
+ PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null); + PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null); + + // Execute system under test. + int result = first.compareTo(last); + + // Verify results. + assertTrue("Without a comparator, the first entry created should be the first entry out. Expected a negative result, instead, got: " + result, result < 0); } /** @@ -98,24 +98,25 @@ public class PriorityThreadPoolExecutorTest { */ @Test public void fifoEntryTestWithComparatorSameSession() throws Exception { - // Set up fixture. - final IoSession session = new DummySession(); - final int predeterminedResult = 3853; - final Comparator<IoSession> comparator = new Comparator<IoSession>() { - @Override - public int compare(IoSession o1, IoSession o2) { - return predeterminedResult; - } - }; - - final PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, comparator); - final PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, comparator); - - // Execute system under test. - final int result = first.compareTo(last); - - // Verify results. - assertEquals("With a comparator, entries of the same session are expected to be equal.", 0, result); + // Set up fixture. + IoSession session = new DummySession(); + final int predeterminedResult = 3853; + + Comparator<IoSession> comparator = new Comparator<IoSession>() { + @Override + public int compare(IoSession o1, IoSession o2) { + return predeterminedResult; + } + }; + + PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, comparator); + PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, comparator); + + // Execute system under test. + int result = first.compareTo(last); + + // Verify results. 
+ assertEquals("With a comparator, entries of the same session are expected to be equal.", 0, result); } /** @@ -129,23 +130,25 @@ public class PriorityThreadPoolExecutorTest { */ @Test public void fifoEntryTestComparatorDifferentSession() throws Exception { - // Set up fixture (the order in which the entries are created is - // relevant here!) - final int predeterminedResult = 3853; - final Comparator<IoSession> comparator = new Comparator<IoSession>() { - @Override - public int compare(IoSession o1, IoSession o2) { - return predeterminedResult; - } - }; - final PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator); - final PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator); - - // Execute system under test. - final int result = first.compareTo(last); - - // Verify results. - assertEquals("With a comparator, comparing entries of different sessions is expected to yield the comparator result.", predeterminedResult, result); + // Set up fixture (the order in which the entries are created is + // relevant here!) + final int predeterminedResult = 3853; + + Comparator<IoSession> comparator = new Comparator<IoSession>() { + @Override + public int compare(IoSession o1, IoSession o2) { + return predeterminedResult; + } + }; + + PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator); + PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator); + + // Execute system under test. + int result = first.compareTo(last); + + // Verify results. 
+ assertEquals("With a comparator, comparing entries of different sessions is expected to yield the comparator result.", predeterminedResult, result); } /** @@ -164,150 +167,144 @@ public class PriorityThreadPoolExecutorTest { */ @Test public void testPrioritisation() throws Throwable { - // Set up fixture. - final MockWorkFilter nextFilter = new MockWorkFilter(); - final List<LastActivityTracker> sessions = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - sessions.add(new LastActivityTracker()); - } - final LastActivityTracker preferredSession = sessions.get(4); // prefer - // an - // arbitrary - // session - // (but - // not the - // first - // or last - // session, - // for - // good - // measure). - final Comparator<IoSession> comparator = new UnfairComparator(preferredSession); - final int maximumPoolSize = 1; // keep this low, to force resource - // contention. - final int amountOfTasks = 400; - - final ExecutorService executor = new PriorityThreadPoolExecutor(maximumPoolSize, comparator); - final ExecutorFilter filter = new ExecutorFilter(executor); - - // Execute system under test. - int sessionIndex = 0; - for (int i = 0; i < amountOfTasks; i++) { - if (++sessionIndex >= sessions.size()) { - sessionIndex = 0; - } - - filter.messageReceived(nextFilter, sessions.get(sessionIndex), null); - - if (nextFilter.throwable != null) { - throw nextFilter.throwable; - } - } - - executor.shutdown(); - - // Verify results. - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - - for (final LastActivityTracker session : sessions) { - if (session != preferredSession) { - assertTrue("All other sessions should have finished later than the preferred session (but at least one did not).", session.lastActivity > preferredSession.lastActivity); - } - } + // Set up fixture. 
+ MockWorkFilter nextFilter = new MockWorkFilter(); + List<LastActivityTracker> sessions = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + sessions.add(new LastActivityTracker()); + } + + LastActivityTracker preferredSession = sessions.get(4); // prefer an arbitrary session + // (but not the first or last + // session, for good measure). + Comparator<IoSession> comparator = new UnfairComparator(preferredSession); + int maximumPoolSize = 1; // keep this low, to force resource contention. + int amountOfTasks = 400; + + ExecutorService executor = new PriorityThreadPoolExecutor(maximumPoolSize, comparator); + ExecutorFilter filter = new ExecutorFilter(executor); + + // Execute system under test. + int sessionIndex = 0; + for (int i = 0; i < amountOfTasks; i++) { + if (++sessionIndex >= sessions.size()) { + sessionIndex = 0; + } + + filter.messageReceived(nextFilter, sessions.get(sessionIndex), null); + + if (nextFilter.throwable != null) { + throw nextFilter.throwable; + } + } + + executor.shutdown(); + + // Verify results. + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + + for (LastActivityTracker session : sessions) { + if (session != preferredSession) { + assertTrue("All other sessions should have finished later than the preferred session (but at least one did not).", + session.lastActivity > preferredSession.lastActivity); + } + } } /** * A comparator that prefers a particular session. 
*/ private static class UnfairComparator implements Comparator<IoSession> { - private final IoSession preferred; - - public UnfairComparator(IoSession preferred) { - this.preferred = preferred; - } - - @Override - public int compare(IoSession o1, IoSession o2) { - if (o1 == preferred) { - return -1; - } - - if (o2 == preferred) { - return 1; - } - - return 0; - } + private IoSession preferred; + + public UnfairComparator(IoSession preferred) { + this.preferred = preferred; + } + + @Override + public int compare(IoSession o1, IoSession o2) { + if (o1 == preferred) { + System.out.println( "session1 preferred" ); + return -1; + } + + if (o2 == preferred) { + System.out.println( "session2 preferred" + ", o2=" + o2 + " preferred=" + preferred ); + return 1; + } + + return 0; + } } /** * A session that tracks the timestamp of last activity. */ private static class LastActivityTracker extends DummySession { - long lastActivity = System.currentTimeMillis(); + long lastActivity = System.currentTimeMillis(); - public synchronized void setLastActivity() { - lastActivity = System.currentTimeMillis(); - } + public synchronized void setLastActivity() { + lastActivity = System.currentTimeMillis(); + } } /** * A filter that simulates a non-negligible amount of work. */ private static class MockWorkFilter implements IoFilter.NextFilter { - Throwable throwable; - - public void sessionOpened(IoSession session) { - // Do nothing - } - - public void sessionClosed(IoSession session) { - // Do nothing - } - - public void sessionIdle(IoSession session, IdleStatus status) { - // Do nothing - } - - public void exceptionCaught(IoSession session, Throwable cause) { - // Do nothing - } - - public void inputClosed(IoSession session) { - // Do nothing - } - - public void messageReceived(IoSession session, Object message) { - try { - Thread.sleep(20); // mimic work. 
- ((LastActivityTracker) session).setLastActivity(); - } catch (Exception e) { - if (this.throwable == null) { - this.throwable = e; - } - } - } - - public void messageSent(IoSession session, WriteRequest writeRequest) { - // Do nothing - } - - public void filterWrite(IoSession session, WriteRequest writeRequest) { - // Do nothing - } - - public void filterClose(IoSession session) { - // Do nothing - } - - public void sessionCreated(IoSession session) { - // Do nothing - } - - @Override - public void event(IoSession session, FilterEvent event) { - // TODO Auto-generated method stub - - } + Throwable throwable; + + public void sessionOpened(IoSession session) { + // Do nothing + } + + public void sessionClosed(IoSession session) { + // Do nothing + } + + public void sessionIdle(IoSession session, IdleStatus status) { + // Do nothing + } + + public void exceptionCaught(IoSession session, Throwable cause) { + // Do nothing + } + + public void inputClosed(IoSession session) { + // Do nothing + } + + public void messageReceived(IoSession session, Object message) { + try { + Thread.sleep(20); // mimic work. + ((LastActivityTracker) session).setLastActivity(); + } catch (Exception e) { + if (this.throwable == null) { + this.throwable = e; + } + } + } + + public void messageSent(IoSession session, WriteRequest writeRequest) { + // Do nothing + } + + public void filterWrite(IoSession session, WriteRequest writeRequest) { + // Do nothing + } + + public void filterClose(IoSession session) { + // Do nothing + } + + public void sessionCreated(IoSession session) { + // Do nothing + } + + @Override + public void event(IoSession session, FilterEvent event) { + // TODO Auto-generated method stub + } } }