Alex,
Thank you for your response. My team is using the scalar interpretations
for our model (importing from the opf directory containing the function you
suggested). We are aggregating the web logs over periods of 15 seconds,
combining the response times into one value (while tracking the minimum,
maximum values and the number of requests serviced over the 15 second
period). I will send you a copy of the output after both outages have been
processed; for now, here are the Python scripts responsible for parsing and
checking the data.
The data is first run through LogParser, which accesses NuPICModel and
sends the aggregated period's results to our local database (InfluxDB). I
am not sure what you will glean without having access to our sample data
(which is enormous), but maybe there is something.
Arthur Harris
On Sun, Apr 17, 2016 at 11:38 AM, Alex Lavin <[email protected]> wrote:
> Thank you for the update Arthur. What is the "default" NuPIC model you are
> using? For anomaly detection we recommend using this set params [1]. You
> can pull these params directly into your model with the
> getScalarMetricWithTimeOfDayAnomalyParams() [2], as we've done for the
> detector in NAB [3].
>
> Are you using anomaly score or anomaly likelihood to flag the anomalies?
> What values does the model output at the first and second outages? If a
> NuPIC model encounters the same anomalies repeatedly, it will learn these
> to be normal and stop detecting them.
>
> What is the granularity of the data? Yes it would help to see the code
> and/or the data.
>
> Also, if you're able to share the labeled anomaly data, you should check
> out the NAB Competition: numenta.org/nab. You can win a bunch of money
> for the data!
>
> [1]
> https://github.com/numenta/nupic/blob/master/src/nupic/frameworks/opf/common_models/anomaly_params_random_encoder/best_single_metric_anomaly_params.json
> [2]
> https://github.com/numenta/nupic/blob/master/src/nupic/frameworks/opf/common_models/cluster_params.py#L29
> [3]
> https://github.com/numenta/NAB/blob/master/nab/detectors/numenta/numenta_detector.py#L73-L78
>
> Cheers,
> Alex
>
> --
> Alexander Lavin
> Software Engineer
> Numenta
>
from nupic.frameworks.opf.modelfactory import ModelFactory
from nupic.algorithms import anomaly_likelihood
from nupic.frameworks.opf.common_models.cluster_params import \
getScalarMetricWithTimeOfDayAnomalyParams as createParams
class NuPICModel:
    """Wraps one NuPIC OPF model plus an anomaly-likelihood estimator for a
    single scalar metric.

    Each call to feed_point() runs one (timestamp, value) record through the
    model and returns the anomaly likelihood for that record.
    """

    def __init__(self, identifier, maximum):
        """Build and configure the underlying NuPIC model.

        identifier -- label for this model (bookkeeping only)
        maximum    -- expected maximum metric value; clamped to >= 0
        """
        self.__identifier = identifier
        self.__maximum = max(maximum, 0)
        # Number of leading points treated as a learning-only warm-up.
        # 0 disables the warm-up (likelihoods are returned immediately).
        # These are instance attributes on purpose: declaring mutable model
        # state at class level would let separate NuPICModel instances
        # interfere with each other.
        self.__learning_amount = 0
        self.__point_count = 0
        # Model params for the default scalar interpretation, max given by
        # the caller.
        # NOTE(review): minVal is -1 although response times/counts are
        # non-negative -- confirm this is intentional.
        model_params = createParams(
            metricData=[0],
            minVal=-1,
            maxVal=self.__maximum
        )
        # Initialize the NuPIC model and turn on learning
        self.__model = ModelFactory.create(model_params["modelConfig"])
        self.__model.enableLearning()
        self.__model.enableInference(model_params["inferenceArgs"])
        # Converts raw anomaly scores into anomaly probabilities
        self.__likelihood = anomaly_likelihood.AnomalyLikelihood()

    def feed_point(self, cutoff_time, value):
        """Feed one (timestamp, value) record into the model.

        Returns 0.0 while still inside the warm-up period, otherwise the
        anomaly likelihood for this record as a float.
        """
        result = self.__model.run({
            "c0": cutoff_time,
            "c1": value
        })
        self.__point_count += 1
        # Raw anomaly score for the just-added point
        anomaly = result.inferences["anomalyScore"]
        # Once the warm-up is exhausted, freeze learning so that recurring
        # anomalies are not learned as normal behavior.
        if self.__point_count <= self.__learning_amount:
            if self.__point_count == self.__learning_amount:
                self.__model.disableLearning()
            return 0.0
        return float(self.__likelihood.anomalyProbability(
            value, anomaly, cutoff_time
        ))
