This is an automated email from the ASF dual-hosted git repository. avijayan pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit e10b5da6638fbccab6a1330047babfb234913679 Author: Aravindan Vijayan <[email protected]> AuthorDate: Tue May 30 12:34:18 2017 -0700 AMBARI-21106 : Ambari Metrics Anomaly detection prototype.(avijayan) --- ambari-metrics/ambari-metrics-alertservice/pom.xml | 121 +++++++++++ .../ambari/metrics/alertservice/R/AmsRTest.java | 130 ++++++++++++ .../metrics/alertservice/R/RFunctionInvoker.java | 180 +++++++++++++++++ .../metrics/alertservice/common/DataSet.java | 21 ++ .../metrics/alertservice/common/MethodResult.java | 10 + .../metrics/alertservice/common/MetricAnomaly.java | 52 +++++ .../metrics/alertservice/common/ResultSet.java | 26 +++ .../common/SingleValuedTimelineMetric.java | 86 ++++++++ .../alertservice/common/StatisticUtils.java | 60 ++++++ .../alertservice/common/TimelineMetric.java | 221 +++++++++++++++++++++ .../alertservice/common/TimelineMetrics.java | 112 +++++++++++ .../alertservice/methods/MetricAnomalyModel.java | 12 ++ .../metrics/alertservice/methods/ema/EmaDS.java | 56 ++++++ .../metrics/alertservice/methods/ema/EmaModel.java | 114 +++++++++++ .../alertservice/methods/ema/EmaModelLoader.java | 29 +++ .../alertservice/methods/ema/EmaResult.java | 19 ++ .../alertservice/methods/ema/TestEmaModel.java | 51 +++++ .../alertservice/spark/AmsKafkaProducer.java | 75 +++++++ .../alertservice/spark/AnomalyMetricPublisher.java | 181 +++++++++++++++++ .../alertservice/spark/MetricAnomalyDetector.java | 134 +++++++++++++ ambari-metrics/ambari-metrics-spark/pom.xml | 133 +++++++++++++ .../metrics/spark/MetricAnomalyDetector.scala | 97 +++++++++ .../ambari/metrics/spark/SparkPhoenixReader.scala | 67 +++++++ .../ambari-metrics-timelineservice/pom.xml | 5 + ...Store.java => HBaseTimelineMetricsService.java} | 39 +++- ambari-metrics/pom.xml | 2 + 26 files changed, 2029 insertions(+), 4 deletions(-) diff --git a/ambari-metrics/ambari-metrics-alertservice/pom.xml b/ambari-metrics/ambari-metrics-alertservice/pom.xml new file mode 100644 index 0000000..3a3545b --- 
/dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/pom.xml @@ -0,0 +1,121 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>ambari-metrics</artifactId> + <groupId>org.apache.ambari</groupId> + <version>2.5.1.0.0</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>ambari-metrics-alertservice</artifactId> + <version>2.5.1.0.0</version> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + </build> + <name>Ambari Metrics Alert Service</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.5</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.2</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.2</version> + </dependency> + + <dependency> + <groupId>com.github.lucarosellini.rJava</groupId> + <artifactId>JRI</artifactId> + <version>0.9-7</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.11</artifactId> + <version>2.1.1</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.10.1.0</version> + <exclusions> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + 
</exclusion> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>javax.mail</groupId> + <artifactId>mail</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jmx</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-json</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_2.10</artifactId> + <version>1.6.3</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.10</artifactId> + <version>1.6.3</version> + </dependency> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-spark</artifactId> + <version>4.7.0-HBase-1.0</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-mllib_2.10</artifactId> + <version>1.3.0</version> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java new file mode 100644 index 0000000..0929f4c --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java @@ -0,0 +1,130 @@ +package org.apache.ambari.metrics.alertservice.R; + +import org.apache.ambari.metrics.alertservice.common.ResultSet; +import org.apache.ambari.metrics.alertservice.common.DataSet; +import org.apache.commons.lang.ArrayUtils; +import 
org.rosuda.JRI.REXP; +import org.rosuda.JRI.RVector; +import org.rosuda.JRI.Rengine; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class AmsRTest { + + public static void main(String[] args) { + + String metricName = "TestMetric"; + double[] ts = getTS(1000); + + double[] train_ts = ArrayUtils.subarray(ts, 0,750); + double[] train_x = getData(750); + DataSet trainData = new DataSet(metricName, train_ts, train_x); + + double[] test_ts = ArrayUtils.subarray(ts, 750,1000); + double[] test_x = getData(250); + test_x[50] = 5.5; //Anomaly + DataSet testData = new DataSet(metricName, test_ts, test_x); + ResultSet rs; + + Map<String, String> configs = new HashMap(); + + System.out.println("TUKEYS"); + configs.put("tukeys.n", "3"); + rs = RFunctionInvoker.tukeys(trainData, testData, configs); + rs.print(); + System.out.println("--------------"); + + System.out.println("EMA Global"); + configs.put("ema.n", "3"); + configs.put("ema.w", "0.8"); + rs = RFunctionInvoker.ema_global(trainData, testData, configs); + rs.print(); + System.out.println("--------------"); + + System.out.println("EMA Daily"); + rs = RFunctionInvoker.ema_daily(trainData, testData, configs); + rs.print(); + System.out.println("--------------"); + + configs.put("ks.p_value", "0.05"); + System.out.println("KS Test"); + rs = RFunctionInvoker.ksTest(trainData, testData, configs); + rs.print(); + System.out.println("--------------"); + + ts = getTS(5000); + train_ts = ArrayUtils.subarray(ts, 30,4800); + train_x = getData(4800); + trainData = new DataSet(metricName, train_ts, train_x); + test_ts = ArrayUtils.subarray(ts, 4800,5000); + test_x = getData(200); + for (int i =0; i<200;i++) { + test_x[i] = test_x[i]*5; + } + testData = new DataSet(metricName, test_ts, test_x); + configs.put("hsdev.n", "3"); + configs.put("hsdev.nhp", "3"); + configs.put("hsdev.interval", "86400000"); + configs.put("hsdev.period", "604800000"); + System.out.println("HSdev"); + rs = 
RFunctionInvoker.hsdev(trainData, testData, configs); + rs.print(); + System.out.println("--------------"); + + } + + static double[] getTS(int n) { + long currentTime = System.currentTimeMillis(); + double[] ts = new double[n]; + currentTime = currentTime - (currentTime % (5*60*1000)); + + for (int i = 0,j=n-1; i<n; i++,j--) { + ts[j] = currentTime; + currentTime = currentTime - (5*60*1000); + } + return ts; + } + + static void testBasic() { + Rengine r = new Rengine(new String[]{"--no-save"}, false, null); + try { + r.eval("library(ambarimetricsAD)"); + r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/test.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/util.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + double[] ts = getTS(5000); + double[] x = getData(5000); + r.assign("ts", ts); + r.assign("x", x); + r.eval("x[1000] <- 4.5"); + r.eval("x[2000] <- 4.75"); + r.eval("x[3000] <- 3.5"); + r.eval("x[4000] <- 5.5"); + r.eval("x[5000] <- 5.0"); + r.eval("data <- data.frame(ts,x)"); + r.eval("names(data) <- c(\"TS\", \"Metric\")"); + System.out.println(r.eval("data")); + REXP exp = r.eval("t_an <- test_methods(data)"); + exp = r.eval("t_an"); + String strExp = exp.asString(); + System.out.println("result:" + exp); + RVector cont = (RVector) exp.getContent(); + double[] an_ts = cont.at(0).asDoubleArray(); + double[] an_x = cont.at(1).asDoubleArray(); + System.out.println("result:" + strExp); + } + finally { + r.end(); + } + } + static double[] getData(int n) { + double[] 
metrics = new double[n]; + Random random = new Random(); + for (int i = 0; i<n; i++) { + metrics[i] = random.nextDouble(); + } + return metrics; + } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java new file mode 100644 index 0000000..8d1e520 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java @@ -0,0 +1,180 @@ +package org.apache.ambari.metrics.alertservice.R; + + +import org.apache.ambari.metrics.alertservice.common.ResultSet; +import org.apache.ambari.metrics.alertservice.common.DataSet; +import org.rosuda.JRI.REXP; +import org.rosuda.JRI.RVector; +import org.rosuda.JRI.Rengine; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class RFunctionInvoker { + + public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null); + + + private static void loadDataSets(Rengine r, DataSet trainData, DataSet testData) { + r.assign("train_ts", trainData.ts); + r.assign("train_x", trainData.values); + r.eval("train_data <- data.frame(train_ts,train_x)"); + r.eval("names(train_data) <- c(\"TS\", " + trainData.metricName + ")"); + + r.assign("test_ts", testData.ts); + r.assign("test_x", testData.values); + r.eval("test_data <- data.frame(test_ts,test_x)"); + r.eval("names(test_data) <- c(\"TS\", " + testData.metricName + ")"); + } + + + public static ResultSet tukeys(DataSet trainData, DataSet testData, Map<String, String> configs) { + try { + r.eval("library(ambarimetricsAD)"); + r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + + int n = 
Integer.parseInt(configs.get("tukeys.n")); + r.eval("n <- " + n); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ams_tukeys(train_data, test_data, n)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i< cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + } catch(Exception e) { + e.printStackTrace(); + } finally { + r.end(); + } + return null; + } + + public static ResultSet ema_global(DataSet trainData, DataSet testData, Map<String, String> configs) { + try { + r.eval("library(ambarimetricsAD)"); + r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + + int n = Integer.parseInt(configs.get("ema.n")); + r.eval("n <- " + n); + + double w = Double.parseDouble(configs.get("ema.w")); + r.eval("w <- " + w); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ema_global(train_data, test_data, w, n)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i< cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + + } catch(Exception e) { + e.printStackTrace(); + } finally { + r.end(); + } + return null; + } + + public static ResultSet ema_daily(DataSet trainData, DataSet testData, Map<String, String> configs) { + try { + r.eval("library(ambarimetricsAD)"); + r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + + int n = Integer.parseInt(configs.get("ema.n")); + r.eval("n <- " + n); + + double w = Double.parseDouble(configs.get("ema.w")); + r.eval("w <- " + w); + + loadDataSets(r, 
trainData, testData); + + r.eval("an <- ema_daily(train_data, test_data, w, n)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i< cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + + } catch(Exception e) { + e.printStackTrace(); + } finally { + r.end(); + } + return null; + } + + public static ResultSet ksTest(DataSet trainData, DataSet testData, Map<String, String> configs) { + try { + r.eval("library(ambarimetricsAD)"); + r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/kstest.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + + double p_value = Double.parseDouble(configs.get("ks.p_value")); + r.eval("p_value <- " + p_value); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ams_ks(train_data, test_data, p_value)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i< cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + + } catch(Exception e) { + e.printStackTrace(); + } finally { + r.end(); + } + return null; + } + + public static ResultSet hsdev(DataSet trainData, DataSet testData, Map<String, String> configs) { + try { + r.eval("library(ambarimetricsAD)"); + r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/hsdev.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + + int n = Integer.parseInt(configs.get("hsdev.n")); + r.eval("n <- " + n); + + int nhp = Integer.parseInt(configs.get("hsdev.nhp")); + r.eval("nhp <- " + nhp); + + long interval = Long.parseLong(configs.get("hsdev.interval")); + r.eval("interval <- " + interval); + + long period = 
Long.parseLong(configs.get("hsdev.period")); + r.eval("period <- " + period); + + loadDataSets(r, trainData, testData); + + r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)"); + REXP exp = r.eval("an2"); + RVector cont = (RVector) exp.getContent(); + + List<double[]> result = new ArrayList(); + for (int i = 0; i< cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + } catch(Exception e) { + e.printStackTrace(); + } finally { + r.end(); + } + return null; + } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java new file mode 100644 index 0000000..47bf9b6 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java @@ -0,0 +1,21 @@ +package org.apache.ambari.metrics.alertservice.common; + +import java.util.Arrays; + +public class DataSet { + + public String metricName; + public double[] ts; + public double[] values; + + public DataSet(String metricName, double[] ts, double[] values) { + this.metricName = metricName; + this.ts = ts; + this.values = values; + } + + @Override + public String toString() { + return metricName + Arrays.toString(ts) + Arrays.toString(values); + } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java new file mode 100644 index 0000000..915da4c --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java @@ -0,0 +1,10 @@ +package org.apache.ambari.metrics.alertservice.common; + +public abstract class MethodResult { + 
protected String methodType; + public abstract String prettyPrint(); + + public String getMethodType() { + return methodType; + } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java new file mode 100644 index 0000000..d237bee --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java @@ -0,0 +1,52 @@ +package org.apache.ambari.metrics.alertservice.common; + +public class MetricAnomaly { + + private String metricKey; + private long timestamp; + private double metricValue; + private MethodResult methodResult; + + public MetricAnomaly(String metricKey, long timestamp, double metricValue, MethodResult methodResult) { + this.metricKey = metricKey; + this.timestamp = timestamp; + this.metricValue = metricValue; + this.methodResult = methodResult; + } + + public String getMetricKey() { + return metricKey; + } + + public void setMetricName(String metricName) { + this.metricKey = metricName; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public double getMetricValue() { + return metricValue; + } + + public void setMetricValue(double metricValue) { + this.metricValue = metricValue; + } + + public MethodResult getMethodResult() { + return methodResult; + } + + public void setMethodResult(MethodResult methodResult) { + this.methodResult = methodResult; + } + + public String getAnomalyAsString() { + return metricKey + ":" + timestamp + ":" + metricValue + ":" + methodResult.prettyPrint(); + } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java new file mode 100644 index 0000000..96b74e0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java @@ -0,0 +1,26 @@ +package org.apache.ambari.metrics.alertservice.common; + + +import java.util.ArrayList; +import java.util.List; + +public class ResultSet { + + List<double[]> resultset = new ArrayList<>(); + + public ResultSet(List<double[]> resultset) { + this.resultset = resultset; + } + + public void print() { + System.out.println("Result : "); + if (!resultset.isEmpty()) { + for (int i = 0; i<resultset.get(0).length;i++) { + for (double[] entity : resultset) { + System.out.print(entity[i] + " "); + } + System.out.println(); + } + } + } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java new file mode 100644 index 0000000..5118225 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java @@ -0,0 +1,86 @@ +package org.apache.ambari.metrics.alertservice.common; + + +public class SingleValuedTimelineMetric { + private Long timestamp; + private Double value; + private String metricName; + private String appId; + private String instanceId; + private String hostName; + private Long startTime; + private String type; + + public void setSingleTimeseriesValue(Long timestamp, Double value) { + this.timestamp = timestamp; + this.value = value; + } + + public SingleValuedTimelineMetric(String metricName, String appId, + String instanceId, String hostName, + long timestamp, long startTime, String type) { + this.metricName = metricName; + this.appId = 
appId; + this.instanceId = instanceId; + this.hostName = hostName; + this.timestamp = timestamp; + this.startTime = startTime; + this.type = type; + } + + public Long getTimestamp() { + return timestamp; + } + + public long getStartTime() { + return startTime; + } + + public String getType() { + return type; + } + + public Double getValue() { + return value; + } + + public String getMetricName() { + return metricName; + } + + public String getAppId() { + return appId; + } + + public String getInstanceId() { + return instanceId; + } + + public String getHostName() { + return hostName; + } + + public boolean equalsExceptTime(TimelineMetric metric) { + if (!metricName.equals(metric.getMetricName())) return false; + if (hostName != null ? !hostName.equals(metric.getHostName()) : metric.getHostName() != null) + return false; + if (appId != null ? !appId.equals(metric.getAppId()) : metric.getAppId() != null) + return false; + if (instanceId != null ? !instanceId.equals(metric.getInstanceId()) : metric.getInstanceId() != null) return false; + + return true; + } + + public TimelineMetric getTimelineMetric() { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(this.metricName); + metric.setAppId(this.appId); + metric.setHostName(this.hostName); + metric.setType(this.type); + metric.setInstanceId(this.instanceId); + metric.setStartTime(this.startTime); + metric.setTimestamp(this.timestamp); + metric.getMetricValues().put(timestamp, value); + return metric; + } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java new file mode 100644 index 0000000..dff56e6 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java @@ -0,0 +1,60 @@ +package 
org.apache.ambari.metrics.alertservice.common; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +public class StatisticUtils { + + public static double mean(Collection<Double> values) { + double sum = 0; + for (double d : values) { + sum += d; + } + return sum / values.size(); + } + + public static double variance(Collection<Double> values) { + double avg = mean(values); + double variance = 0; + for (double d : values) { + variance += Math.pow(d - avg, 2.0); + } + return variance; + } + + public static double sdev(Collection<Double> values, boolean useBesselsCorrection) { + double variance = variance(values); + int n = (useBesselsCorrection) ? values.size() - 1 : values.size(); + return Math.sqrt(variance / n); + } + + public static double median(Collection<Double> values) { + ArrayList<Double> clonedValues = new ArrayList<Double>(values); + Collections.sort(clonedValues); + int n = values.size(); + + if (n % 2 != 0) { + return clonedValues.get((n-1)/2); + } else { + return ( clonedValues.get((n-1)/2) + clonedValues.get(n/2) ) / 2; + } + } + + + +// public static void main(String[] args) { +// +// Collection<Double> values = new ArrayList<>(); +// values.add(1.0); +// values.add(2.0); +// values.add(3.0); +// values.add(4.0); +// values.add(5.0); +// +// System.out.println(mean(values)); +// System.out.println(sdev(values, false)); +// System.out.println(median(values)); +// } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java new file mode 100644 index 0000000..2a73855 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java @@ -0,0 +1,221 @@ +package org.apache.ambari.metrics.alertservice.common; + +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +@XmlRootElement(name = "metric") +@XmlAccessorType(XmlAccessType.NONE) [email protected] [email protected] +public class TimelineMetric implements Comparable<TimelineMetric>, Serializable { + + private String metricName; + private String appId; + private String instanceId; + private String hostName; + private long timestamp; + private long startTime; + private String type; + private String units; + private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + private Map<String, String> metadata = new HashMap<>(); + + // default + public TimelineMetric() { + + } + + public TimelineMetric(String metricName, String appId, String hostName, TreeMap<Long,Double> metricValues) { + this.metricName = metricName; + this.appId = appId; + this.hostName = 
hostName; + this.metricValues.putAll(metricValues); + } + + // copy constructor + public TimelineMetric(TimelineMetric metric) { + setMetricName(metric.getMetricName()); + setType(metric.getType()); + setUnits(metric.getUnits()); + setTimestamp(metric.getTimestamp()); + setAppId(metric.getAppId()); + setInstanceId(metric.getInstanceId()); + setHostName(metric.getHostName()); + setStartTime(metric.getStartTime()); + setMetricValues(new TreeMap<Long, Double>(metric.getMetricValues())); + } + + @XmlElement(name = "metricname") + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + @XmlElement(name = "appid") + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + @XmlElement(name = "instanceid") + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + + @XmlElement(name = "hostname") + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + @XmlElement(name = "timestamp") + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @XmlElement(name = "starttime") + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + @XmlElement(name = "type", defaultValue = "UNDEFINED") + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @XmlElement(name = "units") + public String getUnits() { + return units; + } + + public void setUnits(String units) { + this.units = units; + } + + @XmlElement(name = "metrics") + public TreeMap<Long, Double> getMetricValues() { + return metricValues; + } + + public void setMetricValues(TreeMap<Long, 
Double> metricValues) { + this.metricValues = metricValues; + } + + public void addMetricValues(Map<Long, Double> metricValues) { + this.metricValues.putAll(metricValues); + } + + @XmlElement(name = "metadata") + public Map<String,String> getMetadata () { + return metadata; + } + + public void setMetadata (Map<String,String> metadata) { + this.metadata = metadata; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineMetric metric = (TimelineMetric) o; + + if (!metricName.equals(metric.metricName)) return false; + if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null) + return false; + if (appId != null ? !appId.equals(metric.appId) : metric.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + if (timestamp != metric.timestamp) return false; + if (startTime != metric.startTime) return false; + + return true; + } + + public boolean equalsExceptTime(TimelineMetric metric) { + if (!metricName.equals(metric.metricName)) return false; + if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null) + return false; + if (appId != null ? !appId.equals(metric.appId) : metric.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); + result = 31 * result + (hostName != null ? 
hostName.hashCode() : 0); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public int compareTo(TimelineMetric other) { + if (timestamp > other.timestamp) { + return -1; + } else if (timestamp < other.timestamp) { + return 1; + } else { + return metricName.compareTo(other.metricName); + } + } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java new file mode 100644 index 0000000..500e1e9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java @@ -0,0 +1,112 @@ +package org.apache.ambari.metrics.alertservice.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * The class that hosts a list of timeline entities. 
+ */ +@XmlRootElement(name = "metrics") +@XmlAccessorType(XmlAccessType.NONE) [email protected] [email protected] +public class TimelineMetrics implements Serializable { + + private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>(); + + public TimelineMetrics() {} + + @XmlElement(name = "metrics") + public List<TimelineMetric> getMetrics() { + return allMetrics; + } + + public void setMetrics(List<TimelineMetric> allMetrics) { + this.allMetrics = allMetrics; + } + + private boolean isEqualTimelineMetrics(TimelineMetric metric1, + TimelineMetric metric2) { + + boolean isEqual = true; + + if (!metric1.getMetricName().equals(metric2.getMetricName())) { + return false; + } + + if (metric1.getHostName() != null) { + isEqual = metric1.getHostName().equals(metric2.getHostName()); + } + + if (metric1.getAppId() != null) { + isEqual = metric1.getAppId().equals(metric2.getAppId()); + } + + return isEqual; + } + + /** + * Merge with existing TimelineMetric if everything except startTime is + * the same. + * @param metric {@link TimelineMetric} + */ + public void addOrMergeTimelineMetric(TimelineMetric metric) { + TimelineMetric metricToMerge = null; + + if (!allMetrics.isEmpty()) { + for (TimelineMetric timelineMetric : allMetrics) { + if (timelineMetric.equalsExceptTime(metric)) { + metricToMerge = timelineMetric; + break; + } + } + } + + if (metricToMerge != null) { + metricToMerge.addMetricValues(metric.getMetricValues()); + if (metricToMerge.getTimestamp() > metric.getTimestamp()) { + metricToMerge.setTimestamp(metric.getTimestamp()); + } + if (metricToMerge.getStartTime() > metric.getStartTime()) { + metricToMerge.setStartTime(metric.getStartTime()); + } + } else { + allMetrics.add(metric); + } + } + + // Optimization that addresses too many TreeMaps from getting created. 
+ public void addOrMergeTimelineMetric(SingleValuedTimelineMetric metric) { + TimelineMetric metricToMerge = null; + + if (!allMetrics.isEmpty()) { + for (TimelineMetric timelineMetric : allMetrics) { + if (metric.equalsExceptTime(timelineMetric)) { + metricToMerge = timelineMetric; + break; + } + } + } + + if (metricToMerge != null) { + metricToMerge.getMetricValues().put(metric.getTimestamp(), metric.getValue()); + if (metricToMerge.getTimestamp() > metric.getTimestamp()) { + metricToMerge.setTimestamp(metric.getTimestamp()); + } + if (metricToMerge.getStartTime() > metric.getStartTime()) { + metricToMerge.setStartTime(metric.getStartTime()); + } + } else { + allMetrics.add(metric.getTimelineMetric()); + } + } +} + diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java new file mode 100644 index 0000000..7ae91a3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java @@ -0,0 +1,12 @@ +package org.apache.ambari.metrics.alertservice.methods; + +import org.apache.ambari.metrics.alertservice.common.MetricAnomaly; +import org.apache.ambari.metrics.alertservice.common.TimelineMetric; + +import java.util.List; + +public interface MetricAnomalyModel { + + public List<MetricAnomaly> onNewMetric(TimelineMetric metric); + public List<MetricAnomaly> test(TimelineMetric metric); +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java new file mode 100644 index 0000000..ec548c8 --- /dev/null +++ 
package org.apache.ambari.metrics.alertservice.methods.ema;

// BUG FIX: the original imported the JDK-internal shaded copy
// com.sun.org.apache.commons.logging.*, which is not a public API and is
// absent from many JREs; use the real commons-logging package.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;

/**
 * Mutable EMA state for a single (metricName, appId, host) series.
 * Maintains an exponential moving average ({@code ema}) and an exponential
 * moving deviation estimate ({@code ems}); a sample is anomalous when its
 * distance from the EMA exceeds {@code timessdev * ems}.
 */
@XmlRootElement
public class EmaDS implements Serializable {

  String metricName;
  String appId;
  String hostname;
  // Running exponential moving average of the series.
  double ema;
  // Running exponential moving deviation (sigma-like) estimate.
  double ems;
  // Smoothing factor in [0,1]; larger weight = slower adaptation to new samples.
  double weight;
  // How many deviations from the EMA are tolerated before flagging an anomaly.
  int timessdev;
  private static final Log LOG = LogFactory.getLog(EmaDS.class);

  public EmaDS(String metricName, String appId, String hostname, double weight,
               int timessdev) {
    this.metricName = metricName;
    this.appId = appId;
    this.hostname = hostname;
    this.weight = weight;
    this.timessdev = timessdev;
    this.ema = 0.0;
    this.ems = 0.0;
  }

  /**
   * Tests a sample against the current state, then folds it into the running
   * average/deviation.
   *
   * @return an {@link EmaResult} if the sample was anomalous, else {@code null}.
   */
  public EmaResult testAndUpdate(double metricValue) {
    // Compute the deviation BEFORE updating the state; otherwise the new
    // sample would pull the average toward itself and mask the anomaly.
    double diff = Math.abs(ema - metricValue) - (timessdev * ems);
    update(metricValue);
    return diff > 0 ? new EmaResult(diff) : null;
  }

  /** Folds a sample into the moving average and deviation (training step). */
  public void update(double metricValue) {
    ema = weight * ema + (1 - weight) * metricValue;
    ems = Math.sqrt(weight * Math.pow(ems, 2.0)
        + (1 - weight) * Math.pow(metricValue - ema, 2.0));
    // Replaced System.out.println noise with debug-level logging.
    LOG.debug(ema + ", " + ems);
  }

  /** Read-only anomaly check; the EMA state is left untouched. */
  public EmaResult test(double metricValue) {
    double diff = Math.abs(ema - metricValue) - (timessdev * ems);
    return diff > 0 ? new EmaResult(diff) : null;
  }
}
package org.apache.ambari.metrics.alertservice.methods.ema;

import com.google.gson.Gson;
// BUG FIX: use the real commons-logging package, not the JDK-internal
// com.sun.org.apache.commons.logging shaded copy.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ambari.metrics.alertservice.common.MethodResult;
import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
import org.apache.spark.SparkContext;
import org.apache.spark.mllib.util.Saveable;

import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * EMA-based anomaly detection model. Keeps one {@link EmaDS} state object per
 * (metricName, appId, hostname) key, updates it as metrics stream in, and
 * reports samples that deviate too far from the moving average.
 */
@XmlRootElement
public class EmaModel implements MetricAnomalyModel, Saveable, Serializable {

  @XmlElement(name = "trackedEmas")
  private Map<String, EmaDS> trackedEmas = new HashMap<>();
  private static final Log LOG = LogFactory.getLog(EmaModel.class);

  /** Canonical per-series key: metricName_appId_hostname. */
  private static String seriesKey(TimelineMetric metric) {
    return metric.getMetricName() + "_" + metric.getAppId() + "_"
        + metric.getHostName();
  }

  /**
   * Tests each value of the incoming metric against the tracked EMA state
   * (creating state on first sight) and updates the state with each sample.
   *
   * @return the anomalies detected in this metric's values (possibly empty).
   */
  public List<MetricAnomaly> onNewMetric(TimelineMetric metric) {
    String key = seriesKey(metric);
    List<MetricAnomaly> anomalies = new ArrayList<>();

    // BUG FIX: the original checked containsKey(metricName) but stored the
    // state under `key`, so a brand-new EmaDS was created on EVERY call and
    // the learned average was wiped each time.
    if (!trackedEmas.containsKey(key)) {
      // Default weight/timessdev mirror the original hard-coded 0.8 / 3.
      trackedEmas.put(key, new EmaDS(metric.getMetricName(), metric.getAppId(),
          metric.getHostName(), 0.8, 3));
    }

    EmaDS emaDS = trackedEmas.get(key);
    for (Map.Entry<Long, Double> entry : metric.getMetricValues().entrySet()) {
      double metricValue = entry.getValue();
      MethodResult result = emaDS.testAndUpdate(metricValue);
      if (result != null) {
        anomalies.add(new MetricAnomaly(key, entry.getKey(), metricValue, result));
      }
    }
    return anomalies;
  }

  /**
   * Trains (replaces) the EMA state for this metric's series from its values.
   *
   * @param weight    EMA smoothing factor in [0,1].
   * @param timessdev deviation multiplier used as the anomaly threshold.
   * @return the freshly trained state, also registered in {@code trackedEmas}.
   */
  public EmaDS train(TimelineMetric metric, double weight, int timessdev) {
    String key = seriesKey(metric);
    EmaDS emaDS = new EmaDS(metric.getMetricName(), metric.getAppId(),
        metric.getHostName(), weight, timessdev);
    LOG.info("In EMA Train step");
    for (Map.Entry<Long, Double> entry : metric.getMetricValues().entrySet()) {
      LOG.debug(entry.getKey() + " : " + entry.getValue());
      emaDS.update(entry.getValue());
    }
    trackedEmas.put(key, emaDS);
    return emaDS;
  }

  /**
   * Read-only anomaly test of the metric's values against existing state.
   * Returns an empty list when no state is tracked for this series.
   */
  public List<MetricAnomaly> test(TimelineMetric metric) {
    String key = seriesKey(metric);
    EmaDS emaDS = trackedEmas.get(key);
    if (emaDS == null) {
      return new ArrayList<>();
    }

    List<MetricAnomaly> anomalies = new ArrayList<>();
    for (Map.Entry<Long, Double> entry : metric.getMetricValues().entrySet()) {
      double metricValue = entry.getValue();
      MethodResult result = emaDS.test(metricValue);
      if (result != null) {
        anomalies.add(new MetricAnomaly(key, entry.getKey(), metricValue, result));
      }
    }
    return anomalies;
  }

  /** Serializes this model as JSON to a local file at {@code path}. */
  @Override
  public void save(SparkContext sc, String path) {
    Gson gson = new Gson();
    try (Writer writer = new BufferedWriter(new OutputStreamWriter(
        new FileOutputStream(path), StandardCharsets.UTF_8))) {
      writer.write(gson.toJson(this));
    } catch (IOException e) {
      LOG.error("Unable to save EMA model to " + path, e);
    }
  }

  @Override
  public String formatVersion() {
    return "1.0";
  }
}
package org.apache.ambari.metrics.alertservice.methods.ema;

import com.google.gson.Gson;
// BUG FIX: use the real commons-logging package, not the JDK-internal
// com.sun.org.apache.commons.logging shaded copy.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkContext;
import org.apache.spark.mllib.util.Loader;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Loads an {@link EmaModel} previously serialized as JSON by
 * {@code EmaModel.save}.
 */
public class EmaModelLoader implements Loader<EmaModel> {
  private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);

  /**
   * Reads the JSON model file at {@code path} from the local filesystem.
   *
   * @return the deserialized model, or {@code null} if the file cannot be
   *         read — callers MUST null-check before use.
   */
  @Override
  public EmaModel load(SparkContext sc, String path) {
    Gson gson = new Gson();
    try {
      String fileString =
          new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
      return gson.fromJson(fileString, EmaModel.class);
    } catch (IOException e) {
      // Log with the full stack trace, not just the message.
      LOG.error("Unable to load EMA model from " + path, e);
    }
    return null;
  }
}
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java @@ -0,0 +1,19 @@ +package org.apache.ambari.metrics.alertservice.methods.ema; + +import org.apache.ambari.metrics.alertservice.common.MethodResult; + +public class EmaResult extends MethodResult{ + + double diff; + + public EmaResult(double diff) { + this.methodType = "EMA"; + this.diff = diff; + } + + + @Override + public String prettyPrint() { + return methodType + "(` = " + diff + ")"; + } +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java new file mode 100644 index 0000000..a090786 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java @@ -0,0 +1,51 @@ +package org.apache.ambari.metrics.alertservice.methods.ema; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import org.apache.ambari.metrics.alertservice.common.TimelineMetric; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +public class TestEmaModel { + + public static void main(String[] args) throws IOException { + + long now = System.currentTimeMillis(); + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("dummy_metric"); + metric1.setHostName("dummy_host"); + metric1.setTimestamp(now); + metric1.setStartTime(now - 1000); + metric1.setAppId("HOST"); + metric1.setType("Integer"); + + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + + for (int i = 0; i<20;i++) { + double metric = 9 + Math.random(); + metricValues.put(now - i*100, metric); + } + 
package org.apache.ambari.metrics.alertservice.methods.ema;

import org.apache.ambari.metrics.alertservice.common.TimelineMetric;

import java.io.IOException;
import java.util.TreeMap;

/**
 * Manual smoke-test driver for {@link EmaModel#train}: builds a synthetic
 * 20-point series around 9.x and trains an EMA state from it. Not a unit
 * test — run from the command line to eyeball the logged EMA/EMS values.
 */
public class TestEmaModel {

  public static void main(String[] args) throws IOException {
    long now = System.currentTimeMillis();

    TimelineMetric metric1 = new TimelineMetric();
    metric1.setMetricName("dummy_metric");
    metric1.setHostName("dummy_host");
    metric1.setTimestamp(now);
    metric1.setStartTime(now - 1000);
    metric1.setAppId("HOST");
    metric1.setType("Integer");

    // 20 samples, 100 ms apart, values in [9, 10).
    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
    for (int i = 0; i < 20; i++) {
      metricValues.put(now - i * 100, 9 + Math.random());
    }
    metric1.setMetricValues(metricValues);

    EmaModel emaModel = new EmaModel();
    // weight/timessdev mirror the defaults used by EmaModel.onNewMetric.
    emaModel.train(metric1, 0.8, 3);
  }
}
package org.apache.ambari.metrics.alertservice.spark;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Publishes {@link TimelineMetrics} as JSON to the ambari-metrics Kafka topic.
 * The value serializer is Kafka Connect's JsonSerializer, so records carry a
 * Jackson {@link JsonNode} payload.
 */
public class AmsKafkaProducer {

  // FIX: the original used the raw Producer type; parameterize to match the
  // configured serializers (records are sent with a null key).
  private final Producer<String, JsonNode> producer;
  private static final String topicName = "ambari-metrics-topic";

  /**
   * @param kafkaServers bootstrap servers list, e.g. "host:6667".
   */
  public AmsKafkaProducer(String kafkaServers) {
    Properties configProperties = new Properties();
    configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.connect.json.JsonSerializer");
    producer = new KafkaProducer<>(configProperties);
  }

  /** Serializes the metrics to a JSON tree and sends them synchronously. */
  public void sendMetrics(TimelineMetrics timelineMetrics)
      throws InterruptedException, ExecutionException {
    ObjectMapper objectMapper = new ObjectMapper();
    JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics);
    ProducerRecord<String, JsonNode> rec = new ProducerRecord<>(topicName, jsonNode);
    // get() blocks until the broker acks, surfacing send failures here.
    Future<RecordMetadata> kafkaFuture = producer.send(rec);
    System.out.println(kafkaFuture.isDone());
    System.out.println(kafkaFuture.get().topic());
  }

  /** Manual driver: sends one synthetic 20-point mem_free series. */
  public static void main(String[] args)
      throws ExecutionException, InterruptedException {
    final long now = System.currentTimeMillis();

    TimelineMetrics timelineMetrics = new TimelineMetrics();
    TimelineMetric metric1 = new TimelineMetric();
    metric1.setMetricName("mem_free");
    metric1.setHostName("avijayan-ams-3.openstacklocal");
    metric1.setTimestamp(now);
    metric1.setStartTime(now - 1000);
    metric1.setAppId("HOST");
    metric1.setType("Integer");

    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
    for (int i = 0; i < 20; i++) {
      metricValues.put(now - i * 100, 20000 + Math.random());
    }
    metric1.setMetricValues(metricValues);
    timelineMetrics.getMetrics().add(metric1);

    // FIX: construct the producer once instead of once per loop iteration,
    // so repeated sends reuse a single connection.
    AmsKafkaProducer kafkaProducer =
        new AmsKafkaProducer("avijayan-ams-2.openstacklocal:6667");
    for (int i = 0; i < 1; i++) {
      kafkaProducer.sendMetrics(timelineMetrics);
      Thread.sleep(1000);
    }
  }
}
package org.apache.ambari.metrics.alertservice.spark;

import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Converts detected {@link MetricAnomaly} instances into TimelineMetrics and
 * POSTs them to the AMS collector's /ws/v1/timeline/metrics endpoint, so
 * anomalies appear as first-class metrics of the "anomaly-engine" app.
 * Serializable because instances are captured by Spark closures.
 */
public class AnomalyMetricPublisher implements Serializable {

  private String hostName = "UNKNOWN.example.com";
  private String instanceId = null;
  private String serviceName = "anomaly-engine";
  private String collectorHost;
  private String protocol;
  private String port;
  private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
  private static final Log LOG = LogFactory.getLog(AnomalyMetricPublisher.class);
  private static ObjectMapper mapper;

  static {
    mapper = new ObjectMapper();
    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
    mapper.setAnnotationIntrospector(introspector);
    mapper.getSerializationConfig()
        .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
  }

  public AnomalyMetricPublisher(String collectorHost, String protocol, String port) {
    this.collectorHost = collectorHost;
    this.protocol = protocol;
    this.port = port;
    this.hostName = getDefaultLocalHostName();
  }

  /** @return the local canonical hostname, or null if resolution fails. */
  private String getDefaultLocalHostName() {
    try {
      return InetAddress.getLocalHost().getCanonicalHostName();
    } catch (UnknownHostException e) {
      // FIX: include the exception so the failure is diagnosable.
      LOG.info("Error getting host address", e);
    }
    return null;
  }

  /** Converts and POSTs the given anomalies; no-op on an empty list. */
  public void publish(List<MetricAnomaly> metricAnomalies) {
    LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
    List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
    LOG.info("Sending TimelineMetric list of size : " + metricList.size());
    if (!metricList.isEmpty()) {
      TimelineMetrics timelineMetrics = new TimelineMetrics();
      timelineMetrics.setMetrics(metricList);
      emitMetrics(timelineMetrics);
    }
  }

  /**
   * Groups consecutive anomalies that share a metric key into one
   * TimelineMetric whose name is "<metricKey>_<methodType>".
   * NOTE(review): grouping is run-based, so the input is assumed to arrive
   * ordered by metric key — anomalies for the same key that are not adjacent
   * produce separate TimelineMetric entries.
   */
  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
    List<TimelineMetric> metrics = new ArrayList<>();
    if (metricAnomalies.isEmpty()) {
      return metrics;
    }

    long currentTime = System.currentTimeMillis();
    MetricAnomaly prevAnomaly = metricAnomalies.get(0);

    TimelineMetric timelineMetric = newAnomalyMetric(prevAnomaly, currentTime);
    TreeMap<Long, Double> metricValues = new TreeMap<>();
    metricValues.put(prevAnomaly.getTimestamp(), prevAnomaly.getMetricValue());

    for (int i = 1; i < metricAnomalies.size(); i++) {
      MetricAnomaly currentAnomaly = metricAnomalies.get(i);
      if (currentAnomaly.getMetricKey().equals(prevAnomaly.getMetricKey())) {
        metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
      } else {
        // Key changed: flush the accumulated series and start a new one.
        timelineMetric.setMetricValues(metricValues);
        metrics.add(timelineMetric);

        timelineMetric = newAnomalyMetric(currentAnomaly, currentTime);
        metricValues = new TreeMap<>();
        metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
        prevAnomaly = currentAnomaly;
      }
    }

    timelineMetric.setMetricValues(metricValues);
    metrics.add(timelineMetric);
    return metrics;
  }

  /** Builds an empty anomaly TimelineMetric shell for the given anomaly. */
  private TimelineMetric newAnomalyMetric(MetricAnomaly anomaly, long startTime) {
    TimelineMetric timelineMetric = new TimelineMetric();
    timelineMetric.setMetricName(anomaly.getMetricKey() + "_"
        + anomaly.getMethodResult().getMethodType());
    timelineMetric.setAppId(serviceName);
    timelineMetric.setInstanceId(instanceId);
    timelineMetric.setHostName(hostName);
    timelineMetric.setStartTime(startTime);
    return timelineMetric;
  }

  /** Serializes and POSTs the metrics; returns true only on HTTP 200. */
  private boolean emitMetrics(TimelineMetrics metrics) {
    String connectUrl = constructTimelineMetricUri();
    String jsonData = null;
    LOG.info("EmitMetrics connectUrl = " + connectUrl);
    try {
      jsonData = mapper.writeValueAsString(metrics);
    } catch (IOException e) {
      LOG.error("Unable to parse metrics", e);
    }
    if (jsonData != null) {
      return emitMetricsJson(connectUrl, jsonData);
    }
    return false;
  }

  private HttpURLConnection getConnection(String spec) throws IOException {
    return (HttpURLConnection) new URL(spec).openConnection();
  }

  /**
   * POSTs the JSON payload to the collector.
   *
   * @return true only if the collector answered 200.
   */
  private boolean emitMetricsJson(String connectUrl, String jsonData) {
    LOG.info("Metrics Data : " + jsonData);
    int timeout = 10000;
    HttpURLConnection connection = null;
    try {
      if (connectUrl == null) {
        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
      }
      connection = getConnection(connectUrl);

      connection.setRequestMethod("POST");
      connection.setRequestProperty("Content-Type", "application/json");
      connection.setRequestProperty("Connection", "Keep-Alive");
      connection.setConnectTimeout(timeout);
      connection.setReadTimeout(timeout);
      connection.setDoOutput(true);

      if (jsonData != null) {
        try (OutputStream os = connection.getOutputStream()) {
          os.write(jsonData.getBytes("UTF-8"));
        }
      }

      int statusCode = connection.getResponseCode();
      if (statusCode != 200) {
        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", "
            + "statusCode = " + statusCode);
      } else {
        LOG.info("Metrics posted to Collector " + connectUrl);
      }
      // BUG FIX: the original returned true even on a non-200 response.
      return statusCode == 200;
    } catch (IOException ioe) {
      // FIX: keep the stack trace instead of logging only the message.
      LOG.error("Failed to POST metrics to " + connectUrl, ioe);
    } finally {
      if (connection != null) {
        connection.disconnect();
      }
    }
    return false;
  }

  /** Builds {protocol}://{collectorHost}:{port}/ws/v1/timeline/metrics. */
  private String constructTimelineMetricUri() {
    StringBuilder sb = new StringBuilder(protocol);
    sb.append("://");
    sb.append(collectorHost);
    sb.append(":");
    sb.append(port);
    sb.append(WS_V1_TIMELINE_METRICS);
    return sb.toString();
  }
}
package org.apache.ambari.metrics.alertservice.spark;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel;
import org.apache.ambari.metrics.alertservice.methods.ema.EmaModelLoader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Spark Streaming driver: consumes TimelineMetrics JSON from Kafka, runs each
 * metric through the configured anomaly models, and publishes detected
 * anomalies back to the AMS collector.
 *
 * Usage: MetricAnomalyDetector &lt;method1,method2&gt; &lt;appid1,appid2&gt;
 *        &lt;collector_host&gt; &lt;port&gt; &lt;protocol&gt; &lt;zkQuorum&gt;
 */
public class MetricAnomalyDetector {

  private static final Log LOG = LogFactory.getLog(MetricAnomalyDetector.class);
  private static String groupId = "ambari-metrics-group";
  private static String topicName = "ambari-metrics-topic";
  private static int numThreads = 1;

  public MetricAnomalyDetector() {
  }

  public static void main(String[] args) throws InterruptedException {

    if (args.length < 6) {
      System.err.println("Usage: MetricAnomalyDetector <method1,method2> "
          + "<appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
      System.exit(1);
    }

    List<String> appIds = Arrays.asList(args[1].split(","));
    String collectorHost = args[2];
    String collectorPort = args[3];
    String collectorProtocol = args[4];
    String zkQuorum = args[5];

    List<MetricAnomalyModel> anomalyDetectionModels = new ArrayList<>();
    AnomalyMetricPublisher anomalyMetricPublisher =
        new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort);

    SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

    for (String method : args[0].split(",")) {
      if (method.equals("ema")) {
        LOG.info("Model EMA requested.");
        EmaModel emaModel =
            new EmaModelLoader().load(jssc.sparkContext().sc(), "/tmp/model/ema");
        // FIX: the loader returns null when the model file is unreadable;
        // skip it instead of registering a null model that would NPE later.
        if (emaModel != null) {
          anomalyDetectionModels.add(emaModel);
        } else {
          LOG.warn("EMA model could not be loaded from /tmp/model/ema; skipping.");
        }
      } else {
        // FIX: unknown method names were silently ignored.
        LOG.warn("Unknown anomaly detection method requested: " + method);
      }
    }

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, zkQuorum, groupId,
            Collections.singletonMap(topicName, numThreads));

    // Convert JSON strings to TimelineMetrics (lambda instead of the
    // anonymous Function class — same behavior).
    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(message -> {
      LOG.info(message._2());
      ObjectMapper mapper = new ObjectMapper();
      return mapper.readValue(message._2(), TimelineMetrics.class);
    });

    // Key each batch by the appId of its first metric.
    JavaPairDStream<String, TimelineMetrics> appMetricStream =
        timelineMetricsStream.mapToPair(timelineMetrics ->
            new Tuple2<>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics));
    appMetricStream.print();

    // Keep only the appIds requested on the command line.
    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream =
        appMetricStream.filter(appMetricTuple -> appIds.contains(appMetricTuple._1()));
    filteredAppMetricStream.print();

    filteredAppMetricStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> {
      TimelineMetrics metrics = tuple2._2();
      LOG.info("Received Metric : " + metrics.getMetrics().get(0).getMetricName());
      for (TimelineMetric metric : metrics.getMetrics()) {
        TimelineMetric timelineMetric = new TimelineMetric(
            metric.getMetricName(), metric.getAppId(),
            metric.getHostName(), metric.getMetricValues());
        LOG.info("Models size : " + anomalyDetectionModels.size());

        for (MetricAnomalyModel model : anomalyDetectionModels) {
          LOG.info("Testing against Model : " + model.getClass().getCanonicalName());
          List<MetricAnomaly> anomalies = model.test(timelineMetric);
          // FIX: skip the collector round-trip when nothing was detected.
          if (!anomalies.isEmpty()) {
            anomalyMetricPublisher.publish(anomalies);
            for (MetricAnomaly anomaly : anomalies) {
              LOG.info(anomaly.getAnomalyAsString());
            }
          }
        }
      }
    }));

    jssc.start();
    jssc.awaitTermination();
  }
}
<repositories> + <repository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </repository> + </repositories> + + <pluginRepositories> + <pluginRepository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </pluginRepository> + </pluginRepositories> + + <dependencies> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.specs</groupId> + <artifactId>specs</artifactId> + <version>1.2.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <version>1.6.3</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.10</artifactId> + <version>1.6.3</version> + </dependency> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-spark</artifactId> + <version>4.7.0-HBase-1.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-alertservice</artifactId> + <version>2.5.1.0.0</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api-scala_2.10</artifactId> + <version>2.8.2</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-mllib_2.10</artifactId> + <version>2.1.1</version> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/scala</sourceDirectory> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>compile</goal> + 
<goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + <args> + <arg>-target:jvm-1.5</arg> + </args> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <configuration> + <downloadSources>true</downloadSources> + <buildcommands> + <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <additionalProjectnatures> + <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> + </additionalProjectnatures> + <classpathContainers> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> + </classpathContainers> + </configuration> + </plugin> + </plugins> + </build> + <reporting> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + </configuration> + </plugin> + </plugins> + </reporting> +</project> diff --git a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala new file mode 100644 index 0000000..d4ed31a --- /dev/null +++ b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala @@ -0,0 +1,97 @@ +package org.apache.ambari.metrics.spark + + +import java.util +import java.util.logging.LogManager + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.spark.SparkConf +import org.apache.spark.streaming._ +import org.apache.spark.streaming.kafka._ +import org.apache.ambari.metrics.alertservice.common.{MetricAnomaly, TimelineMetrics} +import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel 
package org.apache.ambari.metrics.spark


import java.util
import java.util.logging.LogManager

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.ambari.metrics.alertservice.common.{MetricAnomaly, TimelineMetrics}
import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel
import org.apache.ambari.metrics.alertservice.methods.ema.{EmaModel, EmaModelLoader}
import org.apache.ambari.metrics.alertservice.spark.AnomalyMetricPublisher
import org.apache.log4j.Logger
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._
import org.apache.logging.log4j.scala.Logging

/**
 * Spark Streaming driver that consumes serialized TimelineMetrics from Kafka,
 * filters them down to the requested appIds, runs the loaded EMA model over
 * each metric, and publishes any detected anomalies back to the collector.
 *
 * Usage: MetricAnomalyDetector &lt;method1,method2&gt; &lt;appid1,appid2&gt; &lt;collector_host&gt; &lt;port&gt; &lt;protocol&gt;
 */
object MetricAnomalyDetector extends Logging {

  // TODO(review): hard-coded cluster endpoints — these should come from job
  // configuration, not source. Left in place for the prototype.
  var zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181"
  var groupId = "ambari-metrics-group"
  var topicName = "ambari-metrics-topic"
  var numThreads = 1
  // var (not val): models are appended in main(). The original declared a
  // `val` Array and used `models :+ x`, which returns a NEW array and
  // discards it — no model was ever registered.
  var anomalyDetectionModels: Array[MetricAnomalyModel] = Array[MetricAnomalyModel]()

  def main(args: Array[String]): Unit = {

    @transient
    lazy val log: Logger = org.apache.log4j.LogManager.getLogger("MetricAnomalyDetectorLogger")

    if (args.length < 5) {
      System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol>")
      System.exit(1)
    }

    for (method <- args(0).split(",")) {
      // BUG FIX: ':+' does not mutate in place — reassign so the model sticks.
      if (method == "ema") anomalyDetectionModels = anomalyDetectionModels :+ new EmaModel()
    }

    // BUG FIX: util.Arrays.asList(arr) treated the whole Array[String] as a
    // single list element, so the contains() check in the filter below could
    // never match an appId string. Expand the array as varargs instead.
    val appIds = util.Arrays.asList(args(1).split(","): _*)

    val collectorHost = args(2)
    val collectorPort = args(3)
    val collectorProtocol = args(4)

    val anomalyMetricPublisher: AnomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort)

    val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector")

    val streamingContext = new StreamingContext(sparkConf, Duration(10000))

    val emaModel = new EmaModelLoader().load(streamingContext.sparkContext, "/tmp/model/ema")

    val kafkaStream = KafkaUtils.createStream(streamingContext, zkQuorum, groupId, Map(topicName -> numThreads), StorageLevel.MEMORY_AND_DISK_SER_2)
    kafkaStream.print()

    // Deserialize each Kafka payload (message._2) into TimelineMetrics.
    // NOTE(review): an ObjectMapper is constructed per record; acceptable for
    // a prototype, but it is reusable and should be hoisted for throughput.
    val timelineMetricsStream = kafkaStream.map( message => {
      val mapper = new ObjectMapper
      val metrics = mapper.readValue(message._2, classOf[TimelineMetrics])
      metrics
    })
    timelineMetricsStream.print()

    // Key each batch by the appId of its first metric.
    val appMetricStream = timelineMetricsStream.map( timelineMetrics => {
      (timelineMetrics.getMetrics.get(0).getAppId, timelineMetrics)
    })
    appMetricStream.print()

    // Keep only the appIds requested on the command line.
    val filteredAppMetricStream = appMetricStream.filter( appMetricTuple => {
      appIds.contains(appMetricTuple._1)
    } )
    filteredAppMetricStream.print()

    filteredAppMetricStream.foreachRDD( rdd => {
      rdd.foreach( appMetricTuple => {
        val timelineMetrics = appMetricTuple._2
        logger.info("Received Metric (1): " + timelineMetrics.getMetrics.get(0).getMetricName)
        log.info("Received Metric (2): " + timelineMetrics.getMetrics.get(0).getMetricName)
        for (timelineMetric <- timelineMetrics.getMetrics) {
          val anomalies = emaModel.test(timelineMetric)
          anomalyMetricPublisher.publish(anomalies)
          for (anomaly <- anomalies) {
            val an = anomaly : MetricAnomaly
            logger.info(an.getAnomalyAsString)
          }
        }
      })
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
package org.apache.ambari.metrics.spark

import org.apache.ambari.metrics.alertservice.common.TimelineMetric
import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * One-shot batch job: reads the last 24 hours of a single metric series
 * (metric name + host + appId) from the AMS Phoenix METRIC_RECORD table,
 * trains an EMA anomaly model on the averaged values, and saves the model
 * to the given directory for the streaming detector to load.
 *
 * Usage: SparkPhoenixReader &lt;metric_name&gt; &lt;appId&gt; &lt;hostname&gt; &lt;weight&gt; &lt;timessdev&gt; &lt;phoenixConnectionString&gt; &lt;model_dir&gt;
 */
object SparkPhoenixReader {

  def main(args: Array[String]) {

    // BUG FIX: the job reads args(0)..args(6) — seven arguments, matching the
    // usage string — but the original guard only required 6, so omitting
    // <model_dir> crashed with ArrayIndexOutOfBoundsException instead of
    // printing usage.
    if (args.length < 7) {
      System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
      System.exit(1)
    }

    val metricName = args(0)
    val appId = args(1)
    val hostname = args(2)
    val weight = args(3).toDouble
    val timessdev = args(4).toInt
    val phoenixConnectionString = args(5) // e.g. host:61181:/ams-hbase-unsecure
    val modelDir = args(6)

    val conf = new SparkConf()
    conf.set("spark.app.name", "AMSAnomalyModelBuilder")
    //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val currentTime = System.currentTimeMillis()
    val oneDayBack = currentTime - 24 * 60 * 60 * 1000

    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
    df.registerTempTable("METRIC_RECORD")
    // NOTE(review): query is assembled by string concatenation from CLI
    // input. Tolerable for an operator-run prototype, but parameterize this
    // before any untrusted input can reach it (SQL injection).
    val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " +
      "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack)

    // Average each datapoint (SUM / COUNT); TreeMap keeps values ordered by
    // server time, which the EMA training expects.
    val metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
    result.collect().foreach(
      t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
    )

    val timelineMetric = new TimelineMetric(metricName, appId, hostname, metricValues)

    val emaModel = new EmaModel()
    emaModel.train(timelineMetric, weight, timessdev)
    emaModel.save(sc, modelDir)
  }

}
+} diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml index fc67cb1..67b7f4b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml +++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml @@ -697,6 +697,11 @@ <version>1.0.0.0-SNAPSHOT</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-alertservice</artifactId> + <version>2.5.1.0.0</version> + </dependency> </dependencies> <profiles> diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java similarity index 92% rename from ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java rename to ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java index 0836a72..3558f87 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; 
import org.apache.commons.logging.Log; @@ -63,10 +64,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; @@ -85,6 +83,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time private Integer defaultTopNHostsLimit; private MetricCollectorHAController haController; private boolean containerMetricsDisabled = false; + private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667"); /** * Construct the service. 
@@ -372,11 +371,43 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time // Error indicated by the Sql exception TimelinePutResponse response = new TimelinePutResponse(); + try { + if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) { + kafkaProducer.sendMetrics(fromTimelineMetrics(metrics)); + } + } catch (InterruptedException | ExecutionException e) { + LOG.error(e); + } hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false); return response; } + + private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) { + org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics(); + + List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new ArrayList<>(); + for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { + timelineMetricList.add(fromTimelineMetric(timelineMetric)); + } + otherMetrics.setMetrics(timelineMetricList); + return otherMetrics; + } + + private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) { + + org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric(); + otherMetric.setMetricValues(timelineMetric.getMetricValues()); + otherMetric.setStartTime(timelineMetric.getStartTime()); + otherMetric.setHostName(timelineMetric.getHostName()); + otherMetric.setInstanceId(timelineMetric.getInstanceId()); + otherMetric.setAppId(timelineMetric.getAppId()); + otherMetric.setMetricName(timelineMetric.getMetricName()); + + return otherMetric; + } + @Override public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) throws SQLException, IOException { diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml index 
47255ea..79ea06f 100644 --- a/ambari-metrics/pom.xml +++ b/ambari-metrics/pom.xml @@ -34,6 +34,8 @@ <module>ambari-metrics-grafana</module> <module>ambari-metrics-assembly</module> <module>ambari-metrics-host-aggregator</module> + <module>ambari-metrics-alertservice</module> + <module>ambari-metrics-spark</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> -- To stop receiving notification emails like this one, please contact [email protected].
