Repository: incubator-eagle
Updated Branches:
  refs/heads/master e520e4011 -> c15e7f814


EAGLE-672: remove mongo state store for dedup

Author: Li, Garrett
Reviewer: ralphsu

This closes #553


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c15e7f81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c15e7f81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c15e7f81

Branch: refs/heads/master
Commit: c15e7f81497f6873dee25421c69f4c1351fdcb9c
Parents: e520e40
Author: Xiancheng Li <xiancheng...@ebay.com>
Authored: Mon Oct 24 09:32:21 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Mon Oct 24 14:18:13 2016 +0800

----------------------------------------------------------------------
 .../engine/publisher/dedup/DedupCache.java      |  93 +----
 .../publisher/dedup/DedupEventsStore.java       |  32 --
 .../dedup/DedupEventsStoreFactory.java          |  57 ---
 .../publisher/dedup/MongoDedupEventsStore.java  | 146 -------
 .../publisher/dedup/TransformerUtils.java       | 117 ------
 .../publisher/dedup/DedupCacheStoreTest.java    | 150 -------
 .../engine/publisher/dedup/DedupCacheTest.java  | 202 +++++-----
 .../dedup/DefaultDeduplicatorTest.java          | 399 ++++++++++---------
 .../dedup/ExtendedDeduplicatorTest.java         |  35 +-
 .../publisher/dedup/MongoDedupStoreTest.java    |  68 ----
 .../dedup/MongoDependencyBaseTest.java          |  67 ----
 .../engine/router/TestAlertPublisherBolt.java   |  54 +--
 12 files changed, 344 insertions(+), 1076 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index 2080441..abb83d6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -16,42 +16,37 @@
  */
 package org.apache.eagle.alert.engine.publisher.dedup;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.typesafe.config.Config;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
 import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
+import com.google.common.base.Objects;
+import com.typesafe.config.Config;
 
 public class DedupCache {
 
     private static final Logger LOG = LoggerFactory.getLogger(DedupCache.class);
 
     private static final long CACHE_MAX_EXPIRE_TIME_IN_DAYS = 30;
-    private static final long CACHE_MAX_EVENT_QUEUE_SIZE = 10;
 
     public static final String DEDUP_COUNT = "dedupCount";
     public static final String DOC_ID = "docId";
     public static final String DEDUP_FIRST_OCCURRENCE = "dedupFirstOccurrenceTime";
 
-    private static final DedupEventsStoreType type = DedupEventsStoreType.Mongo;
-
     private long lastUpdated = -1;
     private Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
 
-    private static final ConcurrentLinkedDeque<DedupCache> caches = new ConcurrentLinkedDeque<DedupCache>();
-
+    @SuppressWarnings("unused")
     private Config config;
 
     private String publishName;
@@ -59,39 +54,6 @@ public class DedupCache {
     public DedupCache(Config config, String publishName) {
         this.config = config;
         this.publishName = publishName;
-        // only happens during startup, won't introduce perf issue here
-        synchronized (caches) {
-            if (caches.size() == 0) {
-                // create daemon to clean up old removable events periodically
-                ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-                    @Override
-                    public Thread newThread(Runnable r) {
-                        Thread t = new Thread(r);
-                        t.setDaemon(true);
-                        return t;
-                    }
-                });
-                scheduleSrv.scheduleAtFixedRate(new Runnable() {
-                    @Override
-                    public void run() {
-                        for (DedupCache cache : caches) {
-                            if (cache == null || cache.getEvents() == null) {
-                                continue;
-                            }
-                            HashSet<EventUniq> eventUniqs = new HashSet<EventUniq>(cache.getEvents().keySet());
-                            for (EventUniq one : eventUniqs) {
-                                if (one.removable && one.createdTime < System.currentTimeMillis() - 3600000 * 24) {
-                                    cache.removeEvent(one);
-                                    LOG.info("Remove dedup key {} from cache & db", one);
-                                }
-                            }
-                        }
-                    }
-                }, 5, 60, TimeUnit.MINUTES);
-                LOG.info("Create daemon to clean up old removable events periodically");
-            }
-            caches.add(this);
-        }
     }
 
     public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
@@ -99,8 +61,6 @@ public class DedupCache {
             || System.currentTimeMillis() - lastUpdated > CACHE_MAX_EXPIRE_TIME_IN_DAYS * DateUtils.MILLIS_PER_DAY
             || events.size() <= 0) {
             lastUpdated = System.currentTimeMillis();
-            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
-            events = accessor.getEvents();
         }
         return events;
     }
