Repository: cassandra Updated Branches: refs/heads/trunk 3dabeeaa2 -> c86de2a98
Fix NPEs in original CASSANDRA-13324 commit Patch by Ariel Weisberg; Reviewed by Marcus Eriksson Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c86de2a9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c86de2a9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c86de2a9 Branch: refs/heads/trunk Commit: c86de2a9817aa45930afe181ae1891d2363393c7 Parents: 3dabeea Author: Ariel Weisberg <aweisb...@apple.com> Authored: Fri Mar 24 17:48:24 2017 -0400 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Mon Mar 27 13:24:36 2017 -0400 ---------------------------------------------------------------------- .../apache/cassandra/net/MessagingService.java | 40 +++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c86de2a9/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 55604d0..b7d4329 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -536,8 +536,6 @@ public final class MessagingService implements MessagingServiceMBean if (cp != null) cp.incrementTimeout(); - getConnectionPool(expiredCallbackInfo.target).incrementTimeout(); - if (expiredCallbackInfo.callback.supportsBackPressure()) { updateBackPressureOnReceive(expiredCallbackInfo.target, expiredCallbackInfo.callback, true); @@ -607,8 +605,12 @@ public final class MessagingService implements MessagingServiceMBean { if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure()) { - BackPressureState backPressureState = getConnectionPool(host).getBackPressureState(); - backPressureState.onMessageSent(message); + OutboundTcpConnectionPool cp = getConnectionPool(host); + if (cp != null) + { + BackPressureState backPressureState = cp.getBackPressureState(); + backPressureState.onMessageSent(message); + } } } @@ -623,11 +625,15 @@ public final class MessagingService implements MessagingServiceMBean { if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure()) { - BackPressureState backPressureState = getConnectionPool(host).getBackPressureState(); - if (!timeout) - backPressureState.onResponseReceived(); - else - backPressureState.onResponseTimeout(); + OutboundTcpConnectionPool cp = getConnectionPool(host); + if (cp != null) + { + BackPressureState backPressureState = cp.getBackPressureState(); + if (!timeout) + backPressureState.onResponseReceived(); + else + backPressureState.onResponseTimeout(); + } } } @@ -644,10 +650,16 @@ public final class MessagingService implements MessagingServiceMBean { if (DatabaseDescriptor.backPressureEnabled()) { - backPressure.apply(StreamSupport.stream(hosts.spliterator(), false) - .filter(h -> !h.equals(FBUtilities.getBroadcastAddress())) - .map(h -> getConnectionPool(h).getBackPressureState()) - .collect(Collectors.toSet()), timeoutInNanos, TimeUnit.NANOSECONDS); + Set<BackPressureState> states = new HashSet<BackPressureState>(); + for (InetAddress host : hosts) + { + if (host.equals(FBUtilities.getBroadcastAddress())) + continue; + OutboundTcpConnectionPool cp = getConnectionPool(host); + if (cp != null) + states.add(cp.getBackPressureState()); + } + backPressure.apply(states, timeoutInNanos, TimeUnit.NANOSECONDS); } } @@ -679,7 +691,7 @@ public final class MessagingService implements MessagingServiceMBean if (cp != null) { logger.trace("Resetting pool for {}", ep); - getConnectionPool(ep).reset(); + cp.reset(); } else {