This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch feature-triple
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/feature-triple by this push:
     new 1d0c3b486 feat: modularize Graceful_Shutdown functionality (#2427)
1d0c3b486 is described below

commit 1d0c3b486ba7538a05108ef46872300124668715
Author: Scout Wang <[email protected]>
AuthorDate: Mon Sep 18 14:02:12 2023 +0800

    feat: modularize Graceful_Shutdown functionality (#2427)
---
 graceful_shutdown/common.go                        |  34 ++++
 graceful_shutdown/compat.go                        |  41 ++++
 .../graceful_shutdown_signal_darwin.go             |  38 ++++
 .../graceful_shutdown_signal_linux.go              |  38 ++++
 .../graceful_shutdown_signal_windows.go            |  35 ++++
 graceful_shutdown/options.go                       |  42 +++++
 graceful_shutdown/shutdown.go                      | 209 +++++++++++++++++++++
 7 files changed, 437 insertions(+)

diff --git a/graceful_shutdown/common.go b/graceful_shutdown/common.go
new file mode 100644
index 000000000..b12f7427f
--- /dev/null
+++ b/graceful_shutdown/common.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package graceful_shutdown
+
+import (
+       "github.com/dubbogo/gost/log/logger"
+       "time"
+)
+
+func parseDuration(timeout string, desc string, def time.Duration) 
time.Duration {
+       res, err := time.ParseDuration(timeout)
+       if err != nil {
+               logger.Errorf("The %s configuration is invalid: %s, and we will 
use the default value: %s, err: %v",
+                       desc, timeout, def.String(), err)
+               res = def
+       }
+
+       return res
+}
diff --git a/graceful_shutdown/compat.go b/graceful_shutdown/compat.go
new file mode 100644
index 000000000..b462a57c3
--- /dev/null
+++ b/graceful_shutdown/compat.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package graceful_shutdown
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/config"
+       "dubbo.apache.org/dubbo-go/v3/global"
+       "go.uber.org/atomic"
+)
+
+func compatShutdownConfig(c *global.ShutdownConfig) *config.ShutdownConfig {
+       if c == nil {
+               return nil
+       }
+       cfg := &config.ShutdownConfig{
+               Timeout:                     c.Timeout,
+               StepTimeout:                 c.StepTimeout,
+               ConsumerUpdateWaitTime:      c.ConsumerUpdateWaitTime,
+               RejectRequestHandler:        c.RejectRequestHandler,
+               InternalSignal:              c.InternalSignal,
+               OfflineRequestWindowTimeout: c.OfflineRequestWindowTimeout,
+               RejectRequest:               atomic.Bool{},
+       }
+       cfg.RejectRequest.Store(c.RejectRequest.Load())
+       return cfg
+}
diff --git a/graceful_shutdown/graceful_shutdown_signal_darwin.go 
b/graceful_shutdown/graceful_shutdown_signal_darwin.go
new file mode 100644
index 000000000..e9619f0b8
--- /dev/null
+++ b/graceful_shutdown/graceful_shutdown_signal_darwin.go
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package graceful_shutdown
+
+import (
+       "os"
+       "syscall"
+)
+
+var (
+       // ShutdownSignals receives shutdown signals to process
+       ShutdownSignals = []os.Signal{
+               os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP,
+               syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, 
syscall.SIGILL, syscall.SIGTRAP,
+               syscall.SIGABRT, syscall.SIGSYS, syscall.SIGTERM,
+       }
+
+       // DumpHeapShutdownSignals receives shutdown signals to process
+       DumpHeapShutdownSignals = []os.Signal{
+               syscall.SIGQUIT, syscall.SIGILL,
+               syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS,
+       }
+)
diff --git a/graceful_shutdown/graceful_shutdown_signal_linux.go 
b/graceful_shutdown/graceful_shutdown_signal_linux.go
new file mode 100644
index 000000000..e9619f0b8
--- /dev/null
+++ b/graceful_shutdown/graceful_shutdown_signal_linux.go
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package graceful_shutdown
+
+import (
+       "os"
+       "syscall"
+)
+
+var (
+       // ShutdownSignals receives shutdown signals to process
+       ShutdownSignals = []os.Signal{
+               os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP,
+               syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, 
syscall.SIGILL, syscall.SIGTRAP,
+               syscall.SIGABRT, syscall.SIGSYS, syscall.SIGTERM,
+       }
+
+       // DumpHeapShutdownSignals receives shutdown signals to process
+       DumpHeapShutdownSignals = []os.Signal{
+               syscall.SIGQUIT, syscall.SIGILL,
+               syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS,
+       }
+)
diff --git a/graceful_shutdown/graceful_shutdown_signal_windows.go 
b/graceful_shutdown/graceful_shutdown_signal_windows.go
new file mode 100644
index 000000000..ad21acdf4
--- /dev/null
+++ b/graceful_shutdown/graceful_shutdown_signal_windows.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package graceful_shutdown
+
+import (
+       "os"
+       "syscall"
+)
+
+var (
+       // ShutdownSignals receives shutdown signals to process
+       ShutdownSignals = []os.Signal{
+               os.Interrupt, os.Kill, syscall.SIGKILL,
+               syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, 
syscall.SIGILL, syscall.SIGTRAP,
+               syscall.SIGABRT, syscall.SIGTERM,
+       }
+
+       // DumpHeapShutdownSignals receives shutdown signals to process
+       DumpHeapShutdownSignals = []os.Signal{syscall.SIGQUIT, syscall.SIGILL, 
syscall.SIGTRAP, syscall.SIGABRT}
+)
diff --git a/graceful_shutdown/options.go b/graceful_shutdown/options.go
new file mode 100644
index 000000000..d0a112293
--- /dev/null
+++ b/graceful_shutdown/options.go
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package graceful_shutdown
+
+import "dubbo.apache.org/dubbo-go/v3/global"
+
+var (
+       defOpts = &Options{
+               shutdown: global.DefaultShutdownConfig(),
+       }
+)
+
+type Options struct {
+       shutdown *global.ShutdownConfig
+}
+
+func defaultOptions() *Options {
+       return defOpts
+}
+
+type Option func(*Options)
+
+func WithShutdown_Config(cfg *global.ShutdownConfig) Option {
+       return func(opts *Options) {
+               opts.shutdown = cfg
+       }
+}
diff --git a/graceful_shutdown/shutdown.go b/graceful_shutdown/shutdown.go
new file mode 100644
index 000000000..73e70c3b8
--- /dev/null
+++ b/graceful_shutdown/shutdown.go
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package graceful_shutdown
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/common/extension"
+       "dubbo.apache.org/dubbo-go/v3/config"
+       "github.com/dubbogo/gost/log/logger"
+       "os"
+       "os/signal"
+       "runtime/debug"
+       "sync"
+       "time"
+)
+
+const (
+       // todo(DMwangnima): these descriptions and defaults could be wrapped 
by functions of Options
+       defaultTimeout                     = 60 * time.Second
+       defaultStepTimeout                 = 3 * time.Second
+       defaultConsumerUpdateWaitTime      = 3 * time.Second
+       defaultOfflineRequestWindowTimeout = 3 * time.Second
+
+       timeoutDesc                     = "Timeout"
+       stepTimeoutDesc                 = "StepTimeout"
+       consumerUpdateWaitTimeDesc      = "ConsumerUpdateWaitTime"
+       offlineRequestWindowTimeoutDesc = "OfflineRequestWindowTimeout"
+)
+
+var (
+       initOnce       sync.Once
+       compatShutdown *config.ShutdownConfig
+
+       proMu     sync.Mutex
+       protocols map[string]struct{}
+)
+
+func Init(opts ...Option) {
+       initOnce.Do(func() {
+               newOpts := defaultOptions()
+               for _, opt := range opts {
+                       opt(newOpts)
+               }
+               compatShutdown = compatShutdownConfig(newOpts.shutdown)
+               // retrieve ShutdownConfig for gracefulShutdownFilter
+               cGracefulShutdownFilter, existcGracefulShutdownFilter := 
extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
+               if !existcGracefulShutdownFilter {
+                       return
+               }
+               sGracefulShutdownFilter, existsGracefulShutdownFilter := 
extension.GetFilter(constant.GracefulShutdownProviderFilterKey)
+               if !existsGracefulShutdownFilter {
+                       return
+               }
+               if filter, ok := cGracefulShutdownFilter.(config.Setter); ok {
+                       
filter.Set(constant.GracefulShutdownFilterShutdownConfig, compatShutdown)
+               }
+
+               if filter, ok := sGracefulShutdownFilter.(config.Setter); ok {
+                       
filter.Set(constant.GracefulShutdownFilterShutdownConfig, compatShutdown)
+               }
+
+               if compatShutdown.InternalSignal != nil && 
*compatShutdown.InternalSignal {
+                       signals := make(chan os.Signal, 1)
+                       signal.Notify(signals, ShutdownSignals...)
+
+                       go func() {
+                               select {
+                               case sig := <-signals:
+                                       logger.Infof("get signal %s, 
applicationConfig will shutdown.", sig)
+                                       // gracefulShutdownOnce.Do(func() {
+                                       time.AfterFunc(totalTimeout(), func() {
+                                               logger.Warn("Shutdown 
gracefully timeout, applicationConfig will shutdown immediately. ")
+                                               os.Exit(0)
+                                       })
+                                       beforeShutdown()
+                                       // those signals' original behavior is 
exit with dump ths stack, so we try to keep the behavior
+                                       for _, dumpSignal := range 
DumpHeapShutdownSignals {
+                                               if sig == dumpSignal {
+                                                       
debug.WriteHeapDump(os.Stdout.Fd())
+                                               }
+                                       }
+                                       os.Exit(0)
+                               }
+                       }()
+               }
+       })
+}
+
+// RegisterProtocol registers protocol which would be destroyed before 
shutdown.
+// Please make sure that Init function has been invoked before, otherwise this
+// function would not make any sense.
+func RegisterProtocol(name string) {
+       proMu.Lock()
+       protocols[name] = struct{}{}
+       proMu.Unlock()
+}
+
+func totalTimeout() time.Duration {
+       timeout := parseDuration(compatShutdown.Timeout, timeoutDesc, 
defaultTimeout)
+       if timeout < defaultTimeout {
+               timeout = defaultTimeout
+       }
+
+       return timeout
+}
+
+func beforeShutdown() {
+       destroyRegistries()
+       // waiting for a short time so that the clients have enough time to get 
the notification that server shutdowns
+       // The value of configuration depends on how long the clients will get 
notification.
+       waitAndAcceptNewRequests()
+
+       // reject sending/receiving the new request, but keeping waiting for 
accepting requests
+       waitForSendingAndReceivingRequests()
+
+       // destroy all protocols
+       destroyProtocols()
+
+       logger.Info("Graceful shutdown --- Execute the custom callbacks.")
+       customCallbacks := extension.GetAllCustomShutdownCallbacks()
+       for callback := customCallbacks.Front(); callback != nil; callback = 
callback.Next() {
+               callback.Value.(func())()
+       }
+}
+
+// destroyRegistries destroys RegistryProtocol directly.
+func destroyRegistries() {
+       logger.Info("Graceful shutdown --- Destroy all registriesConfig. ")
+       registryProtocol := extension.GetProtocol(constant.RegistryProtocol)
+       registryProtocol.Destroy()
+}
+
+func waitAndAcceptNewRequests() {
+       logger.Info("Graceful shutdown --- Keep waiting and accept new requests 
for a short time. ")
+
+       updateWaitTime := parseDuration(compatShutdown.ConsumerUpdateWaitTime, 
consumerUpdateWaitTimeDesc, defaultConsumerUpdateWaitTime)
+       time.Sleep(updateWaitTime)
+
+       stepTimeout := parseDuration(compatShutdown.StepTimeout, 
stepTimeoutDesc, defaultStepTimeout)
+
+       // ignore this step
+       if stepTimeout < 0 {
+               return
+       }
+       waitingProviderProcessedTimeout(stepTimeout)
+}
+
+func waitingProviderProcessedTimeout(timeout time.Duration) {
+       deadline := time.Now().Add(timeout)
+
+       offlineRequestWindowTimeout := 
parseDuration(compatShutdown.OfflineRequestWindowTimeout, 
offlineRequestWindowTimeoutDesc, defaultOfflineRequestWindowTimeout)
+
+       for time.Now().Before(deadline) &&
+               (compatShutdown.ProviderActiveCount.Load() > 0 || 
time.Now().Before(compatShutdown.ProviderLastReceivedRequestTime.Load().Add(offlineRequestWindowTimeout)))
 {
+               // sleep 10 ms and then we check it again
+               time.Sleep(10 * time.Millisecond)
+               logger.Infof("waiting for provider active invocation count = 
%d, provider last received request time: %v",
+                       compatShutdown.ProviderActiveCount.Load(), 
compatShutdown.ProviderLastReceivedRequestTime.Load())
+       }
+}
+
+// for provider. It will wait for processing receiving requests
+func waitForSendingAndReceivingRequests() {
+       logger.Info("Graceful shutdown --- Keep waiting until sending/accepting 
requests finish or timeout. ")
+       compatShutdown.RejectRequest.Store(true)
+       waitingConsumerProcessedTimeout()
+}
+
+func waitingConsumerProcessedTimeout() {
+       stepTimeout := parseDuration(compatShutdown.StepTimeout, 
stepTimeoutDesc, defaultStepTimeout)
+
+       if stepTimeout <= 0 {
+               return
+       }
+       deadline := time.Now().Add(stepTimeout)
+
+       for time.Now().Before(deadline) && 
compatShutdown.ConsumerActiveCount.Load() > 0 {
+               // sleep 10 ms and then we check it again
+               time.Sleep(10 * time.Millisecond)
+               logger.Infof("waiting for consumer active invocation count = 
%d", compatShutdown.ConsumerActiveCount.Load())
+       }
+}
+
+// destroyProtocols destroys protocols that have been registered.
+func destroyProtocols() {
+       logger.Info("Graceful shutdown --- Destroy protocols. ")
+
+       proMu.Lock()
+       // extension.GetProtocol might panic
+       defer proMu.Unlock()
+       for name := range protocols {
+               extension.GetProtocol(name).Destroy()
+       }
+}

Reply via email to