This is an automated email from the ASF dual-hosted git repository. brahma pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new 209f4e35e5 AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting.(Replace StandardEvaluationContext with SimpleEvaluationContext) 209f4e35e5 is described below commit 209f4e35e561636ebc6e86c2648643fa4376c931 Author: Brahma Reddy Battula <bbatt...@visa.com> AuthorDate: Tue Sep 6 12:38:52 2022 +0530 AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting.(Replace StandardEvaluationContext with SimpleEvaluationContext) --- .../agent/stomp/AmbariSubscriptionRegistry.java | 233 +++++++++++---------- 1 file changed, 127 insertions(+), 106 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java index 8cbf0af408..70175f43c5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java @@ -29,7 +29,6 @@ import java.util.concurrent.CopyOnWriteArraySet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.expression.AccessException; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.ExpressionParser; @@ -37,7 +36,8 @@ import org.springframework.expression.PropertyAccessor; import org.springframework.expression.TypedValue; import org.springframework.expression.spel.SpelEvaluationException; import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.expression.spel.support.SimpleEvaluationContext; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; @@ -60,6 +60,10 @@ import com.google.common.cache.CacheBuilder; public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { private static final Logger LOG = LoggerFactory.getLogger(AmbariSubscriptionRegistry.class); + /** Static evaluation context to reuse. */ + private static final EvaluationContext messageEvalContext = + SimpleEvaluationContext.forPropertyAccessors(new SimpMessageHeaderPropertyAccessor()).build(); + private PathMatcher pathMatcher = new AntPathMatcher(); private volatile int cacheLimit; @@ -138,25 +142,33 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { protected void addSubscriptionInternal( String sessionId, String subsId, String destination, Message<?> message) { + Expression expression = getSelectorExpression(message.getHeaders()); + this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression); + this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId); + } + + @Nullable + private Expression getSelectorExpression(MessageHeaders headers) { + Expression expression = null; - MessageHeaders headers = message.getHeaders(); - String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers); - if (selector != null) { - try { - expression = this.expressionParser.parseExpression(selector); - this.selectorHeaderInUse = true; - if (logger.isTraceEnabled()) { - logger.trace("Subscription selector: [" + selector + "]"); + if (getSelectorHeaderName() != null) { + String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers); + if (selector != null) { + try { + expression = this.expressionParser.parseExpression(selector); + this.selectorHeaderInUse = true; + if (logger.isTraceEnabled()) { + logger.trace("Subscription selector: [" + selector + "]"); + } } - } - catch (Throwable ex) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to parse selector: " + selector, ex); + catch (Throwable ex) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to parse selector: " + selector, ex); + } } } } - this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression); - this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId); + return expression; } @Override @@ -190,29 +202,23 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { if (!this.selectorHeaderInUse) { return allMatches; } - EvaluationContext context = null; - MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>(allMatches.size()); - for (String sessionId : allMatches.keySet()) { - for (String subId : allMatches.get(sessionId)) { + MultiValueMap<String, String> result = new LinkedMultiValueMap<>(allMatches.size()); + allMatches.forEach((sessionId, subIds) -> { + subIds.forEach(subId -> { SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId); if (info == null) { - continue; + return; } Subscription sub = info.getSubscription(subId); if (sub == null) { - continue; + return; } Expression expression = sub.getSelectorExpression(); if (expression == null) { - result.add(sessionId, subId); - continue; - } - if (context == null) { - context = new StandardEvaluationContext(message); - context.getPropertyAccessors().add(new SimpMessageHeaderPropertyAccessor()); + return; } try { - if (expression.getValue(context, boolean.class)) { + if (Boolean.TRUE.equals(expression.getValue(messageEvalContext, message, Boolean.class))) { result.add(sessionId, subId); } } @@ -224,8 +230,8 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { catch (Throwable ex) { logger.debug("Failed to evaluate selector", ex); } - } - } + }); + }); return result; } @@ -250,35 +256,42 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { CacheBuilder.newBuilder().maximumSize(cacheLimit).build(); public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) { + LinkedMultiValueMap<String, String> copiedSubscriptions = new LinkedMultiValueMap<>(); if (notSubscriptionCache.asMap().keySet().contains(destination)) { - return new LinkedMultiValueMap<>(); + return copiedSubscriptions; } - LinkedMultiValueMap<String, String> subscriptions = this.accessCache.computeIfAbsent(destination, (key) -> { - LinkedMultiValueMap<String, String> result = new LinkedMultiValueMap<>(); - for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) { - for (String destinationPattern : info.getDestinations()) { - //TODO temporary changed to more fast-acting check without regex, need move investigation - if (destinationPattern.equals(destination)) { - for (Subscription subscription : info.getSubscriptions(destinationPattern)) { - result.add(info.sessionId, subscription.getId()); + this.accessCache.compute(destination, (key, value) -> { + if (value == null) { + LinkedMultiValueMap<String, String> result = new LinkedMultiValueMap<>(); + subscriptionRegistry.getAllSubscriptions().forEach((info) -> { + info.getDestinations().forEach((destinationPattern) -> { + //TODO temporary changed to more fast-acting check without regex, need move investigation + if (destinationPattern.equals(destination)) { + info.getSubscriptions(destinationPattern).forEach((subscription) -> { + result.add(info.sessionId, subscription.getId()); + }); } - } + }); + }); + if (!result.isEmpty()) { + copiedSubscriptions.addAll(result.deepCopy()); + return result; + } else { + notSubscriptionCache.put(destination, ""); + return null; } - } - if (!result.isEmpty()) { - return result; } else { - notSubscriptionCache.put(destination, ""); - return null; + copiedSubscriptions.addAll(value.deepCopy()); + return value; } }); - return subscriptions == null ? new LinkedMultiValueMap<>() : subscriptions; + return copiedSubscriptions; } public void updateAfterNewSubscription(String destination, String sessionId, String subsId) { LinkedMultiValueMap<String, String> updatedMap = this.accessCache.computeIfPresent(destination, (key, value) -> { if (getPathMatcher().match(destination, key)) { - LinkedMultiValueMap<String, String> subs = value.deepCopy(); + LinkedMultiValueMap<String, String> subs = value; subs.add(sessionId, subsId); return subs; } @@ -294,22 +307,22 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { this.accessCache.entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next(); String destination = entry.getKey(); - accessCache.computeIfPresent(destination, (key, sessionMap)-> { - List<String> subscriptions = sessionMap.get(sessionId); - if (subscriptions != null) { - subscriptions.remove(subsId); - if (subscriptions.isEmpty()) { - sessionMap.remove(sessionId); - } - if (sessionMap.isEmpty()) { - return null; - } - else { - this.notSubscriptionCache.invalidate(destination); - return sessionMap.deepCopy(); + this.accessCache.compute(destination, (key, value) -> { + if (value != null) { + List<String> subscriptions = value.get(sessionId); + if (subscriptions != null) { + subscriptions.remove(subsId); + if (subscriptions.isEmpty()) { + value.remove(sessionId); + } + if (value.isEmpty()) { + return null; + } else { + this.notSubscriptionCache.invalidate(destination); + } } } - return sessionMap; + return value; }); } } @@ -319,17 +332,17 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { this.accessCache.entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next(); String destination = entry.getKey(); - accessCache.computeIfPresent(destination, (key, sessionMap)-> { - if (sessionMap.remove(info.getSessionId()) != null) { - if (sessionMap.isEmpty()) { - return null; - } - else { - this.notSubscriptionCache.invalidate(destination); - return sessionMap.deepCopy(); + this.accessCache.compute(destination, (key, value) -> { + if (value != null) { + if (value.remove(info.getSessionId()) != null) { + if (value.isEmpty()) { + return null; + } else { + this.notSubscriptionCache.invalidate(destination); + } } } - return sessionMap; + return value; }); } } @@ -347,8 +360,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { private static class SessionSubscriptionRegistry { // sessionId -> SessionSubscriptionInfo - private final ConcurrentMap<String, SessionSubscriptionInfo> sessions = - new ConcurrentHashMap<String, SessionSubscriptionInfo>(); + private final ConcurrentMap<String, SessionSubscriptionInfo> sessions = new ConcurrentHashMap<>(); public SessionSubscriptionInfo getSubscriptions(String sessionId) { return this.sessions.get(sessionId); @@ -359,8 +371,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { } public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId, - String destination, Expression selectorExpression) { - + String destination, @Nullable Expression selectorExpression) { SessionSubscriptionInfo info = this.sessions.get(sessionId); if (info == null) { info = new SessionSubscriptionInfo(sessionId); @@ -373,6 +384,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { return info; } + @Nullable public SessionSubscriptionInfo removeSubscriptions(String sessionId) { return this.sessions.remove(sessionId); } @@ -392,8 +404,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { private final String sessionId; // destination -> subscriptions - private final Map<String, Set<Subscription>> destinationLookup = - new ConcurrentHashMap<String, Set<Subscription>>(4); + private final Map<String, Set<Subscription>> destinationLookup = new ConcurrentHashMap<>(4); public SessionSubscriptionInfo(String sessionId) { Assert.notNull(sessionId, "'sessionId' must not be null"); @@ -412,27 +423,26 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { return this.destinationLookup.get(destination); } + @Nullable public Subscription getSubscription(String subscriptionId) { - for (String destination : this.destinationLookup.keySet()) { - Set<Subscription> subs = this.destinationLookup.get(destination); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.getId().equalsIgnoreCase(subscriptionId)) { - return sub; - } + for (Map.Entry<String, Set<Subscription>> destinationEntry : + this.destinationLookup.entrySet()) { + for (Subscription sub : destinationEntry.getValue()) { + if (sub.getId().equalsIgnoreCase(subscriptionId)) { + return sub; } } } return null; } - public void addSubscription(String destination, String subscriptionId, Expression selectorExpression) { + public void addSubscription(String destination, String subscriptionId, @Nullable Expression selectorExpression) { Set<Subscription> subs = this.destinationLookup.get(destination); if (subs == null) { synchronized (this.destinationLookup) { subs = this.destinationLookup.get(destination); if (subs == null) { - subs = new CopyOnWriteArraySet<Subscription>(); + subs = new CopyOnWriteArraySet<>(); this.destinationLookup.put(destination, subs); } } @@ -440,18 +450,20 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { subs.add(new Subscription(subscriptionId, selectorExpression)); } + @Nullable public String removeSubscription(String subscriptionId) { - for (String destination : this.destinationLookup.keySet()) { - Set<Subscription> subs = this.destinationLookup.get(destination); + for (Map.Entry<String, Set<Subscription>> destinationEntry : + this.destinationLookup.entrySet()) { + Set<Subscription> subs = destinationEntry.getValue(); if (subs != null) { for (Subscription sub : subs) { if (sub.getId().equals(subscriptionId) && subs.remove(sub)) { synchronized (this.destinationLookup) { if (subs.isEmpty()) { - this.destinationLookup.remove(destination); + this.destinationLookup.remove(destinationEntry.getKey()); } } - return destination; + return destinationEntry.getKey(); } } } @@ -470,9 +482,10 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { private final String id; + @Nullable private final Expression selectorExpression; - public Subscription(String id, Expression selector) { + public Subscription(String id, @Nullable Expression selector) { Assert.notNull(id, "Subscription id must not be null"); this.id = id; this.selectorExpression = selector; @@ -482,6 +495,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { return this.id; } + @Nullable public Expression getSelectorExpression() { return this.selectorExpression; } @@ -507,39 +521,46 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { @Override public Class<?>[] getSpecificTargetClasses() { - return new Class<?>[] {MessageHeaders.class}; + return new Class<?>[]{Message.class, MessageHeaders.class}; } @Override - public boolean canRead(EvaluationContext context, Object target, String name) { + public boolean canRead(EvaluationContext context, @Nullable Object target, String name) { return true; } @Override - public TypedValue read(EvaluationContext context, Object target, String name) throws AccessException { - MessageHeaders headers = (MessageHeaders) target; - SimpMessageHeaderAccessor accessor = - MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class); + public TypedValue read(EvaluationContext context, @Nullable Object target, String name) { Object value; - if ("destination".equalsIgnoreCase(name)) { - value = accessor.getDestination(); - } - else { - value = accessor.getFirstNativeHeader(name); - if (value == null) { - value = headers.get(name); + if (target instanceof Message) { + value = name.equals("headers") ? ((Message) target).getHeaders() : null; + } else if (target instanceof MessageHeaders) { + MessageHeaders headers = (MessageHeaders) target; + SimpMessageHeaderAccessor accessor = + MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class); + Assert.state(accessor != null, "No SimpMessageHeaderAccessor"); + if ("destination".equalsIgnoreCase(name)) { + value = accessor.getDestination(); + } else { + value = accessor.getFirstNativeHeader(name); + if (value == null) { + value = headers.get(name); + } } + } else { + // Should never happen... + throw new IllegalStateException("Expected Message or MessageHeaders."); } return new TypedValue(value); } @Override - public boolean canWrite(EvaluationContext context, Object target, String name) { + public boolean canWrite(EvaluationContext context, @Nullable Object target, String name) { return false; } @Override - public void write(EvaluationContext context, Object target, String name, Object value) { + public void write(EvaluationContext context, @Nullable Object target, String name, @Nullable Object value) { } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@ambari.apache.org For additional commands, e-mail: commits-h...@ambari.apache.org