Repository: incubator-eagle Updated Branches: refs/heads/master e520e4011 -> c15e7f814
EAGLE-672: remove mongo state store for dedup Author: Li, Garrett Reviewer: ralphsu This closes #553 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c15e7f81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c15e7f81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c15e7f81 Branch: refs/heads/master Commit: c15e7f81497f6873dee25421c69f4c1351fdcb9c Parents: e520e40 Author: Xiancheng Li <xiancheng...@ebay.com> Authored: Mon Oct 24 09:32:21 2016 +0800 Committer: Ralph, Su <suliang...@gmail.com> Committed: Mon Oct 24 14:18:13 2016 +0800 ---------------------------------------------------------------------- .../engine/publisher/dedup/DedupCache.java | 93 +---- .../publisher/dedup/DedupEventsStore.java | 32 -- .../dedup/DedupEventsStoreFactory.java | 57 --- .../publisher/dedup/MongoDedupEventsStore.java | 146 ------- .../publisher/dedup/TransformerUtils.java | 117 ------ .../publisher/dedup/DedupCacheStoreTest.java | 150 ------- .../engine/publisher/dedup/DedupCacheTest.java | 202 +++++----- .../dedup/DefaultDeduplicatorTest.java | 399 ++++++++++--------- .../dedup/ExtendedDeduplicatorTest.java | 35 +- .../publisher/dedup/MongoDedupStoreTest.java | 68 ---- .../dedup/MongoDependencyBaseTest.java | 67 ---- .../engine/router/TestAlertPublisherBolt.java | 54 +-- 12 files changed, 344 insertions(+), 1076 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java index 2080441..abb83d6 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java @@ -16,42 +16,37 @@ */ package org.apache.eagle.alert.engine.publisher.dedup; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.typesafe.config.Config; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; + import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType; import org.apache.eagle.alert.engine.publisher.impl.EventUniq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; +import com.google.common.base.Objects; +import com.typesafe.config.Config; public class DedupCache { private static final Logger LOG = LoggerFactory.getLogger(DedupCache.class); private static final long CACHE_MAX_EXPIRE_TIME_IN_DAYS = 30; - private static final long CACHE_MAX_EVENT_QUEUE_SIZE = 10; public static final String DEDUP_COUNT = "dedupCount"; public static final String DOC_ID = "docId"; public static final String DEDUP_FIRST_OCCURRENCE = "dedupFirstOccurrenceTime"; - private static final DedupEventsStoreType type = DedupEventsStoreType.Mongo; - private long lastUpdated = -1; private Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>(); - private static final ConcurrentLinkedDeque<DedupCache> caches = new ConcurrentLinkedDeque<DedupCache>(); - + @SuppressWarnings("unused") private Config config; private String publishName; @@ -59,39 +54,6 @@ public class DedupCache { public DedupCache(Config config, String publishName) { this.config = config; this.publishName = publishName; - // only happens during startup, won't introduce perf issue here - synchronized (caches) { - if (caches.size() == 0) { - // create daemon to clean up old removable events periodically - ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - } - }); - scheduleSrv.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - for (DedupCache cache : caches) { - if (cache == null || cache.getEvents() == null) { - continue; - } - HashSet<EventUniq> eventUniqs = new HashSet<EventUniq>(cache.getEvents().keySet()); - for (EventUniq one : eventUniqs) { - if (one.removable && one.createdTime < System.currentTimeMillis() - 3600000 * 24) { - cache.removeEvent(one); - LOG.info("Remove dedup key {} from cache & db", one); - } - } - } - } - }, 5, 60, TimeUnit.MINUTES); - LOG.info("Create daemon to clean up old removable events periodically"); - } - caches.add(this); - } } public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() { @@ -99,8 +61,6 @@ public class DedupCache { || System.currentTimeMillis() - lastUpdated > CACHE_MAX_EXPIRE_TIME_IN_DAYS * DateUtils.MILLIS_PER_DAY || events.size() <= 0) { lastUpdated = System.currentTimeMillis(); - DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName); - events = accessor.getEvents(); } return events; } @@ -112,9 +72,6 @@ public class DedupCache { public void removeEvent(EventUniq eventEniq) { if (this.contains(eventEniq)) { this.events.remove(eventEniq); - - DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName); - accessor.remove(eventEniq); } } @@ -142,9 +99,9 @@ public class DedupCache { Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = this.getEvents(); if (!events.containsKey(eventEniq) || (events.containsKey(eventEniq) - && events.get(eventEniq).size() > 0 - && !StringUtils.equalsIgnoreCase(stateFieldValue, - events.get(eventEniq).getLast().getStateFieldValue()))) { + && events.get(eventEniq).size() > 0 + && !StringUtils.equalsIgnoreCase(stateFieldValue, + events.get(eventEniq).getLast().getStateFieldValue()))) { DedupValue[] dedupValues = this.add(eventEniq, event, stateFieldValue, stateCloseValue); return dedupValues; } else { @@ -164,19 +121,12 @@ public class DedupCache { events.put(eventEniq, dedupValues); LOG.info("{} Add new dedup key {}, and value {}", this.publishName, eventEniq, dedupValues); } else if (!StringUtils.equalsIgnoreCase(stateFieldValue, - events.get(eventEniq).getLast().getStateFieldValue())) { + events.get(eventEniq).getLast().getStateFieldValue())) { // existing a de-dup value, try update or reset DedupValue lastDedupValue = events.get(eventEniq).getLast(); dedupValue = updateDedupValue(lastDedupValue, eventEniq, event, stateFieldValue, stateCloseValue); LOG.info("{} Update dedup key {}, and value {}", this.publishName, eventEniq, dedupValue); } - - if (dedupValue != null) { - DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName); - accessor.add(eventEniq, events.get(eventEniq)); - LOG.info("{} Store dedup key {}, value {} to DB", this.publishName, eventEniq, - Joiner.on(",").join(events.get(eventEniq))); - } if (dedupValue == null) { return null; } @@ -190,7 +140,7 @@ public class DedupCache { } if (lastDedupValue.getStateFieldValue().equals(stateCloseValue) - && eventEniq.timestamp < lastDedupValue.getCloseTime()) { + && eventEniq.timestamp < lastDedupValue.getCloseTime()) { DedupValue dv = createDedupValue(eventEniq, event, stateFieldValue); lastDedupValue.resetTo(dv); } else { @@ -208,7 +158,7 @@ public class DedupCache { dedupValue = new DedupValue(); dedupValue.setFirstOccurrence(eventEniq.timestamp); int idx = event.getSchema().getColumnIndex(DOC_ID); - if (idx >= 0 ) { + if (idx >= 0) { dedupValue.setDocId(event.getData()[idx].toString()); } else { dedupValue.setDocId(""); @@ -219,13 +169,6 @@ public class DedupCache { return dedupValue; } - public void persistUpdatedEventUniq(EventUniq eventEniq) { - DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName); - accessor.add(eventEniq, events.get(eventEniq)); - LOG.info("{} Store dedup key {}, value {} to DB", this.publishName, eventEniq, - Joiner.on(",").join(events.get(eventEniq))); - } - private DedupValue updateCount(EventUniq eventEniq) { ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq); if (dedupValues == null || dedupValues.size() <= 0) { @@ -237,11 +180,7 @@ public class DedupCache { String updateMsg = String.format( "%s Update count for dedup key %s, value %s and count %s", this.publishName, eventEniq, dedupValue.getStateFieldValue(), dedupValue.getCount()); - if (dedupValue.getCount() > 0 && dedupValue.getCount() % 100 == 0) { - LOG.info(updateMsg); - DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config, this.publishName); - accessor.add(eventEniq, dedupValues); - } else if (LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug(updateMsg); } return dedupValue; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java deleted file mode 100644 index 5918afe..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStore.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedDeque; - -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; - -public interface DedupEventsStore { - - public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents(); - - public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupStateValues); - - public void remove(EventUniq eventEniq); - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java deleted file mode 100644 index 75d8e53..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import com.typesafe.config.Config; - -public class DedupEventsStoreFactory { - - public enum DedupEventsStoreType { - Mongo, ElasticSearch - } - - ; - - private static DedupEventsStore customizedStore; - - private static MongoDedupEventsStore accessor; - - public static void customizeStore(DedupEventsStore store) { - customizedStore = store; - } - - public static DedupEventsStore getStore(DedupEventsStoreType type, Config config, String publishName) { - if (customizedStore != null) { - return customizedStore; - } - switch (type) { - case Mongo: - if (accessor == null) { - accessor = new MongoDedupEventsStore(config, publishName); - } - break; - case ElasticSearch: - default: - break; - } - if (accessor == null) { - throw new RuntimeException(String.format("Dedup events store type %s is NOT supportted", type)); - } - return accessor; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java deleted file mode 100644 index 35281bf..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.mongodb.Block; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; -import com.mongodb.client.model.IndexOptions; -import com.mongodb.client.model.InsertOneOptions; -import com.typesafe.config.Config; -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; -import org.bson.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; - -public class MongoDedupEventsStore implements DedupEventsStore { - - private static final Logger LOG = LoggerFactory.getLogger(MongoDedupEventsStore.class); - - public static final String DEDUP_ID = "dedupId"; - public static final String DEDUP_STREAM_ID = "streamId"; - public static final String DOC_ID = "docId"; - public static final String DEDUP_POLICY_ID = "policyId"; - public static final String DEDUP_CREATE_TIME = "createdTime"; - public static final String DEDUP_TIMESTAMP = "timestamp"; - public static final String DEDUP_REMOVABLE = "removable"; - public static final String DEDUP_CUSTOM_FIELDS_VALUES = "customFieldValues"; - public static final String DEDUP_VALUES = "dedupValues"; - public static final String DEDUP_STATE_FIELD_VALUE = "stateFieldValue"; - public static final String DEDUP_COUNT = "count"; - public static final String DEDUP_FIRST_OCCURRENCE = "firstOccurrence"; - public static final String DEDUP_CLOSE_TIME = "closeTime"; - public static final String DEDUP_PUBLISH_ID = "publishId"; - - private static final ObjectMapper mapper = new ObjectMapper(); - - static { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private Config config; - private String connection; - private MongoClient client; - private MongoDatabase db; - private MongoCollection<Document> stateCollection; - private String publishName; - - private static final String DB_NAME = "ump_alert_dedup"; - private static final String ALERT_STATE_COLLECTION = "alert_dedup"; - - public MongoDedupEventsStore(Config config, String publishName) { - this.config = config; - this.publishName = publishName; - this.connection = this.config.getString("connection"); - try { - this.client = new MongoClient(new MongoClientURI(this.connection)); - init(); - } catch (Throwable t) { - LOG.error(String.format("initialize mongodb %s client failed", this.connection), t); - } - } - - private void init() { - db = client.getDatabase(DB_NAME); - stateCollection = db.getCollection(ALERT_STATE_COLLECTION); - // dedup id index - IndexOptions io = new IndexOptions().background(true).unique(true).name(DEDUP_ID + "_index"); - BsonDocument doc = new BsonDocument(); - doc.append(DEDUP_ID, new BsonInt32(1)); - stateCollection.createIndex(doc, io); - } - - @Override - public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() { - try { - Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> result = new ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>(); - BsonDocument filter = new BsonDocument(); - filter.append(DEDUP_PUBLISH_ID, new BsonString(this.publishName)); - stateCollection.find(filter).forEach(new Block<Document>() { - @Override - public void apply(final Document doc) { - DedupEntity entity = TransformerUtils.transform(DedupEntity.class, BsonDocument.parse(doc.toJson())); - result.put(entity.getEventEniq(), entity.getDedupValuesInConcurrentLinkedDeque()); - } - }); - if (LOG.isDebugEnabled()) { - LOG.debug("Found {} dedup events from mongoDB", result.size()); - } - return result; - } catch (Exception e) { - LOG.error("find dedup state failed, but the state in memory is good, could be ingored.", e); - } - return new HashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>(); - } - - @Override - public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> dedupStateValues) { - try { - BsonDocument doc = TransformerUtils.transform(new DedupEntity(this.publishName, eventEniq, dedupStateValues)); - BsonDocument filter = new BsonDocument(); - filter.append(DEDUP_ID, new BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq))); - Document returnedDoc = stateCollection.findOneAndReplace(filter, Document.parse(doc.toJson())); - if (returnedDoc == null) { - InsertOneOptions option = new InsertOneOptions(); - stateCollection.insertOne(Document.parse(doc.toJson()), option); - } - } catch (Exception e) { - LOG.error("insert dedup state failed, but the state is still in memory, could be ingored.", e); - } - } - - @Override - public void remove(EventUniq eventEniq) { - try { - BsonDocument filter = new BsonDocument(); - filter.append(DEDUP_ID, new BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq))); - stateCollection.deleteOne(filter); - } catch (Exception e) { - LOG.error("delete dedup state failed, but the state in memory is good, could be ingored.", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java deleted file mode 100644 index 1ce6898..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; -import org.bson.*; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map.Entry; - -public class TransformerUtils { - - public static final String MAP_KEY = "key"; - public static final String MAP_VALUE = "value"; - - @SuppressWarnings("unchecked") - public static <T> T transform(Class<T> klass, BsonDocument doc) { - if (klass.equals(DedupEntity.class)) { - String streamId = doc.getString(MongoDedupEventsStore.DEDUP_STREAM_ID).getValue(); - String policyId = doc.getString(MongoDedupEventsStore.DEDUP_POLICY_ID).getValue(); - long timestamp = doc.getInt64(MongoDedupEventsStore.DEDUP_TIMESTAMP).getValue(); - HashMap<String, String> customFieldValues = new HashMap<String, String>(); - BsonArray customFieldsValuesArray = doc.getArray( - MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES); - for (int i = 0; i < customFieldsValuesArray.size(); i++) { - BsonDocument dedupCustomFieldValuesDoc = customFieldsValuesArray.get(i).asDocument(); - customFieldValues.put( - dedupCustomFieldValuesDoc.getString(MAP_KEY).getValue(), - dedupCustomFieldValuesDoc.getString(MAP_VALUE).getValue()); - } - EventUniq eventUniq = new EventUniq(streamId, policyId, timestamp, customFieldValues); - eventUniq.removable = doc.getBoolean(MongoDedupEventsStore.DEDUP_REMOVABLE).getValue(); - eventUniq.createdTime = doc.getInt64( - MongoDedupEventsStore.DEDUP_CREATE_TIME, new BsonInt64(0)).getValue(); - List<DedupValue> dedupValues = new ArrayList<DedupValue>(); - BsonArray dedupValuesArray = doc.getArray(MongoDedupEventsStore.DEDUP_VALUES); - for (int i = 0; i < dedupValuesArray.size(); i++) { - BsonDocument dedupValuesDoc = dedupValuesArray.get(i).asDocument(); - DedupValue dedupValue = new DedupValue(); - dedupValue.setStateFieldValue(dedupValuesDoc.getString( - MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE).getValue()); - dedupValue.setCount(dedupValuesDoc.getInt64( - MongoDedupEventsStore.DEDUP_COUNT).getValue()); - dedupValue.setFirstOccurrence(dedupValuesDoc.getInt64( - MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE).getValue()); - dedupValue.setCloseTime(dedupValuesDoc.getInt64( - MongoDedupEventsStore.DEDUP_CLOSE_TIME).getValue()); - dedupValue.setDocId(dedupValuesDoc.getString( - MongoDedupEventsStore.DOC_ID).getValue()); - dedupValues.add(dedupValue); - } - String publishId = doc.getString(MongoDedupEventsStore.DEDUP_PUBLISH_ID).getValue(); - return (T) new DedupEntity(publishId, eventUniq, dedupValues); - } - throw new RuntimeException(String.format("Unknow object type %s, cannot transform", klass.getName())); - } - - public static BsonDocument transform(Object obj) { - if (obj instanceof DedupEntity) { - BsonDocument doc = new BsonDocument(); - DedupEntity entity = (DedupEntity) obj; - doc.put(MongoDedupEventsStore.DEDUP_ID, new BsonInt64(getUniqueId(entity.getPublishName(), entity.getEventEniq()))); - doc.put(MongoDedupEventsStore.DEDUP_STREAM_ID, new BsonString(entity.getEventEniq().streamId)); - doc.put(MongoDedupEventsStore.DEDUP_PUBLISH_ID, new BsonString(entity.getPublishName())); - doc.put(MongoDedupEventsStore.DEDUP_POLICY_ID, new BsonString(entity.getEventEniq().policyId)); - doc.put(MongoDedupEventsStore.DEDUP_CREATE_TIME, new BsonInt64(entity.getEventEniq().createdTime)); - doc.put(MongoDedupEventsStore.DEDUP_TIMESTAMP, new BsonInt64(entity.getEventEniq().timestamp)); - doc.put(MongoDedupEventsStore.DEDUP_REMOVABLE, new BsonBoolean(entity.getEventEniq().removable)); - - List<BsonDocument> dedupCustomFieldValues = new ArrayList<BsonDocument>(); - for (Entry<String, String> entry : entity.getEventEniq().customFieldValues.entrySet()) { - BsonDocument dedupCustomFieldValuesDoc = new BsonDocument(); - dedupCustomFieldValuesDoc.put(MAP_KEY, new BsonString(entry.getKey())); - dedupCustomFieldValuesDoc.put(MAP_VALUE, new BsonString(entry.getValue())); - dedupCustomFieldValues.add(dedupCustomFieldValuesDoc); - } - doc.put(MongoDedupEventsStore.DEDUP_CUSTOM_FIELDS_VALUES, new BsonArray(dedupCustomFieldValues)); - - List<BsonDocument> dedupValuesDocs = new ArrayList<BsonDocument>(); - for (DedupValue dedupValue : entity.getDedupValues()) { - BsonDocument dedupValuesDoc = new BsonDocument(); - dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE, new BsonString(dedupValue.getStateFieldValue())); - dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_COUNT, new BsonInt64(dedupValue.getCount())); - dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE,new BsonInt64(dedupValue.getFirstOccurrence())); - dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_CLOSE_TIME, new BsonInt64(dedupValue.getCloseTime())); - dedupValuesDoc.put(MongoDedupEventsStore.DOC_ID, new BsonString(dedupValue.getDocId())); - dedupValuesDocs.add(dedupValuesDoc); - } - doc.put(MongoDedupEventsStore.DEDUP_VALUES, new BsonArray(dedupValuesDocs)); - return doc; - } - throw new RuntimeException(String.format("Unknow object type %s, cannot transform", obj.getClass().getName())); - } - - public static int getUniqueId(String publishName, EventUniq eventEniq) { - HashCodeBuilder builder = new HashCodeBuilder().append(eventEniq).append(publishName); - return builder.build(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java deleted file mode 100644 index 25518de..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType; -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; -import org.apache.storm.guava.base.Joiner; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentLinkedDeque; - -public class DedupCacheStoreTest extends MongoDependencyBaseTest { - - @Test - public void testPersistUpdatedEventUniq() throws Exception { - StreamDefinition stream = createStream(); - PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy"); - - AlertStreamEvent event = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - - HashMap<String, String> dedupFieldValues = new HashMap<String, String>(); - dedupFieldValues.put("alertKey", (String) event.getData()[event.getSchema().getColumnIndex("alertKey")]); - EventUniq eventUniq = new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues); - - System.setProperty("config.resource", "/application-mongo-statestore.conf"); - Config config = ConfigFactory.load(); - DedupCache cache = new DedupCache(config, "testPublishment"); - cache.addOrUpdate(eventUniq, event, (String) event.getData()[event.getSchema().getColumnIndex("state")], "closed"); - - DedupEventsStore accessor = DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo, config, "testPublishment"); - Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = accessor.getEvents(); - for (EventUniq one : events.keySet()) { - if (one.equals(eventUniq)) { - Assert.assertEquals(false, one.removable); - } - } - for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry : events.entrySet()) { - System.out.println(entry.getKey() + " >>> " + Joiner.on("\n\t").join(entry.getValue())); - } - - eventUniq.removable = true; - cache.persistUpdatedEventUniq(eventUniq); - - events = accessor.getEvents(); - for (EventUniq one : events.keySet()) { - if (one.equals(eventUniq)) { - Assert.assertEquals(true, one.removable); - } - } - - for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry : events.entrySet()) { - System.out.println(entry.getKey() + " >>> " + Joiner.on("\n\t").join(entry.getValue())); - } - } - - private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setPolicyId(policy.getName()); - event.setSchema(stream); - event.setStreamId(stream.getStreamId()); - event.setTimestamp(System.currentTimeMillis()); - event.setCreatedTime(System.currentTimeMillis()); - event.setData(data); - return event; - } - - private StreamDefinition createStream() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn alertKeyColumn = new StreamColumn(); - alertKeyColumn.setName("alertKey"); - alertKeyColumn.setType(StreamColumn.Type.STRING); - - StreamColumn stateColumn = new StreamColumn(); - stateColumn.setName("state"); - stateColumn.setType(StreamColumn.Type.STRING); - - // dedupCount, dedupFirstOccurrence - - StreamColumn dedupCountColumn = new StreamColumn(); - dedupCountColumn.setName("dedupCount"); - dedupCountColumn.setType(StreamColumn.Type.LONG); - - StreamColumn dedupFirstOccurrenceColumn = new StreamColumn(); - dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE); - dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn)); - sd.setDataSource("testDatasource"); - sd.setStreamId("testStream"); - sd.setDescription("test stream"); - return sd; - } - - private PolicyDefinition createPolicy(String streamName, String policyName) { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - //expression, something like "PT5S,dynamic,1,host" - def.setValue("test"); - def.setType("siddhi"); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList("inputStream")); - pd.setOutputStreams(Arrays.asList("outputStream")); - pd.setName(policyName); - pd.setDescription(String.format("Test policy for stream %s", streamName)); - - StreamPartition sp = new StreamPartition(); - sp.setStreamId(streamName); - sp.setColumns(Arrays.asList("host")); - sp.setType(StreamPartition.Type.GROUPBY); - pd.addPartition(sp); - return pd; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java index d1c8457..5bf0410 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java @@ -28,120 +28,104 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.impl.EventUniq; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; public class DedupCacheTest { - - private DedupEventsStore store; - - @Before - public void setUp() { - store = Mockito.mock(DedupEventsStore.class); - DedupEventsStoreFactory.customizeStore(store); - } - - @After - public void tearDown() { - Mockito.reset(store); - } - - @Test - public void testNormal() throws Exception { - Config config = ConfigFactory.load(); - DedupCache dedupCache = new DedupCache(config, "testPublishment"); - - StreamDefinition stream = createStream(); - PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy"); - - String[] states = new String[] { "OPEN", "WARN", "CLOSE" }; - Random random = new Random(); - for (int i = 0; i < 20; i ++) { - AlertStreamEvent event = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", states[random.nextInt(3)], 0, 0 - }); - HashMap<String, String> dedupFieldValues = new HashMap<String, String>(); - dedupFieldValues.put("alertKey", (String) event.getData()[event.getSchema().getColumnIndex("alertKey")]); - List<AlertStreamEvent> result = dedupCache.dedup(event, - new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues), - "state", - (String) event.getData()[event.getSchema().getColumnIndex("state")], "closed"); - System.out.println((i + 1) + " >>>> " + ToStringBuilder.reflectionToString(result)); - } - - Assert.assertTrue(true); - } - - private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setPolicyId(policy.getName()); - event.setSchema(stream); - event.setStreamId(stream.getStreamId()); - event.setTimestamp(System.currentTimeMillis()); - event.setCreatedTime(System.currentTimeMillis()); - event.setData(data); - return event; - } - - private StreamDefinition createStream() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn alertKeyColumn = new StreamColumn(); - alertKeyColumn.setName("alertKey"); - alertKeyColumn.setType(StreamColumn.Type.STRING); - - StreamColumn stateColumn = new StreamColumn(); - stateColumn.setName("state"); - stateColumn.setType(StreamColumn.Type.STRING); - - // dedupCount, dedupFirstOccurrence - - StreamColumn dedupCountColumn = new StreamColumn(); - dedupCountColumn.setName("dedupCount"); - dedupCountColumn.setType(StreamColumn.Type.LONG); - - StreamColumn dedupFirstOccurrenceColumn = new StreamColumn(); - dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE); - dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn)); - sd.setDataSource("testDatasource"); - sd.setStreamId("testStream"); - sd.setDescription("test stream"); - return sd; - } - - private PolicyDefinition createPolicy(String streamName, String policyName) { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - //expression, something like "PT5S,dynamic,1,host" - def.setValue("test"); - def.setType("siddhi"); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList("inputStream")); - pd.setOutputStreams(Arrays.asList("outputStream")); - pd.setName(policyName); - pd.setDescription(String.format("Test policy for stream %s", streamName)); - - StreamPartition sp = new StreamPartition(); - sp.setStreamId(streamName); - sp.setColumns(Arrays.asList("host")); - sp.setType(StreamPartition.Type.GROUPBY); - pd.addPartition(sp); - return pd; - } - + + @Test + public void testNormal() throws Exception { + Config config = ConfigFactory.load(); + DedupCache dedupCache = new DedupCache(config, "testPublishment"); + + StreamDefinition stream = createStream(); + PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy"); + + String[] states = new String[] {"OPEN", "WARN", "CLOSE"}; + Random random = new Random(); + for (int i = 0; i < 20; i++) { + AlertStreamEvent event = createEvent(stream, policy, new Object[] { + System.currentTimeMillis(), "host1", "testPolicy-host1-01", states[random.nextInt(3)], 0, 0 + }); + HashMap<String, String> dedupFieldValues = new HashMap<String, String>(); + dedupFieldValues.put("alertKey", (String) event.getData()[event.getSchema().getColumnIndex("alertKey")]); + List<AlertStreamEvent> result = dedupCache.dedup(event, + new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues), + "state", + (String) event.getData()[event.getSchema().getColumnIndex("state")], "closed"); + System.out.println((i + 1) + " >>>> " + ToStringBuilder.reflectionToString(result)); + } + + Assert.assertTrue(true); + } + + private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) { + AlertStreamEvent event = new AlertStreamEvent(); + event.setPolicyId(policy.getName()); + event.setSchema(stream); + event.setStreamId(stream.getStreamId()); + event.setTimestamp(System.currentTimeMillis()); + event.setCreatedTime(System.currentTimeMillis()); + event.setData(data); + return event; + } + + private StreamDefinition createStream() { + StreamDefinition sd = new StreamDefinition(); + StreamColumn tsColumn = new StreamColumn(); + tsColumn.setName("timestamp"); + tsColumn.setType(StreamColumn.Type.LONG); + + StreamColumn hostColumn = new StreamColumn(); + hostColumn.setName("host"); + hostColumn.setType(StreamColumn.Type.STRING); + + StreamColumn alertKeyColumn = new StreamColumn(); + alertKeyColumn.setName("alertKey"); + alertKeyColumn.setType(StreamColumn.Type.STRING); + + StreamColumn stateColumn = new StreamColumn(); + stateColumn.setName("state"); + stateColumn.setType(StreamColumn.Type.STRING); + + // dedupCount, dedupFirstOccurrence + + StreamColumn dedupCountColumn = new StreamColumn(); + dedupCountColumn.setName("dedupCount"); + dedupCountColumn.setType(StreamColumn.Type.LONG); + + StreamColumn dedupFirstOccurrenceColumn = new StreamColumn(); + dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE); + dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG); + + sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn)); + sd.setDataSource("testDatasource"); + sd.setStreamId("testStream"); + sd.setDescription("test stream"); + return sd; + } + + private PolicyDefinition createPolicy(String streamName, String policyName) { + PolicyDefinition pd = new PolicyDefinition(); + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + //expression, something like "PT5S,dynamic,1,host" + def.setValue("test"); + def.setType("siddhi"); + pd.setDefinition(def); + pd.setInputStreams(Arrays.asList("inputStream")); + pd.setOutputStreams(Arrays.asList("outputStream")); + pd.setName(policyName); + pd.setDescription(String.format("Test policy for stream %s", streamName)); + + StreamPartition sp = new StreamPartition(); + sp.setStreamId(streamName); + sp.setColumns(Arrays.asList("host")); + sp.setType(StreamPartition.Type.GROUPBY); + pd.addPartition(sp); + return pd; + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java index 72aef16..e6cbe6c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java @@ -32,195 +32,212 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -public class DefaultDeduplicatorTest extends MongoDependencyBaseTest { - - @Test - public void testNormal() throws Exception { - //String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue - // assume state: OPEN, WARN, CLOSE - System.setProperty("config.resource", "/application-mongo-statestore.conf"); - Config config = ConfigFactory.load(); - DedupCache dedupCache = new DedupCache(config, "testPublishment"); - DefaultDeduplicator deduplicator = new DefaultDeduplicator( - "PT1M", Arrays.asList(new String[] { "alertKey" }), "state", "close", dedupCache); - - StreamDefinition stream = createStream(); - PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy"); - - AlertStreamEvent e1 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - AlertStreamEvent e2 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0 - }); - AlertStreamEvent e3 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - AlertStreamEvent e4 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0 - }); - AlertStreamEvent e5 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "CLOSE", 0, 0 - }); - AlertStreamEvent e6 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - AlertStreamEvent e7 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - AlertStreamEvent e8 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - - List<AlertStreamEvent> allResults = new ArrayList<AlertStreamEvent>(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e1); - if (result != null) allResults.addAll(result); - System.out.println("1 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e2); - if (result != null) allResults.addAll(result); - System.out.println("2 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e3); - if (result != null) allResults.addAll(result); - System.out.println("3 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e4); - if (result != null) allResults.addAll(result); - System.out.println("4 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(500); - } catch (InterruptedException e) {} - - List<AlertStreamEvent> result = deduplicator.dedup(e5); - if (result != null) allResults.addAll(result); - System.out.println("5 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e6); - if (result != null) allResults.addAll(result); - System.out.println("6 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e7); - if (result != null) allResults.addAll(result); - System.out.println("7 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e8); - if (result != null) allResults.addAll(result); - System.out.println("8 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - - Thread.sleep(2000); - - long maxCount = 0; - for (AlertStreamEvent event : allResults) { - Assert.assertNotNull(event.getData()[4]); - Assert.assertNotNull(event.getData()[5]); - - if (((Long) event.getData()[4]) > maxCount) { - maxCount = (Long) event.getData()[4]; - System.out.println(String.format(">>>>>%s: %s", event, maxCount)); - } - } - - } - - private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setPolicyId(policy.getName()); - event.setSchema(stream); - event.setStreamId(stream.getStreamId()); - event.setTimestamp(System.currentTimeMillis()); - event.setCreatedTime(System.currentTimeMillis()); - event.setData(data); - return event; - } - - private StreamDefinition createStream() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn alertKeyColumn = new StreamColumn(); - alertKeyColumn.setName("alertKey"); - alertKeyColumn.setType(StreamColumn.Type.STRING); - - StreamColumn stateColumn = new StreamColumn(); - stateColumn.setName("state"); - stateColumn.setType(StreamColumn.Type.STRING); - - // dedupCount, dedupFirstOccurrence - - StreamColumn dedupCountColumn = new StreamColumn(); - dedupCountColumn.setName("dedupCount"); - dedupCountColumn.setType(StreamColumn.Type.LONG); - - StreamColumn dedupFirstOccurrenceColumn = new StreamColumn(); - dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE); - dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn)); - sd.setDataSource("testDatasource"); - sd.setStreamId("testStream"); - sd.setDescription("test stream"); - return sd; - } - - private PolicyDefinition createPolicy(String streamName, String policyName) { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - //expression, something like "PT5S,dynamic,1,host" - def.setValue("test"); - def.setType("siddhi"); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList("inputStream")); - pd.setOutputStreams(Arrays.asList("outputStream")); - pd.setName(policyName); - pd.setDescription(String.format("Test policy for stream %s", streamName)); - - StreamPartition sp = new StreamPartition(); - sp.setStreamId(streamName); - sp.setColumns(Arrays.asList("host")); - sp.setType(StreamPartition.Type.GROUPBY); - pd.addPartition(sp); - return pd; - } - +public class DefaultDeduplicatorTest { + + @Test + public void testNormal() throws Exception { + //String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue + // assume state: OPEN, WARN, CLOSE + System.setProperty("config.resource", "/application-mongo-statestore.conf"); + Config config = ConfigFactory.load(); + DedupCache dedupCache = new DedupCache(config, "testPublishment"); + DefaultDeduplicator deduplicator = new DefaultDeduplicator( + "PT1M", Arrays.asList(new String[] {"alertKey"}), "state", "close", dedupCache); + + StreamDefinition stream = createStream(); + PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy"); + + AlertStreamEvent e1 = createEvent(stream, policy, new Object[] { + System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 + }); + AlertStreamEvent e2 = createEvent(stream, policy, new Object[] { + System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0 + }); + AlertStreamEvent e3 = createEvent(stream, policy, new Object[] { + System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 + }); + AlertStreamEvent e4 = createEvent(stream, policy, new Object[] { + System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0 + }); + AlertStreamEvent e5 = createEvent(stream, policy, new Object[] { + System.currentTimeMillis(), "host1", "testPolicy-host1-01", "CLOSE", 0, 0 + }); + AlertStreamEvent e6 = createEvent(stream, policy, new Object[] { + System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 + }); + AlertStreamEvent e7 = createEvent(stream, policy, new Object[] { + System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 + }); + AlertStreamEvent e8 = createEvent(stream, policy, new Object[] { + System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 + }); + + List<AlertStreamEvent> allResults = new ArrayList<AlertStreamEvent>(); + new Thread(new Runnable() { + @Override + public void run() { + List<AlertStreamEvent> result = deduplicator.dedup(e1); + if (result != null) { + allResults.addAll(result); + } + System.out.println("1 >>>> " + ToStringBuilder.reflectionToString(result)); + } + }).start(); + new Thread(new Runnable() { + @Override + public void run() { + List<AlertStreamEvent> result = deduplicator.dedup(e2); + if (result != null) { + allResults.addAll(result); + } + System.out.println("2 >>>> " + ToStringBuilder.reflectionToString(result)); + } + }).start(); + new Thread(new Runnable() { + @Override + public void run() { + List<AlertStreamEvent> result = deduplicator.dedup(e3); + if (result != null) { + allResults.addAll(result); + } + System.out.println("3 >>>> " + ToStringBuilder.reflectionToString(result)); + } + }).start(); + new Thread(new Runnable() { + @Override + public void run() { + List<AlertStreamEvent> result = deduplicator.dedup(e4); + if (result != null) { + allResults.addAll(result); + } + System.out.println("4 >>>> " + ToStringBuilder.reflectionToString(result)); + } + }).start(); + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + + List<AlertStreamEvent> result = deduplicator.dedup(e5); + if (result != null) { + allResults.addAll(result); + } + System.out.println("5 >>>> " + ToStringBuilder.reflectionToString(result)); + } + }).start(); + new Thread(new Runnable() { + @Override + public void run() { + List<AlertStreamEvent> result = deduplicator.dedup(e6); + if (result != null) { + allResults.addAll(result); + } + System.out.println("6 >>>> " + ToStringBuilder.reflectionToString(result)); + } + }).start(); + new Thread(new Runnable() { + @Override + public void run() { + List<AlertStreamEvent> result = deduplicator.dedup(e7); + if (result != null) { + allResults.addAll(result); + } + System.out.println("7 >>>> " + ToStringBuilder.reflectionToString(result)); + } + }).start(); + new Thread(new Runnable() { + @Override + public void run() { + List<AlertStreamEvent> result = deduplicator.dedup(e8); + if (result != null) { + allResults.addAll(result); + } + System.out.println("8 >>>> " + ToStringBuilder.reflectionToString(result)); + } + }).start(); + + Thread.sleep(2000); + + long maxCount = 0; + for (AlertStreamEvent event : allResults) { + Assert.assertNotNull(event.getData()[4]); + Assert.assertNotNull(event.getData()[5]); + + if (((Long) event.getData()[4]) > maxCount) { + maxCount = (Long) event.getData()[4]; + System.out.println(String.format(">>>>>%s: %s", event, maxCount)); + } + } + + } + + private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) { + AlertStreamEvent event = new AlertStreamEvent(); + event.setPolicyId(policy.getName()); + event.setSchema(stream); + event.setStreamId(stream.getStreamId()); + event.setTimestamp(System.currentTimeMillis()); + event.setCreatedTime(System.currentTimeMillis()); + event.setData(data); + return event; + } + + private StreamDefinition createStream() { + StreamDefinition sd = new StreamDefinition(); + StreamColumn tsColumn = new StreamColumn(); + tsColumn.setName("timestamp"); + tsColumn.setType(StreamColumn.Type.LONG); + + StreamColumn hostColumn = new StreamColumn(); + hostColumn.setName("host"); + hostColumn.setType(StreamColumn.Type.STRING); + + StreamColumn alertKeyColumn = new StreamColumn(); + alertKeyColumn.setName("alertKey"); + alertKeyColumn.setType(StreamColumn.Type.STRING); + + StreamColumn stateColumn = new StreamColumn(); + stateColumn.setName("state"); + stateColumn.setType(StreamColumn.Type.STRING); + + // dedupCount, dedupFirstOccurrence + + StreamColumn dedupCountColumn = new StreamColumn(); + dedupCountColumn.setName("dedupCount"); + dedupCountColumn.setType(StreamColumn.Type.LONG); + + StreamColumn dedupFirstOccurrenceColumn = new StreamColumn(); + dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE); + dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG); + + sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn)); + sd.setDataSource("testDatasource"); + sd.setStreamId("testStream"); + sd.setDescription("test stream"); + return sd; + } + + private PolicyDefinition createPolicy(String streamName, String policyName) { + PolicyDefinition pd = new PolicyDefinition(); + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + //expression, something like "PT5S,dynamic,1,host" + def.setValue("test"); + def.setType("siddhi"); + pd.setDefinition(def); + pd.setInputStreams(Arrays.asList("inputStream")); + pd.setOutputStreams(Arrays.asList("outputStream")); + pd.setName(policyName); + pd.setDescription(String.format("Test policy for stream %s", streamName)); + + StreamPartition sp = new StreamPartition(); + sp.setStreamId(streamName); + sp.setColumns(Arrays.asList("host")); + sp.setType(StreamPartition.Type.GROUPBY); + pd.addPartition(sp); + return pd; + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java index 52d4460..a788646 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java @@ -27,11 +27,8 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory; import org.apache.eagle.alert.engine.router.TestAlertPublisherBolt; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; @@ -40,22 +37,9 @@ import com.fasterxml.jackson.databind.type.SimpleType; public class ExtendedDeduplicatorTest { - private DedupEventsStore store; - - @Before - public void setUp() { - store = Mockito.mock(DedupEventsStore.class); - DedupEventsStoreFactory.customizeStore(store); - } - - @After - public void tearDown() { - Mockito.reset(store); - } - - @Test - public void testNormal() throws Exception { - List<Publishment> pubs = loadEntities("/router/publishments-extended-deduplicator.json", Publishment.class); + @Test + public void testNormal() throws Exception { + List<Publishment> pubs = loadEntities("/router/publishments-extended-deduplicator.json", Publishment.class); AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null); AlertStreamEvent event1 = createWithStreamDef("extended_dedup_host1", "extended_dedup_testapp1", "OPEN"); @@ -65,11 +49,10 @@ public class ExtendedDeduplicatorTest { Assert.assertNotNull(plugin.dedup(event1)); Assert.assertNull(plugin.dedup(event2)); Assert.assertNotNull(plugin.dedup(event3)); - - Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject()); - } - - private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception { + + } + + private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception { ObjectMapper objectMapper = new ObjectMapper(); JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz)); List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type); @@ -95,7 +78,7 @@ public class ExtendedDeduplicatorTest { StreamColumn hostColumn = new StreamColumn(); hostColumn.setName("hostname"); hostColumn.setType(StreamColumn.Type.STRING); - + StreamColumn stateColumn = new StreamColumn(); stateColumn.setName("state"); stateColumn.setType(StreamColumn.Type.STRING); @@ -105,5 +88,5 @@ public class ExtendedDeduplicatorTest { alert.setSchema(sd); return alert; } - + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java deleted file mode 100644 index ed44267..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentLinkedDeque; - -public class MongoDedupStoreTest extends MongoDependencyBaseTest { - - @Test - public void testNormal() throws Exception { - Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = store.getEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(0, events.size()); - - String streamId = "testStream"; - String policyId = "testPolicy"; - long timestamp = System.currentTimeMillis(); - HashMap<String, String> customFieldValues = new HashMap<String, String>(); - customFieldValues.put("alertKey", "test-alert-key"); - EventUniq eventEniq = new EventUniq(streamId, policyId, timestamp, customFieldValues); - - ConcurrentLinkedDeque<DedupValue> dedupStateValues = new ConcurrentLinkedDeque<DedupValue>(); - DedupValue one = new DedupValue(); - one.setStateFieldValue("OPEN"); - one.setCount(2); - one.setCloseTime(0); - one.setDocId("doc-id-..."); - one.setFirstOccurrence(System.currentTimeMillis()); - dedupStateValues.add(one); - store.add(eventEniq, dedupStateValues); - - events = store.getEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(1, events.size()); - - Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry = events.entrySet().iterator().next(); - Assert.assertEquals(streamId, entry.getKey().streamId); - Assert.assertEquals(1, entry.getValue().size()); - Assert.assertEquals(2, entry.getValue().getLast().getCount()); - - store.remove(events.keySet().iterator().next()); - events = store.getEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(0, events.size()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java deleted file mode 100644 index b7d7613..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.mongodb.client.MongoDatabase; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -public abstract class MongoDependencyBaseTest { - - private static Logger LOG = LoggerFactory.getLogger(MongoDependencyBaseTest.class); - - private static SimpleEmbedMongo mongo; - @SuppressWarnings("unused") - private static MongoDatabase testDB; - private static Config config; - - protected static MongoDedupEventsStore store; - - public static void before() { - try { - mongo = new SimpleEmbedMongo(); - mongo.start(); - testDB = mongo.getMongoClient().getDatabase("testDb"); - } catch (Exception e) { - LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e); - } - } - - @BeforeClass - public static void setup() throws Exception { - before(); - - System.setProperty("config.resource", "/application-mongo-statestore.conf"); - ConfigFactory.invalidateCaches(); - config = ConfigFactory.load(); - - store = new MongoDedupEventsStore(config, "testPublishment"); - } - - @AfterClass - public static void teardown() { - if (mongo != null) { - mongo.shutdown(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c15e7f81/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java index 79c03bc..5cdb6f1 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java @@ -18,12 +18,11 @@ package org.apache.eagle.alert.engine.router; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.CollectionType; -import com.fasterxml.jackson.databind.type.SimpleType; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.eagle.alert.coordination.model.PublishSpec; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.Publishment; @@ -32,38 +31,26 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; import org.apache.eagle.alert.engine.publisher.AlertPublisher; -import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStore; -import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory; import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory; import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; import org.apache.eagle.alert.engine.runner.AlertPublisherBolt; import org.apache.eagle.alert.engine.runner.MapComparator; import org.apache.eagle.alert.engine.utils.MetadataSerDeser; -import org.junit.*; -import org.mockito.Mockito; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.CollectionType; +import com.fasterxml.jackson.databind.type.SimpleType; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; /** * @Since 5/14/16. */ public class TestAlertPublisherBolt { - - private DedupEventsStore store; - - @Before - public void setUp() { - store = Mockito.mock(DedupEventsStore.class); - DedupEventsStoreFactory.customizeStore(store); - } - - @After - public void tearDown() { - Mockito.reset(store); - } @SuppressWarnings("rawtypes") @Ignore @@ -86,7 +73,7 @@ public class TestAlertPublisherBolt { policy.setName("policy1"); alert.setPolicyId(policy.getName()); alert.setCreatedTime(System.currentTimeMillis()); - alert.setData(new Object[]{"field_1", 2, "field_3"}); + alert.setData(new Object[] {"field_1", 2, "field_3"}); alert.setStreamId(streamId); alert.setCreatedBy(this.toString()); return alert; @@ -198,7 +185,7 @@ public class TestAlertPublisherBolt { StreamColumn hostColumn = new StreamColumn(); hostColumn.setName("hostname"); hostColumn.setType(StreamColumn.Type.STRING); - + StreamColumn stateColumn = new StreamColumn(); stateColumn.setName("state"); stateColumn.setType(StreamColumn.Type.STRING); @@ -209,7 +196,7 @@ public class TestAlertPublisherBolt { return alert; } - @Test + @Test public void testCustomFieldDedupEvent() throws Exception { List<Publishment> pubs = loadEntities("/router/publishments.json", Publishment.class); @@ -221,12 +208,9 @@ public class TestAlertPublisherBolt { Assert.assertNotNull(plugin.dedup(event1)); Assert.assertNull(plugin.dedup(event2)); Assert.assertNotNull(plugin.dedup(event3)); - - Mockito.verify(store).getEvents(); - Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject()); } - @Test + @Test public void testEmptyCustomFieldDedupEvent() throws Exception { List<Publishment> pubs = loadEntities("/router/publishments-empty-dedup-field.json", Publishment.class); @@ -236,8 +220,6 @@ public class TestAlertPublisherBolt { Assert.assertNotNull(plugin.dedup(event1)); Assert.assertNull(plugin.dedup(event2)); - - Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject()); } private AlertStreamEvent createSeverityWithStreamDef(String hostname, String appName, String message, String severity, String docId, String df_device, String df_type, String colo) {