This is an automated email from the ASF dual-hosted git repository.

gabriel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e0be52  Add influxdb to statscollector (#3078)
1e0be52 is described below

commit 1e0be522b1a385fb9c24c91d77efa9c36691acd6
Author: Gabriel Beims Bräscher <gabrasc...@gmail.com>
AuthorDate: Wed Jan 9 11:22:35 2019 +0100

    Add influxdb to statscollector (#3078)
    
    * Add Support for InfluxDB on StatsCollector
    
    * Code refactored to fit Inner Class architecture.
    
    Due to the inner class structure, test cases for some methods will not be
    implemented. In the future it will be necessary to refactor the whole
    StatsCollector architecture and extract the inner classes.
    
    Each inner class that is a "stats collector" and sends data to InfluxDB
    extends AbstractStatsCollector, which sends the metrics to the correct
    measurement ("table"). For instance, HostCollector sends data to
    host_stats and VmStatsCollector sends data to vm_stats.
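
    A minimal sketch of that pattern, assuming simplified getter names and
    omitting the shared batching/sending logic inherited from
    AbstractStatsCollector (Point.measurement/tag/addField/build are the
    influxdb-java calls imported by this change; the fields shown are only
    a small subset of what the commit records):

        // Hypothetical illustration: a collector builds a Point for its own measurement.
        class HostCollector extends AbstractStatsCollector {
            @Override
            protected Point creteInfluxDbPoint(Object metricsObject) { // method name as introduced by this commit
                HostStatsEntry stats = (HostStatsEntry) metricsObject;
                return Point.measurement("host_stats")                  // INFLUXDB_HOST_MEASUREMENT
                        .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                        .tag("uuid", stats.getHostVo().getUuid())       // tag each point with the host UUID
                        .addField("cpu_utilization", stats.getCpuUtilization())
                        .addField("total_memory_kb", stats.getTotalMemoryKBs())
                        .build();
            }
        }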
    
    Add a ping test to ensure that the target InfluxDB host is reachable
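
    A reachability check along those lines (illustrative sketch only; the URL,
    message and error handling are assumptions, while InfluxDBFactory, ping()
    and Pong are the influxdb-java types imported by this commit):

        // Hypothetical sketch: fail fast when the configured InfluxDB endpoint does not answer a ping.
        InfluxDB influxDbConnection = InfluxDBFactory.connect("http://influxdb-hostaddress:8086");
        Pong response = influxDbConnection.ping();
        if (response == null || response.getVersion().equalsIgnoreCase("unknown")) {
            throw new CloudRuntimeException("Cannot ping the configured InfluxDB host");
        }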
    
    * Address PR reviews
    
    * Enhancements and tests implemented, addressing reviewers' comments.
    
    * Set variables to private
---
 core/pom.xml                                       |   5 +
 .../java/com/cloud/agent/api/HostStatsEntry.java   |  14 +
 .../java/com/cloud/agent/api/VmStatsEntry.java     |  49 +-
 server/pom.xml                                     |   5 +
 .../main/java/com/cloud/server/StatsCollector.java | 688 ++++++++++++++-------
 .../java/com/cloud/server/StatsCollectorTest.java  | 227 +++++++
 .../cloudstack/utils/graphite/GraphiteClient.java  |   6 +-
 7 files changed, 743 insertions(+), 251 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index e95a72a..a3ef70a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -39,6 +39,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cloudstack</groupId>
+            <artifactId>cloud-engine-schema</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cloudstack</groupId>
             <artifactId>cloud-framework-security</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java b/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java
index 82041a6..5fd1c51 100644
--- a/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java
+++ b/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java
@@ -20,10 +20,12 @@
 package com.cloud.agent.api;
 
 import com.cloud.host.HostStats;
+import com.cloud.host.HostVO;
 
 public class HostStatsEntry implements HostStats {
 
     long hostId;
+    HostVO hostVo;
     String entityType;
     double cpuUtilization;
     double networkReadKBs;
@@ -112,4 +114,16 @@ public class HostStatsEntry implements HostStats {
     public void setHostId(long hostId) {
         this.hostId = hostId;
     }
+
+    public long getHostId() {
+        return hostId;
+    }
+
+    public HostVO getHostVo() {
+        return hostVo;
+    }
+
+    public void setHostVo(HostVO hostVo) {
+        this.hostVo = hostVo;
+    }
 }
diff --git a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
index e6063b9..9f82808 100644
--- a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
+++ b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
@@ -19,22 +19,25 @@
 
 package com.cloud.agent.api;
 
+import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VmStats;
 
 public class VmStatsEntry implements VmStats {
 
-    double cpuUtilization;
-    double networkReadKBs;
-    double networkWriteKBs;
-    double diskReadIOs;
-    double diskWriteIOs;
-    double diskReadKBs;
-    double diskWriteKBs;
-    double memoryKBs;
-    double intfreememoryKBs;
-    double targetmemoryKBs;
-    int numCPUs;
-    String entityType;
+    private long vmId;
+    private UserVmVO userVmVO;
+    private double cpuUtilization;
+    private double networkReadKBs;
+    private double networkWriteKBs;
+    private double diskReadIOs;
+    private double diskWriteIOs;
+    private double diskReadKBs;
+    private double diskWriteKBs;
+    private double memoryKBs;
+    private double intfreememoryKBs;
+    private double targetmemoryKBs;
+    private int numCPUs;
+    private String entityType;
 
     public VmStatsEntry() {
     }
@@ -50,14 +53,12 @@ public class VmStatsEntry implements VmStats {
         this.entityType = entityType;
     }
 
-    public VmStatsEntry(double cpuUtilization, double networkReadKBs, double 
networkWriteKBs, double diskReadKBs, double diskWriteKBs, int numCPUs, String 
entityType) {
-        this.cpuUtilization = cpuUtilization;
-        this.networkReadKBs = networkReadKBs;
-        this.networkWriteKBs = networkWriteKBs;
-        this.diskReadKBs = diskReadKBs;
-        this.diskWriteKBs = diskWriteKBs;
-        this.numCPUs = numCPUs;
-        this.entityType = entityType;
+    public long getVmId() {
+        return vmId;
+    }
+
+    public void setVmId(long vmId) {
+        this.vmId = vmId;
     }
 
     @Override
@@ -166,4 +167,12 @@ public class VmStatsEntry implements VmStats {
         this.entityType = entityType;
     }
 
+    public UserVmVO getUserVmVO() {
+        return userVmVO;
+    }
+
+    public void setUserVmVO(UserVmVO userVmVO) {
+        this.userVmVO = userVmVO;
+    }
+
 }
diff --git a/server/pom.xml b/server/pom.xml
index e246150..1987383 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -157,6 +157,11 @@
             <groupId>org.opensaml</groupId>
             <artifactId>opensaml</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.influxdb</groupId>
+            <artifactId>influxdb-java</artifactId>
+            <version>2.8</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/server/src/main/java/com/cloud/server/StatsCollector.java b/server/src/main/java/com/cloud/server/StatsCollector.java
index b66fa5f..8e2bc7e 100644
--- a/server/src/main/java/com/cloud/server/StatsCollector.java
+++ b/server/src/main/java/com/cloud/server/StatsCollector.java
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -46,8 +47,15 @@ import 
org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.utils.graphite.GraphiteClient;
 import org.apache.cloudstack.utils.graphite.GraphiteException;
 import org.apache.cloudstack.utils.usage.UsageUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Pong;
 import org.springframework.stereotype.Component;
 
 import com.cloud.agent.AgentManager;
@@ -121,6 +129,7 @@ import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.Transaction;
 import com.cloud.utils.db.TransactionCallbackNoReturn;
 import com.cloud.utils.db.TransactionStatus;
+import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.net.MacAddress;
 import com.cloud.vm.NicVO;
 import com.cloud.vm.UserVmManager;
@@ -140,7 +149,7 @@ import com.cloud.vm.dao.VMInstanceDao;
 public class StatsCollector extends ManagerBase implements 
ComponentMethodInterceptable, Configurable {
 
     public static enum ExternalStatsProtocol {
-        NONE("none"), GRAPHITE("graphite");
+        NONE("none"), GRAPHITE("graphite"), INFLUXDB("influxdb");
         String _type;
 
         ExternalStatsProtocol(String type) {
@@ -155,16 +164,52 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
 
     public static final Logger s_logger = 
Logger.getLogger(StatsCollector.class.getName());
 
-    static final ConfigKey<Integer> vmDiskStatsInterval = new 
ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval", "0",
+    private static final int UNDEFINED_PORT_VALUE = -1;
+
+    /**
+     * Default value for the Graphite connection port: {@value}
+     */
+    private static final int GRAPHITE_DEFAULT_PORT = 2003;
+
+    /**
+     * Default value for the InfluxDB connection port: {@value}
+     */
+    private static final int INFLUXDB_DEFAULT_PORT = 8086;
+
+    private static final String UUID_TAG = "uuid";
+
+    private static final String TOTAL_MEMORY_KBS_FIELD = "total_memory_kb";
+    private static final String FREE_MEMORY_KBS_FIELD = "free_memory_kb";
+    private static final String CPU_UTILIZATION_FIELD = "cpu_utilization";
+    private static final String CPUS_FIELD = "cpus";
+    private static final String CPU_SOCKETS_FIELD = "cpu_sockets";
+    private static final String NETWORK_READ_KBS_FIELD = "network_read_kbs";
+    private static final String NETWORK_WRITE_KBS_FIELD = "network_write_kbs";
+    private static final String MEMORY_TARGET_KBS_FIELD = "memory_target_kbs";
+    private static final String DISK_READ_IOPS_FIELD = "disk_read_iops";
+    private static final String DISK_READ_KBS_FIELD = "disk_read_kbs";
+    private static final String DISK_WRITE_IOPS_FIELD = "disk_write_iops";
+    private static final String DISK_WRITE_KBS_FIELD = "disk_write_kbs";
+
+    private static final String DEFAULT_DATABASE_NAME = "cloudstack";
+    private static final String INFLUXDB_HOST_MEASUREMENT = "host_stats";
+    private static final String INFLUXDB_VM_MEASUREMENT = "vm_stats";
+
+    private static final ConfigKey<Integer> vmDiskStatsInterval = new 
ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval", "0",
             "Interval (in seconds) to report vm disk statistics. Vm disk 
statistics will be disabled if this is set to 0 or less than 0.", false);
-    static final ConfigKey<Integer> vmDiskStatsIntervalMin = new 
ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval.min", 
"300",
+    private static final ConfigKey<Integer> vmDiskStatsIntervalMin = new 
ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval.min", 
"300",
             "Minimal interval (in seconds) to report vm disk statistics. If 
vm.disk.stats.interval is smaller than this, use this to report vm disk 
statistics.", false);
-    static final ConfigKey<Integer> vmNetworkStatsInterval = new 
ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval", "0",
+    private static final ConfigKey<Integer> vmNetworkStatsInterval = new 
ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval", "0",
             "Interval (in seconds) to report vm network statistics (for Shared 
networks). Vm network statistics will be disabled if this is set to 0 or less 
than 0.", false);
-    static final ConfigKey<Integer> vmNetworkStatsIntervalMin = new 
ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval.min", 
"300",
-            "Minimal Interval (in seconds) to report vm network statistics 
(for Shared networks). If vm.network.stats.interval is smaller than this, use 
this to report vm network statistics.", false);
-    static final ConfigKey<Integer> StatsTimeout = new 
ConfigKey<Integer>("Advanced", Integer.class, "stats.timeout", "60000",
-            "The timeout for stats call in milli seconds.", true, 
ConfigKey.Scope.Cluster);
+    private static final ConfigKey<Integer> vmNetworkStatsIntervalMin = new 
ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval.min", 
"300",
+            "Minimal Interval (in seconds) to report vm network statistics 
(for Shared networks). If vm.network.stats.interval is smaller than this, use 
this to report vm network statistics.",
+            false);
+    private static final ConfigKey<Integer> StatsTimeout = new 
ConfigKey<Integer>("Advanced", Integer.class, "stats.timeout", "60000",
+            "The timeout for stats call in milli seconds.", true,
+            ConfigKey.Scope.Cluster);
+    private static final ConfigKey<String> statsOutputUri = new 
ConfigKey<String>("Advanced", String.class, "stats.output.uri", "",
+            "URI to send StatsCollector statistics to. The collector is 
defined on the URI scheme. Example: graphite://graphite-hostaddress:port or 
influxdb://influxdb-hostaddress/dbname. Note that the port is optional, if not 
added the default port for the respective collector (graphite or influxdb) will 
be used. Additionally, the database name '/dbname' is  also optional; default 
db name is 'cloudstack'. You must create and configure the database if using 
influxdb.",
+            true);
 
     private static StatsCollector s_instance = null;
 
@@ -236,20 +281,20 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
     private ConcurrentHashMap<Long, StorageStats> _storageStats = new 
ConcurrentHashMap<Long, StorageStats>();
     private ConcurrentHashMap<Long, StorageStats> _storagePoolStats = new 
ConcurrentHashMap<Long, StorageStats>();
 
-    long hostStatsInterval = -1L;
-    long hostAndVmStatsInterval = -1L;
-    long storageStatsInterval = -1L;
-    long volumeStatsInterval = -1L;
-    long autoScaleStatsInterval = -1L;
+    private long hostStatsInterval = -1L;
+    private long hostAndVmStatsInterval = -1L;
+    private long storageStatsInterval = -1L;
+    private long volumeStatsInterval = -1L;
+    private long autoScaleStatsInterval = -1L;
 
-    List<Long> hostIds = null;
     private double _imageStoreCapacityThreshold = 0.90;
 
-    String externalStatsPrefix = "";
+    private String externalStatsPrefix = "";
     String externalStatsHost = null;
     int externalStatsPort = -1;
-    boolean externalStatsEnabled = false;
+    private String externalStatsScheme;
     ExternalStatsProtocol externalStatsType = ExternalStatsProtocol.NONE;
+    private String databaseName = DEFAULT_DATABASE_NAME;
 
     private ScheduledExecutorService _diskStatsUpdateExecutor;
     private int _usageAggregationRange = 1440;
@@ -279,7 +324,7 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
         return true;
     }
 
-    private void init(Map<String, String> configs) {
+    protected void init(Map<String, String> configs) {
         _executor = Executors.newScheduledThreadPool(6, new 
NamedThreadFactory("StatsCollector"));
 
         hostStatsInterval = 
NumbersUtil.parseLong(configs.get("host.stats.interval"), 60000L);
@@ -288,24 +333,25 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
         volumeStatsInterval = 
NumbersUtil.parseLong(configs.get("volume.stats.interval"), 600000L);
         autoScaleStatsInterval = 
NumbersUtil.parseLong(configs.get("autoscale.stats.interval"), 60000L);
 
-        /* URI to send statistics to. Currently only Graphite is supported */
-        String externalStatsUri = configs.get("stats.output.uri");
-        if (externalStatsUri != null && !externalStatsUri.equals("")) {
+        String statsUri = statsOutputUri.value();
+        if (StringUtils.isNotBlank(statsUri)) {
             try {
-                URI uri = new URI(externalStatsUri);
-                String scheme = uri.getScheme();
+                URI uri = new URI(statsUri);
+                externalStatsScheme = uri.getScheme();
 
                 try {
-                    externalStatsType = 
ExternalStatsProtocol.valueOf(scheme.toUpperCase());
+                    externalStatsType = 
ExternalStatsProtocol.valueOf(externalStatsScheme.toUpperCase());
                 } catch (IllegalArgumentException e) {
-                    s_logger.info(scheme + " is not a valid protocol for 
external statistics. No statistics will be send.");
+                    s_logger.error(externalStatsScheme + " is not a valid 
protocol for external statistics. No statistics will be send.");
                 }
 
-                if (!StringUtils.isEmpty(uri.getHost())) {
+                if (StringUtils.isNotEmpty(uri.getHost())) {
                     externalStatsHost = uri.getHost();
                 }
 
-                externalStatsPort = uri.getPort();
+                externalStatsPort = retrieveExternalStatsPortFromUri(uri);
+
+                databaseName = configureDatabaseName(uri);
 
                 if (!StringUtils.isEmpty(uri.getPath())) {
                     externalStatsPrefix = uri.getPath().substring(1);
@@ -318,9 +364,8 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
                     externalStatsPrefix = "";
                 }
 
-                externalStatsEnabled = true;
             } catch (URISyntaxException e) {
-                s_logger.debug("Failed to parse external statistics URI: " + 
e.getMessage());
+                s_logger.error("Failed to parse external statistics URI: ", e);
             }
         }
 
@@ -342,7 +387,8 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
 
         if (vmDiskStatsInterval.value() > 0) {
             if (vmDiskStatsInterval.value() < vmDiskStatsIntervalMin.value()) {
-                s_logger.debug("vm.disk.stats.interval - " + 
vmDiskStatsInterval.value() + " is smaller than vm.disk.stats.interval.min - " 
+ vmDiskStatsIntervalMin.value() + ", so use vm.disk.stats.interval.min");
+                s_logger.debug("vm.disk.stats.interval - " + 
vmDiskStatsInterval.value() + " is smaller than vm.disk.stats.interval.min - " 
+ vmDiskStatsIntervalMin.value()
+                        + ", so use vm.disk.stats.interval.min");
                 _executor.scheduleAtFixedRate(new VmDiskStatsTask(), 
vmDiskStatsIntervalMin.value(), vmDiskStatsIntervalMin.value(), 
TimeUnit.SECONDS);
             } else {
                 _executor.scheduleAtFixedRate(new VmDiskStatsTask(), 
vmDiskStatsInterval.value(), vmDiskStatsInterval.value(), TimeUnit.SECONDS);
@@ -353,7 +399,8 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
 
         if (vmNetworkStatsInterval.value() > 0) {
             if (vmNetworkStatsInterval.value() < 
vmNetworkStatsIntervalMin.value()) {
-                s_logger.debug("vm.network.stats.interval - " + 
vmNetworkStatsInterval.value() + " is smaller than 
vm.network.stats.interval.min - " + vmNetworkStatsIntervalMin.value() + ", so 
use vm.network.stats.interval.min");
+                s_logger.debug("vm.network.stats.interval - " + 
vmNetworkStatsInterval.value() + " is smaller than 
vm.network.stats.interval.min - "
+                        + vmNetworkStatsIntervalMin.value() + ", so use 
vm.network.stats.interval.min");
                 _executor.scheduleAtFixedRate(new VmNetworkStatsTask(), 
vmNetworkStatsIntervalMin.value(), vmNetworkStatsIntervalMin.value(), 
TimeUnit.SECONDS);
             } else {
                 _executor.scheduleAtFixedRate(new VmNetworkStatsTask(), 
vmNetworkStatsInterval.value(), vmNetworkStatsInterval.value(), 
TimeUnit.SECONDS);
@@ -411,82 +458,111 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
 
     }
 
-    class HostCollector extends ManagedContextRunnable {
+    /**
+     * Configures the database name according to the URI path. For instance, 
if the URI is as influxdb://address:port/dbname, the database name will be 
'dbname'.
+     */
+    protected String configureDatabaseName(URI uri) {
+        String dbname = StringUtils.removeStart(uri.getPath(), "/");
+        if (StringUtils.isBlank(dbname)) {
+            return DEFAULT_DATABASE_NAME;
+        } else {
+            return dbname;
+        }
+    }
+
+    /**
+     * Configures the port to be used when connecting with the stats collector 
service.
+     * Default values are 8086 for influx DB and 2003 for GraphiteDB.
+     * Throws URISyntaxException in case of non configured port and external 
StatsType
+     */
+    protected int retrieveExternalStatsPortFromUri(URI uri) throws 
URISyntaxException {
+        int port = uri.getPort();
+        if (externalStatsType != ExternalStatsProtocol.NONE) {
+            if (port != UNDEFINED_PORT_VALUE) {
+                return port;
+            }
+            if (externalStatsType == ExternalStatsProtocol.GRAPHITE) {
+                return GRAPHITE_DEFAULT_PORT;
+            }
+            if (externalStatsType == ExternalStatsProtocol.INFLUXDB) {
+                return INFLUXDB_DEFAULT_PORT;
+            }
+        }
+        throw new URISyntaxException(uri.toString(), String.format(
+                "Cannot define a port for the Stats Collector host %s://%s:%s 
or URI scheme is incorrect. The configured URI in stats.output.uri is not 
supported. Please configure as the following examples: 
graphite://graphite-hostaddress:port, or influxdb://influxdb-hostaddress:port. 
Note that the port is optional, if not added the default port for the 
respective collector (graphite or influxdb) will be used.",
+                externalStatsPrefix, externalStatsHost, externalStatsPort));
+    }
+
+    class HostCollector extends AbstractStatsCollector {
         @Override
         protected void runInContext() {
             try {
                 s_logger.debug("HostStatsCollector is running...");
 
-                SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
-                sc.addAnd("status", SearchCriteria.Op.EQ, 
Status.Up.toString());
-                sc.addAnd("resourceState", SearchCriteria.Op.NIN, 
ResourceState.Maintenance, ResourceState.PrepareForMaintenance, 
ResourceState.ErrorInMaintenance);
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.Storage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.ConsoleProxy.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.SecondaryStorage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.LocalSecondaryStorage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.TrafficMonitor.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.SecondaryStorageVM.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.ExternalFirewall.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.ExternalLoadBalancer.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.NetScalerControlCenter.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.L2Networking.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.BaremetalDhcp.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.BaremetalPxe.toString());
-                ConcurrentHashMap<Long, HostStats> hostStats = new 
ConcurrentHashMap<Long, HostStats>();
+                SearchCriteria<HostVO> sc = 
createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance();
+
+                Map<Object, Object> metrics = new HashMap<>();
                 List<HostVO> hosts = _hostDao.search(sc, null);
+
                 for (HostVO host : hosts) {
-                    HostStatsEntry stats = 
(HostStatsEntry)_resourceMgr.getHostStatistics(host.getId());
-                    if (stats != null) {
-                        hostStats.put(host.getId(), stats);
+                    HostStatsEntry hostStatsEntry = 
(HostStatsEntry)_resourceMgr.getHostStatistics(host.getId());
+                    if (hostStatsEntry != null) {
+                        hostStatsEntry.setHostVo(host);
+                        metrics.put(hostStatsEntry.getHostId(), 
hostStatsEntry);
+                        _hostStats.put(host.getId(), hostStatsEntry);
                     } else {
-                        s_logger.warn("Received invalid host stats for host: " 
+ host.getId());
+                        s_logger.warn("The Host stats is null for host: " + 
host.getId());
                     }
                 }
-                _hostStats = hostStats;
-                // Get a subset of hosts with GPU support from the list of 
"hosts"
-                List<HostVO> gpuEnabledHosts = new ArrayList<HostVO>();
-                if (hostIds != null) {
-                    for (HostVO host : hosts) {
-                        if (hostIds.contains(host.getId())) {
-                            gpuEnabledHosts.add(host);
-                        }
-                    }
-                } else {
-                    // Check for all the hosts managed by CloudStack.
-                    gpuEnabledHosts = hosts;
-                }
-                for (HostVO host : gpuEnabledHosts) {
-                    HashMap<String, HashMap<String, VgpuTypesInfo>> 
groupDetails = _resourceMgr.getGPUStatistics(host);
-                    if (groupDetails != null) {
-                        _resourceMgr.updateGPUDetails(host.getId(), 
groupDetails);
-                    }
+
+                if (externalStatsType == ExternalStatsProtocol.INFLUXDB) {
+                    sendMetricsToInfluxdb(metrics);
                 }
-                hostIds = _hostGpuGroupsDao.listHostIds();
+
+                updateGpuEnabledHostsDetails(hosts);
             } catch (Throwable t) {
                 s_logger.error("Error trying to retrieve host stats", t);
             }
         }
+
+        /**
+         * Updates GPU details on hosts supporting GPU.
+         */
+        private void updateGpuEnabledHostsDetails(List<HostVO> hosts) {
+            List<HostVO> gpuEnabledHosts = new ArrayList<HostVO>();
+            List<Long> hostIds = _hostGpuGroupsDao.listHostIds();
+            if (CollectionUtils.isEmpty(hostIds)) {
+                return;
+            }
+            for (HostVO host : hosts) {
+                if (hostIds.contains(host.getId())) {
+                    gpuEnabledHosts.add(host);
+                }
+            }
+            for (HostVO host : gpuEnabledHosts) {
+                HashMap<String, HashMap<String, VgpuTypesInfo>> groupDetails = 
_resourceMgr.getGPUStatistics(host);
+                if (MapUtils.isEmpty(groupDetails)) {
+                    _resourceMgr.updateGPUDetails(host.getId(), groupDetails);
+                }
+            }
+        }
+
+        @Override
+        protected Point creteInfluxDbPoint(Object metricsObject) {
+            return createInfluxDbPointForHostMetrics(metricsObject);
+        }
     }
 
-    class VmStatsCollector extends ManagedContextRunnable {
+    class VmStatsCollector extends AbstractStatsCollector {
         @Override
         protected void runInContext() {
             try {
                 s_logger.trace("VmStatsCollector is running...");
 
-                SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
-                sc.addAnd("status", SearchCriteria.Op.EQ, 
Status.Up.toString());
-                sc.addAnd("resourceState", SearchCriteria.Op.NIN, 
ResourceState.Maintenance, ResourceState.PrepareForMaintenance, 
ResourceState.ErrorInMaintenance);
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.Storage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.ConsoleProxy.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.SecondaryStorage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.LocalSecondaryStorage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.TrafficMonitor.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, 
Host.Type.SecondaryStorageVM.toString());
+                SearchCriteria<HostVO> sc = 
createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance();
                 List<HostVO> hosts = _hostDao.search(sc, null);
 
-                /* HashMap for metrics to be send to Graphite */
-                HashMap metrics = new HashMap<String, Integer>();
+                Map<Object, Object> metrics = new HashMap<>();
 
                 for (HostVO host : hosts) {
                     List<UserVmVO> vms = 
_userVmDao.listRunningByHostId(host.getId());
@@ -497,86 +573,35 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
                     }
 
                     try {
-                        HashMap<Long, VmStatsEntry> vmStatsById = 
_userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds);
+                        Map<Long, VmStatsEntry> vmStatsById = 
_userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds);
 
                         if (vmStatsById != null) {
-                            VmStatsEntry statsInMemory = null;
-
                             Set<Long> vmIdSet = vmStatsById.keySet();
                             for (Long vmId : vmIdSet) {
                                 VmStatsEntry statsForCurrentIteration = 
vmStatsById.get(vmId);
-                                statsInMemory = 
(VmStatsEntry)_VmStats.get(vmId);
+                                statsForCurrentIteration.setVmId(vmId);
+                                UserVmVO userVmVo = _userVmDao.findById(vmId);
+                                statsForCurrentIteration.setUserVmVO(userVmVo);
 
-                                if (statsInMemory == null) {
-                                    //no stats exist for this vm, directly 
persist
-                                    _VmStats.put(vmId, 
statsForCurrentIteration);
-                                } else {
-                                    //update each field
-                                    
statsInMemory.setCPUUtilization(statsForCurrentIteration.getCPUUtilization());
-                                    
statsInMemory.setNumCPUs(statsForCurrentIteration.getNumCPUs());
-                                    
statsInMemory.setNetworkReadKBs(statsInMemory.getNetworkReadKBs() + 
statsForCurrentIteration.getNetworkReadKBs());
-                                    
statsInMemory.setNetworkWriteKBs(statsInMemory.getNetworkWriteKBs() + 
statsForCurrentIteration.getNetworkWriteKBs());
-                                    
statsInMemory.setDiskWriteKBs(statsInMemory.getDiskWriteKBs() + 
statsForCurrentIteration.getDiskWriteKBs());
-                                    
statsInMemory.setDiskReadIOs(statsInMemory.getDiskReadIOs() + 
statsForCurrentIteration.getDiskReadIOs());
-                                    
statsInMemory.setDiskWriteIOs(statsInMemory.getDiskWriteIOs() + 
statsForCurrentIteration.getDiskWriteIOs());
-                                    
statsInMemory.setDiskReadKBs(statsInMemory.getDiskReadKBs() + 
statsForCurrentIteration.getDiskReadKBs());
-                                    
statsInMemory.setMemoryKBs(statsForCurrentIteration.getMemoryKBs());
-                                    
statsInMemory.setIntFreeMemoryKBs(statsForCurrentIteration.getIntFreeMemoryKBs());
-                                    
statsInMemory.setTargetMemoryKBs(statsForCurrentIteration.getTargetMemoryKBs());
-
-                                    _VmStats.put(vmId, statsInMemory);
-                                }
-
-                                /**
-                                 * Add statistics to HashMap only when they 
should be send to a external stats collector
-                                 * Performance wise it seems best to only 
append to the HashMap when needed
-                                 */
-                                if (externalStatsEnabled) {
-                                    VMInstanceVO vmVO = 
_vmInstance.findById(vmId);
-                                    String vmName = vmVO.getUuid();
-
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".cpu.num", 
statsForCurrentIteration.getNumCPUs());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".cpu.utilization", 
statsForCurrentIteration.getCPUUtilization());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".network.read_kbs", 
statsForCurrentIteration.getNetworkReadKBs());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".network.write_kbs", 
statsForCurrentIteration.getNetworkWriteKBs());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".disk.write_kbs", 
statsForCurrentIteration.getDiskWriteKBs());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".disk.read_kbs", 
statsForCurrentIteration.getDiskReadKBs());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".disk.write_iops", 
statsForCurrentIteration.getDiskWriteIOs());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".disk.read_iops", 
statsForCurrentIteration.getDiskReadIOs());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".memory.total_kbs", 
statsForCurrentIteration.getMemoryKBs());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".memory.internalfree_kbs", 
statsForCurrentIteration.getIntFreeMemoryKBs());
-                                    metrics.put(externalStatsPrefix + 
"cloudstack.stats.instances." + vmName + ".memory.target_kbs", 
statsForCurrentIteration.getTargetMemoryKBs());
+                                
storeVirtualMachineStatsInMemory(statsForCurrentIteration);
 
+                                if (externalStatsType == 
ExternalStatsProtocol.GRAPHITE) {
+                                    prepareVmMetricsForGraphite(metrics, 
statsForCurrentIteration);
+                                } else {
+                                    
metrics.put(statsForCurrentIteration.getVmId(), statsForCurrentIteration);
                                 }
-
                             }
 
-                            /**
-                             * Send the metrics to a external stats collector
-                             * We send it on a per-host basis to prevent that 
we flood the host
-                             * Currently only Graphite is supported
-                             */
                             if (!metrics.isEmpty()) {
-                                if (externalStatsType != null && 
externalStatsType == ExternalStatsProtocol.GRAPHITE) {
-
-                                    if (externalStatsPort == -1) {
-                                        externalStatsPort = 2003;
-                                    }
-
-                                    s_logger.debug("Sending VmStats of host " 
+ host.getId() + " to Graphite host " + externalStatsHost + ":" + 
externalStatsPort);
-
-                                    try {
-                                        GraphiteClient g = new 
GraphiteClient(externalStatsHost, externalStatsPort);
-                                        g.sendMetrics(metrics);
-                                    } catch (GraphiteException e) {
-                                        s_logger.debug("Failed sending VmStats 
to Graphite host " + externalStatsHost + ":" + externalStatsPort + ": " + 
e.getMessage());
-                                    }
-
-                                    metrics.clear();
+                                if (externalStatsType == 
ExternalStatsProtocol.GRAPHITE) {
+                                    sendVmMetricsToGraphiteHost(metrics, host);
+                                } else if (externalStatsType == 
ExternalStatsProtocol.INFLUXDB) {
+                                    sendMetricsToInfluxdb(metrics);
                                 }
                             }
-                        }
 
+                            metrics.clear();
+                        }
                     } catch (Exception e) {
                         s_logger.debug("Failed to get VM stats for host with 
ID: " + host.getId());
                         continue;
@@ -587,6 +612,11 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
                 s_logger.error("Error trying to retrieve VM stats", t);
             }
         }
+
+        @Override
+        protected Point creteInfluxDbPoint(Object metricsObject) {
+            return createInfluxDbPointForVmMetrics(metricsObject);
+        }
     }
 
     public VmStats getVmStats(long id) {
@@ -646,7 +676,7 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
             //Check for ownership
             //msHost in UP state with min id should run the job
             ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new 
Filter(ManagementServerHostVO.class, "id", true, 0L, 1L));
-            if(msHost == null || (msHost.getMsid() != mgmtSrvrId)){
+            if (msHost == null || (msHost.getMsid() != mgmtSrvrId)) {
                 s_logger.debug("Skipping collect vm disk stats from hosts");
                 return;
             }
@@ -658,11 +688,7 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
                     public void doInTransactionWithoutResult(TransactionStatus 
status) {
                         s_logger.debug("VmDiskStatsTask is running...");
 
-                        SearchCriteria<HostVO> sc = 
_hostDao.createSearchCriteria();
-                        sc.addAnd("status", SearchCriteria.Op.EQ, 
Status.Up.toString());
-                        sc.addAnd("resourceState", SearchCriteria.Op.NIN, 
ResourceState.Maintenance, ResourceState.PrepareForMaintenance,
-                                ResourceState.ErrorInMaintenance);
-                        sc.addAnd("type", SearchCriteria.Op.EQ, 
Host.Type.Routing.toString());
+                        SearchCriteria<HostVO> sc = 
createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance();
                         sc.addAnd("hypervisorType", SearchCriteria.Op.EQ, 
HypervisorType.KVM); // support KVM only util 2013.06.25
                         List<HostVO> hosts = _hostDao.search(sc, null);
 
@@ -689,67 +715,64 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
                                     SearchCriteria<VolumeVO> sc_volume = 
_volsDao.createSearchCriteria();
                                     sc_volume.addAnd("path", 
SearchCriteria.Op.EQ, vmDiskStat.getPath());
                                     List<VolumeVO> volumes = 
_volsDao.search(sc_volume, null);
-                                    if ((volumes == null) || (volumes.size() 
== 0))
+
+                                    if (CollectionUtils.isEmpty(volumes))
                                         break;
+
                                     VolumeVO volume = volumes.get(0);
-                                    VmDiskStatisticsVO previousVmDiskStats =
-                                            
_vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), vmId, 
volume.getId());
+                                    VmDiskStatisticsVO previousVmDiskStats = 
_vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), vmId, 
volume.getId());
                                     VmDiskStatisticsVO vmDiskStat_lock = 
_vmDiskStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), vmId, 
volume.getId());
 
-                                    if ((vmDiskStat.getBytesRead() == 0) && 
(vmDiskStat.getBytesWrite() == 0) && (vmDiskStat.getIORead() == 0) &&
-                                            (vmDiskStat.getIOWrite() == 0)) {
+                                    if (areAllDiskStatsZero(vmDiskStat)) {
                                         s_logger.debug("IO/bytes read and 
write are all 0. Not updating vm_disk_statistics");
                                         continue;
                                     }
 
                                     if (vmDiskStat_lock == null) {
-                                        s_logger.warn("unable to find vm disk 
stats from host for account: " + userVm.getAccountId() + " with vmId: " + 
userVm.getId() +
-                                                " and volumeId:" + 
volume.getId());
+                                        s_logger.warn("unable to find vm disk 
stats from host for account: " + userVm.getAccountId() + " with vmId: " + 
userVm.getId()
+                                                + " and volumeId:" + 
volume.getId());
                                         continue;
                                     }
 
-                                    if (previousVmDiskStats != null &&
-                                            
((previousVmDiskStats.getCurrentBytesRead() != 
vmDiskStat_lock.getCurrentBytesRead()) ||
-                                                    
(previousVmDiskStats.getCurrentBytesWrite() != 
vmDiskStat_lock.getCurrentBytesWrite()) ||
-                                                    
(previousVmDiskStats.getCurrentIORead() != vmDiskStat_lock.getCurrentIORead()) 
|| (previousVmDiskStats.getCurrentIOWrite() != 
vmDiskStat_lock.getCurrentIOWrite()))) {
-                                        s_logger.debug("vm disk stats changed 
from the time GetVmDiskStatsCommand was sent. " + "Ignoring current answer. 
Host: " +
-                                                host.getName() + " . VM: " + 
vmDiskStat.getVmName() + " Read(Bytes): " + vmDiskStat.getBytesRead() + " 
write(Bytes): " +
-                                                vmDiskStat.getBytesWrite() + " 
Read(IO): " + vmDiskStat.getIORead() + " write(IO): " + 
vmDiskStat.getIOWrite());
+                                    if 
(isCurrentVmDiskStatsDifferentFromPrevious(previousVmDiskStats, 
vmDiskStat_lock)) {
+                                        s_logger.debug("vm disk stats changed 
from the time GetVmDiskStatsCommand was sent. " + "Ignoring current answer. 
Host: " + host.getName()
+                                                + " . VM: " + 
vmDiskStat.getVmName() + " Read(Bytes): " + vmDiskStat.getBytesRead() + " 
write(Bytes): " + vmDiskStat.getBytesWrite()
+                                                + " Read(IO): " + 
vmDiskStat.getIORead() + " write(IO): " + vmDiskStat.getIOWrite());
                                         continue;
                                     }
 
                                     if (vmDiskStat_lock.getCurrentBytesRead() 
> vmDiskStat.getBytesRead()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Read # of bytes 
that's less than the last one.  " +
-                                                    "Assuming something went 
wrong and persisting it. Host: " + host.getName() + " . VM: " + 
vmDiskStat.getVmName() +
-                                                    " Reported: " + 
vmDiskStat.getBytesRead() + " Stored: " + 
vmDiskStat_lock.getCurrentBytesRead());
+                                            s_logger.debug("Read # of bytes 
that's less than the last one.  " + "Assuming something went wrong and 
persisting it. Host: "
+                                                    + host.getName() + " . VM: 
" + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getBytesRead() + " 
Stored: "
+                                                    + 
vmDiskStat_lock.getCurrentBytesRead());
                                         }
                                         
vmDiskStat_lock.setNetBytesRead(vmDiskStat_lock.getNetBytesRead() + 
vmDiskStat_lock.getCurrentBytesRead());
                                     }
                                     
vmDiskStat_lock.setCurrentBytesRead(vmDiskStat.getBytesRead());
                                     if (vmDiskStat_lock.getCurrentBytesWrite() 
> vmDiskStat.getBytesWrite()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Write # of bytes 
that's less than the last one.  " +
-                                                    "Assuming something went 
wrong and persisting it. Host: " + host.getName() + " . VM: " + 
vmDiskStat.getVmName() +
-                                                    " Reported: " + 
vmDiskStat.getBytesWrite() + " Stored: " + 
vmDiskStat_lock.getCurrentBytesWrite());
+                                            s_logger.debug("Write # of bytes 
that's less than the last one.  " + "Assuming something went wrong and 
persisting it. Host: "
+                                                    + host.getName() + " . VM: 
" + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getBytesWrite() + " 
Stored: "
+                                                    + 
vmDiskStat_lock.getCurrentBytesWrite());
                                         }
                                         
vmDiskStat_lock.setNetBytesWrite(vmDiskStat_lock.getNetBytesWrite() + 
vmDiskStat_lock.getCurrentBytesWrite());
                                     }
                                     
vmDiskStat_lock.setCurrentBytesWrite(vmDiskStat.getBytesWrite());
                                     if (vmDiskStat_lock.getCurrentIORead() > 
vmDiskStat.getIORead()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Read # of IO 
that's less than the last one.  " + "Assuming something went wrong and 
persisting it. Host: " +
-                                                    host.getName() + " . VM: " 
+ vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIORead() + " Stored: " 
+
-                                                    
vmDiskStat_lock.getCurrentIORead());
+                                            s_logger.debug("Read # of IO 
that's less than the last one.  " + "Assuming something went wrong and 
persisting it. Host: "
+                                                    + host.getName() + " . VM: 
" + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIORead() + " Stored: 
"
+                                                    + 
vmDiskStat_lock.getCurrentIORead());
                                         }
                                         
vmDiskStat_lock.setNetIORead(vmDiskStat_lock.getNetIORead() + 
vmDiskStat_lock.getCurrentIORead());
                                     }
                                     
vmDiskStat_lock.setCurrentIORead(vmDiskStat.getIORead());
                                     if (vmDiskStat_lock.getCurrentIOWrite() > 
vmDiskStat.getIOWrite()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Write # of IO 
that's less than the last one.  " + "Assuming something went wrong and 
persisting it. Host: " +
-                                                    host.getName() + " . VM: " 
+ vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIOWrite() + " Stored: 
" +
-                                                    
vmDiskStat_lock.getCurrentIOWrite());
+                                            s_logger.debug("Write # of IO 
that's less than the last one.  " + "Assuming something went wrong and 
persisting it. Host: "
+                                                    + host.getName() + " . VM: 
" + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIOWrite() + " 
Stored: "
+                                                    + 
vmDiskStat_lock.getCurrentIOWrite());
                                         }
                                         
vmDiskStat_lock.setNetIOWrite(vmDiskStat_lock.getNetIOWrite() + 
vmDiskStat_lock.getCurrentIOWrite());
                                     }
@@ -781,7 +804,7 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
             //Check for ownership
             //msHost in UP state with min id should run the job
             ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new 
Filter(ManagementServerHostVO.class, "id", true, 0L, 1L));
-            if(msHost == null || (msHost.getMsid() != mgmtSrvrId)){
+            if (msHost == null || (msHost.getMsid() != mgmtSrvrId)) {
                 s_logger.debug("Skipping collect vm network stats from hosts");
                 return;
             }
@@ -792,14 +815,10 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
                     public void doInTransactionWithoutResult(TransactionStatus 
status) {
                         s_logger.debug("VmNetworkStatsTask is running...");
 
-                        SearchCriteria<HostVO> sc = 
_hostDao.createSearchCriteria();
-                        sc.addAnd("status", SearchCriteria.Op.EQ, 
Status.Up.toString());
-                        sc.addAnd("resourceState", SearchCriteria.Op.NIN, 
ResourceState.Maintenance, ResourceState.PrepareForMaintenance, 
ResourceState.ErrorInMaintenance);
-                        sc.addAnd("type", SearchCriteria.Op.EQ, 
Host.Type.Routing.toString());
+                        SearchCriteria<HostVO> sc = 
createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance();
                         List<HostVO> hosts = _hostDao.search(sc, null);
 
-                        for (HostVO host : hosts)
-                        {
+                        for (HostVO host : hosts) {
                             List<UserVmVO> vms = 
_userVmDao.listRunningByHostId(host.getId());
                             List<Long> vmIds = new ArrayList<Long>();
 
@@ -813,8 +832,7 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
                                 continue;
 
                             Set<Long> vmIdSet = vmNetworkStatsById.keySet();
-                            for(Long vmId : vmIdSet)
-                            {
+                            for (Long vmId : vmIdSet) {
                                 List<VmNetworkStatsEntry> vmNetworkStats = 
vmNetworkStatsById.get(vmId);
                                 if (vmNetworkStats == null)
                                     continue;
@@ -823,20 +841,24 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
                                     s_logger.debug("Cannot find uservm with 
id: " + vmId + " , continue");
                                     continue;
                                 }
-                                s_logger.debug("Now we are updating the 
user_statistics table for VM: " + userVm.getInstanceName() + " after collecting 
vm network statistics from host: " + host.getName());
-                                for (VmNetworkStatsEntry 
vmNetworkStat:vmNetworkStats) {
+                                s_logger.debug("Now we are updating the 
user_statistics table for VM: " + userVm.getInstanceName()
+                                        + " after collecting vm network 
statistics from host: " + host.getName());
+                                for (VmNetworkStatsEntry vmNetworkStat : 
vmNetworkStats) {
                                     SearchCriteria<NicVO> sc_nic = 
_nicDao.createSearchCriteria();
                                     sc_nic.addAnd("macAddress", 
SearchCriteria.Op.EQ, vmNetworkStat.getMacAddress());
                                     NicVO nic = _nicDao.search(sc_nic, 
null).get(0);
                                     List<VlanVO> vlan = 
_vlanDao.listVlansByNetworkId(nic.getNetworkId());
                                     if (vlan == null || vlan.size() == 0 || 
vlan.get(0).getVlanType() != VlanType.DirectAttached)
                                         continue; // only get network 
statistics for DirectAttached network (shared networks in Basic zone and 
Advanced zone with/without SG)
-                                    UserStatisticsVO previousvmNetworkStats = 
_userStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), 
nic.getNetworkId(), nic.getIPv4Address(), vmId, "UserVm");
+                                    UserStatisticsVO previousvmNetworkStats = 
_userStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), 
nic.getNetworkId(),
+                                            nic.getIPv4Address(), vmId, 
"UserVm");
                                     if (previousvmNetworkStats == null) {
-                                        previousvmNetworkStats = new 
UserStatisticsVO(userVm.getAccountId(), 
userVm.getDataCenterId(),nic.getIPv4Address(), vmId, "UserVm", 
nic.getNetworkId());
+                                        previousvmNetworkStats = new 
UserStatisticsVO(userVm.getAccountId(), userVm.getDataCenterId(), 
nic.getIPv4Address(), vmId, "UserVm",
+                                                nic.getNetworkId());
                                         
_userStatsDao.persist(previousvmNetworkStats);
                                     }
-                                    UserStatisticsVO vmNetworkStat_lock = 
_userStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), 
nic.getNetworkId(), nic.getIPv4Address(), vmId, "UserVm");
+                                    UserStatisticsVO vmNetworkStat_lock = 
_userStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), 
nic.getNetworkId(),
+                                            nic.getIPv4Address(), vmId, 
"UserVm");
 
                                     if ((vmNetworkStat.getBytesSent() == 0) && 
(vmNetworkStat.getBytesReceived() == 0)) {
                                         s_logger.debug("bytes sent and 
received are all 0. Not updating user_statistics");
@@ -844,24 +866,24 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
                                     }
 
                                     if (vmNetworkStat_lock == null) {
-                                        s_logger.warn("unable to find vm 
network stats from host for account: " + userVm.getAccountId() + " with vmId: " 
+ userVm.getId()+ " and nicId:" + nic.getId());
+                                        s_logger.warn("unable to find vm 
network stats from host for account: " + userVm.getAccountId() + " with vmId: " 
+ userVm.getId()
+                                                + " and nicId:" + nic.getId());
                                         continue;
                                     }
 
-                                    if (previousvmNetworkStats != null
-                                            && 
((previousvmNetworkStats.getCurrentBytesSent() != 
vmNetworkStat_lock.getCurrentBytesSent())
+                                    if (previousvmNetworkStats != null && 
((previousvmNetworkStats.getCurrentBytesSent() != 
vmNetworkStat_lock.getCurrentBytesSent())
                                             || 
(previousvmNetworkStats.getCurrentBytesReceived() != 
vmNetworkStat_lock.getCurrentBytesReceived()))) {
-                                        s_logger.debug("vm network stats 
changed from the time GetNmNetworkStatsCommand was sent. " +
-                                                "Ignoring current answer. 
Host: " + host.getName()  + " . VM: " + vmNetworkStat.getVmName() +
-                                                " Sent(Bytes): " + 
vmNetworkStat.getBytesSent() + " Received(Bytes): " + 
vmNetworkStat.getBytesReceived());
+                                        s_logger.debug("vm network stats 
changed from the time GetNmNetworkStatsCommand was sent. " + "Ignoring current 
answer. Host: "
+                                                + host.getName() + " . VM: " + 
vmNetworkStat.getVmName() + " Sent(Bytes): " + vmNetworkStat.getBytesSent() + " 
Received(Bytes): "
+                                                + 
vmNetworkStat.getBytesReceived());
                                         continue;
                                     }
 
                                     if 
(vmNetworkStat_lock.getCurrentBytesSent() > vmNetworkStat.getBytesSent()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Sent # of bytes 
that's less than the last one.  " +
-                                                    "Assuming something went 
wrong and persisting it. Host: " + host.getName() + " . VM: " + 
vmNetworkStat.getVmName() +
-                                                    " Reported: " + 
vmNetworkStat.getBytesSent() + " Stored: " + 
vmNetworkStat_lock.getCurrentBytesSent());
+                                            s_logger.debug("Sent # of bytes 
that's less than the last one.  " + "Assuming something went wrong and 
persisting it. Host: "
+                                                    + host.getName() + " . VM: 
" + vmNetworkStat.getVmName() + " Reported: " + vmNetworkStat.getBytesSent() + 
" Stored: "
+                                                    + 
vmNetworkStat_lock.getCurrentBytesSent());
                                         }
                                         
vmNetworkStat_lock.setNetBytesSent(vmNetworkStat_lock.getNetBytesSent() + 
vmNetworkStat_lock.getCurrentBytesSent());
                                     }
@@ -869,15 +891,15 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
 
                                     if 
(vmNetworkStat_lock.getCurrentBytesReceived() > 
vmNetworkStat.getBytesReceived()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Received # of 
bytes that's less than the last one.  " +
-                                                    "Assuming something went 
wrong and persisting it. Host: " + host.getName() + " . VM: " + 
vmNetworkStat.getVmName() +
-                                                    " Reported: " + 
vmNetworkStat.getBytesReceived() + " Stored: " + 
vmNetworkStat_lock.getCurrentBytesReceived());
+                                            s_logger.debug("Received # of 
bytes that's less than the last one.  " + "Assuming something went wrong and 
persisting it. Host: "
+                                                    + host.getName() + " . VM: 
" + vmNetworkStat.getVmName() + " Reported: " + 
vmNetworkStat.getBytesReceived() + " Stored: "
+                                                    + 
vmNetworkStat_lock.getCurrentBytesReceived());
                                         }
                                         
vmNetworkStat_lock.setNetBytesReceived(vmNetworkStat_lock.getNetBytesReceived() 
+ vmNetworkStat_lock.getCurrentBytesReceived());
                                     }
                                     
vmNetworkStat_lock.setCurrentBytesReceived(vmNetworkStat.getBytesReceived());
 
-                                    if (! _dailyOrHourly) {
+                                    if (!_dailyOrHourly) {
                                         //update agg bytes
                                         
vmNetworkStat_lock.setAggBytesReceived(vmNetworkStat_lock.getNetBytesReceived() 
+ vmNetworkStat_lock.getCurrentBytesReceived());
                                         
vmNetworkStat_lock.setAggBytesSent(vmNetworkStat_lock.getNetBytesSent() + 
vmNetworkStat_lock.getCurrentBytesSent());
@@ -895,7 +917,6 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
         }
     }
 
-
     class VolumeStatsTask extends ManagedContextRunnable {
         @Override
         protected void runInContext() {
@@ -905,18 +926,15 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
                 for (StoragePoolVO pool : pools) {
                     List<VolumeVO> volumes = 
_volsDao.findByPoolId(pool.getId(), null);
                     List<String> volumeLocators = new ArrayList<String>();
-                    for (VolumeVO volume: volumes){
+                    for (VolumeVO volume : volumes) {
                         if (volume.getFormat() == ImageFormat.QCOW2) {
                             volumeLocators.add(volume.getUuid());
-                        }
-                        else if (volume.getFormat() == ImageFormat.VHD){
+                        } else if (volume.getFormat() == ImageFormat.VHD) {
                             volumeLocators.add(volume.getPath());
-                        }
-                        else if (volume.getFormat() == ImageFormat.OVA){
+                        } else if (volume.getFormat() == ImageFormat.OVA) {
                             volumeLocators.add(volume.getChainInfo());
-                        }
-                        else {
-                            s_logger.warn("Volume stats not implemented for 
this format type " + volume.getFormat() );
+                        } else {
+                            s_logger.warn("Volume stats not implemented for 
this format type " + volume.getFormat());
                             break;
                         }
                     }
@@ -924,8 +942,9 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
                         Map<String, VolumeStatsEntry> volumeStatsByUuid;
                         if (pool.getScope() == ScopeType.ZONE) {
                             volumeStatsByUuid = new HashMap<>();
-                            for (final Cluster cluster: 
_clusterDao.listByZoneId(pool.getDataCenterId())) {
-                                final Map<String, VolumeStatsEntry> 
volumeStatsForCluster = _userVmMgr.getVolumeStatistics(cluster.getId(), 
pool.getUuid(), pool.getPoolType(), volumeLocators, StatsTimeout.value());
+                            for (final Cluster cluster : 
_clusterDao.listByZoneId(pool.getDataCenterId())) {
+                                final Map<String, VolumeStatsEntry> 
volumeStatsForCluster = _userVmMgr.getVolumeStatistics(cluster.getId(), 
pool.getUuid(), pool.getPoolType(),
+                                        volumeLocators, StatsTimeout.value());
                                 if (volumeStatsForCluster != null) {
                                     
volumeStatsByUuid.putAll(volumeStatsForCluster);
                                 }
@@ -933,7 +952,7 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
                         } else {
                             volumeStatsByUuid = 
_userVmMgr.getVolumeStatistics(pool.getClusterId(), pool.getUuid(), 
pool.getPoolType(), volumeLocators, StatsTimeout.value());
                         }
-                        if (volumeStatsByUuid != null){
+                        if (volumeStatsByUuid != null) {
                             for (final Map.Entry<String, VolumeStatsEntry> 
entry : volumeStatsByUuid.entrySet()) {
                                 if (entry == null || entry.getKey() == null || 
entry.getValue() == null) {
                                     continue;
@@ -985,8 +1004,7 @@ public class StatsCollector extends ManagerBase implements 
ComponentMethodInterc
                     Answer answer = ssAhost.sendMessage(command);
                     if (answer != null && answer.getResult()) {
                         storageStats.put(storeId, (StorageStats)answer);
-                        s_logger.trace("HostId: " + storeId + " Used: " + 
((StorageStats)answer).getByteUsed() + " Total Available: " +
-                                ((StorageStats)answer).getCapacityBytes());
+                        s_logger.trace("HostId: " + storeId + " Used: " + 
((StorageStats)answer).getByteUsed() + " Total Available: " + 
((StorageStats)answer).getCapacityBytes());
                     }
                 }
                 _storageStats = storageStats;
@@ -1047,8 +1065,7 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
                         //check interval
                         long now = (new Date()).getTime();
                         if (asGroup.getLastInterval() != null)
-                            if ((now - asGroup.getLastInterval().getTime()) < 
asGroup
-                                    .getInterval()) {
+                            if ((now - asGroup.getLastInterval().getTime()) < 
asGroup.getInterval()) {
                                 continue;
                             }
 
@@ -1213,7 +1230,6 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
                                 long thresholdValue = 
conditionVO.getThreshold();
                                 Double thresholdPercent = 
(double)thresholdValue / 100;
                                 CounterVO counterVO = 
_asCounterDao.findById(conditionVO.getCounterid());
-//Double sum = avgCounter.get(conditionVO.getCounterid());
                                 long counter_count = 1;
                                 do {
                                     String counter_param = 
params.get("counter" + String.valueOf(counter_count));
@@ -1260,8 +1276,7 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
             return lstResult;
         }
 
-        public List<Pair<String, Integer>> getPairofCounternameAndDuration(
-                long groupId) {
+        public List<Pair<String, Integer>> 
getPairofCounternameAndDuration(long groupId) {
             AutoScaleVmGroupVO groupVo = _asGroupDao.findById(groupId);
             if (groupVo == null)
                 return null;
@@ -1307,14 +1322,231 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
         }
     }
 
+    /**
+     * This class allows writing metrics to InfluxDB for the measurement that matches the collector extending it.
+     * Thus, VmStatsCollector and HostCollector can use the same method to write to different measurements
+     * (the host_stats or vm_stats tables).
+     */
+    abstract class AbstractStatsCollector extends ManagedContextRunnable {
+        /**
+         * Sends metrics to the InfluxDB host. This method supports both VM and host metrics.
+         */
+        protected void sendMetricsToInfluxdb(Map<Object, Object> metrics) {
+            InfluxDB influxDbConnection = createInfluxDbConnection();
+
+            Pong response = influxDbConnection.ping();
+            if (response.getVersion().equalsIgnoreCase("unknown")) {
+                throw new CloudRuntimeException(String.format("Cannot ping 
influxdb host %s:%s.", externalStatsHost, externalStatsPort));
+            }
+
+            Collection<Object> metricsObjects = metrics.values();
+            List<Point> points = new ArrayList<>();
+
+            s_logger.debug(String.format("Sending stats to %s host %s:%s", 
externalStatsType, externalStatsHost, externalStatsPort));
+
+            for (Object metricsObject : metricsObjects) {
+                Point vmPoint = creteInfluxDbPoint(metricsObject);
+                points.add(vmPoint);
+            }
+            writeBatches(influxDbConnection, databaseName, points);
+        }
+
+        /**
+         * Creates an InfluxDB point for the given stats collector (VmStatsCollector or HostCollector).
+         */
+        protected abstract Point creteInfluxDbPoint(Object metricsObject);
+    }
+
     public boolean imageStoreHasEnoughCapacity(DataStore imageStore) {
         StorageStats imageStoreStats = _storageStats.get(imageStore.getId());
-        if (imageStoreStats != null && 
(imageStoreStats.getByteUsed()/(imageStoreStats.getCapacityBytes()*1.0)) <= 
_imageStoreCapacityThreshold) {
+        if (imageStoreStats != null && (imageStoreStats.getByteUsed() / 
(imageStoreStats.getCapacityBytes() * 1.0)) <= _imageStoreCapacityThreshold) {
             return true;
         }
         return false;
     }
 
+    /**
+     * Sends VM metrics to the configured Graphite host.
+     */
+    protected void sendVmMetricsToGraphiteHost(Map<Object, Object> metrics, 
HostVO host) {
+        s_logger.debug(String.format("Sending VmStats of host %s to %s host 
%s:%s", host.getId(), externalStatsType, externalStatsHost, externalStatsPort));
+        try {
+            GraphiteClient g = new GraphiteClient(externalStatsHost, 
externalStatsPort);
+            g.sendMetrics(metrics);
+        } catch (GraphiteException e) {
+            s_logger.debug("Failed sending VmStats to Graphite host " + 
externalStatsHost + ":" + externalStatsPort + ": " + e.getMessage());
+        }
+    }
+
+    /**
+     * Prepares metrics for Graphite.
+     * @note this method must only be executed if the configured stats collector is a Graphite host;
+     * otherwise, it will compromise the map of metrics used by another type of collector (e.g. InfluxDB).
+     */
+    private void prepareVmMetricsForGraphite(Map<Object, Object> metrics, 
VmStatsEntry statsForCurrentIteration) {
+        VMInstanceVO vmVO = 
_vmInstance.findById(statsForCurrentIteration.getVmId());
+        String vmName = vmVO.getUuid();
+
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".cpu.num", statsForCurrentIteration.getNumCPUs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".cpu.utilization", statsForCurrentIteration.getCPUUtilization());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".network.read_kbs", statsForCurrentIteration.getNetworkReadKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".network.write_kbs", statsForCurrentIteration.getNetworkWriteKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".disk.write_kbs", statsForCurrentIteration.getDiskWriteKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".disk.read_kbs", statsForCurrentIteration.getDiskReadKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".disk.write_iops", statsForCurrentIteration.getDiskWriteIOs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".disk.read_iops", statsForCurrentIteration.getDiskReadIOs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".memory.total_kbs", statsForCurrentIteration.getMemoryKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".memory.internalfree_kbs", 
statsForCurrentIteration.getIntFreeMemoryKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + 
vmName + ".memory.target_kbs", statsForCurrentIteration.getTargetMemoryKBs());
+    }
+
+    /**
+     * Stores virtual machine stats in memory (map of {@link VmStatsEntry}).
+     */
+    private void storeVirtualMachineStatsInMemory(VmStatsEntry 
statsForCurrentIteration) {
+        VmStatsEntry statsInMemory = 
(VmStatsEntry)_VmStats.get(statsForCurrentIteration.getVmId());
+
+        if (statsInMemory == null) {
+            //no stats exist for this vm, directly persist
+            _VmStats.put(statsForCurrentIteration.getVmId(), 
statsForCurrentIteration);
+        } else {
+            //update each field
+            
statsInMemory.setCPUUtilization(statsForCurrentIteration.getCPUUtilization());
+            statsInMemory.setNumCPUs(statsForCurrentIteration.getNumCPUs());
+            statsInMemory.setNetworkReadKBs(statsInMemory.getNetworkReadKBs() 
+ statsForCurrentIteration.getNetworkReadKBs());
+            
statsInMemory.setNetworkWriteKBs(statsInMemory.getNetworkWriteKBs() + 
statsForCurrentIteration.getNetworkWriteKBs());
+            statsInMemory.setDiskWriteKBs(statsInMemory.getDiskWriteKBs() + 
statsForCurrentIteration.getDiskWriteKBs());
+            statsInMemory.setDiskReadIOs(statsInMemory.getDiskReadIOs() + 
statsForCurrentIteration.getDiskReadIOs());
+            statsInMemory.setDiskWriteIOs(statsInMemory.getDiskWriteIOs() + 
statsForCurrentIteration.getDiskWriteIOs());
+            statsInMemory.setDiskReadKBs(statsInMemory.getDiskReadKBs() + 
statsForCurrentIteration.getDiskReadKBs());
+            
statsInMemory.setMemoryKBs(statsForCurrentIteration.getMemoryKBs());
+            
statsInMemory.setIntFreeMemoryKBs(statsForCurrentIteration.getIntFreeMemoryKBs());
+            
statsInMemory.setTargetMemoryKBs(statsForCurrentIteration.getTargetMemoryKBs());
+
+            _VmStats.put(statsForCurrentIteration.getVmId(), statsInMemory);
+        }
+    }
+
+    /**
+     * Creates an InfluxDB point with host metrics. The point respects the following specification.<br/>
+     * <b>Tags:</b> host uuid<br/>
+     * <b>Fields:</b> total memory KBs, free memory KBs, CPU utilization, number of CPUs, CPU sockets,
+     * network read KBs, and network write KBs
+     */
+    protected Point createInfluxDbPointForHostMetrics(Object metricsObject) {
+        HostStatsEntry hostStatsEntry = (HostStatsEntry)metricsObject;
+
+        Map<String, String> tagsToAdd = new HashMap<>();
+        tagsToAdd.put(UUID_TAG, hostStatsEntry.getHostVo().getUuid());
+
+        Map<String, Object> fieldsToAdd = new HashMap<>();
+        fieldsToAdd.put(TOTAL_MEMORY_KBS_FIELD, 
hostStatsEntry.getTotalMemoryKBs());
+        fieldsToAdd.put(FREE_MEMORY_KBS_FIELD, 
hostStatsEntry.getFreeMemoryKBs());
+        fieldsToAdd.put(CPU_UTILIZATION_FIELD, 
hostStatsEntry.getCpuUtilization());
+        fieldsToAdd.put(CPUS_FIELD, hostStatsEntry.getHostVo().getCpus());
+        fieldsToAdd.put(CPU_SOCKETS_FIELD, 
hostStatsEntry.getHostVo().getCpuSockets());
+        fieldsToAdd.put(NETWORK_READ_KBS_FIELD, 
hostStatsEntry.getNetworkReadKBs());
+        fieldsToAdd.put(NETWORK_WRITE_KBS_FIELD, 
hostStatsEntry.getNetworkWriteKBs());
+
+        return 
Point.measurement(INFLUXDB_HOST_MEASUREMENT).tag(tagsToAdd).time(System.currentTimeMillis(),
 TimeUnit.MILLISECONDS).fields(fieldsToAdd).build();
+    }
+
+    /**
+     * Creates an InfluxDB point with VM metrics. The point respects the following specification.<br/>
+     * <b>Tags:</b> vm uuid<br/>
+     * <b>Fields:</b> total memory KBs, internal free memory KBs, target memory KBs, CPU utilization, number of CPUs,
+     * network read/write KBs, disk read/write KBs, and disk read/write IOPS
+     */
+    protected Point createInfluxDbPointForVmMetrics(Object metricsObject) {
+        VmStatsEntry vmStatsEntry = (VmStatsEntry)metricsObject;
+        UserVmVO userVmVO = vmStatsEntry.getUserVmVO();
+
+        Map<String, String> tagsToAdd = new HashMap<>();
+        tagsToAdd.put(UUID_TAG, userVmVO.getUuid());
+
+        Map<String, Object> fieldsToAdd = new HashMap<>();
+        fieldsToAdd.put(TOTAL_MEMORY_KBS_FIELD, vmStatsEntry.getMemoryKBs());
+        fieldsToAdd.put(FREE_MEMORY_KBS_FIELD, 
vmStatsEntry.getIntFreeMemoryKBs());
+        fieldsToAdd.put(MEMORY_TARGET_KBS_FIELD, 
vmStatsEntry.getTargetMemoryKBs());
+        fieldsToAdd.put(CPU_UTILIZATION_FIELD, 
vmStatsEntry.getCPUUtilization());
+        fieldsToAdd.put(CPUS_FIELD, vmStatsEntry.getNumCPUs());
+        fieldsToAdd.put(NETWORK_READ_KBS_FIELD, 
vmStatsEntry.getNetworkReadKBs());
+        fieldsToAdd.put(NETWORK_WRITE_KBS_FIELD, 
vmStatsEntry.getNetworkWriteKBs());
+        fieldsToAdd.put(DISK_READ_IOPS_FIELD, vmStatsEntry.getDiskReadIOs());
+        fieldsToAdd.put(DISK_READ_KBS_FIELD, vmStatsEntry.getDiskReadKBs());
+        fieldsToAdd.put(DISK_WRITE_IOPS_FIELD, vmStatsEntry.getDiskWriteIOs());
+        fieldsToAdd.put(DISK_WRITE_KBS_FIELD, vmStatsEntry.getDiskWriteKBs());
+
+        return 
Point.measurement(INFLUXDB_VM_MEASUREMENT).tag(tagsToAdd).time(System.currentTimeMillis(),
 TimeUnit.MILLISECONDS).fields(fieldsToAdd).build();
+    }
+
+    /**
+     * Creates a connection to InfluxDB. If the database does not exist, it throws a CloudRuntimeException.<br/>
+     * @note the user can configure the database name via the parameter 'stats.output.influxdb.database.name';
+     * such a database must already be created and configured by the user.
+     * The default name for the database is 'cloudstack_stats'.
+     */
+    protected InfluxDB createInfluxDbConnection() {
+        String influxDbQueryUrl = String.format("http://%s:%s/";, 
externalStatsHost, externalStatsPort);
+        InfluxDB influxDbConnection = 
InfluxDBFactory.connect(influxDbQueryUrl);
+
+        if (!influxDbConnection.databaseExists(databaseName)) {
+            throw new CloudRuntimeException(String.format("Database with name 
%s does not exist in influxdb host %s:%s", databaseName, externalStatsHost, 
externalStatsPort));
+        }
+
+        return influxDbConnection;
+    }
+
+    /**
+     * Writes a batch of InfluxDB points into the given database.
+     */
+    protected void writeBatches(InfluxDB influxDbConnection, String dbName, 
List<Point> points) {
+        BatchPoints batchPoints = BatchPoints.database(dbName).build();
+
+        for (Point point : points) {
+            batchPoints.point(point);
+        }
+
+        influxDbConnection.write(batchPoints);
+    }
+
+    /**
+     * Returns true if at least one of the current disk stats is different from the previous one.<br/>
+     * The considered disk stats are the following: bytes read, bytes written, IO read, and IO write.
+     */
+    protected boolean 
isCurrentVmDiskStatsDifferentFromPrevious(VmDiskStatisticsVO 
previousVmDiskStats, VmDiskStatisticsVO currentVmDiskStats) {
+        if (previousVmDiskStats != null) {
+            boolean bytesReadDifferentFromPrevious = 
previousVmDiskStats.getCurrentBytesRead() != 
currentVmDiskStats.getCurrentBytesRead();
+            boolean bytesWriteDifferentFromPrevious = 
previousVmDiskStats.getCurrentBytesWrite() != 
currentVmDiskStats.getCurrentBytesWrite();
+            boolean ioReadDifferentFromPrevious = 
previousVmDiskStats.getCurrentIORead() != currentVmDiskStats.getCurrentIORead();
+            boolean ioWriteDifferentFromPrevious = 
previousVmDiskStats.getCurrentIOWrite() != 
currentVmDiskStats.getCurrentIOWrite();
+            return bytesReadDifferentFromPrevious || 
bytesWriteDifferentFromPrevious || ioReadDifferentFromPrevious || 
ioWriteDifferentFromPrevious;
+        }
+        if (currentVmDiskStats == null) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Returns true if all of the VmDiskStatsEntry values are zero (bytes read, bytes written, IO read, and IO write must all be equal to zero).
+     */
+    protected boolean areAllDiskStatsZero(VmDiskStatsEntry vmDiskStat) {
+        return (vmDiskStat.getBytesRead() == 0) && (vmDiskStat.getBytesWrite() 
== 0) && (vmDiskStat.getIORead() == 0) && (vmDiskStat.getIOWrite() == 0);
+    }
+
+    /**
+     * Creates a HostVO SearchCriteria where:
+     * <ul>
+     *  <li>"status" is Up;</li>
+     *  <li>"resourceState" is not in Maintenance, PrepareForMaintenance, or 
ErrorInMaintenance; and</li>
+     *  <li>"type" is Routing.</li>
+     * </ul>
+     */
+    private SearchCriteria<HostVO> 
createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance() {
+        SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
+        sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
+        sc.addAnd("resourceState", SearchCriteria.Op.NIN, 
ResourceState.Maintenance, ResourceState.PrepareForMaintenance, 
ResourceState.ErrorInMaintenance);
+        sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.Routing.toString());
+        return sc;
+    }
+
     public StorageStats getStorageStats(long id) {
         return _storageStats.get(id);
     }
@@ -1334,6 +1566,6 @@ public class StatsCollector extends ManagerBase 
implements ComponentMethodInterc
 
     @Override
     public ConfigKey<?>[] getConfigKeys() {
-        return new ConfigKey<?>[] { vmDiskStatsInterval, 
vmDiskStatsIntervalMin, vmNetworkStatsInterval, vmNetworkStatsIntervalMin, 
StatsTimeout };
+        return new ConfigKey<?>[] {vmDiskStatsInterval, 
vmDiskStatsIntervalMin, vmNetworkStatsInterval, vmNetworkStatsIntervalMin, 
StatsTimeout, statsOutputUri};
     }
 }
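
For readers following the new InfluxDB path, the pieces added above compose roughly as sketched below. This is an illustrative sketch only, not part of the commit: the URL, database name, measurement name, tag, and field values are placeholder assumptions, and only influxdb-java calls that already appear in this patch (connect, databaseExists, ping, the Point/BatchPoints builders, and write) are used.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;
    import org.influxdb.dto.BatchPoints;
    import org.influxdb.dto.Point;
    import org.influxdb.dto.Pong;

    public class InfluxDbWriteSketch {
        public static void main(String[] args) {
            // Connect to the stats endpoint (placeholder URL) and verify the database
            // exists, mirroring createInfluxDbConnection().
            InfluxDB connection = InfluxDBFactory.connect("http://127.0.0.1:8086/");
            if (!connection.databaseExists("cloudstack")) {
                throw new IllegalStateException("Database 'cloudstack' does not exist");
            }

            // Reachability check as in sendMetricsToInfluxdb(): influxdb-java reports the
            // version as "unknown" when the ping does not reach a server.
            Pong pong = connection.ping();
            if (pong.getVersion().equalsIgnoreCase("unknown")) {
                throw new IllegalStateException("Cannot ping InfluxDB host");
            }

            // Build one point the way createInfluxDbPointForVmMetrics() does: tags identify
            // the entity, fields carry the sampled values ("vm_stats" is an assumed name).
            Map<String, String> tags = new HashMap<>();
            tags.put("uuid", "11111111-2222-3333-4444-555555555555");
            Map<String, Object> fields = new HashMap<>();
            fields.put("cpu_utilization", 12.5);
            Point point = Point.measurement("vm_stats")
                    .tag(tags)
                    .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                    .fields(fields)
                    .build();

            // Batch and write, mirroring writeBatches().
            BatchPoints batch = BatchPoints.database("cloudstack").build();
            batch.point(point);
            connection.write(batch);
        }
    }
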
diff --git a/server/src/test/java/com/cloud/server/StatsCollectorTest.java 
b/server/src/test/java/com/cloud/server/StatsCollectorTest.java
new file mode 100644
index 0000000..040d08d
--- /dev/null
+++ b/server/src/test/java/com/cloud/server/StatsCollectorTest.java
@@ -0,0 +1,227 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package com.cloud.server;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.BatchPoints.Builder;
+import org.influxdb.dto.Point;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import com.cloud.agent.api.VmDiskStatsEntry;
+import com.cloud.server.StatsCollector.ExternalStatsProtocol;
+import com.cloud.user.VmDiskStatisticsVO;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.tngtech.java.junit.dataprovider.DataProvider;
+import com.tngtech.java.junit.dataprovider.DataProviderRunner;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(DataProviderRunner.class)
+@PrepareForTest({InfluxDBFactory.class, BatchPoints.class})
+public class StatsCollectorTest {
+    private StatsCollector statsCollector = Mockito.spy(new StatsCollector());
+
+    private static final int GRAPHITE_DEFAULT_PORT = 2003;
+    private static final int INFLUXDB_DEFAULT_PORT = 8086;
+    private static final String HOST_ADDRESS = "192.168.16.10";
+    private static final String URL = String.format("http://%s:%s/";, 
HOST_ADDRESS, INFLUXDB_DEFAULT_PORT);
+
+    private static final String DEFAULT_DATABASE_NAME = "cloudstack";
+
+    @Test
+    public void createInfluxDbConnectionTest() {
+        configureAndTestCreateInfluxDbConnection(true);
+    }
+
+    @Test(expected = CloudRuntimeException.class)
+    public void createInfluxDbConnectionTestExpectException() {
+        configureAndTestCreateInfluxDbConnection(false);
+    }
+
+    private void configureAndTestCreateInfluxDbConnection(boolean 
databaseExists) {
+        statsCollector.externalStatsHost = HOST_ADDRESS;
+        statsCollector.externalStatsPort = INFLUXDB_DEFAULT_PORT;
+        InfluxDB influxDbConnection = Mockito.mock(InfluxDB.class);
+        
Mockito.when(influxDbConnection.databaseExists(DEFAULT_DATABASE_NAME)).thenReturn(databaseExists);
+        PowerMockito.mockStatic(InfluxDBFactory.class);
+        
PowerMockito.when(InfluxDBFactory.connect(URL)).thenReturn(influxDbConnection);
+
+        InfluxDB returnedConnection = 
statsCollector.createInfluxDbConnection();
+
+        Assert.assertEquals(influxDbConnection, returnedConnection);
+    }
+
+    @Test
+    public void writeBatchesTest() {
+        InfluxDB influxDbConnection = Mockito.mock(InfluxDB.class);
+        
Mockito.doNothing().when(influxDbConnection).write(Mockito.any(Point.class));
+        Builder builder = Mockito.mock(Builder.class);
+        BatchPoints batchPoints = Mockito.mock(BatchPoints.class);
+        PowerMockito.mockStatic(BatchPoints.class);
+        
PowerMockito.when(BatchPoints.database(DEFAULT_DATABASE_NAME)).thenReturn(builder);
+        Mockito.when(builder.build()).thenReturn(batchPoints);
+        Map<String, String> tagsToAdd = new HashMap<>();
+        tagsToAdd.put("hostId", "1");
+        Map<String, Object> fieldsToAdd = new HashMap<>();
+        fieldsToAdd.put("total_memory_kbs", 10000000);
+        Point point = 
Point.measurement("measure").tag(tagsToAdd).time(System.currentTimeMillis(), 
TimeUnit.MILLISECONDS).fields(fieldsToAdd).build();
+        List<Point> points = new ArrayList<>();
+        points.add(point);
+        Mockito.when(batchPoints.point(point)).thenReturn(batchPoints);
+
+        statsCollector.writeBatches(influxDbConnection, DEFAULT_DATABASE_NAME, 
points);
+
+        Mockito.verify(influxDbConnection).write(batchPoints);
+    }
+
+    @Test
+    public void configureExternalStatsPortTestGraphitePort() throws 
URISyntaxException {
+        URI uri = new URI(HOST_ADDRESS);
+        statsCollector.externalStatsType = ExternalStatsProtocol.GRAPHITE;
+        int port = statsCollector.retrieveExternalStatsPortFromUri(uri);
+        Assert.assertEquals(GRAPHITE_DEFAULT_PORT, port);
+    }
+
+    @Test
+    public void configureExternalStatsPortTestInfluxdbPort() throws 
URISyntaxException {
+        URI uri = new URI(HOST_ADDRESS);
+        statsCollector.externalStatsType = ExternalStatsProtocol.INFLUXDB;
+        int port = statsCollector.retrieveExternalStatsPortFromUri(uri);
+        Assert.assertEquals(INFLUXDB_DEFAULT_PORT, port);
+    }
+
+    @Test(expected = URISyntaxException.class)
+    public void configureExternalStatsPortTestExpectException() throws 
URISyntaxException {
+        statsCollector.externalStatsType = ExternalStatsProtocol.NONE;
+        URI uri = new URI(HOST_ADDRESS);
+        statsCollector.retrieveExternalStatsPortFromUri(uri);
+    }
+
+    @Test
+    public void configureExternalStatsPortTestInfluxDbCustomizedPort() throws 
URISyntaxException {
+        statsCollector.externalStatsType = ExternalStatsProtocol.INFLUXDB;
+        URI uri = new URI("test://" + HOST_ADDRESS + ":1234");
+        int port = statsCollector.retrieveExternalStatsPortFromUri(uri);
+        Assert.assertEquals(1234, port);
+    }
+
+    @Test
+    public void configureDatabaseNameTestDefaultDbName() throws 
URISyntaxException {
+        URI uri = new URI(URL);
+        String dbName = statsCollector.configureDatabaseName(uri);
+        Assert.assertEquals(DEFAULT_DATABASE_NAME, dbName);
+    }
+
+    @Test
+    public void configureDatabaseNameTestCustomDbName() throws 
URISyntaxException {
+        String configuredDbName = "dbName";
+        URI uri = new URI(URL + configuredDbName);
+        String dbName = statsCollector.configureDatabaseName(uri);
+        Assert.assertEquals(configuredDbName, dbName);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestNull() {
+        VmDiskStatisticsVO currentVmDiskStatisticsVO = new 
VmDiskStatisticsVO(1l, 1l, 1l, 1l);
+        boolean result = 
statsCollector.isCurrentVmDiskStatsDifferentFromPrevious(null, 
currentVmDiskStatisticsVO);
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestBothNull() {
+        boolean result = 
statsCollector.isCurrentVmDiskStatsDifferentFromPrevious(null, null);
+        Assert.assertFalse(result);
+    }
+
+    @Test
+    public void 
isCurrentVmDiskStatsDifferentFromPreviousTestDifferentIoWrite() {
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 123l, 
123l, 12l, true);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestDifferentIoRead() 
{
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 123l, 
12l, 123l, true);
+    }
+
+    @Test
+    public void 
isCurrentVmDiskStatsDifferentFromPreviousTestDifferentBytesRead() {
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(12l, 123l, 
123l, 123l, true);
+    }
+
+    @Test
+    public void 
isCurrentVmDiskStatsDifferentFromPreviousTestDifferentBytesWrite() {
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 12l, 
123l, 123l, true);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestAllEqual() {
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 123l, 
123l, 123l, false);
+    }
+
+    private void 
configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(long bytesRead, long 
bytesWrite, long ioRead, long ioWrite, boolean expectedResult) {
+        VmDiskStatisticsVO previousVmDiskStatisticsVO = new 
VmDiskStatisticsVO(1l, 1l, 1l, 1l);
+        previousVmDiskStatisticsVO.setCurrentBytesRead(123l);
+        previousVmDiskStatisticsVO.setCurrentBytesWrite(123l);
+        previousVmDiskStatisticsVO.setCurrentIORead(123l);
+        previousVmDiskStatisticsVO.setCurrentIOWrite(123l);
+
+        VmDiskStatisticsVO currentVmDiskStatisticsVO = new 
VmDiskStatisticsVO(1l, 1l, 1l, 1l);
+        currentVmDiskStatisticsVO.setCurrentBytesRead(bytesRead);
+        currentVmDiskStatisticsVO.setCurrentBytesWrite(bytesWrite);
+        currentVmDiskStatisticsVO.setCurrentIORead(ioRead);
+        currentVmDiskStatisticsVO.setCurrentIOWrite(ioWrite);
+
+        boolean result = 
statsCollector.isCurrentVmDiskStatsDifferentFromPrevious(previousVmDiskStatisticsVO,
 currentVmDiskStatisticsVO);
+        Assert.assertEquals(expectedResult, result);
+    }
+
+    @Test
+    @DataProvider({
+        "0,0,0,0,true", "1,0,0,0,false", "0,1,0,0,false", "0,0,1,0,false",
+        "0,0,0,1,false", "1,0,0,1,false", "1,0,1,0,false", "1,1,0,0,false",
+        "0,1,1,0,false", "0,1,0,1,false", "0,0,1,1,false", "0,1,1,1,false",
+        "1,1,0,1,false", "1,0,1,1,false", "1,1,1,0,false", "1,1,1,1,false",
+    })
+    public void configureAndTestCheckIfDiskStatsAreZero(long bytesRead, long 
bytesWrite, long ioRead, long ioWrite, boolean expected) {
+        VmDiskStatsEntry vmDiskStatsEntry = new VmDiskStatsEntry();
+        vmDiskStatsEntry.setBytesRead(bytesRead);
+        vmDiskStatsEntry.setBytesWrite(bytesWrite);
+        vmDiskStatsEntry.setIORead(ioRead);
+        vmDiskStatsEntry.setIOWrite(ioWrite);
+
+        boolean result = statsCollector.areAllDiskStatsZero(vmDiskStatsEntry);
+        Assert.assertEquals(expected, result);
+    }
+}
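
As a side note, the two pure helpers covered by the data-provider tests above can also be exercised without PowerMock. A minimal sketch, assuming it lives in the com.cloud.server package (both helpers are protected), with a hypothetical class name and arbitrary values:

    package com.cloud.server;

    import org.junit.Assert;
    import org.junit.Test;

    import com.cloud.agent.api.VmDiskStatsEntry;
    import com.cloud.user.VmDiskStatisticsVO;

    public class StatsCollectorHelpersSketchTest {
        @Test
        public void helpersWorkWithoutAnyMocking() {
            StatsCollector collector = new StatsCollector();

            // All four counters are zero, so the entry is reported as "all zero".
            VmDiskStatsEntry zeroStats = new VmDiskStatsEntry();
            zeroStats.setBytesRead(0);
            zeroStats.setBytesWrite(0);
            zeroStats.setIORead(0);
            zeroStats.setIOWrite(0);
            Assert.assertTrue(collector.areAllDiskStatsZero(zeroStats));

            // Bytes read changed between samples, so the stats are considered different.
            VmDiskStatisticsVO previous = new VmDiskStatisticsVO(1L, 1L, 1L, 1L);
            previous.setCurrentBytesRead(100L);
            VmDiskStatisticsVO current = new VmDiskStatisticsVO(1L, 1L, 1L, 1L);
            current.setCurrentBytesRead(200L);
            Assert.assertTrue(collector.isCurrentVmDiskStatsDifferentFromPrevious(previous, current));
        }
    }
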
diff --git 
a/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java 
b/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java
index 4143f09..9c9d48a 100644
--- 
a/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java
+++ 
b/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java
@@ -67,7 +67,7 @@ public class GraphiteClient {
      *
      * @param metrics the metrics as key-value-pairs
      */
-    public void sendMetrics(Map<String, Integer> metrics) {
+    public void sendMetrics(Map<Object, Object> metrics) {
         sendMetrics(metrics, getCurrentSystemTime());
     }
 
@@ -77,12 +77,12 @@ public class GraphiteClient {
      * @param metrics the metrics as key-value-pairs
      * @param timeStamp the timestamp
      */
-    public void sendMetrics(Map<String, Integer> metrics, long timeStamp) {
+    public void sendMetrics(Map<Object, Object> metrics, long timeStamp) {
         try (DatagramSocket sock = new DatagramSocket()){
             java.security.Security.setProperty("networkaddress.cache.ttl", 
"0");
             InetAddress addr = InetAddress.getByName(this.graphiteHost);
 
-            for (Map.Entry<String, Integer> metric: metrics.entrySet()) {
+            for (Map.Entry<Object, Object> metric : metrics.entrySet()) {
                 byte[] message = new String(metric.getKey() + " " + 
metric.getValue() + " " + timeStamp + "\n").getBytes();
                 DatagramPacket packet = new DatagramPacket(message, 
message.length, addr, graphitePort);
                 sock.send(packet);
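
The relaxed signature keeps the existing wire behaviour: each map entry is still flattened into a plain-text "key value timestamp" line and sent as one UDP datagram. A hedged usage sketch, where the host, port, and metric key are placeholders, could look like this:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.cloudstack.utils.graphite.GraphiteClient;

    public class GraphiteSendSketch {
        public static void main(String[] args) {
            // Placeholder host; 2003 is the usual Graphite plaintext port.
            GraphiteClient client = new GraphiteClient("graphite.example.org", 2003);

            // Keys follow the same pattern prepareVmMetricsForGraphite() builds
            // ("cloudstack.stats.instances.<vm-uuid>.<metric>"); the value is arbitrary.
            Map<Object, Object> metrics = new HashMap<>();
            metrics.put("cloudstack.stats.instances.11111111-2222-3333-4444-555555555555.cpu.num", 2);

            // Each entry is sent as "key value timestamp\n" over UDP.
            client.sendMetrics(metrics);
        }
    }
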
