Make the CEP fault handling window processors resilient against issues in CEP.
Wait until a given timeout expires or the first event is received before
processing health stats, so that members are not falsely reported as faulty.
Also make the CEP fault handling window processor wait until the complete
topology event is received at startup.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/a3ab4aef
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/a3ab4aef
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/a3ab4aef

Branch: refs/heads/master
Commit: a3ab4aef07e125b2998461108a217149a86c5b27
Parents: b063eb4
Author: Akila Perera <[email protected]>
Authored: Fri Jun 17 13:12:23 2016 +0530
Committer: Akila Perera <[email protected]>
Committed: Fri Jun 17 13:12:23 2016 +0530

----------------------------------------------------------------------
 .../cep/extension/CEPTopologyEventReceiver.java | 26 ++++---
 .../extension/FaultHandlingWindowProcessor.java | 82 ++++++++++----------
 .../cep/extension/CEPTopologyEventReceiver.java | 26 ++++---
 .../extension/FaultHandlingWindowProcessor.java | 81 ++++++++++---------
 4 files changed, 114 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/a3ab4aef/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
 
b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 2696271..51e4850 100644
--- 
a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ 
b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -35,24 +35,16 @@ import 
org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
  * CEP Topology Receiver for Fault Handling Window Processor.
  */
 public class CEPTopologyEventReceiver {
-
     private static final Log log = 
LogFactory.getLog(CEPTopologyEventReceiver.class);
-
     private FaultHandlingWindowProcessor faultHandler;
     private TopologyEventReceiver topologyEventReceiver;
 
-    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) 
{
+    CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
         this.faultHandler = faultHandler;
         this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
-//    @Override
-//    public void execute() {
-//        super.execute();
-//        log.info("CEP topology event receiver thread started");
-//    }
-
     private void addEventListeners() {
         // Load member time stamp map from the topology as a one time task
         topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
@@ -63,7 +55,7 @@ public class CEPTopologyEventReceiver {
                 if (!initialized) {
                     try {
                         TopologyManager.acquireReadLock();
-                        log.debug("Complete topology event received to fault 
handling window processor.");
+                        log.info("Complete topology event received to fault 
handling window processor.");
                         CompleteTopologyEvent completeTopologyEvent = 
(CompleteTopologyEvent) event;
                         initialized = 
faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
                     } catch (Exception e) {
@@ -81,7 +73,11 @@ public class CEPTopologyEventReceiver {
             protected void onEvent(Event event) {
                 MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
                 
faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
-                log.debug("Member was removed from the timestamp map: [member] 
" + memberTerminatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was removed from the timestamp map: 
[member] " + memberTerminatedEvent
+                            .getMemberId());
+
+                }
             }
         });
 
@@ -94,8 +90,14 @@ public class CEPTopologyEventReceiver {
                 // do not put this member if we have already received a health 
event
                 
faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
                         System.currentTimeMillis());
-                log.debug("Member was added to the timestamp map: [member] " + 
memberActivatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was added to the timestamp map: [member] 
" + memberActivatedEvent.getMemberId());
+                }
             }
         });
     }
+
+    void destroy() {
+        topologyEventReceiver.terminate();
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/a3ab4aef/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
 
b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 7aec0d5..5b77723 100644
--- 
a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ 
b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -20,7 +20,6 @@ package org.apache.stratos.cep.extension;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.topology.*;
@@ -45,26 +44,26 @@ import 
org.wso2.siddhi.query.api.expression.constant.IntConstant;
 import org.wso2.siddhi.query.api.expression.constant.LongConstant;
 import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * CEP window processor to handle faulty member instances. This window 
processor is responsible for
  * publishing MemberFault event if health stats are not received within a 
given time window.
  */
-@SiddhiExtension(namespace = "stratos",
-                 function = "faultHandling")
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements 
RunnableWindowProcessor {
-
     private static final Logger log = 
Logger.getLogger(FaultHandlingWindowProcessor.class);
-
+    private static final String ACTIVATE_TIMEOUT_KEY = 
"cep.fault.handler.extension.activate.timeout";
+    private static final int ACTIVATE_TIMEOUT =
+            Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15);
     private static final int TIME_OUT = 60 * 1000;
-    public static final String CEP_EXTENSION_THREAD_POOL_KEY = 
"cep.extension.thread.pool";
-    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
-
-    private ExecutorService executorService;
     private ScheduledExecutorService faultHandleScheduler;
     private ScheduledFuture<?> lastSchedule;
     private ThreadBarrier threadBarrier;
@@ -77,6 +76,9 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
 
     // Map of member id's to their last received health event time stamp
     private ConcurrentHashMap<String, Long> memberTimeStampMap = new 
ConcurrentHashMap<String, Long>();
+    private volatile boolean isActive;
+    private volatile boolean hasMemberTimeStampMapInitialized;
+    private long startTime = System.currentTimeMillis();
 
     // Event receiver to receive topology events published by cloud-controller
     private CEPTopologyEventReceiver cepTopologyEventReceiver = new 
CEPTopologyEventReceiver(this);
@@ -101,7 +103,11 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
      *
      * @param event Event received by Siddhi.
      */
-    protected void addDataToMap(InEvent event) {
+    private void addDataToMap(InEvent event) {
+        if (!isActive) {
+            log.info("Received first event. Marking fault handling window 
processor as active");
+            isActive = true;
+        }
         String id = (String) event.getData()[memberIdAttrIndex];
         //checking whether this member is the topology.
         //sometimes there can be a delay between publishing member terminated 
events
@@ -143,7 +149,6 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
      * @param topology Topology model object
      */
     boolean loadTimeStampMapFromTopology(Topology topology) {
-
         long currentTimeStamp = System.currentTimeMillis();
         if (topology == null || topology.getServices() == null) {
             return false;
@@ -164,10 +169,10 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
                 }
             }
         }
-
-        if (log.isDebugEnabled()) {
-            log.debug(
-                    "Member timestamps were successfully loaded from the 
topology: [timestamps] " + memberTimeStampMap);
+        hasMemberTimeStampMapInitialized = true;
+        if (log.isInfoEnabled()) {
+            log.info("Member timestamps were successfully loaded from the 
topology: [timestamps] " +
+                    Arrays.toString(memberTimeStampMap.entrySet().toArray()));
         }
         return true;
     }
@@ -222,7 +227,19 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
     @Override
     public void run() {
         try {
+            // wait until the first event OR given timeout to expire in order 
to activate this window processor
+            // this is to prevent false positives at the CEP startup
+            if (!isActive && System.currentTimeMillis() - startTime > 
ACTIVATE_TIMEOUT) {
+                log.info("Activation wait timeout has expired. Marking fault 
handling window processor as active");
+                isActive = true;
+            }
+            // do not process events until memberTimeStampMap is initialized 
and window processor is activated
+            // memberTimeStampMap will be initialized only after receiving the 
complete topology event
+            if (!(isActive && hasMemberTimeStampMapInitialized)) {
+                return;
+            }
             threadBarrier.pass();
+
             for (Object o : memberTimeStampMap.entrySet()) {
                 Map.Entry pair = (Map.Entry) o;
                 long currentTime = System.currentTimeMillis();
@@ -255,7 +272,7 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
 
     @Override
     protected Object[] currentState() {
-        return new Object[] { window.currentState() };
+        return new Object[]{window.currentState()};
     }
 
     @Override
@@ -267,8 +284,8 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
 
     @Override
     protected void init(Expression[] parameters, QueryPostProcessingElement 
nextProcessor,
-            AbstractDefinition streamDefinition, String elementId, boolean 
async, SiddhiContext siddhiContext) {
-
+                        AbstractDefinition streamDefinition, String elementId, 
boolean async, SiddhiContext
+                                siddhiContext) {
         if (parameters[0] instanceof IntConstant) {
             timeToKeep = ((IntConstant) parameters[0]).getValue();
         } else {
@@ -286,17 +303,13 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
         MemberFaultEventMap
                 
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", 
memberFaultEventMessageMap);
 
-//        executorService = StratosThreadPool
-//                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, 
CEP_EXTENSION_THREAD_POOL_SIZE);
-//        cepTopologyEventReceiver.setExecutorService(executorService);
-//        cepTopologyEventReceiver.execute();
-
         //Ordinary scheduling
         window.schedule();
-        if (log.isDebugEnabled()) {
-            log.debug("Fault handling window processor initialized with 
[timeToKeep] " + timeToKeep +
-                    ", [memberIdAttrName] " + memberIdAttrName + ", 
[memberIdAttrIndex] " + memberIdAttrIndex +
-                    ", [distributed-enabled] " + 
this.siddhiContext.isDistributedProcessingEnabled());
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Fault handling window processor 
initialized with [timeToKeep] %s, " +
+                            "[memberIdAttrName] %s, [memberIdAttrIndex] %s, 
[distributed-enabled] %s, " +
+                            "[activate-timeout] %d", timeToKeep, 
memberIdAttrName, memberIdAttrIndex,
+                    siddhiContext.isDistributedProcessingEnabled(), 
ACTIVATE_TIMEOUT));
         }
     }
 
@@ -329,20 +342,11 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
     @Override
     public void destroy() {
         // terminate topology listener thread
-//        cepTopologyEventReceiver.terminate();
+        cepTopologyEventReceiver.destroy();
         window = null;
-
-        // Shutdown executor service
-        if (executorService != null) {
-            try {
-                executorService.shutdownNow();
-            } catch (Exception e) {
-                log.warn("An error occurred while shutting down cep extension 
executor service", e);
-            }
-        }
     }
 
-    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+    ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/a3ab4aef/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
 
b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 2696271..51e4850 100644
--- 
a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ 
b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -35,24 +35,16 @@ import 
org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
  * CEP Topology Receiver for Fault Handling Window Processor.
  */
 public class CEPTopologyEventReceiver {
-
     private static final Log log = 
LogFactory.getLog(CEPTopologyEventReceiver.class);
-
     private FaultHandlingWindowProcessor faultHandler;
     private TopologyEventReceiver topologyEventReceiver;
 
-    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) 
{
+    CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
         this.faultHandler = faultHandler;
         this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
-//    @Override
-//    public void execute() {
-//        super.execute();
-//        log.info("CEP topology event receiver thread started");
-//    }
-
     private void addEventListeners() {
         // Load member time stamp map from the topology as a one time task
         topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
@@ -63,7 +55,7 @@ public class CEPTopologyEventReceiver {
                 if (!initialized) {
                     try {
                         TopologyManager.acquireReadLock();
-                        log.debug("Complete topology event received to fault 
handling window processor.");
+                        log.info("Complete topology event received to fault 
handling window processor.");
                         CompleteTopologyEvent completeTopologyEvent = 
(CompleteTopologyEvent) event;
                         initialized = 
faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
                     } catch (Exception e) {
@@ -81,7 +73,11 @@ public class CEPTopologyEventReceiver {
             protected void onEvent(Event event) {
                 MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
                 
faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
-                log.debug("Member was removed from the timestamp map: [member] 
" + memberTerminatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was removed from the timestamp map: 
[member] " + memberTerminatedEvent
+                            .getMemberId());
+
+                }
             }
         });
 
@@ -94,8 +90,14 @@ public class CEPTopologyEventReceiver {
                 // do not put this member if we have already received a health 
event
                 
faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
                         System.currentTimeMillis());
-                log.debug("Member was added to the timestamp map: [member] " + 
memberActivatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was added to the timestamp map: [member] 
" + memberActivatedEvent.getMemberId());
+                }
             }
         });
     }
+
+    void destroy() {
+        topologyEventReceiver.terminate();
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/a3ab4aef/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
 
b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 2abfda1..0c2ea92 100644
--- 
a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ 
b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -14,7 +14,6 @@ package org.apache.stratos.cep.extension;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.topology.*;
@@ -39,26 +38,26 @@ import 
org.wso2.siddhi.query.api.expression.constant.IntConstant;
 import org.wso2.siddhi.query.api.expression.constant.LongConstant;
 import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * CEP window processor to handle faulty member instances. This window 
processor is responsible for
  * publishing MemberFault event if health stats are not received within a 
given time window.
  */
-@SiddhiExtension(namespace = "stratos",
-                 function = "faultHandling")
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements 
RunnableWindowProcessor {
-
+    private static final String ACTIVATE_TIMEOUT_KEY = 
"cep.fault.handler.extension.activate.timeout";
+    private static final int ACTIVATE_TIMEOUT =
+            Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15);
     private static final Logger log = 
Logger.getLogger(FaultHandlingWindowProcessor.class);
-
     private static final int TIME_OUT = 60 * 1000;
-    public static final String CEP_EXTENSION_THREAD_POOL_KEY = 
"cep.extension.thread.pool";
-    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
-
-    private ExecutorService executorService;
     private ScheduledExecutorService faultHandleScheduler;
     private ScheduledFuture<?> lastSchedule;
     private ThreadBarrier threadBarrier;
@@ -71,6 +70,9 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
 
     // Map of member id's to their last received health event time stamp
     private ConcurrentHashMap<String, Long> memberTimeStampMap = new 
ConcurrentHashMap<String, Long>();
+    private volatile boolean isActive;
+    private volatile boolean hasMemberTimeStampMapInitialized;
+    private long startTime = System.currentTimeMillis();
 
     // Event receiver to receive topology events published by cloud-controller
     private CEPTopologyEventReceiver cepTopologyEventReceiver = new 
CEPTopologyEventReceiver(this);
@@ -95,7 +97,11 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
      *
      * @param event Event received by Siddhi.
      */
-    protected void addDataToMap(InEvent event) {
+    private void addDataToMap(InEvent event) {
+        if (!isActive) {
+            log.info("Received first event. Marking fault handling window 
processor as active");
+            isActive = true;
+        }
         String id = (String) event.getData()[memberIdAttrIndex];
         //checking whether this member is the topology.
         //sometimes there can be a delay between publishing member terminated 
events
@@ -137,7 +143,6 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
      * @param topology Topology model object
      */
     boolean loadTimeStampMapFromTopology(Topology topology) {
-
         long currentTimeStamp = System.currentTimeMillis();
         if (topology == null || topology.getServices() == null) {
             return false;
@@ -158,10 +163,10 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
                 }
             }
         }
-
-        if (log.isDebugEnabled()) {
-            log.debug(
-                    "Member timestamps were successfully loaded from the 
topology: [timestamps] " + memberTimeStampMap);
+        hasMemberTimeStampMapInitialized = true;
+        if (log.isInfoEnabled()) {
+            log.info("Member timestamps were successfully loaded from the 
topology: [timestamps] " +
+                    Arrays.toString(memberTimeStampMap.entrySet().toArray()));
         }
         return true;
     }
@@ -216,7 +221,19 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
     @Override
     public void run() {
         try {
+            // wait until the first event OR given timeout to expire in order 
to activate this window processor
+            // this is to prevent false positives at the CEP startup
+            if (!isActive && System.currentTimeMillis() - startTime > 
ACTIVATE_TIMEOUT) {
+                log.info("Activation wait timeout has expired. Marking fault 
handling window processor as active");
+                isActive = true;
+            }
+            // do not process events until memberTimeStampMap is initialized 
and window processor is activated
+            // memberTimeStampMap will be initialized only after receiving the 
complete topology event
+            if (!(isActive && hasMemberTimeStampMapInitialized)) {
+                return;
+            }
             threadBarrier.pass();
+
             for (Object o : memberTimeStampMap.entrySet()) {
                 Map.Entry pair = (Map.Entry) o;
                 long currentTime = System.currentTimeMillis();
@@ -249,7 +266,7 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
 
     @Override
     protected Object[] currentState() {
-        return new Object[] { window.currentState() };
+        return new Object[]{window.currentState()};
     }
 
     @Override
@@ -260,7 +277,8 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
 
     @Override
     protected void init(Expression[] parameters, QueryPostProcessingElement 
nextProcessor,
-            AbstractDefinition streamDefinition, String elementId, boolean 
async, SiddhiContext siddhiContext) {
+                        AbstractDefinition streamDefinition, String elementId, 
boolean async, SiddhiContext
+                                siddhiContext) {
 
         if (parameters[0] instanceof IntConstant) {
             timeToKeep = ((IntConstant) parameters[0]).getValue();
@@ -279,17 +297,13 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
         MemberFaultEventMap
                 
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", 
memberFaultEventMessageMap);
 
-//        executorService = StratosThreadPool
-//                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, 
CEP_EXTENSION_THREAD_POOL_SIZE);
-//        cepTopologyEventReceiver.setExecutorService(executorService);
-//        cepTopologyEventReceiver.execute();
-
         //Ordinary scheduling
         window.schedule();
-        if (log.isDebugEnabled()) {
-            log.debug("Fault handling window processor initialized with 
[timeToKeep] " + timeToKeep +
-                    ", [memberIdAttrName] " + memberIdAttrName + ", 
[memberIdAttrIndex] " + memberIdAttrIndex +
-                    ", [distributed-enabled] " + 
this.siddhiContext.isDistributedProcessingEnabled());
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Fault handling window processor 
initialized with [timeToKeep] %s, " +
+                            "[memberIdAttrName] %s, [memberIdAttrIndex] %s, 
[distributed-enabled] %s, " +
+                            "[activate-timeout] %d", timeToKeep, 
memberIdAttrName, memberIdAttrIndex,
+                    siddhiContext.isDistributedProcessingEnabled(), 
ACTIVATE_TIMEOUT));
         }
     }
 
@@ -322,20 +336,11 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
     @Override
     public void destroy() {
         // terminate topology listener thread
-//        cepTopologyEventReceiver.terminate();
+        cepTopologyEventReceiver.destroy();
         window = null;
-
-        // Shutdown executor service
-        if (executorService != null) {
-            try {
-                executorService.shutdownNow();
-            } catch (Exception e) {
-                log.warn("An error occurred while shutting down cep extension 
executor service", e);
-            }
-        }
     }
 
-    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+    ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
 }

Reply via email to