Repository: cassandra
Updated Branches:
  refs/heads/trunk 3dabeeaa2 -> c86de2a98


Fix NPEs in original CASSANDRA-13324 commit

Patch by Ariel Weisberg; Reviewed by Marcus Eriksson


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c86de2a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c86de2a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c86de2a9

Branch: refs/heads/trunk
Commit: c86de2a9817aa45930afe181ae1891d2363393c7
Parents: 3dabeea
Author: Ariel Weisberg <aweisb...@apple.com>
Authored: Fri Mar 24 17:48:24 2017 -0400
Committer: Ariel Weisberg <aweisb...@apple.com>
Committed: Mon Mar 27 13:24:36 2017 -0400

----------------------------------------------------------------------
 .../apache/cassandra/net/MessagingService.java  | 40 +++++++++++++-------
 1 file changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c86de2a9/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 55604d0..b7d4329 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -536,8 +536,6 @@ public final class MessagingService implements MessagingServiceMBean
                 if (cp != null)
                     cp.incrementTimeout();
 
-                getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
-
                 if (expiredCallbackInfo.callback.supportsBackPressure())
                 {
                     updateBackPressureOnReceive(expiredCallbackInfo.target, expiredCallbackInfo.callback, true);
@@ -607,8 +605,12 @@ public final class MessagingService implements MessagingServiceMBean
     {
         if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
         {
-            BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
-            backPressureState.onMessageSent(message);
+            OutboundTcpConnectionPool cp = getConnectionPool(host);
+            if (cp != null)
+            {
+                BackPressureState backPressureState = cp.getBackPressureState();
+                backPressureState.onMessageSent(message);
+            }
         }
     }
 
@@ -623,11 +625,15 @@ public final class MessagingService implements MessagingServiceMBean
     {
         if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
         {
-            BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
-            if (!timeout)
-                backPressureState.onResponseReceived();
-            else
-                backPressureState.onResponseTimeout();
+            OutboundTcpConnectionPool cp = getConnectionPool(host);
+            if (cp != null)
+            {
+                BackPressureState backPressureState = cp.getBackPressureState();
+                if (!timeout)
+                    backPressureState.onResponseReceived();
+                else
+                    backPressureState.onResponseTimeout();
+            }
         }
     }
 
@@ -644,10 +650,16 @@ public final class MessagingService implements MessagingServiceMBean
     {
         if (DatabaseDescriptor.backPressureEnabled())
         {
-            backPressure.apply(StreamSupport.stream(hosts.spliterator(), false)
-                    .filter(h -> !h.equals(FBUtilities.getBroadcastAddress()))
-                    .map(h -> getConnectionPool(h).getBackPressureState())
-                    .collect(Collectors.toSet()), timeoutInNanos, TimeUnit.NANOSECONDS);
+            Set<BackPressureState> states = new HashSet<BackPressureState>();
+            for (InetAddress host : hosts)
+            {
+                if (host.equals(FBUtilities.getBroadcastAddress()))
+                    continue;
+                OutboundTcpConnectionPool cp = getConnectionPool(host);
+                if (cp != null)
+                    states.add(cp.getBackPressureState());
+            }
+            backPressure.apply(states, timeoutInNanos, TimeUnit.NANOSECONDS);
         }
     }
 
@@ -679,7 +691,7 @@ public final class MessagingService implements MessagingServiceMBean
         if (cp != null)
         {
             logger.trace("Resetting pool for {}", ep);
-            getConnectionPool(ep).reset();
+            cp.reset();
         }
         else
         {

Reply via email to