Repository: incubator-eagle Updated Branches: refs/heads/master c897f74b5 -> 0fac33799
[EAGLE-569]: AlertPublisherImpl: Concurrency : In-place change of metadata causes concurrent modification issue Author : ralphsu This closes #457 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0fac3379 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0fac3379 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0fac3379 Branch: refs/heads/master Commit: 0fac33799e21ae92d9b4a7e98e5090b032145b06 Parents: c897f74 Author: Ralph, Su <suliang...@gmail.com> Authored: Tue Sep 27 10:18:40 2016 -0700 Committer: Ralph, Su <suliang...@gmail.com> Committed: Tue Sep 27 10:23:53 2016 -0700 ---------------------------------------------------------------------- .../publisher/impl/AlertPublisherImpl.java | 85 +++++++++++++------- 1 file changed, 55 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fac3379/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java index fe1438e..e97a763 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java @@ -17,16 +17,19 @@ package org.apache.eagle.alert.engine.publisher.impl; +import com.typesafe.config.Config; +import 
org.apache.commons.collections.ListUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; import org.apache.eagle.alert.engine.publisher.AlertPublisher; -import com.typesafe.config.Config; -import org.apache.commons.collections.ListUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.MessageFormat; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -67,16 +70,18 @@ public class AlertPublisherImpl implements AlertPublisher { private void notifyAlert(AlertStreamEvent event) { String policyId = event.getPolicyId(); - if (policyId == null || !policyPublishPluginMapping.containsKey(policyId)) { - LOG.warn("Policy {} does NOT subscribe any publishments", policyId); + if (StringUtils.isEmpty(policyId)) { + LOG.warn("policyId cannot be null for event to be published"); return; } - for (String id : policyPublishPluginMapping.get(policyId)) { - AlertPublishPlugin plugin = publishPluginMapping.get(id); + for (String pubId : policyPublishPluginMapping.get(policyId)) { + AlertPublishPlugin plugin = pubId != null ? 
publishPluginMapping.get(pubId) : null; + if (plugin == null) { + LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId); + continue; + } try { - if (LOG.isDebugEnabled()) { - LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName()); - } + LOG.debug("Execute alert publisher {}", plugin.getClass().getCanonicalName()); plugin.onAlert(event); } catch (Exception ex) { LOG.error("Fail invoking publisher's onAlert, continue ", ex); @@ -91,7 +96,7 @@ public class AlertPublisherImpl implements AlertPublisher { @SuppressWarnings("unchecked") @Override - public void onPublishChange(List<Publishment> added, + public synchronized void onPublishChange(List<Publishment> added, List<Publishment> removed, List<Publishment> afterModified, List<Publishment> beforeModified) { @@ -113,25 +118,31 @@ public class AlertPublisherImpl implements AlertPublisher { return; } + // copy and swap to avoid concurrency issue + Map<String, List<String>> newPolicyPublishPluginMapping = new HashMap<>(policyPublishPluginMapping); + Map<String, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping); + + // added for (Publishment publishment : added) { - if (LOG.isDebugEnabled()) { - LOG.debug(publishment.toString()); - } + LOG.debug("OnPublishmentChange : add publishment : {} ", publishment); AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if (plugin != null) { - publishPluginMapping.put(publishment.getName(), plugin); - onPolicyAdded(publishment.getPolicyIds(), publishment.getName()); + newPublishMap.put(publishment.getName(), plugin); + addPublishmentPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), publishment.getName()); } else { - LOG.error("Initialized alertPublisher {} failed due to invalid format", publishment); + LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); } } + //removed + List<AlertPublishPlugin> toBeClosed = new 
ArrayList<>(); for (Publishment publishment : removed) { String pubName = publishment.getName(); - onPolicyDeleted(publishment.getPolicyIds(), pubName); - publishPluginMapping.get(pubName).close(); - publishPluginMapping.remove(publishment.getName()); + removePublihsPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), pubName); + toBeClosed.add(newPublishMap.get(pubName)); + newPublishMap.remove(publishment.getName()); } + // updated for (int i = 0; i < afterModified.size(); i++) { String pubName = afterModified.get(i).getName(); List<String> newPolicies = afterModified.get(i).getPolicyIds(); @@ -139,36 +150,50 @@ public class AlertPublisherImpl implements AlertPublisher { if (!newPolicies.equals(oldPolicies)) { List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies); - onPolicyDeleted(deletedPolicies, pubName); + removePublihsPolicies(newPolicyPublishPluginMapping, deletedPolicies, pubName); List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies); - onPolicyAdded(addedPolicies, pubName); + addPublishmentPolicies(newPolicyPublishPluginMapping, addedPolicies, pubName); } Publishment newPub = afterModified.get(i); - publishPluginMapping.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties()); + newPublishMap.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties()); } + + // now do the swap + publishPluginMapping = newPublishMap; + policyPublishPluginMapping = newPolicyPublishPluginMapping; + + // safely close : it depend on plugin to check if want to wait all data to be flushed. 
+ closePlugins(toBeClosed); } - private synchronized void onPolicyAdded(List<String> addedPolicyIds, String pubName) { + private void closePlugins(List<AlertPublishPlugin> toBeClosed) { + for (AlertPublishPlugin p : toBeClosed) { + try { + p.close(); + } catch (Exception e) { + LOG.error(MessageFormat.format("Error when close publish plugin {}, {}!", p.getClass().getCanonicalName()), e); + } + } + } + + private void addPublishmentPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> addedPolicyIds, String pubName) { if (addedPolicyIds == null || pubName == null) { return; } for (String policyId : addedPolicyIds) { - if (policyPublishPluginMapping.get(policyId) == null) { - policyPublishPluginMapping.put(policyId, new ArrayList<>()); - } - List<String> publishIds = policyPublishPluginMapping.get(policyId); - publishIds.add(pubName); + newPolicyPublishPluginMapping.putIfAbsent(policyId, new ArrayList<>()); + newPolicyPublishPluginMapping.get(policyId).add(pubName); } } - private synchronized void onPolicyDeleted(List<String> deletedPolicyIds, String pubName) { + private synchronized void removePublihsPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> deletedPolicyIds, String pubName) { if (deletedPolicyIds == null || pubName == null) { return; } for (String policyId : deletedPolicyIds) { - List<String> publishIds = policyPublishPluginMapping.get(policyId); + List<String> publishIds = newPolicyPublishPluginMapping.get(policyId); publishIds.remove(pubName); } }