This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch refactor-with-go-components-experimental
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git

commit 934c27f05a7ace7246d819b336f4b28c1095d52c
Author: chickenlj <[email protected]>
AuthorDate: Mon Jul 3 11:09:22 2023 +0800

    refactor, introduce component concept to admin
---
 .run/Admin.run.xml                                 |  41 +--
 README.md                                          |   2 +-
 README_ZH.md                                       |   2 +-
 cmd/admin/README.md                                |   2 +-
 cmd/admin/cmd/root.go                              |  45 ++++
 cmd/admin/cmd/run.go                               |  93 +++++++
 cmd/admin/main.go                                  |  28 +--
 cmd/authority/app/authority.go                     |  84 +------
 conf/{dubboadmin.yml => admin.yml}                 |   0
 deploy/charts/dubbo-admin/templates/configmap.yaml |  85 +------
 .../charts/dubbo-admin/templates/deployment.yaml   |   3 +
 deploy/charts/dubbo-admin/values.yaml              |  39 +--
 pkg/admin/config/config.go                         |   2 +-
 pkg/authority/security/server.go                   |   1 +
 .../app/authority.go => pkg/authority/setup.go     |  57 +----
 pkg/core/alias.go                                  |  43 ++++
 pkg/core/cmd/helpers.go                            |  14 ++
 pkg/core/cmd/util.go                               |  17 ++
 pkg/{authority/security => core/rule}/server.go    |   3 +-
 pkg/core/rule/server_test.go                       | 209 ++++++++++++++++
 pkg/core/runtime/builder.go                        | 117 +++++++++
 pkg/core/runtime/component/component.go            | 163 ++++++++++++
 pkg/core/runtime/component/leader.go               |  53 ++++
 pkg/core/runtime/component/resilient.go            |  60 +++++
 pkg/core/runtime/reports/reports.go                | 277 +++++++++++++++++++++
 pkg/core/runtime/runtime.go                        |  87 +++++++
 pkg/version/version.go                             |  24 ++
 27 files changed, 1275 insertions(+), 276 deletions(-)

diff --git a/.run/Admin.run.xml b/.run/Admin.run.xml
index 06768acd..484d9ec0 100644
--- a/.run/Admin.run.xml
+++ b/.run/Admin.run.xml
@@ -1,31 +1,14 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
--->
-
 <component name="ProjectRunConfigurationManager">
-    <configuration default="false" name="Admin" 
type="GoApplicationRunConfiguration" factoryName="Go Application">
-        <module name="dubbo-admin" />
-        <working_directory value="$PROJECT_DIR$" />
-        <envs>
-            <env name="ADMIN_CONFIG_PATH" value="conf/dubboadmin.yml" />
-        </envs>
-        <kind value="PACKAGE" />
-        <package value="github.com/apache/dubbo-admin/cmd/admin" />
-        <directory value="$PROJECT_DIR$" />
-        <filePath value="$PROJECT_DIR$/cmd/admin/main.go" />
-        <method v="2" />
-    </configuration>
+  <configuration default="false" name="Admin" 
type="GoApplicationRunConfiguration" factoryName="Go Application">
+    <module name="dubbo-admin" />
+    <working_directory value="$PROJECT_DIR$" />
+    <envs>
+      <env name="ADMIN_CONFIG_PATH" value="conf/admin.yml" />
+    </envs>
+    <kind value="PACKAGE" />
+    <package value="github.com/apache/dubbo-admin/cmd/admin" />
+    <directory value="$PROJECT_DIR$" />
+    <filePath value="$PROJECT_DIR$/cmd/admin/main.go" />
+    <method v="2" />
+  </configuration>
 </component>
\ No newline at end of file
diff --git a/README.md b/README.md
index c9c27373..02c97ea3 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,7 @@ admin:
 ### Run with command line
 
 ```shell
-$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/dubboadmin.yml
+$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/admin.yml
 $ cd cmd/admin
 $ go run . 
 ```
diff --git a/README_ZH.md b/README_ZH.md
index 617ea956..ac07ecf9 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -27,7 +27,7 @@ admin:
 
 ### Run with command line
 ```shell
-$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/dubboadmin.yml
+$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/admin.yml
 $ cd cmd/admin
 $ go run . 
 ```
diff --git a/cmd/admin/README.md b/cmd/admin/README.md
index 851fb655..e4199f9a 100644
--- a/cmd/admin/README.md
+++ b/cmd/admin/README.md
@@ -24,7 +24,7 @@ admin:
 
 ### Run with command line 
 ```shell
-$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/dubboadmin.yml
+$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/admin.yml
 $ cd cmd/admin
 $ go run . 
 ```
