Fix TM2 thresholds, max history from TO /health/

Fixes Traffic Monitor 2.0 to get per-cache (profile) connection
timeout, max history; and threshold kbps, load average, and query
time from the Traffic Ops /health/{cdn-name} endpoint.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/57b17e08
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/57b17e08
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/57b17e08

Branch: refs/heads/master
Commit: 57b17e08a1128740bc79449319518013e95cc23b
Parents: 88566da
Author: Robert Butts <robert.o.bu...@gmail.com>
Authored: Fri Oct 28 10:56:26 2016 -0600
Committer: Dave Neuman <neu...@apache.org>
Committed: Mon Nov 7 12:29:08 2016 -0700

----------------------------------------------------------------------
 .../experimental/common/fetcher/fetcher.go      |  6 +-
 .../experimental/common/handler/handler.go      |  3 +-
 .../experimental/common/poller/poller.go        | 60 +++++++++++++++-----
 .../experimental/traffic_monitor/cache/cache.go | 17 +++---
 .../traffic_monitor/health/cache_health.go      | 47 ++++++++++++---
 .../traffic_monitor/manager/healthresult.go     |  3 +-
 .../traffic_monitor/manager/manager.go          |  1 -
 .../traffic_monitor/manager/monitorconfig.go    | 28 ++++++---
 .../traffic_monitor/manager/stathistory.go      | 20 +++----
 .../experimental/traffic_monitor/peer/peer.go   |  3 +-
 10 files changed, 130 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/common/fetcher/fetcher.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/fetcher/fetcher.go 
