This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
commit 82557f96f8da8767277db18f0aa7db7d5556a132 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Fri Sep 27 18:36:47 2024 +0100 QPIDJMS-602: fix potential for session shutdown NPE during competing local and remote closures Make the ProviderFuture creation safe from NPE, validate a future is returned and noop the completion wait if not since the provider ref is gone already. Add additional try-finally to ensure executor shutdown occurs. --- .../java/org/apache/qpid/jms/JmsConnection.java | 7 ++- .../main/java/org/apache/qpid/jms/JmsSession.java | 60 ++++++++++++---------- 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index dcbd3a30..fbc06ba0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -921,7 +921,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } ProviderFuture newProviderFuture(ProviderSynchronization synchronization) { - return provider.newProviderFuture(synchronization); + Provider localProvider = provider; + if (localProvider != null) { + return localProvider.newProviderFuture(synchronization); + } else { + return null; + } } //----- Property setters and getters -------------------------------------// diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 65036828..31d0cd32 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -380,39 +380,45 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe // Ensure that no asynchronous completion sends remain blocked after close but wait // using the close timeout for the asynchronous sends to complete normally. final ExecutorService completionExecutor = getCompletionExecutor(); + try { + synchronized (sessionInfo) { + // Producers are now quiesced and we can await completion of asynchronous sends + // that are still pending a result or timeout once we've done a quick check to + // see if any are actually pending or have completed already. + asyncSendsCompletion = connection.newProviderFuture(); + + if (asyncSendsCompletion != null) { + completionExecutor.execute(() -> { + if (asyncSendQueue.isEmpty()) { + asyncSendsCompletion.onSuccess(); + } + }); + } + } - synchronized (sessionInfo) { - // Producers are now quiesced and we can await completion of asynchronous sends - // that are still pending a result or timeout once we've done a quick check to - // see if any are actually pending or have completed already. - asyncSendsCompletion = connection.newProviderFuture(); - - completionExecutor.execute(() -> { - if (asyncSendQueue.isEmpty()) { - asyncSendsCompletion.onSuccess(); + try { + if (asyncSendsCompletion != null) { + asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); + } + } catch (Exception ex) { + LOG.trace("Exception during wait for asynchronous sends to complete", ex); + } finally { + if (cause == null) { + cause = new JMSException("Session closed remotely before message transfer result was notified"); } - }); - } - try { - asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); - } catch (Exception ex) { - LOG.trace("Exception during wait for asynchronous sends to complete", ex); - } finally { - if (cause == null) { - cause = new JMSException("Session closed remotely before message transfer result was notified"); + // as a last task we want to fail any stragglers in the asynchronous send queue and then + // shutdown the queue to prevent any more submissions while the cleanup goes on. + completionExecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause))); } - - // as a last task we want to fail any stragglers in the asynchronous send queue and then - // shutdown the queue to prevent any more submissions while the cleanup goes on. - completionExecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause))); + } finally { completionExecutor.shutdown(); - } - try { - completionExecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.trace("Session close awaiting send completions was interrupted"); + try { + completionExecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.trace("Session close awaiting send completions was interrupted"); + } } if (shutdownError != null) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org