This is an automated email from the ASF dual-hosted git repository. hapylestat pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new fae912a AMBARI-25637. ConcurrentModificationException during stomp subscriptions processing. (dvitiuk via dgrinenko) (#3300) fae912a is described below commit fae912aa4eaba83e7208ea4c78bcb6bb4a4e5709 Author: dvitiiuk <dmitriiviti...@gmail.com> AuthorDate: Mon Mar 29 15:44:34 2021 +0300 AMBARI-25637. ConcurrentModificationException during stomp subscriptions processing. (dvitiuk via dgrinenko) (#3300) --- .../agent/stomp/AmbariSubscriptionRegistry.java | 50 ++++++++++++---------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java index 68330c6..8cbf0af 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java @@ -294,21 +294,23 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { this.accessCache.entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next(); String destination = entry.getKey(); - LinkedMultiValueMap<String, String> sessionMap = entry.getValue(); - List<String> subscriptions = sessionMap.get(sessionId); - if (subscriptions != null) { - subscriptions.remove(subsId); - if (subscriptions.isEmpty()) { - sessionMap.remove(sessionId); - } - if (sessionMap.isEmpty()) { - iterator.remove(); - } - else { - this.notSubscriptionCache.invalidate(destination); - this.accessCache.put(destination, sessionMap.deepCopy()); + accessCache.computeIfPresent(destination, (key, sessionMap)-> { + List<String> subscriptions = sessionMap.get(sessionId); + if (subscriptions != null) { + subscriptions.remove(subsId); + if (subscriptions.isEmpty()) { + sessionMap.remove(sessionId); + } + if (sessionMap.isEmpty()) { + return null; + } + else { + this.notSubscriptionCache.invalidate(destination); + return sessionMap.deepCopy(); + } } - } + return sessionMap; + }); } } @@ -317,16 +319,18 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { this.accessCache.entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next(); String destination = entry.getKey(); - LinkedMultiValueMap<String, String> sessionMap = entry.getValue(); - if (sessionMap.remove(info.getSessionId()) != null) { - if (sessionMap.isEmpty()) { - iterator.remove(); - } - else { - this.notSubscriptionCache.invalidate(destination); - this.accessCache.put(destination, sessionMap.deepCopy()); + accessCache.computeIfPresent(destination, (key, sessionMap)-> { + if (sessionMap.remove(info.getSessionId()) != null) { + if (sessionMap.isEmpty()) { + return null; + } + else { + this.notSubscriptionCache.invalidate(destination); + return sessionMap.deepCopy(); + } } - } + return sessionMap; + }); } }