This is an automated email from the ASF dual-hosted git repository. gabriel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to refs/heads/master by this push: new 1e0be52 Add influxdb to statscollector (#3078) 1e0be52 is described below commit 1e0be522b1a385fb9c24c91d77efa9c36691acd6 Author: Gabriel Beims Bräscher <gabrasc...@gmail.com> AuthorDate: Wed Jan 9 11:22:35 2019 +0100 Add influxdb to statscollector (#3078) * Add Support for InfluxDB on StatsCollector * Code refactored to fit Inner Class architecture. Due to the inner class structure, test case for some methods will not be implemented. On the future it will be necessary to refactor the whole StatsCOllector architecture and extract inner classes. Each Inner Class that is a "stats collector" and sends data to Influx will extend AbstractStatsCollector to send metrics to the correct measure ("table"). For instance, HostCollector sends data to host_stats, VmStatsCollector sends data to vm_stats. Add ping test for ensure that the target InfluxDB host is reachable * Address PR reviews * Enhance and tests implemented addressing reviewers. * Set variables to private --- core/pom.xml | 5 + .../java/com/cloud/agent/api/HostStatsEntry.java | 14 + .../java/com/cloud/agent/api/VmStatsEntry.java | 49 +- server/pom.xml | 5 + .../main/java/com/cloud/server/StatsCollector.java | 688 ++++++++++++++------- .../java/com/cloud/server/StatsCollectorTest.java | 227 +++++++ .../cloudstack/utils/graphite/GraphiteClient.java | 6 +- 7 files changed, 743 insertions(+), 251 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index e95a72a..a3ef70a 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -39,6 +39,11 @@ </dependency> <dependency> <groupId>org.apache.cloudstack</groupId> + <artifactId>cloud-engine-schema</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.cloudstack</groupId> <artifactId>cloud-framework-security</artifactId> <version>${project.version}</version> </dependency> diff --git a/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java b/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java index 82041a6..5fd1c51 100644 --- a/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java +++ b/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java @@ -20,10 +20,12 @@ package com.cloud.agent.api; import com.cloud.host.HostStats; +import com.cloud.host.HostVO; public class HostStatsEntry implements HostStats { long hostId; + HostVO hostVo; String entityType; double cpuUtilization; double networkReadKBs; @@ -112,4 +114,16 @@ public class HostStatsEntry implements HostStats { public void setHostId(long hostId) { this.hostId = hostId; } + + public long getHostId() { + return hostId; + } + + public HostVO getHostVo() { + return hostVo; + } + + public void setHostVo(HostVO hostVo) { + this.hostVo = hostVo; + } } diff --git a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java index e6063b9..9f82808 100644 --- a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java +++ b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java @@ -19,22 +19,25 @@ package com.cloud.agent.api; +import com.cloud.vm.UserVmVO; import com.cloud.vm.VmStats; public class VmStatsEntry implements VmStats { - double cpuUtilization; - double networkReadKBs; - double networkWriteKBs; - double diskReadIOs; - double diskWriteIOs; - double diskReadKBs; - double diskWriteKBs; - double memoryKBs; - double intfreememoryKBs; - double targetmemoryKBs; - int numCPUs; - String entityType; + private long vmId; + private UserVmVO userVmVO; + private double cpuUtilization; + private double networkReadKBs; + private double networkWriteKBs; + private double diskReadIOs; + private double diskWriteIOs; + private double diskReadKBs; + private double diskWriteKBs; + private double memoryKBs; + private double intfreememoryKBs; + private double targetmemoryKBs; + private int numCPUs; + private String entityType; public VmStatsEntry() { } @@ -50,14 +53,12 @@ public class VmStatsEntry implements VmStats { this.entityType = entityType; } - public VmStatsEntry(double cpuUtilization, double networkReadKBs, double networkWriteKBs, double diskReadKBs, double diskWriteKBs, int numCPUs, String entityType) { - this.cpuUtilization = cpuUtilization; - this.networkReadKBs = networkReadKBs; - this.networkWriteKBs = networkWriteKBs; - this.diskReadKBs = diskReadKBs; - this.diskWriteKBs = diskWriteKBs; - this.numCPUs = numCPUs; - this.entityType = entityType; + public long getVmId() { + return vmId; + } + + public void setVmId(long vmId) { + this.vmId = vmId; } @Override @@ -166,4 +167,12 @@ public class VmStatsEntry implements VmStats { this.entityType = entityType; } + public UserVmVO getUserVmVO() { + return userVmVO; + } + + public void setUserVmVO(UserVmVO userVmVO) { + this.userVmVO = userVmVO; + } + } diff --git a/server/pom.xml b/server/pom.xml index e246150..1987383 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -157,6 +157,11 @@ <groupId>org.opensaml</groupId> <artifactId>opensaml</artifactId> </dependency> + <dependency> + <groupId>org.influxdb</groupId> + <artifactId>influxdb-java</artifactId> + <version>2.8</version> + </dependency> </dependencies> <build> <plugins> diff --git a/server/src/main/java/com/cloud/server/StatsCollector.java b/server/src/main/java/com/cloud/server/StatsCollector.java index b66fa5f..8e2bc7e 100644 --- a/server/src/main/java/com/cloud/server/StatsCollector.java +++ b/server/src/main/java/com/cloud/server/StatsCollector.java @@ -20,6 +20,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Calendar; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -46,8 +47,15 @@ import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.cloudstack.utils.graphite.GraphiteClient; import org.apache.cloudstack.utils.graphite.GraphiteException; import org.apache.cloudstack.utils.usage.UsageUtils; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Pong; import org.springframework.stereotype.Component; import com.cloud.agent.AgentManager; @@ -121,6 +129,7 @@ import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.Transaction; import com.cloud.utils.db.TransactionCallbackNoReturn; import com.cloud.utils.db.TransactionStatus; +import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.net.MacAddress; import com.cloud.vm.NicVO; import com.cloud.vm.UserVmManager; @@ -140,7 +149,7 @@ import com.cloud.vm.dao.VMInstanceDao; public class StatsCollector extends ManagerBase implements ComponentMethodInterceptable, Configurable { public static enum ExternalStatsProtocol { - NONE("none"), GRAPHITE("graphite"); + NONE("none"), GRAPHITE("graphite"), INFLUXDB("influxdb"); String _type; ExternalStatsProtocol(String type) { @@ -155,16 +164,52 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc public static final Logger s_logger = Logger.getLogger(StatsCollector.class.getName()); - static final ConfigKey<Integer> vmDiskStatsInterval = new ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval", "0", + private static final int UNDEFINED_PORT_VALUE = -1; + + /** + * Default value for the Graphite connection port: {@value} + */ + private static final int GRAPHITE_DEFAULT_PORT = 2003; + + /** + * Default value for the InfluxDB connection port: {@value} + */ + private static final int INFLUXDB_DEFAULT_PORT = 8086; + + private static final String UUID_TAG = "uuid"; + + private static final String TOTAL_MEMORY_KBS_FIELD = "total_memory_kb"; + private static final String FREE_MEMORY_KBS_FIELD = "free_memory_kb"; + private static final String CPU_UTILIZATION_FIELD = "cpu_utilization"; + private static final String CPUS_FIELD = "cpus"; + private static final String CPU_SOCKETS_FIELD = "cpu_sockets"; + private static final String NETWORK_READ_KBS_FIELD = "network_read_kbs"; + private static final String NETWORK_WRITE_KBS_FIELD = "network_write_kbs"; + private static final String MEMORY_TARGET_KBS_FIELD = "memory_target_kbs"; + private static final String DISK_READ_IOPS_FIELD = "disk_read_iops"; + private static final String DISK_READ_KBS_FIELD = "disk_read_kbs"; + private static final String DISK_WRITE_IOPS_FIELD = "disk_write_iops"; + private static final String DISK_WRITE_KBS_FIELD = "disk_write_kbs"; + + private static final String DEFAULT_DATABASE_NAME = "cloudstack"; + private static final String INFLUXDB_HOST_MEASUREMENT = "host_stats"; + private static final String INFLUXDB_VM_MEASUREMENT = "vm_stats"; + + private static final ConfigKey<Integer> vmDiskStatsInterval = new ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval", "0", "Interval (in seconds) to report vm disk statistics. Vm disk statistics will be disabled if this is set to 0 or less than 0.", false); - static final ConfigKey<Integer> vmDiskStatsIntervalMin = new ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval.min", "300", + private static final ConfigKey<Integer> vmDiskStatsIntervalMin = new ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval.min", "300", "Minimal interval (in seconds) to report vm disk statistics. If vm.disk.stats.interval is smaller than this, use this to report vm disk statistics.", false); - static final ConfigKey<Integer> vmNetworkStatsInterval = new ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval", "0", + private static final ConfigKey<Integer> vmNetworkStatsInterval = new ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval", "0", "Interval (in seconds) to report vm network statistics (for Shared networks). Vm network statistics will be disabled if this is set to 0 or less than 0.", false); - static final ConfigKey<Integer> vmNetworkStatsIntervalMin = new ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval.min", "300", - "Minimal Interval (in seconds) to report vm network statistics (for Shared networks). If vm.network.stats.interval is smaller than this, use this to report vm network statistics.", false); - static final ConfigKey<Integer> StatsTimeout = new ConfigKey<Integer>("Advanced", Integer.class, "stats.timeout", "60000", - "The timeout for stats call in milli seconds.", true, ConfigKey.Scope.Cluster); + private static final ConfigKey<Integer> vmNetworkStatsIntervalMin = new ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval.min", "300", + "Minimal Interval (in seconds) to report vm network statistics (for Shared networks). If vm.network.stats.interval is smaller than this, use this to report vm network statistics.", + false); + private static final ConfigKey<Integer> StatsTimeout = new ConfigKey<Integer>("Advanced", Integer.class, "stats.timeout", "60000", + "The timeout for stats call in milli seconds.", true, + ConfigKey.Scope.Cluster); + private static final ConfigKey<String> statsOutputUri = new ConfigKey<String>("Advanced", String.class, "stats.output.uri", "", + "URI to send StatsCollector statistics to. The collector is defined on the URI scheme. Example: graphite://graphite-hostaddress:port or influxdb://influxdb-hostaddress/dbname. Note that the port is optional, if not added the default port for the respective collector (graphite or influxdb) will be used. Additionally, the database name '/dbname' is also optional; default db name is 'cloudstack'. You must create and configure the database if using influxdb.", + true); private static StatsCollector s_instance = null; @@ -236,20 +281,20 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc private ConcurrentHashMap<Long, StorageStats> _storageStats = new ConcurrentHashMap<Long, StorageStats>(); private ConcurrentHashMap<Long, StorageStats> _storagePoolStats = new ConcurrentHashMap<Long, StorageStats>(); - long hostStatsInterval = -1L; - long hostAndVmStatsInterval = -1L; - long storageStatsInterval = -1L; - long volumeStatsInterval = -1L; - long autoScaleStatsInterval = -1L; + private long hostStatsInterval = -1L; + private long hostAndVmStatsInterval = -1L; + private long storageStatsInterval = -1L; + private long volumeStatsInterval = -1L; + private long autoScaleStatsInterval = -1L; - List<Long> hostIds = null; private double _imageStoreCapacityThreshold = 0.90; - String externalStatsPrefix = ""; + private String externalStatsPrefix = ""; String externalStatsHost = null; int externalStatsPort = -1; - boolean externalStatsEnabled = false; + private String externalStatsScheme; ExternalStatsProtocol externalStatsType = ExternalStatsProtocol.NONE; + private String databaseName = DEFAULT_DATABASE_NAME; private ScheduledExecutorService _diskStatsUpdateExecutor; private int _usageAggregationRange = 1440; @@ -279,7 +324,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc return true; } - private void init(Map<String, String> configs) { + protected void init(Map<String, String> configs) { _executor = Executors.newScheduledThreadPool(6, new NamedThreadFactory("StatsCollector")); hostStatsInterval = NumbersUtil.parseLong(configs.get("host.stats.interval"), 60000L); @@ -288,24 +333,25 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc volumeStatsInterval = NumbersUtil.parseLong(configs.get("volume.stats.interval"), 600000L); autoScaleStatsInterval = NumbersUtil.parseLong(configs.get("autoscale.stats.interval"), 60000L); - /* URI to send statistics to. Currently only Graphite is supported */ - String externalStatsUri = configs.get("stats.output.uri"); - if (externalStatsUri != null && !externalStatsUri.equals("")) { + String statsUri = statsOutputUri.value(); + if (StringUtils.isNotBlank(statsUri)) { try { - URI uri = new URI(externalStatsUri); - String scheme = uri.getScheme(); + URI uri = new URI(statsUri); + externalStatsScheme = uri.getScheme(); try { - externalStatsType = ExternalStatsProtocol.valueOf(scheme.toUpperCase()); + externalStatsType = ExternalStatsProtocol.valueOf(externalStatsScheme.toUpperCase()); } catch (IllegalArgumentException e) { - s_logger.info(scheme + " is not a valid protocol for external statistics. No statistics will be send."); + s_logger.error(externalStatsScheme + " is not a valid protocol for external statistics. No statistics will be send."); } - if (!StringUtils.isEmpty(uri.getHost())) { + if (StringUtils.isNotEmpty(uri.getHost())) { externalStatsHost = uri.getHost(); } - externalStatsPort = uri.getPort(); + externalStatsPort = retrieveExternalStatsPortFromUri(uri); + + databaseName = configureDatabaseName(uri); if (!StringUtils.isEmpty(uri.getPath())) { externalStatsPrefix = uri.getPath().substring(1); @@ -318,9 +364,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc externalStatsPrefix = ""; } - externalStatsEnabled = true; } catch (URISyntaxException e) { - s_logger.debug("Failed to parse external statistics URI: " + e.getMessage()); + s_logger.error("Failed to parse external statistics URI: ", e); } } @@ -342,7 +387,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc if (vmDiskStatsInterval.value() > 0) { if (vmDiskStatsInterval.value() < vmDiskStatsIntervalMin.value()) { - s_logger.debug("vm.disk.stats.interval - " + vmDiskStatsInterval.value() + " is smaller than vm.disk.stats.interval.min - " + vmDiskStatsIntervalMin.value() + ", so use vm.disk.stats.interval.min"); + s_logger.debug("vm.disk.stats.interval - " + vmDiskStatsInterval.value() + " is smaller than vm.disk.stats.interval.min - " + vmDiskStatsIntervalMin.value() + + ", so use vm.disk.stats.interval.min"); _executor.scheduleAtFixedRate(new VmDiskStatsTask(), vmDiskStatsIntervalMin.value(), vmDiskStatsIntervalMin.value(), TimeUnit.SECONDS); } else { _executor.scheduleAtFixedRate(new VmDiskStatsTask(), vmDiskStatsInterval.value(), vmDiskStatsInterval.value(), TimeUnit.SECONDS); @@ -353,7 +399,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc if (vmNetworkStatsInterval.value() > 0) { if (vmNetworkStatsInterval.value() < vmNetworkStatsIntervalMin.value()) { - s_logger.debug("vm.network.stats.interval - " + vmNetworkStatsInterval.value() + " is smaller than vm.network.stats.interval.min - " + vmNetworkStatsIntervalMin.value() + ", so use vm.network.stats.interval.min"); + s_logger.debug("vm.network.stats.interval - " + vmNetworkStatsInterval.value() + " is smaller than vm.network.stats.interval.min - " + + vmNetworkStatsIntervalMin.value() + ", so use vm.network.stats.interval.min"); _executor.scheduleAtFixedRate(new VmNetworkStatsTask(), vmNetworkStatsIntervalMin.value(), vmNetworkStatsIntervalMin.value(), TimeUnit.SECONDS); } else { _executor.scheduleAtFixedRate(new VmNetworkStatsTask(), vmNetworkStatsInterval.value(), vmNetworkStatsInterval.value(), TimeUnit.SECONDS); @@ -411,82 +458,111 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc } - class HostCollector extends ManagedContextRunnable { + /** + * Configures the database name according to the URI path. For instance, if the URI is as influxdb://address:port/dbname, the database name will be 'dbname'. + */ + protected String configureDatabaseName(URI uri) { + String dbname = StringUtils.removeStart(uri.getPath(), "/"); + if (StringUtils.isBlank(dbname)) { + return DEFAULT_DATABASE_NAME; + } else { + return dbname; + } + } + + /** + * Configures the port to be used when connecting with the stats collector service. + * Default values are 8086 for influx DB and 2003 for GraphiteDB. + * Throws URISyntaxException in case of non configured port and external StatsType + */ + protected int retrieveExternalStatsPortFromUri(URI uri) throws URISyntaxException { + int port = uri.getPort(); + if (externalStatsType != ExternalStatsProtocol.NONE) { + if (port != UNDEFINED_PORT_VALUE) { + return port; + } + if (externalStatsType == ExternalStatsProtocol.GRAPHITE) { + return GRAPHITE_DEFAULT_PORT; + } + if (externalStatsType == ExternalStatsProtocol.INFLUXDB) { + return INFLUXDB_DEFAULT_PORT; + } + } + throw new URISyntaxException(uri.toString(), String.format( + "Cannot define a port for the Stats Collector host %s://%s:%s or URI scheme is incorrect. The configured URI in stats.output.uri is not supported. Please configure as the following examples: graphite://graphite-hostaddress:port, or influxdb://influxdb-hostaddress:port. Note that the port is optional, if not added the default port for the respective collector (graphite or influxdb) will be used.", + externalStatsPrefix, externalStatsHost, externalStatsPort)); + } + + class HostCollector extends AbstractStatsCollector { @Override protected void runInContext() { try { s_logger.debug("HostStatsCollector is running..."); - SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria(); - sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString()); - sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.Storage.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ConsoleProxy.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorage.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.LocalSecondaryStorage.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.TrafficMonitor.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorageVM.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ExternalFirewall.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ExternalLoadBalancer.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.NetScalerControlCenter.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.L2Networking.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.BaremetalDhcp.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.BaremetalPxe.toString()); - ConcurrentHashMap<Long, HostStats> hostStats = new ConcurrentHashMap<Long, HostStats>(); + SearchCriteria<HostVO> sc = createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance(); + + Map<Object, Object> metrics = new HashMap<>(); List<HostVO> hosts = _hostDao.search(sc, null); + for (HostVO host : hosts) { - HostStatsEntry stats = (HostStatsEntry)_resourceMgr.getHostStatistics(host.getId()); - if (stats != null) { - hostStats.put(host.getId(), stats); + HostStatsEntry hostStatsEntry = (HostStatsEntry)_resourceMgr.getHostStatistics(host.getId()); + if (hostStatsEntry != null) { + hostStatsEntry.setHostVo(host); + metrics.put(hostStatsEntry.getHostId(), hostStatsEntry); + _hostStats.put(host.getId(), hostStatsEntry); } else { - s_logger.warn("Received invalid host stats for host: " + host.getId()); + s_logger.warn("The Host stats is null for host: " + host.getId()); } } - _hostStats = hostStats; - // Get a subset of hosts with GPU support from the list of "hosts" - List<HostVO> gpuEnabledHosts = new ArrayList<HostVO>(); - if (hostIds != null) { - for (HostVO host : hosts) { - if (hostIds.contains(host.getId())) { - gpuEnabledHosts.add(host); - } - } - } else { - // Check for all the hosts managed by CloudStack. - gpuEnabledHosts = hosts; - } - for (HostVO host : gpuEnabledHosts) { - HashMap<String, HashMap<String, VgpuTypesInfo>> groupDetails = _resourceMgr.getGPUStatistics(host); - if (groupDetails != null) { - _resourceMgr.updateGPUDetails(host.getId(), groupDetails); - } + + if (externalStatsType == ExternalStatsProtocol.INFLUXDB) { + sendMetricsToInfluxdb(metrics); } - hostIds = _hostGpuGroupsDao.listHostIds(); + + updateGpuEnabledHostsDetails(hosts); } catch (Throwable t) { s_logger.error("Error trying to retrieve host stats", t); } } + + /** + * Updates GPU details on hosts supporting GPU. + */ + private void updateGpuEnabledHostsDetails(List<HostVO> hosts) { + List<HostVO> gpuEnabledHosts = new ArrayList<HostVO>(); + List<Long> hostIds = _hostGpuGroupsDao.listHostIds(); + if (CollectionUtils.isEmpty(hostIds)) { + return; + } + for (HostVO host : hosts) { + if (hostIds.contains(host.getId())) { + gpuEnabledHosts.add(host); + } + } + for (HostVO host : gpuEnabledHosts) { + HashMap<String, HashMap<String, VgpuTypesInfo>> groupDetails = _resourceMgr.getGPUStatistics(host); + if (MapUtils.isEmpty(groupDetails)) { + _resourceMgr.updateGPUDetails(host.getId(), groupDetails); + } + } + } + + @Override + protected Point creteInfluxDbPoint(Object metricsObject) { + return createInfluxDbPointForHostMetrics(metricsObject); + } } - class VmStatsCollector extends ManagedContextRunnable { + class VmStatsCollector extends AbstractStatsCollector { @Override protected void runInContext() { try { s_logger.trace("VmStatsCollector is running..."); - SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria(); - sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString()); - sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.Storage.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ConsoleProxy.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorage.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.LocalSecondaryStorage.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.TrafficMonitor.toString()); - sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorageVM.toString()); + SearchCriteria<HostVO> sc = createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance(); List<HostVO> hosts = _hostDao.search(sc, null); - /* HashMap for metrics to be send to Graphite */ - HashMap metrics = new HashMap<String, Integer>(); + Map<Object, Object> metrics = new HashMap<>(); for (HostVO host : hosts) { List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId()); @@ -497,86 +573,35 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc } try { - HashMap<Long, VmStatsEntry> vmStatsById = _userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds); + Map<Long, VmStatsEntry> vmStatsById = _userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds); if (vmStatsById != null) { - VmStatsEntry statsInMemory = null; - Set<Long> vmIdSet = vmStatsById.keySet(); for (Long vmId : vmIdSet) { VmStatsEntry statsForCurrentIteration = vmStatsById.get(vmId); - statsInMemory = (VmStatsEntry)_VmStats.get(vmId); + statsForCurrentIteration.setVmId(vmId); + UserVmVO userVmVo = _userVmDao.findById(vmId); + statsForCurrentIteration.setUserVmVO(userVmVo); - if (statsInMemory == null) { - //no stats exist for this vm, directly persist - _VmStats.put(vmId, statsForCurrentIteration); - } else { - //update each field - statsInMemory.setCPUUtilization(statsForCurrentIteration.getCPUUtilization()); - statsInMemory.setNumCPUs(statsForCurrentIteration.getNumCPUs()); - statsInMemory.setNetworkReadKBs(statsInMemory.getNetworkReadKBs() + statsForCurrentIteration.getNetworkReadKBs()); - statsInMemory.setNetworkWriteKBs(statsInMemory.getNetworkWriteKBs() + statsForCurrentIteration.getNetworkWriteKBs()); - statsInMemory.setDiskWriteKBs(statsInMemory.getDiskWriteKBs() + statsForCurrentIteration.getDiskWriteKBs()); - statsInMemory.setDiskReadIOs(statsInMemory.getDiskReadIOs() + statsForCurrentIteration.getDiskReadIOs()); - statsInMemory.setDiskWriteIOs(statsInMemory.getDiskWriteIOs() + statsForCurrentIteration.getDiskWriteIOs()); - statsInMemory.setDiskReadKBs(statsInMemory.getDiskReadKBs() + statsForCurrentIteration.getDiskReadKBs()); - statsInMemory.setMemoryKBs(statsForCurrentIteration.getMemoryKBs()); - statsInMemory.setIntFreeMemoryKBs(statsForCurrentIteration.getIntFreeMemoryKBs()); - statsInMemory.setTargetMemoryKBs(statsForCurrentIteration.getTargetMemoryKBs()); - - _VmStats.put(vmId, statsInMemory); - } - - /** - * Add statistics to HashMap only when they should be send to a external stats collector - * Performance wise it seems best to only append to the HashMap when needed - */ - if (externalStatsEnabled) { - VMInstanceVO vmVO = _vmInstance.findById(vmId); - String vmName = vmVO.getUuid(); - - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".cpu.num", statsForCurrentIteration.getNumCPUs()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".cpu.utilization", statsForCurrentIteration.getCPUUtilization()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".network.read_kbs", statsForCurrentIteration.getNetworkReadKBs()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".network.write_kbs", statsForCurrentIteration.getNetworkWriteKBs()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.write_kbs", statsForCurrentIteration.getDiskWriteKBs()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.read_kbs", statsForCurrentIteration.getDiskReadKBs()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.write_iops", statsForCurrentIteration.getDiskWriteIOs()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.read_iops", statsForCurrentIteration.getDiskReadIOs()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.total_kbs", statsForCurrentIteration.getMemoryKBs()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.internalfree_kbs", statsForCurrentIteration.getIntFreeMemoryKBs()); - metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.target_kbs", statsForCurrentIteration.getTargetMemoryKBs()); + storeVirtualMachineStatsInMemory(statsForCurrentIteration); + if (externalStatsType == ExternalStatsProtocol.GRAPHITE) { + prepareVmMetricsForGraphite(metrics, statsForCurrentIteration); + } else { + metrics.put(statsForCurrentIteration.getVmId(), statsForCurrentIteration); } - } - /** - * Send the metrics to a external stats collector - * We send it on a per-host basis to prevent that we flood the host - * Currently only Graphite is supported - */ if (!metrics.isEmpty()) { - if (externalStatsType != null && externalStatsType == ExternalStatsProtocol.GRAPHITE) { - - if (externalStatsPort == -1) { - externalStatsPort = 2003; - } - - s_logger.debug("Sending VmStats of host " + host.getId() + " to Graphite host " + externalStatsHost + ":" + externalStatsPort); - - try { - GraphiteClient g = new GraphiteClient(externalStatsHost, externalStatsPort); - g.sendMetrics(metrics); - } catch (GraphiteException e) { - s_logger.debug("Failed sending VmStats to Graphite host " + externalStatsHost + ":" + externalStatsPort + ": " + e.getMessage()); - } - - metrics.clear(); + if (externalStatsType == ExternalStatsProtocol.GRAPHITE) { + sendVmMetricsToGraphiteHost(metrics, host); + } else if (externalStatsType == ExternalStatsProtocol.INFLUXDB) { + sendMetricsToInfluxdb(metrics); } } - } + metrics.clear(); + } } catch (Exception e) { s_logger.debug("Failed to get VM stats for host with ID: " + host.getId()); continue; @@ -587,6 +612,11 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc s_logger.error("Error trying to retrieve VM stats", t); } } + + @Override + protected Point creteInfluxDbPoint(Object metricsObject) { + return createInfluxDbPointForVmMetrics(metricsObject); + } } public VmStats getVmStats(long id) { @@ -646,7 +676,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc //Check for ownership //msHost in UP state with min id should run the job ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L)); - if(msHost == null || (msHost.getMsid() != mgmtSrvrId)){ + if (msHost == null || (msHost.getMsid() != mgmtSrvrId)) { s_logger.debug("Skipping collect vm disk stats from hosts"); return; } @@ -658,11 +688,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc public void doInTransactionWithoutResult(TransactionStatus status) { s_logger.debug("VmDiskStatsTask is running..."); - SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria(); - sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString()); - sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, - ResourceState.ErrorInMaintenance); - sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.Routing.toString()); + SearchCriteria<HostVO> sc = createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance(); sc.addAnd("hypervisorType", SearchCriteria.Op.EQ, HypervisorType.KVM); // support KVM only util 2013.06.25 List<HostVO> hosts = _hostDao.search(sc, null); @@ -689,67 +715,64 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc SearchCriteria<VolumeVO> sc_volume = _volsDao.createSearchCriteria(); sc_volume.addAnd("path", SearchCriteria.Op.EQ, vmDiskStat.getPath()); List<VolumeVO> volumes = _volsDao.search(sc_volume, null); - if ((volumes == null) || (volumes.size() == 0)) + + if (CollectionUtils.isEmpty(volumes)) break; + VolumeVO volume = volumes.get(0); - VmDiskStatisticsVO previousVmDiskStats = - _vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId()); + VmDiskStatisticsVO previousVmDiskStats = _vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId()); VmDiskStatisticsVO vmDiskStat_lock = _vmDiskStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId()); - if ((vmDiskStat.getBytesRead() == 0) && (vmDiskStat.getBytesWrite() == 0) && (vmDiskStat.getIORead() == 0) && - (vmDiskStat.getIOWrite() == 0)) { + if (areAllDiskStatsZero(vmDiskStat)) { s_logger.debug("IO/bytes read and write are all 0. Not updating vm_disk_statistics"); continue; } if (vmDiskStat_lock == null) { - s_logger.warn("unable to find vm disk stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId() + - " and volumeId:" + volume.getId()); + s_logger.warn("unable to find vm disk stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId() + + " and volumeId:" + volume.getId()); continue; } - if (previousVmDiskStats != null && - ((previousVmDiskStats.getCurrentBytesRead() != vmDiskStat_lock.getCurrentBytesRead()) || - (previousVmDiskStats.getCurrentBytesWrite() != vmDiskStat_lock.getCurrentBytesWrite()) || - (previousVmDiskStats.getCurrentIORead() != vmDiskStat_lock.getCurrentIORead()) || (previousVmDiskStats.getCurrentIOWrite() != vmDiskStat_lock.getCurrentIOWrite()))) { - s_logger.debug("vm disk stats changed from the time GetVmDiskStatsCommand was sent. " + "Ignoring current answer. Host: " + - host.getName() + " . VM: " + vmDiskStat.getVmName() + " Read(Bytes): " + vmDiskStat.getBytesRead() + " write(Bytes): " + - vmDiskStat.getBytesWrite() + " Read(IO): " + vmDiskStat.getIORead() + " write(IO): " + vmDiskStat.getIOWrite()); + if (isCurrentVmDiskStatsDifferentFromPrevious(previousVmDiskStats, vmDiskStat_lock)) { + s_logger.debug("vm disk stats changed from the time GetVmDiskStatsCommand was sent. " + "Ignoring current answer. Host: " + host.getName() + + " . VM: " + vmDiskStat.getVmName() + " Read(Bytes): " + vmDiskStat.getBytesRead() + " write(Bytes): " + vmDiskStat.getBytesWrite() + + " Read(IO): " + vmDiskStat.getIORead() + " write(IO): " + vmDiskStat.getIOWrite()); continue; } if (vmDiskStat_lock.getCurrentBytesRead() > vmDiskStat.getBytesRead()) { if (s_logger.isDebugEnabled()) { - s_logger.debug("Read # of bytes that's less than the last one. " + - "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() + - " Reported: " + vmDiskStat.getBytesRead() + " Stored: " + vmDiskStat_lock.getCurrentBytesRead()); + s_logger.debug("Read # of bytes that's less than the last one. " + "Assuming something went wrong and persisting it. Host: " + + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getBytesRead() + " Stored: " + + vmDiskStat_lock.getCurrentBytesRead()); } vmDiskStat_lock.setNetBytesRead(vmDiskStat_lock.getNetBytesRead() + vmDiskStat_lock.getCurrentBytesRead()); } vmDiskStat_lock.setCurrentBytesRead(vmDiskStat.getBytesRead()); if (vmDiskStat_lock.getCurrentBytesWrite() > vmDiskStat.getBytesWrite()) { if (s_logger.isDebugEnabled()) { - s_logger.debug("Write # of bytes that's less than the last one. " + - "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() + - " Reported: " + vmDiskStat.getBytesWrite() + " Stored: " + vmDiskStat_lock.getCurrentBytesWrite()); + s_logger.debug("Write # of bytes that's less than the last one. " + "Assuming something went wrong and persisting it. Host: " + + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getBytesWrite() + " Stored: " + + vmDiskStat_lock.getCurrentBytesWrite()); } vmDiskStat_lock.setNetBytesWrite(vmDiskStat_lock.getNetBytesWrite() + vmDiskStat_lock.getCurrentBytesWrite()); } vmDiskStat_lock.setCurrentBytesWrite(vmDiskStat.getBytesWrite()); if (vmDiskStat_lock.getCurrentIORead() > vmDiskStat.getIORead()) { if (s_logger.isDebugEnabled()) { - s_logger.debug("Read # of IO that's less than the last one. " + "Assuming something went wrong and persisting it. Host: " + - host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIORead() + " Stored: " + - vmDiskStat_lock.getCurrentIORead()); + s_logger.debug("Read # of IO that's less than the last one. " + "Assuming something went wrong and persisting it. Host: " + + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIORead() + " Stored: " + + vmDiskStat_lock.getCurrentIORead()); } vmDiskStat_lock.setNetIORead(vmDiskStat_lock.getNetIORead() + vmDiskStat_lock.getCurrentIORead()); } vmDiskStat_lock.setCurrentIORead(vmDiskStat.getIORead()); if (vmDiskStat_lock.getCurrentIOWrite() > vmDiskStat.getIOWrite()) { if (s_logger.isDebugEnabled()) { - s_logger.debug("Write # of IO that's less than the last one. " + "Assuming something went wrong and persisting it. Host: " + - host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIOWrite() + " Stored: " + - vmDiskStat_lock.getCurrentIOWrite()); + s_logger.debug("Write # of IO that's less than the last one. " + "Assuming something went wrong and persisting it. Host: " + + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIOWrite() + " Stored: " + + vmDiskStat_lock.getCurrentIOWrite()); } vmDiskStat_lock.setNetIOWrite(vmDiskStat_lock.getNetIOWrite() + vmDiskStat_lock.getCurrentIOWrite()); } @@ -781,7 +804,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc //Check for ownership //msHost in UP state with min id should run the job ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L)); - if(msHost == null || (msHost.getMsid() != mgmtSrvrId)){ + if (msHost == null || (msHost.getMsid() != mgmtSrvrId)) { s_logger.debug("Skipping collect vm network stats from hosts"); return; } @@ -792,14 +815,10 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc public void doInTransactionWithoutResult(TransactionStatus status) { s_logger.debug("VmNetworkStatsTask is running..."); - SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria(); - sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString()); - sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance); - sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.Routing.toString()); + SearchCriteria<HostVO> sc = createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance(); List<HostVO> hosts = _hostDao.search(sc, null); - for (HostVO host : hosts) - { + for (HostVO host : hosts) { List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId()); List<Long> vmIds = new ArrayList<Long>(); @@ -813,8 +832,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc continue; Set<Long> vmIdSet = vmNetworkStatsById.keySet(); - for(Long vmId : vmIdSet) - { + for (Long vmId : vmIdSet) { List<VmNetworkStatsEntry> vmNetworkStats = vmNetworkStatsById.get(vmId); if (vmNetworkStats == null) continue; @@ -823,20 +841,24 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc s_logger.debug("Cannot find uservm with id: " + vmId + " , continue"); continue; } - s_logger.debug("Now we are updating the user_statistics table for VM: " + userVm.getInstanceName() + " after collecting vm network statistics from host: " + host.getName()); - for (VmNetworkStatsEntry vmNetworkStat:vmNetworkStats) { + s_logger.debug("Now we are updating the user_statistics table for VM: " + userVm.getInstanceName() + + " after collecting vm network statistics from host: " + host.getName()); + for (VmNetworkStatsEntry vmNetworkStat : vmNetworkStats) { SearchCriteria<NicVO> sc_nic = _nicDao.createSearchCriteria(); sc_nic.addAnd("macAddress", SearchCriteria.Op.EQ, vmNetworkStat.getMacAddress()); NicVO nic = _nicDao.search(sc_nic, null).get(0); List<VlanVO> vlan = _vlanDao.listVlansByNetworkId(nic.getNetworkId()); if (vlan == null || vlan.size() == 0 || vlan.get(0).getVlanType() != VlanType.DirectAttached) continue; // only get network statistics for DirectAttached network (shared networks in Basic zone and Advanced zone with/without SG) - UserStatisticsVO previousvmNetworkStats = _userStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), nic.getNetworkId(), nic.getIPv4Address(), vmId, "UserVm"); + UserStatisticsVO previousvmNetworkStats = _userStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), nic.getNetworkId(), + nic.getIPv4Address(), vmId, "UserVm"); if (previousvmNetworkStats == null) { - previousvmNetworkStats = new UserStatisticsVO(userVm.getAccountId(), userVm.getDataCenterId(),nic.getIPv4Address(), vmId, "UserVm", nic.getNetworkId()); + previousvmNetworkStats = new UserStatisticsVO(userVm.getAccountId(), userVm.getDataCenterId(), nic.getIPv4Address(), vmId, "UserVm", + nic.getNetworkId()); _userStatsDao.persist(previousvmNetworkStats); } - UserStatisticsVO vmNetworkStat_lock = _userStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), nic.getNetworkId(), nic.getIPv4Address(), vmId, "UserVm"); + UserStatisticsVO vmNetworkStat_lock = _userStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), nic.getNetworkId(), + nic.getIPv4Address(), vmId, "UserVm"); if ((vmNetworkStat.getBytesSent() == 0) && (vmNetworkStat.getBytesReceived() == 0)) { s_logger.debug("bytes sent and received are all 0. Not updating user_statistics"); @@ -844,24 +866,24 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc } if (vmNetworkStat_lock == null) { - s_logger.warn("unable to find vm network stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId()+ " and nicId:" + nic.getId()); + s_logger.warn("unable to find vm network stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId() + + " and nicId:" + nic.getId()); continue; } - if (previousvmNetworkStats != null - && ((previousvmNetworkStats.getCurrentBytesSent() != vmNetworkStat_lock.getCurrentBytesSent()) + if (previousvmNetworkStats != null && ((previousvmNetworkStats.getCurrentBytesSent() != vmNetworkStat_lock.getCurrentBytesSent()) || (previousvmNetworkStats.getCurrentBytesReceived() != vmNetworkStat_lock.getCurrentBytesReceived()))) { - s_logger.debug("vm network stats changed from the time GetNmNetworkStatsCommand was sent. " + - "Ignoring current answer. Host: " + host.getName() + " . VM: " + vmNetworkStat.getVmName() + - " Sent(Bytes): " + vmNetworkStat.getBytesSent() + " Received(Bytes): " + vmNetworkStat.getBytesReceived()); + s_logger.debug("vm network stats changed from the time GetNmNetworkStatsCommand was sent. " + "Ignoring current answer. Host: " + + host.getName() + " . VM: " + vmNetworkStat.getVmName() + " Sent(Bytes): " + vmNetworkStat.getBytesSent() + " Received(Bytes): " + + vmNetworkStat.getBytesReceived()); continue; } if (vmNetworkStat_lock.getCurrentBytesSent() > vmNetworkStat.getBytesSent()) { if (s_logger.isDebugEnabled()) { - s_logger.debug("Sent # of bytes that's less than the last one. " + - "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmNetworkStat.getVmName() + - " Reported: " + vmNetworkStat.getBytesSent() + " Stored: " + vmNetworkStat_lock.getCurrentBytesSent()); + s_logger.debug("Sent # of bytes that's less than the last one. " + "Assuming something went wrong and persisting it. Host: " + + host.getName() + " . VM: " + vmNetworkStat.getVmName() + " Reported: " + vmNetworkStat.getBytesSent() + " Stored: " + + vmNetworkStat_lock.getCurrentBytesSent()); } vmNetworkStat_lock.setNetBytesSent(vmNetworkStat_lock.getNetBytesSent() + vmNetworkStat_lock.getCurrentBytesSent()); } @@ -869,15 +891,15 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc if (vmNetworkStat_lock.getCurrentBytesReceived() > vmNetworkStat.getBytesReceived()) { if (s_logger.isDebugEnabled()) { - s_logger.debug("Received # of bytes that's less than the last one. " + - "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmNetworkStat.getVmName() + - " Reported: " + vmNetworkStat.getBytesReceived() + " Stored: " + vmNetworkStat_lock.getCurrentBytesReceived()); + s_logger.debug("Received # of bytes that's less than the last one. " + "Assuming something went wrong and persisting it. Host: " + + host.getName() + " . VM: " + vmNetworkStat.getVmName() + " Reported: " + vmNetworkStat.getBytesReceived() + " Stored: " + + vmNetworkStat_lock.getCurrentBytesReceived()); } vmNetworkStat_lock.setNetBytesReceived(vmNetworkStat_lock.getNetBytesReceived() + vmNetworkStat_lock.getCurrentBytesReceived()); } vmNetworkStat_lock.setCurrentBytesReceived(vmNetworkStat.getBytesReceived()); - if (! _dailyOrHourly) { + if (!_dailyOrHourly) { //update agg bytes vmNetworkStat_lock.setAggBytesReceived(vmNetworkStat_lock.getNetBytesReceived() + vmNetworkStat_lock.getCurrentBytesReceived()); vmNetworkStat_lock.setAggBytesSent(vmNetworkStat_lock.getNetBytesSent() + vmNetworkStat_lock.getCurrentBytesSent()); @@ -895,7 +917,6 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc } } - class VolumeStatsTask extends ManagedContextRunnable { @Override protected void runInContext() { @@ -905,18 +926,15 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc for (StoragePoolVO pool : pools) { List<VolumeVO> volumes = _volsDao.findByPoolId(pool.getId(), null); List<String> volumeLocators = new ArrayList<String>(); - for (VolumeVO volume: volumes){ + for (VolumeVO volume : volumes) { if (volume.getFormat() == ImageFormat.QCOW2) { volumeLocators.add(volume.getUuid()); - } - else if (volume.getFormat() == ImageFormat.VHD){ + } else if (volume.getFormat() == ImageFormat.VHD) { volumeLocators.add(volume.getPath()); - } - else if (volume.getFormat() == ImageFormat.OVA){ + } else if (volume.getFormat() == ImageFormat.OVA) { volumeLocators.add(volume.getChainInfo()); - } - else { - s_logger.warn("Volume stats not implemented for this format type " + volume.getFormat() ); + } else { + s_logger.warn("Volume stats not implemented for this format type " + volume.getFormat()); break; } } @@ -924,8 +942,9 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc Map<String, VolumeStatsEntry> volumeStatsByUuid; if (pool.getScope() == ScopeType.ZONE) { volumeStatsByUuid = new HashMap<>(); - for (final Cluster cluster: _clusterDao.listByZoneId(pool.getDataCenterId())) { - final Map<String, VolumeStatsEntry> volumeStatsForCluster = _userVmMgr.getVolumeStatistics(cluster.getId(), pool.getUuid(), pool.getPoolType(), volumeLocators, StatsTimeout.value()); + for (final Cluster cluster : _clusterDao.listByZoneId(pool.getDataCenterId())) { + final Map<String, VolumeStatsEntry> volumeStatsForCluster = _userVmMgr.getVolumeStatistics(cluster.getId(), pool.getUuid(), pool.getPoolType(), + volumeLocators, StatsTimeout.value()); if (volumeStatsForCluster != null) { volumeStatsByUuid.putAll(volumeStatsForCluster); } @@ -933,7 +952,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc } else { volumeStatsByUuid = _userVmMgr.getVolumeStatistics(pool.getClusterId(), pool.getUuid(), pool.getPoolType(), volumeLocators, StatsTimeout.value()); } - if (volumeStatsByUuid != null){ + if (volumeStatsByUuid != null) { for (final Map.Entry<String, VolumeStatsEntry> entry : volumeStatsByUuid.entrySet()) { if (entry == null || entry.getKey() == null || entry.getValue() == null) { continue; @@ -985,8 +1004,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc Answer answer = ssAhost.sendMessage(command); if (answer != null && answer.getResult()) { storageStats.put(storeId, (StorageStats)answer); - s_logger.trace("HostId: " + storeId + " Used: " + ((StorageStats)answer).getByteUsed() + " Total Available: " + - ((StorageStats)answer).getCapacityBytes()); + s_logger.trace("HostId: " + storeId + " Used: " + ((StorageStats)answer).getByteUsed() + " Total Available: " + ((StorageStats)answer).getCapacityBytes()); } } _storageStats = storageStats; @@ -1047,8 +1065,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc //check interval long now = (new Date()).getTime(); if (asGroup.getLastInterval() != null) - if ((now - asGroup.getLastInterval().getTime()) < asGroup - .getInterval()) { + if ((now - asGroup.getLastInterval().getTime()) < asGroup.getInterval()) { continue; } @@ -1213,7 +1230,6 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc long thresholdValue = conditionVO.getThreshold(); Double thresholdPercent = (double)thresholdValue / 100; CounterVO counterVO = _asCounterDao.findById(conditionVO.getCounterid()); -//Double sum = avgCounter.get(conditionVO.getCounterid()); long counter_count = 1; do { String counter_param = params.get("counter" + String.valueOf(counter_count)); @@ -1260,8 +1276,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc return lstResult; } - public List<Pair<String, Integer>> getPairofCounternameAndDuration( - long groupId) { + public List<Pair<String, Integer>> getPairofCounternameAndDuration(long groupId) { AutoScaleVmGroupVO groupVo = _asGroupDao.findById(groupId); if (groupVo == null) return null; @@ -1307,14 +1322,231 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc } } + /** + * This class allows to writing metrics in InfluxDB for the table that matches the Collector extending it. + * Thus, VmStatsCollector and HostCollector can use same method to write on different measures (host_stats or vm_stats table). + */ + abstract class AbstractStatsCollector extends ManagedContextRunnable { + /** + * Sends metrics to influxdb host. This method supports both VM and Host metrics + */ + protected void sendMetricsToInfluxdb(Map<Object, Object> metrics) { + InfluxDB influxDbConnection = createInfluxDbConnection(); + + Pong response = influxDbConnection.ping(); + if (response.getVersion().equalsIgnoreCase("unknown")) { + throw new CloudRuntimeException(String.format("Cannot ping influxdb host %s:%s.", externalStatsHost, externalStatsPort)); + } + + Collection<Object> metricsObjects = metrics.values(); + List<Point> points = new ArrayList<>(); + + s_logger.debug(String.format("Sending stats to %s host %s:%s", externalStatsType, externalStatsHost, externalStatsPort)); + + for (Object metricsObject : metricsObjects) { + Point vmPoint = creteInfluxDbPoint(metricsObject); + points.add(vmPoint); + } + writeBatches(influxDbConnection, databaseName, points); + } + + /** + * Creates a InfluxDB point for the given stats collector (VmStatsCollector, or HostCollector). + */ + protected abstract Point creteInfluxDbPoint(Object metricsObject); + } + public boolean imageStoreHasEnoughCapacity(DataStore imageStore) { StorageStats imageStoreStats = _storageStats.get(imageStore.getId()); - if (imageStoreStats != null && (imageStoreStats.getByteUsed()/(imageStoreStats.getCapacityBytes()*1.0)) <= _imageStoreCapacityThreshold) { + if (imageStoreStats != null && (imageStoreStats.getByteUsed() / (imageStoreStats.getCapacityBytes() * 1.0)) <= _imageStoreCapacityThreshold) { return true; } return false; } + /** + * Sends VMs metrics to the configured graphite host. + */ + protected void sendVmMetricsToGraphiteHost(Map<Object, Object> metrics, HostVO host) { + s_logger.debug(String.format("Sending VmStats of host %s to %s host %s:%s", host.getId(), externalStatsType, externalStatsHost, externalStatsPort)); + try { + GraphiteClient g = new GraphiteClient(externalStatsHost, externalStatsPort); + g.sendMetrics(metrics); + } catch (GraphiteException e) { + s_logger.debug("Failed sending VmStats to Graphite host " + externalStatsHost + ":" + externalStatsPort + ": " + e.getMessage()); + } + } + + /** + * Prepares metrics for Graphite. + * @note this method must only be executed in case the configured stats collector is a Graphite host; + * otherwise, it will compromise the map of metrics used by another type of collector (e.g. InfluxDB). + */ + private void prepareVmMetricsForGraphite(Map<Object, Object> metrics, VmStatsEntry statsForCurrentIteration) { + VMInstanceVO vmVO = _vmInstance.findById(statsForCurrentIteration.getVmId()); + String vmName = vmVO.getUuid(); + + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".cpu.num", statsForCurrentIteration.getNumCPUs()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".cpu.utilization", statsForCurrentIteration.getCPUUtilization()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".network.read_kbs", statsForCurrentIteration.getNetworkReadKBs()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".network.write_kbs", statsForCurrentIteration.getNetworkWriteKBs()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.write_kbs", statsForCurrentIteration.getDiskWriteKBs()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.read_kbs", statsForCurrentIteration.getDiskReadKBs()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.write_iops", statsForCurrentIteration.getDiskWriteIOs()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.read_iops", statsForCurrentIteration.getDiskReadIOs()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.total_kbs", statsForCurrentIteration.getMemoryKBs()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.internalfree_kbs", statsForCurrentIteration.getIntFreeMemoryKBs()); + metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.target_kbs", statsForCurrentIteration.getTargetMemoryKBs()); + } + + /** + * Stores virtual machine stats in memory (map of {@link VmStatsEntry}). + */ + private void storeVirtualMachineStatsInMemory(VmStatsEntry statsForCurrentIteration) { + VmStatsEntry statsInMemory = (VmStatsEntry)_VmStats.get(statsForCurrentIteration.getVmId()); + + if (statsInMemory == null) { + //no stats exist for this vm, directly persist + _VmStats.put(statsForCurrentIteration.getVmId(), statsForCurrentIteration); + } else { + //update each field + statsInMemory.setCPUUtilization(statsForCurrentIteration.getCPUUtilization()); + statsInMemory.setNumCPUs(statsForCurrentIteration.getNumCPUs()); + statsInMemory.setNetworkReadKBs(statsInMemory.getNetworkReadKBs() + statsForCurrentIteration.getNetworkReadKBs()); + statsInMemory.setNetworkWriteKBs(statsInMemory.getNetworkWriteKBs() + statsForCurrentIteration.getNetworkWriteKBs()); + statsInMemory.setDiskWriteKBs(statsInMemory.getDiskWriteKBs() + statsForCurrentIteration.getDiskWriteKBs()); + statsInMemory.setDiskReadIOs(statsInMemory.getDiskReadIOs() + statsForCurrentIteration.getDiskReadIOs()); + statsInMemory.setDiskWriteIOs(statsInMemory.getDiskWriteIOs() + statsForCurrentIteration.getDiskWriteIOs()); + statsInMemory.setDiskReadKBs(statsInMemory.getDiskReadKBs() + statsForCurrentIteration.getDiskReadKBs()); + statsInMemory.setMemoryKBs(statsForCurrentIteration.getMemoryKBs()); + statsInMemory.setIntFreeMemoryKBs(statsForCurrentIteration.getIntFreeMemoryKBs()); + statsInMemory.setTargetMemoryKBs(statsForCurrentIteration.getTargetMemoryKBs()); + + _VmStats.put(statsForCurrentIteration.getVmId(), statsInMemory); + } + } + + /** + * Sends host metrics to a configured InfluxDB host. The metrics respects the following specification.</br> + * <b>Tags:</b>vm_id, uuid, instance_name, data_center_id, host_id</br> + * <b>Fields:</b>memory_total_kb, memory_internal_free_kbs, memory_target_kbs, cpu_utilization, cpus, network_write_kb, disk_read_iops, disk_read_kbs, disk_write_iops, disk_write_kbs + */ + protected Point createInfluxDbPointForHostMetrics(Object metricsObject) { + HostStatsEntry hostStatsEntry = (HostStatsEntry)metricsObject; + + Map<String, String> tagsToAdd = new HashMap<>(); + tagsToAdd.put(UUID_TAG, hostStatsEntry.getHostVo().getUuid()); + + Map<String, Object> fieldsToAdd = new HashMap<>(); + fieldsToAdd.put(TOTAL_MEMORY_KBS_FIELD, hostStatsEntry.getTotalMemoryKBs()); + fieldsToAdd.put(FREE_MEMORY_KBS_FIELD, hostStatsEntry.getFreeMemoryKBs()); + fieldsToAdd.put(CPU_UTILIZATION_FIELD, hostStatsEntry.getCpuUtilization()); + fieldsToAdd.put(CPUS_FIELD, hostStatsEntry.getHostVo().getCpus()); + fieldsToAdd.put(CPU_SOCKETS_FIELD, hostStatsEntry.getHostVo().getCpuSockets()); + fieldsToAdd.put(NETWORK_READ_KBS_FIELD, hostStatsEntry.getNetworkReadKBs()); + fieldsToAdd.put(NETWORK_WRITE_KBS_FIELD, hostStatsEntry.getNetworkWriteKBs()); + + return Point.measurement(INFLUXDB_HOST_MEASUREMENT).tag(tagsToAdd).time(System.currentTimeMillis(), TimeUnit.MILLISECONDS).fields(fieldsToAdd).build(); + } + + /** + * Sends VMs metrics to a configured InfluxDB host. The metrics respects the following specification.</br> + * <b>Tags:</b>vm_id, uuid, instance_name, data_center_id, host_id</br> + * <b>Fields:</b>memory_total_kb, memory_internal_free_kbs, memory_target_kbs, cpu_utilization, cpus, network_write_kb, disk_read_iops, disk_read_kbs, disk_write_iops, disk_write_kbs + */ + protected Point createInfluxDbPointForVmMetrics(Object metricsObject) { + VmStatsEntry vmStatsEntry = (VmStatsEntry)metricsObject; + UserVmVO userVmVO = vmStatsEntry.getUserVmVO(); + + Map<String, String> tagsToAdd = new HashMap<>(); + tagsToAdd.put(UUID_TAG, userVmVO.getUuid()); + + Map<String, Object> fieldsToAdd = new HashMap<>(); + fieldsToAdd.put(TOTAL_MEMORY_KBS_FIELD, vmStatsEntry.getMemoryKBs()); + fieldsToAdd.put(FREE_MEMORY_KBS_FIELD, vmStatsEntry.getIntFreeMemoryKBs()); + fieldsToAdd.put(MEMORY_TARGET_KBS_FIELD, vmStatsEntry.getTargetMemoryKBs()); + fieldsToAdd.put(CPU_UTILIZATION_FIELD, vmStatsEntry.getCPUUtilization()); + fieldsToAdd.put(CPUS_FIELD, vmStatsEntry.getNumCPUs()); + fieldsToAdd.put(NETWORK_READ_KBS_FIELD, vmStatsEntry.getNetworkReadKBs()); + fieldsToAdd.put(NETWORK_WRITE_KBS_FIELD, vmStatsEntry.getNetworkWriteKBs()); + fieldsToAdd.put(DISK_READ_IOPS_FIELD, vmStatsEntry.getDiskReadIOs()); + fieldsToAdd.put(DISK_READ_KBS_FIELD, vmStatsEntry.getDiskReadKBs()); + fieldsToAdd.put(DISK_WRITE_IOPS_FIELD, vmStatsEntry.getDiskWriteIOs()); + fieldsToAdd.put(DISK_WRITE_KBS_FIELD, vmStatsEntry.getDiskWriteKBs()); + + return Point.measurement(INFLUXDB_VM_MEASUREMENT).tag(tagsToAdd).time(System.currentTimeMillis(), TimeUnit.MILLISECONDS).fields(fieldsToAdd).build(); + } + + /** + * Creates connection to InfluxDB. If the database does not exist, it throws a CloudRuntimeException. </br> + * @note the user can configure the database name on parameter 'stats.output.influxdb.database.name'; such database must be yet created and configured by the user. + * The Default name for the database is 'cloudstack_stats'. + */ + protected InfluxDB createInfluxDbConnection() { + String influxDbQueryUrl = String.format("http://%s:%s/", externalStatsHost, externalStatsPort); + InfluxDB influxDbConnection = InfluxDBFactory.connect(influxDbQueryUrl); + + if (!influxDbConnection.databaseExists(databaseName)) { + throw new CloudRuntimeException(String.format("Database with name %s does not exist in influxdb host %s:%s", databaseName, externalStatsHost, externalStatsPort)); + } + + return influxDbConnection; + } + + /** + * Writes batches of InfluxDB database points into a given database. + */ + protected void writeBatches(InfluxDB influxDbConnection, String dbName, List<Point> points) { + BatchPoints batchPoints = BatchPoints.database(dbName).build(); + + for (Point point : points) { + batchPoints.point(point); + } + + influxDbConnection.write(batchPoints); + } + + /** + * Returns true if at least one of the current disk stats is different from the previous.</br> + * The considered disk stats are the following: bytes read, bytes write, IO read, and IO write. + */ + protected boolean isCurrentVmDiskStatsDifferentFromPrevious(VmDiskStatisticsVO previousVmDiskStats, VmDiskStatisticsVO currentVmDiskStats) { + if (previousVmDiskStats != null) { + boolean bytesReadDifferentFromPrevious = previousVmDiskStats.getCurrentBytesRead() != currentVmDiskStats.getCurrentBytesRead(); + boolean bytesWriteDifferentFromPrevious = previousVmDiskStats.getCurrentBytesWrite() != currentVmDiskStats.getCurrentBytesWrite(); + boolean ioReadDifferentFromPrevious = previousVmDiskStats.getCurrentIORead() != currentVmDiskStats.getCurrentIORead(); + boolean ioWriteDifferentFromPrevious = previousVmDiskStats.getCurrentIOWrite() != currentVmDiskStats.getCurrentIOWrite(); + return bytesReadDifferentFromPrevious || bytesWriteDifferentFromPrevious || ioReadDifferentFromPrevious || ioWriteDifferentFromPrevious; + } + if (currentVmDiskStats == null) { + return false; + } + return true; + } + + /** + * Returns true if all the VmDiskStatsEntry are Zeros (Bytes read, Bytes write, IO read, and IO write must be all equals to zero) + */ + protected boolean areAllDiskStatsZero(VmDiskStatsEntry vmDiskStat) { + return (vmDiskStat.getBytesRead() == 0) && (vmDiskStat.getBytesWrite() == 0) && (vmDiskStat.getIORead() == 0) && (vmDiskStat.getIOWrite() == 0); + } + + /** + * Creates a HostVO SearchCriteria where: + * <ul> + * <li>"status" is Up;</li> + * <li>"resourceState" is not in Maintenance, PrepareForMaintenance, or ErrorInMaintenance; and</li> + * <li>"type" is Routing.</li> + * </ul> + */ + private SearchCriteria<HostVO> createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance() { + SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria(); + sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString()); + sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance); + sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.Routing.toString()); + return sc; + } + public StorageStats getStorageStats(long id) { return _storageStats.get(id); } @@ -1334,6 +1566,6 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc @Override public ConfigKey<?>[] getConfigKeys() { - return new ConfigKey<?>[] { vmDiskStatsInterval, vmDiskStatsIntervalMin, vmNetworkStatsInterval, vmNetworkStatsIntervalMin, StatsTimeout }; + return new ConfigKey<?>[] {vmDiskStatsInterval, vmDiskStatsIntervalMin, vmNetworkStatsInterval, vmNetworkStatsIntervalMin, StatsTimeout, statsOutputUri}; } } diff --git a/server/src/test/java/com/cloud/server/StatsCollectorTest.java b/server/src/test/java/com/cloud/server/StatsCollectorTest.java new file mode 100644 index 0000000..040d08d --- /dev/null +++ b/server/src/test/java/com/cloud/server/StatsCollectorTest.java @@ -0,0 +1,227 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package com.cloud.server; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.BatchPoints.Builder; +import org.influxdb.dto.Point; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import com.cloud.agent.api.VmDiskStatsEntry; +import com.cloud.server.StatsCollector.ExternalStatsProtocol; +import com.cloud.user.VmDiskStatisticsVO; +import com.cloud.utils.exception.CloudRuntimeException; +import com.tngtech.java.junit.dataprovider.DataProvider; +import com.tngtech.java.junit.dataprovider.DataProviderRunner; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(DataProviderRunner.class) +@PrepareForTest({InfluxDBFactory.class, BatchPoints.class}) +public class StatsCollectorTest { + private StatsCollector statsCollector = Mockito.spy(new StatsCollector()); + + private static final int GRAPHITE_DEFAULT_PORT = 2003; + private static final int INFLUXDB_DEFAULT_PORT = 8086; + private static final String HOST_ADDRESS = "192.168.16.10"; + private static final String URL = String.format("http://%s:%s/", HOST_ADDRESS, INFLUXDB_DEFAULT_PORT); + + private static final String DEFAULT_DATABASE_NAME = "cloudstack"; + + @Test + public void createInfluxDbConnectionTest() { + configureAndTestCreateInfluxDbConnection(true); + } + + @Test(expected = CloudRuntimeException.class) + public void createInfluxDbConnectionTestExpectException() { + configureAndTestCreateInfluxDbConnection(false); + } + + private void configureAndTestCreateInfluxDbConnection(boolean databaseExists) { + statsCollector.externalStatsHost = HOST_ADDRESS; + statsCollector.externalStatsPort = INFLUXDB_DEFAULT_PORT; + InfluxDB influxDbConnection = Mockito.mock(InfluxDB.class); + Mockito.when(influxDbConnection.databaseExists(DEFAULT_DATABASE_NAME)).thenReturn(databaseExists); + PowerMockito.mockStatic(InfluxDBFactory.class); + PowerMockito.when(InfluxDBFactory.connect(URL)).thenReturn(influxDbConnection); + + InfluxDB returnedConnection = statsCollector.createInfluxDbConnection(); + + Assert.assertEquals(influxDbConnection, returnedConnection); + } + + @Test + public void writeBatchesTest() { + InfluxDB influxDbConnection = Mockito.mock(InfluxDB.class); + Mockito.doNothing().when(influxDbConnection).write(Mockito.any(Point.class)); + Builder builder = Mockito.mock(Builder.class); + BatchPoints batchPoints = Mockito.mock(BatchPoints.class); + PowerMockito.mockStatic(BatchPoints.class); + PowerMockito.when(BatchPoints.database(DEFAULT_DATABASE_NAME)).thenReturn(builder); + Mockito.when(builder.build()).thenReturn(batchPoints); + Map<String, String> tagsToAdd = new HashMap<>(); + tagsToAdd.put("hostId", "1"); + Map<String, Object> fieldsToAdd = new HashMap<>(); + fieldsToAdd.put("total_memory_kbs", 10000000); + Point point = Point.measurement("measure").tag(tagsToAdd).time(System.currentTimeMillis(), TimeUnit.MILLISECONDS).fields(fieldsToAdd).build(); + List<Point> points = new ArrayList<>(); + points.add(point); + Mockito.when(batchPoints.point(point)).thenReturn(batchPoints); + + statsCollector.writeBatches(influxDbConnection, DEFAULT_DATABASE_NAME, points); + + Mockito.verify(influxDbConnection).write(batchPoints); + } + + @Test + public void configureExternalStatsPortTestGraphitePort() throws URISyntaxException { + URI uri = new URI(HOST_ADDRESS); + statsCollector.externalStatsType = ExternalStatsProtocol.GRAPHITE; + int port = statsCollector.retrieveExternalStatsPortFromUri(uri); + Assert.assertEquals(GRAPHITE_DEFAULT_PORT, port); + } + + @Test + public void configureExternalStatsPortTestInfluxdbPort() throws URISyntaxException { + URI uri = new URI(HOST_ADDRESS); + statsCollector.externalStatsType = ExternalStatsProtocol.INFLUXDB; + int port = statsCollector.retrieveExternalStatsPortFromUri(uri); + Assert.assertEquals(INFLUXDB_DEFAULT_PORT, port); + } + + @Test(expected = URISyntaxException.class) + public void configureExternalStatsPortTestExpectException() throws URISyntaxException { + statsCollector.externalStatsType = ExternalStatsProtocol.NONE; + URI uri = new URI(HOST_ADDRESS); + statsCollector.retrieveExternalStatsPortFromUri(uri); + } + + @Test + public void configureExternalStatsPortTestInfluxDbCustomizedPort() throws URISyntaxException { + statsCollector.externalStatsType = ExternalStatsProtocol.INFLUXDB; + URI uri = new URI("test://" + HOST_ADDRESS + ":1234"); + int port = statsCollector.retrieveExternalStatsPortFromUri(uri); + Assert.assertEquals(1234, port); + } + + @Test + public void configureDatabaseNameTestDefaultDbName() throws URISyntaxException { + URI uri = new URI(URL); + String dbName = statsCollector.configureDatabaseName(uri); + Assert.assertEquals(DEFAULT_DATABASE_NAME, dbName); + } + + @Test + public void configureDatabaseNameTestCustomDbName() throws URISyntaxException { + String configuredDbName = "dbName"; + URI uri = new URI(URL + configuredDbName); + String dbName = statsCollector.configureDatabaseName(uri); + Assert.assertEquals(configuredDbName, dbName); + } + + @Test + public void isCurrentVmDiskStatsDifferentFromPreviousTestNull() { + VmDiskStatisticsVO currentVmDiskStatisticsVO = new VmDiskStatisticsVO(1l, 1l, 1l, 1l); + boolean result = statsCollector.isCurrentVmDiskStatsDifferentFromPrevious(null, currentVmDiskStatisticsVO); + Assert.assertTrue(result); + } + + @Test + public void isCurrentVmDiskStatsDifferentFromPreviousTestBothNull() { + boolean result = statsCollector.isCurrentVmDiskStatsDifferentFromPrevious(null, null); + Assert.assertFalse(result); + } + + @Test + public void isCurrentVmDiskStatsDifferentFromPreviousTestDifferentIoWrite() { + configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 123l, 123l, 12l, true); + } + + @Test + public void isCurrentVmDiskStatsDifferentFromPreviousTestDifferentIoRead() { + configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 123l, 12l, 123l, true); + } + + @Test + public void isCurrentVmDiskStatsDifferentFromPreviousTestDifferentBytesRead() { + configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(12l, 123l, 123l, 123l, true); + } + + @Test + public void isCurrentVmDiskStatsDifferentFromPreviousTestDifferentBytesWrite() { + configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 12l, 123l, 123l, true); + } + + @Test + public void isCurrentVmDiskStatsDifferentFromPreviousTestAllEqual() { + configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 123l, 123l, 123l, false); + } + + private void configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(long bytesRead, long bytesWrite, long ioRead, long ioWrite, boolean expectedResult) { + VmDiskStatisticsVO previousVmDiskStatisticsVO = new VmDiskStatisticsVO(1l, 1l, 1l, 1l); + previousVmDiskStatisticsVO.setCurrentBytesRead(123l); + previousVmDiskStatisticsVO.setCurrentBytesWrite(123l); + previousVmDiskStatisticsVO.setCurrentIORead(123l); + previousVmDiskStatisticsVO.setCurrentIOWrite(123l); + + VmDiskStatisticsVO currentVmDiskStatisticsVO = new VmDiskStatisticsVO(1l, 1l, 1l, 1l); + currentVmDiskStatisticsVO.setCurrentBytesRead(bytesRead); + currentVmDiskStatisticsVO.setCurrentBytesWrite(bytesWrite); + currentVmDiskStatisticsVO.setCurrentIORead(ioRead); + currentVmDiskStatisticsVO.setCurrentIOWrite(ioWrite); + + boolean result = statsCollector.isCurrentVmDiskStatsDifferentFromPrevious(previousVmDiskStatisticsVO, currentVmDiskStatisticsVO); + Assert.assertEquals(expectedResult, result); + } + + @Test + @DataProvider({ + "0,0,0,0,true", "1,0,0,0,false", "0,1,0,0,false", "0,0,1,0,false", + "0,0,0,1,false", "1,0,0,1,false", "1,0,1,0,false", "1,1,0,0,false", + "0,1,1,0,false", "0,1,0,1,false", "0,0,1,1,false", "0,1,1,1,false", + "1,1,0,1,false", "1,0,1,1,false", "1,1,1,0,false", "1,1,1,1,false", + }) + public void configureAndTestCheckIfDiskStatsAreZero(long bytesRead, long bytesWrite, long ioRead, long ioWrite, boolean expected) { + VmDiskStatsEntry vmDiskStatsEntry = new VmDiskStatsEntry(); + vmDiskStatsEntry.setBytesRead(bytesRead); + vmDiskStatsEntry.setBytesWrite(bytesWrite); + vmDiskStatsEntry.setIORead(ioRead); + vmDiskStatsEntry.setIOWrite(ioWrite); + + boolean result = statsCollector.areAllDiskStatsZero(vmDiskStatsEntry); + Assert.assertEquals(expected, result); + } +} diff --git a/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java b/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java index 4143f09..9c9d48a 100644 --- a/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java +++ b/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java @@ -67,7 +67,7 @@ public class GraphiteClient { * * @param metrics the metrics as key-value-pairs */ - public void sendMetrics(Map<String, Integer> metrics) { + public void sendMetrics(Map<Object, Object> metrics) { sendMetrics(metrics, getCurrentSystemTime()); } @@ -77,12 +77,12 @@ public class GraphiteClient { * @param metrics the metrics as key-value-pairs * @param timeStamp the timestamp */ - public void sendMetrics(Map<String, Integer> metrics, long timeStamp) { + public void sendMetrics(Map<Object, Object> metrics, long timeStamp) { try (DatagramSocket sock = new DatagramSocket()){ java.security.Security.setProperty("networkaddress.cache.ttl", "0"); InetAddress addr = InetAddress.getByName(this.graphiteHost); - for (Map.Entry<String, Integer> metric: metrics.entrySet()) { + for (Map.Entry<Object, Object> metric : metrics.entrySet()) { byte[] message = new String(metric.getKey() + " " + metric.getValue() + " " + timeStamp + "\n").getBytes(); DatagramPacket packet = new DatagramPacket(message, message.length, addr, graphitePort); sock.send(packet);