acelyc111 commented on code in PR #1466:
URL: 
https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1193509106


##########
collector/avail/detector.go:
##########
@@ -19,106 +19,122 @@ package avail
 
 import (
        "context"
-       "sync/atomic"
+       "math/rand"
        "time"
 
+       "github.com/apache/incubator-pegasus/go-client/admin"
        "github.com/apache/incubator-pegasus/go-client/pegasus"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promauto"
+
        log "github.com/sirupsen/logrus"
+       "github.com/spf13/viper"
+       "gopkg.in/tomb.v2"
 )
 
 // Detector periodically checks the service availability of the Pegasus 
cluster.
 type Detector interface {
-
-       // Start detection until the ctx cancelled. This method will block the 
current thread.
-       Start(ctx context.Context) error
+       Start(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
-       return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+       detectTimeout time.Duration, partitionCount int) Detector {
+       metaServers := viper.GetStringSlice("meta_servers")
+       tableName := 
viper.GetStringMapString("availablity_detect")["table_name"]
+       // Create detect table.
+       adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+       error := adminClient.CreateTable(context.Background(), tableName, 
partitionCount)
+       if error != nil {
+               log.Errorf("Create detect table %s failed, error: %s", 
tableName, error)
+       }
+       pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: 
metaServers})
+       return &pegasusDetector{
+               client:          pegasusClient,
+               detectTableName: tableName,
+               detectInterval:  detectInterval,
+               detectTimeout:   detectTimeout,
+               partitionCount:  partitionCount,
+       }
 }
 
-type pegasusDetector struct {
-       // client reads and writes periodically to a specified table.
-       client      pegasus.Client
-       detectTable pegasus.TableConnector
+var (
+       DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "detect_times",
+               Help: "The times of availability detecting",
+       })
+
+       ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "read_failure_detect_times",
+               Help: "The failure times of read detecting",
+       })
+
+       WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "write_failure_detect_times",
+               Help: "The failure times of write detecting",
+       })
+)
 
-       detectInterval  time.Duration
+type pegasusDetector struct {
+       client          pegasus.Client
+       detectTable     pegasus.TableConnector
        detectTableName string
-
-       // timeout of a single detect
+       detectInterval  time.Duration
+       // timeout of a single detect.
        detectTimeout time.Duration
-
-       detectHashKeys [][]byte
-
-       recentMinuteDetectTimes  uint64
-       recentMinuteFailureTimes uint64
-
-       recentHourDetectTimes  uint64
-       recentHourFailureTimes uint64
-
-       recentDayDetectTimes  uint64
-       recentDayFailureTimes uint64
+       // partition count.
+       partitionCount int
 }
 
-func (d *pegasusDetector) Start(rootCtx context.Context) error {
+func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
        var err error
-       ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
-       defer cancel()
-       d.detectTable, err = d.client.OpenTable(ctx, d.detectTableName)
+       // Open the detect table.
+       d.detectTable, err = d.client.OpenTable(context.Background(), 
d.detectTableName)
        if err != nil {
+               log.Errorf("Open detect table %s failed, error: %s", 
d.detectTable, err)
                return err
        }
-
        ticker := time.NewTicker(d.detectInterval)
        for {
                select {
-               case <-rootCtx.Done(): // check if context cancelled
+               case <-tom.Dying():
                        return nil
                case <-ticker.C:
-                       return nil
+                       d.detectPartition()
+                       break
                default:
                }
-
-               // periodically set/get a configured Pegasus table.
-               d.detect(ctx)
        }
 }
 
-func (d *pegasusDetector) detect(rootCtx context.Context) {
-       // TODO(yingchun): doesn't work, just to mute lint errors.
-       d.detectPartition(rootCtx, 1)
-}
-
-func (d *pegasusDetector) detectPartition(rootCtx context.Context, 
partitionIdx int) {
-       d.incrDetectTimes()
+func (d *pegasusDetector) detectPartition() {
+       DetectTimes.Inc()
 
        go func() {
-               ctx, cancel := context.WithTimeout(rootCtx, d.detectTimeout)
+               ctx, cancel := context.WithTimeout(context.Background(), 
d.detectTimeout)

Review Comment:
   How about adding some latency metrics for read and write operations as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to