http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java deleted file mode 100644 index 2af49fb..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.eagle.alert.metadata.impl;

import com.google.inject.Inject;
import com.typesafe.config.Config;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * In-memory {@link IMetadataDao} for simple service start.
 *
 * <p>All state is held in plain collections owned by this instance. Every method that reads or
 * writes that state is {@code synchronized} on the instance, so individual calls do not
 * interleave. Note that most {@code list*} methods hand out the internal list itself rather
 * than a copy; a caller that keeps iterating such a list while another thread writes is not
 * protected by that lock.
 *
 * @since Apr 11, 2016
 */
public class InMemMetadataDaoImpl implements IMetadataDao {

    private static final Logger LOG = LoggerFactory.getLogger(InMemMetadataDaoImpl.class);

    private final List<StreamingCluster> clusters = new ArrayList<StreamingCluster>();
    private final List<StreamDefinition> schemas = new ArrayList<StreamDefinition>();
    private final List<Kafka2TupleMetadata> datasources = new ArrayList<Kafka2TupleMetadata>();
    private final List<PolicyDefinition> policies = new ArrayList<PolicyDefinition>();
    private final List<Publishment> publishments = new ArrayList<Publishment>();
    private final List<PublishmentType> publishmentTypes = new ArrayList<PublishmentType>();
    // Upper bound on the number of entries kept in scheduleStates.
    private volatile int maxScheduleState = 100;
    // Keyed by ScheduleState#getVersion(); ordering is the natural String ordering of that key.
    private final SortedMap<String, ScheduleState> scheduleStates = new TreeMap<String, ScheduleState>();
    private final List<PolicyAssignment> assignments = new ArrayList<PolicyAssignment>();
    private final List<Topology> topologies = new ArrayList<Topology>();
    private final List<AlertPublishEvent> alerts = new ArrayList<>();

    /**
     * Creates an empty store.
     *
     * @param config injected configuration; not read by this implementation
     */
    @Inject
    public InMemMetadataDaoImpl(Config config) {
    }

    /** Builds the success result (code 200, message "OK") shared by several operations. */
    private static OpResult ok() {
        OpResult result = new OpResult();
        result.code = 200;
        result.message = "OK";
        return result;
    }

    /**
     * Adds {@code paramT} to {@code clusters}, first dropping any element whose
     * {@link MetadataUtils#getKey} matches it case-insensitively.
     *
     * @return a result with code 200 whose message tells whether an element was replaced
     */
    private synchronized <T> OpResult addOrReplace(List<T> clusters, T paramT) {
        Predicate<T> sameKey =
            existing -> MetadataUtils.getKey(existing).equalsIgnoreCase(MetadataUtils.getKey(paramT));

        OpResult result = new OpResult();
        if (clusters.removeIf(sameKey)) {
            result.message = "replace the old one!";
        } else {
            result.message = "created new config!";
        }
        result.code = 200;
        clusters.add(paramT);
        return result;
    }

    /**
     * Removes every element of {@code clusters} whose {@link MetadataUtils#getKey} equals
     * {@code id} ignoring case.
     *
     * @return a result with code 200 whose message tells whether anything was removed
     */
    private synchronized <T> OpResult remove(List<T> clusters, String id) {
        Predicate<T> hasId = item -> MetadataUtils.getKey(item).equalsIgnoreCase(id);

        OpResult result = new OpResult();
        result.code = 200;
        if (clusters.removeIf(hasId)) {
            result.message = "removed configuration item succeed";
        } else {
            result.message = "no configuration item removed";
        }
        return result;
    }

    /** Returns the internal (live) list of clusters. */
    @Override
    public synchronized List<StreamingCluster> listClusters() {
        return clusters;
    }

    @Override
    public OpResult addCluster(final StreamingCluster cluster) {
        return addOrReplace(clusters, cluster);
    }

    @Override
    public OpResult removeCluster(final String clusterId) {
        return remove(clusters, clusterId);
    }

    /** Returns the internal (live) list of stream definitions. */
    @Override
    public synchronized List<StreamDefinition> listStreams() {
        return schemas;
    }

    @Override
    public OpResult createStream(StreamDefinition stream) {
        return addOrReplace(schemas, stream);
    }

    @Override
    public OpResult removeStream(String streamId) {
        return remove(schemas, streamId);
    }

    /** Returns the internal (live) list of data sources. */
    @Override
    public synchronized List<Kafka2TupleMetadata> listDataSources() {
        return datasources;
    }

