acelyc111 commented on code in PR #1466:
URL:
https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1185891976
##########
collector/avail/detector.go:
##########
@@ -50,75 +72,45 @@ type pegasusDetector struct {
detectTimeout time.Duration
detectHashKeys [][]byte
-
- recentMinuteDetectTimes uint64
- recentMinuteFailureTimes uint64
-
- recentHourDetectTimes uint64
- recentHourFailureTimes uint64
-
- recentDayDetectTimes uint64
- recentDayFailureTimes uint64
}
-func (d *pegasusDetector) Start(rootCtx context.Context) error {
+func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
var err error
- ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
- defer cancel()
- d.detectTable, err = d.client.OpenTable(ctx, d.detectTableName)
+ d.detectTable, err = d.client.OpenTable(context.Background(),
d.detectTableName)
if err != nil {
+ log.Error(err)
return err
}
-
ticker := time.NewTicker(d.detectInterval)
for {
select {
- case <-rootCtx.Done(): // check if context cancelled
- return nil
- case <-ticker.C:
- return nil
- default:
+ case <-tom.Dying():
+ return nil
+ case <-ticker.C:
+ d.detectPartition(1)
Review Comment:
Detecting only partition 1 is not enough; it's necessary to detect all
partitions. Generating hashkeys distributed across all partitions can resolve
this; for example, generating thousands of random hashkeys on a 16-partition
table is roughly OK IMO.
##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+ "errors"
+ "time"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
+ "github.com/tidwall/gjson"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+ META_SERVER int = 0
+ REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+ name string
+ value int64
+ mtype string
+ desc string
+ unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+ name string
+ value string
+}
+
+type Entity struct {
+ etype string
+ eid string
+ attributes string
+ metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+ Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+ dataSource_ = dataSource
+ gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+ counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+ histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+ summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+ initMetrics()
+ return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+ detectInterval time.Duration
+ detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+ ticker := time.NewTicker(collector.detectInterval)
+ for {
+ select {
+ case <-tom.Dying():
+ return nil
+ case <-ticker.C:
+ processAllMetaServerMetrics()
+ break
+ default:
+ }
+ }
+}
+
+func initMetrics() {
+ var addrs []string
+ if dataSource_ == META_SERVER {
+ addrs = viper.GetStringSlice("meta_servers")
+ } else {
+ addrs = viper.GetStringSlice("replica_servers")
Review Comment:
Generally, the replica servers list is not exposed the way the meta servers
list is; we can fetch the replica servers list via the meta servers instead,
through HTTP, e.g. `curl localhost:34601/meta/nodes`.
Furthermore, we have to update the server list periodically to tolerate
scale-in and scale-out.
##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+ "errors"
+ "time"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
+ "github.com/tidwall/gjson"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+ META_SERVER int = 0
+ REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+ name string
+ value int64
+ mtype string
+ desc string
+ unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+ name string
+ value string
+}
+
+type Entity struct {
+ etype string
+ eid string
+ attributes string
+ metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+ Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+ dataSource_ = dataSource
+ gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+ counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+ histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+ summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+ initMetrics()
+ return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+ detectInterval time.Duration
+ detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+ ticker := time.NewTicker(collector.detectInterval)
+ for {
+ select {
+ case <-tom.Dying():
+ return nil
+ case <-ticker.C:
+ processAllMetaServerMetrics()
+ break
+ default:
+ }
+ }
+}
+
+func initMetrics() {
+ var addrs []string
+ if dataSource_ == META_SERVER {
+ addrs = viper.GetStringSlice("meta_servers")
+ } else {
+ addrs = viper.GetStringSlice("replica_servers")
+ }
+
+ for _, addr := range addrs {
+ data, err := getOneMetaServerMetrics(addr)
+ if err != nil {
+ log.Errorf(fmt.Sprintf("Get raw metrics from %s failed,
err: %s", addr, err))
+ return
+ }
+ json_data := gjson.Parse(data)
+ for _, entity := range json_data.Array() {
+ for _, metric := range entity.Get("metrics").Array() {
+ var name string = metric.Get("name").String()
+ var mtype string = metric.Get("type").String()
+ var desc string =
metric.Get("description").String()
+ log.Errorf("name:%s type:%s desc:%s", name,
mtype, desc)
+ switch mtype {
+ case "counter":
+ if _, ok := counterMetricsMap_[name];
ok {
+ continue
+ }
+ counterMetric :=
promauto.NewCounterVec(prometheus.CounterOpts{
+ Name: name,
+ Help: desc,
+ }, []string{"level", "endpoint"})
+ counterMetricsMap_[name] =
*counterMetric
+ break
+ case "gauge":
+ if _, ok := gaugeMetricsMap_[name]; ok {
+ continue
+ }
+ gaugeMetric :=
promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: name,
+ Help: desc,
+ }, []string{"level", "endpoint"})
+ gaugeMetricsMap_[name] = *gaugeMetric
+ break
+ case "histogram":
Review Comment:
The new metrics framework doesn't have histogram-type metrics, so this case
can be removed.
##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+ "errors"
+ "time"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
+ "github.com/tidwall/gjson"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+ META_SERVER int = 0
+ REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+ name string
+ value int64
+ mtype string
+ desc string
+ unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+ name string
+ value string
+}
+
+type Entity struct {
+ etype string
+ eid string
+ attributes string
+ metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+ Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+ dataSource_ = dataSource
+ gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+ counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+ histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+ summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+ initMetrics()
+ return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+ detectInterval time.Duration
+ detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+ ticker := time.NewTicker(collector.detectInterval)
+ for {
+ select {
+ case <-tom.Dying():
+ return nil
+ case <-ticker.C:
+ processAllMetaServerMetrics()
+ break
+ default:
+ }
+ }
+}
+
+func initMetrics() {
+ var addrs []string
+ if dataSource_ == META_SERVER {
+ addrs = viper.GetStringSlice("meta_servers")
+ } else {
+ addrs = viper.GetStringSlice("replica_servers")
+ }
+
+ for _, addr := range addrs {
+ data, err := getOneMetaServerMetrics(addr)
+ if err != nil {
+ log.Errorf(fmt.Sprintf("Get raw metrics from %s failed,
err: %s", addr, err))
+ return
+ }
+ json_data := gjson.Parse(data)
+ for _, entity := range json_data.Array() {
+ for _, metric := range entity.Get("metrics").Array() {
+ var name string = metric.Get("name").String()
+ var mtype string = metric.Get("type").String()
+ var desc string =
metric.Get("description").String()
+ log.Errorf("name:%s type:%s desc:%s", name,
mtype, desc)
+ switch mtype {
+ case "counter":
+ if _, ok := counterMetricsMap_[name];
ok {
+ continue
+ }
+ counterMetric :=
promauto.NewCounterVec(prometheus.CounterOpts{
+ Name: name,
+ Help: desc,
+ }, []string{"level", "endpoint"})
+ counterMetricsMap_[name] =
*counterMetric
+ break
+ case "gauge":
+ if _, ok := gaugeMetricsMap_[name]; ok {
+ continue
+ }
+ gaugeMetric :=
promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: name,
+ Help: desc,
+ }, []string{"level", "endpoint"})
+ gaugeMetricsMap_[name] = *gaugeMetric
+ break
+ case "histogram":
+ if _, ok := histogramMetricsMap_[name];
ok {
+ continue
+ }
+ histogramMetric :=
promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: name,
+ Help: desc,
+ Buckets:
prometheus.LinearBuckets(-3, .1, 61),
+ })
+ histogramMetricsMap_[name] =
histogramMetric
+ break
+ case "percentile":
+ if _, ok := summaryMetricsMap_[name];
ok {
+ continue
+ }
+ log.Errorf("name:%s", name)
+ summaryMetric :=
promauto.NewSummary(prometheus.SummaryOpts{
+ Name: name,
+ Help: desc,
+ Objectives: map[float64]float64{
+ 0.5: 0.05, 0.9: 0.01,
0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+ })
+ summaryMetricsMap_[name] = summaryMetric
+ break
+ default:
+ log.Errorf("Unsupport metric type %s",
mtype)
+ break
+ }
+ }
+ }
+ }
+}
+
+func processAllMetaServerMetrics() {
+ var addrs []string
+ if dataSource_ == META_SERVER {
+ addrs = viper.GetStringSlice("meta_servers")
+ } else {
+ addrs = viper.GetStringSlice("replica_servers")
+ }
+ metric_by_table_id := make(map[string]Metrics, 128)
+ metric_of_cluster := make([]Metric, 128)
+ for _, addr := range addrs {
+ data, err := getOneMetaServerMetrics(addr)
+ if err != nil {
+ log.Errorf("failed to get data from %s, err %s", addr,
err)
+ return
+ }
+ json_data := gjson.Parse(data)
+ for _, entity := range json_data.Array() {
+ etype := entity.Get("type").String()
+ switch etype {
+ case "replica":
+ case "partition":
+ table_id :=
entity.Get("attributes").Get("table_id").String()
+
mergeIntoTableLevel(entity.Get("metrics").Array(), table_id,
&metric_by_table_id)
+ break
+ case "server":
+
mergeIntoClusterLevel(entity.Get("metrics").Array(), metric_of_cluster)
Review Comment:
The raw server-level metrics also need to be sunk to Prometheus; that is
to say, some metrics carry the attributes
```
level: server
endpoint: <host>
```
##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+ "errors"
+ "time"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
+ "github.com/tidwall/gjson"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+ META_SERVER int = 0
+ REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+ name string
+ value int64
+ mtype string
+ desc string
+ unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+ name string
+ value string
+}
+
+type Entity struct {
+ etype string
+ eid string
+ attributes string
+ metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+ Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+ dataSource_ = dataSource
+ gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+ counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+ histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+ summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+ initMetrics()
+ return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+ detectInterval time.Duration
+ detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+ ticker := time.NewTicker(collector.detectInterval)
+ for {
+ select {
+ case <-tom.Dying():
+ return nil
+ case <-ticker.C:
+ processAllMetaServerMetrics()
+ break
+ default:
+ }
+ }
+}
+
+func initMetrics() {
+ var addrs []string
+ if dataSource_ == META_SERVER {
+ addrs = viper.GetStringSlice("meta_servers")
+ } else {
+ addrs = viper.GetStringSlice("replica_servers")
+ }
+
+ for _, addr := range addrs {
+ data, err := getOneMetaServerMetrics(addr)
+ if err != nil {
+ log.Errorf(fmt.Sprintf("Get raw metrics from %s failed,
err: %s", addr, err))
+ return
+ }
+ json_data := gjson.Parse(data)
+ for _, entity := range json_data.Array() {
+ for _, metric := range entity.Get("metrics").Array() {
+ var name string = metric.Get("name").String()
+ var mtype string = metric.Get("type").String()
+ var desc string =
metric.Get("description").String()
+ log.Errorf("name:%s type:%s desc:%s", name,
mtype, desc)
+ switch mtype {
+ case "counter":
+ if _, ok := counterMetricsMap_[name];
ok {
+ continue
+ }
+ counterMetric :=
promauto.NewCounterVec(prometheus.CounterOpts{
+ Name: name,
+ Help: desc,
+ }, []string{"level", "endpoint"})
+ counterMetricsMap_[name] =
*counterMetric
+ break
+ case "gauge":
+ if _, ok := gaugeMetricsMap_[name]; ok {
+ continue
+ }
+ gaugeMetric :=
promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: name,
+ Help: desc,
+ }, []string{"level", "endpoint"})
+ gaugeMetricsMap_[name] = *gaugeMetric
+ break
+ case "histogram":
+ if _, ok := histogramMetricsMap_[name];
ok {
+ continue
+ }
+ histogramMetric :=
promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: name,
+ Help: desc,
+ Buckets:
prometheus.LinearBuckets(-3, .1, 61),
+ })
+ histogramMetricsMap_[name] =
histogramMetric
+ break
+ case "percentile":
+ if _, ok := summaryMetricsMap_[name];
ok {
+ continue
+ }
+ log.Errorf("name:%s", name)
+ summaryMetric :=
promauto.NewSummary(prometheus.SummaryOpts{
+ Name: name,
+ Help: desc,
+ Objectives: map[float64]float64{
+ 0.5: 0.05, 0.9: 0.01,
0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+ })
+ summaryMetricsMap_[name] = summaryMetric
+ break
+ default:
+ log.Errorf("Unsupport metric type %s",
mtype)
+ break
+ }
+ }
+ }
+ }
+}
+
+func processAllMetaServerMetrics() {
+ var addrs []string
+ if dataSource_ == META_SERVER {
+ addrs = viper.GetStringSlice("meta_servers")
+ } else {
+ addrs = viper.GetStringSlice("replica_servers")
+ }
+ metric_by_table_id := make(map[string]Metrics, 128)
+ metric_of_cluster := make([]Metric, 128)
+ for _, addr := range addrs {
+ data, err := getOneMetaServerMetrics(addr)
+ if err != nil {
+ log.Errorf("failed to get data from %s, err %s", addr,
err)
+ return
+ }
+ json_data := gjson.Parse(data)
+ for _, entity := range json_data.Array() {
+ etype := entity.Get("type").String()
+ switch etype {
+ case "replica":
+ case "partition":
+ table_id :=
entity.Get("attributes").Get("table_id").String()
+
mergeIntoTableLevel(entity.Get("metrics").Array(), table_id,
&metric_by_table_id)
+ break
+ case "server":
+
mergeIntoClusterLevel(entity.Get("metrics").Array(), metric_of_cluster)
+ break
+ default:
+ log.Errorf("Unsupport entity type %s", etype)
+ }
+ }
+ }
+ // Update table level metrics.
+ for table_id, metrics := range metric_by_table_id {
+ for _, metric := range metrics {
+ switch metric.mtype {
+ case "counter":
+ if counter, ok :=
counterMetricsMap_[metric.name]; ok {
+ counter.With(
+
prometheus.Labels{"level":"table", "endpoint":table_id}).Add(
+
float64(metric.value))
+ } else {
+ log.Warnf("Unknown metric name
%s", metric.name)
+ }
+ break
+ case "gauge":
+ if gauge, ok :=
gaugeMetricsMap_[metric.name]; ok {
+ gauge.With(
+
prometheus.Labels{"level":"table", "endpoint":table_id}).Set(
+
float64(metric.value))
+ } else {
+ log.Warnf("Unknown metric name
%s", metric.name)
+ }
+ break
+ case "histogram":
+ case "percentile":
+ log.Warnf("Unsupport metric type %s",
metric.mtype)
+ break
+ default:
+ log.Warnf("Unknown metric type %s",
metric.mtype)
+ }
+ }
+ }
+ // Update server level metrics
+ for _, metric := range metric_of_cluster {
+ switch metric.mtype {
+ case "counter":
+ if counter, ok :=
counterMetricsMap_[metric.name]; ok {
+ counter.With(
+
prometheus.Labels{"level":"server", "endpoint":"cluster"}).Add(
+ float64(metric.value))
+ } else {
+ log.Warnf("Unknown metric name %s",
metric.name)
+ }
+ break
+ case "gauge":
+ if gauge, ok := gaugeMetricsMap_[metric.name];
ok {
+ gauge.With(
+
prometheus.Labels{"level":"server", "endpoint":"cluster"}).Set(
+ float64(metric.value))
+ } else {
+ log.Warnf("Unknown metric name %s",
metric.name)
+ }
+ break
+ case "histogram":
+ case "percentile":
+ log.Warnf("Unsupport metric type %s",
metric.mtype)
+ break
+ default:
+ log.Warnf("Unknown metric type %s",
metric.mtype)
+ }
+ }
+}
+
+func mergeIntoClusterLevel(metrics []gjson.Result, metric_of_cluster []Metric)
{
+ for _, metric := range metrics {
+ name := metric.Get("name").String()
+ mtype := metric.Get("type").String()
+ value := metric.Get("value").Int()
+ var isExisted bool = false
+ for _, m := range metric_of_cluster {
+ if m.name == name {
+ isExisted = true
+ switch mtype {
+ case "counter":
+ case "gauge":
+ m.value += value
+ break
+ case "histogram":
+ case "percentile":
+ break
+ default:
+ log.Errorf("Unsupport metric type %s",
mtype)
+ }
+ }
+ }
+ if !isExisted {
+ unit := metric.Get("name").String()
+ desc := metric.Get("desc").String()
+ value := metric.Get("value").Int()
+ m := Metric{name:name, mtype:mtype, unit:unit,
desc:desc, value:value}
+ metric_of_cluster = append(metric_of_cluster, m)
+ }
+ }
+}
+
+func mergeIntoTableLevel(metrics []gjson.Result, table_id string,
+ metric_by_table_id
*map[string]Metrics) {
+ // Find a same table id, try to merge them.
+ if _, ok := (*metric_by_table_id)[table_id]; ok {
+ mts := (*metric_by_table_id)[table_id]
+ for _, metric := range metrics {
+ name := metric.Get("name").String()
+ mtype := metric.Get("type").String()
+ value := metric.Get("value").Int()
+ for _, m := range mts {
+ if name == m.name {
+ switch mtype {
+ case "counter":
+ case "gauge":
+ m.value += value
+ break
+ case "histogram":
+ case "percentile":
+ break
+ }
+ }
+ }
+ }
+ } else {
+ var mts Metrics
+ for _, metric := range metrics {
+ name := metric.Get("name").String()
+ mtype := metric.Get("type").String()
+ unit := metric.Get("name").String()
+ desc := metric.Get("desc").String()
+ value := metric.Get("value").Int()
+ m := Metric{name:name, mtype:mtype, unit:unit,
desc:desc, value:value}
+ mts = append(mts, m)
+ }
+ (*metric_by_table_id)[table_id] = mts
+ }
+}
+
+func getOneMetaServerMetrics(addr string) (string, error) {
+ url := fmt.Sprintf("http://%s/metrics?detail=true", addr)
Review Comment:
> detail=true
Fetching the details every time is a waste of bandwidth; we can fetch the
details the first time to initialize the metrics, then cache the details and
fetch metrics without `detail=true`.
##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+ "errors"
+ "time"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
+ "github.com/tidwall/gjson"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+ META_SERVER int = 0
+ REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+ name string
+ value int64
+ mtype string
+ desc string
+ unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+ name string
+ value string
+}
+
+type Entity struct {
+ etype string
+ eid string
+ attributes string
+ metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+ Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+ dataSource_ = dataSource
+ gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+ counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+ histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+ summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+ initMetrics()
+ return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+ detectInterval time.Duration
+ detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+ ticker := time.NewTicker(collector.detectInterval)
+ for {
+ select {
+ case <-tom.Dying():
+ return nil
+ case <-ticker.C:
+ processAllMetaServerMetrics()
+ break
+ default:
+ }
+ }
+}
+
+func initMetrics() {
+ var addrs []string
+ if dataSource_ == META_SERVER {
+ addrs = viper.GetStringSlice("meta_servers")
+ } else {
+ addrs = viper.GetStringSlice("replica_servers")
+ }
+
+ for _, addr := range addrs {
+ data, err := getOneMetaServerMetrics(addr)
+ if err != nil {
+ log.Errorf(fmt.Sprintf("Get raw metrics from %s failed,
err: %s", addr, err))
+ return
+ }
+ json_data := gjson.Parse(data)
+ for _, entity := range json_data.Array() {
+ for _, metric := range entity.Get("metrics").Array() {
+ var name string = metric.Get("name").String()
+ var mtype string = metric.Get("type").String()
+ var desc string =
metric.Get("description").String()
+ log.Errorf("name:%s type:%s desc:%s", name,
mtype, desc)
+ switch mtype {
+ case "counter":
+ if _, ok := counterMetricsMap_[name];
ok {
+ continue
+ }
+ counterMetric :=
promauto.NewCounterVec(prometheus.CounterOpts{
+ Name: name,
+ Help: desc,
+ }, []string{"level", "endpoint"})
+ counterMetricsMap_[name] =
*counterMetric
+ break
+ case "gauge":
+ if _, ok := gaugeMetricsMap_[name]; ok {
+ continue
+ }
+ gaugeMetric :=
promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: name,
+ Help: desc,
+ }, []string{"level", "endpoint"})
+ gaugeMetricsMap_[name] = *gaugeMetric
+ break
+ case "histogram":
+ if _, ok := histogramMetricsMap_[name];
ok {
+ continue
+ }
+ histogramMetric :=
promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: name,
+ Help: desc,
+ Buckets:
prometheus.LinearBuckets(-3, .1, 61),
+ })
+ histogramMetricsMap_[name] =
histogramMetric
+ break
+ case "percentile":
+ if _, ok := summaryMetricsMap_[name];
ok {
+ continue
+ }
+ log.Errorf("name:%s", name)
+ summaryMetric :=
promauto.NewSummary(prometheus.SummaryOpts{
+ Name: name,
+ Help: desc,
+ Objectives: map[float64]float64{
+ 0.5: 0.05, 0.9: 0.01,
0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+ })
+ summaryMetricsMap_[name] = summaryMetric
+ break
+ default:
+ log.Errorf("Unsupport metric type %s",
mtype)
+ break
+ }
+ }
+ }
+ }
+}
+
+func processAllMetaServerMetrics() {
+ var addrs []string
+ if dataSource_ == META_SERVER {
+ addrs = viper.GetStringSlice("meta_servers")
+ } else {
+ addrs = viper.GetStringSlice("replica_servers")
+ }
+ metric_by_table_id := make(map[string]Metrics, 128)
+ metric_of_cluster := make([]Metric, 128)
+ for _, addr := range addrs {
+ data, err := getOneMetaServerMetrics(addr)
+ if err != nil {
+ log.Errorf("failed to get data from %s, err %s", addr,
err)
+ return
+ }
+ json_data := gjson.Parse(data)
+ for _, entity := range json_data.Array() {
+ etype := entity.Get("type").String()
+ switch etype {
+ case "replica":
+ case "partition":
+ table_id :=
entity.Get("attributes").Get("table_id").String()
+
mergeIntoTableLevel(entity.Get("metrics").Array(), table_id,
&metric_by_table_id)
+ break
+ case "server":
+
mergeIntoClusterLevel(entity.Get("metrics").Array(), metric_of_cluster)
+ break
+ default:
+ log.Errorf("Unsupport entity type %s", etype)
+ }
+ }
+ }
+ // Update table level metrics.
+ for table_id, metrics := range metric_by_table_id {
+ for _, metric := range metrics {
+ switch metric.mtype {
+ case "counter":
+ if counter, ok :=
counterMetricsMap_[metric.name]; ok {
+ counter.With(
+
prometheus.Labels{"level":"table", "endpoint":table_id}).Add(
+
float64(metric.value))
+ } else {
+ log.Warnf("Unknown metric name
%s", metric.name)
+ }
+ break
+ case "gauge":
+ if gauge, ok :=
gaugeMetricsMap_[metric.name]; ok {
+ gauge.With(
+
prometheus.Labels{"level":"table", "endpoint":table_id}).Set(
+
float64(metric.value))
+ } else {
+ log.Warnf("Unknown metric name
%s", metric.name)
+ }
+ break
+ case "histogram":
+ case "percentile":
+ log.Warnf("Unsupport metric type %s",
metric.mtype)
+ break
+ default:
+ log.Warnf("Unknown metric type %s",
metric.mtype)
+ }
+ }
+ }
+ // Update server level metrics
Review Comment:
> server level
cluster level?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]