#!/usr/bin/env python
import datetime
import os
from nupic.frameworks.opf.modelfactory import ModelFactory
from nupic.algorithms import anomaly_likelihood
from nupic.frameworks.opf.common_models.cluster_params import \
getScalarMetricWithTimeOfDayAnomalyParams as createParams
# Database name
# NOTE(review): DATABASE_NAME is not referenced anywhere in this script --
# confirm whether it is still needed or should be passed to InfluxDB.
DATABASE_NAME = "bam"
# Build a ready-to-run NuPIC model from an OPF parameter dict.
# Learning and inference are both switched on before the model is returned.
def create_model(model_params):
    nupic_model = ModelFactory.create(model_params["modelConfig"])
    nupic_model.enableLearning()
    nupic_model.enableInference(model_params["inferenceArgs"])
    return nupic_model
# Report a detected anomaly on the console and in the given output file.
# The emitted line carries the URL type, the timestamp at which the anomaly
# occurred, and the computed likelihood.
def print_anomaly_detected(out_file, likelihood, formatted_date, url_type):
    message = "%s\tAnomaly detected: %s\t\tLikelihood: %s" % (
        url_type, formatted_date, likelihood
    )
    print(message)
    out_file.write(message + '\n')
# Test for response-time anomalies.
# Reads 'response-times.txt' (CSV: datetime, accounts value, tns value),
# runs each series through its own NuPIC model, and reports every period
# whose log-likelihood exceeds the cutoff to 'alarms-responseTimes.txt'.
# learning_entries -- number of leading rows treated as warm-up; anomalies
# inside that window are fed to the models but never reported.
def test_response_times(learning_entries):
    likelihood_cutoff = 0.5
    params = createParams([0])
    model_accounts = create_model(params)
    model_tns = create_model(params)
    # One likelihood estimator PER SERIES: a single AnomalyLikelihood
    # instance shared by both metrics would mix the two distributions and
    # skew every probability it produces.
    likelihood_accounts = anomaly_likelihood.AnomalyLikelihood()
    likelihood_tns = anomaly_likelihood.AnomalyLikelihood()
    count = 0
    # 'w' mode truncates, so a stale alarms file need not be removed first;
    # 'with' guarantees both handles are closed even on error.
    with open('response-times.txt', 'r') as in_times, \
            open('alarms-responseTimes.txt', 'w') as out_times:
        for line in in_times:
            count += 1
            data = line.split(',')  # datetime first, values second/third
            date_time = datetime.datetime.strptime(
                data[0], "%Y-%m-%d %H:%M:%S"
            )
            accounts_value = float(data[1])
            tns_value = float(data[2])
            accounts_anomaly = model_accounts.run({
                "c0": date_time,
                "c1": accounts_value
            }).inferences["anomalyScore"]
            tns_anomaly = model_tns.run({
                "c0": date_time,
                "c1": tns_value
            }).inferences["anomalyScore"]
            accounts_likelihood = likelihood_accounts.computeLogLikelihood(
                likelihood_accounts.anomalyProbability(
                    accounts_value, accounts_anomaly, date_time
                )
            )
            tns_likelihood = likelihood_tns.computeLogLikelihood(
                likelihood_tns.anomalyProbability(
                    tns_value, tns_anomaly, date_time
                )
            )
            # Models keep learning during warm-up, but nothing is reported
            if count <= learning_entries:
                continue
            formatted_date = date_time.strftime("%Y-%m-%d %H:%M:%S")
            if accounts_likelihood > likelihood_cutoff:
                print_anomaly_detected(
                    out_times, accounts_likelihood, formatted_date, "account"
                )
            if tns_likelihood > likelihood_cutoff:
                print_anomaly_detected(
                    out_times, tns_likelihood, formatted_date, "tns"
                )
