Alex,

Thank you for your response. My team is using the scalar interpretations
for our model (importing from the opf directory containing the function you
suggested). We are aggregating the web logs over periods of 15 seconds,
combining the response times into one value (while tracking the minimum,
maximum values and the number of requests serviced over the 15 second
period). I will send you a copy of the output after both outages have been
processed. For now, here are the Python scripts responsible for parsing and
checking the data.

The data is first run through LogParser, which accesses NuPICModel and
sends the aggregated period's results to our local database (InfluxDB). I
am not sure what you will glean without having access to our sample data
(which is enormous), but maybe there is something.

Arthur Harris

On Sun, Apr 17, 2016 at 11:38 AM, Alex Lavin <[email protected]> wrote:

> Thank you for the update Arthur. What is the "default" NuPIC model you are
> using? For anomaly detection we recommend using this set params [1]. You
> can pull these params directly into your model with the
> getScalarMetricWithTimeOfDayAnomalyParams() [2], as we've done for the
> detector in NAB [3].
>
> Are you using anomaly score or anomaly likelihood to flag the anomalies?
> What values does the model output at the first and second outages? If a
> NuPIC model encounters the same anomalies repeatedly, it will learn these
> to be normal and stop detecting them.
>
> What is the granularity of the data? Yes it would help to see the code
> and/or the data.
>
> Also, if you're able to share the labeled anomaly data, you should check
> out the NAB Competition: numenta.org/nab. You can win a bunch of money
> for the data!
>
> [1]
> https://github.com/numenta/nupic/blob/master/src/nupic/frameworks/opf/common_models/anomaly_params_random_encoder/best_single_metric_anomaly_params.json
> [2]
> https://github.com/numenta/nupic/blob/master/src/nupic/frameworks/opf/common_models/cluster_params.py#L29
> [3]
> https://github.com/numenta/NAB/blob/master/nab/detectors/numenta/numenta_detector.py#L73-L78
>
> Cheers,
> Alex
>
> --
> Alexander Lavin
> Software Engineer
> Numenta
>
from nupic.frameworks.opf.modelfactory import ModelFactory
from nupic.algorithms import anomaly_likelihood
from nupic.frameworks.opf.common_models.cluster_params import \
    getScalarMetricWithTimeOfDayAnomalyParams as createParams


class NuPICModel:
    """One NuPIC OPF model plus its anomaly-likelihood estimator for a
    single scalar metric (e.g. the "average" response time for one URL)."""

    __model = None
    __likelihood = None
    __identifier = None

    __learning_amount = 0           # 0 turns off learning period
    __point_count = 0

    def __init__(self, identifier, maximum):
        """Build the model for the metric named `identifier`.

        `maximum` is the expected upper bound of the metric (clamped to be
        non-negative)."""
        self.__identifier = identifier
        self.__maximum = max(maximum, 0)

        # Default scalar-interpretation anomaly params. NOTE(review): minVal
        # is -1 even though the surrounding code never produces negatives —
        # presumably a small margin below 0; confirm this is intentional.
        params = createParams(
            metricData=[0],
            minVal=-1,
            maxVal=self.__maximum
        )

        # Stand up the OPF model with learning and inference both enabled
        self.__model = ModelFactory.create(params["modelConfig"])
        self.__model.enableLearning()
        self.__model.enableInference(params["inferenceArgs"])

        # Converts raw per-point anomaly scores into anomaly probabilities
        self.__likelihood = anomaly_likelihood.AnomalyLikelihood()

    def feed_point(self, cutoff_time, value):
        """Run one aggregated data point through the model.

        Returns the anomaly probability as a float, or 0.0 while the
        warm-up (learning) period is still active."""
        run_result = self.__model.run({
            "c0": cutoff_time,
            "c1": value
        })
        self.__point_count += 1
        raw_score = run_result.inferences["anomalyScore"]

        # Past the warm-up window: report the anomaly probability
        if self.__point_count > self.__learning_amount:
            return float(self.__likelihood.anomalyProbability(
                value, raw_score, cutoff_time
            ))
        # Exactly at the end of the warm-up window: freeze learning
        if self.__point_count == self.__learning_amount:
            self.__model.disableLearning()
        return 0.0

