http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java deleted file mode 100755 index 2ffccaa..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.coordinator.impl; - -import org.apache.eagle.alert.config.ConfigBusConsumer; -import org.apache.eagle.alert.config.ConfigChangeCallback; -import org.apache.eagle.alert.config.ConfigValue; -import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.coordination.model.*; -import org.apache.eagle.alert.engine.coordinator.MetadataType; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -/** - * <b>TODO</b>: performance tuning: It is not JVM level service, so it may cause - * zookeeper burden in case of too many listeners This does not support - * dynamically adding topic, all topics should be available when service object - * is created. - * ZK path format is as following: - * <ul> - * <li>/alert/topology_1/spout</li> - * <li>/alert/topology_1/router</li> - * <li>/alert/topology_1/alert</li> - * <li>/alert/topology_1/publisher</li> - * </ul> - */ -public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements ConfigChangeCallback { - private static final long serialVersionUID = -1509237694501235144L; - private static final Logger LOG = LoggerFactory.getLogger(ZKMetadataChangeNotifyService.class); - private ZKConfig zkConfig; - private String topologyId; - private ConfigBusConsumer consumer; - - private transient IMetadataServiceClient client; - - public ZKMetadataChangeNotifyService(ZKConfig config, String topologyId) { - this.zkConfig = config; - this.topologyId = topologyId; - } - - @Override - public void init(Config config, MetadataType type) { - super.init(config, type); - client = new MetadataServiceClientImpl(config); - consumer = new ConfigBusConsumer(zkConfig, topologyId + "/" + getMetadataTopicSuffix(), this); - LOG.info("init called for client"); - } - - @Override - public void activateFetchMetaData() throws Exception { - this.onNewConfig(consumer.getConfigValue()); - } - - private String getMetadataTopicSuffix() { - switch (type) { - case ALERT_BOLT: - return "alert"; - case ALERT_PUBLISH_BOLT: - return "publisher"; - case SPOUT: - return "spout"; - case STREAM_ROUTER_BOLT: - return "router"; - default: - throw new RuntimeException(String.format("unexpected metadata type: %s !", type)); - } - } - - @Override - public void close() throws IOException { - consumer.close(); - LOG.info("Config consumer closed"); - } - - @Override - public void onNewConfig(ConfigValue value) { - LOG.info("Metadata changed {}", value); - - if (client == null) { - LOG.error("OnNewConfig trigger, but metadata service client is null. Metadata type {}", type); - return; - } - - // analyze config value and notify corresponding listeners - String version = value.getValue().toString(); - // brute-force load all: this might introduce load's on metadata service. - // FIXME : after ScheduleState persisted with better normalization, load - // state based on type and version - ScheduleState state = client.getVersionedSpec(version); - if (state == null) { - LOG.error("Failed to load schedule state of version {}, this is possibly a bug, pls check coordinator log !", version); - return; - } - Map<String, StreamDefinition> sds = getStreams(state.getStreamSnapshots()); - switch (type) { - case ALERT_BOLT: - // we might query metadata service query get metadata snapshot and StreamDefinition - AlertBoltSpec alertSpec = state.getAlertSpecs().get(topologyId); - if (alertSpec == null) { - LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); - } else { - prePopulate(alertSpec, state.getPolicySnapshots()); - notifyAlertBolt(alertSpec, sds); - } - break; - case ALERT_PUBLISH_BOLT: - PublishSpec pubSpec = state.getPublishSpecs().get(topologyId); - if (pubSpec == null) { - LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); - } else { - notifyAlertPublishBolt(pubSpec, sds); - if (state.getAlertSpecs().get(topologyId) != null) { - notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()), sds); - } - } - break; - case SPOUT: - SpoutSpec spoutSpec = state.getSpoutSpecs().get(topologyId); - if (spoutSpec == null) { - LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); - } else { - notifySpout(spoutSpec, sds); - } - break; - case STREAM_ROUTER_BOLT: - RouterSpec gSpec = state.getGroupSpecs().get(topologyId); - if (gSpec == null) { - LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); - } else { - notifyStreamRouterBolt(gSpec, sds); - } - break; - default: - LOG.error("unexpected metadata type: {} ", type); - } - } - - private void prePopulate(AlertBoltSpec alertSpec, List<VersionedPolicyDefinition> list) { - Map<String, PolicyDefinition> policyMap = listToMap(list); - for (Entry<String, List<String>> policyEntry : alertSpec.getBoltPolicyIdsMap().entrySet()) { - List<PolicyDefinition> pds = alertSpec.getBoltPoliciesMap().get(policyEntry.getKey()); - if (pds == null) { - pds = new ArrayList<PolicyDefinition>(); - alertSpec.getBoltPoliciesMap().put(policyEntry.getKey(), pds); - } - for (String policyName : policyEntry.getValue()) { - if (policyMap.containsKey(policyName)) { - pds.add(policyMap.get(policyName)); - } - } - } - } - - private Map<String, StreamDefinition> getStreams(List<VersionedStreamDefinition> streamSnapshots) { - Map<String, StreamDefinition> result = new HashMap<String, StreamDefinition>(); - for (VersionedStreamDefinition vsd : streamSnapshots) { - result.put(vsd.getDefinition().getStreamId(), vsd.getDefinition()); - } - return result; - } - - private Map<String, PolicyDefinition> listToMap(List<VersionedPolicyDefinition> listStreams) { - Map<String, PolicyDefinition> result = new HashMap<String, PolicyDefinition>(); - for (VersionedPolicyDefinition sd : listStreams) { - result.put(sd.getDefinition().getName(), sd.getDefinition()); - } - return result; - } - -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java deleted file mode 100644 index d90fd9c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Created on 7/27/16. - */ -public class CompositePolicyHandler implements PolicyStreamHandler { - private static final Logger LOG = LoggerFactory.getLogger(CompositePolicyHandler.class); - - private PolicyStreamHandler policyHandler; - private PolicyStreamHandler stateHandler; - private List<PolicyStreamHandler> handlers = new ArrayList<>(); - - private Collector<AlertStreamEvent> collector; - - private Map<String, StreamDefinition> sds; - - public CompositePolicyHandler(Map<String, StreamDefinition> sds) { - this.sds = sds; - } - - @Override - public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { - this.collector = collector; - // TODO: create two handlers - policyHandler = PolicyStreamHandlers.createHandler(context.getPolicyDefinition().getDefinition(), sds); - policyHandler.prepare(collector, context); - handlers.add(policyHandler); - - if (context.getPolicyDefinition().getStateDefinition() != null) { - stateHandler = PolicyStreamHandlers.createStateHandler(context.getPolicyDefinition().getStateDefinition().type, sds); - stateHandler.prepare(collector, context); - handlers.add(stateHandler); - } - } - - @Override - public void send(StreamEvent event) throws Exception { - // policyHandler.send(event); - send(event, 0); - } - - // send event to index of stream handler - public void send(StreamEvent event, int idx) throws Exception { - if (handlers.size() > idx) { - handlers.get(idx).send(event); - } else if (event instanceof AlertStreamEvent) { - if (LOG.isDebugEnabled()) { - LOG.debug("Emit new alert event: {}", event); - } - collector.emit((AlertStreamEvent) event); // for alert stream events, emit if no handler found. - } else { - // nothing found. LOG, and throw exception - LOG.error("non-alert-stream-event {} send with index {}, but the handler is not found!", event, idx); - throw new Exception(String.format("event %s send with idx %d can not found expecting handler!", event, idx)); - } - } - - @Override - public void close() throws Exception { - for (PolicyStreamHandler handler : handlers) { - try { - handler.close(); - } catch (Exception e) { - LOG.error("close handler {} failed, continue to run.", handler); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java deleted file mode 100644 index 20c2e1d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -import java.util.List; -import java.util.Map; - -public interface PolicyChangeListener { - void onPolicyChange(String version, - List<PolicyDefinition> added, - List<PolicyDefinition> removed, - List<PolicyDefinition> modified, Map<String, StreamDefinition> sds); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java deleted file mode 100644 index e970ddd..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.apache.eagle.alert.engine.evaluator; - -import org.apache.eagle.alert.engine.AlertStreamCollector; -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.model.PartitionedEvent; - -import java.io.Serializable; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * policy group refers to the policies which belong to the same MonitoredStream - * 3 lifecycle steps are involved in PolicyGroupEvaluator - * Step 1: create object. Be aware that in distributed environment, this object may be serialized and transferred across network - * Step 2: init. This normally is invoked only once before nextEvent is invoked - * Step 3: nextEvent - * Step 4: close - */ -public interface PolicyGroupEvaluator extends PolicyChangeListener, Serializable { - void init(StreamContext context, AlertStreamCollector collector); - - /** - * Evaluate event. - */ - void nextEvent(PartitionedEvent event); - - String getName(); - - void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java deleted file mode 100644 index 59d9e1f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.evaluator; - -import org.apache.eagle.alert.engine.StreamCounter; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import com.typesafe.config.Config; - -public class PolicyHandlerContext { - private PolicyDefinition policyDefinition; - private PolicyGroupEvaluator policyEvaluator; - private StreamCounter policyCounter; - private String policyEvaluatorId; - private Config config; - - public PolicyDefinition getPolicyDefinition() { - return policyDefinition; - } - - public void setPolicyDefinition(PolicyDefinition policyDefinition) { - this.policyDefinition = policyDefinition; - } - - public PolicyGroupEvaluator getPolicyEvaluator() { - return policyEvaluator; - } - - public void setPolicyEvaluator(PolicyGroupEvaluator policyEvaluator) { - this.policyEvaluator = policyEvaluator; - } - - public void setPolicyCounter(StreamCounter metric) { - this.policyCounter = metric; - } - - public StreamCounter getPolicyCounter() { - return policyCounter; - } - - public String getPolicyEvaluatorId() { - return policyEvaluatorId; - } - - public void setPolicyEvaluatorId(String policyEvaluatorId) { - this.policyEvaluatorId = policyEvaluatorId; - } - - public Config getConfig() { - return config; - } - - public void setConfig(Config config) { - this.config = config; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java deleted file mode 100755 index 7b457b0..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; - -public interface PolicyStreamHandler { - void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception; - - void send(StreamEvent event) throws Exception; - - void close() throws Exception; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java deleted file mode 100644 index 116f633..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler; -import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler; -import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyStateHandler; -import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * TODO/FIXME: to support multiple stage definition in single policy. The methods in this class is not good to understand now.(Hard code of 0/1). - */ -public class PolicyStreamHandlers { - private static final Logger LOG = LoggerFactory.getLogger(PolicyStreamHandlers.class); - - public static final String SIDDHI_ENGINE = "siddhi"; - public static final String NO_DATA_ALERT_ENGINE = "nodataalert"; - public static final String ABSENCE_ALERT_ENGINE = "absencealert"; - public static final String CUSTOMIZED_ENGINE = "Custom"; - - public static PolicyStreamHandler createHandler(PolicyDefinition.Definition definition, Map<String, StreamDefinition> sds) { - if (SIDDHI_ENGINE.equals(definition.getType())) { - return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16 - } else if (NO_DATA_ALERT_ENGINE.equals(definition.getType())) { - // no data for an entire stream won't trigger gap alert (use local time & batch window instead) - return new NoDataPolicyTimeBatchHandler(sds); - } else if (ABSENCE_ALERT_ENGINE.equals(definition.getType())) { - return new AbsencePolicyHandler(sds); - } else if (CUSTOMIZED_ENGINE.equals(definition.getType())) { - try { - Class<?> handlerClz = Class.forName(definition.getHandlerClass()); - PolicyStreamHandler handler = (PolicyStreamHandler) handlerClz.getConstructor(Map.class).newInstance(sds); - return handler; - } catch (Exception e) { - LOG.error("Not able to create policy handler for handler class " + definition.getHandlerClass(), e); - throw new IllegalArgumentException("Illegal extended policy handler class!" + definition.getHandlerClass()); - } - } - throw new IllegalArgumentException("Illegal policy stream handler type " + definition.getType()); - } - - public static PolicyStreamHandler createStateHandler(String type, Map<String, StreamDefinition> sds) { - if (SIDDHI_ENGINE.equals(type)) { - return new SiddhiPolicyStateHandler(sds, 1); //// FIXME: 8/2/16 - } - throw new IllegalArgumentException("Illegal policy state handler type " + type); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java deleted file mode 100644 index f53139f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.absence; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * this assumes that event comes in time order. - * Since 7/7/16. - */ -public class AbsenceAlertDriver { - private static final Logger LOG = LoggerFactory.getLogger(AbsenceAlertDriver.class); - private List<Object> expectedAttrs; - private AbsenceWindowProcessor processor; - private AbsenceWindowGenerator windowGenerator; - - public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator) { - this.expectedAttrs = expectedAttrs; - this.windowGenerator = windowGenerator; - } - - public boolean process(List<Object> appearAttrs, long occurTime) { - // initialize window - if (processor == null) { - processor = nextProcessor(occurTime); - LOG.info("initialized a new window {}", processor); - } - processor.process(appearAttrs, occurTime); - AbsenceWindowProcessor.OccurStatus status = processor.checkStatus(); - boolean expired = processor.checkExpired(); - boolean isAbsenceAlert = false; - if (expired) { - if (status == AbsenceWindowProcessor.OccurStatus.absent) { - // send alert - LOG.info("==================="); - LOG.info("|| Absence Alert ||"); - LOG.info("==================="); - isAbsenceAlert = true; - // figure out next window and set the new window - } - processor = nextProcessor(occurTime); - LOG.info("created a new window {}", processor); - } - - return isAbsenceAlert; - } - - /** - * calculate absolute time range based on current timestamp. - * - * @param currTime milliseconds - * @return - */ - private AbsenceWindowProcessor nextProcessor(long currTime) { - AbsenceWindow window = windowGenerator.nextWindow(currTime); - return new AbsenceWindowProcessor(expectedAttrs, window); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java deleted file mode 100644 index db4be7c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.absence; - -/** - * Since 7/7/16. - */ -public class AbsenceDailyRule implements AbsenceRule { - public static final long DAY_MILLI_SECONDS = 86400 * 1000L; - public long startOffset; - public long endOffset; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java deleted file mode 100644 index c372411..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.absence; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.utils.AlertStreamUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.SimpleDateFormat; -import java.util.*; - -/** - * Since 7/6/16. - * * policy would be like: - * { - * "name": "absenceAlertPolicy", - * "description": "absenceAlertPolicy", - * "inputStreams": [ - * "absenceAlertStream" - * ], - * "outputStreams": [ - * "absenceAlertStream_out" - * ], - * "definition": { - * "type": "absencealert", - * "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00" - * }, - * "partitionSpec": [ - * { - * "streamId": "absenceAlertStream", - * "type": "GROUPBY", - * "columns" : ["jobID"] - * } - * ], - * "parallelismHint": 2 - * } - */ -public class AbsencePolicyHandler implements PolicyStreamHandler { - private static final Logger LOG = LoggerFactory.getLogger(AbsencePolicyHandler.class); - private Map<String, StreamDefinition> sds; - private volatile PolicyDefinition policyDef; - private volatile Collector<AlertStreamEvent> collector; - private volatile PolicyHandlerContext context; - private volatile List<Integer> expectFieldIndices = new ArrayList<>(); - private volatile List<Object> expectValues = new ArrayList<>(); - private AbsenceAlertDriver driver; - - public AbsencePolicyHandler(Map<String, StreamDefinition> sds) { - this.sds = sds; - } - - @Override - public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { - this.collector = collector; - this.context = context; - this.policyDef = context.getPolicyDefinition(); - List<String> inputStreams = policyDef.getInputStreams(); - // validate inputStreams has to contain only one stream - if (inputStreams.size() != 1) { - throw new IllegalArgumentException("policy inputStream size has to be 1 for absence alert"); - } - // validate outputStream has to contain only one stream - if (policyDef.getOutputStreams().size() != 1) { - throw new IllegalArgumentException("policy outputStream size has to be 1 for absence alert"); - } - - String is = inputStreams.get(0); - StreamDefinition sd = sds.get(is); - - String policyValue = policyDef.getDefinition().getValue(); - - // Assume that absence alert policy value consists of - // "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset" - String[] segments = policyValue.split(",\\s*"); - int offset = 0; - // populate wisb field names - int numOfFields = Integer.parseInt(segments[offset++]); - for (int i = offset; i < offset + numOfFields; i++) { - String fn = segments[i]; - expectFieldIndices.add(sd.getColumnIndex(fn)); - } - offset += numOfFields; - for (int i = offset; i < offset + numOfFields; i++) { - String fn = segments[i]; - expectValues.add(fn); - } - offset += numOfFields; - String absenceWindowRuleType = segments[offset++]; - AbsenceDailyRule rule = new AbsenceDailyRule(); - SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); - sdf.setTimeZone(TimeZone.getTimeZone("UTC")); - Date t1 = sdf.parse(segments[offset++]); - rule.startOffset = t1.getTime(); - Date t2 = sdf.parse(segments[offset++]); - rule.endOffset = t2.getTime(); - AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule); - driver = new AbsenceAlertDriver(expectValues, generator); - } - - @Override - public void send(StreamEvent event) throws Exception { - Object[] data = event.getData(); - List<Object> columnValues = new ArrayList<>(); - for (int i = 0; i < expectFieldIndices.size(); i++) { - Object o = data[expectFieldIndices.get(i)]; - // convert value to string - columnValues.add(o.toString()); - } - - boolean isAbsenceAlert = driver.process(columnValues, event.getTimestamp()); - - // Publishing alerts. - if (isAbsenceAlert) { - AlertStreamEvent alertEvent = AlertStreamUtils.createAlertEvent(event, context, sds); - collector.emit(alertEvent); - } - } - - @Override - public void close() throws Exception { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java deleted file mode 100644 index 272d5cf..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.absence; - -/** - * Since 7/7/16. - */ -public interface AbsenceRule { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java deleted file mode 100644 index 9958dc7..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.absence; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.TimeZone; - -/** - * Since 7/7/16. - */ -public class AbsenceWindow { - public long startTime; - public long endTime; - - public String toString() { - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - sdf.setTimeZone(TimeZone.getTimeZone("UTC")); - String t1 = sdf.format(new Date(startTime)); - String t2 = sdf.format(new Date(endTime)); - String format = "startTime=%d (%s), endTime=%d (%s)"; - return String.format(format, startTime, t1, endTime, t2); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java deleted file mode 100644 index dfde09a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.absence; - -/** - * Since 7/7/16. - */ -public class AbsenceWindowGenerator { - private AbsenceRule rule; - - public AbsenceWindowGenerator(AbsenceRule rule) { - this.rule = rule; - } - - /** - * nextWindow. - * - * @param currTime current timestamp - */ - public AbsenceWindow nextWindow(long currTime) { - AbsenceWindow window = new AbsenceWindow(); - if (rule instanceof AbsenceDailyRule) { - AbsenceDailyRule r = (AbsenceDailyRule) rule; - long adjustment = 0; // if today's window already expires, then adjust to tomorrow's window - if (currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset) { - adjustment = AbsenceDailyRule.DAY_MILLI_SECONDS; - } - // use current timestamp to round down to day - long day = currTime - currTime % AbsenceDailyRule.DAY_MILLI_SECONDS; - day += adjustment; - window.startTime = day + r.startOffset; - window.endTime = day + r.endOffset; - return window; - } else { - throw new UnsupportedOperationException("not supported rule " + rule); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java deleted file mode 100644 index 8f00b31..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.absence; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * Since 7/6/16. - * To process each incoming event - * internally maintain state machine to trigger alert when some attribute does not occur within this window - */ -public class AbsenceWindowProcessor { - private static final Logger LOG = LoggerFactory.getLogger(AbsenceWindowProcessor.class); - private List<Object> expectAttrs; - private AbsenceWindow window; - private boolean expired; // to mark if the time range has been went through - private OccurStatus status = OccurStatus.not_sure; - - public enum OccurStatus { - not_sure, - occured, - absent - } - - public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow window) { - this.expectAttrs = expectAttrs; - this.window = window; - expired = false; - } - - /** - * return true if it is certain that expected attributes don't occur during startTime and endTime, else return false. - */ - public void process(List<Object> appearAttrs, long occurTime) { - if (expired) { - throw new IllegalStateException("Expired window can't recieve events"); - } - switch (status) { - case not_sure: - if (occurTime < window.startTime) { - break; - } else if (occurTime >= window.startTime - && occurTime <= window.endTime) { - if (expectAttrs.equals(appearAttrs)) { - status = OccurStatus.occured; - } - break; - } else { - status = OccurStatus.absent; - break; - } - case occured: - if (occurTime > window.endTime) { - expired = true; - } - break; - default: - break; - } - // reset status - if (status == OccurStatus.absent) { - expired = true; - } - } - - public OccurStatus checkStatus() { - return status; - } - - public boolean checkExpired() { - return expired; - } - - public AbsenceWindow currWindow() { - return window; - } - - public String toString() { - return "expectAttrs=" + expectAttrs + ", status=" + status + ", expired=" + expired + ", window=[" + window + "]"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java deleted file mode 100755 index 185853d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import org.apache.eagle.alert.engine.AlertStreamCollector; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.router.StreamOutputCollector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; - -/** - * <h2>Thread Safe Mechanism.</h2> - * <ul> - * <li> - * emit() method is thread-safe enough to be called anywhere asynchronously in multi-thread - * </li> - * <li> - * flush() method must be called synchronously, because Storm OutputCollector is not thread-safe - * </li> - * </ul> - */ -public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCollector { - private final StreamOutputCollector delegate; - private final LinkedBlockingQueue<AlertStreamEvent> queue; - private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class); - private final AtomicLong lastFlushTime = new AtomicLong(System.currentTimeMillis()); - private final AutoAlertFlusher flusher; - private static final int MAX_ALERT_DELAY_SECS = 10; - - public AlertBoltOutputCollectorThreadSafeWrapper(StreamOutputCollector outputCollector) { - this.delegate = outputCollector; - this.queue = new LinkedBlockingQueue<>(); - this.flusher = new AutoAlertFlusher(this); - this.flusher.setName(Thread.currentThread().getName() + "-alertFlusher"); - this.flusher.start(); - } - - private static class AutoAlertFlusher extends Thread { - private final AlertBoltOutputCollectorThreadSafeWrapper collector; - private boolean stopped = false; - private static final Logger LOG = LoggerFactory.getLogger(AutoAlertFlusher.class); - - private AutoAlertFlusher(AlertBoltOutputCollectorThreadSafeWrapper collector) { - this.collector = collector; - } - - @Override - public void run() { - LOG.info("Starting"); - while (!this.stopped) { - if (System.currentTimeMillis() - collector.lastFlushTime.get() >= MAX_ALERT_DELAY_SECS * 1000L) { - this.collector.flush(); - } - try { - Thread.sleep(5000); - } catch (InterruptedException ignored) { - // ignored - } - } - LOG.info("Stopped"); - } - - public void shutdown() { - LOG.info("Stopping"); - this.stopped = true; - } - } - - /** - * Emit method can be called in multi-thread. - * - * @param event - */ - @Override - public void emit(AlertStreamEvent event) { - try { - queue.put(event); - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - } - } - - /** - * Flush will be called in synchronous way like StormBolt.execute() as Storm OutputCollector is not thread-safe - */ - @Override - public void flush() { - if (!queue.isEmpty()) { - List<AlertStreamEvent> events = new ArrayList<>(); - queue.drainTo(events); - events.forEach((event) -> delegate.emit(Arrays.asList(event.getStreamId(), event))); - LOG.info("Flushed {} alerts", events.size()); - } - lastFlushTime.set(System.currentTimeMillis()); - } - - @Override - public void close() { - this.flusher.shutdown(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java deleted file mode 100755 index 606ddce..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import org.apache.eagle.alert.engine.AlertStreamCollector; -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.coordinator.PublishPartition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.router.StreamOutputCollector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - - -public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { - - private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorWrapper.class); - - private final StreamOutputCollector delegate; - private final Object outputLock; - private final StreamContext streamContext; - - private volatile Set<PublishPartition> publishPartitions; - - public AlertBoltOutputCollectorWrapper(StreamOutputCollector outputCollector, Object outputLock, - StreamContext streamContext) { - this.delegate = outputCollector; - this.outputLock = outputLock; - this.streamContext = streamContext; - - this.publishPartitions = new HashSet<>(); - } - - @Override - public void emit(AlertStreamEvent event) { - Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions); - for (PublishPartition publishPartition : clonedPublishPartitions) { - // skip the publish partition which is not belong to this policy and also check streamId - PublishPartition cloned = publishPartition.clone(); - Optional.ofNullable(event) - .filter(x -> x != null - && x.getSchema() != null - && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId()) - && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId()) - || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT))) - .ifPresent(x -> { - cloned.getColumns().stream() - .filter(y -> event.getSchema().getColumnIndex(y) >= 0 - && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size()) - .map(y -> event.getData()[event.getSchema().getColumnIndex(y)]) - .filter(y -> y != null) - .forEach(y -> cloned.getColumnValues().add(y)); - synchronized (outputLock) { - streamContext.counter().incr("alert_count"); - delegate.emit(Arrays.asList(cloned, event)); - } - }); - } - } - - @Override - public void flush() { - // do nothing - } - - @Override - public void close() { - } - - public synchronized void onAlertBoltSpecChange(Collection<PublishPartition> addedPublishPartitions, - Collection<PublishPartition> removedPublishPartitions, - Collection<PublishPartition> modifiedPublishPartitions) { - Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions); - clonedPublishPartitions.addAll(addedPublishPartitions); - clonedPublishPartitions.removeAll(removedPublishPartitions); - clonedPublishPartitions.addAll(modifiedPublishPartitions); - publishPartitions = clonedPublishPartitions; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java deleted file mode 100644 index 25ebfca..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.CompositePolicyHandler; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.stream.output.StreamCallback; - -/** - * Created on 8/2/16. - */ -public class AlertStreamCallback extends StreamCallback { - - private static final Logger LOG = LoggerFactory.getLogger(AlertStreamCallback.class); - private final String outputStream; - private final Collector<AlertStreamEvent> collector; - private final PolicyHandlerContext context; - private final StreamDefinition definition; - - private int currentIndex; - - public AlertStreamCallback(String outputStream, - StreamDefinition streamDefinition, - Collector<AlertStreamEvent> collector, - PolicyHandlerContext context, - int currentIndex) { - this.outputStream = outputStream; - this.collector = collector; - this.context = context; - this.definition = streamDefinition; - this.currentIndex = currentIndex; - } - - /** - * Possibly more than one event will be triggered for alerting. - */ - @Override - public void receive(Event[] events) { - String policyName = context.getPolicyDefinition().getName(); - String siteId = context.getPolicyDefinition().getSiteId(); - CompositePolicyHandler handler = ((PolicyGroupEvaluatorImpl) context.getPolicyEvaluator()).getPolicyHandler(policyName); - if (LOG.isDebugEnabled()) { - LOG.debug("Generated {} alerts from policy '{}' in {}, index of definiton {} ", events.length, policyName, context.getPolicyEvaluatorId(), currentIndex); - } - for (Event e : events) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setSiteId(siteId); - event.setTimestamp(e.getTimestamp()); - event.setData(e.getData()); - event.setStreamId(outputStream); - event.setPolicyId(context.getPolicyDefinition().getName()); - if (this.context.getPolicyEvaluator() != null) { - event.setCreatedBy(context.getPolicyEvaluator().getName()); - } - event.setCreatedTime(System.currentTimeMillis()); - event.setSchema(definition); - - if (LOG.isDebugEnabled()) { - LOG.debug("Generate new alert event: {}", event); - } - try { - if (handler == null) { - // extreme case: the handler is removed from the evaluator. Just emit. - if (LOG.isDebugEnabled()) { - LOG.debug(" handler not found when callback received event, directly emit. policy removed? "); - } - collector.emit(event); - } else { - handler.send(event, currentIndex + 1); - } - } catch (Exception ex) { - LOG.error(String.format("send event %s to index %d failed with exception. ", event, currentIndex), ex); - } - } - context.getPolicyCounter().incrBy(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count"), events.length); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java deleted file mode 100644 index af35551..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import org.apache.eagle.alert.engine.AlertStreamCollector; -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.CompositePolicyHandler; -import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { - private static final long serialVersionUID = -5499413193675985288L; - - private static final Logger LOG = LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class); - - private AlertStreamCollector collector; - // mapping from policy name to PolicyDefinition - private volatile Map<String, PolicyDefinition> policyDefinitionMap = new HashMap<>(); - // mapping from policy name to PolicyStreamHandler - private volatile Map<String, CompositePolicyHandler> policyStreamHandlerMap = new HashMap<>(); - private String policyEvaluatorId; - private StreamContext context; - - public PolicyGroupEvaluatorImpl(String policyEvaluatorId) { - this.policyEvaluatorId = policyEvaluatorId; - } - - public void init(StreamContext context, AlertStreamCollector collector) { - this.collector = collector; - this.policyStreamHandlerMap = new HashMap<>(); - this.context = context; - Thread.currentThread().setName(policyEvaluatorId); - } - - public void nextEvent(PartitionedEvent event) { - this.context.counter().incr("receive_count"); - dispatch(event); - } - - @Override - public String getName() { - return this.policyEvaluatorId; - } - - public void close() { - for (PolicyStreamHandler handler : policyStreamHandlerMap.values()) { - try { - handler.close(); - } catch (Exception e) { - LOG.error("Failed to close handler {}", handler.toString(), e); - } - } - } - - /** - * fixme make selection of PolicyStreamHandler to be more efficient. - * - * @param partitionedEvent PartitionedEvent - */ - private void dispatch(PartitionedEvent partitionedEvent) { - boolean handled = false; - for (Map.Entry<String, CompositePolicyHandler> policyStreamHandler : policyStreamHandlerMap.entrySet()) { - if (isAcceptedByPolicy(partitionedEvent, policyDefinitionMap.get(policyStreamHandler.getKey()))) { - try { - handled = true; - this.context.counter().incr("eval_count"); - policyStreamHandler.getValue().send(partitionedEvent.getEvent()); - } catch (Exception e) { - this.context.counter().incr("fail_count"); - LOG.error("{} failed to handle {}", policyStreamHandler.getValue(), partitionedEvent.getEvent(), e); - } - } - } - if (!handled) { - this.context.counter().incr("drop_count"); - LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent); - } else { - this.context.counter().incr("accept_count"); - } - } - - private static boolean isAcceptedByPolicy(PartitionedEvent event, PolicyDefinition policy) { - return policy.getPartitionSpec().contains(event.getPartition()) - && (policy.getInputStreams().contains(event.getEvent().getStreamId()) - || policy.getDefinition().getInputStreams().contains(event.getEvent().getStreamId())); - } - - @Override - public void onPolicyChange(String version, List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) { - Map<String, PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap); - Map<String, CompositePolicyHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap); - for (PolicyDefinition pd : added) { - inplaceAdd(copyPolicies, copyHandlers, pd, sds); - } - for (PolicyDefinition pd : removed) { - inplaceRemove(copyPolicies, copyHandlers, pd); - } - for (PolicyDefinition pd : modified) { - inplaceRemove(copyPolicies, copyHandlers, pd); - inplaceAdd(copyPolicies, copyHandlers, pd, sds); - } - - // logging - LOG.info("{} with {} Policy metadata updated with added={}, removed={}, modified={}", policyEvaluatorId, version, added, removed, modified); - - // switch reference - this.policyDefinitionMap = copyPolicies; - this.policyStreamHandlerMap = copyHandlers; - } - - private void inplaceAdd(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy, Map<String, StreamDefinition> sds) { - if (handlers.containsKey(policy.getName())) { - LOG.error("metadata calculation error, try to add existing PolicyDefinition " + policy); - } else { - policies.put(policy.getName(), policy); - CompositePolicyHandler handler = new CompositePolicyHandler(sds); - try { - PolicyHandlerContext handlerContext = new PolicyHandlerContext(); - handlerContext.setPolicyCounter(this.context.counter()); - handlerContext.setPolicyDefinition(policy); - handlerContext.setPolicyEvaluator(this); - handlerContext.setPolicyEvaluatorId(policyEvaluatorId); - handlerContext.setConfig(this.context.config()); - handler.prepare(collector, handlerContext); - handlers.put(policy.getName(), handler); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - policies.remove(policy.getName()); - handlers.remove(policy.getName()); - } - } - } - - private void inplaceRemove(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy) { - if (handlers.containsKey(policy.getName())) { - PolicyStreamHandler handler = handlers.get(policy.getName()); - try { - handler.close(); - } catch (Exception e) { - LOG.error("Failed to close handler {}", handler, e); - } finally { - policies.remove(policy.getName()); - handlers.remove(policy.getName()); - LOG.info("Removed policy: {}", policy); - } - } else { - LOG.error("metadata calculation error, try to remove nonexisting PolicyDefinition: " + policy); - } - } - - - public CompositePolicyHandler getPolicyHandler(String policy) { - return policyStreamHandlerMap.get(policy); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java deleted file mode 100644 index a732e66..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.query.api.definition.AbstractDefinition; -import org.wso2.siddhi.query.api.definition.Attribute; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class SiddhiDefinitionAdapter { - private static final Logger LOG = LoggerFactory.getLogger(SiddhiDefinitionAdapter.class); - public static final String DEFINE_STREAM_TEMPLATE = "define stream %s ( %s );"; - - public static String buildStreamDefinition(StreamDefinition streamDefinition) { - List<String> columns = new ArrayList<>(); - Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null"); - if (streamDefinition.getColumns() != null) { - for (StreamColumn column : streamDefinition.getColumns()) { - columns.add(String.format("%s %s", column.getName(), convertToSiddhiAttributeType(column.getType()).toString().toLowerCase())); - } - } else { - LOG.warn("No columns found for stream {}" + streamDefinition.getStreamId()); - } - return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getStreamId(), StringUtils.join(columns, ",")); - } - - public static Attribute.Type convertToSiddhiAttributeType(StreamColumn.Type type) { - if (_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)) { - return _EAGLE_SIDDHI_TYPE_MAPPING.get(type); - } - - throw new IllegalArgumentException("Unknown stream type: " + type); - } - - public static Class<?> convertToJavaAttributeType(StreamColumn.Type type) { - if (_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)) { - return _EAGLE_JAVA_TYPE_MAPPING.get(type); - } - - throw new IllegalArgumentException("Unknown stream type: " + type); - } - - public static StreamColumn.Type convertFromJavaAttributeType(Class<?> type) { - if (_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)) { - return _JAVA_EAGLE_TYPE_MAPPING.get(type); - } - - throw new IllegalArgumentException("Unknown stream type: " + type); - } - - public static StreamColumn.Type convertFromSiddhiAttributeType(Attribute.Type type) { - if (_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)) { - return _SIDDHI_EAGLE_TYPE_MAPPING.get(type); - } - - throw new IllegalArgumentException("Unknown siddhi type: " + type); - } - - public static String buildSiddhiExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) { - StringBuilder builder = new StringBuilder(); - PolicyDefinition.Definition coreDefinition = policyDefinition.getDefinition(); - // init if not present - List<String> inputStreams = coreDefinition.getInputStreams(); - if (inputStreams == null || inputStreams.isEmpty()) { - inputStreams = policyDefinition.getInputStreams(); - } - - for (String inputStream : inputStreams) { - builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream))); - builder.append("\n"); - } - builder.append(coreDefinition.value); - if (LOG.isDebugEnabled()) { - LOG.debug("Generated siddhi execution plan: {} from definition: {}", builder.toString(), coreDefinition); - } - return builder.toString(); - } - - public static String buildSiddhiExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) { - StringBuilder builder = new StringBuilder(); - for (Map.Entry<String,StreamDefinition> entry: inputStreamDefinitions.entrySet()) { - builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(entry.getValue())); - builder.append("\n"); - } - builder.append(policyDefinition); - if (LOG.isDebugEnabled()) { - LOG.debug("Generated siddhi execution plan: {}", builder.toString()); - } - return builder.toString(); - } - - /** - * public enum Type { - * STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT - * }. - */ - private static final Map<StreamColumn.Type, Attribute.Type> _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>(); - private static final Map<StreamColumn.Type, Class<?>> _EAGLE_JAVA_TYPE_MAPPING = new HashMap<>(); - private static final Map<Class<?>, StreamColumn.Type> _JAVA_EAGLE_TYPE_MAPPING = new HashMap<>(); - private static final Map<Attribute.Type, StreamColumn.Type> _SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>(); - - static { - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING, Attribute.Type.STRING); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT, Attribute.Type.INT); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG, Attribute.Type.LONG); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, Attribute.Type.FLOAT); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, Attribute.Type.DOUBLE); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL, Attribute.Type.BOOL); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, Attribute.Type.OBJECT); - - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.STRING, String.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.INT, Integer.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.LONG, Long.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, Float.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, Double.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.BOOL, Boolean.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, Object.class); - - _JAVA_EAGLE_TYPE_MAPPING.put(String.class, StreamColumn.Type.STRING); - _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class, StreamColumn.Type.INT); - _JAVA_EAGLE_TYPE_MAPPING.put(Long.class, StreamColumn.Type.LONG); - _JAVA_EAGLE_TYPE_MAPPING.put(Float.class, StreamColumn.Type.FLOAT); - _JAVA_EAGLE_TYPE_MAPPING.put(Double.class, StreamColumn.Type.DOUBLE); - _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class, StreamColumn.Type.BOOL); - _JAVA_EAGLE_TYPE_MAPPING.put(Object.class, StreamColumn.Type.OBJECT); - - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING, StreamColumn.Type.STRING); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT, StreamColumn.Type.INT); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG, StreamColumn.Type.LONG); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT, StreamColumn.Type.FLOAT); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE, StreamColumn.Type.DOUBLE); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL, StreamColumn.Type.BOOL); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT, StreamColumn.Type.OBJECT); - } - - public static StreamDefinition convertFromSiddiDefinition(AbstractDefinition siddhiDefinition) { - StreamDefinition streamDefinition = new StreamDefinition(); - streamDefinition.setStreamId(siddhiDefinition.getId()); - List<StreamColumn> columns = new ArrayList<>(siddhiDefinition.getAttributeNameArray().length); - for (Attribute attribute : siddhiDefinition.getAttributeList()) { - StreamColumn column = new StreamColumn(); - column.setType(convertFromSiddhiAttributeType(attribute.getType())); - column.setName(attribute.getName()); - columns.add(column); - } - streamDefinition.setColumns(columns); - streamDefinition.setTimeseries(true); - streamDefinition.setDescription("Auto-generated stream schema from siddhi for " + siddhiDefinition.getId()); - return streamDefinition; - } -} \ No newline at end of file
