[ https://issues.apache.org/jira/browse/DIRMINA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375884#comment-16375884 ]
Jonathan Valliere commented on DIRMINA-1076: -------------------------------------------- *Why Patch 2.0 doesn't fully work* It seems that {{Patch 2.0}} had a problem where {{scheduleRemove(Session)}} could be called concurrently and add the same {{Session}} to the {{removingSessions}} queue twice despite the {{contains(Session)}} check: both threads could call {{contains(Session)}} and both get a false negative before either of them had added the {{Session}}, so it ended up being added twice. {{Queue}} lacks atomic operations like {{putIfAbsent}}, which makes the problem harder to solve. *Option 1* One option would be to make the entire code block {{synchronized}}; however, I felt that it wasn't the safest option available, because {{scheduleRemove(Session)}} could still be called by an external actor after the {{Processor}} has executed {{removeSessions()}} but before the {{for(...)}} loop cycles, causing the {{Session}} to be removed twice again even if {{selector.keys()}} was used as a reference point. The sequential operations would not be atomic relative to each other unless {{synchronized}} blocks were added all over the place. *Option 2* The safest option to avoid this is to create a new {{Queue}} which contains only the active sessions. The new queue serves as an atomic reference for whether a {{Session}} has already been removed or not. *Option 2, Liability* The new {{Queue}} could become a memory leak if {{Sessions}} are added or removed in some way without calling {{addNow(Session)}} or {{removeNow(Session)}}. 
*Patch 3.0 (Option 2) - Passes Unit Tests* {code:java} diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java index 79885fa..27e7d76 100644 --- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java +++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java @@ -23,6 +23,7 @@ import java.net.PortUnreachableException; import java.nio.channels.ClosedSelectorException; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Queue; @@ -86,6 +87,9 @@ /** A Session queue containing the newly created sessions */ private final Queue<S> newSessions = new ConcurrentLinkedQueue<>(); + + /** A queue used to store all active sessions */ + private final Queue<S> activeSessions = new ConcurrentLinkedQueue<>(); /** A queue used to store the sessions to be removed */ private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>(); @@ -239,7 +243,9 @@ * * @return {@link Iterator} of {@link IoSession} */ - protected abstract Iterator<S> allSessions(); + protected Iterator<S> allSessions() { + return Collections.unmodifiableCollection(activeSessions).iterator(); + } /** * Get an {@link Iterator} for the list of {@link IoSession} found selected @@ -411,7 +417,7 @@ } private void scheduleRemove(S session) { - if (!removingSessions.contains(session)) { + if (!removingSessions.contains(session) && activeSessions.contains(session)) { removingSessions.add(session); } } @@ -659,9 +665,20 @@ long currentTime = System.currentTimeMillis(); flush(currentTime); + // Disconnect all sessions immediately if disposal has been + // requested so that we exit this loop eventually. 
+ if (isDisposing()) { + for (Iterator<S> i = allSessions(); i.hasNext();) { + IoSession session = i.next(); + scheduleRemove((S) session); + } + } + // And manage removed sessions nSessions -= removeSessions(); - + + assert nSessions > -1 : "Internal Session Count is Negative"; + // Last, not least, send Idle events to the idle sessions notifyIdleSessions(currentTime); @@ -685,26 +702,6 @@ } assert processorRef.get() == this; - } - - // Disconnect all sessions immediately if disposal has been - // requested so that we exit this loop eventually. - if (isDisposing()) { - boolean hasKeys = false; - - for (Iterator<S> i = allSessions(); i.hasNext();) { - IoSession session = i.next(); - - scheduleRemove((S) session); - - if (session.isActive()) { - hasKeys = true; - } - } - - if (hasKeys) { - wakeup(); - } } } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop @@ -819,30 +816,41 @@ private boolean addNow(S session) { boolean registered = false; - try { - init(session); - registered = true; + try { + if (activeSessions.contains(session)) { + return true; + } - // Build the filter chain of this session. - IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder(); - chainBuilder.buildFilterChain(session.getFilterChain()); + if (activeSessions.add(session)) { + init(session); + registered = true; - // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here - // in AbstractIoFilterChain.fireSessionOpened(). - // Propagate the SESSION_CREATED event up to the chain - IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners(); - listeners.fireSessionCreated(session); - } catch (Exception e) { - ExceptionMonitor.getInstance().exceptionCaught(e); + // Build the filter chain of this session. 
+ IoFilterChainBuilder chainBuilder = session.getService() + .getFilterChainBuilder(); + chainBuilder.buildFilterChain(session.getFilterChain()); - try { - destroy(session); - } catch (Exception e1) { - ExceptionMonitor.getInstance().exceptionCaught(e1); - } finally { - registered = false; - } - } + // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside + // here + // in AbstractIoFilterChain.fireSessionOpened(). + // Propagate the SESSION_CREATED event up to the chain + IoServiceListenerSupport listeners = ((AbstractIoService) session + .getService()).getListeners(); + listeners.fireSessionCreated(session); + } + } catch (Exception e) { + ExceptionMonitor.getInstance().exceptionCaught(e); + + if (activeSessions.remove(session)) { + try { + destroy(session); + } catch (Exception e1) { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } finally { + registered = false; + } + } + } return registered; } @@ -850,40 +858,36 @@ private int removeSessions() { int removedSessions = 0; - for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) { - SessionState state = getState(session); + for (S session = removingSessions.poll(); session != null; session = removingSessions .poll()) { + SessionState state = getState(session); - // Now deal with the removal accordingly to the session's state - switch (state) { - case OPENED: - // Try to remove this session - if (removeNow(session)) { - removedSessions++; - } + // Now deal with the removal accordingly to the session's + // state + switch (state) { + case OPENED: + case CLOSING: + // Try to remove this session + if (removeNow(session)) { + removedSessions++; + } - break; + break; - case CLOSING: - // Skip if channel is already closed - // In any case, remove the session from the queue - removedSessions++; - break; + case OPENING: + // Remove session from the newSessions queue and + // remove it + newSessions.remove(session); - case OPENING: - // Remove session from the newSessions 
queue and - // remove it - newSessions.remove(session); + if (removeNow(session)) { + removedSessions++; + } - if (removeNow(session)) { - removedSessions++; - } + break; - break; - - default: - throw new IllegalStateException(String.valueOf(state)); - } - } + default: + throw new IllegalStateException(String.valueOf(state)); + } + } return removedSessions; } @@ -1145,27 +1149,32 @@ } private boolean removeNow(S session) { - clearWriteRequestQueue(session); + if (activeSessions.remove(session)) { + clearWriteRequestQueue(session); - try { - destroy(session); - return true; - } catch (Exception e) { - IoFilterChain filterChain = session.getFilterChain(); - filterChain.fireExceptionCaught(e); - } finally { - try { - clearWriteRequestQueue(session); - ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session); - } catch (Exception e) { - // The session was either destroyed or not at this point. - // We do not want any exception thrown from this "cleanup" code - // to change - // the return value by bubbling up. - IoFilterChain filterChain = session.getFilterChain(); - filterChain.fireExceptionCaught(e); - } - } + try { + destroy(session); + return true; + } catch (Exception e) { + IoFilterChain filterChain = session.getFilterChain(); + filterChain.fireExceptionCaught(e); + } finally { + try { + clearWriteRequestQueue(session); + ((AbstractIoService) session.getService()) + .getListeners().fireSessionDestroyed(session); + } catch (Exception e) { + // The session was either destroyed or not at this + // point. + // We do not want any exception thrown from this + // "cleanup" code + // to change + // the return value by bubbling up. 
+ IoFilterChain filterChain = session.getFilterChain(); + filterChain.fireExceptionCaught(e); + } + } + } return false; } diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java index 3b0fa40..fc60124 100644 --- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java +++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java @@ -149,15 +149,20 @@ } } +// @Override +// protected Iterator<NioSession> allSessions() { +// selectorLock.readLock().lock(); +// +// try { +// return new IoSessionIterator(selector.keys()); +// } finally { +// selectorLock.readLock().unlock(); +// } +// } + @Override protected Iterator<NioSession> allSessions() { - selectorLock.readLock().lock(); - - try { - return new IoSessionIterator(selector.keys()); - } finally { - selectorLock.readLock().unlock(); - } + return super.allSessions(); } @SuppressWarnings("synthetic-access") @@ -182,14 +187,13 @@ @Override protected void destroy(NioSession session) throws Exception { ByteChannel ch = session.getChannel(); - SelectionKey key = session.getSelectionKey(); if (key != null) { key.cancel(); } - if ( ch.isOpen() ) { + if (ch.isOpen() ) { ch.close(); } } {code} > Leaking NioProcessors/NioSocketConnectors hanging in call to dispose > -------------------------------------------------------------------- > > Key: DIRMINA-1076 > URL: https://issues.apache.org/jira/browse/DIRMINA-1076 > Project: MINA > Issue Type: Bug > Affects Versions: 2.0.16 > Reporter: Christoph John > Assignee: Jonathan Valliere > Priority: Major > Attachments: mina-dispose-hang.txt, mina-test-log.txt, > mina-test-patch.txt > > > Follow-up to mailing list discussion. > I was now able to reproduce the problem with a MINA test. Or let's say I did > the brute-force approach by re-running one test in an endless loop. 
> I have attached a patch of AbstractIoServiceTest (against > [https://github.com/apache/mina/tree/2.0]) and a stack trace. After a few > loops the test is stuck. You can see a lot of threads hanging in dispose() > and the test is stuck when it tries to dispose the acceptor. > > What is a little strange is that the javadoc says that > connector.dispose(TRUE) should not be called from an IoFutureListener, but in > the test it is done anyway. However, changing the parameter to FALSE does not > help either. > > Is there anything that can be done to prevent this hang? -- This message was sent by Atlassian JIRA (v7.6.3#76005)