Repository: qpid-jms Updated Branches: refs/heads/master e2e0cee97 -> fe90adaf2
QPIDJMS-407 Address thread unsafe failover requests handling Address thread unsafe handling of in progress requests being added and removed from the request tracking map that lead to issues on reconnect and recover losing track of pending requests. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/fe90adaf Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/fe90adaf Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/fe90adaf Branch: refs/heads/master Commit: fe90adaf2f63444c09ddb6b058f03568cacb04ef Parents: e2e0cee Author: Timothy Bish <tabish...@gmail.com> Authored: Wed Aug 8 09:56:21 2018 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Wed Aug 8 09:56:21 2018 -0400 ---------------------------------------------------------------------- .../jms/provider/failover/FailoverProvider.java | 49 ++++++++++++++------ 1 file changed, 35 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe90adaf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index 654ec48..ad4752c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.LinkedHashMap; import java.util.List; @@ -99,7 +100,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide private final AtomicBoolean failed = new AtomicBoolean(); private final AtomicBoolean closingConnection = new AtomicBoolean(false); private final AtomicLong requestId = new AtomicLong(); - private final Map<Long, FailoverRequest> requests = new LinkedHashMap<Long, FailoverRequest>(); + private final Map<Long, FailoverRequest> requests = Collections.synchronizedMap(new LinkedHashMap<Long, FailoverRequest>()); private final DefaultProviderListener closedListener = new DefaultProviderListener(); private final AtomicReference<JmsMessageFactory> messageFactory = new AtomicReference<JmsMessageFactory>(); private final ProviderFutureFactory futureFactory; @@ -178,9 +179,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide public void run() { try { IOException error = failureCause != null ? failureCause : new IOException("Connection closed"); - List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values()); + final List<FailoverRequest> pending; + synchronized (requests) { + pending = new ArrayList<FailoverRequest>(requests.values()); + } for (FailoverRequest request : pending) { - request.onFailure(error); + if (!request.isComplete()) { + request.onFailure(error); + } } if (requestTimeoutTask != null) { @@ -558,11 +564,13 @@ public class FailoverProvider extends DefaultProviderListener implements Provide if (listener != null) { listener.onConnectionInterrupted(failedURI); } - - if (!requests.isEmpty()) { - for (FailoverRequest request : requests.values()) { - request.whenOffline(cause); - } + + final List<FailoverRequest> pending; + synchronized (requests) { + pending = new ArrayList<FailoverRequest>(requests.values()); + } + for (FailoverRequest request : pending) { + request.whenOffline(cause); } // Start watching for request timeouts while we are offline, unless we already are. @@ -626,9 +634,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide listener.onConnectionRestored(provider.getRemoteURI()); // Last step: Send pending actions. - List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values()); + final List<FailoverRequest> pending; + synchronized (requests) { + pending = new ArrayList<FailoverRequest>(requests.values()); + } for (FailoverRequest request : pending) { - request.run(); + if (!request.isComplete()) { + request.run(); + } } reconnectControl.connectionEstablished(); @@ -636,9 +649,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide processAlternates(provider.getAlternateURIs()); // Last step: Send pending actions. - List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values()); + final List<FailoverRequest> pending; + synchronized (requests) { + pending = new ArrayList<FailoverRequest>(requests.values()); + } for (FailoverRequest request : pending) { - request.run(); + if (!request.isComplete()) { + request.run(); + } } } @@ -1084,8 +1102,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide @Override public void run() { - List<FailoverRequest> copied = new ArrayList<FailoverRequest>(requests.values()); - for (FailoverRequest request : copied) { + final List<FailoverRequest> pending; + synchronized (requests) { + pending = new ArrayList<FailoverRequest>(requests.values()); + } + for (FailoverRequest request : pending) { if (request.isExpired()) { LOG.trace("Task {} has timed out, sending failure notice.", request); request.onFailure(request.createTimedOutException()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org