@@ -112,9 +72,6 @@ public class DedupCache {
     public void removeEvent(EventUniq eventEniq) {
         if (this.contains(eventEniq)) {
             this.events.remove(eventEniq);
-
-            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
-            accessor.remove(eventEniq);
         }
     }
 
@@ -142,9 +99,9 @@ public class DedupCache {
         Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = this.getEvents();
         if (!events.containsKey(eventEniq)
             || (events.containsKey(eventEniq)
-                && events.get(eventEniq).size() > 0
-                && !StringUtils.equalsIgnoreCase(stateFieldValue,
-                    events.get(eventEniq).getLast().getStateFieldValue()))) {
+            && events.get(eventEniq).size() > 0
+            && !StringUtils.equalsIgnoreCase(stateFieldValue,
+            events.get(eventEniq).getLast().getStateFieldValue()))) {
             DedupValue[] dedupValues = this.add(eventEniq, event, stateFieldValue, stateCloseValue);
             return dedupValues;
         } else {
@@ -164,19 +121,12 @@ public class DedupCache {
             events.put(eventEniq, dedupValues);
             LOG.info("{} Add new dedup key {}, and value {}", this.publishName, eventEniq, dedupValues);
         } else if (!StringUtils.equalsIgnoreCase(stateFieldValue,
-                events.get(eventEniq).getLast().getStateFieldValue())) {
+            events.get(eventEniq).getLast().getStateFieldValue())) {
             // existing a de-dup value, try update or reset
             DedupValue lastDedupValue = events.get(eventEniq).getLast();
             dedupValue = updateDedupValue(lastDedupValue, eventEniq, event, stateFieldValue, stateCloseValue);
             LOG.info("{} Update dedup key {}, and value {}", this.publishName, eventEniq, dedupValue);
         }
-
-        if (dedupValue != null) {
-            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
-            accessor.add(eventEniq, events.get(eventEniq));
-            LOG.info("{} Store dedup key {}, value {} to DB", this.publishName, eventEniq,
-                Joiner.on(",").join(events.get(eventEniq)));
-        }
         if (dedupValue == null) {
             return null;
         }
@@ -190,7 +140,7 @@ public class DedupCache {
         }
 
         if (lastDedupValue.getStateFieldValue().equals(stateCloseValue)
-                && eventEniq.timestamp < lastDedupValue.getCloseTime()) {
+            && eventEniq.timestamp < lastDedupValue.getCloseTime()) {
             DedupValue dv = createDedupValue(eventEniq, event, stateFieldValue);
             lastDedupValue.resetTo(dv);
         } else {
@@ -208,7 +158,7 @@ public class DedupCache {
         dedupValue = new DedupValue();
         dedupValue.setFirstOccurrence(eventEniq.timestamp);
         int idx = event.getSchema().getColumnIndex(DOC_ID);
-        if (idx >= 0 ) {
+        if (idx >= 0) {
             dedupValue.setDocId(event.getData()[idx].toString());
         } else {
             dedupValue.setDocId("");
@@ -219,13 +169,6 @@ public class DedupCache {
         return dedupValue;
     }
 
-    public void persistUpdatedEventUniq(EventUniq eventEniq) {
-        DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
-        accessor.add(eventEniq, events.get(eventEniq));
-        LOG.info("{} Store dedup key {}, value {} to DB", this.publishName, eventEniq,
-            Joiner.on(",").join(events.get(eventEniq)));
-    }
-
     private DedupValue updateCount(EventUniq eventEniq) {
         ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
         if (dedupValues == null || dedupValues.size() <= 0) {
@@ -237,11 +180,7 @@ public class DedupCache {
             String updateMsg = String.format(
                 "%s Update count for dedup key %s, value %s and count %s", this.publishName, eventEniq,
                 dedupValue.getStateFieldValue(), dedupValue.getCount());
-            if (dedupValue.getCount() > 0 && dedupValue.getCount() % 100 == 0) {
-                LOG.info(updateMsg);
-                DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
-                accessor.add(eventEniq, dedupValues);
-            } else if (LOG.isDebugEnabled()) {
+            if (LOG.isDebugEnabled()) {
                 LOG.debug(updateMsg);
             }
             return dedupValue;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
deleted file mode 100644
index 5918afe..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-
-public interface DedupEventsStore {
-
-    public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents();
-
-    public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> 
dedupStateValues);
-
-    public void remove(EventUniq eventEniq);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
deleted file mode 100644
index 75d8e53..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import com.typesafe.config.Config;
-
-public class DedupEventsStoreFactory {
-
-    public enum DedupEventsStoreType {
-        Mongo, ElasticSearch
-    }
-
-    ;
-
-    private static DedupEventsStore customizedStore;
-
-    private static MongoDedupEventsStore accessor;
-
-    public static void customizeStore(DedupEventsStore store) {
-        customizedStore = store;
-    }
-
-    public static DedupEventsStore getStore(DedupEventsStoreType type, Config 
config, String publishName) {
-        if (customizedStore != null) {
-            return customizedStore;
-        }
-        switch (type) {
-            case Mongo:
-                if (accessor == null) {
-                    accessor = new MongoDedupEventsStore(config, publishName);
-                }
-                break;
-            case ElasticSearch:
-            default:
-                break;
-        }
-        if (accessor == null) {
-            throw new RuntimeException(String.format("Dedup events store type 
%s is NOT supportted", type));
-        }
-        return accessor;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
deleted file mode 100644
index 35281bf..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.mongodb.Block;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.IndexOptions;
-import com.mongodb.client.model.InsertOneOptions;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.bson.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-public class MongoDedupEventsStore implements DedupEventsStore {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(MongoDedupEventsStore.class);
-
-    public static final String DEDUP_ID = "dedupId";
-    public static final String DEDUP_STREAM_ID = "streamId";
-    public static final String DOC_ID = "docId";
-    public static final String DEDUP_POLICY_ID = "policyId";
-    public static final String DEDUP_CREATE_TIME = "createdTime";
-    public static final String DEDUP_TIMESTAMP = "timestamp";
-    public static final String DEDUP_REMOVABLE = "removable";
-    public static final String DEDUP_CUSTOM_FIELDS_VALUES = 
"customFieldValues";
-    public static final String DEDUP_VALUES = "dedupValues";
-    public static final String DEDUP_STATE_FIELD_VALUE = "stateFieldValue";
-    public static final String DEDUP_COUNT = "count";
-    public static final String DEDUP_FIRST_OCCURRENCE = "firstOccurrence";
-    public static final String DEDUP_CLOSE_TIME = "closeTime";
-    public static final String DEDUP_PUBLISH_ID = "publishId";
-
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    static {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-    }
-
-    private Config config;
-    private String connection;
-    private MongoClient client;
-    private MongoDatabase db;
-    private MongoCollection<Document> stateCollection;
-    private String publishName;
-
-    private static final String DB_NAME = "ump_alert_dedup";
-    private static final String ALERT_STATE_COLLECTION = "alert_dedup";
-
-    public MongoDedupEventsStore(Config config, String publishName) {
-        this.config = config;
-        this.publishName = publishName;
-        this.connection = this.config.getString("connection");
-        try {
-            this.client = new MongoClient(new MongoClientURI(this.connection));
-            init();
-        } catch (Throwable t) {
-            LOG.error(String.format("initialize mongodb %s client failed", 
this.connection), t);
-        }
-    }
-
-    private void init() {
-        db = client.getDatabase(DB_NAME);
-        stateCollection = db.getCollection(ALERT_STATE_COLLECTION);
-        // dedup id index
-        IndexOptions io = new 
IndexOptions().background(true).unique(true).name(DEDUP_ID + "_index");
-        BsonDocument doc = new BsonDocument();
-        doc.append(DEDUP_ID, new BsonInt32(1));
-        stateCollection.createIndex(doc, io);
-    }
-
-    @Override
-    public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
-        try {
-            Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> result = new 
ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
-            BsonDocument filter = new BsonDocument();
-            filter.append(DEDUP_PUBLISH_ID, new BsonString(this.publishName));
-            stateCollection.find(filter).forEach(new Block<Document>() {
-                @Override
-                public void apply(final Document doc) {
-                    DedupEntity entity = 
TransformerUtils.transform(DedupEntity.class, BsonDocument.parse(doc.toJson()));
-                    result.put(entity.getEventEniq(), 
entity.getDedupValuesInConcurrentLinkedDeque());
-                }
-            });
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Found {} dedup events from mongoDB", result.size());
-            }
-            return result;
-        } catch (Exception e) {
-            LOG.error("find dedup state failed, but the state in memory is 
good, could be ingored.", e);
-        }
-        return new HashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
-    }
-
-    @Override
-    public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> 
dedupStateValues) {
-        try {
-            BsonDocument doc = TransformerUtils.transform(new 
DedupEntity(this.publishName, eventEniq, dedupStateValues));
-            BsonDocument filter = new BsonDocument();
-            filter.append(DEDUP_ID, new 
BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq)));
-            Document returnedDoc = stateCollection.findOneAndReplace(filter, 
Document.parse(doc.toJson()));
-            if (returnedDoc == null) {
-                InsertOneOptions option = new InsertOneOptions();
-                stateCollection.insertOne(Document.parse(doc.toJson()), 
option);
-            }
-        } catch (Exception e) {
-            LOG.error("insert dedup state failed, but the state is still in 
memory, could be ingored.", e);
-        }
-    }
-
-    @Override
-    public void remove(EventUniq eventEniq) {
-        try {
-            BsonDocument filter = new BsonDocument();
-            filter.append(DEDUP_ID, new 
BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq)));
-            stateCollection.deleteOne(filter);
-        } catch (Exception e) {
-            LOG.error("delete dedup state failed, but the state in memory is 
good, could be ingored.", e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
deleted file mode 100644
index 1ce6898..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.bson.*;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-
-public class TransformerUtils {
-
-    public static final String MAP_KEY = "key";
-    public static final String MAP_VALUE = "value";
-
-    @SuppressWarnings("unchecked")
-    public static <T> T transform(Class<T> klass, BsonDocument doc) {
-        if (klass.equals(DedupEntity.class)) {
-            String streamId = 
doc.getString(MongoDedupEventsStore.DEDUP_STREAM_ID).getValue();
-            String policyId = 
doc.getString(MongoDedupEventsStore.DEDUP_POLICY_ID).getValue();
-            long timestamp = 
doc.getInt64(MongoDedupEventsStore.DEDUP_TIMESTAMP).getValue();
-            HashMap<String, String> customFieldValues = new HashMap<String, 
String>();
-            BsonArray customFieldsValuesArray = doc.getArray(
-                MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES);
-            for (int i = 0; i < customFieldsValuesArray.size(); i++) {
-                BsonDocument dedupCustomFieldValuesDoc = 
customFieldsValuesArray.get(i).asDocument();
-                customFieldValues.put(
-                    dedupCustomFieldValuesDoc.getString(MAP_KEY).getValue(),
-                    dedupCustomFieldValuesDoc.getString(MAP_VALUE).getValue());
-            }
-            EventUniq eventUniq = new EventUniq(streamId, policyId, timestamp, 
customFieldValues);
-            eventUniq.removable = 
doc.getBoolean(MongoDedupEventsStore.DEDUP_REMOVABLE).getValue();
-            eventUniq.createdTime = doc.getInt64(
-                MongoDedupEventsStore.DEDUP_CREATE_TIME, new 
BsonInt64(0)).getValue();
-            List<DedupValue> dedupValues = new ArrayList<DedupValue>();
-            BsonArray dedupValuesArray = 
doc.getArray(MongoDedupEventsStore.DEDUP_VALUES);
-            for (int i = 0; i < dedupValuesArray.size(); i++) {
-                BsonDocument dedupValuesDoc = 
dedupValuesArray.get(i).asDocument();
-                DedupValue dedupValue = new DedupValue();
-                dedupValue.setStateFieldValue(dedupValuesDoc.getString(
-                    MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE).getValue());
-                dedupValue.setCount(dedupValuesDoc.getInt64(
-                    MongoDedupEventsStore.DEDUP_COUNT).getValue());
-                dedupValue.setFirstOccurrence(dedupValuesDoc.getInt64(
-                    MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE).getValue());
-                dedupValue.setCloseTime(dedupValuesDoc.getInt64(
-                    MongoDedupEventsStore.DEDUP_CLOSE_TIME).getValue());
-                dedupValue.setDocId(dedupValuesDoc.getString(
-                    MongoDedupEventsStore.DOC_ID).getValue());
-                dedupValues.add(dedupValue);
-            }
-            String publishId = 
doc.getString(MongoDedupEventsStore.DEDUP_PUBLISH_ID).getValue();
-            return (T) new DedupEntity(publishId, eventUniq, dedupValues);
-        }
-        throw new RuntimeException(String.format("Unknow object type %s, 
cannot transform", klass.getName()));
-    }
-
-    public static BsonDocument transform(Object obj) {
-        if (obj instanceof DedupEntity) {
-            BsonDocument doc = new BsonDocument();
-            DedupEntity entity = (DedupEntity) obj;
-            doc.put(MongoDedupEventsStore.DEDUP_ID, new 
BsonInt64(getUniqueId(entity.getPublishName(), entity.getEventEniq())));
-            doc.put(MongoDedupEventsStore.DEDUP_STREAM_ID, new 
BsonString(entity.getEventEniq().streamId));
-            doc.put(MongoDedupEventsStore.DEDUP_PUBLISH_ID, new 
BsonString(entity.getPublishName()));
-            doc.put(MongoDedupEventsStore.DEDUP_POLICY_ID, new 
BsonString(entity.getEventEniq().policyId));
-            doc.put(MongoDedupEventsStore.DEDUP_CREATE_TIME, new 
BsonInt64(entity.getEventEniq().createdTime));
-            doc.put(MongoDedupEventsStore.DEDUP_TIMESTAMP, new 
BsonInt64(entity.getEventEniq().timestamp));
-            doc.put(MongoDedupEventsStore.DEDUP_REMOVABLE, new 
BsonBoolean(entity.getEventEniq().removable));
-
-            List<BsonDocument> dedupCustomFieldValues = new 
ArrayList<BsonDocument>();
-            for (Entry<String, String> entry : 
entity.getEventEniq().customFieldValues.entrySet()) {
-                BsonDocument dedupCustomFieldValuesDoc = new BsonDocument();
-                dedupCustomFieldValuesDoc.put(MAP_KEY, new 
BsonString(entry.getKey()));
-                dedupCustomFieldValuesDoc.put(MAP_VALUE, new 
BsonString(entry.getValue()));
-                dedupCustomFieldValues.add(dedupCustomFieldValuesDoc);
-            }
-            doc.put(MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES, new 
BsonArray(dedupCustomFieldValues));
-
-            List<BsonDocument> dedupValuesDocs = new ArrayList<BsonDocument>();
-            for (DedupValue dedupValue : entity.getDedupValues()) {
-                BsonDocument dedupValuesDoc = new BsonDocument();
-                
dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE, new 
BsonString(dedupValue.getStateFieldValue()));
-                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_COUNT, new 
BsonInt64(dedupValue.getCount()));
-                
dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE,new 
BsonInt64(dedupValue.getFirstOccurrence()));
-                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_CLOSE_TIME, new 
BsonInt64(dedupValue.getCloseTime()));
-                dedupValuesDoc.put(MongoDedupEventsStore.DOC_ID, new 
BsonString(dedupValue.getDocId()));
-                dedupValuesDocs.add(dedupValuesDoc);
-            }
-            doc.put(MongoDedupEventsStore.DEDUP_VALUES, new 
BsonArray(dedupValuesDocs));
-            return doc;
-        }
-        throw new RuntimeException(String.format("Unknow object type %s, 
cannot transform", obj.getClass().getName()));
-    }
-
-    public static int getUniqueId(String publishName, EventUniq eventEniq) {
-        HashCodeBuilder builder = new 
HashCodeBuilder().append(eventEniq).append(publishName);
-        return builder.build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
deleted file mode 100644
index 25518de..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import 
org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.apache.storm.guava.base.Joiner;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-public class DedupCacheStoreTest extends MongoDependencyBaseTest {
-
-       @Test
-       public void testPersistUpdatedEventUniq() throws Exception {
-               StreamDefinition stream = createStream();
-               PolicyDefinition policy = createPolicy(stream.getStreamId(), 
"testPolicy");
-               
-               AlertStreamEvent event = createEvent(stream, policy, new 
Object[] {
-                               System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", "OPEN", 0, 0
-               });
-               
-               HashMap<String, String> dedupFieldValues = new HashMap<String, 
String>();
-               dedupFieldValues.put("alertKey", (String) 
event.getData()[event.getSchema().getColumnIndex("alertKey")]);
-               EventUniq eventUniq = new EventUniq(event.getStreamId(), 
event.getPolicyId(), event.getCreatedTime(), dedupFieldValues);
-               
-               System.setProperty("config.resource", 
"/application-mongo-statestore.conf");
-               Config config = ConfigFactory.load();
-               DedupCache cache = new DedupCache(config, "testPublishment");
-               cache.addOrUpdate(eventUniq, event, (String) 
event.getData()[event.getSchema().getColumnIndex("state")], "closed");
-               
-               DedupEventsStore accessor = 
DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo, config, 
"testPublishment");
-               Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = 
accessor.getEvents();
-               for (EventUniq one : events.keySet()) {
-                       if (one.equals(eventUniq)) {
-                               Assert.assertEquals(false, one.removable);
-                       }
-               }
-               for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry 
: events.entrySet()) {
-                       System.out.println(entry.getKey() + " >>> " + 
Joiner.on("\n\t").join(entry.getValue()));
-               }
-               
-               eventUniq.removable = true;
-               cache.persistUpdatedEventUniq(eventUniq);
-               
-               events = accessor.getEvents();
-               for (EventUniq one : events.keySet()) {
-                       if (one.equals(eventUniq)) {
-                               Assert.assertEquals(true, one.removable);
-                       }
-               }
-               
-               for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry 
: events.entrySet()) {
-                       System.out.println(entry.getKey() + " >>> " + 
Joiner.on("\n\t").join(entry.getValue()));
-               }
-       }
-       
-       private AlertStreamEvent createEvent(StreamDefinition stream, 
PolicyDefinition policy, Object[] data) {
-               AlertStreamEvent event = new AlertStreamEvent();
-               event.setPolicyId(policy.getName());
-               event.setSchema(stream);
-               event.setStreamId(stream.getStreamId());
-               event.setTimestamp(System.currentTimeMillis());
-               event.setCreatedTime(System.currentTimeMillis());
-               event.setData(data);
-               return event;
-       }
-       
-       private StreamDefinition createStream() {
-               StreamDefinition sd = new StreamDefinition();
-               StreamColumn tsColumn = new StreamColumn();
-               tsColumn.setName("timestamp");
-               tsColumn.setType(StreamColumn.Type.LONG);
-               
-               StreamColumn hostColumn = new StreamColumn();
-               hostColumn.setName("host");
-               hostColumn.setType(StreamColumn.Type.STRING);
-               
-               StreamColumn alertKeyColumn = new StreamColumn();
-               alertKeyColumn.setName("alertKey");
-               alertKeyColumn.setType(StreamColumn.Type.STRING);
-
-               StreamColumn stateColumn = new StreamColumn();
-               stateColumn.setName("state");
-               stateColumn.setType(StreamColumn.Type.STRING);
-               
-               // dedupCount, dedupFirstOccurrence
-               
-               StreamColumn dedupCountColumn = new StreamColumn();
-               dedupCountColumn.setName("dedupCount");
-               dedupCountColumn.setType(StreamColumn.Type.LONG);
-               
-               StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
-               
dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
-               dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
-               
-               sd.setColumns(Arrays.asList(tsColumn, hostColumn, 
alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
-               sd.setDataSource("testDatasource");
-               sd.setStreamId("testStream");
-               sd.setDescription("test stream");
-               return sd;
-       }
-       
-       private PolicyDefinition createPolicy(String streamName, String 
policyName) {
-               PolicyDefinition pd = new PolicyDefinition();
-               PolicyDefinition.Definition def = new 
PolicyDefinition.Definition();
-               //expression, something like "PT5S,dynamic,1,host"
-               def.setValue("test");
-               def.setType("siddhi");
-               pd.setDefinition(def);
-               pd.setInputStreams(Arrays.asList("inputStream"));
-               pd.setOutputStreams(Arrays.asList("outputStream"));
-               pd.setName(policyName);
-               pd.setDescription(String.format("Test policy for stream %s", 
streamName));
-               
-               StreamPartition sp = new StreamPartition();
-               sp.setStreamId(streamName);
-               sp.setColumns(Arrays.asList("host"));
-               sp.setType(StreamPartition.Type.GROUPBY);
-               pd.addPartition(sp);
-               return pd;
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index d1c8457..5bf0410 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -28,120 +28,104 @@ import 
org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 public class DedupCacheTest {
-       
-       private DedupEventsStore store;
-       
-       @Before
-       public void setUp() {
-               store = Mockito.mock(DedupEventsStore.class);
-       DedupEventsStoreFactory.customizeStore(store);
-       }
-       
-       @After
-       public void tearDown() {
-        Mockito.reset(store);
-       }
-
-       @Test
-       public void testNormal() throws Exception {
-               Config config = ConfigFactory.load();
-               DedupCache dedupCache = new DedupCache(config, 
"testPublishment");
-               
-               StreamDefinition stream = createStream();
-               PolicyDefinition policy = createPolicy(stream.getStreamId(), 
"testPolicy");
-               
-               String[] states = new String[] { "OPEN", "WARN", "CLOSE" };
-               Random random = new Random();
-               for (int i = 0; i < 20; i ++) {
-                       AlertStreamEvent event = createEvent(stream, policy, 
new Object[] {
-                                       System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", states[random.nextInt(3)], 0, 0
-                       });
-                       HashMap<String, String> dedupFieldValues = new 
HashMap<String, String>();
-                       dedupFieldValues.put("alertKey", (String) 
event.getData()[event.getSchema().getColumnIndex("alertKey")]);
-                       List<AlertStreamEvent> result = dedupCache.dedup(event, 
-                                       new EventUniq(event.getStreamId(), 
event.getPolicyId(), event.getCreatedTime(), dedupFieldValues), 
-                                       "state", 
-                                       (String) 
event.getData()[event.getSchema().getColumnIndex("state")], "closed");
-                       System.out.println((i + 1) + " >>>> " + 
ToStringBuilder.reflectionToString(result));
-               }
-               
-               Assert.assertTrue(true);
-       }
-       
-       private AlertStreamEvent createEvent(StreamDefinition stream, 
PolicyDefinition policy, Object[] data) {
-               AlertStreamEvent event = new AlertStreamEvent();
-               event.setPolicyId(policy.getName());
-               event.setSchema(stream);
-               event.setStreamId(stream.getStreamId());
-               event.setTimestamp(System.currentTimeMillis());
-               event.setCreatedTime(System.currentTimeMillis());
-               event.setData(data);
-               return event;
-       }
-       
-       private StreamDefinition createStream() {
-               StreamDefinition sd = new StreamDefinition();
-               StreamColumn tsColumn = new StreamColumn();
-               tsColumn.setName("timestamp");
-               tsColumn.setType(StreamColumn.Type.LONG);
-               
-               StreamColumn hostColumn = new StreamColumn();
-               hostColumn.setName("host");
-               hostColumn.setType(StreamColumn.Type.STRING);
-               
-               StreamColumn alertKeyColumn = new StreamColumn();
-               alertKeyColumn.setName("alertKey");
-               alertKeyColumn.setType(StreamColumn.Type.STRING);
-
-               StreamColumn stateColumn = new StreamColumn();
-               stateColumn.setName("state");
-               stateColumn.setType(StreamColumn.Type.STRING);
-               
-               // dedupCount, dedupFirstOccurrence
-               
-               StreamColumn dedupCountColumn = new StreamColumn();
-               dedupCountColumn.setName("dedupCount");
-               dedupCountColumn.setType(StreamColumn.Type.LONG);
-               
-               StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
-               
dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
-               dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
-               
-               sd.setColumns(Arrays.asList(tsColumn, hostColumn, 
alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
-               sd.setDataSource("testDatasource");
-               sd.setStreamId("testStream");
-               sd.setDescription("test stream");
-               return sd;
-       }
-       
-       private PolicyDefinition createPolicy(String streamName, String 
policyName) {
-               PolicyDefinition pd = new PolicyDefinition();
-               PolicyDefinition.Definition def = new 
PolicyDefinition.Definition();
-               //expression, something like "PT5S,dynamic,1,host"
-               def.setValue("test");
-               def.setType("siddhi");
-               pd.setDefinition(def);
-               pd.setInputStreams(Arrays.asList("inputStream"));
-               pd.setOutputStreams(Arrays.asList("outputStream"));
-               pd.setName(policyName);
-               pd.setDescription(String.format("Test policy for stream %s", 
streamName));
-               
-               StreamPartition sp = new StreamPartition();
-               sp.setStreamId(streamName);
-               sp.setColumns(Arrays.asList("host"));
-               sp.setType(StreamPartition.Type.GROUPBY);
-               pd.addPartition(sp);
-               return pd;
-       }
-       
+
+    @Test
+    public void testNormal() throws Exception {
+        Config config = ConfigFactory.load();
+        DedupCache dedupCache = new DedupCache(config, "testPublishment");
+
+        StreamDefinition stream = createStream();
+        PolicyDefinition policy = createPolicy(stream.getStreamId(), 
"testPolicy");
+
+        String[] states = new String[] {"OPEN", "WARN", "CLOSE"};
+        Random random = new Random();
+        for (int i = 0; i < 20; i++) {
+            AlertStreamEvent event = createEvent(stream, policy, new Object[] {
+                System.currentTimeMillis(), "host1", "testPolicy-host1-01", 
states[random.nextInt(3)], 0, 0
+            });
+            HashMap<String, String> dedupFieldValues = new HashMap<String, 
String>();
+            dedupFieldValues.put("alertKey", (String) 
event.getData()[event.getSchema().getColumnIndex("alertKey")]);
+            List<AlertStreamEvent> result = dedupCache.dedup(event,
+                new EventUniq(event.getStreamId(), event.getPolicyId(), 
event.getCreatedTime(), dedupFieldValues),
+                "state",
+                (String) 
event.getData()[event.getSchema().getColumnIndex("state")], "closed");
+            System.out.println((i + 1) + " >>>> " + 
ToStringBuilder.reflectionToString(result));
+        }
+
+        Assert.assertTrue(true);
+    }
+
+    private AlertStreamEvent createEvent(StreamDefinition stream, 
PolicyDefinition policy, Object[] data) {
+        AlertStreamEvent event = new AlertStreamEvent();
+        event.setPolicyId(policy.getName());
+        event.setSchema(stream);
+        event.setStreamId(stream.getStreamId());
+        event.setTimestamp(System.currentTimeMillis());
+        event.setCreatedTime(System.currentTimeMillis());
+        event.setData(data);
+        return event;
+    }
+
+    private StreamDefinition createStream() {
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("host");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn alertKeyColumn = new StreamColumn();
+        alertKeyColumn.setName("alertKey");
+        alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn stateColumn = new StreamColumn();
+        stateColumn.setName("state");
+        stateColumn.setType(StreamColumn.Type.STRING);
+
+        // dedupCount, dedupFirstOccurrence
+
+        StreamColumn dedupCountColumn = new StreamColumn();
+        dedupCountColumn.setName("dedupCount");
+        dedupCountColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+        dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
+        dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, 
stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
+        sd.setDataSource("testDatasource");
+        sd.setStreamId("testStream");
+        sd.setDescription("test stream");
+        return sd;
+    }
+
+    private PolicyDefinition createPolicy(String streamName, String 
policyName) {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        //expression, something like "PT5S,dynamic,1,host"
+        def.setValue("test");
+        def.setType("siddhi");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList("inputStream"));
+        pd.setOutputStreams(Arrays.asList("outputStream"));
+        pd.setName(policyName);
+        pd.setDescription(String.format("Test policy for stream %s", 
streamName));
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(streamName);
+        sp.setColumns(Arrays.asList("host"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        return pd;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 72aef16..e6cbe6c 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -32,195 +32,212 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-public class DefaultDeduplicatorTest extends MongoDependencyBaseTest {
-
-       @Test
-       public void testNormal() throws Exception {
-               //String intervalMin, List<String> customDedupFields, String 
dedupStateField, String dedupStateCloseValue
-               // assume state: OPEN, WARN, CLOSE
-               System.setProperty("config.resource", 
"/application-mongo-statestore.conf");
-               Config config = ConfigFactory.load();
-               DedupCache dedupCache = new DedupCache(config, 
"testPublishment");
-               DefaultDeduplicator deduplicator = new DefaultDeduplicator(
-                               "PT1M", Arrays.asList(new String[] { "alertKey" 
}), "state", "close", dedupCache);
-               
-               StreamDefinition stream = createStream();
-               PolicyDefinition policy = createPolicy(stream.getStreamId(), 
"testPolicy");
-               
-               AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
-                               System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", "OPEN", 0, 0
-               });
-               AlertStreamEvent e2 = createEvent(stream, policy, new Object[] {
-                               System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", "WARN", 0, 0
-               });
-               AlertStreamEvent e3 = createEvent(stream, policy, new Object[] {
-                               System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", "OPEN", 0, 0
-               });
-               AlertStreamEvent e4 = createEvent(stream, policy, new Object[] {
-                               System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", "WARN", 0, 0
-               });
-               AlertStreamEvent e5 = createEvent(stream, policy, new Object[] {
-                               System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", "CLOSE", 0, 0
-               });
-               AlertStreamEvent e6 = createEvent(stream, policy, new Object[] {
-                               System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", "OPEN", 0, 0
-               });
-               AlertStreamEvent e7 = createEvent(stream, policy, new Object[] {
-                               System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", "OPEN", 0, 0
-               });
-               AlertStreamEvent e8 = createEvent(stream, policy, new Object[] {
-                               System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", "OPEN", 0, 0
-               });
-               
-               List<AlertStreamEvent> allResults = new 
ArrayList<AlertStreamEvent>();
-               new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               List<AlertStreamEvent> result = 
deduplicator.dedup(e1);
-                               if (result != null) allResults.addAll(result);
-                               System.out.println("1 >>>> " + 
ToStringBuilder.reflectionToString(result));
-                       }
-               }).start();
-               new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               List<AlertStreamEvent> result = 
deduplicator.dedup(e2);
-                               if (result != null) allResults.addAll(result);
-                               System.out.println("2 >>>> " + 
ToStringBuilder.reflectionToString(result));
-                       }
-               }).start();
-               new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               List<AlertStreamEvent> result = 
deduplicator.dedup(e3);
-                               if (result != null) allResults.addAll(result);
-                               System.out.println("3 >>>> " + 
ToStringBuilder.reflectionToString(result));
-                       }
-               }).start();
-               new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               List<AlertStreamEvent> result = 
deduplicator.dedup(e4);
-                               if (result != null) allResults.addAll(result);
-                               System.out.println("4 >>>> " + 
ToStringBuilder.reflectionToString(result));
-                       }
-               }).start();
-               new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       Thread.sleep(500);
-                               } catch (InterruptedException e) {}
-                               
-                               List<AlertStreamEvent> result = 
deduplicator.dedup(e5);
-                               if (result != null) allResults.addAll(result);
-                               System.out.println("5 >>>> " + 
ToStringBuilder.reflectionToString(result));
-                       }
-               }).start();
-               new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               List<AlertStreamEvent> result = 
deduplicator.dedup(e6);
-                               if (result != null) allResults.addAll(result);
-                               System.out.println("6 >>>> " + 
ToStringBuilder.reflectionToString(result));
-                       }
-               }).start();
-               new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               List<AlertStreamEvent> result = 
deduplicator.dedup(e7);
-                               if (result != null) allResults.addAll(result);
-                               System.out.println("7 >>>> " + 
ToStringBuilder.reflectionToString(result));
-                       }
-               }).start();
-               new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               List<AlertStreamEvent> result = 
deduplicator.dedup(e8);
-                               if (result != null) allResults.addAll(result);
-                               System.out.println("8 >>>> " + 
ToStringBuilder.reflectionToString(result));
-                       }
-               }).start();
-               
-               Thread.sleep(2000);
-               
-               long maxCount = 0;
-               for (AlertStreamEvent event : allResults) {
-                       Assert.assertNotNull(event.getData()[4]);
-                       Assert.assertNotNull(event.getData()[5]);
-                       
-                       if (((Long) event.getData()[4]) > maxCount) {
-                               maxCount = (Long) event.getData()[4];
-                               System.out.println(String.format(">>>>>%s: %s", 
event, maxCount));
-                       }
-               }
-               
-       }
-       
-       private AlertStreamEvent createEvent(StreamDefinition stream, 
PolicyDefinition policy, Object[] data) {
-               AlertStreamEvent event = new AlertStreamEvent();
-               event.setPolicyId(policy.getName());
-               event.setSchema(stream);
-               event.setStreamId(stream.getStreamId());
-               event.setTimestamp(System.currentTimeMillis());
-               event.setCreatedTime(System.currentTimeMillis());
-               event.setData(data);
-               return event;
-       }
-       
-       private StreamDefinition createStream() {
-               StreamDefinition sd = new StreamDefinition();
-               StreamColumn tsColumn = new StreamColumn();
-               tsColumn.setName("timestamp");
-               tsColumn.setType(StreamColumn.Type.LONG);
-               
-               StreamColumn hostColumn = new StreamColumn();
-               hostColumn.setName("host");
-               hostColumn.setType(StreamColumn.Type.STRING);
-               
-               StreamColumn alertKeyColumn = new StreamColumn();
-               alertKeyColumn.setName("alertKey");
-               alertKeyColumn.setType(StreamColumn.Type.STRING);
-
-               StreamColumn stateColumn = new StreamColumn();
-               stateColumn.setName("state");
-               stateColumn.setType(StreamColumn.Type.STRING);
-               
-               // dedupCount, dedupFirstOccurrence
-               
-               StreamColumn dedupCountColumn = new StreamColumn();
-               dedupCountColumn.setName("dedupCount");
-               dedupCountColumn.setType(StreamColumn.Type.LONG);
-               
-               StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
-               
dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
-               dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
-               
-               sd.setColumns(Arrays.asList(tsColumn, hostColumn, 
alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
-               sd.setDataSource("testDatasource");
-               sd.setStreamId("testStream");
-               sd.setDescription("test stream");
-               return sd;
-       }
-       
-       private PolicyDefinition createPolicy(String streamName, String 
policyName) {
-               PolicyDefinition pd = new PolicyDefinition();
-               PolicyDefinition.Definition def = new 
PolicyDefinition.Definition();
-               //expression, something like "PT5S,dynamic,1,host"
-               def.setValue("test");
-               def.setType("siddhi");
-               pd.setDefinition(def);
-               pd.setInputStreams(Arrays.asList("inputStream"));
-               pd.setOutputStreams(Arrays.asList("outputStream"));
-               pd.setName(policyName);
-               pd.setDescription(String.format("Test policy for stream %s", 
streamName));
-               
-               StreamPartition sp = new StreamPartition();
-               sp.setStreamId(streamName);
-               sp.setColumns(Arrays.asList("host"));
-               sp.setType(StreamPartition.Type.GROUPBY);
-               pd.addPartition(sp);
-               return pd;
-       }
-       
+public class DefaultDeduplicatorTest {
+
+    @Test
+    public void testNormal() throws Exception {
+        //String intervalMin, List<String> customDedupFields, String 
dedupStateField, String dedupStateCloseValue
+        // assume state: OPEN, WARN, CLOSE
+        System.setProperty("config.resource", 
"/application-mongo-statestore.conf");
+        Config config = ConfigFactory.load();
+        DedupCache dedupCache = new DedupCache(config, "testPublishment");
+        DefaultDeduplicator deduplicator = new DefaultDeduplicator(
+            "PT1M", Arrays.asList(new String[] {"alertKey"}), "state", 
"close", dedupCache);
+
+        StreamDefinition stream = createStream();
+        PolicyDefinition policy = createPolicy(stream.getStreamId(), 
"testPolicy");
+
+        AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
+            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+        });
+        AlertStreamEvent e2 = createEvent(stream, policy, new Object[] {
+            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
+        });
+        AlertStreamEvent e3 = createEvent(stream, policy, new Object[] {
+            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+        });
+        AlertStreamEvent e4 = createEvent(stream, policy, new Object[] {
+            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
+        });
+        AlertStreamEvent e5 = createEvent(stream, policy, new Object[] {
+            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "CLOSE", 0, 0
+        });
+        AlertStreamEvent e6 = createEvent(stream, policy, new Object[] {
+            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+        });
+        AlertStreamEvent e7 = createEvent(stream, policy, new Object[] {
+            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+        });
+        AlertStreamEvent e8 = createEvent(stream, policy, new Object[] {
+            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+        });
+
+        List<AlertStreamEvent> allResults = new ArrayList<AlertStreamEvent>();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                List<AlertStreamEvent> result = deduplicator.dedup(e1);
+                if (result != null) {
+                    allResults.addAll(result);
+                }
+                System.out.println("1 >>>> " + ToStringBuilder.reflectionToString(result));
+            }
+        }).start();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                List<AlertStreamEvent> result = deduplicator.dedup(e2);
+                if (result != null) {
+                    allResults.addAll(result);
+                }
+                System.out.println("2 >>>> " + ToStringBuilder.reflectionToString(result));
+            }
+        }).start();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                List<AlertStreamEvent> result = deduplicator.dedup(e3);
+                if (result != null) {
+                    allResults.addAll(result);
+                }
+                System.out.println("3 >>>> " + ToStringBuilder.reflectionToString(result));
+            }
+        }).start();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                List<AlertStreamEvent> result = deduplicator.dedup(e4);
+                if (result != null) {
+                    allResults.addAll(result);
+                }
+                System.out.println("4 >>>> " + ToStringBuilder.reflectionToString(result));
+            }
+        }).start();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                }
+
+                List<AlertStreamEvent> result = deduplicator.dedup(e5);
+                if (result != null) {
+                    allResults.addAll(result);
+                }
+                System.out.println("5 >>>> " + ToStringBuilder.reflectionToString(result));
+            }
+        }).start();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                List<AlertStreamEvent> result = deduplicator.dedup(e6);
+                if (result != null) {
+                    allResults.addAll(result);
+                }
+                System.out.println("6 >>>> " + ToStringBuilder.reflectionToString(result));
+            }
+        }).start();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                List<AlertStreamEvent> result = deduplicator.dedup(e7);
+                if (result != null) {
+                    allResults.addAll(result);
+                }
+                System.out.println("7 >>>> " + ToStringBuilder.reflectionToString(result));
+            }
+        }).start();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                List<AlertStreamEvent> result = deduplicator.dedup(e8);
+                if (result != null) {
+                    allResults.addAll(result);
+                }
+                System.out.println("8 >>>> " + ToStringBuilder.reflectionToString(result));
+            }
+        }).start();
+
+        Thread.sleep(2000);
+
+        long maxCount = 0;
+        for (AlertStreamEvent event : allResults) {
+            Assert.assertNotNull(event.getData()[4]);
+            Assert.assertNotNull(event.getData()[5]);
+
+            if (((Long) event.getData()[4]) > maxCount) {
+                maxCount = (Long) event.getData()[4];
+                System.out.println(String.format(">>>>>%s: %s", event, maxCount));
+            }
+        }
+
+    }
+
+    private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
+        AlertStreamEvent event = new AlertStreamEvent();
+        event.setPolicyId(policy.getName());
+        event.setSchema(stream);
+        event.setStreamId(stream.getStreamId());
+        event.setTimestamp(System.currentTimeMillis());
+        event.setCreatedTime(System.currentTimeMillis());
+        event.setData(data);
+        return event;
+    }
+
+    private StreamDefinition createStream() {
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("host");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn alertKeyColumn = new StreamColumn();
+        alertKeyColumn.setName("alertKey");
+        alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn stateColumn = new StreamColumn();
+        stateColumn.setName("state");
+        stateColumn.setType(StreamColumn.Type.STRING);
+
+        // dedupCount, dedupFirstOccurrence
+
+        StreamColumn dedupCountColumn = new StreamColumn();
+        dedupCountColumn.setName("dedupCount");
+        dedupCountColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+        dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
+        dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
+        sd.setDataSource("testDatasource");
+        sd.setStreamId("testStream");
+        sd.setDescription("test stream");
+        return sd;
+    }
+
+    private PolicyDefinition createPolicy(String streamName, String policyName) {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        //expression, something like "PT5S,dynamic,1,host"
+        def.setValue("test");
+        def.setType("siddhi");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList("inputStream"));
+        pd.setOutputStreams(Arrays.asList("outputStream"));
+        pd.setName(policyName);
+        pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(streamName);
+        sp.setColumns(Arrays.asList("host"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        return pd;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
index 52d4460..a788646 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
@@ -27,11 +27,8 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
 import org.apache.eagle.alert.engine.router.TestAlertPublisherBolt;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -40,22 +37,9 @@ import com.fasterxml.jackson.databind.type.SimpleType;
 
 public class ExtendedDeduplicatorTest {
 
-       private DedupEventsStore store;
-       
-       @Before
-       public void setUp() {
-               store = Mockito.mock(DedupEventsStore.class);
-       DedupEventsStoreFactory.customizeStore(store);
-       }
-       
-       @After
-       public void tearDown() {
-        Mockito.reset(store);
-       }
-       
-       @Test
-       public void testNormal() throws Exception {
-               List<Publishment> pubs = loadEntities("/router/publishments-extended-deduplicator.json", Publishment.class);
+    @Test
+    public void testNormal() throws Exception {
+        List<Publishment> pubs = loadEntities("/router/publishments-extended-deduplicator.json", Publishment.class);
 
         AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
         AlertStreamEvent event1 = createWithStreamDef("extended_dedup_host1", "extended_dedup_testapp1", "OPEN");
@@ -65,11 +49,10 @@ public class ExtendedDeduplicatorTest {
         Assert.assertNotNull(plugin.dedup(event1));
         Assert.assertNull(plugin.dedup(event2));
         Assert.assertNotNull(plugin.dedup(event3));
-        
-        Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
-       }
-       
-       private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
+
+    }
+
+    private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
         ObjectMapper objectMapper = new ObjectMapper();
         JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
         List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);
@@ -95,7 +78,7 @@ public class ExtendedDeduplicatorTest {
         StreamColumn hostColumn = new StreamColumn();
         hostColumn.setName("hostname");
         hostColumn.setType(StreamColumn.Type.STRING);
-        
+
         StreamColumn stateColumn = new StreamColumn();
         stateColumn.setName("state");
         stateColumn.setType(StreamColumn.Type.STRING);
@@ -105,5 +88,5 @@ public class ExtendedDeduplicatorTest {
         alert.setSchema(sd);
         return alert;
     }
-       
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
deleted file mode 100644
index ed44267..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-public class MongoDedupStoreTest extends MongoDependencyBaseTest {
-
-       @Test
-    public void testNormal() throws Exception {
-               Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = store.getEvents();
-               Assert.assertNotNull(events);
-               Assert.assertEquals(0, events.size());
-               
-               String streamId = "testStream"; 
-               String policyId = "testPolicy"; 
-               long timestamp = System.currentTimeMillis(); 
-               HashMap<String, String> customFieldValues = new HashMap<String, String>();
-               customFieldValues.put("alertKey", "test-alert-key");
-               EventUniq eventEniq = new EventUniq(streamId, policyId, timestamp, customFieldValues);
-               
-               ConcurrentLinkedDeque<DedupValue> dedupStateValues = new ConcurrentLinkedDeque<DedupValue>();
-               DedupValue one = new DedupValue();
-               one.setStateFieldValue("OPEN");
-               one.setCount(2);
-               one.setCloseTime(0);
-               one.setDocId("doc-id-...");
-               one.setFirstOccurrence(System.currentTimeMillis());
-               dedupStateValues.add(one);
-               store.add(eventEniq, dedupStateValues);
-               
-               events = store.getEvents();
-               Assert.assertNotNull(events);
-               Assert.assertEquals(1, events.size());
-               
-               Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry = events.entrySet().iterator().next();
-               Assert.assertEquals(streamId, entry.getKey().streamId);
-               Assert.assertEquals(1, entry.getValue().size());
-               Assert.assertEquals(2, entry.getValue().getLast().getCount());
-               
-               store.remove(events.keySet().iterator().next());
-               events = store.getEvents();
-               Assert.assertNotNull(events);
-               Assert.assertEquals(0, events.size());
-       }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
deleted file mode 100644
index b7d7613..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.mongodb.client.MongoDatabase;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-public abstract class MongoDependencyBaseTest {
-
-       private static Logger LOG = LoggerFactory.getLogger(MongoDependencyBaseTest.class);
-       
-       private static SimpleEmbedMongo mongo;
-    @SuppressWarnings("unused")
-       private static MongoDatabase testDB;
-    private static Config config;
-
-    protected static MongoDedupEventsStore store;
-    
-    public static void before() {
-        try {
-            mongo = new SimpleEmbedMongo();
-            mongo.start();
-            testDB = mongo.getMongoClient().getDatabase("testDb");
-        } catch (Exception e) {
-            LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e);
-        }
-    }
-
-    @BeforeClass
-    public static void setup() throws Exception {
-        before();
-
-        System.setProperty("config.resource", "/application-mongo-statestore.conf");
-        ConfigFactory.invalidateCaches();
-        config = ConfigFactory.load();
-        
-        store = new MongoDedupEventsStore(config, "testPublishment");
-    }
-
-    @AfterClass
-    public static void teardown() {
-        if (mongo != null) {
-            mongo.shutdown();
-        }
-    }
-       
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 79c03bc..5cdb6f1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,12 +18,11 @@
 
 package org.apache.eagle.alert.engine.router;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
@@ -32,38 +31,26 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStore;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
 import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
 import org.apache.eagle.alert.engine.runner.MapComparator;
 import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import org.junit.*;
-import org.mockito.Mockito;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 /**
  * @Since 5/14/16.
  */
 public class TestAlertPublisherBolt {
-       
-       private DedupEventsStore store;
-       
-       @Before
-       public void setUp() {
-               store = Mockito.mock(DedupEventsStore.class);
-       DedupEventsStoreFactory.customizeStore(store);
-       }
-       
-       @After
-       public void tearDown() {
-        Mockito.reset(store);
-       }
 
     @SuppressWarnings("rawtypes")
     @Ignore
@@ -86,7 +73,7 @@ public class TestAlertPublisherBolt {
         policy.setName("policy1");
         alert.setPolicyId(policy.getName());
         alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[]{"field_1", 2, "field_3"});
+        alert.setData(new Object[] {"field_1", 2, "field_3"});
         alert.setStreamId(streamId);
         alert.setCreatedBy(this.toString());
         return alert;
@@ -198,7 +185,7 @@ public class TestAlertPublisherBolt {
         StreamColumn hostColumn = new StreamColumn();
         hostColumn.setName("hostname");
         hostColumn.setType(StreamColumn.Type.STRING);
-        
+
         StreamColumn stateColumn = new StreamColumn();
         stateColumn.setName("state");
         stateColumn.setType(StreamColumn.Type.STRING);
@@ -209,7 +196,7 @@ public class TestAlertPublisherBolt {
         return alert;
     }
 
-       @Test
+    @Test
     public void testCustomFieldDedupEvent() throws Exception {
         List<Publishment> pubs = loadEntities("/router/publishments.json", Publishment.class);
 
@@ -221,12 +208,9 @@ public class TestAlertPublisherBolt {
         Assert.assertNotNull(plugin.dedup(event1));
         Assert.assertNull(plugin.dedup(event2));
         Assert.assertNotNull(plugin.dedup(event3));
-        
-        Mockito.verify(store).getEvents();
-        Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
     }
 
-       @Test
+    @Test
     public void testEmptyCustomFieldDedupEvent() throws Exception {
         List<Publishment> pubs = loadEntities("/router/publishments-empty-dedup-field.json", Publishment.class);
 
@@ -236,8 +220,6 @@ public class TestAlertPublisherBolt {
 
         Assert.assertNotNull(plugin.dedup(event1));
         Assert.assertNull(plugin.dedup(event2));
-        
-        Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject());
     }
 
     private AlertStreamEvent createSeverityWithStreamDef(String hostname, String appName, String message, String severity, String docId, String df_device, String df_type, String colo) {

Reply via email to