http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java index 4e8d381..8f00b31 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java @@ -33,65 +33,66 @@ public class AbsenceWindowProcessor { private boolean expired; // to mark if the time range has been went through private OccurStatus status = OccurStatus.not_sure; - public enum OccurStatus{ + public enum OccurStatus { not_sure, occured, absent } - public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow window){ + public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow window) { this.expectAttrs = expectAttrs; this.window = window; expired = false; } /** - * return true if it is certain that expected attributes don't occur during startTime and endTime, else return false - * @param appearAttrs - * @param occurTime - * @return + * return true if it is certain that expected attributes don't occur during startTime and endTime, else return false. */ - public void process(List<Object> appearAttrs, long occurTime){ - if(expired) + public void process(List<Object> appearAttrs, long occurTime) { + if (expired) { throw new IllegalStateException("Expired window can't recieve events"); - switch(status) { + } + switch (status) { case not_sure: - if(occurTime < window.startTime) { + if (occurTime < window.startTime) { break; - }else if(occurTime >= window.startTime && - occurTime <= window.endTime) { - if(expectAttrs.equals(appearAttrs)) { + } else if (occurTime >= window.startTime + && occurTime <= window.endTime) { + if (expectAttrs.equals(appearAttrs)) { status = OccurStatus.occured; } break; - }else{ + } else { status = OccurStatus.absent; break; } case occured: - if(occurTime > window.endTime) + if (occurTime > window.endTime) { expired = true; + } break; default: break; } // reset status - if(status == OccurStatus.absent){ + if (status == OccurStatus.absent) { expired = true; } } - public OccurStatus checkStatus(){ + public OccurStatus checkStatus() { return status; } - public boolean checkExpired(){ + + public boolean checkExpired() { return expired; } - public AbsenceWindow currWindow(){ + + public AbsenceWindow currWindow() { return window; } - public String toString(){ + public String toString() { return "expectAttrs=" + expectAttrs + ", status=" + status + ", expired=" + expired + ", window=[" + window + "]"; } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java index ca1f622..33d502e 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java @@ -16,37 +16,36 @@ */ package org.apache.eagle.alert.engine.evaluator.impl; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.eagle.alert.engine.AlertStreamCollector; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import backtype.storm.task.OutputCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.task.OutputCollector; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; /** - * <h2>Thread Safe Mechanism</h2> + * <h2>Thread Safe Mechanism.</h2> * <ul> * <li> - * emit() method is thread-safe enough to be called anywhere asynchronously in multi-thread + * emit() method is thread-safe enough to be called anywhere asynchronously in multi-thread * </li> * <li> - * flush() method must be called synchronously, because Storm OutputCollector is not thread-safe + * flush() method must be called synchronously, because Storm OutputCollector is not thread-safe * </li> * </ul> */ public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCollector { private final OutputCollector delegate; private final LinkedBlockingQueue<AlertStreamEvent> queue; - private final static Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class); + private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class); private final AtomicLong lastFlushTime = new AtomicLong(System.currentTimeMillis()); private final AutoAlertFlusher flusher; - private final static int MAX_ALERT_DELAY_SECS = 10; + private static final int MAX_ALERT_DELAY_SECS = 10; public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector outputCollector) { this.delegate = outputCollector; @@ -59,7 +58,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCol private static class AutoAlertFlusher extends Thread { private final AlertBoltOutputCollectorThreadSafeWrapper collector; private boolean stopped = false; - private final static Logger LOG = LoggerFactory.getLogger(AutoAlertFlusher.class); + private static final Logger LOG = LoggerFactory.getLogger(AutoAlertFlusher.class); private AutoAlertFlusher(AlertBoltOutputCollectorThreadSafeWrapper collector) { this.collector = collector; @@ -75,6 +74,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCol try { Thread.sleep(5000); } catch (InterruptedException ignored) { + // ignored } } LOG.info("Stopped"); @@ -87,7 +87,8 @@ public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCol } /** - * Emit method can be called in multi-thread + * Emit method can be called in multi-thread. + * * @param event */ @Override @@ -95,7 +96,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCol try { queue.put(event); } catch (InterruptedException e) { - LOG.error(e.getMessage(),e); + LOG.error(e.getMessage(), e); } } @@ -104,7 +105,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCol */ @Override public void flush() { - if(!queue.isEmpty()) { + if (!queue.isEmpty()) { List<AlertStreamEvent> events = new ArrayList<>(); queue.drainTo(events); events.forEach((event) -> delegate.emit(Arrays.asList(event.getStreamId(), event))); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java index 8d8a4d2..042fca7 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java @@ -16,20 +16,19 @@ */ package org.apache.eagle.alert.engine.evaluator.impl; -import java.util.Arrays; - import org.apache.eagle.alert.engine.AlertStreamCollector; import org.apache.eagle.alert.engine.StreamContext; import org.apache.eagle.alert.engine.model.AlertStreamEvent; - import backtype.storm.task.OutputCollector; +import java.util.Arrays; + public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { private final OutputCollector delegate; private final Object outputLock; private final StreamContext streamContext; - public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, Object outputLock, StreamContext streamContext){ + public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, Object outputLock, StreamContext streamContext) { this.delegate = outputCollector; this.outputLock = outputLock; this.streamContext = streamContext; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java index b9a109c..ee1853c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java @@ -52,14 +52,12 @@ public class AlertStreamCallback extends StreamCallback { } /** - * Possibly more than one event will be triggered for alerting - * - * @param events + * Possibly more than one event will be triggered for alerting. */ @Override public void receive(Event[] events) { String policyName = context.getPolicyDefinition().getName(); - CompositePolicyHandler handler = ((PolicyGroupEvaluatorImpl)context.getPolicyEvaluator()).getPolicyHandler(policyName); + CompositePolicyHandler handler = ((PolicyGroupEvaluatorImpl) context.getPolicyEvaluator()).getPolicyHandler(policyName); if (LOG.isDebugEnabled()) { LOG.debug("Generated {} alerts from policy '{}' in {}, index of definiton {} ", events.length, policyName, context.getPolicyEvaluatorId(), currentIndex); } @@ -75,18 +73,21 @@ public class AlertStreamCallback extends StreamCallback { event.setCreatedTime(System.currentTimeMillis()); event.setSchema(definition); - if (LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug("Generate new alert event: {}", event); + } try { if (handler == null) { // extreme case: the handler is removed from the evaluator. Just emit. - if (LOG.isDebugEnabled()) LOG.debug(" handler not found when callback received event, directly emit. policy removed? "); + if (LOG.isDebugEnabled()) { + LOG.debug(" handler not found when callback received event, directly emit. policy removed? "); + } collector.emit(event); } else { handler.send(event, currentIndex + 1); } } catch (Exception ex) { - LOG.error(String.format("send event %s to index %d failed with exception. ",event, currentIndex), ex); + LOG.error(String.format("send event %s to index %d failed with exception. ", event, currentIndex), ex); } } context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count")).incrBy(events.length); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java index 26ae19e..eed4b3b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java @@ -35,17 +35,17 @@ import java.util.Map; public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { private static final long serialVersionUID = -5499413193675985288L; - private final static Logger LOG = LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class); private AlertStreamCollector collector; // mapping from policy name to PolicyDefinition - private volatile Map<String,PolicyDefinition> policyDefinitionMap = new HashMap<>(); + private volatile Map<String, PolicyDefinition> policyDefinitionMap = new HashMap<>(); // mapping from policy name to PolicyStreamHandler - private volatile Map<String,CompositePolicyHandler> policyStreamHandlerMap = new HashMap<>(); + private volatile Map<String, CompositePolicyHandler> policyStreamHandlerMap = new HashMap<>(); private String policyEvaluatorId; private StreamContext context; - public PolicyGroupEvaluatorImpl(String policyEvaluatorId){ + public PolicyGroupEvaluatorImpl(String policyEvaluatorId) { this.policyEvaluatorId = policyEvaluatorId; } @@ -67,34 +67,35 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { } public void close() { - for(PolicyStreamHandler handler: policyStreamHandlerMap.values()){ + for (PolicyStreamHandler handler : policyStreamHandlerMap.values()) { try { handler.close(); } catch (Exception e) { - LOG.error("Failed to close handler {}",handler.toString(),e); + LOG.error("Failed to close handler {}", handler.toString(), e); } } } /** - * fixme make selection of PolicyStreamHandler to be more efficient + * fixme make selection of PolicyStreamHandler to be more efficient. + * * @param partitionedEvent PartitionedEvent */ - private void dispatch(PartitionedEvent partitionedEvent){ + private void dispatch(PartitionedEvent partitionedEvent) { boolean handled = false; - for(Map.Entry<String,CompositePolicyHandler> policyStreamHandler: policyStreamHandlerMap.entrySet()){ - if(isAcceptedByPolicy(partitionedEvent,policyDefinitionMap.get(policyStreamHandler.getKey()))){ + for (Map.Entry<String, CompositePolicyHandler> policyStreamHandler : policyStreamHandlerMap.entrySet()) { + if (isAcceptedByPolicy(partitionedEvent, policyDefinitionMap.get(policyStreamHandler.getKey()))) { try { handled = true; this.context.counter().scope("eval_count").incr(); policyStreamHandler.getValue().send(partitionedEvent.getEvent()); } catch (Exception e) { this.context.counter().scope("fail_count").incr(); - LOG.error("{} failed to handle {}",policyStreamHandler.getValue(), partitionedEvent.getEvent(), e); + LOG.error("{} failed to handle {}", policyStreamHandler.getValue(), partitionedEvent.getEvent(), e); } } } - if(!handled){ + if (!handled) { this.context.counter().scope("drop_count").incr(); LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent); } else { @@ -103,22 +104,22 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { } private static boolean isAcceptedByPolicy(PartitionedEvent event, PolicyDefinition policy) { - return policy.getPartitionSpec().contains(event.getPartition()) && - ( policy.getInputStreams().contains(event.getEvent().getStreamId()) || - policy.getDefinition().getInputStreams().contains(event.getEvent().getStreamId()) ); + return policy.getPartitionSpec().contains(event.getPartition()) + && (policy.getInputStreams().contains(event.getEvent().getStreamId()) + || policy.getDefinition().getInputStreams().contains(event.getEvent().getStreamId())); } @Override public void onPolicyChange(List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) { - Map<String,PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap); - Map<String,CompositePolicyHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap); - for(PolicyDefinition pd : added){ + Map<String, PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap); + Map<String, CompositePolicyHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap); + for (PolicyDefinition pd : added) { inplaceAdd(copyPolicies, copyHandlers, pd, sds); } - for(PolicyDefinition pd : removed){ + for (PolicyDefinition pd : removed) { inplaceRemove(copyPolicies, copyHandlers, pd); } - for(PolicyDefinition pd : modified){ + for (PolicyDefinition pd : modified) { inplaceRemove(copyPolicies, copyHandlers, pd); inplaceAdd(copyPolicies, copyHandlers, pd, sds); } @@ -132,9 +133,9 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { } private void inplaceAdd(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy, Map<String, StreamDefinition> sds) { - if(handlers.containsKey(policy.getName())){ + if (handlers.containsKey(policy.getName())) { LOG.error("metadata calculation error, try to add existing PolicyDefinition " + policy); - }else { + } else { policies.put(policy.getName(), policy); CompositePolicyHandler handler = new CompositePolicyHandler(sds); try { @@ -154,20 +155,20 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { } } - private void inplaceRemove(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy) { - if(handlers.containsKey(policy.getName())) { + private void inplaceRemove(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy) { + if (handlers.containsKey(policy.getName())) { PolicyStreamHandler handler = handlers.get(policy.getName()); try { handler.close(); } catch (Exception e) { - LOG.error("Failed to close handler {}",handler,e); - }finally { + LOG.error("Failed to close handler {}", handler, e); + } finally { policies.remove(policy.getName()); handlers.remove(policy.getName()); - LOG.info("Removed policy: {}",policy); + LOG.info("Removed policy: {}", policy); } } else { - LOG.error("metadata calculation error, try to remove nonexisting PolicyDefinition: "+policy); + LOG.error("metadata calculation error, try to remove nonexisting PolicyDefinition: " + policy); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java index 5b83abb..9b9fcac 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java @@ -16,119 +16,118 @@ */ package org.apache.eagle.alert.engine.evaluator.impl; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.wso2.siddhi.query.api.definition.AbstractDefinition; import org.wso2.siddhi.query.api.definition.Attribute; -import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class SiddhiDefinitionAdapter { - private final static Logger LOG = LoggerFactory.getLogger(SiddhiDefinitionAdapter.class); - public final static String DEFINE_STREAM_TEMPLATE = "define stream %s ( %s );"; + private static final Logger LOG = LoggerFactory.getLogger(SiddhiDefinitionAdapter.class); + public static final String DEFINE_STREAM_TEMPLATE = "define stream %s ( %s );"; - public static String buildStreamDefinition(StreamDefinition streamDefinition){ + public static String buildStreamDefinition(StreamDefinition streamDefinition) { List<String> columns = new ArrayList<>(); - Preconditions.checkNotNull(streamDefinition,"StreamDefinition is null"); - if(streamDefinition.getColumns()!=null) { + Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null"); + if (streamDefinition.getColumns() != null) { for (StreamColumn column : streamDefinition.getColumns()) { columns.add(String.format("%s %s", column.getName(), convertToSiddhiAttributeType(column.getType()).toString().toLowerCase())); } - }else{ - LOG.warn("No columns found for stream {}"+streamDefinition.getStreamId()); + } else { + LOG.warn("No columns found for stream {}" + streamDefinition.getStreamId()); } - return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getStreamId(),StringUtils.join(columns,",")); + return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getStreamId(), StringUtils.join(columns, ",")); } - public static Attribute.Type convertToSiddhiAttributeType(StreamColumn.Type type){ - if(_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)){ + public static Attribute.Type convertToSiddhiAttributeType(StreamColumn.Type type) { + if (_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)) { return _EAGLE_SIDDHI_TYPE_MAPPING.get(type); } - throw new IllegalArgumentException("Unknown stream type: "+type); + throw new IllegalArgumentException("Unknown stream type: " + type); } - public static Class<?> convertToJavaAttributeType(StreamColumn.Type type){ - if(_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)){ + public static Class<?> convertToJavaAttributeType(StreamColumn.Type type) { + if (_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)) { return _EAGLE_JAVA_TYPE_MAPPING.get(type); } - throw new IllegalArgumentException("Unknown stream type: "+type); + throw new IllegalArgumentException("Unknown stream type: " + type); } - public static StreamColumn.Type convertFromJavaAttributeType(Class<?> type){ - if(_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)){ + public static StreamColumn.Type convertFromJavaAttributeType(Class<?> type) { + if (_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)) { return _JAVA_EAGLE_TYPE_MAPPING.get(type); } - throw new IllegalArgumentException("Unknown stream type: "+type); + throw new IllegalArgumentException("Unknown stream type: " + type); } - public static StreamColumn.Type convertFromSiddhiAttributeType(Attribute.Type type){ - if(_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)){ + public static StreamColumn.Type convertFromSiddhiAttributeType(Attribute.Type type) { + if (_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)) { return _SIDDHI_EAGLE_TYPE_MAPPING.get(type); } - throw new IllegalArgumentException("Unknown siddhi type: "+type); + throw new IllegalArgumentException("Unknown siddhi type: " + type); } /** * public enum Type { - * STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT - * } + * STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT + * }. */ - private final static Map<StreamColumn.Type,Attribute.Type> _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>(); - private final static Map<StreamColumn.Type,Class<?>> _EAGLE_JAVA_TYPE_MAPPING = new HashMap<>(); - private final static Map<Class<?>,StreamColumn.Type> _JAVA_EAGLE_TYPE_MAPPING = new HashMap<>(); - private final static Map<Attribute.Type,StreamColumn.Type> _SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>(); + private static final Map<StreamColumn.Type, Attribute.Type> _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>(); + private static final Map<StreamColumn.Type, Class<?>> _EAGLE_JAVA_TYPE_MAPPING = new HashMap<>(); + private static final Map<Class<?>, StreamColumn.Type> _JAVA_EAGLE_TYPE_MAPPING = new HashMap<>(); + private static final Map<Attribute.Type, StreamColumn.Type> _SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>(); static { - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING,Attribute.Type.STRING); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT,Attribute.Type.INT); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG,Attribute.Type.LONG); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Attribute.Type.FLOAT); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Attribute.Type.DOUBLE); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Attribute.Type.BOOL); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Attribute.Type.OBJECT); - - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.STRING,String.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.INT,Integer.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.LONG,Long.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Float.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Double.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Boolean.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Object.class); - - _JAVA_EAGLE_TYPE_MAPPING.put(String.class,StreamColumn.Type.STRING); - _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class,StreamColumn.Type.INT); - _JAVA_EAGLE_TYPE_MAPPING.put(Long.class,StreamColumn.Type.LONG); - _JAVA_EAGLE_TYPE_MAPPING.put(Float.class,StreamColumn.Type.FLOAT); - _JAVA_EAGLE_TYPE_MAPPING.put(Double.class,StreamColumn.Type.DOUBLE); - _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class,StreamColumn.Type.BOOL); - _JAVA_EAGLE_TYPE_MAPPING.put(Object.class,StreamColumn.Type.OBJECT); - - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING,StreamColumn.Type.STRING); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT,StreamColumn.Type.INT); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG,StreamColumn.Type.LONG); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT,StreamColumn.Type.FLOAT); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE,StreamColumn.Type.DOUBLE); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL,StreamColumn.Type.BOOL); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT,StreamColumn.Type.OBJECT); + _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING, Attribute.Type.STRING); + _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT, Attribute.Type.INT); + _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG, Attribute.Type.LONG); + _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, Attribute.Type.FLOAT); + _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, Attribute.Type.DOUBLE); + _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL, Attribute.Type.BOOL); + _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, Attribute.Type.OBJECT); + + _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.STRING, String.class); + _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.INT, Integer.class); + _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.LONG, Long.class); + _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, Float.class); + _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, Double.class); + _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.BOOL, Boolean.class); + _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, Object.class); + + _JAVA_EAGLE_TYPE_MAPPING.put(String.class, StreamColumn.Type.STRING); + _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class, StreamColumn.Type.INT); + _JAVA_EAGLE_TYPE_MAPPING.put(Long.class, StreamColumn.Type.LONG); + _JAVA_EAGLE_TYPE_MAPPING.put(Float.class, StreamColumn.Type.FLOAT); + _JAVA_EAGLE_TYPE_MAPPING.put(Double.class, StreamColumn.Type.DOUBLE); + _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class, StreamColumn.Type.BOOL); + _JAVA_EAGLE_TYPE_MAPPING.put(Object.class, StreamColumn.Type.OBJECT); + + _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING, StreamColumn.Type.STRING); + _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT, StreamColumn.Type.INT); + _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG, StreamColumn.Type.LONG); + _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT, StreamColumn.Type.FLOAT); + _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE, StreamColumn.Type.DOUBLE); + _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL, StreamColumn.Type.BOOL); + _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT, StreamColumn.Type.OBJECT); } - public static StreamDefinition convertFromSiddiDefinition(AbstractDefinition siddhiDefinition){ + public static StreamDefinition convertFromSiddiDefinition(AbstractDefinition siddhiDefinition) { StreamDefinition streamDefinition = new StreamDefinition(); streamDefinition.setStreamId(siddhiDefinition.getId()); List<StreamColumn> columns = new ArrayList<>(siddhiDefinition.getAttributeNameArray().length); - for(Attribute attribute:siddhiDefinition.getAttributeList()){ + for (Attribute attribute : siddhiDefinition.getAttributeList()) { StreamColumn column = new StreamColumn(); column.setType(convertFromSiddhiAttributeType(attribute.getType())); column.setName(attribute.getName()); @@ -136,7 +135,7 @@ public class SiddhiDefinitionAdapter { } streamDefinition.setColumns(columns); streamDefinition.setTimeseries(true); - streamDefinition.setDescription("Auto-generated stream schema from siddhi for "+siddhiDefinition.getId()); + streamDefinition.setDescription("Auto-generated stream schema from siddhi for " + siddhiDefinition.getId()); return streamDefinition; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java index 481e3af..e7ed56f 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java @@ -34,7 +34,7 @@ import java.util.List; import java.util.Map; public class SiddhiPolicyHandler implements PolicyStreamHandler { - private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class); private ExecutionPlanRuntime executionRuntime; private SiddhiManager siddhiManager; private Map<String, StreamDefinition> sds; @@ -43,7 +43,7 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler { private int currentIndex = 0; // the index of current definition statement inside the policy definition - public SiddhiPolicyHandler(Map<String, StreamDefinition> sds, int index){ + public SiddhiPolicyHandler(Map<String, StreamDefinition> sds, int index) { this.sds = sds; this.currentIndex = index; } @@ -59,44 +59,46 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler { coreDefinition.setOutputStreams(policyDefinition.getOutputStreams()); } - for(String inputStream : coreDefinition.getInputStreams()) { + for (String inputStream : coreDefinition.getInputStreams()) { builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream))); builder.append("\n"); } builder.append(coreDefinition.value); - if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi execution plan: {} from definition: {}", builder.toString(), coreDefinition); + if (LOG.isDebugEnabled()) { + LOG.debug("Generated siddhi execution plan: {} from definition: {}", builder.toString(), coreDefinition); + } return builder.toString(); } @Override public void prepare(final Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { - LOG.info("Initializing handler for policy {}",context.getPolicyDefinition()); + LOG.info("Initializing handler for policy {}", context.getPolicyDefinition()); this.policy = context.getPolicyDefinition(); this.siddhiManager = new SiddhiManager(); String plan = generateExecutionPlan(policy, sds); try { this.executionRuntime = siddhiManager.createExecutionPlanRuntime(plan); - LOG.info("Created siddhi runtime {}",executionRuntime.getName()); - }catch (Exception parserException){ - LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n",context.getPolicyDefinition().getName(),plan,parserException); + LOG.info("Created siddhi runtime {}", executionRuntime.getName()); + } catch (Exception parserException) { + LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n", context.getPolicyDefinition().getName(), plan, parserException); throw parserException; } // add output stream callback List<String> outputStreams = getOutputStreams(policy); - for(final String outputStream: outputStreams) { + for (final String outputStream : outputStreams) { if (executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) { this.executionRuntime.addCallback(outputStream, - new AlertStreamCallback( - outputStream, SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream)) - , collector, context, currentIndex)); + new AlertStreamCallback( + outputStream, SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream)), + collector, context, currentIndex)); } else { throw new IllegalStateException("Undefined output stream " + outputStream); } } this.executionRuntime.start(); this.context = context; - LOG.info("Initialized policy handler for policy: {}",policy.getName()); + LOG.info("Initialized policy handler for policy: {}", policy.getName()); } protected List<String> getOutputStreams(PolicyDefinition policy) { @@ -104,29 +106,29 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler { } public void send(StreamEvent event) throws Exception { - context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"receive_count")).incr(); + context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count")).incr(); String streamId = event.getStreamId(); InputHandler inputHandler = executionRuntime.getInputHandler(streamId); - if(inputHandler != null){ - context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"eval_count")).incr(); - inputHandler.send(event.getTimestamp(),event.getData()); - + if (inputHandler != null) { + context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count")).incr(); + inputHandler.send(event.getTimestamp(), event.getData()); + if (LOG.isDebugEnabled()) { LOG.debug("sent event to siddhi stream {} ", streamId); } - }else{ - context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"drop_count")).incr(); - LOG.warn("No input handler found for stream {}",streamId); + } else { + context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count")).incr(); + LOG.warn("No input handler found for stream {}", streamId); } } public void close() throws Exception { - LOG.info("Closing handler for policy {}",this.policy.getName()); + LOG.info("Closing handler for policy {}", this.policy.getName()); this.executionRuntime.shutdown(); - LOG.info("Shutdown siddhi runtime {}",this.executionRuntime.getName()); + LOG.info("Shutdown siddhi runtime {}", this.executionRuntime.getName()); this.siddhiManager.shutdown(); - LOG.info("Shutdown siddhi manager {}",this.siddhiManager); - LOG.info("Closed handler for policy {}",this.policy.getName()); + LOG.info("Shutdown siddhi manager {}", this.siddhiManager); + LOG.info("Closed handler for policy {}", this.policy.getName()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java index 43b8f30..11f484d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java @@ -30,7 +30,7 @@ import java.util.Map; */ public class SiddhiPolicyStateHandler extends SiddhiPolicyHandler { - private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyStateHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyStateHandler.class); public SiddhiPolicyStateHandler(Map<String, StreamDefinition> sds, int index) { super(sds, index); @@ -40,12 +40,14 @@ public class SiddhiPolicyStateHandler extends SiddhiPolicyHandler { protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamDefinitionNotFoundException { StringBuilder builder = new StringBuilder(); PolicyDefinition.Definition stateDefiniton = policyDefinition.getStateDefinition(); - for(String inputStream : stateDefiniton.getInputStreams()) { // the state stream follow the output stream of the policy definition + for (String inputStream : stateDefiniton.getInputStreams()) { // the state stream follow the output stream of the policy definition builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream))); builder.append("\n"); } builder.append(stateDefiniton.value); - if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi state execution plan: {} from definiton: {}", builder.toString(), stateDefiniton); + if (LOG.isDebugEnabled()) { + LOG.debug("Generated siddhi state execution plan: {} from definiton: {}", builder.toString(), stateDefiniton); + } return builder.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java index 357504e..ef806fb 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java @@ -16,6 +16,10 @@ */ package org.apache.eagle.alert.engine.evaluator.nodata; +import org.apache.eagle.alert.engine.model.StreamEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -24,105 +28,101 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class DistinctValuesInTimeBatchWindow { - private static final Logger LOG = LoggerFactory.getLogger(DistinctValuesInTimeBatchWindow.class); - - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - // wisb (what is should be) set for expected full set value of multiple columns - @SuppressWarnings("rawtypes") - private volatile Set wisb = new HashSet(); - - private NoDataPolicyTimeBatchHandler handler; - - /** - * map from value to max timestamp for this value - */ - private Map<Object, Long> valueMaxTimeMap = new HashMap<>(); - - private long startTime = -1; - private long nextEmitTime = -1; - private long timeInMilliSeconds; - - public DistinctValuesInTimeBatchWindow(NoDataPolicyTimeBatchHandler handler, - long timeInMilliSeconds, @SuppressWarnings("rawtypes") Set wisb) { - this.handler = handler; - this.timeInMilliSeconds = timeInMilliSeconds; - if (wisb != null) { - this.wisb = wisb; - } - } - - public Map<Object, Long> distinctValues() { - return valueMaxTimeMap; - } - - public void send(StreamEvent event, Object value, long timestamp) { - synchronized(this) { - if (startTime < 0) { - startTime = System.currentTimeMillis(); - - scheduler.scheduleAtFixedRate(new Runnable() { - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public void run() { - try { - LOG.info("{}/{}: {}", startTime, nextEmitTime, valueMaxTimeMap.keySet()); - synchronized (valueMaxTimeMap) { - boolean sendAlerts = false; - - if (nextEmitTime < 0) { - nextEmitTime = startTime + timeInMilliSeconds; - } - - if (System.currentTimeMillis() > nextEmitTime) { - startTime = nextEmitTime; - nextEmitTime += timeInMilliSeconds; - sendAlerts = true; - } else { - sendAlerts = false; - } - - if (sendAlerts) { - // alert - handler.compareAndEmit(wisb, distinctValues().keySet(), event); - LOG.info("alert for wiri: {} compares to wisb: {}", distinctValues().keySet(), wisb); - - if (distinctValues().keySet().size() > 0) { - wisb = new HashSet(distinctValues().keySet()); - } - valueMaxTimeMap.clear(); - LOG.info("Clear wiri & update wisb to {}", wisb); - } - } - } catch (Throwable t) { - LOG.error("failed to run batch window for gap alert", t); - } - } - - }, 0, timeInMilliSeconds / 2, TimeUnit.MILLISECONDS); - } - } - - if (valueMaxTimeMap.containsKey(value)) { - // remove that entry with old timestamp in timeSortedMap - long oldTime = valueMaxTimeMap.get(value); - if (oldTime >= timestamp) { - // no any effect as the new timestamp is equal or even less than - // old timestamp - return; - } - } - // update new timestamp in valueMaxTimeMap - valueMaxTimeMap.put(value, timestamp); - - LOG.info("sent: {} with start: {}/next: {}", valueMaxTimeMap.keySet(), startTime, nextEmitTime); - } + private static final Logger LOG = LoggerFactory.getLogger(DistinctValuesInTimeBatchWindow.class); + + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + // wisb (what is should be) set for expected full set value of multiple columns + @SuppressWarnings("rawtypes") + private volatile Set wisb = new HashSet(); + + private NoDataPolicyTimeBatchHandler handler; + + /** + * map from value to max timestamp for this value. + */ + private Map<Object, Long> valueMaxTimeMap = new HashMap<>(); + + private long startTime = -1; + private long nextEmitTime = -1; + private long timeInMilliSeconds; + + public DistinctValuesInTimeBatchWindow(NoDataPolicyTimeBatchHandler handler, + long timeInMilliSeconds, @SuppressWarnings("rawtypes") Set wisb) { + this.handler = handler; + this.timeInMilliSeconds = timeInMilliSeconds; + if (wisb != null) { + this.wisb = wisb; + } + } + + public Map<Object, Long> distinctValues() { + return valueMaxTimeMap; + } + + public void send(StreamEvent event, Object value, long timestamp) { + synchronized (this) { + if (startTime < 0) { + startTime = System.currentTimeMillis(); + + scheduler.scheduleAtFixedRate(new Runnable() { + + @SuppressWarnings( {"unchecked", "rawtypes"}) + @Override + public void run() { + try { + LOG.info("{}/{}: {}", startTime, nextEmitTime, valueMaxTimeMap.keySet()); + synchronized (valueMaxTimeMap) { + boolean sendAlerts = false; + + if (nextEmitTime < 0) { + nextEmitTime = startTime + timeInMilliSeconds; + } + + if (System.currentTimeMillis() > nextEmitTime) { + startTime = nextEmitTime; + nextEmitTime += timeInMilliSeconds; + sendAlerts = true; + } else { + sendAlerts = false; + } + + if (sendAlerts) { + // alert + handler.compareAndEmit(wisb, distinctValues().keySet(), event); + LOG.info("alert for wiri: {} compares to wisb: {}", distinctValues().keySet(), wisb); + + if (distinctValues().keySet().size() > 0) { + wisb = new HashSet(distinctValues().keySet()); + } + valueMaxTimeMap.clear(); + LOG.info("Clear wiri & update wisb to {}", wisb); + } + } + } catch (Throwable t) { + LOG.error("failed to run batch window for gap alert", t); + } + } + + }, 0, timeInMilliSeconds / 2, TimeUnit.MILLISECONDS); + } + } + + if (valueMaxTimeMap.containsKey(value)) { + // remove that entry with old timestamp in timeSortedMap + long oldTime = valueMaxTimeMap.get(value); + if (oldTime >= timestamp) { + // no any effect as the new timestamp is equal or even less than + // old timestamp + return; + } + } + // update new timestamp in valueMaxTimeMap + valueMaxTimeMap.put(value, timestamp); + + LOG.info("sent: {} with start: {}/next: {}", valueMaxTimeMap.keySet(), startTime, nextEmitTime); + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java index 676357a..4aae040 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java @@ -16,70 +16,69 @@ */ package org.apache.eagle.alert.engine.evaluator.nodata; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - import org.apache.commons.lang.builder.HashCodeBuilder; +import java.util.*; + /** * Since 6/28/16. * to get distinct values within a specified time window * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had * timeSortedMap : map sorted by timestamp first and then value - * With the above 2 data structure, we can get distinct values in LOG(N) + * With the above 2 data structure, we can get distinct values in LOG(N). */ public class DistinctValuesInTimeWindow { - public static class ValueAndTime{ + public static class ValueAndTime { Object value; long timestamp; - public ValueAndTime(Object value, long timestamp){ + + public ValueAndTime(Object value, long timestamp) { this.value = value; this.timestamp = timestamp; } - public String toString(){ + public String toString() { return "[" + value + "," + timestamp + "]"; } - public int hashCode(){ + public int hashCode() { return new HashCodeBuilder().append(value).append(timestamp).toHashCode(); } - public boolean equals(Object that){ - if(!(that instanceof ValueAndTime)) + public boolean equals(Object that) { + if (!(that instanceof ValueAndTime)) { return false; - ValueAndTime another = (ValueAndTime)that; + } + ValueAndTime another = (ValueAndTime) that; return another.timestamp == this.timestamp && another.value.equals(this.value); } } - public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{ + public static class ValueAndTimeComparator implements Comparator<ValueAndTime> { @Override public int compare(ValueAndTime o1, ValueAndTime o2) { - if(o1.timestamp != o2.timestamp) + if (o1.timestamp != o2.timestamp) { return (o1.timestamp > o2.timestamp) ? 1 : -1; - if(o1.value.equals(o2.value)) + } + if (o1.value.equals(o2.value)) { return 0; - else { + } else { // this is not strictly correct, but I don't want to write too many comparators here :-) - if(o1.hashCode() > o2.hashCode()) + if (o1.hashCode() > o2.hashCode()) { return 1; - else + } else { return -1; + } } } } /** - * map from value to max timestamp for this value + * map from value to max timestamp for this value. */ private Map<Object, Long> valueMaxTimeMap = new HashMap<>(); /** - * map sorted by time(max timestamp for the value) and then value + * map sorted by time(max timestamp for the value) and then value. */ private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator()); private long maxTimestamp = 0L; @@ -87,20 +86,20 @@ public class DistinctValuesInTimeWindow { private boolean windowSlided; /** - * @param window - milliseconds + * @param window - milliseconds. */ - public DistinctValuesInTimeWindow(long window){ + public DistinctValuesInTimeWindow(long window) { this.window = window; } - public void send(Object value, long timestamp){ + public void send(Object value, long timestamp) { ValueAndTime vt = new ValueAndTime(value, timestamp); // todo think of time out of order - if(valueMaxTimeMap.containsKey(value)){ + if (valueMaxTimeMap.containsKey(value)) { // remove that entry with old timestamp in timeSortedMap long oldTime = valueMaxTimeMap.get(value); - if(oldTime >= timestamp){ + if (oldTime >= timestamp) { // no any effect as the new timestamp is equal or even less than old timestamp return; } @@ -117,25 +116,25 @@ public class DistinctValuesInTimeWindow { // check if some values should be evicted because of time window Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator(); - while(it.hasNext()){ + while (it.hasNext()) { Map.Entry<ValueAndTime, ValueAndTime> entry = it.next(); - if(entry.getKey().timestamp < maxTimestamp - window){ + if (entry.getKey().timestamp < maxTimestamp - window) { // should remove the entry in valueMaxTimeMap and timeSortedMap valueMaxTimeMap.remove(entry.getKey().value); windowSlided = true; it.remove(); - }else { + } else { break; } } } - public Map<Object, Long> distinctValues(){ + public Map<Object, Long> distinctValues() { return valueMaxTimeMap; } - public boolean windowSlided(){ + public boolean windowSlided() { return windowSlided; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java index 8f37b93..ec6e6e9 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java @@ -16,7 +16,6 @@ */ package org.apache.eagle.alert.engine.evaluator.nodata; -import org.apache.commons.collections.CollectionUtils; import org.apache.eagle.alert.engine.Collector; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; @@ -25,6 +24,7 @@ import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.model.StreamEvent; import org.apache.eagle.alert.utils.TimePeriodUtils; +import org.apache.commons.collections.CollectionUtils; import org.joda.time.Period; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,56 +39,54 @@ import java.util.*; * 2. timestamp field: timestamp column * 3. wiri safe time window: how long window is good for full set of wiri * 4. wisb: full set - * * No data policy definition should include * fixed fields and dynamic fields * fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name * dynamic fields depend on wisb type. - * * policy would be like: * { - "name": "noDataAlertPolicy", - "description": "noDataAlertPolicy", - "inputStreams": [ - "noDataAlertStream" - ], - "outputStreams": [ - "noDataAlertStream_out" - ], - "definition": { - "type": "nodataalert", - "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host" - }, - "partitionSpec": [ - { - "streamId": "noDataAlertStream", - "type": "GROUPBY" - } - ], - "parallelismHint": 2 - } - "name": "noDataAlertPolicy", - "description": "noDataAlertPolicy", - "inputStreams": [ - "noDataAlertStream" - ], - "outputStreams": [ - "noDataAlertStream_out" - ], - "definition": { - "type": "nodataalert", - "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host" - }, - "partitionSpec": [ - { - "streamId": "noDataAlertStream", - "type": "GROUPBY" - } - ], - "parallelismHint": 2 - } + * "name": "noDataAlertPolicy", + * "description": "noDataAlertPolicy", + * "inputStreams": [ + * "noDataAlertStream" + * ], + * "outputStreams": [ + * "noDataAlertStream_out" + * ], + * "definition": { + * "type": "nodataalert", + * "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host" + * }, + * "partitionSpec": [ + * { + * "streamId": "noDataAlertStream", + * "type": "GROUPBY" + * } + * ], + * "parallelismHint": 2 + * } + * "name": "noDataAlertPolicy", + * "description": "noDataAlertPolicy", + * "inputStreams": [ + * "noDataAlertStream" + * ], + * "outputStreams": [ + * "noDataAlertStream_out" + * ], + * "definition": { + * "type": "nodataalert", + * "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host" + * }, + * "partitionSpec": [ + * { + * "streamId": "noDataAlertStream", + * "type": "GROUPBY" + * } + * ], + * "parallelismHint": 2 + * } */ -public class NoDataPolicyHandler implements PolicyStreamHandler{ +public class NoDataPolicyHandler implements PolicyStreamHandler { private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class); private Map<String, StreamDefinition> sds; @@ -103,9 +101,10 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{ private volatile NoDataWisbType wisbType; private volatile DistinctValuesInTimeWindow distinctWindow; - public NoDataPolicyHandler(Map<String, StreamDefinition> sds){ + public NoDataPolicyHandler(Map<String, StreamDefinition> sds) { this.sds = sds; } + @Override public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { this.collector = collector; @@ -113,11 +112,13 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{ this.policyDef = context.getPolicyDefinition(); List<String> inputStreams = policyDef.getInputStreams(); // validate inputStreams has to contain only one stream - if(inputStreams.size() != 1) + if (inputStreams.size() != 1) { throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert"); + } // validate outputStream has to contain only one stream - if(policyDef.getOutputStreams().size() != 1) + if (policyDef.getOutputStreams().size() != 1) { throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert"); + } String is = inputStreams.get(0); StreamDefinition sd = sds.get(is); @@ -129,23 +130,23 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{ distinctWindow = new DistinctValuesInTimeWindow(windowPeriod); this.wisbType = NoDataWisbType.valueOf(segments[1]); // for provided wisb values, need to parse, for dynamic wisb values, it is computed through a window - if(wisbType == NoDataWisbType.provided) { + if (wisbType == NoDataWisbType.provided) { wisbValues = new NoDataWisbProvidedParser().parse(segments); } // populate wisb field names int numOfFields = Integer.parseInt(segments[2]); - for(int i = 3; i < 3+numOfFields; i++){ + for (int i = 3; i < 3 + numOfFields; i++) { String fn = segments[i]; wisbFieldIndices.add(sd.getColumnIndex(fn)); } } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings( {"rawtypes", "unchecked"}) @Override public void send(StreamEvent event) throws Exception { Object[] data = event.getData(); List<Object> columnValues = new ArrayList<>(); - for(int i=0; i<wisbFieldIndices.size(); i++){ + for (int i = 0; i < wisbFieldIndices.size(); i++) { Object o = data[wisbFieldIndices.get(i)]; // convert value to string columnValues.add(o.toString()); @@ -155,18 +156,18 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{ LOG.debug("window slided: {}, with wiri: {}", distinctWindow.windowSlided(), distinctWindow.distinctValues()); - if(distinctWindow.windowSlided()) { + if (distinctWindow.windowSlided()) { compareAndEmit(wisbValues, wiriValues, event); } - if(wisbType == NoDataWisbType.dynamic) { + if (wisbType == NoDataWisbType.dynamic) { // deep copy wisbValues = new HashSet<>(wiriValues); } } @SuppressWarnings("rawtypes") - private void compareAndEmit(Set wisb, Set wiri, StreamEvent event){ + private void compareAndEmit(Set wisb, Set wiri, StreamEvent event) { // compare with wisbValues if wisbValues are already there for dynamic type Collection noDataValues = CollectionUtils.subtract(wisb, wiri); LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri); @@ -177,9 +178,9 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{ } } - private AlertStreamEvent createAlertEvent(long timestamp, Object[] triggerEvent){ + private AlertStreamEvent createAlertEvent(long timestamp, Object[] triggerEvent) { String is = policyDef.getInputStreams().get(0); - StreamDefinition sd = sds.get(is); + final StreamDefinition sd = sds.get(is); AlertStreamEvent event = new AlertStreamEvent(); event.setTimestamp(timestamp); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java index 53a7af0..73ee9b2 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java @@ -16,7 +16,6 @@ */ package org.apache.eagle.alert.engine.evaluator.nodata; -import org.apache.commons.collections.CollectionUtils; import org.apache.eagle.alert.engine.Collector; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; @@ -25,6 +24,7 @@ import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.model.StreamEvent; import org.apache.eagle.alert.utils.TimePeriodUtils; +import org.apache.commons.collections.CollectionUtils; import org.apache.storm.guava.base.Joiner; import org.joda.time.Period; import org.slf4j.Logger; @@ -33,132 +33,134 @@ import org.slf4j.LoggerFactory; import java.util.*; public class NoDataPolicyTimeBatchHandler implements PolicyStreamHandler { - - private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class); - private Map<String, StreamDefinition> sds; - - private volatile List<Integer> wisbFieldIndices = new ArrayList<>(); - // reuse PolicyDefinition.defintion.value field to store full set of values - // separated by comma - private volatile PolicyDefinition policyDef; - private volatile Collector<AlertStreamEvent> collector; - private volatile PolicyHandlerContext context; - private volatile NoDataWisbType wisbType; - private volatile DistinctValuesInTimeBatchWindow distinctWindow; - - public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds){ + + private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class); + private Map<String, StreamDefinition> sds; + + private volatile List<Integer> wisbFieldIndices = new ArrayList<>(); + // reuse PolicyDefinition.defintion.value field to store full set of values + // separated by comma + private volatile PolicyDefinition policyDef; + private volatile Collector<AlertStreamEvent> collector; + private volatile PolicyHandlerContext context; + private volatile NoDataWisbType wisbType; + private volatile DistinctValuesInTimeBatchWindow distinctWindow; + + public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds) { this.sds = sds; } - @Override - public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { - this.collector = collector; - this.context = context; - this.policyDef = context.getPolicyDefinition(); - List<String> inputStreams = policyDef.getInputStreams(); - // validate inputStreams has to contain only one stream - if (inputStreams.size() != 1) - throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert"); - // validate outputStream has to contain only one stream - if (policyDef.getOutputStreams().size() != 1) - throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert"); - - String is = inputStreams.get(0); - StreamDefinition sd = sds.get(is); - - String policyValue = policyDef.getDefinition().getValue(); - // assume that no data alert policy value consists of "windowPeriod, - // type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, - // f2_value} - String[] segments = policyValue.split(","); - this.wisbType = NoDataWisbType.valueOf(segments[1]); - // for provided wisb values, need to parse, for dynamic wisb values, it - // is computed through a window - @SuppressWarnings("rawtypes") - Set wisbValues = null; - if (wisbType == NoDataWisbType.provided) { - wisbValues = new NoDataWisbProvidedParser().parse(segments); - } - long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0])); - distinctWindow = new DistinctValuesInTimeBatchWindow(this, windowPeriod, wisbValues); - // populate wisb field names - int numOfFields = Integer.parseInt(segments[2]); - for (int i = 3; i < 3 + numOfFields; i++) { - String fn = segments[i]; - wisbFieldIndices.add(sd.getColumnIndex(fn)); - } - } - - @Override - public void send(StreamEvent event) throws Exception { - Object[] data = event.getData(); - - List<Object> columnValues = new ArrayList<>(); - for (int i = 0; i < wisbFieldIndices.size(); i++) { - Object o = data[wisbFieldIndices.get(i)]; - // convert value to string - columnValues.add(o.toString()); - } - // use local timestamp rather than event timestamp - distinctWindow.send(event, columnValues, System.currentTimeMillis()); - LOG.debug("event sent to window with wiri: {}", distinctWindow.distinctValues()); - } - - @SuppressWarnings("rawtypes") - public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) { - // compare with wisbValues if wisbValues are already there for dynamic - // type - Collection noDataValues = CollectionUtils.subtract(wisb, wiri); - LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri); - if (noDataValues != null && noDataValues.size() > 0) { - LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisb); - - String is = policyDef.getOutputStreams().get(0); - StreamDefinition sd = sds.get(is); - int timestampIndex = sd.getColumnIndex("timestamp"); - int hostIndex = sd.getColumnIndex("host"); - int originalStreamNameIndex = sd.getColumnIndex("originalStreamName"); - - for (Object one : noDataValues) { - Object[] triggerEvent = new Object[sd.getColumns().size()]; - for (int i = 0; i < sd.getColumns().size(); i ++) { - if (i == timestampIndex) { - triggerEvent[i] = System.currentTimeMillis(); - } else if (i == hostIndex) { - triggerEvent[hostIndex] = ((List) one).get(0); - } else if (i == originalStreamNameIndex) { - triggerEvent[originalStreamNameIndex] = event.getStreamId(); - } else if (sd.getColumns().size() < i) { - LOG.error("strema event data have different lenght compare to column definition!"); - } else { - triggerEvent[i] = sd.getColumns().get(i).getDefaultValue(); - } - } - AlertStreamEvent alertEvent = createAlertEvent(sd, event.getTimestamp(), triggerEvent); - LOG.info(String.format("Nodata alert %s generated and will be emitted", Joiner.on(",").join(triggerEvent))); - collector.emit(alertEvent); - } - - } - } - - private AlertStreamEvent createAlertEvent(StreamDefinition sd, long timestamp, Object[] triggerEvent) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setTimestamp(timestamp); - event.setData(triggerEvent); - event.setStreamId(policyDef.getOutputStreams().get(0)); - event.setPolicyId(context.getPolicyDefinition().getName()); - if (this.context.getPolicyEvaluator() != null) { - event.setCreatedBy(context.getPolicyEvaluator().getName()); - } - event.setCreatedTime(System.currentTimeMillis()); - event.setSchema(sd); - return event; - } - - @Override - public void close() throws Exception { - - } + @Override + public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { + this.collector = collector; + this.context = context; + this.policyDef = context.getPolicyDefinition(); + List<String> inputStreams = policyDef.getInputStreams(); + // validate inputStreams has to contain only one stream + if (inputStreams.size() != 1) { + throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert"); + } + // validate outputStream has to contain only one stream + if (policyDef.getOutputStreams().size() != 1) { + throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert"); + } + + String is = inputStreams.get(0); + StreamDefinition sd = sds.get(is); + + String policyValue = policyDef.getDefinition().getValue(); + // assume that no data alert policy value consists of "windowPeriod, + // type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, + // f2_value} + String[] segments = policyValue.split(","); + this.wisbType = NoDataWisbType.valueOf(segments[1]); + // for provided wisb values, need to parse, for dynamic wisb values, it + // is computed through a window + @SuppressWarnings("rawtypes") + Set wisbValues = null; + if (wisbType == NoDataWisbType.provided) { + wisbValues = new NoDataWisbProvidedParser().parse(segments); + } + long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0])); + distinctWindow = new DistinctValuesInTimeBatchWindow(this, windowPeriod, wisbValues); + // populate wisb field names + int numOfFields = Integer.parseInt(segments[2]); + for (int i = 3; i < 3 + numOfFields; i++) { + String fn = segments[i]; + wisbFieldIndices.add(sd.getColumnIndex(fn)); + } + } + + @Override + public void send(StreamEvent event) throws Exception { + Object[] data = event.getData(); + + List<Object> columnValues = new ArrayList<>(); + for (int i = 0; i < wisbFieldIndices.size(); i++) { + Object o = data[wisbFieldIndices.get(i)]; + // convert value to string + columnValues.add(o.toString()); + } + // use local timestamp rather than event timestamp + distinctWindow.send(event, columnValues, System.currentTimeMillis()); + LOG.debug("event sent to window with wiri: {}", distinctWindow.distinctValues()); + } + + @SuppressWarnings("rawtypes") + public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) { + // compare with wisbValues if wisbValues are already there for dynamic + // type + Collection noDataValues = CollectionUtils.subtract(wisb, wiri); + LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri); + if (noDataValues != null && noDataValues.size() > 0) { + LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisb); + + String is = policyDef.getOutputStreams().get(0); + StreamDefinition sd = sds.get(is); + int timestampIndex = sd.getColumnIndex("timestamp"); + int hostIndex = sd.getColumnIndex("host"); + int originalStreamNameIndex = sd.getColumnIndex("originalStreamName"); + + for (Object one : noDataValues) { + Object[] triggerEvent = new Object[sd.getColumns().size()]; + for (int i = 0; i < sd.getColumns().size(); i++) { + if (i == timestampIndex) { + triggerEvent[i] = System.currentTimeMillis(); + } else if (i == hostIndex) { + triggerEvent[hostIndex] = ((List) one).get(0); + } else if (i == originalStreamNameIndex) { + triggerEvent[originalStreamNameIndex] = event.getStreamId(); + } else if (sd.getColumns().size() < i) { + LOG.error("strema event data have different lenght compare to column definition!"); + } else { + triggerEvent[i] = sd.getColumns().get(i).getDefaultValue(); + } + } + AlertStreamEvent alertEvent = createAlertEvent(sd, event.getTimestamp(), triggerEvent); + LOG.info(String.format("Nodata alert %s generated and will be emitted", Joiner.on(",").join(triggerEvent))); + collector.emit(alertEvent); + } + + } + } + + private AlertStreamEvent createAlertEvent(StreamDefinition sd, long timestamp, Object[] triggerEvent) { + AlertStreamEvent event = new AlertStreamEvent(); + event.setTimestamp(timestamp); + event.setData(triggerEvent); + event.setStreamId(policyDef.getOutputStreams().get(0)); + event.setPolicyId(context.getPolicyDefinition().getName()); + if (this.context.getPolicyEvaluator() != null) { + event.setCreatedBy(context.getPolicyEvaluator().getName()); + } + event.setCreatedTime(System.currentTimeMillis()); + event.setSchema(sd); + return event; + } + + @Override + public void close() throws Exception { + + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java index fe06067..fa27108 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java @@ -25,7 +25,8 @@ import java.util.Set; public interface NoDataWisbParser { /** * parse policy definition and return WISB values for one or multiple fields - * for example host and data center are 2 fields for no data alert, then WISB is a list of two values + * for example host and data center are 2 fields for no data alert, then WISB is a list of two values. + * * @param args some information parsed from policy defintion * @return list of list of field values */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java index e13826a..4f54358 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java @@ -24,19 +24,19 @@ import java.util.Set; /** * Since 6/29/16. */ -public class NoDataWisbProvidedParser implements NoDataWisbParser{ - @Override +public class NoDataWisbProvidedParser implements NoDataWisbParser { /** - * policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value" + * policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value". */ + @Override public Set<List<String>> parse(String[] args) { int numOfFields = Integer.parseInt(args[2]); Set<List<String>> wisbValues = new HashSet<>(); int i = 3 + numOfFields; - while(i<args.length){ + while (i < args.length) { List<String> fields = new ArrayList<>(); - for(int j=0; j<numOfFields; j++){ - fields.add(args[i+j]); + for (int j = 0; j < numOfFields; j++) { + fields.add(args[i + j]); } wisbValues.add(fields); i += numOfFields; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java index ed04d5a..2f71c7f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java @@ -25,8 +25,8 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent; */ public interface AlertDeduplicator { - AlertStreamEvent dedup(AlertStreamEvent event); + AlertStreamEvent dedup(AlertStreamEvent event); - void setDedupIntervalMin(String intervalMin); + void setDedupIntervalMin(String intervalMin); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java index 8f1c248..fdb01a7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java @@ -1,10 +1,4 @@ -package org.apache.eagle.alert.engine.publisher; - -import java.util.List; - -import org.apache.eagle.alert.engine.coordinator.Publishment; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -20,6 +14,12 @@ import org.apache.eagle.alert.engine.coordinator.Publishment; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.eagle.alert.engine.publisher; + +import org.apache.eagle.alert.engine.coordinator.Publishment; + +import java.util.List; + public interface AlertPublishListener { void onPublishChange(List<Publishment> added, List<Publishment> removed, http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java index d24bdb0..4c3a2ad 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java @@ -17,22 +17,22 @@ */ package org.apache.eagle.alert.engine.publisher; -import java.io.Closeable; -import java.util.Map; - import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.impl.PublishStatus; - import com.typesafe.config.Config; +import java.io.Closeable; +import java.util.Map; + /** * Created on 2/10/16. * Notification Plug-in interface which provide abstraction layer to notify to different system */ public interface AlertPublishPlugin extends Closeable { /** - * + * Init alert publish plugin. + * * @param config * @param publishment * @param configProperties - storm config that would be useful for some implementation