    @Override
    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
        return addOrReplace(datasources, dataSource);
    }

    @Override
    public OpResult removeDataSource(String datasourceId) {
        return remove(datasources, datasourceId);
    }

    /** Returns the internal (live) list of policies. */
    @Override
    public synchronized List<PolicyDefinition> listPolicies() {
        return policies;
    }

    @Override
    public OpResult addPolicy(PolicyDefinition policy) {
        return addOrReplace(policies, policy);
    }

    @Override
    public OpResult removePolicy(String policyId) {
        return remove(policies, policyId);
    }

    /** Returns the internal (live) list of publishments. */
    @Override
    public synchronized List<Publishment> listPublishment() {
        return publishments;
    }

    @Override
    public OpResult addPublishment(Publishment publishment) {
        return addOrReplace(publishments, publishment);
    }

    @Override
    public OpResult removePublishment(String pubId) {
        return remove(publishments, pubId);
    }

    /** Returns the internal (live) list of publishment types. */
    @Override
    public synchronized List<PublishmentType> listPublishmentType() {
        return publishmentTypes;
    }

    @Override
    public OpResult addPublishmentType(PublishmentType publishmentType) {
        return addOrReplace(publishmentTypes, publishmentType);
    }

    @Override
    public OpResult removePublishmentType(String pubType) {
        return remove(publishmentTypes, pubType);
    }

    /**
     * Returns the most recently added alert events.
     *
     * @param size number of trailing events wanted; when it is not in {@code 1..alerts.size()}
     *             all events are returned
     * @return a new list, detached from the internal one, so that later additions cannot
     *         invalidate it
     */
    @Override
    public synchronized List<AlertPublishEvent> listAlertPublishEvent(int size) {
        int total = alerts.size();
        int from = (size > 0 && size <= total) ? total - size : 0;
        // Copy: a bare subList view breaks as soon as the backing list is structurally modified.
        return new ArrayList<>(alerts.subList(from, total));
    }

    /**
     * Looks up an alert event by its alert id.
     *
     * @return a matching event, or {@code null} when none matches
     */
    @Override
    public synchronized AlertPublishEvent getAlertPublishEvent(String alertId) {
        return alerts.stream()
            .filter(alert -> Objects.equals(alert.getAlertId(), alertId))
            .findAny()
            .orElse(null);
    }

    /**
     * Returns the trailing events recorded for one policy.
     *
     * @param policyId policy id to match
     * @param size     number of trailing matches wanted; a negative value or one larger than the
     *                 number of matches selects all of them, {@code 0} selects none
     * @return the selected matches, in insertion order
     */
    @Override
    public synchronized List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
        List<AlertPublishEvent> matched = alerts.stream()
            .filter(alert -> Objects.equals(alert.getPolicyId(), policyId))
            .collect(Collectors.toList());
        int count = (size < 0 || size > matched.size()) ? matched.size() : size;
        return matched.subList(matched.size() - count, matched.size());
    }

    /** Appends {@code event} to the alert list; the result carries code 200 and no message. */
    @Override
    public synchronized OpResult addAlertPublishEvent(AlertPublishEvent event) {
        alerts.add(event);
        OpResult result = new OpResult();
        result.code = 200;
        return result;
    }

    /**
     * Stores {@code state} under its version, keeping at most {@code maxScheduleState} entries.
     *
     * <p>When a new version would exceed the cap, the entry with the lowest-sorted version key
     * is evicted first. Re-adding an already stored version only replaces that entry.
     */
    @Override
    public synchronized OpResult addScheduleState(ScheduleState state) {
        String version = state.getVersion();
        boolean isNewVersion = !scheduleStates.containsKey(version);
        if (isNewVersion && !scheduleStates.isEmpty() && scheduleStates.size() >= maxScheduleState) {
            scheduleStates.remove(scheduleStates.firstKey());
        }
        scheduleStates.put(version, state);
        return ok();
    }

    /** Returns the state stored under the highest-sorted version key, or {@code null} if none. */
    @Override
    public synchronized ScheduleState getScheduleState() {
        if (scheduleStates.isEmpty()) {
            return null;
        }
        return scheduleStates.get(scheduleStates.lastKey());
    }

    /** Returns the state stored under {@code versionId}, or {@code null} if there is none. */
    @Override
    public synchronized ScheduleState getScheduleState(String versionId) {
        return scheduleStates.get(versionId);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<ScheduleState> listScheduleStates() {
        throw new UnsupportedOperationException("listScheduleStates not support!");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public OpResult clearScheduleState(int maxCapacity) {
        throw new UnsupportedOperationException("clearScheduleState not support!");
    }

    /** Returns the internal (live) list of policy assignments. */
    @Override
    public synchronized List<PolicyAssignment> listAssignments() {
        return assignments;
    }

    /** Appends {@code assignment}; unlike the other add methods no existing entry is replaced. */
    @Override
    public synchronized OpResult addAssignment(PolicyAssignment assignment) {
        assignments.add(assignment);
        return ok();
    }

    /** Returns the internal (live) list of topologies. */
    @Override
    public synchronized List<Topology> listTopologies() {
        return topologies;
    }

    @Override
    public OpResult addTopology(Topology t) {
        return addOrReplace(topologies, t);
    }

    @Override
    public OpResult removeTopology(String topologyName) {
        return remove(topologies, topologyName);
    }

    /**
     * Empties the collections that {@link #export()} and {@link #importModels(Models)} handle.
     * Publishment types and alert events are not touched.
     */
    @Override
    public synchronized OpResult clear() {
        LOG.info("clear models...");
        this.assignments.clear();
        this.clusters.clear();
        this.datasources.clear();
        this.policies.clear();
        this.publishments.clear();
        this.scheduleStates.clear();
        this.schemas.clear();
        this.topologies.clear();
        return ok();
    }

    /** Copies the current content of the exportable collections into a new {@link Models}. */
    @Override
    public synchronized Models export() {
        Models models = new Models();
        models.assignments.addAll(this.assignments);
        models.clusters.addAll(this.clusters);
        models.datasources.addAll(this.datasources);
        models.policies.addAll(this.policies);
        models.publishments.addAll(this.publishments);
        models.scheduleStates.putAll(this.scheduleStates);
        models.schemas.addAll(this.schemas);
        models.topologies.addAll(this.topologies);
        return models;
    }

    /** Replaces the exportable collections with the content of {@code models}. */
    @Override
    public synchronized OpResult importModels(Models models) {
        LOG.info("clear and import models...");
        clear();
        this.assignments.addAll(models.assignments);
        this.clusters.addAll(models.clusters);
        this.datasources.addAll(models.datasources);
        this.policies.addAll(models.policies);
        this.publishments.addAll(models.publishments);
        this.scheduleStates.putAll(models.scheduleStates);
        this.schemas.addAll(models.schemas);
        this.topologies.addAll(models.topologies);
        return ok();
    }

    /** Nothing to release. */
    @Override
    public void close() throws IOException {

    }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java deleted file mode 100644 index e0b5c9d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.eagle.alert.metadata.impl;

import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;

import com.google.inject.Inject;
import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * {@link IMetadataDao} implementation that forwards every supported operation to a
 * {@link JdbcMetadataHandler}. {@link #clear()}, {@link #export()} and
 * {@link #importModels(Models)} are not supported.
 *
 * @since May 26, 2016.
 */
public class JdbcMetadataDaoImpl implements IMetadataDao {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataDaoImpl.class);
    private final JdbcMetadataHandler handler;

    /**
     * Builds the handler from the {@link MetadataUtils#META_DATA} section of {@code config}.
     */
    @Inject
    public JdbcMetadataDaoImpl(Config config) {
        handler = new JdbcMetadataHandler(config.getConfig(MetadataUtils.META_DATA));
    }

    @Override
    public List<Topology> listTopologies() {
        return handler.list(Topology.class);
    }

    @Override
    public List<StreamingCluster> listClusters() {
        return handler.list(StreamingCluster.class);
    }

    @Override
    public List<StreamDefinition> listStreams() {
        return handler.list(StreamDefinition.class);
    }

    @Override
    public List<Kafka2TupleMetadata> listDataSources() {
        return handler.list(Kafka2TupleMetadata.class);
    }

    @Override
    public List<PolicyDefinition> listPolicies() {
        return handler.list(PolicyDefinition.class);
    }

    @Override
    public List<Publishment> listPublishment() {
        return handler.listPublishments();
    }

    /**
     * Lists up to {@code size} alert events.
     *
     * @return an empty list when {@code size} is not positive
     */
    @Override
    public List<AlertPublishEvent> listAlertPublishEvent(int size) {
        if (size <= 0) {
            LOG.info("Invalid parameter size <= 0");
            return new ArrayList<>();
        }
        return handler.listAlertEvents(null, null, size);
    }

    /** Looks up a policy definition by id via the handler. */
    public PolicyDefinition getPolicyById(String policyId) {
        return handler.queryById(PolicyDefinition.class, policyId);
    }

    /** Returns the publishments the handler associates with {@code policyId}. */
    public List<Publishment> getPublishmentsByPolicyId(String policyId) {
        return handler.getPublishmentsByPolicyId(policyId);
    }

    @Override
    public AlertPublishEvent getAlertPublishEvent(String alertId) {
        return handler.getAlertEventById(alertId, 1);
    }

    /**
     * Lists up to {@code size} alert events of one policy.
     *
     * @return an empty list when {@code size} is not positive
     */
    @Override
    public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
        if (size <= 0) {
            LOG.info("Invalid parameter size <= 0");
            return new ArrayList<>();
        }
        return handler.getAlertEventByPolicyId(policyId, size);
    }

    @Override
    public ScheduleState getScheduleState(String versionId) {
        return handler.queryById(ScheduleState.class, versionId);
    }

    /**
     * Returns the first schedule state of a descending listing, or {@code null} when the
     * listing is empty.
     */
    @Override
    public ScheduleState getScheduleState() {
        List<ScheduleState> descending =
            handler.list(ScheduleState.class, JdbcMetadataHandler.SortType.DESC);
        return descending.isEmpty() ? null : descending.get(0);
    }

    @Override
    public List<ScheduleState> listScheduleStates() {
        return handler.list(ScheduleState.class);
    }

    @Override
    public List<PolicyAssignment> listAssignments() {
        return handler.list(PolicyAssignment.class);
    }

    @Override
    public List<PublishmentType> listPublishmentType() {
        return handler.list(PublishmentType.class);
    }

    @Override
    public OpResult addTopology(Topology t) {
        return handler.addOrReplace(Topology.class.getSimpleName(), t);
    }

    @Override
    public OpResult addCluster(StreamingCluster cluster) {
        return handler.addOrReplace(StreamingCluster.class.getSimpleName(), cluster);
    }

    @Override
    public OpResult addAlertPublishEvent(AlertPublishEvent event) {
        return handler.addAlertEvent(event);
    }

    @Override
    public OpResult createStream(StreamDefinition stream) {
        return handler.addOrReplace(StreamDefinition.class.getSimpleName(), stream);
    }

    @Override
    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
        return handler.addOrReplace(Kafka2TupleMetadata.class.getSimpleName(), dataSource);
    }

    @Override
    public OpResult addPolicy(PolicyDefinition policy) {
        return handler.addOrReplace(PolicyDefinition.class.getSimpleName(), policy);
    }

    @Override
    public OpResult addPublishment(Publishment publishment) {
        return handler.addOrReplace(Publishment.class.getSimpleName(), publishment);
    }

    @Override
    public OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) {
        return handler.addPublishmentsToPolicy(policyId, publishmentIds);
    }

    @Override
    public OpResult addScheduleState(ScheduleState state) {
        return handler.addOrReplace(ScheduleState.class.getSimpleName(), state);
    }

    /**
     * Asks the handler to remove schedule states, passing it {@code maxCapacity}.
     *
     * @param maxCapacity value handed to the handler; a non-positive value is replaced by 10
     */
    @Override
    public OpResult clearScheduleState(int maxCapacity) {
        int capacity = maxCapacity <= 0 ? 10 : maxCapacity;
        OpResult result = handler.removeScheduleStates(capacity);
        LOG.info(result.message);
        return result;
    }

    @Override
    public OpResult addAssignment(PolicyAssignment assignment) {
        return handler.addOrReplace(PolicyAssignment.class.getSimpleName(), assignment);
    }

    @Override
    public OpResult addPublishmentType(PublishmentType publishmentType) {
        return handler.addOrReplace(PublishmentType.class.getSimpleName(), publishmentType);
    }

    @Override
    public OpResult removeTopology(String topologyName) {
        return handler.removeById(Topology.class.getSimpleName(), topologyName);
    }

    @Override
    public OpResult removeCluster(String clusterId) {
        return handler.removeById(StreamingCluster.class.getSimpleName(), clusterId);
    }

    @Override
    public OpResult removeStream(String streamId) {
        return handler.removeById(StreamDefinition.class.getSimpleName(), streamId);
    }

    @Override
    public OpResult removeDataSource(String datasourceId) {
        return handler.removeById(Kafka2TupleMetadata.class.getSimpleName(), datasourceId);
    }

    @Override
    public OpResult removePolicy(String policyId) {
        return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId);
    }

    @Override
    public OpResult removePublishment(String pubId) {
        return handler.removeById(Publishment.class.getSimpleName(), pubId);
    }

    @Override
    public OpResult removePublishmentType(String name) {
        return handler.removeById(PublishmentType.class.getSimpleName(), name);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public OpResult clear() {
        throw new UnsupportedOperationException("clear not support!");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Models export() {
        // The message names this operation, not clear().
        throw new UnsupportedOperationException("export not support!");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public OpResult importModels(Models models) {
        // The message names this operation, not clear().
        throw new UnsupportedOperationException("importModels not support!");
    }

    /** Closes the underlying handler. */
    @Override
    public void close() throws IOException {
        handler.close();
    }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java deleted file mode 100644 index a9e3c5e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java +++ /dev/null @@ -1,506 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.metadata.impl; - -import org.apache.commons.collections.map.HashedMap; -import org.apache.commons.dbcp.BasicDataSource; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.metadata.MetadataUtils; -import org.apache.eagle.alert.metadata.resource.OpResult; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.sql.DataSource; -import java.io.IOException; -import java.sql.*; -import java.util.*; -import java.util.function.Function; - -public class JdbcMetadataHandler { - - private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataHandler.class); - // general model - private static final String INSERT_STATEMENT = "INSERT INTO 
%s(content, id) VALUES (?, ?)"; - private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE id=?"; - private static final String UPDATE_STATEMENT = "UPDATE %s set content=? WHERE id=?"; - private static final String QUERY_ALL_STATEMENT = "SELECT content FROM %s"; - private static final String QUERY_CONDITION_STATEMENT = "SELECT content FROM %s WHERE id=?"; - private static final String QUERY_ORDERBY_STATEMENT = "SELECT content FROM %s ORDER BY id %s"; - - // customized model - private static final String CLEAR_SCHEDULESTATES_STATEMENT = "DELETE FROM schedule_state WHERE id NOT IN (SELECT id from (SELECT id FROM schedule_state ORDER BY id DESC limit ?) as states)"; - private static final String INSERT_ALERT_STATEMENT = "INSERT INTO alert_event(alertId, siteId, appIds, policyId, alertTimestamp, policyValue, alertData) VALUES (?, ?, ?, ?, ?, ?, ?)"; - private static final String QUERY_ALERT_STATEMENT = "SELECT * FROM alert_event order by alertTimestamp DESC limit ?"; - private static final String QUERY_ALERT_BY_ID_STATEMENT = "SELECT * FROM alert_event WHERE alertId=? order by alertTimestamp DESC limit ?"; - private static final String QUERY_ALERT_BY_POLICY_STATEMENT = "SELECT * FROM alert_event WHERE policyId=? 
order by alertTimestamp DESC limit ?"; - private static final String INSERT_POLICYPUBLISHMENT_STATEMENT = "INSERT INTO policy_publishment(policyId, publishmentName) VALUES (?, ?)"; - private static final String DELETE_PUBLISHMENT_STATEMENT = "DELETE FROM policy_publishment WHERE policyId=?"; - private static final String QUERY_PUBLISHMENT_BY_POLICY_STATEMENT = "SELECT content FROM publishment a INNER JOIN policy_publishment b ON a.id=b.publishmentName and b.policyId=?"; - private static final String QUERY_PUBLISHMENT_STATEMENT = "SELECT a.content, b.policyId FROM publishment a LEFT JOIN policy_publishment b ON a.id=b.publishmentName"; - - public enum SortType { DESC, ASC } - - private static Map<String, String> tblNameMap = new HashMap<>(); - - private static final ObjectMapper mapper = new ObjectMapper(); - private DataSource dataSource; - - static { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - registerTableName(StreamingCluster.class.getSimpleName(), "stream_cluster"); - registerTableName(StreamDefinition.class.getSimpleName(), "stream_definition"); - registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "kafka_tuple_metadata"); - registerTableName(PolicyDefinition.class.getSimpleName(), "policy_definition"); - registerTableName(Publishment.class.getSimpleName(), "publishment"); - registerTableName(PublishmentType.class.getSimpleName(), "publishment_type"); - registerTableName(ScheduleState.class.getSimpleName(), "schedule_state"); - registerTableName(PolicyAssignment.class.getSimpleName(), "policy_assignment"); - registerTableName(Topology.class.getSimpleName(), "topology"); - registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event"); - } - - private static void registerTableName(String clzName, String tblName) { - tblNameMap.put(clzName, tblName); - } - - public JdbcMetadataHandler(Config config) { - try { - //JdbcSchemaManager.getInstance().init(config); - BasicDataSource bDatasource = new 
BasicDataSource(); - bDatasource.setDriverClassName(config.getString(MetadataUtils.JDBC_DRIVER_PATH)); - if (config.hasPath(MetadataUtils.JDBC_USERNAME_PATH)) { - bDatasource.setUsername(config.getString(MetadataUtils.JDBC_USERNAME_PATH)); - bDatasource.setPassword(config.getString(MetadataUtils.JDBC_PASSWORD_PATH)); - } - bDatasource.setUrl(config.getString(MetadataUtils.JDBC_CONNECTION_PATH)); - if (config.hasPath(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH)) { - bDatasource.setConnectionProperties(config.getString(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH)); - } - this.dataSource = bDatasource; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - private String getTableName(String clzName) { - String tbl = tblNameMap.get(clzName); - if (tbl != null) { - return tbl; - } else { - return clzName; - } - } - - private void closeResource(ResultSet rs, PreparedStatement statement, Connection connection) { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - LOG.info(e.getMessage(), e); - } - } - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - LOG.info("Failed to close statement: {}", e.getMessage(), e); - } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - LOG.error("Failed to close connection: {}", e.getMessage(), e.getCause()); - } - } - } - - private OpResult executeUpdate(Connection connection, String query, String key, String value) throws SQLException { - OpResult result = new OpResult(); - PreparedStatement statement = null; - try { - statement = connection.prepareStatement(query); - Clob clob = connection.createClob(); - clob.setString(1, value); - statement.setClob(1, clob); - statement.setString(2, key); - int status = statement.executeUpdate(); - LOG.info("update {} with query={}", status, query); - } finally { - if (statement != null) { - statement.close(); - } - } - return result; - } - - - public <T> OpResult addOrReplace(String 
clzName, T t) { - String tb = getTableName(clzName); - OpResult result = new OpResult(); - Savepoint savepoint = null; - String key = null; - String value = null; - Connection connection = null; - try { - connection = dataSource.getConnection(); - key = MetadataUtils.getKey(t); - value = mapper.writeValueAsString(t); - connection.setAutoCommit(false); - savepoint = connection.setSavepoint("insertEntity"); - result = executeUpdate(connection, String.format(INSERT_STATEMENT, tb), key, value); - connection.commit(); - } catch (SQLException e) { - LOG.warn("fail to insert entity due to {}, and try to updated instead", e.getMessage()); - if (connection != null) { - LOG.info("Detected duplicated entity"); - try { - connection.rollback(savepoint); - executeUpdate(connection, String.format(UPDATE_STATEMENT, tb), key, value); - connection.commit(); - connection.setAutoCommit(true); - } catch (SQLException e1) { - LOG.warn("Rollback failed", e1); - } - } - } catch (JsonProcessingException e) { - LOG.error("Got JsonProcessingException: {}", e.getMessage(), e.getCause()); - result.code = OpResult.FAILURE; - result.message = e.getMessage(); - } finally { - closeResource(null, null, connection); - } - return result; - } - - - public <T> List<T> list(Class<T> clz) { - return list(clz, null); - } - - public <T> List<T> list(Class<T> clz, SortType sortType) { - List<T> result = new LinkedList<T>(); - Connection connection = null; - PreparedStatement statement = null; - try { - String tb = getTableName(clz.getSimpleName()); - String query = String.format(QUERY_ALL_STATEMENT, tb); - if (sortType != null) { - query = String.format(QUERY_ORDERBY_STATEMENT, tb, sortType.toString()); - } - connection = dataSource.getConnection(); - statement = connection.prepareStatement(query); - return executeList(statement, clz); - } catch (SQLException ex) { - LOG.error(ex.getMessage(), ex); - } finally { - closeResource(null, statement, connection); - } - return result; - } - - private <T> List<T> 
executeList(PreparedStatement statement, Class<T> clz) throws SQLException { - List<T> result = new LinkedList<>(); - ResultSet rs = null; - try { - rs = statement.executeQuery(); - while (rs.next()) { - try { - String content = rs.getString(1); - result.add(mapper.readValue(content, clz)) ; - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - } finally { - if (rs != null) { - rs.close(); - } - } - return result; - } - - private <T> List<T> executeList(PreparedStatement statement, Function<ResultSet, T> selectFun) throws SQLException { - List<T> result = new LinkedList<>(); - ResultSet rs = null; - try { - rs = statement.executeQuery(); - while (rs.next()) { - result.add(selectFun.apply(rs)); - } - } finally { - if (rs != null) { - rs.close(); - } - } - return result; - } - - public <T> T queryById(Class<T> clz, String id) { - List<T> result = new LinkedList<T>(); - Connection connection = null; - PreparedStatement statement = null; - try { - String tb = getTableName(clz.getSimpleName()); - connection = dataSource.getConnection(); - statement = connection.prepareStatement(String.format(QUERY_CONDITION_STATEMENT, tb)); - statement.setString(1, id); - result = executeList(statement, clz); - } catch (SQLException ex) { - LOG.error(ex.getMessage(), ex); - } finally { - closeResource(null, statement, connection); - } - if (result.isEmpty()) { - return null; - } else { - return result.get(0); - } - } - - public AlertPublishEvent getAlertEventById(String alertId, int size) { - List<AlertPublishEvent> alerts = listAlertEvents(QUERY_ALERT_BY_ID_STATEMENT, alertId, size); - if (alerts.isEmpty()) { - return null; - } else { - return alerts.get(0); - } - } - - public List<AlertPublishEvent> getAlertEventByPolicyId(String policyId, int size) { - return listAlertEvents(QUERY_ALERT_BY_POLICY_STATEMENT, policyId, size); - } - - public List<AlertPublishEvent> listAlertEvents(String query, String filter, int size) { - List<AlertPublishEvent> alerts = new 
LinkedList<>(); - Connection connection = null; - PreparedStatement statement = null; - try { - connection = dataSource.getConnection(); - if (query == null) { - query = QUERY_ALERT_STATEMENT; - statement = connection.prepareStatement(query); - statement.setInt(1, size); - } else { - statement = connection.prepareStatement(query); - statement.setString(1, filter); - statement.setInt(2, size); - } - alerts = executeList(statement, rs -> { - try { - AlertPublishEvent event = new AlertPublishEvent(); - event.setAlertId(rs.getString(1)); - event.setSiteId(rs.getString(2)); - event.setAppIds(mapper.readValue(rs.getString(3), List.class)); - event.setPolicyId(rs.getString(4)); - event.setAlertTimestamp(rs.getLong(5)); - event.setPolicyValue(rs.getString(6)); - event.setAlertData(mapper.readValue(rs.getString(7), Map.class)); - return event; - } catch (Exception e) { - throw new IllegalStateException(e); - } - }); - } catch (SQLException ex) { - LOG.error(ex.getMessage(), ex); - } finally { - closeResource(null, statement, connection); - } - return alerts; - } - - public List<Publishment> listPublishments() { - List<Publishment> result = new LinkedList<>(); - Connection connection = null; - PreparedStatement statement = null; - ResultSet rs = null; - try { - connection = dataSource.getConnection(); - statement = connection.prepareStatement(QUERY_PUBLISHMENT_STATEMENT); - Map<String, List<String>> publishPolicyMap = new HashedMap(); - rs = statement.executeQuery(); - while (rs.next()) { - String publishment = rs.getString(1); - String policyId = rs.getString(2); - List<String> policyIds = publishPolicyMap.get(publishment); - if (policyIds == null) { - policyIds = new ArrayList<>(); - publishPolicyMap.put(publishment, policyIds); - } - if (policyId != null) { - policyIds.add(policyId); - } - } - for (Map.Entry<String, List<String>> entry : publishPolicyMap.entrySet()) { - Publishment publishment = mapper.readValue(entry.getKey(), Publishment.class); - 
publishment.setPolicyIds(entry.getValue()); - result.add(publishment); - } - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - } finally { - closeResource(rs, statement, connection); - } - return result; - } - - public List<Publishment> getPublishmentsByPolicyId(String policyId) { - List<Publishment> result = new LinkedList<>(); - Connection connection = null; - PreparedStatement statement = null; - try { - connection = dataSource.getConnection(); - statement = connection.prepareStatement(QUERY_PUBLISHMENT_BY_POLICY_STATEMENT); - statement.setString(1, policyId); - result = executeList(statement, Publishment.class); - } catch (SQLException ex) { - LOG.error(ex.getMessage(), ex); - } finally { - closeResource(null, statement, connection); - } - return result; - } - - public OpResult addAlertEvent(AlertPublishEvent event) { - Connection connection = null; - PreparedStatement statement = null; - OpResult result = new OpResult(); - try { - connection = dataSource.getConnection(); - statement = connection.prepareStatement(INSERT_ALERT_STATEMENT); - statement.setString(1, event.getAlertId()); - statement.setString(2, event.getSiteId()); - statement.setString(3, mapper.writeValueAsString(event.getAppIds())); - statement.setString(4, event.getPolicyId()); - statement.setLong(5, event.getAlertTimestamp()); - statement.setString(6, event.getPolicyValue()); - statement.setString(7, mapper.writeValueAsString(event.getAlertData())); - LOG.info("start to add alert event"); - int status = statement.executeUpdate(); - result.code = OpResult.SUCCESS; - result.message = String.format("add %d records into alert_event successfully", status); - } catch (Exception ex) { - result.code = OpResult.FAILURE; - result.message = ex.getMessage(); - } finally { - closeResource(null, statement, connection); - } - LOG.info(result.message); - return result; - } - - public OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) { - OpResult result = new OpResult(); 
- Connection connection = null; - PreparedStatement statement = null; - try { - connection = dataSource.getConnection(); - connection.setAutoCommit(false); - statement = connection.prepareStatement(DELETE_PUBLISHMENT_STATEMENT); - statement.setString(1, policyId); - int status = statement.executeUpdate(); - LOG.info("delete {} records from policy_publishment", status); - closeResource(null, statement, null); - - statement = connection.prepareStatement(INSERT_POLICYPUBLISHMENT_STATEMENT); - for (String pub : publishmentIds) { - statement.setString(1, policyId); - statement.setString(2, pub); - statement.addBatch(); - } - int[] num = statement.executeBatch(); - connection.commit(); - connection.setAutoCommit(true); - int sum = 0; - for (int i : num) { - sum += i; - } - result.code = OpResult.SUCCESS; - result.message = String.format("Add %d records into policy_publishment", sum); - } catch (SQLException ex) { - LOG.error("Error to add publishments to policy {}", policyId, ex); - result.code = OpResult.FAILURE; - result.message = ex.getMessage(); - } finally { - closeResource(null, statement, connection); - } - LOG.info(result.message); - return result; - } - - public OpResult removeById(String clzName, String key) { - Connection connection = null; - PreparedStatement statement = null; - OpResult result = new OpResult(); - try { - String tb = getTableName(clzName); - connection = dataSource.getConnection(); - statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb)); - statement.setString(1, key); - LOG.info("start to delete records from {} with id={}", tb, key); - int status = statement.executeUpdate(); - result.code = OpResult.SUCCESS; - result.message = String.format("removed %d records from %s successfully", status, tb); - } catch (SQLException ex) { - result.code = OpResult.FAILURE; - result.message = ex.getMessage(); - } finally { - closeResource(null, statement, connection); - } - LOG.info(result.message); - return result; - } - - public void 
close() throws IOException { - //JdbcSchemaManager.getInstance().shutdown(); - } - - public OpResult removeScheduleStates(int capacity) { - Connection connection = null; - PreparedStatement statement = null; - OpResult result = new OpResult(); - try { - connection = dataSource.getConnection(); - statement = connection.prepareStatement(CLEAR_SCHEDULESTATES_STATEMENT); - statement.setInt(1, capacity); - LOG.info("start to delete schedule states"); - int status = statement.executeUpdate(); - result.code = OpResult.SUCCESS; - result.message = String.format("removed %d records from schedule_state successfully", status); - } catch (SQLException ex) { - result.code = OpResult.FAILURE; - result.message = ex.getMessage(); - } finally { - closeResource(null, statement, connection); - } - LOG.info(result.message); - return result; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java deleted file mode 100644 index a02c51e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.metadata.impl; - -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.metadata.MetadataUtils; -import com.typesafe.config.Config; -import org.apache.ddlutils.Platform; -import org.apache.ddlutils.PlatformFactory; -import org.apache.ddlutils.model.Column; -import org.apache.ddlutils.model.Database; -import org.apache.ddlutils.model.Table; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Types; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -@Deprecated -public class JdbcSchemaManager { - - private static final Logger LOG = LoggerFactory.getLogger(JdbcSchemaManager.class); - private Database database; - private Platform platform; - - private static JdbcSchemaManager instance; - - public static Map<String, String> tblNameMap = new HashMap<>(); - - private JdbcSchemaManager() { - } - - private static void registerTableName(String clzName, String tblName) { - 
tblNameMap.put(clzName, tblName); - } - - static { - registerTableName(StreamingCluster.class.getSimpleName(), "cluster"); - registerTableName(StreamDefinition.class.getSimpleName(), "stream_schema"); - registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "datasource"); - registerTableName(PolicyDefinition.class.getSimpleName(), "policy"); - registerTableName(Publishment.class.getSimpleName(), "publishment"); - registerTableName(PublishmentType.class.getSimpleName(), "publishment_type"); - registerTableName(ScheduleState.class.getSimpleName(), "schedule_state"); - registerTableName(PolicyAssignment.class.getSimpleName(), "assignment"); - registerTableName(Topology.class.getSimpleName(), "topology"); - registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event"); - } - - public static JdbcSchemaManager getInstance() { - if (instance == null) { - instance = new JdbcSchemaManager(); - } - return instance; - } - - public void init(Config config) { - Connection connection = null; - try { - this.platform = PlatformFactory.createNewPlatformInstance("mysql"); - - connection = MetadataUtils.getJdbcConnection(config); - String dbName = config.getString(MetadataUtils.JDBC_DATABASE_PATH); - this.database = platform.readModelFromDatabase(connection, dbName); - LOG.info("Loaded " + database); - - Database _database = identifyNewTables(); - if (_database.getTableCount() > 0) { - LOG.info("Creating {} new tables (totally {} tables)", _database.getTableCount(), database.getTableCount()); - this.platform.createTables(connection, _database, false, true); - LOG.info("Created {} new tables: ", _database.getTableCount(), _database.getTables()); - } else { - LOG.debug("All the {} tables have already been created, no new tables", database.getTableCount()); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IllegalStateException(e); - } finally { - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - 
LOG.warn(e.getMessage(), e); - } - } - } - } - - private Database identifyNewTables() { - Database _database = new Database(); - _database.setName(database.getName()); - Collection<String> tableNames = tblNameMap.values(); - LOG.info("Initializing database and creating tables"); - for (String tableName : tableNames) { - if (database.findTable(tableName) == null) { - Table table = createTable(tableName); - LOG.info("Creating {}", table.toVerboseString()); - _database.addTable(table); - database.addTable(table); - } else { - LOG.debug("Table {} already exists", tableName); - } - } - return _database; - } - - public void shutdown() { - this.platform.shutdownDatabase(); - } - - private Table createTable(String tableName) { - Table table = new Table(); - table.setName(tableName); - buildTable(table); - return table; - } - - private void buildTable(Table table) { - Column id = new Column(); - id.setName("id"); - id.setPrimaryKey(true); - id.setRequired(true); - id.setTypeCode(Types.VARCHAR); - id.setSize("50"); - table.addColumn(id); - - Column value = new Column(); - value.setName("value"); - value.setTypeCode(Types.CLOB); - table.addColumn(value); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java deleted file mode 100644 index 5082e50..0000000 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.metadata.impl; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.alert.metadata.MetadataUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Constructor; - -/** - * @since Apr 12, 2016. 
- */ -public class MetadataDaoFactory { - - private static final Logger LOG = LoggerFactory.getLogger(MetadataDaoFactory.class); - - private static final MetadataDaoFactory INSTANCE = new MetadataDaoFactory(); - - private IMetadataDao dao; - - private MetadataDaoFactory() { - Config config = ConfigFactory.load(); - if (!config.hasPath(MetadataUtils.META_DATA)) { - LOG.warn("metadata is not configured, use in-memory store !!!"); - dao = new InMemMetadataDaoImpl(null); - } else { - Config metaDataConfig = config.getConfig(MetadataUtils.META_DATA); - try { - String clsName = metaDataConfig.getString(MetadataUtils.ALERT_META_DATA_DAO); - Class<?> clz; - clz = Thread.currentThread().getContextClassLoader().loadClass(clsName); - if (IMetadataDao.class.isAssignableFrom(clz)) { - Constructor<?> cotr = clz.getConstructor(Config.class); - LOG.info("metadata.alertMetadataDao loaded: " + clsName); - dao = (IMetadataDao) cotr.newInstance(metaDataConfig); - } else { - throw new Exception("metadata.metadataDao configuration need to be implementation of IMetadataDao! 
"); - } - } catch (Exception e) { - LOG.error("error when initialize the dao, fall back to in memory mode!", e); - dao = new InMemMetadataDaoImpl(metaDataConfig); - } - } - } - - public static MetadataDaoFactory getInstance() { - return INSTANCE; - } - - public IMetadataDao getMetadataDao() { - return dao; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java deleted file mode 100644 index 2325f90..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java +++ /dev/null @@ -1,753 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.metadata.impl; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.apache.eagle.alert.coordination.model.AlertBoltSpec; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.PublishSpec; -import org.apache.eagle.alert.coordination.model.RouterSpec; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.coordination.model.VersionedPolicyDefinition; -import org.apache.eagle.alert.coordination.model.VersionedStreamDefinition; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.ScheduleStateBase; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.PublishmentType; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamingCluster; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.alert.metadata.MetadataUtils; -import org.apache.eagle.alert.metadata.resource.Models; -import org.apache.eagle.alert.metadata.resource.OpResult; -import org.bson.BsonDocument; -import org.bson.BsonInt32; -import org.bson.BsonString; -import org.bson.Document; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import 
com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; -import com.mongodb.Block; -import com.mongodb.Function; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; -import com.mongodb.client.MongoIterable; -import com.mongodb.client.model.CreateCollectionOptions; -import com.mongodb.client.model.IndexOptions; -import com.mongodb.client.model.UpdateOptions; -import com.mongodb.client.result.DeleteResult; -import com.mongodb.client.result.UpdateResult; -import com.typesafe.config.Config; - -/** - * @since Apr 11, 2016. - */ -public class MongoMetadataDaoImpl implements IMetadataDao { - - private static final String DEFAULT_DB_NAME = "ump_alert_metadata"; - private static final Logger LOG = LoggerFactory.getLogger(MongoMetadataDaoImpl.class); - private static final ObjectMapper mapper = new ObjectMapper(); - private static final int DEFAULT_CAPPED_MAX_SIZE = 500 * 1024 * 1024; - private static final int DEFAULT_CAPPED_MAX_DOCUMENTS = 20000; - private static final String MONGO_CAPPED_MAX_SIZE = "mongo.cappedMaxSize"; - private static final String MONGO_CAPPED_MAX_DOCUMENTS = "mongo.cappedMaxDocuments"; - - static { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private final String connection; - private final String dbname; - private final MongoClient client; - private final int cappedMaxSize; - private final int cappedMaxDocuments; - - private MongoDatabase db; - private MongoCollection<Document> cluster; - private MongoCollection<Document> schema; - private MongoCollection<Document> datasource; - private MongoCollection<Document> policy; - private MongoCollection<Document> publishment; - private MongoCollection<Document> publishmentType; - private MongoCollection<Document> topologies; - private MongoCollection<Document> alerts; - - // scheduleStates splits to several collections - private 
MongoCollection<Document> scheduleStates; - private MongoCollection<Document> spoutSpecs; - private MongoCollection<Document> alertSpecs; - private MongoCollection<Document> groupSpecs; - private MongoCollection<Document> publishSpecs; - private MongoCollection<Document> policySnapshots; - private MongoCollection<Document> streamSnapshots; - private MongoCollection<Document> monitoredStreams; - private MongoCollection<Document> assignments; - - @Inject - public MongoMetadataDaoImpl(Config config) { - this.connection = config.getString(MetadataUtils.MONGO_CONNECTION_PATH); - this.cappedMaxSize = config.hasPath(MONGO_CAPPED_MAX_SIZE) ? config.getInt(MONGO_CAPPED_MAX_SIZE) : DEFAULT_CAPPED_MAX_SIZE; - this.cappedMaxDocuments = config.hasPath(MONGO_CAPPED_MAX_DOCUMENTS) ? config.getInt(MONGO_CAPPED_MAX_DOCUMENTS) : DEFAULT_CAPPED_MAX_DOCUMENTS; - this.client = new MongoClient(new MongoClientURI(this.connection)); - this.dbname = config.hasPath(MetadataUtils.MONGO_DATABASE) ? config.getString(MetadataUtils.MONGO_DATABASE) : DEFAULT_DB_NAME; - init(); - } - - private boolean isCollectionExists(String collectionName) { - boolean result = false; - MongoIterable<String> allCollections = db.listCollectionNames(); - for ( String collection : allCollections ) { - if (collection.equals(collectionName)) { - result = true; - break; - } - } - - return result; - } - - private MongoCollection<Document> getCollection(String collectionName) { - // first check if collection exists, if not then create a new collection with cappedSize - if (!isCollectionExists(collectionName)) { - CreateCollectionOptions option = new CreateCollectionOptions(); - option.capped(true); - option.maxDocuments(cappedMaxDocuments); - option.sizeInBytes(cappedMaxSize); - db.createCollection(collectionName, option); - } - - return db.getCollection(collectionName); - - } - - private void init() { - db = client.getDatabase(this.dbname); - IndexOptions io = new IndexOptions().background(true).name("nameIndex"); - 
BsonDocument doc = new BsonDocument(); - doc.append("name", new BsonInt32(1)); - cluster = db.getCollection("clusters"); - cluster.createIndex(doc, io); - { - BsonDocument doc2 = new BsonDocument(); - doc2.append("streamId", new BsonInt32(1)); - schema = db.getCollection("schemas"); - schema.createIndex(doc2, io); - } - datasource = db.getCollection("datasources"); - datasource.createIndex(doc, io); - policy = db.getCollection("policies"); - policy.createIndex(doc, io); - publishment = db.getCollection("publishments"); - publishment.createIndex(doc, io); - topologies = db.getCollection("topologies"); - topologies.createIndex(doc, io); - publishmentType = db.getCollection("publishmentTypes"); - publishmentType.createIndex(doc, io); - - alerts = db.getCollection("alerts"); - { - IndexOptions io1 = new IndexOptions().background(true).unique(true).name("alertIndex"); - BsonDocument doc1 = new BsonDocument(); - doc1.append("alertId", new BsonInt32(1)); - alerts.createIndex(doc1, io1); - } - - - // below is for schedule_specs and its splitted collections - BsonDocument doc1 = new BsonDocument(); - IndexOptions io1 = new IndexOptions().background(true).name("versionIndex"); - doc1.append("version", new BsonInt32(1)); - scheduleStates = getCollection("schedule_specs"); - scheduleStates.createIndex(doc1, io1); - - spoutSpecs = getCollection("spoutSpecs"); - { - IndexOptions ioInternal = new IndexOptions().background(true).name("topologyIdIndex"); - BsonDocument docInternal = new BsonDocument(); - docInternal.append("topologyId", new BsonInt32(1)); - spoutSpecs.createIndex(docInternal, ioInternal); - } - - alertSpecs = getCollection("alertSpecs"); - { - IndexOptions ioInternal = new IndexOptions().background(true).name("topologyNameIndex"); - BsonDocument docInternal = new BsonDocument(); - docInternal.append("topologyName", new BsonInt32(1)); - alertSpecs.createIndex(docInternal, ioInternal); - } - - groupSpecs = getCollection("groupSpecs"); - groupSpecs.createIndex(doc1, 
io1); - - publishSpecs = getCollection("publishSpecs"); - publishSpecs.createIndex(doc1, io1); - - policySnapshots = getCollection("policySnapshots"); - policySnapshots.createIndex(doc1, io); - - streamSnapshots = getCollection("streamSnapshots"); - streamSnapshots.createIndex(doc1, io); - - monitoredStreams = getCollection("monitoredStreams"); - monitoredStreams.createIndex(doc1, io); - - assignments = getCollection("assignments"); - assignments.createIndex(doc1, io1); - } - - @Override - public List<StreamingCluster> listClusters() { - return list(cluster, StreamingCluster.class); - } - - - private <T> OpResult addOrReplace(MongoCollection<Document> collection, T t) { - BsonDocument filter = new BsonDocument(); - if (t instanceof StreamDefinition) { - filter.append("streamId", new BsonString(MetadataUtils.getKey(t))); - } else if (t instanceof AlertPublishEvent) { - filter.append("alertId", new BsonString(MetadataUtils.getKey(t))); - } else { - filter.append("name", new BsonString(MetadataUtils.getKey(t))); - } - - String json = ""; - OpResult result = new OpResult(); - try { - json = mapper.writeValueAsString(t); - UpdateOptions options = new UpdateOptions(); - options.upsert(true); - UpdateResult ur = collection.replaceOne(filter, Document.parse(json), options); - // FIXME: could based on matched count do better matching... 
- if (ur.getModifiedCount() > 0 || ur.getUpsertedId() != null) { - result.code = 200; - result.message = String.format("update %d configuration item.", ur.getModifiedCount()); - } else { - result.code = 500; - result.message = "no configuration item create/updated."; - } - } catch (Exception e) { - result.code = 500; - result.message = e.getMessage(); - LOG.error("", e); - } - return result; - } - - private <T> OpResult remove(MongoCollection<Document> collection, String name) { - return removeObject(collection, "name", name); - } - - private <T> OpResult removeObject(MongoCollection<Document> collection, String nameField, String name) { - BsonDocument filter = new BsonDocument(); - filter.append(nameField, new BsonString(name)); - DeleteResult dr = collection.deleteOne(filter); - OpResult result = new OpResult(); - result.code = 200; - result.message = String.format(" %d config item removed!", dr.getDeletedCount()); - return result; - } - - @Override - public OpResult addCluster(StreamingCluster cluster) { - return addOrReplace(this.cluster, cluster); - } - - @Override - public OpResult removeCluster(String clusterId) { - return remove(cluster, clusterId); - } - - @Override - public List<StreamDefinition> listStreams() { - return list(schema, StreamDefinition.class); - } - - @Override - public OpResult createStream(StreamDefinition stream) { - return addOrReplace(this.schema, stream); - } - - @Override - public OpResult removeStream(String streamId) { - return removeObject(schema, "streamId", streamId); - } - - @Override - public List<Kafka2TupleMetadata> listDataSources() { - return list(datasource, Kafka2TupleMetadata.class); - } - - @Override - public OpResult addDataSource(Kafka2TupleMetadata dataSource) { - return addOrReplace(this.datasource, dataSource); - } - - @Override - public OpResult removeDataSource(String datasourceId) { - return remove(datasource, datasourceId); - } - - @Override - public List<PolicyDefinition> listPolicies() { - return 
list(policy, PolicyDefinition.class); - } - - @Override - public OpResult addPolicy(PolicyDefinition policy) { - return addOrReplace(this.policy, policy); - } - - @Override - public OpResult removePolicy(String policyId) { - return remove(policy, policyId); - } - - @Override - public List<Publishment> listPublishment() { - return list(publishment, Publishment.class); - } - - @Override - public OpResult addPublishment(Publishment publishment) { - return addOrReplace(this.publishment, publishment); - } - - @Override - public OpResult removePublishment(String pubId) { - return remove(publishment, pubId); - } - - @Override - public List<PublishmentType> listPublishmentType() { - return list(publishmentType, PublishmentType.class); - } - - @Override - public OpResult addPublishmentType(PublishmentType pubType) { - return addOrReplace(this.publishmentType, pubType); - } - - @Override - public OpResult removePublishmentType(String pubType) { - return remove(publishmentType, pubType); - } - - @Override - public List<AlertPublishEvent> listAlertPublishEvent(int size) { - List<AlertPublishEvent> result = list(alerts, AlertPublishEvent.class); - if (size < 0 || size > result.size()) { - size = result.size(); - } - return result.subList(result.size() - size, result.size()); - } - - @Override - public AlertPublishEvent getAlertPublishEvent(String alertId) { - List<AlertPublishEvent> results = list(alerts, AlertPublishEvent.class); - Optional<AlertPublishEvent> op = results.stream().filter(alert -> alert.getAlertId().equals(alertId)).findAny(); - if (op.isPresent()) { - return op.get(); - } - return null; - } - - @Override - public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) { - List<AlertPublishEvent> events = list(alerts, AlertPublishEvent.class); - List<AlertPublishEvent> result = events.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList()); - if (size < 0 || size > result.size()) { - size = 
result.size(); - } - return events.subList(result.size() - size, result.size()); - } - - @Override - public OpResult addAlertPublishEvent(AlertPublishEvent event) { - return addOrReplace(alerts, event); - } - - private <T> OpResult addOne(MongoCollection<Document> collection, T t) { - OpResult result = new OpResult(); - String json = ""; - try { - json = mapper.writeValueAsString(t); - collection.insertOne(Document.parse(json)); - result.code = 200; - result.message = String.format("add one document [%s] to collection [%s] succeed!", json, collection.getNamespace()); - LOG.info(result.message); - } catch (Exception e) { - result.code = 400; - result.message = e.getMessage(); - LOG.error(String.format("Add one document [%s] to collection [%s] failed!", json, collection.getNamespace()), e); - } - return result; - } - - /** - * Due to some field name in SpoutSpec contains dot(.) which is invalid Mongo Field name, we need to transform the - * format to store in Mongo. - * @return opresult - */ - private <T> OpResult addOneSpoutSpec(T t) { - OpResult result = new OpResult(); - String json = ""; - try { - json = mapper.writeValueAsString(t); - Document doc = Document.parse(json); - - String [] metadataMapArrays = {"kafka2TupleMetadataMap", "tuple2StreamMetadataMap", "streamRepartitionMetadataMap"}; - for (String metadataMapName: metadataMapArrays) { - Document _metadataMapDoc = (Document) doc.get(metadataMapName); - doc.remove(metadataMapName); - - ArrayList<Document> _metadataMapArray = new ArrayList<>(); - - for ( String key : _metadataMapDoc.keySet()) { - Document _subDoc = new Document(); - _subDoc.put("topicName", key); - _subDoc.put(metadataMapName, _metadataMapDoc.get(key)); - _metadataMapArray.add(_subDoc); - } - doc.append(metadataMapName, _metadataMapArray); - } - - spoutSpecs.insertOne(doc); - result.code = 200; - result.message = String.format("add one document [%s] to collection [%s] succeed!", doc.toJson(), spoutSpecs.getNamespace()); - 
LOG.info(result.message); - } catch (Exception e) { - result.code = 400; - result.message = e.getMessage(); - LOG.error(String.format("Add one document [%s] to collection [%s] failed!", json, spoutSpecs.getNamespace()), e); - } - return result; - } - - @Override - public ScheduleState getScheduleState(String versionId) { - BsonDocument doc = new BsonDocument(); - doc.append("version", new BsonString(versionId)); - ScheduleState state = scheduleStates.find(doc).map(new Function<Document, ScheduleState>() { - @Override - public ScheduleState apply(Document t) { - String json = t.toJson(); - try { - return mapper.readValue(json, ScheduleState.class); - } catch (IOException e) { - LOG.error("deserialize config item failed!", e); - } - return null; - } - }).first(); - - if (state != null) { - // based on version, to add content from collections of spoutSpecs/alertSpecs/etc.. - state = addDetailForScheduleState(state, versionId); - } - - return state; - } - - /** - * get the basic ScheduleState, and then based on the version to get all sub-part(spoutSpecs/alertSpecs/etc) - * to form a completed ScheduleState. - * @return the latest ScheduleState - */ - @Override - public ScheduleState getScheduleState() { - BsonDocument sort = new BsonDocument(); - sort.append("generateTime", new BsonInt32(-1)); - ScheduleState state = scheduleStates.find().sort(sort).map(new Function<Document, ScheduleState>() { - @Override - public ScheduleState apply(Document t) { - String json = t.toJson(); - try { - return mapper.readValue(json, ScheduleState.class); - } catch (IOException e) { - LOG.error("deserialize config item failed!", e); - } - return null; - } - }).first(); - - if (state != null) { - String version = state.getVersion(); - // based on version, to add content from collections of spoutSpecs/alertSpecs/etc.. 
- state = addDetailForScheduleState(state, version); - } - - return state; - } - - @Override - public List<ScheduleState> listScheduleStates() { - throw new UnsupportedOperationException("listScheduleStates not support!"); - } - - @Override - public OpResult clearScheduleState(int maxCapacity) { - throw new UnsupportedOperationException("clearScheduleState not support!"); - } - - private ScheduleState addDetailForScheduleState(ScheduleState state, String version) { - Map<String, SpoutSpec> spoutMaps = maps(spoutSpecs, SpoutSpec.class, version); - if (spoutMaps.size() != 0) { - state.setSpoutSpecs(spoutMaps); - } - - Map<String, AlertBoltSpec> alertMaps = maps(alertSpecs, AlertBoltSpec.class, version); - if (alertMaps.size() != 0) { - state.setAlertSpecs(alertMaps); - } - - Map<String, RouterSpec> groupMaps = maps(groupSpecs, RouterSpec.class, version); - if (groupMaps.size() != 0) { - state.setGroupSpecs(groupMaps); - } - - Map<String, PublishSpec> publishMaps = maps(publishSpecs, PublishSpec.class, version); - if (publishMaps.size() != 0) { - state.setPublishSpecs(publishMaps); - } - - List<VersionedPolicyDefinition> policyLists = list(policySnapshots, VersionedPolicyDefinition.class, version); - if (policyLists.size() != 0) { - state.setPolicySnapshots(policyLists); - } - - List<VersionedStreamDefinition> streamLists = list(streamSnapshots, VersionedStreamDefinition.class, version); - if (streamLists.size() != 0) { - state.setStreamSnapshots(streamLists); - } - - List<MonitoredStream> monitorLists = list(monitoredStreams, MonitoredStream.class, version); - if (monitorLists.size() != 0) { - state.setMonitoredStreams(monitorLists); - } - - List<PolicyAssignment> assignmentLists = list(assignments, PolicyAssignment.class, version); - if (assignmentLists.size() != 0) { - state.setAssignments(assignmentLists); - } - return state; - } - - private <T> Map<String, T> maps(MongoCollection<Document> collection, Class<T> clz, String version) { - BsonDocument doc = new 
BsonDocument(); - doc.append("version", new BsonString(version)); - - Map<String, T> maps = new HashMap<String, T>(); - String mapKey = (clz == SpoutSpec.class) ? "topologyId" : "topologyName"; - collection.find(doc).forEach(new Block<Document>() { - @Override - public void apply(Document document) { - String json = document.toJson(); - try { - //Due to some field name in SpoutSpec contains dot(.) which is invalid Mongo Field name, - // we need to transform the format while reading from Mongo. - if (clz == SpoutSpec.class) { - Document doc = Document.parse(json); - String [] metadataMapArrays = {"kafka2TupleMetadataMap", "tuple2StreamMetadataMap", "streamRepartitionMetadataMap"}; - for (String metadataMapName: metadataMapArrays) { - ArrayList<Document> subDocs = (ArrayList) doc.get(metadataMapName); - doc.remove(metadataMapName); - - Document replaceDoc = new Document(); - for ( Document subDoc : subDocs) { - replaceDoc.put((String) subDoc.get("topicName"), subDoc.get(metadataMapName)); - } - doc.put(metadataMapName, replaceDoc); - } - - json = doc.toJson(); - } - T t = mapper.readValue(json, clz); - maps.put(document.getString(mapKey), t); - } catch (IOException e) { - LOG.error("deserialize config item failed!", e); - } - } - }); - - return maps; - } - - private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz, String version) { - BsonDocument doc = new BsonDocument(); - doc.append("version", new BsonString(version)); - - List<T> result = new LinkedList<T>(); - collection.find(doc).map(new Function<Document, T>() { - @Override - public T apply(Document t) { - String json = t.toJson(); - try { - return mapper.readValue(json, clz); - } catch (IOException e) { - LOG.error("deserialize config item failed!", e); - } - return null; - } - }).into(result); - return result; - } - - private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz) { - List<T> result = new LinkedList<T>(); - collection.find().map(new Function<Document, T>() { - 
@Override - public T apply(Document t) { - String json = t.toJson(); - try { - return mapper.readValue(json, clz); - } catch (IOException e) { - LOG.error("deserialize config item failed!", e); - } - return null; - } - }).into(result); - return result; - } - - /** - * write ScheduleState to several collections. basic info writes to ScheduleState, other writes to collections - * named by spoutSpecs/alertSpecs/etc. - * - * @param state - * @return - */ - @Override - public OpResult addScheduleState(ScheduleState state) { - OpResult result = new OpResult(); - try { - for (String key : state.getSpoutSpecs().keySet()) { - SpoutSpec spoutSpec = state.getSpoutSpecs().get(key); - addOneSpoutSpec(spoutSpec); - } - - for (String key : state.getAlertSpecs().keySet()) { - AlertBoltSpec alertBoltSpec = state.getAlertSpecs().get(key); - addOne(alertSpecs, alertBoltSpec); - } - - for (String key : state.getGroupSpecs().keySet()) { - RouterSpec groupSpec = state.getGroupSpecs().get(key); - addOne(groupSpecs, groupSpec); - } - - for (String key : state.getPublishSpecs().keySet()) { - PublishSpec publishSpec = state.getPublishSpecs().get(key); - addOne(publishSpecs, publishSpec); - } - - for (VersionedPolicyDefinition policySnapshot : state.getPolicySnapshots()) { - addOne(policySnapshots, policySnapshot); - } - - for (VersionedStreamDefinition streamSnapshot : state.getStreamSnapshots()) { - addOne(streamSnapshots, streamSnapshot); - } - - for (MonitoredStream monitoredStream : state.getMonitoredStreams()) { - addOne(monitoredStreams, monitoredStream); - } - - for (PolicyAssignment assignment : state.getAssignments()) { - addOne(assignments, assignment); - } - - ScheduleStateBase stateBase = new ScheduleStateBase( - state.getVersion(), state.getGenerateTime(), state.getCode(), - state.getMessage(), state.getScheduleTimeMillis()); - - addOne(scheduleStates, stateBase); - - result.code = 200; - result.message = "add document to collection schedule_specs succeed"; - } catch (Exception 
e) { - result.code = 400; - result.message = e.getMessage(); - LOG.error("", e); - } - return result; - } - - @Override - public List<PolicyAssignment> listAssignments() { - return list(assignments, PolicyAssignment.class); - } - - @Override - public OpResult addAssignment(PolicyAssignment assignment) { - return addOne(assignments, assignment); - } - - @Override - public List<Topology> listTopologies() { - return list(topologies, Topology.class); - } - - @Override - public OpResult addTopology(Topology t) { - return addOrReplace(this.topologies, t); - } - - @Override - public OpResult removeTopology(String topologyName) { - return remove(topologies, topologyName); - } - - @Override - public OpResult clear() { - throw new UnsupportedOperationException("clear not support!"); - } - - @Override - public Models export() { - throw new UnsupportedOperationException("export not support!"); - } - - @Override - public OpResult importModels(Models models) { - throw new UnsupportedOperationException("importModels not support!"); - } - - @Override - public void close() throws IOException { - client.close(); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java deleted file mode 100644 index 2463e5b..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.metadata.resource;

import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamingCluster;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Container of metadata collections used for metadata export/import, to ease testing.
 *
 * <p>Every field is public, mutable and initialised to an empty collection.
 *
 * @since May 23, 2016
 */
public class Models {
    public List<StreamingCluster> clusters = new ArrayList<>();
    public List<StreamDefinition> schemas = new ArrayList<>();
    public List<Kafka2TupleMetadata> datasources = new ArrayList<>();
    public List<PolicyDefinition> policies = new ArrayList<>();
    public List<Publishment> publishments = new ArrayList<>();
    /** Schedule states in the natural ordering of their String keys. */
    public SortedMap<String, ScheduleState> scheduleStates = new TreeMap<>();
    public List<PolicyAssignment> assignments = new ArrayList<>();
    public List<Topology> topologies = new ArrayList<>();
}
