little-cui closed pull request #325: SCB-472 Null point reference in zipkin plugin
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/325
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/etc/conf/app.conf b/etc/conf/app.conf index 02786d95..d468dbe2 100644 --- a/etc/conf/app.conf +++ b/etc/conf/app.conf @@ -108,6 +108,12 @@ log_format = text # whether enable record syslog log_sys = false +################################################################### +# Frontend Configurations +################################################################### +frontend_host_ip=127.0.0.1 +frontend_host_port=30103 + ################################################################### # above is the global configurations # you can overide above configuration in specific env @@ -120,9 +126,3 @@ logfile = ./service-center.log [dev] loglevel = DEBUG logfile = "" - -################################################################### -# Frontend Configurations -################################################################### -frontend_host_ip=127.0.0.1 -frontend_host_port=30103 diff --git a/main.go b/main.go index d76e34cb..3d7ac976 100644 --- a/main.go +++ b/main.go @@ -18,8 +18,18 @@ package main // plugins import _ "github.com/apache/incubator-servicecomb-service-center/server/bootstrap" -import "github.com/apache/incubator-servicecomb-service-center/server" +import ( + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + "github.com/apache/incubator-servicecomb-service-center/server" + "github.com/apache/incubator-servicecomb-service-center/server/core/backend" +) func main() { server.Run() + + util.GoCloseAndWait() + + backend.Registry().Close() + + util.Logger().Warn("service center exited", nil) } diff --git a/pkg/async/async_task.go b/pkg/async/async_task.go index 14727a17..73ceb384 100644 --- a/pkg/async/async_task.go +++ b/pkg/async/async_task.go @@ -39,6 +39,7 @@ 
type scheduler struct { queue *util.UniQueue latestTask AsyncTask once sync.Once + goroutine *util.GoRoutine } func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) { @@ -47,7 +48,7 @@ func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) { } s.once.Do(func() { - go s.do() + s.goroutine.Do(s.do) }) err = s.queue.Put(ctx, task) @@ -57,15 +58,17 @@ func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) { return s.latestTask.Err() } -func (s *scheduler) do() { +func (s *scheduler) do(ctx context.Context) { for { select { + case <-ctx.Done(): + return case task, ok := <-s.queue.Chan(): if !ok { return } at := task.(AsyncTask) - at.Do(context.Background()) + at.Do(ctx) s.latestTask = at } } @@ -73,6 +76,15 @@ func (s *scheduler) do() { func (s *scheduler) Close() { s.queue.Close() + s.goroutine.Close(true) +} + +func newScheduler(task AsyncTask) *scheduler { + return &scheduler{ + queue: util.NewUniQueue(), + latestTask: task, + goroutine: util.NewGo(context.Background()), + } } type AsyncTaskService struct { @@ -99,10 +111,7 @@ func (lat *AsyncTaskService) getOrNewScheduler(task AsyncTask) (s *scheduler, is s, ok = lat.schedules[key] if !ok { isNew = true - s = &scheduler{ - queue: util.NewUniQueue(), - latestTask: task, - } + s = newScheduler(task) lat.schedules[key] = s } lat.lock.Unlock() @@ -166,11 +175,11 @@ func (lat *AsyncTaskService) LatestHandled(key string) (AsyncTask, error) { return s.latestTask, nil } -func (lat *AsyncTaskService) daemon(stopCh <-chan struct{}) { +func (lat *AsyncTaskService) daemon(ctx context.Context) { util.SafeCloseChan(lat.ready) for { select { - case <-stopCh: + case <-ctx.Done(): util.Logger().Debugf("daemon thread exited for AsyncTaskService is stopped") return case <-time.After(DEFAULT_REMOVE_TASKS_INTERVAL): @@ -228,7 +237,7 @@ func NewAsyncTaskService() *AsyncTaskService { return &AsyncTaskService{ schedules: make(map[string]*scheduler, 
DEFAULT_MAX_SCHEDULE_COUNT), removeTasks: make(map[string]struct{}, DEFAULT_MAX_SCHEDULE_COUNT), - goroutine: util.NewGo(make(chan struct{})), + goroutine: util.NewGo(context.Background()), ready: make(chan struct{}), isClose: true, } diff --git a/pkg/chain/callback.go b/pkg/chain/callback.go index 9b2e7fa2..36577402 100644 --- a/pkg/chain/callback.go +++ b/pkg/chain/callback.go @@ -47,11 +47,7 @@ func (cb *Callback) Invoke(r Result) { } func syncInvoke(f func(r Result), r Result) { - defer func() { - if itf := recover(); itf != nil { - util.LogPanic(itf) - } - }() + defer util.RecoverAndReport() if f == nil { util.Logger().Errorf(nil, "Callback function is nil. result: %s,", r) return diff --git a/pkg/etcdsync/mutex.go b/pkg/etcdsync/mutex.go index f54ef6f5..d1a35b94 100644 --- a/pkg/etcdsync/mutex.go +++ b/pkg/etcdsync/mutex.go @@ -134,7 +134,8 @@ func (m *DLock) Lock(wait bool) error { util.Logger().Warnf(err, "Key %s is locked, waiting for other node releases it, id=%s", m.builder.key, m.id) ctx, cancel := context.WithTimeout(m.builder.ctx, DEFAULT_LOCK_TTL*time.Second) - go func() { + util.Go(func(context.Context) { + defer cancel() err := backend.Registry().Watch(ctx, registry.WithStrKey(m.builder.key), registry.WithWatchCallback( @@ -146,10 +147,9 @@ func (m *DLock) Lock(wait bool) error { return nil })) if err != nil { - util.Logger().Errorf(nil, "%s, key=%s, id=%s", err.Error(), m.builder.key, m.id) + util.Logger().Warnf(nil, "%s, key=%s, id=%s", err.Error(), m.builder.key, m.id) } - cancel() - }() + }) select { case <-ctx.Done(): continue // 可以重新尝试获取锁 diff --git a/pkg/grace/grace.go b/pkg/grace/grace.go index fef6cafd..3ec7cd35 100644 --- a/pkg/grace/grace.go +++ b/pkg/grace/grace.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "github.com/apache/incubator-servicecomb-service-center/pkg/util" + "golang.org/x/net/context" "os" "os/exec" "os/signal" @@ -71,7 +72,7 @@ func Init() { flag.Parse() } - go handleSignals() + util.Go(handleSignals) } func Before(f 
func()) { @@ -111,26 +112,28 @@ func fireSignalHook(ppFlag int, sig os.Signal) { } } -func handleSignals() { +func handleSignals(ctx context.Context) { var sig os.Signal sigCh := make(chan os.Signal) signal.Notify(sigCh, registerSignals...) for { - sig = <-sigCh - fireSignalHook(PreSignal, sig) - switch sig { - case syscall.SIGHUP: - util.Logger().Debugf("received signal 'SIGHUP', now forking") - err := fork() - if err != nil { - util.Logger().Errorf(err, "fork a process failed") + select { + case <-ctx.Done(): + return + case sig = <-sigCh: + fireSignalHook(PreSignal, sig) + switch sig { + case syscall.SIGHUP: + util.Logger().Debugf("received signal '%v', now forking", sig) + err := fork() + if err != nil { + util.Logger().Errorf(err, "fork a process failed") + } } - default: - util.Logger().Warnf(nil, "received signal '%v'", sig) + fireSignalHook(PostSignal, sig) } - fireSignalHook(PostSignal, sig) } } diff --git a/pkg/util/goroutines.go b/pkg/util/goroutines.go index a021f52c..bb9b9d99 100644 --- a/pkg/util/goroutines.go +++ b/pkg/util/goroutines.go @@ -16,42 +16,37 @@ */ package util -import "sync" +import ( + "golang.org/x/net/context" + "sync" +) type GoRoutine struct { - stopCh chan struct{} + ctx context.Context + cancel context.CancelFunc wg sync.WaitGroup mux sync.RWMutex - once sync.Once closed bool } -func (g *GoRoutine) Init(stopCh chan struct{}) { - g.once.Do(func() { - g.stopCh = stopCh - }) -} - -func (g *GoRoutine) StopCh() <-chan struct{} { - return g.stopCh -} - -func (g *GoRoutine) Do(f func(<-chan struct{})) { +func (g *GoRoutine) Do(f func(context.Context)) { g.wg.Add(1) go func() { defer g.wg.Done() - f(g.StopCh()) + defer RecoverAndReport() + f(g.ctx) }() } func (g *GoRoutine) Close(wait bool) { g.mux.Lock() defer g.mux.Unlock() + if g.closed { return } g.closed = true - close(g.stopCh) + g.cancel() if wait { g.Wait() } @@ -61,27 +56,26 @@ func (g *GoRoutine) Wait() { g.wg.Wait() } -var defaultGo GoRoutine +var defaultGo *GoRoutine func 
init() { - GoInit() + defaultGo = NewGo(context.Background()) } -func Go(f func(<-chan struct{})) { +func Go(f func(context.Context)) { defaultGo.Do(f) } -func GoInit() { - defaultGo.Init(make(chan struct{})) -} - func GoCloseAndWait() { defaultGo.Close(true) - Logger().Debugf("all goroutines quit normally") + Logger().Debugf("all goroutines exited") } -func NewGo(stopCh chan struct{}) *GoRoutine { - gr := &GoRoutine{} - gr.Init(stopCh) +func NewGo(ctx context.Context) *GoRoutine { + ctx, cancel := context.WithCancel(ctx) + gr := &GoRoutine{ + ctx: ctx, + cancel: cancel, + } return gr } diff --git a/pkg/util/goroutines_test.go b/pkg/util/goroutines_test.go index cfc09191..d8088142 100644 --- a/pkg/util/goroutines_test.go +++ b/pkg/util/goroutines_test.go @@ -18,72 +18,66 @@ package util import ( "fmt" + "golang.org/x/net/context" "sync" "testing" "time" ) -func TestGoRoutine_Init(t *testing.T) { - var test GoRoutine - stopCh1 := make(chan struct{}) - defer close(stopCh1) - stopCh2 := make(chan struct{}) - defer close(stopCh2) - - test.Init(stopCh1) - c := test.StopCh() - if c != stopCh1 { - fail(t, "init GoRoutine failed.") - } - - test.Init(stopCh2) - c = test.StopCh() - if c == stopCh2 { - fail(t, "init GoRoutine twice.") - } -} - func TestGoRoutine_Do(t *testing.T) { - var test1 GoRoutine - stopCh := make(chan struct{}) - test1.Init(make(chan struct{})) - test1.Do(func(neverStopCh <-chan struct{}) { - defer close(stopCh) + test1 := NewGo(context.Background()) + defer test1.Close(true) + stopCh1 := make(chan struct{}) + test1.Do(func(ctx context.Context) { + defer close(stopCh1) select { - case <-neverStopCh: - fail(t, "neverStopCh should not be closed.") + case <-ctx.Done(): + fail(t, "ctx should not be done.") case <-time.After(time.Second): } }) - <-stopCh + <-stopCh1 - var test2 GoRoutine - stopCh1 := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + test2 := NewGo(ctx) + defer test2.Close(true) stopCh2 := make(chan struct{}) - 
test2.Init(stopCh1) - test2.Do(func(stopCh <-chan struct{}) { + test2.Do(func(ctx context.Context) { defer close(stopCh2) select { - case <-stopCh: + case <-ctx.Done(): case <-time.After(time.Second): - fail(t, "time out to wait stopCh1 close.") + fail(t, "time out to wait stopCh2 close.") } }) - close(stopCh1) + cancel() <-stopCh2 + + ctx, _ = context.WithTimeout(context.Background(), 0) + test3 := NewGo(ctx) + defer test3.Close(true) + stopCh3 := make(chan struct{}) + test3.Do(func(ctx context.Context) { + defer close(stopCh3) + select { + case <-ctx.Done(): + case <-time.After(time.Second): + fail(t, "time out to wait ctx done.") + } + }) + <-stopCh3 } func TestGoRoutine_Wait(t *testing.T) { - var test GoRoutine var mux sync.Mutex MAX := 10 resultArr := make([]int, 0, MAX) - test.Init(make(chan struct{})) + test := NewGo(context.Background()) for i := 0; i < MAX; i++ { func(i int) { - test.Do(func(neverStopCh <-chan struct{}) { + test.Do(func(ctx context.Context) { select { - case <-neverStopCh: + case <-ctx.Done(): case <-time.After(time.Second): mux.Lock() resultArr = append(resultArr, i) @@ -103,13 +97,12 @@ func TestGoRoutine_Wait(t *testing.T) { } func TestGoRoutine_Close(t *testing.T) { - var test GoRoutine - test.Init(make(chan struct{})) - test.Do(func(stopCh <-chan struct{}) { + test := NewGo(context.Background()) + test.Do(func(ctx context.Context) { select { - case <-stopCh: + case <-ctx.Done(): case <-time.After(time.Second): - fail(t, "time out to wait stopCh close.") + fail(t, "time out to wait ctx close.") } }) test.Close(true) @@ -117,20 +110,18 @@ func TestGoRoutine_Close(t *testing.T) { } func TestGo(t *testing.T) { - GoInit() - Go(func(stopCh <-chan struct{}) { + Go(func(ctx context.Context) { for { select { - case <-stopCh: + case <-ctx.Done(): return case <-time.After(time.Second): } } }) + Go(func(ctx context.Context) { + var a *int + fmt.Println(*a) + }) GoCloseAndWait() } - -func TestNewGo(t *testing.T) { - g := NewGo(make(chan struct{})) 
- defer g.Close(true) -} diff --git a/pkg/util/log.go b/pkg/util/log.go index 5e475a7b..bc82a899 100644 --- a/pkg/util/log.go +++ b/pkg/util/log.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/ServiceComb/paas-lager" "github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager" + "golang.org/x/net/context" "os" "path/filepath" "runtime" @@ -57,7 +58,7 @@ func init() { loggers = make(map[string]lager.Logger, 10) loggerNames = make(map[string]string, 10) // make LOGGER do not be nil, new a stdout logger - LOGGER = newLogger(fromLagerConfig(defaultLagerConfig)) + LOGGER = NewLogger(fromLagerConfig(defaultLagerConfig)) } func fromLagerConfig(c *stlager.Config) LoggerConfig { @@ -82,7 +83,7 @@ func toLagerConfig(c LoggerConfig) stlager.Config { } // newLog new log, unsafe -func newLogger(cfg LoggerConfig) lager.Logger { +func NewLogger(cfg LoggerConfig) lager.Logger { stlager.Init(toLagerConfig(cfg)) return stlager.NewLogger(cfg.LoggerFile) } @@ -93,7 +94,7 @@ func InitGlobalLogger(cfg LoggerConfig) { cfg.LoggerLevel = defaultLagerConfig.LoggerLevel } loggerConfig = cfg - LOGGER = newLogger(cfg) + LOGGER = NewLogger(cfg) // log rotate RunLogDirRotate(cfg) // recreate the deleted log file @@ -144,7 +145,7 @@ func Logger() lager.Logger { if len(cfg.LoggerFile) != 0 { cfg.LoggerFile = filepath.Join(filepath.Dir(cfg.LoggerFile), logFile+".log") } - logger = newLogger(cfg) + logger = NewLogger(cfg) loggers[logFile] = logger LOGGER.Warnf(nil, "match %s, new logger %s for %s", prefix, logFile, funcFullName) } @@ -190,10 +191,10 @@ func monitorLogFile() { if len(loggerConfig.LoggerFile) == 0 { return } - Go(func(stopCh <-chan struct{}) { + Go(func(ctx context.Context) { for { select { - case <-stopCh: + case <-ctx.Done(): return case <-time.After(time.Minute): Logger().Debug(fmt.Sprintf("Check log file at %s", time.Now())) diff --git a/pkg/util/logrotate.go b/pkg/util/logrotate.go index db4b79f9..e7dbcefd 100644 --- a/pkg/util/logrotate.go +++ 
b/pkg/util/logrotate.go @@ -19,6 +19,7 @@ package util import ( "archive/zip" "fmt" + "golang.org/x/net/context" "io" "os" "path/filepath" @@ -293,10 +294,10 @@ func CopyFile(srcFile, destFile string) error { } func RunLogDirRotate(cfg LoggerConfig) { - Go(func(stopCh <-chan struct{}) { + Go(func(ctx context.Context) { for { select { - case <-stopCh: + case <-ctx.Done(): return case <-time.After(cfg.LogRotatePeriod): LogRotate(filepath.Dir(cfg.LoggerFile), cfg.LogRotateSize, cfg.LogBackupCount) diff --git a/server/api.go b/server/api.go index 26ac55a2..f434bdc4 100644 --- a/server/api.go +++ b/server/api.go @@ -36,8 +36,9 @@ func init() { InitAPI() apiServer = &APIServer{ - isClose: true, - err: make(chan error, 1), + isClose: true, + err: make(chan error, 1), + goroutine: util.NewGo(context.Background()), } } @@ -66,6 +67,7 @@ type APIServer struct { isClose bool forked bool err chan error + goroutine *util.GoRoutine } const ( @@ -176,16 +178,18 @@ func (s *APIServer) doAPIServerHeartBeat(pCtx context.Context) { } func (s *APIServer) startHeartBeatService() { - go func() { + s.goroutine.Do(func(ctx context.Context) { for { select { + case <-ctx.Done(): + return case <-s.err: return case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second): s.doAPIServerHeartBeat(context.Background()) } } - }() + }) } func (s *APIServer) graceDone() { @@ -211,14 +215,14 @@ func (s *APIServer) startRESTServer() (err error) { } util.Logger().Infof("Local listen address: %s, host: %s.", ep, s.HostName) - go func() { + s.goroutine.Do(func(_ context.Context) { err := s.restSrv.Serve() if s.isClose { return } util.Logger().Errorf(err, "error to start REST API server %s", ep) s.err <- err - }() + }) return } @@ -234,14 +238,14 @@ func (s *APIServer) startRPCServer() (err error) { } util.Logger().Infof("Local listen address: %s, host: %s.", ep, s.HostName) - go func() { + s.goroutine.Do(func(_ context.Context) { err := s.rpcSrv.Serve() if s.isClose { return } 
util.Logger().Errorf(err, "error to start RPC API server %s", ep) s.err <- err - }() + }) return } @@ -301,6 +305,8 @@ func (s *APIServer) Stop() { close(s.err) + s.goroutine.Close(true) + util.Logger().Info("api server stopped.") } diff --git a/server/broker/service.go b/server/broker/service.go index bde8f3e8..3ae63c61 100644 --- a/server/broker/service.go +++ b/server/broker/service.go @@ -34,7 +34,7 @@ import ( "golang.org/x/net/context" ) -var BrokerServiceAPI *BrokerService = &BrokerService{} +var BrokerServiceAPI = &BrokerService{} type BrokerService struct { } diff --git a/server/broker/store.go b/server/broker/store.go index 0cdddc59..95c61a41 100644 --- a/server/broker/store.go +++ b/server/broker/store.go @@ -21,6 +21,7 @@ import ( "github.com/apache/incubator-servicecomb-service-center/pkg/util" sstore "github.com/apache/incubator-servicecomb-service-center/server/core/backend/store" + "golang.org/x/net/context" ) const ( @@ -72,12 +73,16 @@ func (s *BKvStore) newStore(t sstore.StoreType, opts ...sstore.KvCacherCfgOption s.newIndexer(t, sstore.NewKvCacher(opts...)) } -func (s *BKvStore) store() { +func (s *BKvStore) store(ctx context.Context) { for t := sstore.StoreType(0); t != typeEnd; t++ { s.newStore(t) } for _, i := range s.bindexers { - <-i.Ready() + select { + case <-ctx.Done(): + return + case <-i.Ready(): + } } util.SafeCloseChan(s.bready) @@ -120,7 +125,13 @@ func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) { } func (s *BKvStore) Run() { - go s.store() + util.Go(func(ctx context.Context) { + s.store(ctx) + select { + case <-ctx.Done(): + s.Stop() + } + }) } func (s *BKvStore) Ready() <-chan struct{} { @@ -154,3 +165,18 @@ func (s *BKvStore) Verification() *sstore.Indexer { func (s *BKvStore) PactLatest() *sstore.Indexer { return s.bindexers[PACT_LATEST] } + +func (s *BKvStore) Stop() { + if s.bisClose { + return + } + s.bisClose = true + + for _, i := range s.bindexers { + i.Stop() + } + + util.SafeCloseChan(s.bready) + 
+ util.Logger().Debugf("broker store daemon stopped") +} diff --git a/server/broker/util.go b/server/broker/util.go index 23e98ca1..895ee8ca 100644 --- a/server/broker/util.go +++ b/server/broker/util.go @@ -25,14 +25,16 @@ import ( "strconv" "strings" - "github.com/ServiceComb/paas-lager" "github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager" "github.com/apache/incubator-servicecomb-service-center/pkg/util" - backend "github.com/apache/incubator-servicecomb-service-center/server/core/backend" + "github.com/apache/incubator-servicecomb-service-center/server/core" + "github.com/apache/incubator-servicecomb-service-center/server/core/backend" pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" scerr "github.com/apache/incubator-servicecomb-service-center/server/error" "github.com/apache/incubator-servicecomb-service-center/server/infra/registry" serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util" + "path/filepath" + "time" ) var PactLogger lager.Logger @@ -88,12 +90,18 @@ var brokerAPILinksTitles = map[string]string{ func init() { //define Broker logger - stlager.Init(stlager.Config{ - LoggerLevel: "INFO", - LoggerFile: "broker_srvc.log", - EnableRsyslog: false, + name := "" + if len(core.ServerInfo.Config.LogFilePath) != 0 { + name = filepath.Join(filepath.Dir(core.ServerInfo.Config.LogFilePath), "broker_srvc.log") + } + PactLogger = util.NewLogger(util.LoggerConfig{ + LoggerLevel: core.ServerInfo.Config.LogLevel, + LoggerFile: name, + LogFormatText: core.ServerInfo.Config.LogFormat == "text", + LogRotatePeriod: 30 * time.Second, + LogRotateSize: int(core.ServerInfo.Config.LogRotateSize), + LogBackupCount: int(core.ServerInfo.Config.LogBackupCount), }) - PactLogger = stlager.NewLogger("broker_srvc") } func GetDefaultTenantProject() string { diff --git a/server/core/0_init.go b/server/core/0_init.go index 1b888492..8749af62 100644 --- a/server/core/0_init.go +++ b/server/core/0_init.go 
@@ -87,7 +87,6 @@ func initLogger() { } func handleSignals() { - var sig os.Signal sigCh := make(chan os.Signal) signal.Notify(sigCh, syscall.SIGINT, @@ -95,13 +94,14 @@ func handleSignals() { syscall.SIGTERM, ) wait := 5 * time.Second - for { - sig = <-sigCh + for sig := range sigCh { switch sig { case syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM: <-time.After(wait) - util.Logger().Warnf(nil, "Waiting for server response timed out(%s), force shutdown.", wait) + util.Logger().Warnf(nil, "waiting for server response timed out(%s), force shutdown", wait) os.Exit(1) + default: + util.Logger().Warnf(nil, "received signal '%v'", sig) } } } diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go index c6ade974..39ba733f 100644 --- a/server/core/backend/store/cacher.go +++ b/server/core/backend/store/cacher.go @@ -169,12 +169,12 @@ type KvCacher struct { lastRev int64 noEventInterval int - ready chan struct{} - lw ListWatcher - mux sync.Mutex - once sync.Once - cache *KvCache - goroute *util.GoRoutine + ready chan struct{} + lw ListWatcher + mux sync.Mutex + once sync.Once + cache *KvCache + goroutine *util.GoRoutine } func (c *KvCacher) needList() bool { @@ -267,23 +267,18 @@ func (c *KvCacher) needDeferHandle(evts []*Event) bool { return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts) } -func (c *KvCacher) refresh(stopCh <-chan struct{}) { +func (c *KvCacher) refresh(ctx context.Context) { util.Logger().Debugf("start to list and watch %s", c.Cfg) - ctx, cancel := context.WithCancel(context.Background()) - c.goroute.Do(func(stopCh <-chan struct{}) { - defer cancel() - <-stopCh - }) for { start := time.Now() c.ListAndWatch(ctx) watchDuration := time.Since(start) - nextPeriod := 0 * time.Second + nextPeriod := c.Cfg.Period if watchDuration > 0 && c.Cfg.Period > watchDuration { nextPeriod = c.Cfg.Period - watchDuration } select { - case <-stopCh: + case <-ctx.Done(): util.Logger().Debugf("stop to list and watch %s", c.Cfg) return case 
<-time.After(nextPeriod): @@ -291,7 +286,7 @@ func (c *KvCacher) refresh(stopCh <-chan struct{}) { } } -func (c *KvCacher) deferHandle(stopCh <-chan struct{}) { +func (c *KvCacher) deferHandle(ctx context.Context) { if c.Cfg.DeferHandler == nil { return } @@ -299,7 +294,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) { i, evts := 0, make([]*Event, event_block_size) for { select { - case <-stopCh: + case <-ctx.Done(): return case evt, ok := <-c.Cfg.DeferHandler.HandleChan(): if !ok { @@ -524,8 +519,8 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) { } func (c *KvCacher) run() { - c.goroute.Do(c.refresh) - c.goroute.Do(c.deferHandle) + c.goroutine.Do(c.refresh) + c.goroutine.Do(c.deferHandle) } func (c *KvCacher) Cache() Cache { @@ -537,7 +532,7 @@ func (c *KvCacher) Run() { } func (c *KvCacher) Stop() { - c.goroute.Close(true) + c.goroutine.Close(true) util.SafeCloseChan(c.ready) } @@ -577,7 +572,7 @@ func NewKvCacher(opts ...KvCacherCfgOption) *KvCacher { Client: backend.Registry(), Key: cfg.Key, }, - goroute: util.NewGo(make(chan struct{})), + goroutine: util.NewGo(context.Background()), } cacher.cache = NewKvCache(cacher, cfg.InitSize) return cacher diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go index d35f9734..43464f06 100644 --- a/server/core/backend/store/defer.go +++ b/server/core/backend/store/defer.go @@ -21,6 +21,7 @@ import ( "github.com/apache/incubator-servicecomb-service-center/pkg/util" pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" "github.com/coreos/etcd/mvcc/mvccpb" + "golang.org/x/net/context" "sync" "time" ) @@ -98,12 +99,12 @@ func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event { return iedh.deferCh } -func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) { +func (iedh *InstanceEventDeferHandler) check(ctx context.Context) { defer util.RecoverAndReport() t, n := iedh.newTimer(), false for { select { - case <-stopCh: + case 
<-ctx.Done(): return case evts := <-iedh.pendingCh: for _, evt := range evts { @@ -117,7 +118,7 @@ func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) { } total := iedh.cache.Size() - if !iedh.enabled && del > 0 && total > 0 && float64(del) >= float64(total)*iedh.Percent { + if !iedh.enabled && del > 0 && total > 5 && float64(del) >= float64(total)*iedh.Percent { iedh.enabled = true util.Logger().Warnf(nil, "self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events", del, total, iedh.Percent*100) diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go index 72360503..5cf3e6dc 100644 --- a/server/core/backend/store/indexer.go +++ b/server/core/backend/store/indexer.go @@ -186,11 +186,11 @@ func (i *Indexer) OnCacheEvent(evt *KvEvent) { } func (i *Indexer) buildIndex() { - i.goroutine.Do(func(stopCh <-chan struct{}) { + i.goroutine.Do(func(ctx context.Context) { util.SafeCloseChan(i.ready) for { select { - case <-stopCh: + case <-ctx.Done(): return case evt, ok := <-i.prefixBuildQueue: if !ok { @@ -317,7 +317,7 @@ func NewCacheIndexer(t StoreType, cr Cacher) *Indexer { cacheType: t, prefixIndex: make(map[string]map[string]struct{}, DEFAULT_MAX_EVENT_COUNT), prefixBuildQueue: make(chan *KvEvent, DEFAULT_MAX_EVENT_COUNT), - goroutine: util.NewGo(make(chan struct{})), + goroutine: util.NewGo(context.Background()), ready: make(chan struct{}), isClose: true, } diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go index 189f9094..4ffb9d45 100644 --- a/server/core/backend/store/listwatch.go +++ b/server/core/backend/store/listwatch.go @@ -131,17 +131,17 @@ func (w *Watcher) EventBus() <-chan []*Event { return w.bus } -func (w *Watcher) process() { +func (w *Watcher) process(_ context.Context) { stopCh := make(chan struct{}) ctx, cancel := context.WithTimeout(w.ListOps.Context, w.ListOps.Timeout) - go func() { + util.Go(func(_ context.Context) { defer close(stopCh) 
w.lw.doWatch(ctx, w.sendEvent) - }() + }) select { case <-stopCh: - // time out + // timed out or exception w.Stop() case <-w.stopCh: cancel() @@ -180,6 +180,6 @@ func newWatcher(lw *ListWatcher, listOps *ListOptions) *Watcher { bus: make(chan []*Event, EVENT_BUS_MAX_SIZE), stopCh: make(chan struct{}), } - go w.process() + util.Go(w.process) return w } diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go index 6a5f38c3..c462d411 100644 --- a/server/core/backend/store/store.go +++ b/server/core/backend/store/store.go @@ -109,6 +109,7 @@ type KvStore struct { asyncTaskSvc *async.AsyncTaskService lock sync.RWMutex ready chan struct{} + goroutine *util.GoRoutine isClose bool } @@ -116,6 +117,7 @@ func (s *KvStore) Initialize() { s.indexers = make(map[StoreType]*Indexer) s.asyncTaskSvc = async.NewAsyncTaskService() s.ready = make(chan struct{}) + s.goroutine = util.NewGo(context.Background()) for i := StoreType(0); i != typeEnd; i++ { store.newNullStore(i) @@ -147,7 +149,7 @@ func (s *KvStore) newIndexer(t StoreType, cacher Cacher) { } func (s *KvStore) Run() { - go s.store() + s.goroutine.Do(s.store) s.asyncTaskSvc.Run() } @@ -166,7 +168,7 @@ func (s *KvStore) SelfPreservationHandler() DeferHandler { return &InstanceEventDeferHandler{Percent: DEFAULT_SELF_PRESERVATION_PERCENT} } -func (s *KvStore) store() { +func (s *KvStore) store(ctx context.Context) { for t := StoreType(0); t != typeEnd; t++ { switch t { case INSTANCE: @@ -178,7 +180,11 @@ func (s *KvStore) store() { } } for _, i := range s.indexers { - <-i.Ready() + select { + case <-ctx.Done(): + return + case <-i.Ready(): + } } util.SafeCloseChan(s.ready) @@ -214,9 +220,11 @@ func (s *KvStore) Stop() { s.asyncTaskSvc.Stop() + s.goroutine.Close(true) + util.SafeCloseChan(s.ready) - util.Logger().Debugf("store daemon stopped.") + util.Logger().Debugf("store daemon stopped") } func (s *KvStore) Ready() <-chan struct{} { diff --git a/server/infra/registry/registry.go 
b/server/infra/registry/registry.go index 575a351f..0856fa3b 100644 --- a/server/infra/registry/registry.go +++ b/server/infra/registry/registry.go @@ -144,7 +144,7 @@ const ( ) const ( - REQUEST_TIMEOUT = 300 + REQUEST_TIMEOUT = 30 * time.Second DEFAULT_PAGE_COUNT = 4096 // grpc does not allow to transport a large body more then 4MB in a request. ) @@ -359,7 +359,7 @@ func OpCmp(opt CompareOperation, result CompareResult, v interface{}) (cmp Compa } func WithTimeout(ctx context.Context) (context.Context, context.CancelFunc) { - return context.WithTimeout(ctx, REQUEST_TIMEOUT*time.Second) + return context.WithTimeout(ctx, REQUEST_TIMEOUT) } func RegistryConfig() *Config { diff --git a/server/plugin/infra/registry/embededetcd/embededetcd.go b/server/plugin/infra/registry/embededetcd/embededetcd.go index 4b941bce..3bb71401 100644 --- a/server/plugin/infra/registry/embededetcd/embededetcd.go +++ b/server/plugin/infra/registry/embededetcd/embededetcd.go @@ -41,16 +41,17 @@ import ( var embedTLSConfig *tls.Config -const START_MANAGER_SERVER_TIMEOUT = 60 +const START_MANAGER_SERVER_TIMEOUT = 10 func init() { mgr.RegisterPlugin(mgr.Plugin{mgr.REGISTRY, "embeded_etcd", getEmbedInstance}) } type EtcdEmbed struct { - Server *embed.Etcd - err chan error - ready chan int + Embed *embed.Etcd + err chan error + ready chan int + goroutine *util.GoRoutine } func (s *EtcdEmbed) Err() <-chan error { @@ -62,9 +63,10 @@ func (s *EtcdEmbed) Ready() <-chan int { } func (s *EtcdEmbed) Close() { - if s.Server != nil { - s.Server.Close() + if s.Embed != nil { + s.Embed.Close() } + s.goroutine.Close(true) util.Logger().Debugf("embedded etcd client stopped.") } @@ -232,7 +234,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve int64) error { } util.Logger().Infof("Compacting... 
revision is %d(current: %d, reserve %d)", revToCompact, curRev, reserve) - _, err := s.Server.Server.Compact(ctx, &etcdserverpb.CompactionRequest{ + _, err := s.Embed.Server.Compact(ctx, &etcdserverpb.CompactionRequest{ Revision: revToCompact, Physical: true, }) @@ -250,7 +252,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve int64) error { } func (s *EtcdEmbed) getLeaderCurrentRevision(ctx context.Context) int64 { - return s.Server.Server.KV().Rev() + return s.Embed.Server.KV().Rev() } func (s *EtcdEmbed) PutNoOverride(ctx context.Context, opts ...registry.PluginOpOption) (bool, error) { @@ -275,7 +277,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r switch op.Action { case registry.Get: var etcdResp *etcdserverpb.RangeResponse - etcdResp, err = s.Server.Server.Range(otCtx, s.toGetRequest(op)) + etcdResp, err = s.Embed.Server.Range(otCtx, s.toGetRequest(op)) if err != nil { break } @@ -286,7 +288,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r } case registry.Put: var etcdResp *etcdserverpb.PutResponse - etcdResp, err = s.Server.Server.Put(otCtx, s.toPutRequest(op)) + etcdResp, err = s.Embed.Server.Put(otCtx, s.toPutRequest(op)) if err != nil { break } @@ -295,7 +297,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r } case registry.Delete: var etcdResp *etcdserverpb.DeleteRangeResponse - etcdResp, err = s.Server.Server.DeleteRange(otCtx, s.toDeleteRequest(op)) + etcdResp, err = s.Embed.Server.DeleteRange(otCtx, s.toDeleteRequest(op)) if err != nil { break } @@ -338,7 +340,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success []registry.PluginOp, if len(etcdFailOps) > 0 { txnRequest.Failure = etcdFailOps } - resp, err := s.Server.Server.Txn(otCtx, txnRequest) + resp, err := s.Embed.Server.Txn(otCtx, txnRequest) if err != nil { return nil, err } @@ -351,7 +353,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success 
[]registry.PluginOp, func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL int64) (int64, error) { otCtx, cancel := registry.WithTimeout(ctx) defer cancel() - etcdResp, err := s.Server.Server.LeaseGrant(otCtx, &etcdserverpb.LeaseGrantRequest{ + etcdResp, err := s.Embed.Server.LeaseGrant(otCtx, &etcdserverpb.LeaseGrantRequest{ TTL: TTL, }) if err != nil { @@ -363,7 +365,7 @@ func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL int64) (int64, error) { func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID int64) (int64, error) { otCtx, cancel := registry.WithTimeout(ctx) defer cancel() - ttl, err := s.Server.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID)) + ttl, err := s.Embed.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID)) if err != nil { if err.Error() == grpc.ErrorDesc(rpctypes.ErrGRPCLeaseNotFound) { return 0, err @@ -376,7 +378,7 @@ func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID int64) (int64, error func (s *EtcdEmbed) LeaseRevoke(ctx context.Context, leaseID int64) error { otCtx, cancel := registry.WithTimeout(ctx) defer cancel() - _, err := s.Server.Server.LeaseRevoke(otCtx, &etcdserverpb.LeaseRevokeRequest{ + _, err := s.Embed.Server.LeaseRevoke(otCtx, &etcdserverpb.LeaseRevokeRequest{ ID: leaseID, }) if err != nil { @@ -392,7 +394,7 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption) op := registry.OpGet(opts...) 
if len(op.Key) > 0 { - watchable := s.Server.Server.Watchable() + watchable := s.Embed.Server.Watchable() ws := watchable.NewWatchStream() defer ws.Close() @@ -455,6 +457,29 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption) return } +func (s *EtcdEmbed) ReadyNotify() { + timeout := START_MANAGER_SERVER_TIMEOUT * time.Second + select { + case <-s.Embed.Server.ReadyNotify(): + close(s.ready) + s.goroutine.Do(func(ctx context.Context) { + select { + case <-ctx.Done(): + return + case err := <-s.Embed.Err(): + s.err <- err + } + }) + case <-time.After(timeout): + err := fmt.Errorf("timed out(%s)", timeout) + util.Logger().Errorf(err, "read notify failed") + + s.Embed.Server.Stop() + + s.err <- err + } +} + func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *mvccpb.Event) registry.ActionType { switch evt.Type { case mvccpb.DELETE: @@ -488,8 +513,9 @@ func getEmbedInstance() mgr.PluginInstance { addrs := beego.AppConfig.DefaultString("manager_addr", "http://127.0.0.1:2380") inst := &EtcdEmbed{ - err: make(chan error, 1), - ready: make(chan int), + err: make(chan error, 1), + ready: make(chan int), + goroutine: util.NewGo(context.Background()), } if core.ServerInfo.Config.SslEnabled { @@ -537,30 +563,14 @@ func getEmbedInstance() mgr.PluginInstance { inst.err <- err return inst } - inst.Server = etcd - - select { - case <-etcd.Server.ReadyNotify(): - close(inst.ready) - go func() { - select { - case err := <-etcd.Err(): - inst.err <- err - } - }() - case <-time.After(START_MANAGER_SERVER_TIMEOUT * time.Second): - message := "etcd server took too long to start" - util.Logger().Error(message, nil) + inst.Embed = etcd - etcd.Server.Stop() - - inst.err <- errors.New(message) - } + inst.ReadyNotify() return inst } func parseURL(addrs string) ([]url.URL, error) { - urls := []url.URL{} + var urls []url.URL ips := strings.Split(addrs, ",") for _, ip := range ips { addr, err := url.Parse(ip) diff --git 
a/server/plugin/infra/tracing/buildin/file_collector.go b/server/plugin/infra/tracing/buildin/file_collector.go index bd48e5bd..851b3fc7 100644 --- a/server/plugin/infra/tracing/buildin/file_collector.go +++ b/server/plugin/infra/tracing/buildin/file_collector.go @@ -23,15 +23,19 @@ import ( "github.com/apache/incubator-servicecomb-service-center/pkg/util" "github.com/apache/incubator-servicecomb-service-center/server/core" "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore" + "golang.org/x/net/context" "os" + "strings" "time" ) type FileCollector struct { Fd *os.File + Timeout time.Duration Interval time.Duration BatchSize int c chan *zipkincore.Span + goroutine *util.GoRoutine } func (f *FileCollector) Collect(span *zipkincore.Span) error { @@ -39,11 +43,16 @@ func (f *FileCollector) Collect(span *zipkincore.Span) error { return fmt.Errorf("required FD to write") } - f.c <- span + select { + case f.c <- span: + case <-time.After(f.Timeout): + util.Logger().Errorf(nil, "send span to handle channel timed out(%s)", f.Timeout) + } return nil } func (f *FileCollector) Close() error { + f.goroutine.Close(true) return f.Fd.Close() } @@ -77,7 +86,7 @@ func (f *FileCollector) write(batch []*zipkincore.Span) (c int) { } func (f *FileCollector) checkFile() error { - if util.PathExist(f.Fd.Name()) { + if util.PathExist(f.Fd.Name()) || strings.Index(f.Fd.Name(), "/dev/") == 0 { return nil } @@ -100,52 +109,54 @@ func (f *FileCollector) checkFile() error { return nil } -func (f *FileCollector) Run(stopCh <-chan struct{}) { - var ( - batch []*zipkincore.Span - prev []*zipkincore.Span - i = f.Interval * 10 - t = time.NewTicker(f.Interval) - nr = time.Now().Add(i) - max = f.BatchSize * 2 - ) - for { - select { - case <-stopCh: - f.write(batch) - return - case span := <-f.c: - batch = append(batch, span) - if len(batch) >= f.BatchSize { - if len(batch) > max { - dispose := len(batch) - f.BatchSize - util.Logger().Errorf(nil, "backlog is full, dispose %d span(s), 
max: %d", - dispose, max) - batch = batch[dispose:] // allocate more - } - if c := f.write(batch); c == 0 { - continue +func (f *FileCollector) Run() { + f.goroutine.Do(func(ctx context.Context) { + var ( + batch []*zipkincore.Span + prev []*zipkincore.Span + i = f.Interval * 10 + t = time.NewTicker(f.Interval) + nr = time.Now().Add(i) + max = f.BatchSize * 2 + ) + for { + select { + case <-ctx.Done(): + f.write(batch) + return + case span := <-f.c: + batch = append(batch, span) + if len(batch) >= f.BatchSize { + if len(batch) > max { + dispose := len(batch) - f.BatchSize + util.Logger().Errorf(nil, "backlog is full, dispose %d span(s), max: %d", + dispose, max) + batch = batch[dispose:] // allocate more + } + if c := f.write(batch); c == 0 { + continue + } + if prev != nil { + batch, prev = prev[:0], batch + } else { + prev, batch = batch, batch[len(batch):] // new one + } } - if prev != nil { - batch, prev = prev[:0], batch - } else { - prev, batch = batch, batch[len(batch):] // new one + case <-t.C: + if time.Now().After(nr) { + util.LogRotateFile(f.Fd.Name(), + int(core.ServerInfo.Config.LogRotateSize), + int(core.ServerInfo.Config.LogBackupCount), + ) + nr = time.Now().Add(i) } - } - case <-t.C: - if time.Now().After(nr) { - util.LogRotateFile(f.Fd.Name(), - int(core.ServerInfo.Config.LogRotateSize), - int(core.ServerInfo.Config.LogBackupCount), - ) - nr = time.Now().Add(i) - } - if c := f.write(batch); c > 0 { - batch = batch[:0] + if c := f.write(batch); c > 0 { + batch = batch[:0] + } } } - } + }) } func NewFileCollector(path string) (*FileCollector, error) { @@ -155,10 +166,12 @@ func NewFileCollector(path string) (*FileCollector, error) { } fc := &FileCollector{ Fd: fd, + Timeout: 5 * time.Second, Interval: 10 * time.Second, BatchSize: 100, c: make(chan *zipkincore.Span, 1000), + goroutine: util.NewGo(context.Background()), } - util.Go(fc.Run) + fc.Run() return fc, nil } diff --git a/server/plugin/infra/tracing/buildin/file_collector_test.go 
b/server/plugin/infra/tracing/buildin/file_collector_test.go index 075fdaec..4f3e1505 100644 --- a/server/plugin/infra/tracing/buildin/file_collector_test.go +++ b/server/plugin/infra/tracing/buildin/file_collector_test.go @@ -17,34 +17,33 @@ package buildin import ( + "fmt" "github.com/apache/incubator-servicecomb-service-center/pkg/util" "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore" + "golang.org/x/net/context" "os" "testing" "time" ) func TestFileCollector_Collect(t *testing.T) { - fileName := "./test" - fd, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) - if err != nil { - t.FailNow() - } fc := &FileCollector{ - Fd: fd, + Fd: os.Stdout, + Timeout: 1 * time.Second, Interval: 100 * time.Second, BatchSize: 2, - c: make(chan *zipkincore.Span, 1000), + c: make(chan *zipkincore.Span, 100), + goroutine: util.NewGo(context.Background()), } defer func() { fc.Close() - os.Remove(fileName) }() - util.Go(fc.Run) + fc.Run() - for i := int64(0); i < 3; i++ { - err := fc.Collect(&zipkincore.Span{ParentID: &i, TraceIDHigh: &i}) + for i := 0; i < 10; i++ { + err := fc.Collect(&zipkincore.Span{}) if err != nil { + fmt.Println(err) t.FailNow() } } diff --git a/server/plugin/infra/tracing/buildin/span.go b/server/plugin/infra/tracing/buildin/span.go index 5b8011d9..3d200887 100644 --- a/server/plugin/infra/tracing/buildin/span.go +++ b/server/plugin/infra/tracing/buildin/span.go @@ -61,7 +61,9 @@ type Endpoint struct { func (s *Span) FromZipkinSpan(span *zipkincore.Span) { traceId := new(types.TraceID) traceId.Low = uint64(span.TraceID) - traceId.High = uint64(*(span.TraceIDHigh)) + if span.TraceIDHigh != nil { + traceId.High = uint64(*(span.TraceIDHigh)) + } s.TraceID = traceId.ToHex() s.Duration = span.Duration diff --git a/server/plugin/infra/tracing/buildin/span_test.go b/server/plugin/infra/tracing/buildin/span_test.go index c3dceb49..069e6a9a 100644 --- a/server/plugin/infra/tracing/buildin/span_test.go +++ 
b/server/plugin/infra/tracing/buildin/span_test.go @@ -158,4 +158,12 @@ func TestFromZipkinSpan(t *testing.T) { t.FailNow() } fmt.Println(string(b)) + + s = FromZipkinSpan(&zipkincore.Span{}) + b, err = json.Marshal(s) + if err != nil { + fmt.Println("TestFromZipkinSpan Marshal", err) + t.FailNow() + } + fmt.Println(string(b)) } diff --git a/server/server.go b/server/server.go index a5468798..15e0abd3 100644 --- a/server/server.go +++ b/server/server.go @@ -43,6 +43,7 @@ func init() { store: st.Store(), notifyService: nf.GetNotifyService(), apiServer: GetAPIServer(), + goroutine: util.NewGo(context.Background()), } } @@ -50,6 +51,7 @@ type ServiceCenterServer struct { apiServer *APIServer notifyService *nf.NotifyService store *st.KvStore + goroutine *util.GoRoutine } func (s *ServiceCenterServer) Run() { @@ -74,7 +76,7 @@ func (s *ServiceCenterServer) waitForQuit() { s.Stop() - util.Logger().Warn("service center quit", nil) + util.Logger().Debugf("service center stopped") } func (s *ServiceCenterServer) needUpgrade() bool { @@ -119,12 +121,12 @@ func (s *ServiceCenterServer) autoCompactBackend() { util.Logger().Errorf(err, "invalid compact interval %s, reset to default interval 12h", core.ServerInfo.Config.CompactInterval) interval = 12 * time.Hour } - util.Go(func(stopCh <-chan struct{}) { + s.goroutine.Do(func(ctx context.Context) { util.Logger().Infof("enabled the automatic compact mechanism, compact once every %s, reserve %d", core.ServerInfo.Config.CompactInterval, delta) for { select { - case <-stopCh: + case <-ctx.Done(): return case <-time.After(interval): lock, err := mux.Try(mux.GLOBAL_LOCK) @@ -133,7 +135,7 @@ func (s *ServiceCenterServer) autoCompactBackend() { continue } - backend.Registry().Compact(context.Background(), delta) + backend.Registry().Compact(ctx, delta) lock.Unlock() } @@ -190,9 +192,7 @@ func (s *ServiceCenterServer) Stop() { s.store.Stop() } - util.GoCloseAndWait() - - backend.Registry().Close() + s.goroutine.Close(true) } func Run() { 
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go index 7bbe5df8..6961c816 100644 --- a/server/service/event/dependency_event_handler.go +++ b/server/service/event/dependency_event_handler.go @@ -50,7 +50,7 @@ func (h *DependencyEventHandler) OnEvent(evt *store.KvEvent) { } func (h *DependencyEventHandler) loop() { - util.Go(func(stopCh <-chan struct{}) { + util.Go(func(ctx context.Context) { waitDelayIndex := 0 waitDelay := []int{1, 1, 5, 10, 20, 30, 60} retry := func() { @@ -64,7 +64,7 @@ func (h *DependencyEventHandler) loop() { } for { select { - case <-stopCh: + case <-ctx.Done(): return case <-h.signals.Chan(): lock, err := mux.Try(mux.DEP_QUEUE_LOCK) diff --git a/server/service/instances.go b/server/service/instances.go index 48c70296..eb945069 100644 --- a/server/service/instances.go +++ b/server/service/instances.go @@ -370,19 +370,7 @@ func (s *InstanceService) HeartbeatSet(ctx context.Context, in *pb.HeartbeatSetR existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true noMultiCounter++ } - go func(element *pb.HeartbeatSetElement) { - hbRst := &pb.InstanceHbRst{ - ServiceId: element.ServiceId, - InstanceId: element.InstanceId, - ErrMessage: "", - } - _, _, err, _ := serviceUtil.HeartbeatUtil(ctx, domainProject, element.ServiceId, element.InstanceId) - if err != nil { - hbRst.ErrMessage = err.Error() - util.Logger().Errorf(err, "heartbeat set failed, %s/%s", element.ServiceId, element.InstanceId) - } - instancesHbRst <- hbRst - }(heartbeatElement) + util.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst, heartbeatElement)) } count := 0 successFlag := false @@ -415,6 +403,22 @@ func (s *InstanceService) HeartbeatSet(ctx context.Context, in *pb.HeartbeatSetR } } +func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) func(context.Context) { + return func(_ context.Context) { + hbRst := 
&pb.InstanceHbRst{ + ServiceId: element.ServiceId, + InstanceId: element.InstanceId, + ErrMessage: "", + } + _, _, err, _ := serviceUtil.HeartbeatUtil(ctx, domainProject, element.ServiceId, element.InstanceId) + if err != nil { + hbRst.ErrMessage = err.Error() + util.Logger().Errorf(err, "heartbeat set failed, %s/%s", element.ServiceId, element.InstanceId) + } + instancesHbRst <- hbRst + } +} + func (s *InstanceService) GetOneInstance(ctx context.Context, in *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) { checkErr := s.getInstancePreCheck(ctx, in) if checkErr != nil { @@ -723,7 +727,6 @@ func (s *InstanceService) UpdateInstanceProperties(ctx context.Context, in *pb.U }, nil } - func (s *InstanceService) WatchPreOpera(ctx context.Context, in *pb.WatchInstanceRequest) error { if in == nil || len(in.SelfServiceId) == 0 { return errors.New("Request format invalid.") @@ -742,7 +745,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream pb.ServiceIn return err } domainProject := util.ParseDomainProject(stream.Context()) - watcher := nf.NewInstanceWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/") + watcher := nf.NewInstanceListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil) err = nf.GetNotifyService().AddSubscriber(watcher) util.Logger().Infof("start watch instance status, watcher %s %s", watcher.Subject(), watcher.Id()) return nf.HandleWatchJob(watcher, stream, nf.GetNotifyService().Config.NotifyTimeout) @@ -754,7 +757,7 @@ func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstan nf.EstablishWebSocketError(conn, err) return } - nf.DoWebSocketWatch(ctx, in.SelfServiceId, conn) + nf.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn) } func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) { diff --git a/server/service/microservices.go b/server/service/microservices.go index 4fe47d29..6663ab99 100644 
--- a/server/service/microservices.go +++ b/server/service/microservices.go @@ -359,11 +359,6 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De nuoMultilCount++ } - serviceRst := &pb.DelServicesRspInfo{ - ServiceId: serviceId, - ErrMessage: "", - } - //检查服务ID合法性 in := &pb.DeleteServiceRequest{ ServiceId: serviceId, @@ -372,22 +367,15 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De err := apt.Validate(in) if err != nil { util.Logger().Errorf(err, "delete micro-service failed, serviceId is %s: invalid parameters.", in.ServiceId) - serviceRst.ErrMessage = err.Error() - serviceRespChan <- serviceRst + serviceRespChan <- &pb.DelServicesRspInfo{ + ServiceId: serviceId, + ErrMessage: err.Error(), + } continue } //执行删除服务操作 - go func(serviceItem string) { - resp, err := s.DeleteServicePri(ctx, serviceItem, request.Force) - if err != nil { - serviceRst.ErrMessage = err.Error() - } else if resp.Code != pb.Response_SUCCESS { - serviceRst.ErrMessage = resp.Message - } - - serviceRespChan <- serviceRst - }(serviceId) + util.Go(s.getDeleteServiceFunc(ctx, serviceId, request.Force, serviceRespChan)) } //获取批量删除服务的结果 @@ -419,6 +407,23 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De return resp, nil } +func (s *MicroServiceService) getDeleteServiceFunc(ctx context.Context, serviceId string, force bool, serviceRespChan chan<- *pb.DelServicesRspInfo) func(context.Context) { + return func(_ context.Context) { + serviceRst := &pb.DelServicesRspInfo{ + ServiceId: serviceId, + ErrMessage: "", + } + resp, err := s.DeleteServicePri(ctx, serviceId, force) + if err != nil { + serviceRst.ErrMessage = err.Error() + } else if resp.Code != pb.Response_SUCCESS { + serviceRst.ErrMessage = resp.Message + } + + serviceRespChan <- serviceRst + } +} + func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceRequest) (*pb.GetServiceResponse, error) { if in == nil || 
len(in.ServiceId) == 0 { return &pb.GetServiceResponse{ @@ -437,7 +442,7 @@ func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceReque service, err := serviceUtil.GetService(ctx, domainProject, in.ServiceId) if err != nil { - util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err,get service failed.", in.ServiceId) + util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err, get service failed.", in.ServiceId) return &pb.GetServiceResponse{ Response: pb.CreateResponse(scerr.ErrInternal, "Get service file failed."), }, err @@ -655,7 +660,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create //create rules if in.Rules != nil && len(in.Rules) != 0 { chanLen++ - go func() { + util.Go(func(_ context.Context) { req := &pb.AddServiceRulesRequest{ ServiceId: serviceId, Rules: in.Rules, @@ -670,12 +675,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create chanRsp.Message = rsp.Response.Message } createRespChan <- chanRsp - }() + }) } //create tags if in.Tags != nil && len(in.Tags) != 0 { chanLen++ - go func() { + util.Go(func(_ context.Context) { req := &pb.AddServiceTagsRequest{ ServiceId: serviceId, Tags: in.Tags, @@ -690,12 +695,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create chanRsp.Message = rsp.Response.Message } createRespChan <- chanRsp - }() + }) } // create instance if in.Instances != nil && len(in.Instances) != 0 { chanLen++ - go func() { + util.Go(func(_ context.Context) { chanRsp := &pb.Response{} for _, ins := range in.Instances { req := &pb.RegisterInstanceRequest{ @@ -711,7 +716,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create } createRespChan <- chanRsp } - }() + }) } // handle result diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go index 8a340760..de0e2c66 100644 --- 
a/server/service/notification/listwatcher.go +++ b/server/service/notification/listwatcher.go @@ -19,6 +19,7 @@ package notification import ( "github.com/apache/incubator-servicecomb-service-center/pkg/util" pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + "golang.org/x/net/context" "time" ) @@ -44,10 +45,10 @@ func (w *ListWatcher) OnAccept() { } util.Logger().Debugf("accepted by notify service, %s watcher %s %s", w.Type(), w.Id(), w.Subject()) - go w.listAndPublishJobs() + util.Go(w.listAndPublishJobs) } -func (w *ListWatcher) listAndPublishJobs() { +func (w *ListWatcher) listAndPublishJobs(_ context.Context) { defer close(w.listCh) if w.ListFunc == nil { return @@ -112,10 +113,6 @@ func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, resp } } -func NewWatcher(nType NotifyType, id string, subject string) *ListWatcher { - return NewListWatcher(nType, id, subject, nil) -} - func NewListWatcher(nType NotifyType, id string, subject string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher { watcher := &ListWatcher{ diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go index e2d78640..63f78a27 100644 --- a/server/service/notification/notification_service.go +++ b/server/service/notification/notification_service.go @@ -20,6 +20,7 @@ import ( "container/list" "errors" "github.com/apache/incubator-servicecomb-service-center/pkg/util" + "golang.org/x/net/context" "sync" "time" ) @@ -33,7 +34,8 @@ var notifyService *NotifyService func init() { notifyService = &NotifyService{ - isClose: true, + isClose: true, + goroutine: util.NewGo(context.Background()), } } @@ -46,13 +48,14 @@ type serviceIndex map[NotifyType]subscriberSubjectIndex type NotifyService struct { Config NotifyServiceConfig - services serviceIndex - queues map[NotifyType]chan NotifyJob - waits sync.WaitGroup - mutexes map[NotifyType]*sync.Mutex - err chan error - 
closeMux sync.RWMutex - isClose bool + services serviceIndex + queues map[NotifyType]chan NotifyJob + waits sync.WaitGroup + mutexes map[NotifyType]*sync.Mutex + err chan error + closeMux sync.RWMutex + isClose bool + goroutine *util.GoRoutine } func (s *NotifyService) Err() <-chan error { @@ -150,41 +153,52 @@ func (s *NotifyService) AddJob(job NotifyJob) error { } } -func (s *NotifyService) publish2Subscriber(t NotifyType) { - defer s.waits.Done() - for job := range s.queues[t] { - util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s", - job.Type(), job.Subject(), job.SubscriberId()) +func (s *NotifyService) getPublish2SubscriberFunc(t NotifyType) func(context.Context) { + return func(ctx context.Context) { + defer s.waits.Done() + for { + select { + case <-ctx.Done(): + return + case job, ok := <-s.queues[t]: + if !ok { + return + } - s.mutexes[t].Lock() + util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s", + job.Type(), job.Subject(), job.SubscriberId()) - if s.Closed() && len(s.services[t]) == 0 { - s.mutexes[t].Unlock() - return - } + s.mutexes[t].Lock() - m, ok := s.services[t][job.Subject()] - if ok { - // publish的subject如果带上id,则单播,否则广播 - if len(job.SubscriberId()) != 0 { - ns, ok := m[job.SubscriberId()] + if s.Closed() && len(s.services[t]) == 0 { + s.mutexes[t].Unlock() + return + } + + m, ok := s.services[t][job.Subject()] if ok { - for n := ns.Front(); n != nil; n = n.Next() { - n.Value.(Subscriber).OnMessage(job) + // publish的subject如果带上id,则单播,否则广播 + if len(job.SubscriberId()) != 0 { + ns, ok := m[job.SubscriberId()] + if ok { + for n := ns.Front(); n != nil; n = n.Next() { + n.Value.(Subscriber).OnMessage(job) + } + } + s.mutexes[t].Unlock() + continue + } + for key := range m { + ns := m[key] + for n := ns.Front(); n != nil; n = n.Next() { + n.Value.(Subscriber).OnMessage(job) + } } } + s.mutexes[t].Unlock() - continue - } - for key := range m { - ns := m[key] - for n := ns.Front(); n != 
nil; n = n.Next() { - n.Value.(Subscriber).OnMessage(job) - } } } - - s.mutexes[t].Unlock() } } @@ -227,7 +241,7 @@ func (s *NotifyService) Start() { util.Logger().Debugf("notify service is started with config %s", s.Config) for i := NotifyType(0); i != typeEnd; i++ { - go s.publish2Subscriber(i) + s.goroutine.Do(s.getPublish2SubscriberFunc(i)) } } @@ -255,6 +269,8 @@ func (s *NotifyService) Stop() { close(s.err) + s.goroutine.Close(true) + util.Logger().Debug("notify service stopped.") } diff --git a/server/service/notification/watch_util.go b/server/service/notification/watch_util.go index c938719c..9a7e26c5 100644 --- a/server/service/notification/watch_util.go +++ b/server/service/notification/watch_util.go @@ -62,6 +62,7 @@ type WebSocketHandler struct { watcher *ListWatcher needPingWatcher bool closed chan struct{} + goroutine *util.GoRoutine } func (wh *WebSocketHandler) Init() error { @@ -101,7 +102,7 @@ func (wh *WebSocketHandler) websocketHeartbeat(messageType int) error { return nil } -func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() { +func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage(ctx context.Context) { defer close(wh.closed) remoteAddr := wh.conn.RemoteAddr().String() @@ -128,17 +129,23 @@ func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() { }) for { - _, _, err := wh.conn.ReadMessage() - if err != nil { - wh.watcher.SetError(err) + select { + case <-ctx.Done(): return + default: + _, _, err := wh.conn.ReadMessage() + if err != nil { + wh.watcher.SetError(err) + return + } } } } func (wh *WebSocketHandler) HandleWatchWebSocketJob() { - remoteAddr := wh.conn.RemoteAddr().String() + wh.goroutine.Do(wh.HandleWatchWebSocketControlMessage) + remoteAddr := wh.conn.RemoteAddr().String() for { select { case <-wh.closed: @@ -224,8 +231,10 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() { } func (wh *WebSocketHandler) Close(code int, text string) error { + defer wh.goroutine.Close(true) + remoteAddr := 
wh.conn.RemoteAddr().String() - message := []byte{} + var message []byte if code != websocket.CloseNoStatusReceived { message = websocket.FormatCloseMessage(code, text) } @@ -238,18 +247,6 @@ func (wh *WebSocketHandler) Close(code int, text string) error { return nil } -func DoWebSocketWatch(ctx context.Context, serviceId string, conn *websocket.Conn) { - domainProject := util.ParseDomainProject(ctx) - handler := &WebSocketHandler{ - ctx: ctx, - conn: conn, - watcher: NewInstanceWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/"), - needPingWatcher: true, - closed: make(chan struct{}), - } - processHandler(handler) -} - func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) { domainProject := util.ParseDomainProject(ctx) handler := &WebSocketHandler{ @@ -258,6 +255,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([] watcher: NewInstanceListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f), needPingWatcher: true, closed: make(chan struct{}), + goroutine: util.NewGo(context.Background()), } processHandler(handler) } @@ -266,7 +264,6 @@ func processHandler(handler *WebSocketHandler) { if err := handler.Init(); err != nil { return } - go handler.HandleWatchWebSocketControlMessage() handler.HandleWatchWebSocketJob() } @@ -294,10 +291,6 @@ func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey } } -func NewInstanceWatcher(selfServiceId, instanceRoot string) *ListWatcher { - return NewWatcher(INSTANCE, selfServiceId, instanceRoot) -} - func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher { return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc) } diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go index 66261163..f0837ee4 100644 --- a/server/service/util/dependency.go +++ 
b/server/service/util/dependency.go @@ -557,11 +557,10 @@ type Dependency struct { func (dep *Dependency) RemoveConsumerOfProviderRule() { dep.chanNum++ - go dep.removeConsumerOfProviderRule() + util.Go(dep.removeConsumerOfProviderRule) } -func (dep *Dependency) removeConsumerOfProviderRule() { - ctx := context.TODO() +func (dep *Dependency) removeConsumerOfProviderRule(ctx context.Context) { opts := make([]registry.PluginOp, 0, len(dep.removedDependencyRuleList)) for _, providerRule := range dep.removedDependencyRuleList { proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule) @@ -605,11 +604,10 @@ func (dep *Dependency) removeConsumerOfProviderRule() { func (dep *Dependency) AddConsumerOfProviderRule() { dep.chanNum++ - go dep.addConsumerOfProviderRule() + util.Go(dep.addConsumerOfProviderRule) } -func (dep *Dependency) addConsumerOfProviderRule() { - ctx := context.TODO() +func (dep *Dependency) addConsumerOfProviderRule(ctx context.Context) { opts := []registry.PluginOp{} for _, providerRule := range dep.NewDependencyRuleList { proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services