Repository: qpid-jms
Updated Branches:
  refs/heads/master e2e0cee97 -> fe90adaf2


QPIDJMS-407 Address thread-unsafe failover request handling

Address thread-unsafe handling of in-progress requests being added to and
removed from the request tracking map, which led to issues on reconnect
and recovery, such as losing track of pending requests.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/fe90adaf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/fe90adaf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/fe90adaf

Branch: refs/heads/master
Commit: fe90adaf2f63444c09ddb6b058f03568cacb04ef
Parents: e2e0cee
Author: Timothy Bish <tabish...@gmail.com>
Authored: Wed Aug 8 09:56:21 2018 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Wed Aug 8 09:56:21 2018 -0400

----------------------------------------------------------------------
 .../jms/provider/failover/FailoverProvider.java | 49 ++++++++++++++------
 1 file changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe90adaf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 654ec48..ad4752c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -99,7 +100,7 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
     private final AtomicBoolean failed = new AtomicBoolean();
     private final AtomicBoolean closingConnection = new AtomicBoolean(false);
     private final AtomicLong requestId = new AtomicLong();
-    private final Map<Long, FailoverRequest> requests = new 
LinkedHashMap<Long, FailoverRequest>();
+    private final Map<Long, FailoverRequest> requests = 
Collections.synchronizedMap(new LinkedHashMap<Long, FailoverRequest>());
     private final DefaultProviderListener closedListener = new 
DefaultProviderListener();
     private final AtomicReference<JmsMessageFactory> messageFactory = new 
AtomicReference<JmsMessageFactory>();
     private final ProviderFutureFactory futureFactory;
@@ -178,9 +179,14 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                 public void run() {
                     try {
                         IOException error = failureCause != null ? 
failureCause : new IOException("Connection closed");
-                        List<FailoverRequest> pending = new 
ArrayList<FailoverRequest>(requests.values());
+                        final List<FailoverRequest> pending;
+                        synchronized (requests) {
+                            pending = new 
ArrayList<FailoverRequest>(requests.values());
+                        }
                         for (FailoverRequest request : pending) {
-                            request.onFailure(error);
+                            if (!request.isComplete()) {
+                                request.onFailure(error);
+                            }
                         }
 
                         if (requestTimeoutTask != null) {
@@ -558,11 +564,13 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                 if (listener != null) {
                     listener.onConnectionInterrupted(failedURI);
                 }
-                
-                if (!requests.isEmpty()) {
-                       for (FailoverRequest request : requests.values()) {
-                               request.whenOffline(cause);
-                       }
+
+                final List<FailoverRequest> pending;
+                synchronized (requests) {
+                    pending = new 
ArrayList<FailoverRequest>(requests.values());
+                }
+                for (FailoverRequest request : pending) {
+                    request.whenOffline(cause);
                 }
 
                 // Start watching for request timeouts while we are offline, 
unless we already are.
@@ -626,9 +634,14 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                         listener.onConnectionRestored(provider.getRemoteURI());
 
                         // Last step: Send pending actions.
-                        List<FailoverRequest> pending = new 
ArrayList<FailoverRequest>(requests.values());
+                        final List<FailoverRequest> pending;
+                        synchronized (requests) {
+                            pending = new 
ArrayList<FailoverRequest>(requests.values());
+                        }
                         for (FailoverRequest request : pending) {
-                            request.run();
+                            if (!request.isComplete()) {
+                                request.run();
+                            }
                         }
 
                         reconnectControl.connectionEstablished();
@@ -636,9 +649,14 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                         processAlternates(provider.getAlternateURIs());
 
                         // Last step: Send pending actions.
-                        List<FailoverRequest> pending = new 
ArrayList<FailoverRequest>(requests.values());
+                        final List<FailoverRequest> pending;
+                        synchronized (requests) {
+                            pending = new 
ArrayList<FailoverRequest>(requests.values());
+                        }
                         for (FailoverRequest request : pending) {
-                            request.run();
+                            if (!request.isComplete()) {
+                                request.run();
+                            }
                         }
                     }
 
@@ -1084,8 +1102,11 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
 
         @Override
         public void run() {
-            List<FailoverRequest> copied = new 
ArrayList<FailoverRequest>(requests.values());
-            for (FailoverRequest request : copied) {
+            final List<FailoverRequest> pending;
+            synchronized (requests) {
+                pending = new ArrayList<FailoverRequest>(requests.values());
+            }
+            for (FailoverRequest request : pending) {
                 if (request.isExpired()) {
                     LOG.trace("Task {} has timed out, sending failure 
notice.", request);
                     request.onFailure(request.createTimedOutException());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to