# Test for response-rate anomalies.
# Reads 'response-rates.txt' (CSV: datetime, accounts value, tns value),
# runs each series through its own NuPIC model, and reports every period
# whose log-likelihood exceeds the cutoff to 'alarms-responseRates.txt'.
# learning_entries -- number of leading rows treated as warm-up; anomalies
# inside that window are fed to the models but never reported.
def test_response_rates(learning_entries):
    likelihood_cutoff = 0.5
    params = createParams([0])
    model_accounts = create_model(params)
    model_tns = create_model(params)
    # One likelihood estimator PER SERIES: a single AnomalyLikelihood
    # instance shared by both metrics would mix the two distributions and
    # skew every probability it produces.
    likelihood_accounts = anomaly_likelihood.AnomalyLikelihood()
    likelihood_tns = anomaly_likelihood.AnomalyLikelihood()
    count = 0
    # 'w' mode truncates, so a stale alarms file need not be removed first;
    # 'with' guarantees both handles are closed even on error.
    with open('response-rates.txt', 'r') as in_rates, \
            open('alarms-responseRates.txt', 'w') as out_rates:
        for line in in_rates:
            count += 1
            data = line.split(',')  # datetime first, values second/third
            date_time = datetime.datetime.strptime(
                data[0], "%Y-%m-%d %H:%M:%S"
            )
            accounts_value = float(data[1])
            tns_value = float(data[2])
            accounts_anomaly = model_accounts.run({
                "c0": date_time,
                "c1": accounts_value
            }).inferences["anomalyScore"]
            tns_anomaly = model_tns.run({
                "c0": date_time,
                "c1": tns_value
            }).inferences["anomalyScore"]
            accounts_likelihood = likelihood_accounts.computeLogLikelihood(
                likelihood_accounts.anomalyProbability(
                    accounts_value, accounts_anomaly, date_time
                )
            )
            tns_likelihood = likelihood_tns.computeLogLikelihood(
                likelihood_tns.anomalyProbability(
                    tns_value, tns_anomaly, date_time
                )
            )
            # Models keep learning during warm-up, but nothing is reported
            if count <= learning_entries:
                continue
            formatted_date = date_time.strftime("%Y-%m-%d %H:%M:%S")
            if accounts_likelihood > likelihood_cutoff:
                print_anomaly_detected(
                    out_rates, accounts_likelihood, formatted_date, "account"
                )
            if tns_likelihood > likelihood_cutoff:
                print_anomaly_detected(
                    out_rates, tns_likelihood, formatted_date, "tns"
                )
if __name__ == "__main__":
second_differential = 15
num_learning_days = 7
num_learning_entries = (60 * 60 * 24 * num_learning_days) / \
second_differential
test_response_times(num_learning_entries)
test_response_rates(num_learning_entries)
#!/usr/bin/env python
import os
import datetime
import re
from RedisAccess.RedisHandler import RedisHandler
from datetime import timedelta
from DBAccess.InfluxDBHandler import InfluxDBHandler
from NuPICModel import NuPICModel
class Statistics:
    """Per-URL running statistics (average, count, max, min) for one
    aggregation period."""

    def __init__(self):
        # url -> {"average", "count", "max", "min"}. Created here, per
        # instance; a class-level dict would be shared by every instance.
        self.__url_stats = {}

    def clear(self):
        """Drop all collected statistics (start of a new period)."""
        self.__url_stats.clear()

    def get_url(self, url):
        """Return the stats dict for *url*, or an all-zero dict when the
        URL has not been seen this period."""
        return self.__url_stats.get(url, {
            "average": 0,
            "max": 0,
            "count": 0,
            "min": 0
        })

    def update_stats(self, url, response_time):
        """Fold one response time into the running stats for *url*."""
        # Single lookup instead of four separate .get() chains
        stats = self.__url_stats.get(url)
        if stats is None:
            # First observation for this URL in the current period
            self.__url_stats[url] = {
                "average": float(response_time),
                "count": 1,
                "max": response_time,
                "min": response_time
            }
            return
        count = stats["count"]
        # float() keeps the running mean exact: response times arrive as
        # ints, and under Python 2 plain '/' would truncate the average.
        stats["average"] = \
            ((stats["average"] * count) + response_time) / float(count + 1)
        stats["count"] = count + 1
        stats["max"] = max(stats["max"], response_time)
        stats["min"] = min(stats["min"], response_time)
