acelyc111 commented on code in PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1195932406
##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,581 @@
+// Get replica server address.
+func getReplicaAddrs() ([]string, error) {
+    addrs := viper.GetStringSlice("meta_servers")
+    var rserverAddrs []string
+    url := fmt.Sprintf("http://%s/meta/nodes", addrs[0])

Review Comment:
   If there are multiple meta servers, we need to try to fetch the info from the other meta servers when the first one fails.
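   A minimal sketch of that fallback, reusing the file's existing imports and the same `meta_servers` config key and `/meta/nodes` JSON layout as the patch; the exact structure is only illustrative:

```go
// Sketch: try every configured meta server instead of only addrs[0].
func getReplicaAddrs() ([]string, error) {
    addrs := viper.GetStringSlice("meta_servers")
    var lastErr error
    for _, metaAddr := range addrs {
        url := fmt.Sprintf("http://%s/meta/nodes", metaAddr)
        resp, err := http.Get(url)
        if err != nil {
            lastErr = err
            log.Warnf("Fail to get replica server address from %s, err %s", metaAddr, err)
            continue
        }
        if resp.StatusCode != http.StatusOK {
            lastErr = errors.New(resp.Status)
            resp.Body.Close()
            log.Warnf("Fail to get replica server address from %s, err %s", metaAddr, lastErr)
            continue
        }
        body, err := ioutil.ReadAll(resp.Body)
        resp.Body.Close()
        if err != nil {
            lastErr = err
            continue
        }
        // Collect the replica server addresses from the "details" map.
        var rserverAddrs []string
        for key := range gjson.Parse(string(body)).Get("details").Map() {
            rserverAddrs = append(rserverAddrs, key)
        }
        return rserverAddrs, nil
    }
    return nil, fmt.Errorf("all meta servers failed, last error: %v", lastErr)
}
```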
##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,581 @@
+// Update table metrics. They belong to a specified server.
+func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
+    role := RoleByDataSource[DataSource]
+    for tableID, metrics := range metricsByServerTableID {
+        var tableName string
+        if name, ok := TableNameByID[tableID]; !ok {
+            tableName = tableID

Review Comment:
   Is there any case where this lookup returns not ok?
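   For reference, a sketch of how the fallback could be made explicit. The helper name is illustrative, and the miss case (for example a table that shows up in the metrics before `updateClusterTableInfo` has refreshed `TableNameByID`) is an assumption, not something confirmed by the patch:

```go
// Sketch: fall back to the table ID when the name is not cached yet,
// and make the miss visible in the log.
func tableNameOrID(tableID string) string {
    if name, ok := TableNameByID[tableID]; ok {
        return name
    }
    log.Warnf("No table name found for table id %s, fall back to the id", tableID)
    return tableID
}
```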
##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,581 @@
+// Parse metric data and update metrics.
+func processAllMetaServerMetrics() {
+    var addrs []string
+    var err error
+    if DataSource == MetaServer {
+        addrs = viper.GetStringSlice("meta_servers")
+    } else {
+        addrs, err = getReplicaAddrs()

Review Comment:
   This function processes both meta servers and replica servers, but it is named `processAllMetaServerMetrics`?
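   One possible shape, assuming a rename is acceptable: factor the address resolution (which is also duplicated in `initMetrics`) into a helper so the processing function can take a role-neutral name such as `processAllServerMetrics`. Both names below are illustrative, not part of the PR:

```go
// Sketch: resolve the address list for the configured data source in one place.
func getServerAddrs() ([]string, error) {
    if DataSource == MetaServer {
        return viper.GetStringSlice("meta_servers"), nil
    }
    return getReplicaAddrs()
}
```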
##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,581 @@
+// Update server metrics. They belong to a specified server.
+func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
+    role := RoleByDataSource[DataSource]
+    for addr, metrics := range metricsByAddr {
+        for _, metric := range metrics {
+            switch metric.mtype {
+            case "counter":
+                if counter, ok := CounterMetricsMap[metric.name]; ok {
+                    counter.With(
+                        prometheus.Labels{"endpoint": addr,
+                            "role": role, "level": "server",
+                            "title": "server"}).Add(float64(metric.value))
+                } else {
+                    log.Warnf("Unknown metric name %s", metric.name)
+                }
+            case "gauge":
+                if gauge, ok := GaugeMetricsMap[metric.name]; ok {
+                    gauge.With(
+                        prometheus.Labels{"endpoint": addr,
+                            "role": role, "level": "server",
+                            "title": "server"}).Set(float64(metric.value))
+                } else {
+                    log.Warnf("Unknown metric name %s", metric.name)
+                }
+            case "percentile":
+                log.Warnf("Todo metric type %s", metric.mtype)
+            case "histogram":
+            default:
+                log.Warnf("Unknown metric type %s", metric.mtype)
+            }
+        }

Review Comment:
   Is this code duplicated with `updateServerLevelTableMetrics`? Could it be extracted into a shared function?
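   A sketch of the kind of helper the duplicated switch could be extracted into; the name and signature are illustrative, not part of the PR:

```go
// Sketch: update a single counter/gauge metric with the given labels,
// shared by the table-level and server-level update paths.
func updateMetric(metric Metric, endpoint string, level string, title string) {
    labels := prometheus.Labels{
        "endpoint": endpoint,
        "role":     RoleByDataSource[DataSource],
        "level":    level,
        "title":    title,
    }
    switch metric.mtype {
    case "counter":
        if counter, ok := CounterMetricsMap[metric.name]; ok {
            counter.With(labels).Add(metric.value)
        } else {
            log.Warnf("Unknown metric name %s", metric.name)
        }
    case "gauge":
        if gauge, ok := GaugeMetricsMap[metric.name]; ok {
            gauge.With(labels).Set(metric.value)
        } else {
            log.Warnf("Unknown metric name %s", metric.name)
        }
    case "percentile":
        log.Warnf("Todo metric type %s", metric.mtype)
    case "histogram":
    default:
        log.Warnf("Unknown metric type %s", metric.mtype)
    }
}
```

   The inner loops in both functions would then reduce to `updateMetric(metric, addr, "server", tableName)` and `updateMetric(metric, addr, "server", "server")`.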
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