diff --git a/cmd/admin/cmd/root.go b/cmd/admin/cmd/root.go
new file mode 100644
index 00000000..58a073d5
--- /dev/null
+++ b/cmd/admin/cmd/root.go
@@ -0,0 +1,45 @@
+package cmd
+
+import (
+       "github.com/apache/dubbo-admin/pkg/logger"
+       "github.com/apache/dubbo-admin/pkg/version"
+
+       corecmd "github.com/apache/dubbo-admin/pkg/core/cmd"
+
+       "github.com/spf13/cobra"
+       "os"
+)
+
+func GetRootCmd(args []string) *cobra.Command {
+       // rootCmd represents the base command when called without any 
subcommands
+       cmd := &cobra.Command{
+               Use:   "dubbo-admin",
+               Short: "Console and control plane for microservices built with 
Apache Dubbo.",
+               Long:  `Console and control plane for microservices built with 
Apache Dubbo.`,
+
+               PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
+                       logger.Init()
+
+                       // once command line flags have been parsed,
+                       // avoid printing usage instructions
+                       cmd.SilenceUsage = true
+
+                       return nil
+               },
+       }
+
+       cmd.SetOut(os.Stdout)
+
+       // root flags
+       //cmd.PersistentFlags().StringVar(&args.logLevel, "log-level", 
kuma_log.InfoLevel.String(), kuma_cmd.UsageOptions("log level", 
kuma_log.OffLevel, kuma_log.InfoLevel, kuma_log.DebugLevel))
+       //cmd.PersistentFlags().StringVar(&args.outputPath, "log-output-path", 
args.outputPath, "path to the file that will be filled with logs. Example: if 
we set it to /tmp/admin.log then after the file is rotated we will have 
/tmp/admin-2021-06-07T09-15-18.265.log")
+       //cmd.PersistentFlags().IntVar(&args.maxBackups, 
"log-max-retained-files", 1000, "maximum number of the old log files to retain")
+       //cmd.PersistentFlags().IntVar(&args.maxSize, "log-max-size", 100, 
"maximum size in megabytes of a log file before it gets rotated")
+       //cmd.PersistentFlags().IntVar(&args.maxAge, "log-max-age", 30, 
"maximum number of days to retain old log files based on the timestamp encoded 
in their filename")
+
+       // sub-commands
+       cmd.AddCommand(newRunCmdWithOpts(corecmd.DefaultRunCmdOpts))
+       cmd.AddCommand(version.NewVersionCmd())
+
+       return cmd
+}
diff --git a/cmd/admin/cmd/run.go b/cmd/admin/cmd/run.go
new file mode 100755
index 00000000..b12a1b6c
--- /dev/null
+++ b/cmd/admin/cmd/run.go
@@ -0,0 +1,93 @@
+package cmd
+
+import (
+       "github.com/apache/dubbo-admin/pkg/admin/config"
+       "github.com/apache/dubbo-admin/pkg/admin/constant"
+       "github.com/apache/dubbo-admin/pkg/admin/providers/mock"
+       "github.com/apache/dubbo-admin/pkg/admin/router"
+       "github.com/apache/dubbo-admin/pkg/admin/services"
+       "github.com/apache/dubbo-admin/pkg/authority"
+       "github.com/apache/dubbo-admin/pkg/core/cmd"
+       "github.com/apache/dubbo-admin/pkg/logger"
+
+       caconfig "github.com/apache/dubbo-admin/pkg/authority/config"
+
+       "os"
+       "time"
+
+       "github.com/spf13/cobra"
+)
+
+const gracefullyShutdownDuration = 3 * time.Second
+
+// This is the open file limit below which the control plane may not
+// reasonably have enough descriptors to accept all its clients.
+const minOpenFileLimit = 4096
+
+func newRunCmdWithOpts(opts cmd.RunCmdOpts) *cobra.Command {
+       args := struct {
+               configPath string
+       }{}
+       cmd := &cobra.Command{
+               Use:   "run",
+               Short: "Launch Dubbo Admin",
+               Long:  `Launch Dubbo Admin.`,
+               RunE: func(cmd *cobra.Command, _ []string) error {
+                       // init config
+                       config.LoadConfig()
+
+                       // subscribe to registries
+                       go services.StartSubscribe(config.RegistryCenter)
+                       defer func() {
+                               services.DestroySubscribe(config.RegistryCenter)
+                       }()
+
+                       // start mock server
+                       os.Setenv(constant.ConfigFileEnvKey, 
config.MockProviderConf)
+                       go mock.RunMockServiceServer()
+
+                       // start console server
+                       router := router.InitRouter()
+                       if err := router.Run(":38080"); err != nil {
+                               logger.Error("Failed to start Admin console 
server.")
+                               return err
+                       }
+
+                       // start CA
+                       if err := startCA(cmd); err != nil {
+                               logger.Error("Failed to start CA server.")
+                               return err
+                       }
+
+                       // start
+
+                       return nil
+               },
+       }
+
+       // flags
+       cmd.PersistentFlags().StringVarP(&args.configPath, "config-file", "c", 
"", "configuration file")
+
+       return cmd
+}
+
+func startCA(cmd *cobra.Command) error {
+       options := caconfig.NewOptions()
+
+       if err := authority.Initialize(cmd); err != nil {
+               logger.Fatal("Failed to initialize CA server.")
+               return err
+       }
+
+       logger.Infof("Authority command Run with options: %+v", options)
+       if errs := options.Validate(); len(errs) != 0 {
+               logger.Fatal(errs)
+               return errs[0]
+       }
+
+       if err := authority.Run(options); err != nil {
+               logger.Fatal(err)
+               return err
+       }
+       return nil
+}
diff --git a/cmd/admin/main.go b/cmd/admin/main.go
index 5bc1ffd7..6dc3e449 100644
--- a/cmd/admin/main.go
+++ b/cmd/admin/main.go
@@ -18,30 +18,14 @@
 package main
 
 import (
+       "fmt"
+       "github.com/apache/dubbo-admin/cmd/admin/cmd"
        "os"
-
-       "dubbo.apache.org/dubbo-go/v3/common/constant"
-       "github.com/apache/dubbo-admin/pkg/admin/config"
-       mock "github.com/apache/dubbo-admin/pkg/admin/providers/mock"
-       "github.com/apache/dubbo-admin/pkg/admin/router"
-       "github.com/apache/dubbo-admin/pkg/admin/services"
 )
 
-// @title           Dubbo-Admin API
-// @version         1.0
-// @description     This is a dubbo-admin swagger ui server.
-// @license.name  Apache 2.0
-// @license.url   http://www.apache.org/licenses/LICENSE-2.0.html
-// @host      127.0.0.1:38080
-// @BasePath  /
 func main() {
-       config.LoadConfig()
-       go services.StartSubscribe(config.RegistryCenter)
-       defer func() {
-               services.DestroySubscribe(config.RegistryCenter)
-       }()
-       os.Setenv(constant.ConfigFileEnvKey, config.MockProviderConf)
-       go mock.RunMockServiceServer()
-       router := router.InitRouter()
-       _ = router.Run(":38080")
+       if err := cmd.GetRootCmd(os.Args[1:]).Execute(); err != nil {
+               fmt.Fprintf(os.Stderr, "%v\n", err)
+               os.Exit(1)
+       }
 }
diff --git a/cmd/authority/app/authority.go b/cmd/authority/app/authority.go
index c49fa6e1..922cba1b 100644
--- a/cmd/authority/app/authority.go
+++ b/cmd/authority/app/authority.go
@@ -17,26 +17,10 @@ package app
 
 import (
        "flag"
-       "fmt"
-       "os"
-       "os/signal"
-       "strings"
-       "syscall"
-
+       "github.com/apache/dubbo-admin/pkg/authority"
        "github.com/apache/dubbo-admin/pkg/authority/config"
-       "github.com/apache/dubbo-admin/pkg/authority/security"
        "github.com/apache/dubbo-admin/pkg/logger"
        "github.com/spf13/cobra"
-       "github.com/spf13/pflag"
-       "github.com/spf13/viper"
-)
-
-var (
-       // For example, --webhook-port is bound to DUBBO_WEBHOOK_PORT.
-       envNamePrefix = "DUBBO"
-
-       // Replace hyphenated flag names with camelCase
-       replaceWithCamelCase = false
 )
 
 func NewAppCommand() *cobra.Command {
@@ -45,19 +29,26 @@ func NewAppCommand() *cobra.Command {
        cmd := &cobra.Command{
                Use:  "authority",
                Long: `The authority app is responsible for controllers in 
dubbo authority`,
-               PersistentPreRun: func(cmd *cobra.Command, args []string) {
+               PersistentPreRunE: func(cmd *cobra.Command, args []string) 
error {
                        logger.Infof("Authority command PersistentPreRun")
-                       initialize(cmd)
+                       if err := authority.Initialize(cmd); err != nil {
+                               logger.Fatal("Failed to initialize CA server.")
+                               return err
+                       }
+                       return nil
                },
-               Run: func(cmd *cobra.Command, args []string) {
+               RunE: func(cmd *cobra.Command, args []string) error {
                        logger.Infof("Authority command Run with options: %+v", 
options)
                        if errs := options.Validate(); len(errs) != 0 {
                                logger.Fatal(errs)
+                               return errs[0]
                        }
 
-                       if err := Run(options); err != nil {
+                       if err := authority.Run(options); err != nil {
                                logger.Fatal(err)
+                               return err
                        }
+                       return nil
                },
        }
 
@@ -65,54 +56,3 @@ func NewAppCommand() *cobra.Command {
        options.FillFlags(cmd.Flags())
        return cmd
 }
-
-func Run(options *config.Options) error {
-       s := security.NewServer(options)
-
-       s.Init()
-       s.Start()
-
-       c := make(chan os.Signal, 1)
-       signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
-       signal.Notify(s.StopChan, syscall.SIGINT, syscall.SIGTERM)
-       signal.Notify(s.CertStorage.GetStopChan(), syscall.SIGINT, 
syscall.SIGTERM)
-
-       <-c
-
-       return nil
-}
-
-func initialize(cmd *cobra.Command) error {
-       v := viper.New()
-
-       // For example, --webhook-port is bound to DUBBO_WEBHOOK_PORT.
-       v.SetEnvPrefix(envNamePrefix)
-
-       // keys with underscores, e.g. DUBBO-WEBHOOK-PORT to DUBBO_WEBHOOK_PORT
-       v.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
-
-       // Bind to environment variables
-       v.AutomaticEnv()
-
-       // Bind the current command's flags to viper
-       bindFlags(cmd, v)
-
-       return nil
-}
-
-func bindFlags(cmd *cobra.Command, v *viper.Viper) {
-       cmd.Flags().VisitAll(func(f *pflag.Flag) {
-               configName := f.Name
-
-               //  Replace hyphens with a camelCased string.
-               if replaceWithCamelCase {
-                       configName = strings.ReplaceAll(f.Name, "-", "")
-               }
-
-               // Apply the viper config value to the flag when the flag is 
not set and viper has a value
-               if !f.Changed && v.IsSet(configName) {
-                       val := v.Get(configName)
-                       cmd.Flags().Set(f.Name, fmt.Sprintf("%v", val))
-               }
-       })
-}
diff --git a/conf/dubboadmin.yml b/conf/admin.yml
similarity index 100%
rename from conf/dubboadmin.yml
rename to conf/admin.yml
diff --git a/deploy/charts/dubbo-admin/templates/configmap.yaml 
b/deploy/charts/dubbo-admin/templates/configmap.yaml
index 48616278..cb36a05b 100644
--- a/deploy/charts/dubbo-admin/templates/configmap.yaml
+++ b/deploy/charts/dubbo-admin/templates/configmap.yaml
@@ -13,16 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-{{- $defaultsessionTimeoutMilli := 3600000 -}}
-{{- $defaulttokenTimeoutMilli := 3600000 -}}
-{{- $defaultsignSecret := "86295dd0c4ef69a1036b0b0c15158d77" -}}
-{{- $defaulttoken := "e16e5cd903fd0c97a116c873b448544b9d086de9" -}}
-{{- $defaultname := "dubbo-admin" -}}
-{{- $defaultdriverclassname := "com.mysql.jdbc.Driver" -}}
-{{- $apollotoken := (coalesce .Values.apollo.token $defaulttoken) -}}
-{{- $dubboname := (coalesce .Values.dubbo.application.name $defaultname) -}}
-{{- $springdriverclassname := (coalesce 
.Values.spring.datasource.driverclassname $defaultdriverclassname) -}}
----
 apiVersion: v1
 kind: ConfigMap
 metadata:
@@ -34,71 +24,10 @@ metadata:
   {{- toYaml . | nindent 4 }}
   {{- end }}
 data:
-  application.properties: |-
-    {{- with .Values.admin }}
-    {{- if .zookeeper.enabled }}
-    admin.registry.address: {{ .zookeeper.address }}
-    admin.metadata-report.address: {{ .report.zookeeper.address }}
-    admin.config-center: {{ .zookeeper.center }}
-    {{- end }}
-    {{- if .nacos.enabled }}
-    admin.registry.address: {{ .nacos.address }}
-    admin.registry.group: {{ .nacos.group }}
-    admin.registry.namespace: {{ .nacos.namespace }}
-    admin.config-center: {{ .nacos.center }}
-    admin.config-center.group: {{ .nacos.group }}
-    admin.config-center.namespace: {{ .nacos.namespace }}
-    admin.metadata-report.address: {{ .report.nacos.address }}
-    admin.metadata-report.group: {{ .report.nacos.group }}
-    admin.metadata-report.namespace: {{ .report.nacos.namespace }}
-    {{- end }}
-    {{- end }}
-    {{- with .Values.root.user }}
-    admin.root.user.name: {{ .name }}
-    admin.root.user.password: {{ .password }}
-    {{- end }}
-    {{- if .Values.check.enabled }}
-    {{- if $defaultsessionTimeoutMilli }}
-    admin.check.sessionTimeoutMilli: {{ $defaultsessionTimeoutMilli }}
-    {{- end }}
-    {{- if $defaulttokenTimeoutMilli }}
-    admin.check.tokenTimeoutMilli: {{ $defaulttokenTimeoutMilli }}
-    {{- end }}
-    {{- if $defaultsignSecret }}
-    admin.check.signSecret: {{ $defaultsignSecret }}
-    {{- end }}
-    {{- end }}
-    {{- with .Values.apollo }}
-    {{- if .enabled }}
-    {{- if $apollotoken }}
-    admin.apollo.token: {{ $apollotoken }}
-    {{- end }}
-    admin.apollo.appId: {{ .appid }}
-    admin.apollo.env: {{ .env }}
-    admin.apollo.cluster: {{ .cluster }}
-    admin.config-center: {{ .center }}
-    {{- end }}
-    {{- end }}
-    {{- with .Values.server.compression }}
-    server.compression.enabled: {{ .enabled }}
-    server.compression.mime-types: {{ .types }}
-    server.compression.min-response-size: {{ .size }}
-    {{- end }}
-    {{- with .Values.dubbo }}
-    {{- if $dubboname }}
-    dubbo.application.name: {{ $dubboname }}
-    {{- end }}
-    dubbo.application.logger: {{ .application.logger }}
-    dubbo.registry.address: {{ .registry.address }}
-    {{- end }}
-    {{- with .Values.spring }}
-    {{- if .datasource.enabled }}
-    spring.datasource.driver-class-name: {{ $springdriverclassname }}
-    spring.datasource.url: {{ .datasource.url }}
-    spring.datasource.username: {{ .datasource.username }}
-    spring.datasource.password: {{ .datasource.password }}
-    {{- end }}
-    {{- end }}
-    {{- with .Values.mybatis }}
-    mybatis-plus.global-config.db-config.id-type: {{ .type }}
-    {{- end }}
\ No newline at end of file
+  # use this file to override default configuration of `dubbo-admin`
+  #
+  # see conf/admin.yml for available settings
+  admin.yml: |-
+    {{ if .Values.admin }}
+    {{ toYaml .Values.admin | trim | nindent 4 }}
+    {{ end }}
\ No newline at end of file
diff --git a/deploy/charts/dubbo-admin/templates/deployment.yaml 
b/deploy/charts/dubbo-admin/templates/deployment.yaml
index 2379156e..2e05c8d9 100644
--- a/deploy/charts/dubbo-admin/templates/deployment.yaml
+++ b/deploy/charts/dubbo-admin/templates/deployment.yaml
@@ -113,6 +113,9 @@ spec:
             {{- end }}
           resources:
             {{- toYaml .Values.resources | nindent 12 }}
+          env:
+            - name: ADMIN_CONFIG_PATH
+              value: /config/admin.yml
       volumes:
         - name: application-properties
           configMap:
diff --git a/deploy/charts/dubbo-admin/values.yaml 
b/deploy/charts/dubbo-admin/values.yaml
index 4fa0565c..8ef5f252 100644
--- a/deploy/charts/dubbo-admin/values.yaml
+++ b/deploy/charts/dubbo-admin/values.yaml
@@ -61,26 +61,24 @@ serviceAccount:
 imagePullSecrets: []
 # - name: secretName
 
+## admin configuration
+admin:
+  ## dubbo configuration when admin works as a provider
+  dubbo:
+    ## dubbo enabled
+    enabled: true
 
-##  @param Admin Default Enable Configuration
-dubbo:
-  ## dubbo enabled
-  enabled: true
-
-  ## dubbo application
-  application:
-    ## dubbo application name
-    name: ~
-
-    ## dubbo application logger
-    logger: slf4j
+    ## dubbo application
+    application:
+      ## dubbo application name
+      name: ~
 
-  registry:
-    ## dubbo registry address
-    address: ${admin.registry.address}
+      ## dubbo application logger
+      logger: slf4j
 
-## admin configuration
-admin:
+    registry:
+      ## dubbo registry address
+      address: ${admin.registry.address}
   ## zookeeper configuration
   zookeeper:
     ## zookeeper enabled
@@ -123,8 +121,11 @@ admin:
 
       ## nacos namespace
       namespace: public
-
-
+  prometheus:
+    address: prometheus.dubbo-system.svc.cluster.local:3000
+  grafana:
+    address: grafana.dubbo-system.svc.cluster.local
+  mysql-dsn: 
"root:password@tcp(127.0.0.1:3306)/dubbo-admin?charset=utf8&parseTime=true"
 
 ## apollo configuration
 apollo:
diff --git a/pkg/admin/config/config.go b/pkg/admin/config/config.go
index 847b4a12..460e59d1 100644
--- a/pkg/admin/config/config.go
+++ b/pkg/admin/config/config.go
@@ -41,7 +41,7 @@ import (
 )
 
 const (
-       conf        = "./conf/dubboadmin.yml"
+       conf        = "./conf/admin.yml"
        confPathKey = "ADMIN_CONFIG_PATH"
 )
 
diff --git a/pkg/authority/security/server.go b/pkg/authority/security/server.go
index 060f9849..34f6d10c 100644
--- a/pkg/authority/security/server.go
+++ b/pkg/authority/security/server.go
@@ -119,6 +119,7 @@ func (s *Server) Init() {
 
        s.registerCertificateService()
        s.registerObserveService()
+       s.registerTrafficService()
 
        reflection.Register(s.SecureServer)
 
diff --git a/cmd/authority/app/authority.go b/pkg/authority/setup.go
similarity index 50%
copy from cmd/authority/app/authority.go
copy to pkg/authority/setup.go
index c49fa6e1..50377b55 100644
--- a/cmd/authority/app/authority.go
+++ b/pkg/authority/setup.go
@@ -1,34 +1,16 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package app
+package authority
 
 import (
-       "flag"
        "fmt"
-       "os"
-       "os/signal"
-       "strings"
-       "syscall"
-
        "github.com/apache/dubbo-admin/pkg/authority/config"
        "github.com/apache/dubbo-admin/pkg/authority/security"
-       "github.com/apache/dubbo-admin/pkg/logger"
        "github.com/spf13/cobra"
        "github.com/spf13/pflag"
        "github.com/spf13/viper"
+       "os"
+       "os/signal"
+       "strings"
+       "syscall"
 )
 
 var (
@@ -39,33 +21,6 @@ var (
        replaceWithCamelCase = false
 )
 
-func NewAppCommand() *cobra.Command {
-       options := config.NewOptions()
-
-       cmd := &cobra.Command{
-               Use:  "authority",
-               Long: `The authority app is responsible for controllers in 
dubbo authority`,
-               PersistentPreRun: func(cmd *cobra.Command, args []string) {
-                       logger.Infof("Authority command PersistentPreRun")
-                       initialize(cmd)
-               },
-               Run: func(cmd *cobra.Command, args []string) {
-                       logger.Infof("Authority command Run with options: %+v", 
options)
-                       if errs := options.Validate(); len(errs) != 0 {
-                               logger.Fatal(errs)
-                       }
-
-                       if err := Run(options); err != nil {
-                               logger.Fatal(err)
-                       }
-               },
-       }
-
-       cmd.Flags().AddGoFlagSet(flag.CommandLine)
-       options.FillFlags(cmd.Flags())
-       return cmd
-}
-
 func Run(options *config.Options) error {
        s := security.NewServer(options)
 
@@ -82,7 +37,7 @@ func Run(options *config.Options) error {
        return nil
 }
 
-func initialize(cmd *cobra.Command) error {
+func Initialize(cmd *cobra.Command) error {
        v := viper.New()
 
        // For example, --webhook-port is bound to DUBBO_WEBHOOK_PORT.
diff --git a/pkg/core/alias.go b/pkg/core/alias.go
new file mode 100755
index 00000000..426b67f9
--- /dev/null
+++ b/pkg/core/alias.go
@@ -0,0 +1,43 @@
+package core
+
+import (
+       "context"
+       "os"
+       "os/signal"
+       "syscall"
+       "time"
+
+       "github.com/google/uuid"
+       kube_log "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+var (
+       // TODO remove dependency on kubernetes see: 
https://github.com/kumahq/kuma/issues/2798
+       Log       = kube_log.Log
+       SetLogger = kube_log.SetLogger
+       Now       = time.Now
+       TempDir   = os.TempDir
+
+       SetupSignalHandler = func() (context.Context, context.Context) {
+               gracefulCtx, gracefulCancel := 
context.WithCancel(context.Background())
+               ctx, cancel := context.WithCancel(context.Background())
+               c := make(chan os.Signal, 3)
+               signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
+               go func() {
+                       s := <-c
+                       Log.Info("Received signal, stopping instance 
gracefully", "signal", s.String())
+                       gracefulCancel()
+                       s = <-c
+                       Log.Info("Received second signal, stopping instance", 
"signal", s.String())
+                       cancel()
+                       s = <-c
+                       Log.Info("Received third signal, force exit", "signal", 
s.String())
+                       os.Exit(1)
+               }()
+               return gracefulCtx, ctx
+       }
+)
+
+func NewUUID() string {
+       return uuid.NewString()
+}
diff --git a/pkg/core/cmd/helpers.go b/pkg/core/cmd/helpers.go
new file mode 100755
index 00000000..d6ff1170
--- /dev/null
+++ b/pkg/core/cmd/helpers.go
@@ -0,0 +1,14 @@
+package cmd
+
+import (
+       "fmt"
+       "strings"
+)
+
+func UsageOptions(desc string, options ...interface{}) string {
+       values := make([]string, 0, len(options))
+       for _, option := range options {
+               values = append(values, fmt.Sprintf("%v", option))
+       }
+       return fmt.Sprintf("%s: one of %s", desc, strings.Join(values, "|"))
+}
diff --git a/pkg/core/cmd/util.go b/pkg/core/cmd/util.go
new file mode 100755
index 00000000..7b79464d
--- /dev/null
+++ b/pkg/core/cmd/util.go
@@ -0,0 +1,17 @@
+package cmd
+
+import (
+       "context"
+       "github.com/apache/dubbo-admin/pkg/core"
+)
+
+type RunCmdOpts struct {
+       // The first returned context is closed upon receiving first signal 
(SIGINT, SIGTERM).
+       // The second returned context is closed upon receiving second signal.
+       // We can start graceful shutdown when first context is closed and 
forcefully stop when the second one is closed.
+       SetupSignalHandler func() (context.Context, context.Context)
+}
+
+var DefaultRunCmdOpts = RunCmdOpts{
+       SetupSignalHandler: core.SetupSignalHandler,
+}
diff --git a/pkg/authority/security/server.go b/pkg/core/rule/server.go
similarity index 99%
copy from pkg/authority/security/server.go
copy to pkg/core/rule/server.go
index 060f9849..3c3d8210 100644
--- a/pkg/authority/security/server.go
+++ b/pkg/core/rule/server.go
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package security
+package rule
 
 import (
        "crypto/tls"
@@ -119,6 +119,7 @@ func (s *Server) Init() {
 
        s.registerCertificateService()
        s.registerObserveService()
+       s.registerTrafficService()
 
        reflection.Register(s.SecureServer)
 
diff --git a/pkg/core/rule/server_test.go b/pkg/core/rule/server_test.go
new file mode 100644
index 00000000..b52bec83
--- /dev/null
+++ b/pkg/core/rule/server_test.go
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rule
+
+import (
+       "crypto/tls"
+       "os"
+       "testing"
+       "time"
+
+       "github.com/apache/dubbo-admin/pkg/authority/election"
+
+       "k8s.io/client-go/kubernetes"
+
+       cert2 "github.com/apache/dubbo-admin/pkg/authority/cert"
+       "github.com/apache/dubbo-admin/pkg/authority/config"
+       "github.com/apache/dubbo-admin/pkg/authority/k8s"
+       "github.com/apache/dubbo-admin/pkg/logger"
+)
+
+type mockKubeClient struct {
+       k8s.Client
+}
+
+var (
+       certPEM = ""
+       priPEM  = ""
+)
+
+func (s *mockKubeClient) Init(options *config.Options) bool {
+       return true
+}
+
+func (s *mockKubeClient) GetAuthorityCert(namespace string) (string, string) {
+       return certPEM, priPEM
+}
+
+// UpdateAuthorityCert is a no-op: the mock does not persist the certificate.
+func (s *mockKubeClient) UpdateAuthorityCert(cert string, pri string, namespace string) {
+}
+
+func (s *mockKubeClient) UpdateAuthorityPublicKey(cert string) bool {
+       return true
+}
+
+// UpdateWebhookConfig is a no-op: the mock has no webhook configuration to update.
+func (s *mockKubeClient) UpdateWebhookConfig(options *config.Options, storage cert2.Storage) {
+}
+
+func (s *mockKubeClient) GetKubClient() *kubernetes.Clientset {
+       return nil
+}
+
+type mockStorage struct {
+       cert2.Storage
+       origin cert2.Storage
+}
+
+func (s *mockStorage) GetServerCert(serverName string) *tls.Certificate {
+       return nil
+}
+
+func (s *mockStorage) RefreshServerCert() {
+}
+
+func (s *mockStorage) SetAuthorityCert(cert *cert2.Cert) {
+       s.origin.SetAuthorityCert(cert)
+}
+
+func (s *mockStorage) GetAuthorityCert() *cert2.Cert {
+       return s.origin.GetAuthorityCert()
+}
+
+func (s *mockStorage) SetRootCert(cert *cert2.Cert) {
+       s.origin.SetRootCert(cert)
+}
+
+func (s *mockStorage) GetRootCert() *cert2.Cert {
+       return s.origin.GetRootCert()
+}
+
+func (s *mockStorage) AddTrustedCert(cert *cert2.Cert) {
+       s.origin.AddTrustedCert(cert)
+}
+
+func (s *mockStorage) GetTrustedCerts() []*cert2.Cert {
+       return s.origin.GetTrustedCerts()
+}
+
+func (s *mockStorage) GetStopChan() chan os.Signal {
+       return s.origin.GetStopChan()
+}
+
+type mockLeaderElection struct {
+       election.LeaderElection
+}
+
+// Election performs no real election. It generates a new authority
+// certificate from the storage's root certificate and options.CaValidity,
+// stores it as the current authority certificate and always returns nil.
+// kubeClient is ignored.
+func (s *mockLeaderElection) Election(storage cert2.Storage, options *config.Options, kubeClient *kubernetes.Clientset) error {
+       storage.SetAuthorityCert(cert2.GenerateAuthorityCert(storage.GetRootCert(), options.CaValidity))
+       return nil
+}
+
+// TestInit initializes a server twice against the mocked Kubernetes client.
+// The authority certificate produced by the first server is published through
+// the package-level certPEM/priPEM variables (which the mock client returns
+// from GetAuthorityCert), and the second server is expected to end up with
+// that same certificate.
+func TestInit(t *testing.T) {
+       t.Parallel()
+
+       logger.Init()
+
+       options := &config.Options{
+               IsKubernetesConnected: true,
+               Namespace:             "dubbo-system",
+               PlainServerPort:       30060,
+               SecureServerPort:      30062,
+               DebugPort:             30070,
+               CaValidity:            30 * 24 * 60 * 60 * 1000, // 30 day
+               CertValidity:          1 * 60 * 60 * 1000,       // 1 hour
+       }
+
+       s := NewServer(options)
+       s.KubeClient = &mockKubeClient{}
+
+       s.Init()
+       // t.Fatal stops the test goroutine, so no explicit return is needed.
+       if !s.CertStorage.GetAuthorityCert().IsValid() {
+               t.Fatal("Authority cert is not valid")
+       }
+
+       // Hand the generated certificate to the mock so the next server reads it.
+       certPEM = s.CertStorage.GetAuthorityCert().CertPem
+       priPEM = cert2.EncodePrivateKey(s.CertStorage.GetAuthorityCert().PrivateKey)
+
+       s.PlainServer.Stop()
+       s.SecureServer.Stop()
+       s.StopChan <- os.Kill
+
+       s = NewServer(options)
+       s.KubeClient = &mockKubeClient{}
+       s.Init()
+
+       if !s.CertStorage.GetAuthorityCert().IsValid() {
+               t.Fatal("Authority cert is not valid")
+       }
+
+       if s.CertStorage.GetAuthorityCert().CertPem != certPEM {
+               t.Fatal("Authority cert is not equal")
+       }
+
+       s.PlainServer.Stop()
+       s.SecureServer.Stop()
+       s.StopChan <- os.Kill
+       s.CertStorage.GetStopChan() <- os.Kill
+}
+
+// TestRefresh starts a server with a very small CaValidity (10), a mocked
+// leader election and a storage wrapper, then polls until the authority
+// certificate held by the storage is replaced by a different one. The test
+// fails if no replacement is observed within roughly 100 seconds.
+func TestRefresh(t *testing.T) {
+       t.Parallel()
+
+       logger.Init()
+
+       options := &config.Options{
+               IsKubernetesConnected: false,
+               Namespace:             "dubbo-system",
+               PlainServerPort:       30060,
+               SecureServerPort:      30062,
+               DebugPort:             30070,
+               CaValidity:            10,
+       }
+
+       s := NewServer(options)
+
+       s.KubeClient = &mockKubeClient{}
+       storage := &mockStorage{}
+       s.Elec = &mockLeaderElection{}
+       storage.origin = cert2.NewStorage(options)
+       s.CertStorage = storage
+
+       s.Init()
+
+       origin := s.CertStorage.GetAuthorityCert()
+
+       for i := 0; i < 1000; i++ {
+               // wait at most 100s
+               time.Sleep(100 * time.Millisecond)
+               if s.CertStorage.GetAuthorityCert() != origin {
+                       break
+               }
+       }
+
+       // t.Fatal stops the test goroutine, so no explicit return is needed.
+       if s.CertStorage.GetAuthorityCert() == origin {
+               t.Fatal("Authority cert is not refreshed")
+       }
+
+       s.PlainServer.Stop()
+       s.SecureServer.Stop()
+       s.StopChan <- os.Kill
+}
diff --git a/pkg/core/runtime/builder.go b/pkg/core/runtime/builder.go
new file mode 100755
index 00000000..43477c87
--- /dev/null
+++ b/pkg/core/runtime/builder.go
@@ -0,0 +1,117 @@
+package runtime
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/dubbo-admin/pkg/core"
+       "github.com/apache/dubbo-admin/pkg/core/runtime/component"
+       "os"
+       "time"
+
+       "github.com/pkg/errors"
+)
+
+// BuilderContext provides access to Builder's interim state.
+type BuilderContext interface {
+       ComponentManager() component.Manager
+       ResourceStore() core_store.ResourceStore
+       SecretStore() store.SecretStore
+       ConfigStore() core_store.ResourceStore
+       ResourceManager() core_manager.CustomizableResourceManager
+       Config() kuma_cp.Config
+       DataSourceLoader() datasource.Loader
+       Extensions() context.Context
+       ConfigManager() config_manager.ConfigManager
+       LeaderInfo() component.LeaderInfo
+       Metrics() metrics.Metrics
+       EventReaderFactory() events.ListenerFactory
+       APIManager() api_server.APIManager
+       XDSHooks() *xds_hooks.Hooks
+       CAProvider() secrets.CaProvider
+       DpServer() *dp_server.DpServer
+       ResourceValidators() ResourceValidators
+       KDSContext() *kds_context.Context
+       APIServerAuthenticator() authn.Authenticator
+       Access() Access
+       TokenIssuers() builtin.TokenIssuers
+       MeshCache() *mesh.Cache
+       InterCPClientPool() *client.Pool
+}
+
+var _ BuilderContext = &Builder{}
+
+// Builder represents a multi-step initialization process.
+type Builder struct {
+       cfg kuma_cp.Config
+       cm  component.Manager
+       rs  core_store.ResourceStore
+       *runtimeInfo
+}
+
+// BuilderFor creates a Builder seeded with cfg and a fresh runtimeInfo. The
+// instance id is "<hostname>-<first four characters of core.NewUUID()>" and
+// the start time is the moment of the call. It returns an error when the
+// hostname cannot be determined.
+//
+// NOTE(review): appCtx is accepted but not retained, because Builder does
+// not declare a field to hold it; the previous literal set fields (ext, cam,
+// appCtx) that do not exist on Builder.
+func BuilderFor(appCtx context.Context, cfg kuma_cp.Config) (*Builder, error) {
+       hostname, err := os.Hostname()
+       if err != nil {
+               return nil, errors.Wrap(err, "could not get hostname")
+       }
+       suffix := core.NewUUID()[0:4]
+       return &Builder{
+               cfg: cfg,
+               runtimeInfo: &runtimeInfo{
+                       instanceId: fmt.Sprintf("%s-%s", hostname, suffix),
+                       startTime:  time.Now(),
+               },
+       }, nil
+}
+
+func (b *Builder) WithComponentManager(cm component.Manager) *Builder {
+       b.cm = cm
+       return b
+}
+
+func (b *Builder) Build() (Runtime, error) {
+       if b.cm == nil {
+               return nil, errors.Errorf("ComponentManager has not been 
configured")
+       }
+       return &runtime{
+               RuntimeInfo: b.runtimeInfo,
+               RuntimeContext: &runtimeContext{
+                       cfg:            b.cfg,
+                       rm:             b.rm,
+                       rom:            b.rom,
+                       rs:             b.rs,
+                       ss:             b.ss,
+                       cam:            b.cam,
+                       dsl:            b.dsl,
+                       ext:            b.ext,
+                       configm:        b.configm,
+                       leadInfo:       b.leadInfo,
+                       lif:            b.lif,
+                       eac:            b.eac,
+                       metrics:        b.metrics,
+                       erf:            b.erf,
+                       apim:           b.apim,
+                       xdsauth:        b.xdsauth,
+                       xdsCallbacks:   b.xdsCallbacks,
+                       xdsh:           b.xdsh,
+                       cap:            b.cap,
+                       dps:            b.dps,
+                       kdsctx:         b.kdsctx,
+                       rv:             b.rv,
+                       au:             b.au,
+                       acc:            b.acc,
+                       appCtx:         b.appCtx,
+                       extraReportsFn: b.extraReportsFn,
+                       tokenIssuers:   b.tokenIssuers,
+                       meshCache:      b.meshCache,
+                       interCpPool:    b.interCpPool,
+               },
+               Manager: b.cm,
+       }, nil
+}
+
+func (b *Builder) ComponentManager() component.Manager {
+       return b.cm
+}
diff --git a/pkg/core/runtime/component/component.go 
b/pkg/core/runtime/component/component.go
new file mode 100755
index 00000000..d3f6a21b
--- /dev/null
+++ b/pkg/core/runtime/component/component.go
@@ -0,0 +1,163 @@
+package component
+
+import (
+       "sync"
+
+       "github.com/kumahq/kuma/pkg/core"
+       "github.com/kumahq/kuma/pkg/util/channels"
+)
+
+var log = core.Log.WithName("bootstrap")
+
+// Component defines a process that will be run in the application.
+// Component should be designed in such a way that it can be stopped by stop
+// channel and started again (for example when instance is reelected for a
+// leader).
+type Component interface {
+       // Start blocks until the channel is closed or an error occurs.
+       // The component will stop running when the channel is closed.
+       Start(<-chan struct{}) error
+
+       // NeedLeaderElection indicates if component should be run only by one
+       // instance of Control Plane even with many Control Plane replicas.
+       NeedLeaderElection() bool
+}
+
+// GracefulComponent is a component that supports waiting until it's finished.
+// It's useful if there is cleanup logic that has to be executed before the
+// process exits (i.e. sending SIGTERM signals to subprocesses started by this
+// component).
+type GracefulComponent interface {
+       Component
+
+       // WaitForDone blocks until all components are done.
+       // If a component was not started (i.e. leader components on non-leader
+       // CP) it returns immediately.
+       WaitForDone()
+}
+
+// Component of Kuma, i.e. gRPC Server, HTTP server, reconciliation loop.
+var _ Component = ComponentFunc(nil)
+
+type ComponentFunc func(<-chan struct{}) error
+
+func (f ComponentFunc) NeedLeaderElection() bool {
+       return false
+}
+
+func (f ComponentFunc) Start(stop <-chan struct{}) error {
+       return f(stop)
+}
+
+var _ Component = LeaderComponentFunc(nil)
+
+type LeaderComponentFunc func(<-chan struct{}) error
+
+func (f LeaderComponentFunc) NeedLeaderElection() bool {
+       return true
+}
+
+func (f LeaderComponentFunc) Start(stop <-chan struct{}) error {
+       return f(stop)
+}
+
+// Manager collects components and runs them together.
+type Manager interface {
+       // Add registers a component, i.e. gRPC Server, HTTP server,
+       // reconciliation loop.
+       Add(...Component) error
+
+       // Start starts registered components and blocks until the Stop channel
+       // is closed.
+       // Returns an error if there is an error starting any component.
+       // If there are any GracefulComponent, it waits until all components
+       // are done.
+       Start(<-chan struct{}) error
+}
+
+var _ Manager = &manager{}
+
+func NewManager(leaderElector LeaderElector) Manager {
+       return &manager{
+               leaderElector: leaderElector,
+       }
+}
+
+type manager struct {
+       components    []Component
+       leaderElector LeaderElector
+}
+
+func (cm *manager) Add(c ...Component) error {
+       cm.components = append(cm.components, c...)
+       return nil
+}
+
+func (cm *manager) waitForDone() {
+       for _, c := range cm.components {
+               if gc, ok := c.(GracefulComponent); ok {
+                       gc.WaitForDone()
+               }
+       }
+}
+
+func (cm *manager) Start(stop <-chan struct{}) error {
+       errCh := make(chan error)
+
+       cm.startNonLeaderComponents(stop, errCh)
+       cm.startLeaderComponents(stop, errCh)
+
+       defer cm.waitForDone()
+       select {
+       case <-stop:
+               return nil
+       case err := <-errCh:
+               return err
+       }
+}
+
+// startNonLeaderComponents starts, each in its own goroutine, every
+// registered component that does not require leader election. A non-nil
+// error returned by a component's Start is forwarded to errCh.
+func (cm *manager) startNonLeaderComponents(stop <-chan struct{}, errCh chan error) {
+       for _, component := range cm.components {
+               if component.NeedLeaderElection() {
+                       continue
+               }
+               go func(c Component) {
+                       if err := c.Start(stop); err != nil {
+                               errCh <- err
+                       }
+               }(component)
+       }
+}
+
+// startLeaderComponents wires the components that require leader election
+// to the leader elector: they are started when leadership is acquired and
+// told to stop when leadership is lost or when stop is closed. A non-nil
+// error returned by a component's Start is forwarded to errCh.
+func (cm *manager) startLeaderComponents(stop <-chan struct{}, errCh chan error) {
+       // Leader components need their own stop channel because they can be
+       // restarted on every leadership change. The channel variable is
+       // replaced by the leader elector callbacks and closed from another
+       // goroutine, so every access to it is guarded by the mutex.
+       mutex := sync.Mutex{}
+       leaderStopCh := make(chan struct{})
+       closeLeaderCh := func() {
+               mutex.Lock()
+               defer mutex.Unlock()
+               if !channels.IsClosed(leaderStopCh) {
+                       close(leaderStopCh)
+               }
+       }
+
+       cm.leaderElector.AddCallbacks(LeaderCallbacks{
+               OnStartedLeading: func() {
+                       log.Info("leader acquired")
+                       mutex.Lock()
+                       defer mutex.Unlock()
+                       leaderStopCh = make(chan struct{})
+                       for _, component := range cm.components {
+                               if component.NeedLeaderElection() {
+                                       // Hand the channel over as an argument: reading the
+                                       // shared variable inside the goroutine would happen
+                                       // without the mutex and could observe a channel
+                                       // created by a later election.
+                                       go func(c Component, stopCh <-chan struct{}) {
+                                               if err := c.Start(stopCh); err != nil {
+                                                       errCh <- err
+                                               }
+                                       }(component, leaderStopCh)
+                               }
+                       }
+               },
+               OnStoppedLeading: func() {
+                       log.Info("leader lost")
+                       closeLeaderCh()
+               },
+       })
+       go cm.leaderElector.Start(stop)
+       go func() {
+               <-stop
+               closeLeaderCh()
+       }()
+}
diff --git a/pkg/core/runtime/component/leader.go 
b/pkg/core/runtime/component/leader.go
new file mode 100755
index 00000000..3edbc4e9
--- /dev/null
+++ b/pkg/core/runtime/component/leader.go
@@ -0,0 +1,53 @@
+package component
+
+import "sync/atomic"
+
+// LeaderCallbacks defines callbacks for events from LeaderElector.
+// It is guaranteed that each method will be executed from the same
+// goroutine, so only one method can be run at once.
+type LeaderCallbacks struct {
+       OnStartedLeading func()
+       OnStoppedLeading func()
+}
+
+// LeaderElector emits leadership events to the registered LeaderCallbacks.
+type LeaderElector interface {
+       // AddCallbacks registers callbacks for leadership events.
+       AddCallbacks(LeaderCallbacks)
+       // IsLeader should be used for diagnostic reasons (metrics/API info),
+       // because there may not be any leader elector for a short period of time.
+       // Use Callbacks to write logic to execute when Leader is elected.
+       IsLeader() bool
+
+       // Start blocks until the channel is closed or an error occurs.
+       Start(stop <-chan struct{})
+}
+
+type LeaderInfo interface {
+       IsLeader() bool
+}
+
+var _ LeaderInfo = &LeaderInfoComponent{}
+var _ Component = &LeaderInfoComponent{}
+
+type LeaderInfoComponent struct {
+       leader int32
+}
+
+func (l *LeaderInfoComponent) Start(stop <-chan struct{}) error {
+       l.setLeader(true)
+       <-stop
+       l.setLeader(false)
+       return nil
+}
+
+func (l *LeaderInfoComponent) NeedLeaderElection() bool {
+       return true
+}
+
+// setLeader atomically stores the leadership flag: 1 when leader is true,
+// 0 otherwise.
+func (l *LeaderInfoComponent) setLeader(leader bool) {
+       var value int32
+       if leader {
+               value = 1
+       }
+       atomic.StoreInt32(&l.leader, value)
+}
+
+// IsLeader atomically reads the leadership flag and reports whether it is set.
+func (l *LeaderInfoComponent) IsLeader() bool {
+       return atomic.LoadInt32(&l.leader) == 1
+}
diff --git a/pkg/core/runtime/component/resilient.go 
b/pkg/core/runtime/component/resilient.go
new file mode 100755
index 00000000..8167f519
--- /dev/null
+++ b/pkg/core/runtime/component/resilient.go
@@ -0,0 +1,60 @@
+package component
+
+import (
+       "time"
+
+       "github.com/go-logr/logr"
+       "github.com/pkg/errors"
+)
+
+const (
+       backoffTime = 5 * time.Second
+)
+
+type resilientComponent struct {
+       log       logr.Logger
+       component Component
+}
+
+func NewResilientComponent(log logr.Logger, component Component) Component {
+       return &resilientComponent{
+               log:       log,
+               component: component,
+       }
+}
+
+// Start runs the wrapped component and restarts it, after backoffTime,
+// every time it terminates - whether it returned (with or without an error)
+// or panicked. Errors and panics are logged together with the generation
+// number of the run. Start returns nil as soon as stop is closed, including
+// while it is waiting out the back-off.
+func (r *resilientComponent) Start(stop <-chan struct{}) error {
+       r.log.Info("starting resilient component ...")
+       for generationID := uint64(1); ; generationID++ {
+               // Buffered so the goroutine can always deliver its result and exit,
+               // even when nobody is receiving anymore.
+               errCh := make(chan error, 1)
+               go func(errCh chan<- error) {
+                       defer close(errCh)
+                       // recover from a panic
+                       defer func() {
+                               if e := recover(); e != nil {
+                                       if err, ok := e.(error); ok {
+                                               errCh <- err
+                                       } else {
+                                               errCh <- errors.Errorf("%v", e)
+                                       }
+                               }
+                       }()
+
+                       errCh <- r.component.Start(stop)
+               }(errCh)
+               select {
+               case <-stop:
+                       r.log.Info("done")
+                       return nil
+               case err := <-errCh:
+                       if err != nil {
+                               r.log.WithValues("generationID", generationID).Error(err, "component terminated with an error")
+                       }
+               }
+               // Back off before the restart, but do not outlive a stop request:
+               // otherwise shutdown is delayed and the component is relaunched
+               // with an already closed stop channel.
+               select {
+               case <-stop:
+                       r.log.Info("done")
+                       return nil
+               case <-time.After(backoffTime):
+               }
+       }
+}
+
+func (r *resilientComponent) NeedLeaderElection() bool {
+       return r.component.NeedLeaderElection()
+}
diff --git a/pkg/core/runtime/reports/reports.go 
b/pkg/core/runtime/reports/reports.go
new file mode 100755
index 00000000..29219a1e
--- /dev/null
+++ b/pkg/core/runtime/reports/reports.go
@@ -0,0 +1,277 @@
+package reports
+
+import (
+       "context"
+       "crypto/tls"
+       "fmt"
+       "net"
+       "os"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/pkg/errors"
+
+       mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
+       kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
+       config_core "github.com/kumahq/kuma/pkg/config/core"
+       "github.com/kumahq/kuma/pkg/core"
+       "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
+       "github.com/kumahq/kuma/pkg/core/resources/apis/system"
+       "github.com/kumahq/kuma/pkg/core/resources/registry"
+       core_runtime "github.com/kumahq/kuma/pkg/core/runtime"
+       "github.com/kumahq/kuma/pkg/core/user"
+       kuma_version "github.com/kumahq/kuma/pkg/version"
+)
+
+const (
+       pingInterval = 3600
+       pingHost     = "kong-hf.konghq.com"
+       pingPort     = 61832
+)
+
+var (
+       log = core.Log.WithName("core").WithName("reports")
+)
+
+/*
+  - buffer initialized upon Init call
+  - append adds more keys onto it
+*/
+
+type reportsBuffer struct {
+       sync.Mutex
+       mutable   map[string]string
+       immutable map[string]string
+}
+
+// fetchDataplanes lists dataplane resources through the runtime's read-only
+// resource manager. Any listing error is wrapped and returned.
+func fetchDataplanes(ctx context.Context, rt core_runtime.Runtime) (*mesh.DataplaneResourceList, error) {
+       dataplanes := mesh.DataplaneResourceList{}
+       if err := rt.ReadOnlyResourceManager().List(ctx, &dataplanes); err != nil {
+               return nil, errors.Wrap(err, "could not fetch dataplanes")
+       }
+
+       return &dataplanes, nil
+}
+
+// fetchMeshes lists mesh resources through the runtime's read-only resource
+// manager. Any listing error is wrapped and returned.
+func fetchMeshes(ctx context.Context, rt core_runtime.Runtime) (*mesh.MeshResourceList, error) {
+       meshes := mesh.MeshResourceList{}
+       if err := rt.ReadOnlyResourceManager().List(ctx, &meshes); err != nil {
+               return nil, errors.Wrap(err, "could not fetch meshes")
+       }
+
+       return &meshes, nil
+}
+
+// fetchZones lists zone resources through the runtime's read-only resource
+// manager. Any listing error is wrapped and returned.
+func fetchZones(ctx context.Context, rt core_runtime.Runtime) (*system.ZoneResourceList, error) {
+       zones := system.ZoneResourceList{}
+       if err := rt.ReadOnlyResourceManager().List(ctx, &zones); err != nil {
+               return nil, errors.Wrap(err, "could not fetch zones")
+       }
+       return &zones, nil
+}
+
+func fetchNumPolicies(ctx context.Context, rt core_runtime.Runtime) 
(map[string]string, error) {
+       policyCounts := map[string]string{}
+
+       for _, descr := range registry.Global().ObjectDescriptors() {
+               typedList := descr.NewList()
+               k := "n_" + strings.ToLower(string(descr.Name))
+               if err := rt.ReadOnlyResourceManager().List(ctx, typedList); 
err != nil {
+                       return nil, errors.Wrap(err, fmt.Sprintf("could not 
fetch %s", k))
+               }
+               policyCounts[k] = strconv.Itoa(len(typedList.GetItems()))
+       }
+       return policyCounts, nil
+}
+
+func fetchNumOfServices(ctx context.Context, rt core_runtime.Runtime) (int, 
int, error) {
+       insights := mesh.ServiceInsightResourceList{}
+       if err := rt.ReadOnlyResourceManager().List(ctx, &insights); err != nil 
{
+               return 0, 0, errors.Wrap(err, "could not fetch service 
insights")
+       }
+       internalServices := 0
+       for _, insight := range insights.Items {
+               internalServices += len(insight.Spec.Services)
+       }
+
+       externalServicesList := mesh.ExternalServiceResourceList{}
+       if err := rt.ReadOnlyResourceManager().List(ctx, 
&externalServicesList); err != nil {
+               return 0, 0, errors.Wrap(err, "could not fetch external 
services")
+       }
+       return internalServices, len(externalServicesList.Items), nil
+}
+
+// marshall renders the buffer as a single string: the "<14>" prefix followed
+// by one "key=value;" pair per entry, immutable entries before mutable ones.
+// The order of the pairs inside each section follows map iteration order.
+func (b *reportsBuffer) marshall() (string, error) {
+       var out strings.Builder
+
+       if _, err := fmt.Fprintf(&out, "<14>"); err != nil {
+               return "", err
+       }
+
+       for _, section := range []map[string]string{b.immutable, b.mutable} {
+               for key, value := range section {
+                       if _, err := fmt.Fprintf(&out, "%s=%s;", key, value); err != nil {
+                               return "", err
+                       }
+               }
+       }
+
+       return out.String(), nil
+}
+
+// updateEntitiesReport refreshes the entity counters stored in the mutable
+// section of the buffer: dataplanes (total, gateways, gateways per type),
+// meshes, zones (only in standalone and global mode), services (internal,
+// external, total) and one counter per entry returned by fetchNumPolicies.
+// It returns the first fetch error; counters written before the failure
+// remain in the buffer.
+//
+// XXX this function retrieves all dataplanes and all meshes;
+// ideally, the number of dataplanes and number of meshes
+// should be pushed from the outside rather than pulled
+func (b *reportsBuffer) updateEntitiesReport(rt core_runtime.Runtime) error {
+       ctx := user.Ctx(context.Background(), user.ControlPlane)
+       dps, err := fetchDataplanes(ctx, rt)
+       if err != nil {
+               return err
+       }
+       b.mutable["dps_total"] = strconv.Itoa(len(dps.Items))
+
+       ngateways := 0
+       gatewayTypes := map[string]int{}
+       for _, dp := range dps.Items {
+               // A spec of an unexpected type carries no gateway information;
+               // skip it rather than panic inside a best-effort usage report.
+               spec, ok := dp.GetSpec().(*mesh_proto.Dataplane)
+               if !ok {
+                       continue
+               }
+               gateway := spec.GetNetworking().GetGateway()
+               if gateway != nil {
+                       ngateways++
+                       gatewayType := strings.ToLower(gateway.GetType().String())
+                       gatewayTypes["gateway_dp_type_"+gatewayType]++
+               }
+       }
+       b.mutable["gateway_dps"] = strconv.Itoa(ngateways)
+       for gtype, n := range gatewayTypes {
+               b.mutable[gtype] = strconv.Itoa(n)
+       }
+
+       meshes, err := fetchMeshes(ctx, rt)
+       if err != nil {
+               return err
+       }
+       b.mutable["meshes_total"] = strconv.Itoa(len(meshes.Items))
+
+       // In any other mode zones_total is left untouched.
+       switch rt.Config().Mode {
+       case config_core.Standalone:
+               b.mutable["zones_total"] = strconv.Itoa(1)
+       case config_core.Global:
+               zones, err := fetchZones(ctx, rt)
+               if err != nil {
+                       return err
+               }
+               b.mutable["zones_total"] = strconv.Itoa(len(zones.Items))
+       }
+
+       internalServices, externalServices, err := fetchNumOfServices(ctx, rt)
+       if err != nil {
+               return err
+       }
+       b.mutable["internal_services"] = strconv.Itoa(internalServices)
+       b.mutable["external_services"] = strconv.Itoa(externalServices)
+       b.mutable["services_total"] = strconv.Itoa(internalServices + externalServices)
+
+       policyCounts, err := fetchNumPolicies(ctx, rt)
+       if err != nil {
+               return err
+       }
+
+       for k, count := range policyCounts {
+               b.mutable[k] = count
+       }
+       return nil
+}
+
+// dispatch refreshes the report and sends it to host:port over TLS.
+//
+// It updates the entity counters, records pingType as "signal" together
+// with the cluster id and the uptime in whole seconds, merges the values
+// produced by extraFn (when extraFn is non-nil), marshals the buffer and
+// writes the result to a freshly dialed TLS connection. The connection is
+// closed before returning. Any error along the way is returned as is.
+func (b *reportsBuffer) dispatch(rt core_runtime.Runtime, host string, port int, pingType string, extraFn core_runtime.ExtraReportsFn) error {
+       if err := b.updateEntitiesReport(rt); err != nil {
+               return err
+       }
+       b.mutable["signal"] = pingType
+       b.mutable["cluster_id"] = rt.GetClusterId()
+       b.mutable["uptime"] = strconv.FormatInt(int64(time.Since(rt.GetStartTime())/time.Second), 10)
+       if extraFn != nil {
+               valMap, err := extraFn(rt)
+               if err != nil {
+                       return err
+               }
+               b.Append(valMap)
+       }
+       pingData, err := b.marshall()
+       if err != nil {
+               return err
+       }
+
+       conf := &tls.Config{}
+       conn, err := tls.Dial("tcp", net.JoinHostPort(host,
+               strconv.FormatUint(uint64(port), 10)), conf)
+       if err != nil {
+               return err
+       }
+       // One connection is opened per report; without closing it every call
+       // would leak a socket. The close error is irrelevant for a report that
+       // has already been written (or has already failed).
+       defer conn.Close()
+
+       _, err = fmt.Fprint(conn, pingData)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// Append information to the mutable portion of the reports buffer
+func (b *reportsBuffer) Append(info map[string]string) {
+       b.Lock()
+       defer b.Unlock()
+
+       for key, value := range info {
+               b.mutable[key] = value
+       }
+}
+
+func (b *reportsBuffer) initImmutable(rt core_runtime.Runtime) {
+       b.immutable["version"] = kuma_version.Build.Version
+       b.immutable["product"] = kuma_version.Product
+       b.immutable["unique_id"] = rt.GetInstanceId()
+       b.immutable["backend"] = rt.Config().Store.Type
+       b.immutable["mode"] = rt.Config().Mode
+
+       hostname, err := os.Hostname()
+       if err == nil {
+               b.immutable["hostname"] = hostname
+       }
+}
+
+// startReportTicker spawns a goroutine that dispatches a "start" report
+// immediately and a "ping" report every pingInterval seconds afterwards.
+// Dispatch failures are only logged at verbosity 2. The goroutine has no
+// stop condition.
+func startReportTicker(rt core_runtime.Runtime, buffer *reportsBuffer, extraFn core_runtime.ExtraReportsFn) {
+       // send dispatches one report of the given type and logs a failure.
+       send := func(pingType string) {
+               if err := buffer.dispatch(rt, pingHost, pingPort, pingType, extraFn); err != nil {
+                       log.V(2).Info("failed sending usage info", "cause", err.Error())
+               }
+       }
+       go func() {
+               send("start")
+               for range time.Tick(time.Second * pingInterval) {
+                       send("ping")
+               }
+       }()
+}
+
+// Init prepares the reports buffer, fills its immutable section from rt and,
+// when cfg.Reports.Enabled is set, starts the periodic report goroutine.
+func Init(rt core_runtime.Runtime, cfg kuma_cp.Config, extraFn core_runtime.ExtraReportsFn) {
+       var buffer reportsBuffer
+       buffer.immutable = make(map[string]string)
+       buffer.mutable = make(map[string]string)
+
+       buffer.initImmutable(rt)
+
+       if cfg.Reports.Enabled {
+               startReportTicker(rt, &buffer, extraFn)
+       }
+}
diff --git a/pkg/core/runtime/runtime.go b/pkg/core/runtime/runtime.go
new file mode 100755
index 00000000..76f2e091
--- /dev/null
+++ b/pkg/core/runtime/runtime.go
@@ -0,0 +1,87 @@
+package runtime
+
+import (
+       "github.com/apache/dubbo-admin/pkg/core/runtime/component"
+       "sync"
+       "time"
+)
+
+// Runtime represents initialized application state.
+type Runtime interface {
+       RuntimeInfo
+       RuntimeContext
+       component.Manager
+}
+
+type RuntimeInfo interface {
+       GetInstanceId() string
+       SetClusterId(clusterId string)
+       GetClusterId() string
+       GetStartTime() time.Time
+}
+
+type RuntimeContext interface {
+       Config() kuma_cp.Config
+       DataSourceLoader() datasource.Loader
+       ResourceManager() core_manager.ResourceManager
+}
+
+type ExtraReportsFn func(Runtime) (map[string]string, error)
+
+var _ Runtime = &runtime{}
+
+type runtime struct {
+       RuntimeInfo
+       RuntimeContext
+       component.Manager
+}
+
+var _ RuntimeInfo = &runtimeInfo{}
+
+type runtimeInfo struct {
+       mtx sync.RWMutex
+
+       instanceId string
+       clusterId  string
+       startTime  time.Time
+}
+
+func (i *runtimeInfo) GetInstanceId() string {
+       return i.instanceId
+}
+
+func (i *runtimeInfo) SetClusterId(clusterId string) {
+       i.mtx.Lock()
+       defer i.mtx.Unlock()
+       i.clusterId = clusterId
+}
+
+func (i *runtimeInfo) GetClusterId() string {
+       i.mtx.RLock()
+       defer i.mtx.RUnlock()
+       return i.clusterId
+}
+
+func (i *runtimeInfo) GetStartTime() time.Time {
+       return i.startTime
+}
+
+var _ RuntimeContext = &runtimeContext{}
+
+type runtimeContext struct {
+       cfg kuma_cp.Config
+       rm  core_manager.ResourceManager
+       rs  core_store.ResourceStore
+}
+
+func (rc *runtimeContext) Config() kuma_cp.Config {
+       return rc.cfg
+}
+
+func (rc *runtimeContext) ResourceManager() core_manager.ResourceManager {
+       return rc.rm
+}
+
+func (rc *runtimeContext) ResourceStore() core_store.ResourceStore {
+       return rc.rs
+}
diff --git a/pkg/version/version.go b/pkg/version/version.go
index 40ac63dc..73c9451d 100644
--- a/pkg/version/version.go
+++ b/pkg/version/version.go
@@ -20,6 +20,7 @@ package version
 import (
        "encoding/json"
        "fmt"
+       "github.com/spf13/cobra"
        "runtime"
 )
 
@@ -60,3 +61,26 @@ func GetVersionInfo() string {
        result, _ := json.Marshal(version)
        return string(result)
 }
+
+// NewVersionCmd builds the "version" command. By default it prints
+// gitVersion; with the --detailed (-a) flag it prints the output of
+// GetVersionInfo instead.
+func NewVersionCmd() *cobra.Command {
+       args := struct {
+               detailed bool
+       }{}
+       cmd := &cobra.Command{
+               Use:   "version",
+               Short: "Print version",
+               Long:  `Print version.`,
+               RunE: func(cmd *cobra.Command, _ []string) error {
+                       if args.detailed {
+                               cmd.Println(GetVersionInfo())
+                       } else {
+                               cmd.Printf("%s\n", gitVersion)
+                       }
+
+                       return nil
+               },
+       }
+       // flags
+       cmd.PersistentFlags().BoolVarP(&args.detailed, "detailed", "a", false, "Print detailed version")
+       return cmd
+}

Reply via email to