Making CEP fault handling window processors resilient against issues in CEP. Wait for a given timeout or the first event is received before processing health stats. This is to avoid false positive faulty members. Make CEP fault handling window processor wait until complete topology event is received at the startup.
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/a3ab4aef Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/a3ab4aef Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/a3ab4aef Branch: refs/heads/master Commit: a3ab4aef07e125b2998461108a217149a86c5b27 Parents: b063eb4 Author: Akila Perera <[email protected]> Authored: Fri Jun 17 13:12:23 2016 +0530 Committer: Akila Perera <[email protected]> Committed: Fri Jun 17 13:12:23 2016 +0530 ---------------------------------------------------------------------- .../cep/extension/CEPTopologyEventReceiver.java | 26 ++++--- .../extension/FaultHandlingWindowProcessor.java | 82 ++++++++++---------- .../cep/extension/CEPTopologyEventReceiver.java | 26 ++++--- .../extension/FaultHandlingWindowProcessor.java | 81 ++++++++++--------- 4 files changed, 114 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/a3ab4aef/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java index 2696271..51e4850 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -35,24 +35,16 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; * CEP Topology Receiver for Fault Handling Window Processor. */ public class CEPTopologyEventReceiver { - private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class); - private FaultHandlingWindowProcessor faultHandler; private TopologyEventReceiver topologyEventReceiver; - public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { + CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { this.faultHandler = faultHandler; this.topologyEventReceiver = TopologyEventReceiver.getInstance(); addEventListeners(); } -// @Override -// public void execute() { -// super.execute(); -// log.info("CEP topology event receiver thread started"); -// } - private void addEventListeners() { // Load member time stamp map from the topology as a one time task topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { @@ -63,7 +55,7 @@ public class CEPTopologyEventReceiver { if (!initialized) { try { TopologyManager.acquireReadLock(); - log.debug("Complete topology event received to fault handling window processor."); + log.info("Complete topology event received to fault handling window processor."); CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event; initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology()); } catch (Exception e) { @@ -81,7 +73,11 @@ public class CEPTopologyEventReceiver { protected void onEvent(Event event) { MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId()); - log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId()); + if (log.isDebugEnabled()) { + log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent + .getMemberId()); + + } } }); @@ -94,8 +90,14 @@ public class CEPTopologyEventReceiver { // do not put this member if we have already received a health event faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(), System.currentTimeMillis()); - log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId()); + if (log.isDebugEnabled()) { + log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId()); + } } }); } + + void destroy() { + topologyEventReceiver.terminate(); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/a3ab4aef/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index 7aec0d5..5b77723 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -20,7 +20,6 @@ package org.apache.stratos.cep.extension; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.topology.*; @@ -45,26 +44,26 @@ import org.wso2.siddhi.query.api.expression.constant.IntConstant; import org.wso2.siddhi.query.api.expression.constant.LongConstant; import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * CEP window processor to handle faulty member instances. This window processor is responsible for * publishing MemberFault event if health stats are not received within a given time window. */ -@SiddhiExtension(namespace = "stratos", - function = "faultHandling") +@SiddhiExtension(namespace = "stratos", function = "faultHandling") public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); - + private static final String ACTIVATE_TIMEOUT_KEY = "cep.fault.handler.extension.activate.timeout"; + private static final int ACTIVATE_TIMEOUT = + Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15); private static final int TIME_OUT = 60 * 1000; - public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool"; - public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10; - - private ExecutorService executorService; private ScheduledExecutorService faultHandleScheduler; private ScheduledFuture<?> lastSchedule; private ThreadBarrier threadBarrier; @@ -77,6 +76,9 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run // Map of member id's to their last received health event time stamp private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); + private volatile boolean isActive; + private volatile boolean hasMemberTimeStampMapInitialized; + private long startTime = System.currentTimeMillis(); // Event receiver to receive topology events published by cloud-controller private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); @@ -101,7 +103,11 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run * * @param event Event received by Siddhi. */ - protected void addDataToMap(InEvent event) { + private void addDataToMap(InEvent event) { + if (!isActive) { + log.info("Received first event. Marking fault handling window processor as active"); + isActive = true; + } String id = (String) event.getData()[memberIdAttrIndex]; //checking whether this member is the topology. //sometimes there can be a delay between publishing member terminated events @@ -143,7 +149,6 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run * @param topology Topology model object */ boolean loadTimeStampMapFromTopology(Topology topology) { - long currentTimeStamp = System.currentTimeMillis(); if (topology == null || topology.getServices() == null) { return false; @@ -164,10 +169,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } } } - - if (log.isDebugEnabled()) { - log.debug( - "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap); + hasMemberTimeStampMapInitialized = true; + if (log.isInfoEnabled()) { + log.info("Member timestamps were successfully loaded from the topology: [timestamps] " + + Arrays.toString(memberTimeStampMap.entrySet().toArray())); } return true; } @@ -222,7 +227,19 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override public void run() { try { + // wait until the first event OR given timeout to expire in order to activate this window processor + // this is to prevent false positives at the CEP startup + if (!isActive && System.currentTimeMillis() - startTime > ACTIVATE_TIMEOUT) { + log.info("Activation wait timeout has expired. Marking fault handling window processor as active"); + isActive = true; + } + // do not process events until memberTimeStampMap is initialized and window processor is activated + // memberTimeStampMap will be initialized only after receiving the complete topology event + if (!(isActive && hasMemberTimeStampMapInitialized)) { + return; + } threadBarrier.pass(); + for (Object o : memberTimeStampMap.entrySet()) { Map.Entry pair = (Map.Entry) o; long currentTime = System.currentTimeMillis(); @@ -255,7 +272,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override protected Object[] currentState() { - return new Object[] { window.currentState() }; + return new Object[]{window.currentState()}; } @Override @@ -267,8 +284,8 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, - AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { - + AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext + siddhiContext) { if (parameters[0] instanceof IntConstant) { timeToKeep = ((IntConstant) parameters[0]).getValue(); } else { @@ -286,17 +303,13 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run MemberFaultEventMap .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); -// executorService = StratosThreadPool -// .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE); -// cepTopologyEventReceiver.setExecutorService(executorService); -// cepTopologyEventReceiver.execute(); - //Ordinary scheduling window.schedule(); - if (log.isDebugEnabled()) { - log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + - ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + - ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); + if (log.isInfoEnabled()) { + log.info(String.format("Fault handling window processor initialized with [timeToKeep] %s, " + + "[memberIdAttrName] %s, [memberIdAttrIndex] %s, [distributed-enabled] %s, " + + "[activate-timeout] %d", timeToKeep, memberIdAttrName, memberIdAttrIndex, + siddhiContext.isDistributedProcessingEnabled(), ACTIVATE_TIMEOUT)); } } @@ -329,20 +342,11 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override public void destroy() { // terminate topology listener thread -// cepTopologyEventReceiver.terminate(); + cepTopologyEventReceiver.destroy(); window = null; - - // Shutdown executor service - if (executorService != null) { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down cep extension executor service", e); - } - } } - public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { + ConcurrentHashMap<String, Long> getMemberTimeStampMap() { return memberTimeStampMap; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/a3ab4aef/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java index 2696271..51e4850 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -35,24 +35,16 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; * CEP Topology Receiver for Fault Handling Window Processor. */ public class CEPTopologyEventReceiver { - private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class); - private FaultHandlingWindowProcessor faultHandler; private TopologyEventReceiver topologyEventReceiver; - public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { + CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { this.faultHandler = faultHandler; this.topologyEventReceiver = TopologyEventReceiver.getInstance(); addEventListeners(); } -// @Override -// public void execute() { -// super.execute(); -// log.info("CEP topology event receiver thread started"); -// } - private void addEventListeners() { // Load member time stamp map from the topology as a one time task topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { @@ -63,7 +55,7 @@ public class CEPTopologyEventReceiver { if (!initialized) { try { TopologyManager.acquireReadLock(); - log.debug("Complete topology event received to fault handling window processor."); + log.info("Complete topology event received to fault handling window processor."); CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event; initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology()); } catch (Exception e) { @@ -81,7 +73,11 @@ public class CEPTopologyEventReceiver { protected void onEvent(Event event) { MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId()); - log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId()); + if (log.isDebugEnabled()) { + log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent + .getMemberId()); + + } } }); @@ -94,8 +90,14 @@ public class CEPTopologyEventReceiver { // do not put this member if we have already received a health event faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(), System.currentTimeMillis()); - log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId()); + if (log.isDebugEnabled()) { + log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId()); + } } }); } + + void destroy() { + topologyEventReceiver.terminate(); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/a3ab4aef/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index 2abfda1..0c2ea92 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -14,7 +14,6 @@ package org.apache.stratos.cep.extension; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.topology.*; @@ -39,26 +38,26 @@ import org.wso2.siddhi.query.api.expression.constant.IntConstant; import org.wso2.siddhi.query.api.expression.constant.LongConstant; import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * CEP window processor to handle faulty member instances. This window processor is responsible for * publishing MemberFault event if health stats are not received within a given time window. */ -@SiddhiExtension(namespace = "stratos", - function = "faultHandling") +@SiddhiExtension(namespace = "stratos", function = "faultHandling") public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - + private static final String ACTIVATE_TIMEOUT_KEY = "cep.fault.handler.extension.activate.timeout"; + private static final int ACTIVATE_TIMEOUT = + Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15); private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); - private static final int TIME_OUT = 60 * 1000; - public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool"; - public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10; - - private ExecutorService executorService; private ScheduledExecutorService faultHandleScheduler; private ScheduledFuture<?> lastSchedule; private ThreadBarrier threadBarrier; @@ -71,6 +70,9 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run // Map of member id's to their last received health event time stamp private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); + private volatile boolean isActive; + private volatile boolean hasMemberTimeStampMapInitialized; + private long startTime = System.currentTimeMillis(); // Event receiver to receive topology events published by cloud-controller private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); @@ -95,7 +97,11 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run * * @param event Event received by Siddhi. */ - protected void addDataToMap(InEvent event) { + private void addDataToMap(InEvent event) { + if (!isActive) { + log.info("Received first event. Marking fault handling window processor as active"); + isActive = true; + } String id = (String) event.getData()[memberIdAttrIndex]; //checking whether this member is the topology. //sometimes there can be a delay between publishing member terminated events @@ -137,7 +143,6 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run * @param topology Topology model object */ boolean loadTimeStampMapFromTopology(Topology topology) { - long currentTimeStamp = System.currentTimeMillis(); if (topology == null || topology.getServices() == null) { return false; @@ -158,10 +163,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } } } - - if (log.isDebugEnabled()) { - log.debug( - "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap); + hasMemberTimeStampMapInitialized = true; + if (log.isInfoEnabled()) { + log.info("Member timestamps were successfully loaded from the topology: [timestamps] " + + Arrays.toString(memberTimeStampMap.entrySet().toArray())); } return true; } @@ -216,7 +221,19 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override public void run() { try { + // wait until the first event OR given timeout to expire in order to activate this window processor + // this is to prevent false positives at the CEP startup + if (!isActive && System.currentTimeMillis() - startTime > ACTIVATE_TIMEOUT) { + log.info("Activation wait timeout has expired. Marking fault handling window processor as active"); + isActive = true; + } + // do not process events until memberTimeStampMap is initialized and window processor is activated + // memberTimeStampMap will be initialized only after receiving the complete topology event + if (!(isActive && hasMemberTimeStampMapInitialized)) { + return; + } threadBarrier.pass(); + for (Object o : memberTimeStampMap.entrySet()) { Map.Entry pair = (Map.Entry) o; long currentTime = System.currentTimeMillis(); @@ -249,7 +266,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override protected Object[] currentState() { - return new Object[] { window.currentState() }; + return new Object[]{window.currentState()}; } @Override @@ -260,7 +277,8 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, - AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext + siddhiContext) { if (parameters[0] instanceof IntConstant) { timeToKeep = ((IntConstant) parameters[0]).getValue(); @@ -279,17 +297,13 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run MemberFaultEventMap .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); -// executorService = StratosThreadPool -// .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE); -// cepTopologyEventReceiver.setExecutorService(executorService); -// cepTopologyEventReceiver.execute(); - //Ordinary scheduling window.schedule(); - if (log.isDebugEnabled()) { - log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + - ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + - ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); + if (log.isInfoEnabled()) { + log.info(String.format("Fault handling window processor initialized with [timeToKeep] %s, " + + "[memberIdAttrName] %s, [memberIdAttrIndex] %s, [distributed-enabled] %s, " + + "[activate-timeout] %d", timeToKeep, memberIdAttrName, memberIdAttrIndex, + siddhiContext.isDistributedProcessingEnabled(), ACTIVATE_TIMEOUT)); } } @@ -322,20 +336,11 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override public void destroy() { // terminate topology listener thread -// cepTopologyEventReceiver.terminate(); + cepTopologyEventReceiver.destroy(); window = null; - - // Shutdown executor service - if (executorService != null) { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down cep extension executor service", e); - } - } } - public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { + ConcurrentHashMap<String, Long> getMemberTimeStampMap() { return memberTimeStampMap; } }
