http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java deleted file mode 100755 index 628b2e4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.stream.input.InputHandler; - -import java.util.List; -import java.util.Map; - -public class SiddhiPolicyHandler implements PolicyStreamHandler { - private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class); - private ExecutionPlanRuntime executionRuntime; - private SiddhiManager siddhiManager; - private Map<String, StreamDefinition> sds; - private PolicyDefinition policy; - private PolicyHandlerContext context; - - private int currentIndex = 0; // the index of current definition statement inside the policy definition - - public SiddhiPolicyHandler(Map<String, StreamDefinition> sds, int index) { - this.sds = sds; - this.currentIndex = index; - } - - protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamNotDefinedException { - return SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition,sds); - } - - @Override - public void prepare(final Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { - LOG.info("Initializing handler for policy {}", context.getPolicyDefinition()); - this.policy = context.getPolicyDefinition(); - this.siddhiManager = new SiddhiManager(); - String plan = 
generateExecutionPlan(policy, sds); - try { - this.executionRuntime = siddhiManager.createExecutionPlanRuntime(plan); - LOG.info("Created siddhi runtime {}", executionRuntime.getName()); - } catch (Exception parserException) { - LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n", context.getPolicyDefinition().getName(), plan, parserException); - throw parserException; - } - - // add output stream callback - List<String> outputStreams = getOutputStreams(policy); - for (final String outputStream : outputStreams) { - if (executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) { - StreamDefinition streamDefinition = SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream)); - this.executionRuntime.addCallback(outputStream, - new AlertStreamCallback(outputStream, streamDefinition, - collector, context, currentIndex)); - } else { - throw new IllegalStateException("Undefined output stream " + outputStream); - } - } - this.executionRuntime.start(); - this.context = context; - LOG.info("Initialized policy handler for policy: {}", policy.getName()); - } - - protected List<String> getOutputStreams(PolicyDefinition policy) { - return policy.getOutputStreams().isEmpty() ? 
policy.getDefinition().getOutputStreams() : policy.getOutputStreams(); - } - - public void send(StreamEvent event) throws Exception { - context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count")); - String streamId = event.getStreamId(); - InputHandler inputHandler = executionRuntime.getInputHandler(streamId); - if (inputHandler != null) { - context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count")); - inputHandler.send(event.getTimestamp(), event.getData()); - - if (LOG.isDebugEnabled()) { - LOG.debug("sent event to siddhi stream {} ", streamId); - } - } else { - context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count")); - LOG.warn("No input handler found for stream {}", streamId); - } - } - - public void close() throws Exception { - LOG.info("Closing handler for policy {}", this.policy.getName()); - this.executionRuntime.shutdown(); - LOG.info("Shutdown siddhi runtime {}", this.executionRuntime.getName()); - this.siddhiManager.shutdown(); - LOG.info("Shutdown siddhi manager {}", this.siddhiManager); - LOG.info("Closed handler for policy {}", this.policy.getName()); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder("SiddhiPolicyHandler for policy: "); - sb.append(this.policy == null ? "" : this.policy.getName()); - return sb.toString(); - } - -}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java deleted file mode 100644 index 141c819..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -/** - * Created on 7/27/16. - */ -public class SiddhiPolicyStateHandler extends SiddhiPolicyHandler { - - private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyStateHandler.class); - - public SiddhiPolicyStateHandler(Map<String, StreamDefinition> sds, int index) { - super(sds, index); - } - - @Override - protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamNotDefinedException { - StringBuilder builder = new StringBuilder(); - PolicyDefinition.Definition stateDefiniton = policyDefinition.getStateDefinition(); - List<String> inputStreams = stateDefiniton.getInputStreams(); - for (String inputStream : inputStreams) { // the state stream follow the output stream of the policy definition - builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream))); - builder.append("\n"); - } - builder.append(stateDefiniton.value); - if (LOG.isDebugEnabled()) { - LOG.debug("Generated siddhi state execution plan: {} from definiton: {}", builder.toString(), stateDefiniton); - } - return builder.toString(); - } - - @Override - protected List<String> getOutputStreams(PolicyDefinition policy) { - return policy.getStateDefinition().getOutputStreams(); - } - - // more validation on prepare - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java deleted file mode 100644 index ef806fb..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.evaluator.nodata; - -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class DistinctValuesInTimeBatchWindow { - - private static final Logger LOG = LoggerFactory.getLogger(DistinctValuesInTimeBatchWindow.class); - - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - // wisb (what is should be) set for expected full set value of multiple columns - @SuppressWarnings("rawtypes") - private volatile Set wisb = new HashSet(); - - private NoDataPolicyTimeBatchHandler handler; - - /** - * map from value to max timestamp for this value. - */ - private Map<Object, Long> valueMaxTimeMap = new HashMap<>(); - - private long startTime = -1; - private long nextEmitTime = -1; - private long timeInMilliSeconds; - - public DistinctValuesInTimeBatchWindow(NoDataPolicyTimeBatchHandler handler, - long timeInMilliSeconds, @SuppressWarnings("rawtypes") Set wisb) { - this.handler = handler; - this.timeInMilliSeconds = timeInMilliSeconds; - if (wisb != null) { - this.wisb = wisb; - } - } - - public Map<Object, Long> distinctValues() { - return valueMaxTimeMap; - } - - public void send(StreamEvent event, Object value, long timestamp) { - synchronized (this) { - if (startTime < 0) { - startTime = System.currentTimeMillis(); - - scheduler.scheduleAtFixedRate(new Runnable() { - - @SuppressWarnings( {"unchecked", "rawtypes"}) - @Override - public void run() { - try { - LOG.info("{}/{}: {}", startTime, nextEmitTime, valueMaxTimeMap.keySet()); - synchronized (valueMaxTimeMap) { - boolean sendAlerts = false; - - if (nextEmitTime < 0) { - nextEmitTime = startTime + timeInMilliSeconds; - } - - if 
(System.currentTimeMillis() > nextEmitTime) { - startTime = nextEmitTime; - nextEmitTime += timeInMilliSeconds; - sendAlerts = true; - } else { - sendAlerts = false; - } - - if (sendAlerts) { - // alert - handler.compareAndEmit(wisb, distinctValues().keySet(), event); - LOG.info("alert for wiri: {} compares to wisb: {}", distinctValues().keySet(), wisb); - - if (distinctValues().keySet().size() > 0) { - wisb = new HashSet(distinctValues().keySet()); - } - valueMaxTimeMap.clear(); - LOG.info("Clear wiri & update wisb to {}", wisb); - } - } - } catch (Throwable t) { - LOG.error("failed to run batch window for gap alert", t); - } - } - - }, 0, timeInMilliSeconds / 2, TimeUnit.MILLISECONDS); - } - } - - if (valueMaxTimeMap.containsKey(value)) { - // remove that entry with old timestamp in timeSortedMap - long oldTime = valueMaxTimeMap.get(value); - if (oldTime >= timestamp) { - // no any effect as the new timestamp is equal or even less than - // old timestamp - return; - } - } - // update new timestamp in valueMaxTimeMap - valueMaxTimeMap.put(value, timestamp); - - LOG.info("sent: {} with start: {}/next: {}", valueMaxTimeMap.keySet(), startTime, nextEmitTime); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java deleted file mode 100644 index 4aae040..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java +++ /dev/null @@ -1,140 
+0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.nodata; - -import org.apache.commons.lang.builder.HashCodeBuilder; - -import java.util.*; - -/** - * Since 6/28/16. - * to get distinct values within a specified time window - * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had - * timeSortedMap : map sorted by timestamp first and then value - * With the above 2 data structure, we can get distinct values in LOG(N). 
/**
 * Tracks the distinct values seen within a sliding time window.
 *
 * <p>Two structures are kept in step:
 * <ul>
 *   <li>{@code valueMaxTimeMap}: each distinct value with the largest timestamp it was seen at;</li>
 *   <li>{@code timeSortedMap}: the same (value, timestamp) pairs ordered by timestamp, so that
 *       expired entries can be evicted from the front.</li>
 * </ul>
 * The window is measured back from the largest timestamp seen so far. Not thread-safe.
 */
public class DistinctValuesInTimeWindow {
    /** A value together with the timestamp it was observed at; used as key of the sorted map. */
    public static class ValueAndTime {
        Object value;
        long timestamp;

        public ValueAndTime(Object value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "[" + value + "," + timestamp + "]";
        }

        @Override
        public int hashCode() {
            return 31 * Objects.hashCode(value) + Long.hashCode(timestamp);
        }

        @Override
        public boolean equals(Object that) {
            if (!(that instanceof ValueAndTime)) {
                return false;
            }
            ValueAndTime another = (ValueAndTime) that;
            // Objects.equals keeps this null-safe when a null value was sent
            return another.timestamp == this.timestamp && Objects.equals(another.value, this.value);
        }
    }

    /**
     * Orders by timestamp first. Entries with the same timestamp and equal values compare as
     * equal; distinct values are ordered by hash code, then by their string form, then by
     * identity hash.
     *
     * <p>The extra tie-breakers matter: answering "less than" in both directions for two
     * distinct values whose hashes collide breaks the ordering contract a {@link TreeMap}
     * relies on for lookups and removals. The ordering of distinct values is still arbitrary,
     * only consistent.
     */
    public static class ValueAndTimeComparator implements Comparator<ValueAndTime> {
        @Override
        public int compare(ValueAndTime o1, ValueAndTime o2) {
            int byTime = Long.compare(o1.timestamp, o2.timestamp);
            if (byTime != 0) {
                return byTime;
            }
            if (Objects.equals(o1.value, o2.value)) {
                return 0;
            }
            int byHash = Integer.compare(o1.hashCode(), o2.hashCode());
            if (byHash != 0) {
                return byHash;
            }
            int byText = String.valueOf(o1.value).compareTo(String.valueOf(o2.value));
            if (byText != 0) {
                return byText;
            }
            return Integer.compare(System.identityHashCode(o1.value), System.identityHashCode(o2.value));
        }
    }

    /**
     * map from value to max timestamp for this value.
     */
    private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
    /**
     * map sorted by time(max timestamp for the value) and then value.
     */
    private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
    private long maxTimestamp = 0L;
    private long window;
    private boolean windowSlided;

    /**
     * @param window - milliseconds.
     */
    public DistinctValuesInTimeWindow(long window) {
        this.window = window;
    }

    /**
     * Records that {@code value} was seen at {@code timestamp} and evicts every entry whose
     * timestamp is older than {@code maxTimestamp - window}.
     *
     * <p>A timestamp that is not newer than the one already stored for the value is ignored
     * entirely (no eviction pass is run either).
     */
    public void send(Object value, long timestamp) {
        // todo think of time out of order
        Long oldTime = valueMaxTimeMap.get(value);
        if (oldTime != null) {
            if (oldTime >= timestamp) {
                // no any effect as the new timestamp is equal or even less than old timestamp
                return;
            }
            // remove that entry with old timestamp in timeSortedMap
            timeSortedMap.remove(new ValueAndTime(value, oldTime));
        }
        // insert entry with new timestamp in timeSortedMap
        ValueAndTime vt = new ValueAndTime(value, timestamp);
        timeSortedMap.put(vt, vt);
        // update new timestamp in valueMaxTimeMap
        valueMaxTimeMap.put(value, timestamp);

        // store max timestamp if possible
        maxTimestamp = Math.max(maxTimestamp, timestamp);

        // entries are sorted by time, so eviction can stop at the first one still inside the window
        long oldestAllowed = maxTimestamp - window;
        Iterator<ValueAndTime> it = timeSortedMap.keySet().iterator();
        while (it.hasNext()) {
            ValueAndTime oldest = it.next();
            if (oldest.timestamp >= oldestAllowed) {
                break;
            }
            // should remove the entry in valueMaxTimeMap and timeSortedMap
            valueMaxTimeMap.remove(oldest.value);
            windowSlided = true;
            it.remove();
        }
    }

    /**
     * Returns the live map (not a copy) from each distinct value currently inside the window
     * to its largest timestamp.
     */
    public Map<Object, Long> distinctValues() {
        return valueMaxTimeMap;
    }

    /**
     * Returns {@code true} once at least one entry has ever been evicted; the flag is never reset.
     */
    public boolean windowSlided() {
        return windowSlided;
    }
}
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java deleted file mode 100644 index ec6e6e9..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.nodata; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.utils.TimePeriodUtils; -import org.apache.commons.collections.CollectionUtils; -import org.joda.time.Period; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * Since 6/28/16. 
- * No Data Policy engine - * based on the following information - * 1. stream definition: group by columns - * 2. timestamp field: timestamp column - * 3. wiri safe time window: how long window is good for full set of wiri - * 4. wisb: full set - * No data policy definition should include - * fixed fields and dynamic fields - * fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name - * dynamic fields depend on wisb type. - * policy would be like: - * { - * "name": "noDataAlertPolicy", - * "description": "noDataAlertPolicy", - * "inputStreams": [ - * "noDataAlertStream" - * ], - * "outputStreams": [ - * "noDataAlertStream_out" - * ], - * "definition": { - * "type": "nodataalert", - * "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host" - * }, - * "partitionSpec": [ - * { - * "streamId": "noDataAlertStream", - * "type": "GROUPBY" - * } - * ], - * "parallelismHint": 2 - * } - * "name": "noDataAlertPolicy", - * "description": "noDataAlertPolicy", - * "inputStreams": [ - * "noDataAlertStream" - * ], - * "outputStreams": [ - * "noDataAlertStream_out" - * ], - * "definition": { - * "type": "nodataalert", - * "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host" - * }, - * "partitionSpec": [ - * { - * "streamId": "noDataAlertStream", - * "type": "GROUPBY" - * } - * ], - * "parallelismHint": 2 - * } - */ -public class NoDataPolicyHandler implements PolicyStreamHandler { - private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class); - private Map<String, StreamDefinition> sds; - - // wisb(what is should be) set for expected full set value of multiple columns - @SuppressWarnings("rawtypes") - private volatile Set wisbValues = null; - private volatile List<Integer> wisbFieldIndices = new ArrayList<>(); - // reuse PolicyDefinition.defintion.value field to store full set of values separated by comma - private volatile PolicyDefinition policyDef; - private volatile 
Collector<AlertStreamEvent> collector; - private volatile PolicyHandlerContext context; - private volatile NoDataWisbType wisbType; - private volatile DistinctValuesInTimeWindow distinctWindow; - - public NoDataPolicyHandler(Map<String, StreamDefinition> sds) { - this.sds = sds; - } - - @Override - public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { - this.collector = collector; - this.context = context; - this.policyDef = context.getPolicyDefinition(); - List<String> inputStreams = policyDef.getInputStreams(); - // validate inputStreams has to contain only one stream - if (inputStreams.size() != 1) { - throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert"); - } - // validate outputStream has to contain only one stream - if (policyDef.getOutputStreams().size() != 1) { - throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert"); - } - - String is = inputStreams.get(0); - StreamDefinition sd = sds.get(is); - - String policyValue = policyDef.getDefinition().getValue(); - // assume that no data alert policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value} - String[] segments = policyValue.split(","); - long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0])); - distinctWindow = new DistinctValuesInTimeWindow(windowPeriod); - this.wisbType = NoDataWisbType.valueOf(segments[1]); - // for provided wisb values, need to parse, for dynamic wisb values, it is computed through a window - if (wisbType == NoDataWisbType.provided) { - wisbValues = new NoDataWisbProvidedParser().parse(segments); - } - // populate wisb field names - int numOfFields = Integer.parseInt(segments[2]); - for (int i = 3; i < 3 + numOfFields; i++) { - String fn = segments[i]; - wisbFieldIndices.add(sd.getColumnIndex(fn)); - } - } - - @SuppressWarnings( {"rawtypes", "unchecked"}) - @Override 
- public void send(StreamEvent event) throws Exception { - Object[] data = event.getData(); - List<Object> columnValues = new ArrayList<>(); - for (int i = 0; i < wisbFieldIndices.size(); i++) { - Object o = data[wisbFieldIndices.get(i)]; - // convert value to string - columnValues.add(o.toString()); - } - distinctWindow.send(columnValues, event.getTimestamp()); - Set wiriValues = distinctWindow.distinctValues().keySet(); - - LOG.debug("window slided: {}, with wiri: {}", distinctWindow.windowSlided(), distinctWindow.distinctValues()); - - if (distinctWindow.windowSlided()) { - compareAndEmit(wisbValues, wiriValues, event); - } - - if (wisbType == NoDataWisbType.dynamic) { - // deep copy - wisbValues = new HashSet<>(wiriValues); - } - } - - @SuppressWarnings("rawtypes") - private void compareAndEmit(Set wisb, Set wiri, StreamEvent event) { - // compare with wisbValues if wisbValues are already there for dynamic type - Collection noDataValues = CollectionUtils.subtract(wisb, wiri); - LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri); - if (noDataValues != null && noDataValues.size() > 0) { - LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisbValues); - AlertStreamEvent alertEvent = createAlertEvent(event.getTimestamp(), event.getData()); - collector.emit(alertEvent); - } - } - - private AlertStreamEvent createAlertEvent(long timestamp, Object[] triggerEvent) { - String is = policyDef.getInputStreams().get(0); - final StreamDefinition sd = sds.get(is); - - AlertStreamEvent event = new AlertStreamEvent(); - event.setTimestamp(timestamp); - event.setData(triggerEvent); - event.setStreamId(policyDef.getOutputStreams().get(0)); - event.setPolicyId(context.getPolicyDefinition().getName()); - if (this.context.getPolicyEvaluator() != null) { - event.setCreatedBy(context.getPolicyEvaluator().getName()); - } - event.setCreatedTime(System.currentTimeMillis()); - event.setSchema(sd); - return event; - } - - 
@Override - public void close() throws Exception { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java deleted file mode 100644 index b1e32bd..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.evaluator.nodata; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.utils.TimePeriodUtils; -import org.joda.time.Period; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; - -public class NoDataPolicyTimeBatchHandler implements PolicyStreamHandler { - - private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class); - private Map<String, StreamDefinition> sds; - - private volatile List<Integer> wisbFieldIndices = new ArrayList<>(); - // reuse PolicyDefinition.defintion.value field to store full set of values - // separated by comma - private volatile PolicyDefinition policyDef; - private volatile Collector<AlertStreamEvent> collector; - private volatile PolicyHandlerContext context; - private volatile NoDataWisbType wisbType; - private volatile DistinctValuesInTimeBatchWindow distinctWindow; - - public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds) { - this.sds = sds; - } - - @Override - public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { - this.collector = collector; - this.context = context; - this.policyDef = context.getPolicyDefinition(); - List<String> inputStreams = policyDef.getInputStreams(); - // validate inputStreams has to contain 
only one stream - if (inputStreams.size() != 1) { - throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert"); - } - // validate outputStream has to contain only one stream - if (policyDef.getOutputStreams().size() != 1) { - throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert"); - } - - String policyValue = policyDef.getDefinition().getValue(); - // assume that no data alert policy value consists of "windowPeriod, - // type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, - // f2_value} - String[] segments = policyValue.split(","); - this.wisbType = NoDataWisbType.valueOf(segments[1]); - // for provided wisb values, need to parse, for dynamic wisb values, it - // is computed through a window - Set<String> wisbValues = new HashSet<String>(); - if (wisbType == NoDataWisbType.provided) { - for (int i = 2; i < segments.length; i++) { - wisbValues.add(segments[i]); - } - } - - long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0])); - distinctWindow = new DistinctValuesInTimeBatchWindow(this, windowPeriod, wisbValues); - // populate wisb field names - String is = inputStreams.get(0); - StreamDefinition sd = sds.get(is); - String nodataColumnNameKey = "nodataColumnName"; - if (!policyDef.getDefinition().getProperties().containsKey(nodataColumnNameKey)) { - throw new IllegalArgumentException("policy nodata column name has to be defined for no data alert"); - } - wisbFieldIndices.add(sd.getColumnIndex((String) policyDef.getDefinition().getProperties().get(nodataColumnNameKey))); - } - - @Override - public void send(StreamEvent event) throws Exception { - Object[] data = event.getData(); - - List<Object> columnValues = new ArrayList<>(); - for (int i = 0; i < wisbFieldIndices.size(); i++) { - Object o = data[wisbFieldIndices.get(i)]; - // convert value to string - columnValues.add(o.toString()); - } - // use local timestamp rather than event timestamp - 
distinctWindow.send(event, columnValues, System.currentTimeMillis()); - LOG.debug("event sent to window with wiri: {}", distinctWindow.distinctValues()); - } - - @SuppressWarnings("rawtypes") - public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) { - // compare with wisbValues if wisbValues are already there for dynamic - // type - Collection noDataValues = CollectionUtils.subtract(wisb, wiri); - LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri); - if (noDataValues != null && noDataValues.size() > 0) { - LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisb); - - String is = policyDef.getOutputStreams().get(0); - StreamDefinition sd = sds.get(is); - int timestampIndex = sd.getColumnIndex("timestamp"); - int hostIndex = sd.getColumnIndex("host"); - int originalStreamNameIndex = sd.getColumnIndex("originalStreamName"); - - for (Object one : noDataValues) { - Object[] triggerEvent = new Object[sd.getColumns().size()]; - for (int i = 0; i < sd.getColumns().size(); i++) { - if (i == timestampIndex) { - triggerEvent[i] = System.currentTimeMillis(); - } else if (i == hostIndex) { - triggerEvent[hostIndex] = ((List) one).get(0); - } else if (i == originalStreamNameIndex) { - triggerEvent[originalStreamNameIndex] = event.getStreamId(); - } else if (sd.getColumns().size() < i) { - LOG.error("strema event data have different lenght compare to column definition!"); - } else { - triggerEvent[i] = sd.getColumns().get(i).getDefaultValue(); - } - } - AlertStreamEvent alertEvent = createAlertEvent(sd, event.getTimestamp(), triggerEvent); - LOG.info(String.format("Nodata alert %s generated and will be emitted", Joiner.on(",").join(triggerEvent))); - collector.emit(alertEvent); - } - - } - } - - private AlertStreamEvent createAlertEvent(StreamDefinition sd, long timestamp, Object[] triggerEvent) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setTimestamp(timestamp); - 
event.setData(triggerEvent); - event.setStreamId(policyDef.getOutputStreams().get(0)); - event.setPolicyId(context.getPolicyDefinition().getName()); - if (this.context.getPolicyEvaluator() != null) { - event.setCreatedBy(context.getPolicyEvaluator().getName()); - } - event.setCreatedTime(System.currentTimeMillis()); - event.setSchema(sd); - return event; - } - - @Override - public void close() throws Exception { - - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java deleted file mode 100644 index fa27108..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Parses the expected ("what it should be", WISB) values of a no-data alert policy.
 *
 * <p>Since 6/29/16.
 */
public interface NoDataWisbParser {
    /**
     * Parses the policy definition and returns the WISB values for one or multiple fields.
     * For example, if host and data center are the 2 fields of a no-data alert, then every
     * WISB entry is a list of two values.
     *
     * @param args segments parsed from the policy definition
     * @return set of value tuples, one list of field values per expected entry
     */
    Set<List<String>> parse(String[] args);
}
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.nodata; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * Since 6/29/16. - */ -public class NoDataWisbProvidedParser implements NoDataWisbParser { - /** - * policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value". - */ - @Override - public Set<List<String>> parse(String[] args) { - int numOfFields = Integer.parseInt(args[2]); - Set<List<String>> wisbValues = new HashSet<>(); - int i = 3 + numOfFields; - while (i < args.length) { - List<String> fields = new ArrayList<>(); - for (int j = 0; j < numOfFields; j++) { - fields.add(args[i + j]); - } - wisbValues.add(fields); - i += numOfFields; - } - return wisbValues; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java deleted file mode 100644 index 887d099..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java +++ /dev/null @@ 
/**
 * Where the expected ("what it should be", WISB) values of a no-data alert come from.
 *
 * <p>Since 6/29/16.
 */
public enum NoDataWisbType {
    // expected values are listed in the policy value and parsed from it
    provided,
    // expected values are not listed; the no-data handlers derive them from the values seen in the window
    dynamic
}
/**
 * Mutable holder for the outcome of parsing a policy's execution plan: the plan source and its
 * parsed form, the input and output streams it uses, and the stream partitions.
 * Null properties are omitted when the object is serialized.
 */
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class PolicyExecutionPlan {
    /**
     * Actual input streams, keyed by stream id.
     */
    private Map<String, List<StreamColumn>> inputStreams;

    /**
     * Actual output streams, keyed by stream id.
     */
    private Map<String, List<StreamColumn>> outputStreams;

    /**
     * Execution plan source.
     */
    private String executionPlanSource;

    /**
     * Execution plan.
     */
    private ExecutionPlan internalExecutionPlan;

    /**
     * Textual description of the execution plan.
     */
    private String executionPlanDesc;

    /**
     * Partitions of the streams used by the plan.
     */
    private List<StreamPartition> streamPartitions;

    /** @return the execution plan source text */
    public String getExecutionPlanSource() {
        return executionPlanSource;
    }

    /** @param executionPlanSource the execution plan source text */
    public void setExecutionPlanSource(String executionPlanSource) {
        this.executionPlanSource = executionPlanSource;
    }

    /** @return the parsed Siddhi execution plan */
    public ExecutionPlan getInternalExecutionPlan() {
        return internalExecutionPlan;
    }

    /** @param internalExecutionPlan the parsed Siddhi execution plan */
    public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) {
        this.internalExecutionPlan = internalExecutionPlan;
    }

    /** @return the textual description of the execution plan */
    public String getExecutionPlanDesc() {
        return executionPlanDesc;
    }

    /** @param executionPlanDesc the textual description of the execution plan */
    public void setExecutionPlanDesc(String executionPlanDesc) {
        this.executionPlanDesc = executionPlanDesc;
    }

    /** @return the stream partitions */
    public List<StreamPartition> getStreamPartitions() {
        return streamPartitions;
    }

    /** @param streamPartitions the stream partitions */
    public void setStreamPartitions(List<StreamPartition> streamPartitions) {
        this.streamPartitions = streamPartitions;
    }

    /** @return input stream columns keyed by stream id */
    public Map<String, List<StreamColumn>> getInputStreams() {
        return inputStreams;
    }

    /** @param inputStreams input stream columns keyed by stream id */
    public void setInputStreams(Map<String, List<StreamColumn>> inputStreams) {
        this.inputStreams = inputStreams;
    }

    /** @return output stream columns keyed by stream id */
    public Map<String, List<StreamColumn>> getOutputStreams() {
        return outputStreams;
    }

    /** @param outputStreams output stream columns keyed by stream id */
    public void setOutputStreams(Map<String, List<StreamColumn>> outputStreams) {
        this.outputStreams = outputStreams;
    }
}
deleted file mode 100644 index b8e5e42..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.interpreter; - -/** - * Keep PolicyExecutionPlanner as simple and fast as possible (avoid any backend data exchanging). - */ -interface PolicyExecutionPlanner { - /** - * @return PolicyExecutionPlan. 
- */ - PolicyExecutionPlan getExecutionPlan(); - - static PolicyExecutionPlan parseExecutionPlan(String executionPlan) throws Exception { - return new PolicyExecutionPlannerImpl(executionPlan).getExecutionPlan(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java deleted file mode 100644 index 4e6901d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java +++ /dev/null @@ -1,376 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.interpreter; - -import com.google.common.base.Preconditions; -import org.apache.commons.collections.ListUtils; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.exception.DefinitionNotExistException; -import org.wso2.siddhi.query.api.ExecutionPlan; -import org.wso2.siddhi.query.api.exception.DuplicateDefinitionException; -import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; -import org.wso2.siddhi.query.api.execution.ExecutionElement; -import org.wso2.siddhi.query.api.execution.query.Query; -import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler; -import org.wso2.siddhi.query.api.execution.query.input.handler.Window; -import org.wso2.siddhi.query.api.execution.query.input.state.*; -import org.wso2.siddhi.query.api.execution.query.input.stream.*; -import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream; -import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute; -import org.wso2.siddhi.query.api.execution.query.selection.Selector; -import org.wso2.siddhi.query.api.expression.Expression; -import org.wso2.siddhi.query.api.expression.Variable; -import org.wso2.siddhi.query.api.expression.condition.Compare; -import org.wso2.siddhi.query.api.expression.constant.IntConstant; -import org.wso2.siddhi.query.api.expression.constant.LongConstant; -import org.wso2.siddhi.query.api.expression.constant.TimeConstant; -import org.wso2.siddhi.query.compiler.SiddhiCompiler; - -import java.util.*; -import java.util.stream.Collectors; - -class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner { - - private static final Logger LOG = 
LoggerFactory.getLogger(PolicyExecutionPlannerImpl.class); - - /** - * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow. - */ - private static final String WINDOW_EXTERNAL_TIME = "externalTime"; - - private final String executionPlan; - private final Map<String,List<StreamColumn>> effectiveInputStreams; - private final Map<String, String> effectiveInputStreamsAlias; - private final Map<String,List<StreamColumn>> effectiveOutputStreams; - private final Map<String,StreamPartition> effectivePartitions; - private final PolicyExecutionPlan policyExecutionPlan; - - public PolicyExecutionPlannerImpl(String executionPlan) throws Exception { - this.executionPlan = executionPlan; - this.effectiveInputStreams = new HashMap<>(); - this.effectiveInputStreamsAlias = new HashMap<>(); - this.effectiveOutputStreams = new HashMap<>(); - this.effectivePartitions = new HashMap<>(); - this.policyExecutionPlan = doParse(); - } - - @Override - public PolicyExecutionPlan getExecutionPlan() { - return policyExecutionPlan; - } - - private PolicyExecutionPlan doParse() throws Exception { - PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan(); - try { - ExecutionPlan executionPlan = SiddhiCompiler.parse(this.executionPlan); - - policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString()); - - // Set current execution plan as valid - policyExecutionPlan.setExecutionPlanSource(this.executionPlan); - policyExecutionPlan.setInternalExecutionPlan(executionPlan); - - - // Go through execution element - for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) { - // ------------- - // Explain Query - // ------------- - if (executionElement instanceof Query) { - // ----------------------- - // Query Level Variables - // ----------------------- - InputStream inputStream = ((Query) executionElement).getInputStream(); - Selector selector = ((Query) executionElement).getSelector(); - Map<String, SingleInputStream> 
queryLevelAliasToStreamMapping = new HashMap<>(); - - // Inputs stream definitions - for (String streamId : inputStream.getUniqueStreamIds()) { - if (!effectiveInputStreams.containsKey(streamId)) { - org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId); - if (streamDefinition != null) { - effectiveInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns()); - } else { - effectiveInputStreams.put(streamId, null); - } - } - } - - // Window Spec and Partition - if (inputStream instanceof SingleInputStream) { - retrieveAliasForQuery((SingleInputStream) inputStream, queryLevelAliasToStreamMapping); - retrievePartition(findStreamPartition((SingleInputStream) inputStream, selector)); - } else { - if (inputStream instanceof JoinInputStream) { - // Only Support JOIN/INNER_JOIN Now - if (((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.INNER_JOIN) || ((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.JOIN)) { - SingleInputStream leftInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getLeftInputStream(); - SingleInputStream rightInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getRightInputStream(); - - retrievePartition(findStreamPartition(leftInputStream, selector)); - retrievePartition(findStreamPartition(rightInputStream, selector)); - retrieveAliasForQuery(leftInputStream, queryLevelAliasToStreamMapping); - retrieveAliasForQuery(rightInputStream, queryLevelAliasToStreamMapping); - - } else { - throw new ExecutionPlanValidationException("Not support " + ((JoinInputStream) inputStream).getType() + " yet, currently support: INNER JOIN"); - } - - Expression joinCondition = ((JoinInputStream) inputStream).getOnCompare(); - - if (joinCondition != null) { - if (joinCondition instanceof Compare) { - if (((Compare) joinCondition).getOperator().equals(Compare.Operator.EQUAL)) { - Variable 
leftExpression = (Variable) ((Compare) joinCondition).getLeftExpression(); - Preconditions.checkNotNull(leftExpression.getStreamId()); - Preconditions.checkNotNull(leftExpression.getAttributeName()); - - StreamPartition leftPartition = new StreamPartition(); - leftPartition.setType(StreamPartition.Type.GROUPBY); - leftPartition.setColumns(Collections.singletonList(leftExpression.getAttributeName())); - leftPartition.setStreamId(retrieveStreamId(leftExpression, effectiveInputStreams,queryLevelAliasToStreamMapping)); - retrievePartition(leftPartition); - - Variable rightExpression = (Variable) ((Compare) joinCondition).getRightExpression(); - Preconditions.checkNotNull(rightExpression.getStreamId()); - Preconditions.checkNotNull(rightExpression.getAttributeName()); - StreamPartition rightPartition = new StreamPartition(); - rightPartition.setType(StreamPartition.Type.GROUPBY); - rightPartition.setColumns(Collections.singletonList(rightExpression.getAttributeName())); - rightPartition.setStreamId(retrieveStreamId(rightExpression, effectiveInputStreams,queryLevelAliasToStreamMapping)); - retrievePartition(leftPartition); - } else { - throw new ExecutionPlanValidationException("Only support \"EQUAL\" condition in INNER JOIN" + joinCondition); - } - } else { - throw new ExecutionPlanValidationException("Only support \"Compare\" on INNER JOIN condition in INNER JOIN: " + joinCondition); - } - } - } else if (inputStream instanceof StateInputStream) { - // Group By Spec - List<Variable> groupBy = selector.getGroupByList(); - if (groupBy.size() >= 0) { - Map<String, List<Variable>> streamGroupBy = new HashMap<>(); - for (String streamId : inputStream.getUniqueStreamIds()) { - streamGroupBy.put(streamId, new ArrayList<>()); - } - - collectStreamReferenceIdMapping(((StateInputStream)inputStream).getStateElement()); - - for (Variable variable : groupBy) { - // Not stream not set, then should be all streams' same field - if (variable.getStreamId() == null) { - for (String 
streamId : inputStream.getUniqueStreamIds()) { - streamGroupBy.get(streamId).add(variable); - } - } else { - String streamId = variable.getStreamId(); - if (!this.effectiveInputStreamsAlias.containsKey(streamId)) { - streamId = retrieveStreamId(variable, effectiveInputStreams,queryLevelAliasToStreamMapping); - } else { - streamId = this.effectiveInputStreamsAlias.get(streamId); - } - if (streamGroupBy.containsKey(streamId)) { - streamGroupBy.get(streamId).add(variable); - } else { - throw new DefinitionNotExistException(streamId); - } - } - } - for (Map.Entry<String, List<Variable>> entry : streamGroupBy.entrySet()) { - if (entry.getValue().size() > 0) { - StreamPartition partition = generatePartition(entry.getKey(), null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()]))); - if (((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.PATTERN) - || ((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.SEQUENCE)) { - if (effectivePartitions.containsKey(partition.getStreamId())) { - StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId()); - if (!existingPartition.equals(partition) - && existingPartition.getType().equals(partition.getType()) - && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns())) { - partition.setSortSpec(existingPartition.getSortSpec()); - } - } - } - retrievePartition(partition); - } - } - } - } - } - - // Output streams - OutputStream outputStream = ((Query) executionElement).getOutputStream(); - effectiveOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList())); - } else { - LOG.warn("Unhandled execution element: {}", executionElement.toString()); - } - } - // Set effective input streams - policyExecutionPlan.setInputStreams(effectiveInputStreams); - - // Set effective output streams - policyExecutionPlan.setOutputStreams(effectiveOutputStreams); - - // Set Partitions - for (String 
streamId : effectiveInputStreams.keySet()) { - // Use shuffle partition by default - if (!effectivePartitions.containsKey(streamId)) { - StreamPartition shufflePartition = new StreamPartition(); - shufflePartition.setStreamId(streamId); - shufflePartition.setType(StreamPartition.Type.SHUFFLE); - effectivePartitions.put(streamId, shufflePartition); - } - } - policyExecutionPlan.setStreamPartitions(new ArrayList<>(effectivePartitions.values())); - } catch (Exception ex) { - LOG.error("Got error to parse policy execution plan: \n{}", this.executionPlan, ex); - throw ex; - } - return policyExecutionPlan; - } - - private void collectStreamReferenceIdMapping(StateElement stateElement) { - if (stateElement instanceof LogicalStateElement) { - collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement1()); - collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement2()); - } else if (stateElement instanceof CountStateElement) { - collectStreamReferenceIdMapping(((CountStateElement) stateElement).getStreamStateElement()); - } else if (stateElement instanceof EveryStateElement) { - collectStreamReferenceIdMapping(((EveryStateElement) stateElement).getStateElement()); - } else if (stateElement instanceof NextStateElement) { - collectStreamReferenceIdMapping(((NextStateElement) stateElement).getStateElement()); - collectStreamReferenceIdMapping(((NextStateElement) stateElement).getNextStateElement()); - } else if (stateElement instanceof StreamStateElement) { - BasicSingleInputStream basicSingleInputStream = ((StreamStateElement) stateElement).getBasicSingleInputStream(); - this.effectiveInputStreamsAlias.put(basicSingleInputStream.getStreamReferenceId(), basicSingleInputStream.getStreamId()); - } - } - - private String retrieveStreamId(Variable variable, Map<String, List<StreamColumn>> streamMap, Map<String, SingleInputStream> aliasMap) { - Preconditions.checkNotNull(variable.getStreamId(), "streamId"); - if 
(streamMap.containsKey(variable.getStreamId()) && aliasMap.containsKey(variable.getStreamId())) { - throw new DuplicateDefinitionException("Duplicated streamId and alias: " + variable.getStreamId()); - } else if (streamMap.containsKey(variable.getStreamId())) { - return variable.getStreamId(); - } else if (aliasMap.containsKey(variable.getStreamId())) { - return aliasMap.get(variable.getStreamId()).getStreamId(); - } else { - throw new DefinitionNotExistException(variable.getStreamId()); - } - } - - private StreamPartition findStreamPartition(SingleInputStream inputStream, Selector selector) { - // Window Spec - List<Window> windows = new ArrayList<>(); - for (StreamHandler streamHandler : inputStream.getStreamHandlers()) { - if (streamHandler instanceof Window) { - windows.add((Window) streamHandler); - } - } - - // Group By Spec - List<Variable> groupBy = selector.getGroupByList(); - if (windows.size() > 0 || groupBy.size() >= 0) { - return generatePartition(inputStream.getStreamId(), windows, groupBy); - } else { - return null; - } - } - - private void retrievePartition(StreamPartition partition) { - if (partition == null) { - return; - } - - if (!effectivePartitions.containsKey(partition.getStreamId())) { - effectivePartitions.put(partition.getStreamId(), partition); - } else if (!effectivePartitions.get(partition.getStreamId()).equals(partition)) { - StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId()); - // If same Type & Columns but different sort spec, then use larger - if (existingPartition.getType().equals(partition.getType()) - && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns()) - && partition.getSortSpec().getWindowPeriodMillis() > existingPartition.getSortSpec().getWindowPeriodMillis() - || existingPartition.getType().equals(StreamPartition.Type.SHUFFLE)) { - effectivePartitions.put(partition.getStreamId(), partition); - } else { - // Throw exception as it unable to conflict 
effectivePartitions on same stream will not be able to run in distributed mode - throw new ExecutionPlanValidationException("You have incompatible partitions on stream " + partition.getStreamId() - + ": [1] " + effectivePartitions.get(partition.getStreamId()).toString() + " [2] " + partition.toString() + ""); - } - } - } - - private void retrieveAliasForQuery(SingleInputStream inputStream, Map<String, SingleInputStream> aliasStreamMapping) { - if (inputStream.getStreamReferenceId() != null) { - if (aliasStreamMapping.containsKey(inputStream.getStreamReferenceId())) { - throw new ExecutionPlanValidationException("Duplicated stream alias " + inputStream.getStreamId() + " -> " + inputStream); - } else { - aliasStreamMapping.put(inputStream.getStreamReferenceId(), inputStream); - } - } - } - - private StreamPartition generatePartition(String streamId, List<Window> windows, List<Variable> groupBy) { - StreamPartition partition = new StreamPartition(); - partition.setStreamId(streamId); - StreamSortSpec sortSpec = null; - if (windows != null && windows.size() > 0) { - for (Window window : windows) { - if (window.getFunction().equals(WINDOW_EXTERNAL_TIME)) { - sortSpec = new StreamSortSpec(); - sortSpec.setWindowPeriodMillis(getExternalTimeWindowSize(window)); - sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 5); - } - } - } - partition.setSortSpec(sortSpec); - if (groupBy != null && groupBy.size() > 0) { - partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList())); - partition.setType(StreamPartition.Type.GROUPBY); - } else { - partition.setType(StreamPartition.Type.SHUFFLE); - } - return partition; - } - - private static int getExternalTimeWindowSize(Window window) { - Expression windowSize = window.getParameters()[1]; - if (windowSize instanceof TimeConstant) { - return ((TimeConstant) windowSize).getValue().intValue(); - } else if (windowSize instanceof IntConstant) { - return ((IntConstant) windowSize).getValue(); 
- } else if (windowSize instanceof LongConstant) { - return ((LongConstant) windowSize).getValue().intValue(); - } else { - throw new UnsupportedOperationException("Illegal type of window size expression:" + windowSize.toString()); - } - } - - private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) { - return outputAttributeList.stream().map(outputAttribute -> { - StreamColumn streamColumn = new StreamColumn(); - streamColumn.setName(outputAttribute.getRename()); - streamColumn.setDescription(outputAttribute.getExpression().toString()); - return streamColumn; - }).collect(Collectors.toList()); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java deleted file mode 100644 index 4add3ff..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.interpreter; - -import com.google.common.base.Preconditions; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers; -import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; - -/** - * PolicyInterpreter Helper Methods: - * <ul> - * <li>Parse: parse siddhi query and generate static execution plan</li> - * <li>Validate: validate policy definition with execution plan and metadata</li> - * </ul> - * - * @see PolicyExecutionPlanner - * @see <a href="https://docs.wso2.com/display/CEP300/WSO2+Complex+Event+Processor+Documentation">WSO2 Complex Event Processor Documentation</a> - */ -public class PolicyInterpreter { - private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class); - - /** - * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow. 
- */ - private static final String WINDOW_EXTERNAL_TIME = "externalTime"; - - public static PolicyParseResult parse(String executionPlanQuery) { - PolicyParseResult policyParseResult = new PolicyParseResult(); - try { - policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery)); - policyParseResult.setSuccess(true); - policyParseResult.setMessage("Parsed successfully"); - } catch (Exception exception) { - LOG.error("Got error to parse policy: {}", executionPlanQuery, exception); - policyParseResult.setSuccess(false); - policyParseResult.setMessage(exception.getMessage()); - policyParseResult.setStackTrace(exception); - } - return policyParseResult; - } - - /** - * Quick parseExecutionPlan policy. - */ - public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception { - // Validate inputStreams are valid - Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from"); - return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions)); - } - - public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception { - return PolicyExecutionPlanner.parseExecutionPlan(executionPlanQuery); - } - - public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) { - Map<String, StreamDefinition> inputDefinitions = new HashMap<>(); - PolicyValidationResult policyValidationResult = new PolicyValidationResult(); - policyValidationResult.setPolicyDefinition(policy); - try { - if (policy.getInputStreams() != null) { - for (String streamId : policy.getInputStreams()) { - if (allDefinitions.containsKey(streamId)) { - inputDefinitions.put(streamId, allDefinitions.get(streamId)); - } else { - throw new StreamNotDefinedException(streamId); - } - } - } - - PolicyExecutionPlan policyExecutionPlan = null; - if 
(PolicyStreamHandlers.SIDDHI_ENGINE.equalsIgnoreCase(policy.getDefinition().getType())) { - policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions); - // Validate output - if (policy.getOutputStreams() != null) { - for (String outputStream : policy.getOutputStreams()) { - if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) { - throw new StreamNotDefinedException("Output stream " + outputStream + " not defined"); - } - } - } - } - policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan); - policyValidationResult.setSuccess(true); - policyValidationResult.setMessage("Validated successfully"); - } catch (Exception exception) { - LOG.error("Got error to validate policy definition: {}", policy, exception); - policyValidationResult.setSuccess(false); - policyValidationResult.setMessage(exception.getMessage()); - policyValidationResult.setStackTrace(exception); - } - - return policyValidationResult; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java deleted file mode 100644 index a0f3ad2..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.interpreter; - -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.apache.commons.lang3.exception.ExceptionUtils; - -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -public class PolicyParseResult { - private boolean success; - private String message; - private String exception; - - private PolicyExecutionPlan policyExecutionPlan; - - public String getException() { - return exception; - } - - public void setException(String exception) { - this.exception = exception; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public void setStackTrace(Throwable throwable) { - this.setException(ExceptionUtils.getStackTrace(throwable)); - } - - public boolean isSuccess() { - return success; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public PolicyExecutionPlan getPolicyExecutionPlan() { - return policyExecutionPlan; - } - - public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) { - this.policyExecutionPlan = policyExecutionPlan; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java 
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java deleted file mode 100644 index 17f6091..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.interpreter; - - -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; - -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -public class PolicyValidationResult { - private boolean success; - private String message; - private String exception; - - private PolicyExecutionPlan policyExecutionPlan; - private PolicyDefinition policyDefinition; - - public String getException() { - return exception; - } - - public void setException(String exception) { - this.exception = exception; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public void setStackTrace(Throwable throwable) { - this.setException(ExceptionUtils.getStackTrace(throwable)); - } - - public boolean isSuccess() { - return success; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public PolicyExecutionPlan getPolicyExecutionPlan() { - return policyExecutionPlan; - } - - public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) { - this.policyExecutionPlan = policyExecutionPlan; - } - - public PolicyDefinition getPolicyDefinition() { - return policyDefinition; - } - - public void setPolicyDefinition(PolicyDefinition policyDefinition) { - this.policyDefinition = policyDefinition; - } -} \ No newline at end of file