# Class for parsing log files
class LogParser:
    """Parses Apache access logs, aggregates response times per URL over
    fixed-length periods, runs the aggregates through NuPIC models, and
    writes the resulting points (raw values plus anomaly scores) to
    InfluxDB."""

    # Measurement for InfluxDB
    __MEASUREMENT_NAME = "log"
    # Access logs location
    __ACCESS_LOG_DIR = "../access-logs/web01"
    # Second differential over which to aggregate data
    __SECOND_DIFFERENTIAL = 15

    def __init__(self):
        # URL registry: tag -> {"tag", "regex", "models"}. Created here,
        # per instance: the original class-level dict was mutable shared
        # state, so every LogParser instance saw every other's URLs.
        self.__urls = {}
        self.add_url("available_numbers", "/availableNumbers")

    # Parse a single line of an Apache log file.
    # Returns (datetime, response time in ms) for the request.
    # NOTE(review): assumes the timestamp is the 4th space-separated token
    # and the response time is the 3rd token after the quoted request
    # string -- confirm against the actual log format in use.
    def __parse_line(self, log_line):
        split_line = log_line.split('"')  # NOT on space
        date_part = split_line[0].split()  # split on spaces
        request_date_time = self.__parse_date(date_part[3])
        response_part = split_line[2].split()
        request_response_time = response_part[2]
        return datetime.datetime.strptime(
            request_date_time, "%Y-%m-%d %H:%M:%S"
        ), int(request_response_time)

    # True when the log line contains the pattern registered for url_tag.
    # Despite the "regex" name this is a plain substring test.
    def __matches_url(self, log_line, url_tag):
        regex = self.__urls.get(url_tag, {}).get("regex", None)
        if regex is None:
            return False
        return log_line.find(regex) != -1

    # Create one data point dict in the shape InfluxDB expects.
    @staticmethod
    def __make_data_point(measurement, tags, t, fields):
        return {
            "measurement": measurement,
            "tags": tags,
            "time": t,
            "fields": fields
        }

    # Convert an Apache "[DD/Mon/YYYY:HH:MM:SS" timestamp fragment into
    # "YYYY-MM-DD HH:MM:SS" (numeric month, leading '[' stripped).
    @staticmethod
    def __parse_date(date_string):
        month_lookup = {
            "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04", "May": "05",
            "Jun": "06", "Jul": "07", "Aug": "08", "Sep": "09", "Oct": "10",
            "Nov": "11", "Dec": "12"
        }
        string_parts = date_string.split("/")
        month = month_lookup[string_parts[1]]  # numeric representation
        day = string_parts[0].replace("[", "")
        year = string_parts[2].split(":", 1)[0]
        t = string_parts[2].split(":", 1)[1]
        return year + "-" + month + "-" + day + " " + t

    # Runs the NuPIC model on information from the data point and adds
    # anomaly score fields to the returned dict (e.g. "anomalyAverage").
    def __make_nupic(self, url_tag, cutoff, url_point):
        # Get all the models for this URL
        url_models = self.__urls.get(url_tag, {}).get("models", {})
        # Start from the raw values, then add anomaly fields alongside them
        anomaly_points = url_point.copy()
        # .items() behaves the same as the old .iteritems() here and works
        # under both Python 2 and 3
        for key, value in url_point.items():
            model = url_models.get(key, None)
            if model is not None:
                # Feed the data point to the model and store the returned
                # anomaly score
                anomaly_points["anomaly" + key.strip().title()] = \
                    model.feed_point(cutoff, value)
        return anomaly_points

    # Fold one log line's response time into the stats of every matching URL
    def __update_aggregation_stats(self, url_stats, response_time, log_line):
        if log_line is not None:
            for url_tag in self.__urls:
                if self.__matches_url(log_line, url_tag):
                    url_stats.update_stats(url_tag, response_time)

    # End an aggregation period by creating actual InfluxDB data points.
    # Advances the cutoff in __SECOND_DIFFERENTIAL steps until it catches
    # up with `current`, emitting one point per URL per elapsed period
    # (URLs with no traffic get all-zero fields from Statistics.get_url).
    def __end_aggregation_period(
        self, current, cutoff, url_stats, response_time, log_line
    ):
        temp_points = []
        while (current - cutoff).total_seconds() >= self.__SECOND_DIFFERENTIAL:
            for url in self.__urls:
                fields = self.__make_nupic(url, cutoff, url_stats.get_url(url))
                temp_points.append(self.__make_data_point(
                    self.__MEASUREMENT_NAME, {
                        "url": url
                    }, cutoff, fields))
            url_stats.clear()
            cutoff += timedelta(seconds=self.__SECOND_DIFFERENTIAL)
        # Retain the point just read: it belongs to the new period
        self.__update_aggregation_stats(url_stats, response_time, log_line)
        return cutoff, temp_points

    # Process one log line.
    # line is the line itself, cutoff_time is the start of the current
    # aggregation period, url_stats is the data for the current aggregation
    # period. Returns the (possibly advanced) cutoff_time.
    def __process_line(self, line, cutoff_time, url_stats, influx):
        date_time, response_time = self.__parse_line(line)
        if (date_time - cutoff_time).total_seconds() <= \
                self.__SECOND_DIFFERENTIAL:
            # Line is within the current aggregation period
            self.__update_aggregation_stats(url_stats, response_time, line)
        else:
            # Line starts a later period: flush the elapsed period(s)
            cutoff_time, temp_points = self.__end_aggregation_period(
                date_time, cutoff_time, url_stats, response_time, line
            )
            if temp_points:
                influx.write_to_influx(temp_points)
        return cutoff_time

    # Process one whole log file (log_in is an open file handle).
    def __process_file(self, log_in, influx):
        # Use first line to set initial values for the loop
        first_date_time, response_time = self.__parse_line(log_in.readline())
        # We want the date to be right, but start at midnight for every log
        cutoff_time = first_date_time.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        log_in.seek(0)  # rewind so the first line is aggregated too
        url_stats = Statistics()
        for line in log_in:
            cutoff_time = self.__process_line(
                line, cutoff_time, url_stats, influx
            )
        # Flush the remaining points from the last periods of the day.
        # The period end actually falls in the next file, so force it here.
        end_of_day = first_date_time.replace(
            hour=0, minute=0, second=0, microsecond=0
        ) + timedelta(days=1)
        cutoff_time, more_points = self.__end_aggregation_period(
            end_of_day, cutoff_time, url_stats, 0, None
        )
        if more_points:
            influx.write_to_influx(more_points)

    def run_sample_data(self):
        """Parse every sample log file under __ACCESS_LOG_DIR and push the
        aggregated points to InfluxDB."""
        influx = InfluxDBHandler("log")
        for root, directories, files in os.walk(self.__ACCESS_LOG_DIR):
            for f in files:
                print("Starting file " + str(f))
                # Join against the directory that actually contains the
                # file; joining against __ACCESS_LOG_DIR would produce a
                # wrong path for any file os.walk finds in a subdirectory.
                path = os.path.join(root, f)
                with open(path, "r") as log_in:
                    self.__process_file(log_in, influx)
                print("Finished file " + str(f))

    # Register a URL to track: substring to match plus one NuPIC model per
    # aggregated field. Registering an existing tag is a no-op.
    def add_url(self, url_tag, matching_regex):
        if self.__urls.get(url_tag, None) is None:
            self.__urls[url_tag] = {
                "tag": url_tag,
                "regex": matching_regex,
                "models": {
                    "average": NuPICModel("average", 4037853),
                    "min": NuPICModel("min", 4037853),
                    "max": NuPICModel("max", 4237310),
                    "count": NuPICModel("count", 661)
                }
            }

    # Parses a single log line and pushes its stats to Redis for every
    # matching registered URL.
    def parse_line(self, log_line):
        date_time, response_time = self.__parse_line(log_line)
        for url_tag in self.__urls:
            if self.__matches_url(log_line, url_tag):
                RedisHandler.update_stats(url_tag, response_time, date_time)
                # TODO Run the NuPIC models as needed
if __name__ == "__main__":
parser = LogParser()
parser.run_sample_data()