b/traffic_monitor/experimental/common/fetcher/fetcher.go
index f1176ea..d7d7646 100644
--- a/traffic_monitor/experimental/common/fetcher/fetcher.go
+++ b/traffic_monitor/experimental/common/fetcher/fetcher.go
@@ -43,7 +43,9 @@ func (f HttpFetcher) Fetch(id string, url string, pollId 
uint64, pollFinishedCha
        if f.Pending != nil {
                f.Pending.Inc()
        }
+       startReq := time.Now()
        response, err := f.Client.Do(req)
+       reqTime := time.Now().Sub(startReq)
        if f.Pending != nil {
                f.Pending.Dec()
        }
@@ -69,11 +71,11 @@ func (f HttpFetcher) Fetch(id string, url string, pollId 
uint64, pollFinishedCha
                        f.Success.Inc()
                }
                log.Debugf("poll %v %v fetch end\n", pollId, time.Now())
-               f.Handler.Handle(id, response.Body, err, pollId, 
pollFinishedChan)
+               f.Handler.Handle(id, response.Body, reqTime, err, pollId, 
pollFinishedChan)
        } else {
                if f.Fail != nil {
                        f.Fail.Inc()
                }
-               f.Handler.Handle(id, nil, err, pollId, pollFinishedChan)
+               f.Handler.Handle(id, nil, reqTime, err, pollId, 
pollFinishedChan)
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/common/handler/handler.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/handler/handler.go 
b/traffic_monitor/experimental/common/handler/handler.go
index 0b5f838..e1a558a 100644
--- a/traffic_monitor/experimental/common/handler/handler.go
+++ b/traffic_monitor/experimental/common/handler/handler.go
@@ -3,6 +3,7 @@ package handler
 import (
        "encoding/json"
        "io"
+       "time"
 
        
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
 )
@@ -14,7 +15,7 @@ const (
 )
 
 type Handler interface {
-       Handle(string, io.Reader, error, uint64, chan<- uint64)
+       Handle(string, io.Reader, time.Duration, error, uint64, chan<- uint64)
 }
 
 type OpsConfigFileHandler struct {

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/common/poller/poller.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/poller.go 
b/traffic_monitor/experimental/common/poller/poller.go
index 9a7402f..a78d767 100644
--- a/traffic_monitor/experimental/common/poller/poller.go
+++ b/traffic_monitor/experimental/common/poller/poller.go
@@ -22,14 +22,20 @@ type Poller interface {
 }
 
 type HttpPoller struct {
-       Config        HttpPollerConfig
-       ConfigChannel chan HttpPollerConfig
-       Fetcher       fetcher.Fetcher
-       TickChan      chan uint64
+       Config          HttpPollerConfig
+       ConfigChannel   chan HttpPollerConfig
+       FetcherTemplate fetcher.HttpFetcher // FetcherTemplate has all the 
constant settings, and is copied to create fetchers with custom HTTP client 
timeouts.
+       TickChan        chan uint64
+}
+
+type PollConfig struct {
+       URL     string
+       Timeout time.Duration
+       Handler handler.Handler
 }
 
 type HttpPollerConfig struct {
-       Urls     map[string]string
+       Urls     map[string]PollConfig
        Interval time.Duration
 }
 
@@ -46,7 +52,7 @@ func NewHTTP(interval time.Duration, tick bool, httpClient 
*http.Client, counter
                Config: HttpPollerConfig{
                        Interval: interval,
                },
-               Fetcher: fetcher.HttpFetcher{
+               FetcherTemplate: fetcher.HttpFetcher{
                        Handler:  fetchHandler,
                        Client:   httpClient,
                        Counters: counters,
@@ -124,7 +130,14 @@ func (p HttpPoller) Poll() {
                for _, info := range additions {
                        kill := make(chan struct{})
                        killChans[info.ID] = kill
-                       go pollHttp(info.Interval, info.ID, info.URL, 
p.Fetcher, kill)
+
+                       fetcher := p.FetcherTemplate
+                       if info.Timeout != 0 { // if the timeout isn't 
explicitly set, use the template value.
+                               c := *fetcher.Client
+                               fetcher.Client = &c // copy the client, so we 
don't change other fetchers.
+                               fetcher.Client.Timeout = info.Timeout
+                       }
+                       go pollHttp(info.Interval, info.ID, info.URL, fetcher, 
kill)
                }
                p.Config = newConfig
        }
@@ -132,8 +145,10 @@ func (p HttpPoller) Poll() {
 
 type HTTPPollInfo struct {
        Interval time.Duration
+       Timeout  time.Duration
        ID       string
        URL      string
+       Handler  handler.Handler
 }
 
 // diffConfigs takes the old and new configs, and returns a list of deleted 
IDs, and a list of new polls to do
@@ -145,26 +160,41 @@ func diffConfigs(old HttpPollerConfig, new 
HttpPollerConfig) ([]string, []HTTPPo
                for id, _ := range old.Urls {
                        deletions = append(deletions, id)
                }
-               for id, url := range new.Urls {
-                       additions = append(additions, HTTPPollInfo{Interval: 
new.Interval, ID: id, URL: url})
+               for id, pollCfg := range new.Urls {
+                       additions = append(additions, HTTPPollInfo{
+                               Interval: new.Interval,
+                               ID:       id,
+                               URL:      pollCfg.URL,
+                               Timeout:  pollCfg.Timeout,
+                       })
                }
                return deletions, additions
        }
 
-       for id, oldUrl := range old.Urls {
-               newUrl, newIdExists := new.Urls[id]
+       for id, oldPollCfg := range old.Urls {
+               newPollCfg, newIdExists := new.Urls[id]
                if !newIdExists {
                        deletions = append(deletions, id)
-               } else if newUrl != oldUrl {
+               } else if newPollCfg != oldPollCfg {
                        deletions = append(deletions, id)
-                       additions = append(additions, HTTPPollInfo{Interval: 
new.Interval, ID: id, URL: newUrl})
+                       additions = append(additions, HTTPPollInfo{
+                               Interval: new.Interval,
+                               ID:       id,
+                               URL:      newPollCfg.URL,
+                               Timeout:  newPollCfg.Timeout,
+                       })
                }
        }
 
-       for id, newUrl := range new.Urls {
+       for id, newPollCfg := range new.Urls {
                _, oldIdExists := old.Urls[id]
                if !oldIdExists {
-                       additions = append(additions, HTTPPollInfo{Interval: 
new.Interval, ID: id, URL: newUrl})
+                       additions = append(additions, HTTPPollInfo{
+                               Interval: new.Interval,
+                               ID:       id,
+                               URL:      newPollCfg.URL,
+                               Timeout:  newPollCfg.Timeout,
+                       })
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/cache/cache.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/cache.go 
b/traffic_monitor/experimental/traffic_monitor/cache/cache.go
index 0d5a95c..52230d2 100644
--- a/traffic_monitor/experimental/traffic_monitor/cache/cache.go
+++ b/traffic_monitor/experimental/traffic_monitor/cache/cache.go
@@ -27,6 +27,7 @@ type Handler struct {
 }
 
 // NewHandler returns a new cache handler. Note this handler does NOT 
precomputes stat data before calling ResultChannel, and Result.Precomputed will 
be nil
+// TODO change this to take the ResultChan. It doesn't make sense for the 
Handler to 'own' the Result Chan.
 func NewHandler() Handler {
        return Handler{ResultChannel: make(chan Result), MultipleSpaceRegex: 
regexp.MustCompile(" +")}
 }
@@ -52,12 +53,13 @@ type PrecomputedData struct {
 
 // Result is the data result returned by a cache.
 type Result struct {
-       ID        enum.CacheName
-       Available bool
-       Error     error
-       Astats    Astats
-       Time      time.Time
-       Vitals    Vitals
+       ID          enum.CacheName
+       Available   bool
+       Error       error
+       Astats      Astats
+       Time        time.Time
+       RequestTime time.Duration
+       Vitals      Vitals
        PrecomputedData
        PollID       uint64
        PollFinished chan<- uint64
@@ -135,11 +137,12 @@ func StatsMarshall(statHistory 
map[enum.CacheName][]Result, filter Filter, param
 }
 
 // Handle handles results fetched from a cache, parsing the raw Reader data 
and passing it along to a chan for further processing.
-func (handler Handler) Handle(id string, r io.Reader, err error, pollID 
uint64, pollFinished chan<- uint64) {
+func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, 
err error, pollID uint64, pollFinished chan<- uint64) {
        log.Debugf("poll %v %v handle start\n", pollID, time.Now())
        result := Result{
                ID:           enum.CacheName(id),
                Time:         time.Now(), // TODO change this to be computed 
the instant we get the result back, to minimise inaccuracy
+               RequestTime:  reqTime,
                PollID:       pollID,
                PollFinished: pollFinished,
        }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/health/cache_health.go
----------------------------------------------------------------------
diff --git 
a/traffic_monitor/experimental/traffic_monitor/health/cache_health.go 
b/traffic_monitor/experimental/traffic_monitor/health/cache_health.go
index 4147418..1106f62 100644
--- a/traffic_monitor/experimental/traffic_monitor/health/cache_health.go
+++ b/traffic_monitor/experimental/traffic_monitor/health/cache_health.go
@@ -1,13 +1,15 @@
 package health
 
 import (
-       
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
-       
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
-       traffic_ops 
"github.com/apache/incubator-trafficcontrol/traffic_ops/client"
-
        "fmt"
+       "math"
        "strconv"
        "strings"
+       "time"
+
+       
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
+       
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
+       traffic_ops 
"github.com/apache/incubator-trafficcontrol/traffic_ops/client"
 )
 
 func setError(newResult *cache.Result, err error) {
@@ -73,9 +75,34 @@ func GetVitals(newResult *cache.Result, prevResult 
*cache.Result, mc *traffic_op
        // log.Infoln(newResult.Id, "BytesOut", newResult.Vitals.BytesOut, 
"BytesIn", newResult.Vitals.BytesIn, "Kbps", newResult.Vitals.KbpsOut, "max", 
newResult.Vitals.MaxKbpsOut)
 }
 
+// getKbpsThreshold returns the numeric kbps threshold, from the Traffic Ops 
string value. If there is a parse error, it logs a warning and returns the max 
floating point number, signifying no limit
+// TODO add float64 to Traffic Ops Client interface
+func getKbpsThreshold(threshStr string) int64 {
+       if len(threshStr) == 0 {
+               log.Errorf("Empty Traffic Ops 
HealthThresholdAvailableBandwidthInKbps; setting no limit.\n")
+               return math.MaxInt64
+       }
+       if threshStr[0] == '>' {
+               threshStr = threshStr[1:]
+       }
+       thresh, err := strconv.ParseInt(threshStr, 10, 64)
+       if err != nil {
+               log.Errorf("Failed to parse Traffic Ops 
HealthThresholdAvailableBandwidthInKbps, setting no limit: '%v'\n", err)
+               return math.MaxInt64
+       }
+       return thresh
+}
+
+// TODO add time.Duration to Traffic Ops Client interface
+func getQueryThreshold(threshInt int64) time.Duration {
+       return time.Duration(threshInt) * time.Millisecond
+}
+
 // EvalCache returns whether the given cache should be marked available, and a 
string describing why
 func EvalCache(result cache.Result, mc *traffic_ops.TrafficMonitorConfigMap) 
(bool, string) {
-       status := mc.TrafficServer[string(result.ID)].Status
+       toServer := mc.TrafficServer[string(result.ID)]
+       status := toServer.Status
+       params := mc.Profile[toServer.Profile].Parameters
        switch {
        case status == "ADMIN_DOWN":
                return false, "set to ADMIN_DOWN"
@@ -85,10 +112,12 @@ func EvalCache(result cache.Result, mc 
*traffic_ops.TrafficMonitorConfigMap) (bo
                return true, "set to ONLINE"
        case result.Error != nil:
                return false, fmt.Sprintf("error: %v", result.Error)
-       case result.Vitals.LoadAvg > 
mc.Profile[mc.TrafficServer[string(result.ID)].Profile].Parameters.HealthThresholdLoadAvg:
-               return false, fmt.Sprintf("load average %f exceeds threshold 
%f", result.Vitals.LoadAvg, 
mc.Profile[mc.TrafficServer[string(result.ID)].Profile].Parameters.HealthThresholdLoadAvg)
-       case result.Vitals.MaxKbpsOut < result.Vitals.KbpsOut:
-               return false, fmt.Sprintf("%dkbps exceeds max %dkbps", 
result.Vitals.KbpsOut, result.Vitals.MaxKbpsOut)
+       case result.Vitals.LoadAvg > params.HealthThresholdLoadAvg:
+               return false, fmt.Sprintf("load average %f exceeds threshold 
%f", result.Vitals.LoadAvg, params.HealthThresholdLoadAvg)
+       case result.Vitals.KbpsOut >= 
getKbpsThreshold(params.HealthThresholdAvailableBandwidthInKbps):
+               return false, fmt.Sprintf("%dkbps exceeds max %dkbps", 
result.Vitals.KbpsOut, 
getKbpsThreshold(params.HealthThresholdAvailableBandwidthInKbps))
+       case result.RequestTime > 
getQueryThreshold(int64(params.HealthThresholdQueryTime)):
+               return false, fmt.Sprintf("request time %v exceeds max %v", 
result.RequestTime, getQueryThreshold(int64(params.HealthThresholdQueryTime)))
        default:
                return result.Available, "reported"
        }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
----------------------------------------------------------------------
diff --git 
a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go 
b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
index de069b3..42fe82c 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
@@ -203,7 +203,8 @@ func processHealthResult(
                        health.GetVitals(&healthResult, &prevResult, 
&monitorConfigCopy)
                }
 
-               healthHistory[healthResult.ID] = 
pruneHistory(append(healthHistory[healthResult.ID], healthResult), 
cfg.MaxHealthHistory)
+               maxHistory := 
uint64(monitorConfigCopy.Profile[monitorConfigCopy.TrafficServer[string(healthResult.ID)].Profile].Parameters.HistoryCount)
+               healthHistory[healthResult.ID] = 
pruneHistory(append(healthHistory[healthResult.ID], healthResult), maxHistory)
 
                isAvailable, whyAvailable := health.EvalCache(healthResult, 
&monitorConfigCopy)
                if localStates.Get().Caches[healthResult.ID].IsAvailable != 
isAvailable {

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/manager/manager.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/manager.go 
b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
index c362b33..36e47c6 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/manager.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
@@ -42,7 +42,6 @@ func Start(opsConfigFile string, cfg config.Config, 
staticAppData StaticAppData)
 
        // TODO investigate whether a unique client per cache to be polled is 
faster
        sharedClient := &http.Client{
-               Timeout:   cfg.HTTPTimeout,
                Transport: &http.Transport{TLSClientConfig: 
&tls.Config{InsecureSkipVerify: true}},
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
----------------------------------------------------------------------
diff --git 
a/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go 
b/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
index dc8ca41..e3ba337 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
@@ -2,14 +2,16 @@ package manager
 
 import (
        "fmt"
+       "strings"
+       "sync"
+       "time"
+
        
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
        
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/poller"
        
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/config"
        
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
        
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
        to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
-       "strings"
-       "sync"
 )
 
 // CopyTrafficMonitorConfigMap returns a deep copy of the given 
TrafficMonitorConfigMap
@@ -92,6 +94,12 @@ func StartMonitorConfigManager(
        return monitorConfig
 }
 
+// trafficOpsHealthConnectionTimeoutToDuration takes the int from Traffic Ops, 
which is in milliseconds, and returns a time.Duration
+// TODO change Traffic Ops Client API to a time.Duration
+func trafficOpsHealthConnectionTimeoutToDuration(t int) time.Duration {
+       return time.Duration(t) * time.Millisecond
+}
+
 // TODO timing, and determine if the case, or its internal `for`, should be 
put in a goroutine
 // TODO determine if subscribers take action on change, and change to mutexed 
objects if not.
 func monitorConfigListen(
@@ -107,9 +115,9 @@ func monitorConfigListen(
 ) {
        for monitorConfig := range monitorConfigPollChan {
                monitorConfigTS.Set(monitorConfig)
-               healthUrls := map[string]string{}
-               statUrls := map[string]string{}
-               peerUrls := map[string]string{}
+               healthUrls := map[string]poller.PollConfig{}
+               statUrls := map[string]poller.PollConfig{}
+               peerUrls := map[string]poller.PollConfig{}
                caches := map[string]string{}
 
                for _, srv := range monitorConfig.TrafficServer {
@@ -137,10 +145,12 @@ func monitorConfigListen(
                                "application=", "application=plugin.remap",
                        )
                        url = r.Replace(url)
-                       healthUrls[srv.HostName] = url
+
+                       connTimeout := 
trafficOpsHealthConnectionTimeoutToDuration(monitorConfig.Profile[srv.Profile].Parameters.HealthConnectionTimeout)
+                       healthUrls[srv.HostName] = poller.PollConfig{URL: url, 
Timeout: connTimeout}
                        r = strings.NewReplacer("application=plugin.remap", 
"application=")
-                       url = r.Replace(url)
-                       statUrls[srv.HostName] = url
+                       statUrl := r.Replace(url)
+                       statUrls[srv.HostName] = poller.PollConfig{URL: 
statUrl, Timeout: connTimeout}
                }
 
                for _, srv := range monitorConfig.TrafficMonitor {
@@ -152,7 +162,7 @@ func monitorConfigListen(
                        }
                        // TODO: the URL should be config driven. -jse
                        url := fmt.Sprintf("http://%s:%d/publish/CrStates?raw";, 
srv.IP, srv.Port)
-                       peerUrls[srv.HostName] = url
+                       peerUrls[srv.HostName] = poller.PollConfig{URL: url} // 
TODO determine timeout.
                }
 
                statURLSubscriber <- poller.HttpPollerConfig{Urls: statUrls, 
Interval: cfg.CacheStatPollingInterval}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
----------------------------------------------------------------------
diff --git 
a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go 
b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
index 7af7e2c..b3fc407 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
@@ -11,6 +11,7 @@ import (
        
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
        
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
        todata 
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
+       to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
 )
 
 // StatHistory is a map of cache names, to an array of result history from 
each cache.
@@ -37,18 +38,12 @@ func (a StatHistory) Copy() StatHistory {
 type StatHistoryThreadsafe struct {
        statHistory *StatHistory
        m           *sync.RWMutex
-       max         uint64
-}
-
-// Max returns the max history to be stored for any cache
-func (h StatHistoryThreadsafe) Max() uint64 {
-       return h.max
 }
 
 // NewStatHistoryThreadsafe returns a new StatHistory safe for multiple 
readers and a single writer.
-func NewStatHistoryThreadsafe(maxHistory uint64) StatHistoryThreadsafe {
+func NewStatHistoryThreadsafe() StatHistoryThreadsafe {
        h := StatHistory{}
-       return StatHistoryThreadsafe{m: &sync.RWMutex{}, statHistory: &h, max: 
maxHistory}
+       return StatHistoryThreadsafe{m: &sync.RWMutex{}, statHistory: &h}
 }
 
 // Get returns the StatHistory. Callers MUST NOT modify. If mutation is 
necessary, call StatHistory.Copy()
@@ -99,7 +94,7 @@ func StartStatHistoryManager(
        cfg config.Config,
        monitorConfig TrafficMonitorConfigMapThreadsafe,
 ) (StatHistoryThreadsafe, DurationMapThreadsafe, LastStatsThreadsafe, 
DSStatsReader, UnpolledCachesThreadsafe) {
-       statHistory := NewStatHistoryThreadsafe(cfg.MaxStatHistory)
+       statHistory := NewStatHistoryThreadsafe()
        lastStatDurations := NewDurationMapThreadsafe()
        lastStatEndTimes := map[enum.CacheName]time.Time{}
        lastStats := NewLastStatsThreadsafe()
@@ -122,14 +117,14 @@ func StartStatHistoryManager(
                                        
unpolledCaches.SetNewCaches(getNewCaches(localStates, monitorConfig))
                                case <-tick:
                                        log.Warnf("StatHistoryManager flushing 
queued results\n")
-                                       processStatResults(results, 
statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, 
dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches)
+                                       processStatResults(results, 
statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, 
dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches, 
monitorConfig.Get())
                                        break innerLoop
                                default:
                                        select {
                                        case r := <-cacheStatChan:
                                                results = append(results, r)
                                        default:
-                                               processStatResults(results, 
statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, 
dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches)
+                                               processStatResults(results, 
statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, 
dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches, 
monitorConfig.Get())
                                                break innerLoop
                                        }
                                }
@@ -151,10 +146,11 @@ func processStatResults(
        lastStatEndTimes map[enum.CacheName]time.Time,
        lastStatDurationsThreadsafe DurationMapThreadsafe,
        unpolledCaches UnpolledCachesThreadsafe,
+       mc to.TrafficMonitorConfigMap,
 ) {
        statHistory := statHistoryThreadsafe.Get().Copy()
-       maxStats := statHistoryThreadsafe.Max()
        for _, result := range results {
+               maxStats := 
uint64(mc.Profile[mc.TrafficServer[string(result.ID)].Profile].Parameters.HistoryCount)
                // TODO determine if we want to add results with errors, or 
just print the errors now and don't add them.
                statHistory[result.ID] = 
pruneHistory(append(statHistory[result.ID], result), maxStats)
        }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/peer/peer.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/peer/peer.go 
b/traffic_monitor/experimental/traffic_monitor/peer/peer.go
index b6e9873..01d5c52 100644
--- a/traffic_monitor/experimental/traffic_monitor/peer/peer.go
+++ b/traffic_monitor/experimental/traffic_monitor/peer/peer.go
@@ -3,6 +3,7 @@ package peer
 import (
        "encoding/json"
        "io"
+       "time"
 
        
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
 )
@@ -29,7 +30,7 @@ type Result struct {
 }
 
 // Handle handles a response from a polled Traffic Monitor peer, parsing the 
data and forwarding it to the ResultChannel.
-func (handler Handler) Handle(id string, r io.Reader, err error, pollID 
uint64, pollFinished chan<- uint64) {
+func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, 
err error, pollID uint64, pollFinished chan<- uint64) {
        result := Result{
                ID:           enum.TrafficMonitorName(id),
                Available:    false,

Reply via email to