#!/usr/bin/env python

import datetime
import os
from nupic.frameworks.opf.modelfactory import ModelFactory
from nupic.algorithms import anomaly_likelihood
from nupic.frameworks.opf.common_models.cluster_params import \
    getScalarMetricWithTimeOfDayAnomalyParams as createParams

# Database name
DATABASE_NAME = "bam"


# Build and return a ready-to-run NuPIC model: instantiate it from the
# supplied OPF parameters, then switch on both learning and inference.
def create_model(model_params):
    nupic_model = ModelFactory.create(model_params["modelConfig"])
    nupic_model.enableLearning()
    nupic_model.enableInference(model_params["inferenceArgs"])
    return nupic_model


# Print out, to both the console and the specified file, a description of the
# detected anomaly
# String contains the date and time at which the anomaly occurs, the URL for
# which it occurred, and the likelihood
def print_anomaly_detected(out_file, likelihood, formatted_date, url_type):
    output_string = url_type + "\tAnomaly detected: " + formatted_date + \
                    "\t\tLikelihood: " + str(likelihood)
    # Parenthesized single-argument print works identically on Python 2
    # (statement) and Python 3 (function); the bare statement form was
    # Python-2-only.
    print(output_string)
    out_file.write(output_string + '\n')


# Shared implementation for the response-time and response-rate checks.
# Reads "<datetime>,<accounts value>,<tns value>" lines from in_path, feeds
# each series through its own NuPIC model, and appends any anomaly whose
# log-likelihood exceeds the cutoff to out_path (after the learning period).
def _check_anomalies(in_path, out_path, learning_entries):
    if os.path.isfile(out_path):
        os.remove(out_path)
    in_file = open(in_path, 'r')
    out_file = open(out_path, 'w')
    try:
        params = createParams([0])
        model_accounts = create_model(params)
        model_tns = create_model(params)
        likelihood_cutoff = 0.5
        # One estimator per series: AnomalyLikelihood maintains a running
        # distribution of past anomaly scores, so interleaving two different
        # metrics through a single instance corrupts both estimates.
        accounts_estimator = anomaly_likelihood.AnomalyLikelihood()
        tns_estimator = anomaly_likelihood.AnomalyLikelihood()
        count = 0
        for line in in_file:
            count += 1
            data = line.split(',')  # datetime, accounts value, tns value
            date_time = datetime.datetime.strptime(
                data[0], "%Y-%m-%d %H:%M:%S"
            )
            accounts_value = float(data[1])
            tns_value = float(data[2])
            accounts_result = model_accounts.run({
                "c0": date_time,
                "c1": accounts_value
            })
            tns_result = model_tns.run({
                "c0": date_time,
                "c1": tns_value
            })
            accounts_probability = accounts_estimator.anomalyProbability(
                accounts_value, accounts_result.inferences["anomalyScore"],
                date_time
            )
            tns_probability = tns_estimator.anomalyProbability(
                tns_value, tns_result.inferences["anomalyScore"], date_time
            )
            accounts_likelihood = accounts_estimator.computeLogLikelihood(
                accounts_probability
            )
            tns_likelihood = tns_estimator.computeLogLikelihood(
                tns_probability
            )
            # Suppress alarms while the models are still learning what
            # "normal" looks like.
            if count <= learning_entries:
                continue
            formatted_date = datetime.datetime.strftime(
                date_time, "%Y-%m-%d %H:%M:%S"
            )
            if accounts_likelihood > likelihood_cutoff:
                print_anomaly_detected(
                    out_file, accounts_likelihood, formatted_date, "account"
                )
            if tns_likelihood > likelihood_cutoff:
                print_anomaly_detected(
                    out_file, tns_likelihood, formatted_date, "tns"
                )
    finally:
        in_file.close()
        out_file.close()


# Test for response time anomalies
def test_response_times(learning_entries):
    _check_anomalies(
        'response-times.txt', 'alarms-responseTimes.txt', learning_entries
    )


# Test for response rate anomalies
def test_response_rates(learning_entries):
    _check_anomalies(
        'response-rates.txt', 'alarms-responseRates.txt', learning_entries
    )


if __name__ == "__main__":
    # Logs are aggregated into 15-second buckets; give the models a 7-day
    # warm-up before alarms are reported.
    second_differential = 15
    num_learning_days = 7
    # Floor division keeps the entry count an int on both Python 2 and 3
    # (true division "/" would yield a float on Python 3).
    num_learning_entries = (60 * 60 * 24 * num_learning_days) // \
        second_differential
    test_response_times(num_learning_entries)
    test_response_rates(num_learning_entries)
#!/usr/bin/env python

import os
import datetime
import re
from RedisAccess.RedisHandler import RedisHandler
from datetime import timedelta
from DBAccess.InfluxDBHandler import InfluxDBHandler
from NuPICModel import NuPICModel


class Statistics:
    """Per-URL aggregation statistics (running average, min, max, count)
    accumulated over a single aggregation period."""

    def __init__(self):
        # url tag -> {"average", "count", "max", "min"}; instance-level so
        # separate Statistics objects never share state.
        self.__url_stats = {}

    def clear(self):
        """Drop all accumulated stats (called at the start of a new period)."""
        self.__url_stats.clear()

    def get_url(self, url):
        """Return the stats dict for url, or an all-zero dict if the URL
        was never seen this period."""
        return self.__url_stats.get(url, {
            "average": 0,
            "max": 0,
            "count": 0,
            "min": 0
        })

    def update_stats(self, url, response_time):
        """Fold one response time into the running stats for url."""
        current = self.__url_stats.get(url)
        if current is None:
            # First observation for this URL this period.
            self.__url_stats[url] = {
                "average": float(response_time),
                "count": 1,
                "max": response_time,
                "min": response_time
            }
            return
        count = current["count"]
        # Incremental mean; float() prevents silent integer truncation of
        # the average when response times are ints (Python 2 "/" semantics).
        self.__url_stats[url] = {
            "average": (current["average"] * count + float(response_time)) /
                       (count + 1),
            "count": count + 1,
            "max": max(current["max"], response_time),
            "min": min(current["min"], response_time)
        }


# Class for parsing log files
class LogParser:
    """Streams Apache access logs, aggregates per-URL response-time stats
    into fixed-length periods, runs the NuPIC models over each period and
    writes the resulting data points to InfluxDB."""

    # Measurement for InfluxDB
    __MEASUREMENT_NAME = "log"
    # Access logs location
    __ACCESS_LOG_DIR = "../access-logs/web01"
    # Second differential over which to aggregate data
    __SECOND_DIFFERENTIAL = 15

    # URLs
    # NOTE(review): class-level mutable dict, shared by all LogParser
    # instances; harmless while only one instance exists (see __main__).
    __urls = {}

    def __init__(self):
        # Register the one URL tracked so far, keyed by tag with its
        # matching substring and per-field NuPIC models.
        self.add_url("available_numbers", "/availableNumbers")

    # Parse a single line of an Apache log file
    # Returns a date and time string and the response time (ms)
    def __parse_line(self, log_line):
        """Return (datetime, response_time_int) parsed from one log line.

        Assumes an Apache-style line: the timestamp is the 4th
        space-separated token before the first double quote, and the
        response time is the 3rd token after the closing quote of the
        request — TODO confirm against the actual log format.
        """
        split_line = log_line.split('"')            # NOT on space
        date_part = split_line[0].split()           # split on spaces
        request_date_time = self.__parse_date(date_part[3])
        response_part = split_line[2].split()
        request_response_time = response_part[2]

        return datetime.datetime.strptime(
            request_date_time, "%Y-%m-%d %H:%M:%S"
        ), int(request_response_time)

    def __matches_url(self, log_line, url_tag):
        """True if the registered "regex" for url_tag occurs in log_line.

        Despite the name, this is a plain substring test (str.find), not a
        regular-expression match.
        """
        regex = self.__urls.get(url_tag, {}).get("regex", None)
        if regex is not None:
            return log_line.find(regex) != -1
        else:
            return False

    # Create one data point for insertion
    @staticmethod
    def __make_data_point(measurement, tags, t, fields):
        """Assemble one InfluxDB-style point dict from its parts."""
        return {
            "measurement": measurement,
            "tags": tags,
            "time": t,
            "fields": fields
        }

    # Parse a datetime string to change the month from the abbreviation to the
    # numeric representation
    @staticmethod
    def __parse_date(date_string):
        """Convert an Apache timestamp token (e.g. "[17/Apr/2016:11:38:00")
        into "YYYY-MM-DD HH:MM:SS" form suitable for strptime."""
        month_lookup = {
            "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04", "May": "05",
            "Jun": "06", "Jul": "07", "Aug": "08", "Sep": "09", "Oct": "10",
            "Nov": "11", "Dec": "12"
        }
        string_parts = date_string.split("/")
        month = month_lookup[string_parts[1]]       # get numeric representation
        day = string_parts[0]
        day = day.replace("[", "")                  # drop the leading bracket
        year = string_parts[2].split(":", 1)[0]
        t = string_parts[2].split(":", 1)[1]        # HH:MM:SS remainder
        return year + "-" + month + "-" + day + " " + t

    # Runs the NuPIC model on information from the data point and adds
    # anomaly score fields to the dict
    def __make_nupic(self, url_tag, cutoff, url_point):
        """Return a copy of url_point extended with one "anomaly<Field>"
        entry per field that has a registered model (e.g. "anomalyAverage")."""
        # Get all the models for this URL
        url_models = self.__urls.get(url_tag, {}).get("models", {})

        # Initialize a dict to hold the anomaly scores, setting it first to just
        # the raw values
        anomaly_points = url_point.copy()

        # Iterate through each field in url_point (average, min, max, etc.)
        # NOTE: iteritems() is Python-2-only.
        for key, value in url_point.iteritems():
            # Get the associated  model, or None if nonexistent
            model = url_models.get(key, None)
            if model is not None:
                # Feed the data point to the model and store the returned
                # anomaly score
                anomaly_points["anomaly" + key.strip().title()] = \
                    model.feed_point(cutoff, value)

        return anomaly_points

    # Update the stats for the aggregation period
    def __update_aggregation_stats(self, url_stats, response_time, log_line):
        """Fold response_time into the stats of every URL whose pattern
        occurs in log_line; a None log_line (end of file flush) is a no-op."""
        if log_line is not None:
            for url_tag in self.__urls:
                if self.__matches_url(log_line, url_tag):
                    url_stats.update_stats(url_tag, response_time)

    # End an aggregation period by creating actual InfluxDB data points
    def __end_aggregation_period(
        self, current, cutoff, url_stats, response_time, log_line
    ):
        """Close every full period between cutoff and current.

        Emits one point per URL per period (empty periods get the all-zero
        stats from Statistics.get_url), advances cutoff by
        __SECOND_DIFFERENTIAL each time, then seeds the new period with the
        line that triggered the rollover. Returns (new_cutoff, points).
        """
        temp_points = []
        while (current - cutoff).total_seconds() >= self.__SECOND_DIFFERENTIAL:
            for url in self.__urls:
                # Set field as 0 if no log in the aggregation period
                fields = self.__make_nupic(url, cutoff, url_stats.get_url(url))
                # Append the points
                temp_points.append(self.__make_data_point(
                    self.__MEASUREMENT_NAME, {
                        "url": url
                    }, cutoff, fields))
            url_stats.clear()
            cutoff += timedelta(seconds=self.__SECOND_DIFFERENTIAL)
        # Retain the point just read
        self.__update_aggregation_stats(url_stats, response_time, log_line)
        return cutoff, temp_points

    # Process one log line
    # line is the line itself, cutoff_time is the start of the current
    # aggregation period, url_stats is the data for the current aggregation
    # period
    def __process_line(self, line, cutoff_time, url_stats, influx):
        """Route one line into the current period or roll the period over,
        writing any completed points to InfluxDB. Returns the (possibly
        advanced) cutoff_time."""
        # Get information of current line
        date_time, response_time = self.__parse_line(line)

        # If line is within the current aggregation period
        # NOTE(review): "<=" keeps a point exactly __SECOND_DIFFERENTIAL
        # seconds past the cutoff in the current period, while
        # __end_aggregation_period closes periods at ">=" — confirm the
        # inclusive boundary is intended.
        if (date_time - cutoff_time).total_seconds() <= \
                self.__SECOND_DIFFERENTIAL:
            self.__update_aggregation_stats(url_stats, response_time, line)
        # If current line is in next aggregation period
        else:
            cutoff_time, temp_points = self.__end_aggregation_period(
                date_time, cutoff_time, url_stats, response_time, line
            )
            if temp_points:
                influx.write_to_influx(temp_points)

        return cutoff_time

    # Process file
    # log_in is the file input handler
    def __process_file(self, log_in, influx):
        """Aggregate one whole log file (assumed to cover a single day) and
        flush every completed period to InfluxDB, including the day's tail."""
        # Use first line to set initial values for loop
        first_date_time, response_time = self.__parse_line(log_in.readline())
        # We want the date to be right, but start at midnight for every log
        cutoff_time = first_date_time.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        # Rewind so the first line is processed again by the main loop
        log_in.seek(0)

        url_stats = Statistics()

        # Iterate through lines in the log file
        for line in log_in:
            cutoff_time = self.__process_line(
                line, cutoff_time, url_stats, influx
            )

        # Write remaining points from the last 15 seconds of the day
        # The end time would actually occur in the next file, so we must start
        # this manually
        end_of_day = first_date_time.replace(
            hour=0, minute=0, second=0, microsecond=0
        ) + timedelta(days=1)
        cutoff_time, more_points = self.__end_aggregation_period(
            end_of_day, cutoff_time, url_stats, 0, None
        )
        if more_points:
            influx.write_to_influx(more_points)

    def run_sample_data(self):
        """Process every sample log file under __ACCESS_LOG_DIR, writing the
        aggregated points to the "log" InfluxDB database."""
        # InfluxDB instance
        influx = InfluxDBHandler("log")

        # Iterate through all the sample log files
        # NOTE(review): the join uses __ACCESS_LOG_DIR rather than the walk's
        # `root`, so files in subdirectories would be opened at the wrong
        # path — fine only while all logs sit directly in the top directory.
        for root, directory, files in os.walk(self.__ACCESS_LOG_DIR):
            for f in files:
                print "Starting file " + str(f)
                tmp = os.path.join(self.__ACCESS_LOG_DIR, f)
                log_in = open(tmp, "r")

                # Process a file
                self.__process_file(log_in, influx)

                # Close the file
                log_in.close()
                print "Finished file " + str(f)

    def add_url(self, url_tag, matching_regex):
        """Register a URL to track: its matching substring plus one NuPIC
        model per aggregated field. No-op if the tag is already registered.

        The model maxima are hard-coded calibration values (response-time
        ceilings and a per-period request-count ceiling) — presumably taken
        from the sample data; confirm before reusing on other logs.
        """
        if self.__urls.get(url_tag, None) is None:
            self.__urls[url_tag] = {
                "tag": url_tag,
                "regex": matching_regex,
                "models": {
                    "average": NuPICModel("average", 4037853),
                    "min": NuPICModel("min", 4037853),
                    "max": NuPICModel("max", 4237310),
                    "count": NuPICModel("count", 661)
                }
            }

    # Parses a single log line
    def parse_line(self, log_line):
        """Parse one line and push its response time into Redis for every
        matching URL (live-ingest path, as opposed to run_sample_data)."""
        date_time, response_time = self.__parse_line(log_line)
        for url_tag in self.__urls:
            if self.__matches_url(log_line, url_tag):
                RedisHandler.update_stats(url_tag, response_time, date_time)
        # TODO Run the NuPIC models as needed


if __name__ == "__main__":
    # Parse every sample access log and push the aggregated points to InfluxDB
    LogParser().run_sample_data()

Reply via email to