This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch refactor-with-go-components-experimental in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git
commit 934c27f05a7ace7246d819b336f4b28c1095d52c Author: chickenlj <[email protected]> AuthorDate: Mon Jul 3 11:09:22 2023 +0800 refactor, introduce component concept to admin --- .run/Admin.run.xml | 41 +-- README.md | 2 +- README_ZH.md | 2 +- cmd/admin/README.md | 2 +- cmd/admin/cmd/root.go | 45 ++++ cmd/admin/cmd/run.go | 93 +++++++ cmd/admin/main.go | 28 +-- cmd/authority/app/authority.go | 84 +------ conf/{dubboadmin.yml => admin.yml} | 0 deploy/charts/dubbo-admin/templates/configmap.yaml | 85 +------ .../charts/dubbo-admin/templates/deployment.yaml | 3 + deploy/charts/dubbo-admin/values.yaml | 39 +-- pkg/admin/config/config.go | 2 +- pkg/authority/security/server.go | 1 + .../app/authority.go => pkg/authority/setup.go | 57 +---- pkg/core/alias.go | 43 ++++ pkg/core/cmd/helpers.go | 14 ++ pkg/core/cmd/util.go | 17 ++ pkg/{authority/security => core/rule}/server.go | 3 +- pkg/core/rule/server_test.go | 209 ++++++++++++++++ pkg/core/runtime/builder.go | 117 +++++++++ pkg/core/runtime/component/component.go | 163 ++++++++++++ pkg/core/runtime/component/leader.go | 53 ++++ pkg/core/runtime/component/resilient.go | 60 +++++ pkg/core/runtime/reports/reports.go | 277 +++++++++++++++++++++ pkg/core/runtime/runtime.go | 87 +++++++ pkg/version/version.go | 24 ++ 27 files changed, 1275 insertions(+), 276 deletions(-) diff --git a/.run/Admin.run.xml b/.run/Admin.run.xml index 06768acd..484d9ec0 100644 --- a/.run/Admin.run.xml +++ b/.run/Admin.run.xml @@ -1,31 +1,14 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. 
You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. ---> - <component name="ProjectRunConfigurationManager"> - <configuration default="false" name="Admin" type="GoApplicationRunConfiguration" factoryName="Go Application"> - <module name="dubbo-admin" /> - <working_directory value="$PROJECT_DIR$" /> - <envs> - <env name="ADMIN_CONFIG_PATH" value="conf/dubboadmin.yml" /> - </envs> - <kind value="PACKAGE" /> - <package value="github.com/apache/dubbo-admin/cmd/admin" /> - <directory value="$PROJECT_DIR$" /> - <filePath value="$PROJECT_DIR$/cmd/admin/main.go" /> - <method v="2" /> - </configuration> + <configuration default="false" name="Admin" type="GoApplicationRunConfiguration" factoryName="Go Application"> + <module name="dubbo-admin" /> + <working_directory value="$PROJECT_DIR$" /> + <envs> + <env name="ADMIN_CONFIG_PATH" value="conf/admin.yml" /> + </envs> + <kind value="PACKAGE" /> + <package value="github.com/apache/dubbo-admin/cmd/admin" /> + <directory value="$PROJECT_DIR$" /> + <filePath value="$PROJECT_DIR$/cmd/admin/main.go" /> + <method v="2" /> + </configuration> </component> \ No newline at end of file diff --git a/README.md b/README.md index c9c27373..02c97ea3 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ admin: ### Run with command line ```shell -$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/dubboadmin.yml +$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/admin.yml $ cd cmd/admin $ go run . 
``` diff --git a/README_ZH.md b/README_ZH.md index 617ea956..ac07ecf9 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -27,7 +27,7 @@ admin: ### Run with command line ```shell -$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/dubboadmin.yml +$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/admin.yml $ cd cmd/admin $ go run . ``` diff --git a/cmd/admin/README.md b/cmd/admin/README.md index 851fb655..e4199f9a 100644 --- a/cmd/admin/README.md +++ b/cmd/admin/README.md @@ -24,7 +24,7 @@ admin: ### Run with command line ```shell -$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/dubboadmin.yml +$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/admin.yml $ cd cmd/admin $ go run . ``` diff --git a/cmd/admin/cmd/root.go b/cmd/admin/cmd/root.go new file mode 100644 index 00000000..58a073d5 --- /dev/null +++ b/cmd/admin/cmd/root.go @@ -0,0 +1,45 @@ +package cmd + +import ( + "github.com/apache/dubbo-admin/pkg/logger" + "github.com/apache/dubbo-admin/pkg/version" + + corecmd "github.com/apache/dubbo-admin/pkg/core/cmd" + + "github.com/spf13/cobra" + "os" +) + +func GetRootCmd(args []string) *cobra.Command { + // rootCmd represents the base command when called without any subcommands + cmd := &cobra.Command{ + Use: "dubbo-admin", + Short: "Console and control plane for microservices built with Apache Dubbo.", + Long: `Console and control plane for microservices built with Apache Dubbo.`, + + PersistentPreRunE: func(cmd *cobra.Command, _ []string) error { + logger.Init() + + // once command line flags have been parsed, + // avoid printing usage instructions + cmd.SilenceUsage = true + + return nil + }, + } + + cmd.SetOut(os.Stdout) + + // root flags + //cmd.PersistentFlags().StringVar(&args.logLevel, "log-level", kuma_log.InfoLevel.String(), kuma_cmd.UsageOptions("log level", kuma_log.OffLevel, kuma_log.InfoLevel, kuma_log.DebugLevel)) + //cmd.PersistentFlags().StringVar(&args.outputPath, "log-output-path", args.outputPath, "path to the 
file that will be filled with logs. Example: if we set it to /tmp/admin.log then after the file is rotated we will have /tmp/admin-2021-06-07T09-15-18.265.log") + //cmd.PersistentFlags().IntVar(&args.maxBackups, "log-max-retained-files", 1000, "maximum number of the old log files to retain") + //cmd.PersistentFlags().IntVar(&args.maxSize, "log-max-size", 100, "maximum size in megabytes of a log file before it gets rotated") + //cmd.PersistentFlags().IntVar(&args.maxAge, "log-max-age", 30, "maximum number of days to retain old log files based on the timestamp encoded in their filename") + + // sub-commands + cmd.AddCommand(newRunCmdWithOpts(corecmd.DefaultRunCmdOpts)) + cmd.AddCommand(version.NewVersionCmd()) + + return cmd +} diff --git a/cmd/admin/cmd/run.go b/cmd/admin/cmd/run.go new file mode 100755 index 00000000..b12a1b6c --- /dev/null +++ b/cmd/admin/cmd/run.go @@ -0,0 +1,93 @@ +package cmd + +import ( + "github.com/apache/dubbo-admin/pkg/admin/config" + "github.com/apache/dubbo-admin/pkg/admin/constant" + "github.com/apache/dubbo-admin/pkg/admin/providers/mock" + "github.com/apache/dubbo-admin/pkg/admin/router" + "github.com/apache/dubbo-admin/pkg/admin/services" + "github.com/apache/dubbo-admin/pkg/authority" + "github.com/apache/dubbo-admin/pkg/core/cmd" + "github.com/apache/dubbo-admin/pkg/logger" + + caconfig "github.com/apache/dubbo-admin/pkg/authority/config" + + "os" + "time" + + "github.com/spf13/cobra" +) + +const gracefullyShutdownDuration = 3 * time.Second + +// This is the open file limit below which the control plane may not +// reasonably have enough descriptors to accept all its clients. 
+const minOpenFileLimit = 4096 + +func newRunCmdWithOpts(opts cmd.RunCmdOpts) *cobra.Command { + args := struct { + configPath string + }{} + cmd := &cobra.Command{ + Use: "run", + Short: "Launch Dubbo Admin", + Long: `Launch Dubbo Admin.`, + RunE: func(cmd *cobra.Command, _ []string) error { + // init config + config.LoadConfig() + + // subscribe to registries + go services.StartSubscribe(config.RegistryCenter) + defer func() { + services.DestroySubscribe(config.RegistryCenter) + }() + + // start mock server + os.Setenv(constant.ConfigFileEnvKey, config.MockProviderConf) + go mock.RunMockServiceServer() + + // start console server + router := router.InitRouter() + if err := router.Run(":38080"); err != nil { + logger.Error("Failed to start Admin console server.") + return err + } + + // start CA + if err := startCA(cmd); err != nil { + logger.Error("Failed to start CA server.") + return err + } + + // start + + return nil + }, + } + + // flags + cmd.PersistentFlags().StringVarP(&args.configPath, "config-file", "c", "", "configuration file") + + return cmd +} + +func startCA(cmd *cobra.Command) error { + options := caconfig.NewOptions() + + if err := authority.Initialize(cmd); err != nil { + logger.Fatal("Failed to initialize CA server.") + return err + } + + logger.Infof("Authority command Run with options: %+v", options) + if errs := options.Validate(); len(errs) != 0 { + logger.Fatal(errs) + return errs[0] + } + + if err := authority.Run(options); err != nil { + logger.Fatal(err) + return err + } + return nil +} diff --git a/cmd/admin/main.go b/cmd/admin/main.go index 5bc1ffd7..6dc3e449 100644 --- a/cmd/admin/main.go +++ b/cmd/admin/main.go @@ -18,30 +18,14 @@ package main import ( + "fmt" + "github.com/apache/dubbo-admin/cmd/admin/cmd" "os" - - "dubbo.apache.org/dubbo-go/v3/common/constant" - "github.com/apache/dubbo-admin/pkg/admin/config" - mock "github.com/apache/dubbo-admin/pkg/admin/providers/mock" - "github.com/apache/dubbo-admin/pkg/admin/router" - 
"github.com/apache/dubbo-admin/pkg/admin/services" ) -// @title Dubbo-Admin API -// @version 1.0 -// @description This is a dubbo-admin swagger ui server. -// @license.name Apache 2.0 -// @license.url http://www.apache.org/licenses/LICENSE-2.0.html -// @host 127.0.0.1:38080 -// @BasePath / func main() { - config.LoadConfig() - go services.StartSubscribe(config.RegistryCenter) - defer func() { - services.DestroySubscribe(config.RegistryCenter) - }() - os.Setenv(constant.ConfigFileEnvKey, config.MockProviderConf) - go mock.RunMockServiceServer() - router := router.InitRouter() - _ = router.Run(":38080") + if err := cmd.GetRootCmd(os.Args[1:]).Execute(); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } } diff --git a/cmd/authority/app/authority.go b/cmd/authority/app/authority.go index c49fa6e1..922cba1b 100644 --- a/cmd/authority/app/authority.go +++ b/cmd/authority/app/authority.go @@ -17,26 +17,10 @@ package app import ( "flag" - "fmt" - "os" - "os/signal" - "strings" - "syscall" - + "github.com/apache/dubbo-admin/pkg/authority" "github.com/apache/dubbo-admin/pkg/authority/config" - "github.com/apache/dubbo-admin/pkg/authority/security" "github.com/apache/dubbo-admin/pkg/logger" "github.com/spf13/cobra" - "github.com/spf13/pflag" - "github.com/spf13/viper" -) - -var ( - // For example, --webhook-port is bound to DUBBO_WEBHOOK_PORT. 
- envNamePrefix = "DUBBO" - - // Replace hyphenated flag names with camelCase - replaceWithCamelCase = false ) func NewAppCommand() *cobra.Command { @@ -45,19 +29,26 @@ func NewAppCommand() *cobra.Command { cmd := &cobra.Command{ Use: "authority", Long: `The authority app is responsible for controllers in dubbo authority`, - PersistentPreRun: func(cmd *cobra.Command, args []string) { + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { logger.Infof("Authority command PersistentPreRun") - initialize(cmd) + if err := authority.Initialize(cmd); err != nil { + logger.Fatal("Failed to initialize CA server.") + return err + } + return nil }, - Run: func(cmd *cobra.Command, args []string) { + RunE: func(cmd *cobra.Command, args []string) error { logger.Infof("Authority command Run with options: %+v", options) if errs := options.Validate(); len(errs) != 0 { logger.Fatal(errs) + return errs[0] } - if err := Run(options); err != nil { + if err := authority.Run(options); err != nil { logger.Fatal(err) + return err } + return nil }, } @@ -65,54 +56,3 @@ func NewAppCommand() *cobra.Command { options.FillFlags(cmd.Flags()) return cmd } - -func Run(options *config.Options) error { - s := security.NewServer(options) - - s.Init() - s.Start() - - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - signal.Notify(s.StopChan, syscall.SIGINT, syscall.SIGTERM) - signal.Notify(s.CertStorage.GetStopChan(), syscall.SIGINT, syscall.SIGTERM) - - <-c - - return nil -} - -func initialize(cmd *cobra.Command) error { - v := viper.New() - - // For example, --webhook-port is bound to DUBBO_WEBHOOK_PORT. - v.SetEnvPrefix(envNamePrefix) - - // keys with underscores, e.g. 
DUBBO-WEBHOOK-PORT to DUBBO_WEBHOOK_PORT - v.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) - - // Bind to environment variables - v.AutomaticEnv() - - // Bind the current command's flags to viper - bindFlags(cmd, v) - - return nil -} - -func bindFlags(cmd *cobra.Command, v *viper.Viper) { - cmd.Flags().VisitAll(func(f *pflag.Flag) { - configName := f.Name - - // Replace hyphens with a camelCased string. - if replaceWithCamelCase { - configName = strings.ReplaceAll(f.Name, "-", "") - } - - // Apply the viper config value to the flag when the flag is not set and viper has a value - if !f.Changed && v.IsSet(configName) { - val := v.Get(configName) - cmd.Flags().Set(f.Name, fmt.Sprintf("%v", val)) - } - }) -} diff --git a/conf/dubboadmin.yml b/conf/admin.yml similarity index 100% rename from conf/dubboadmin.yml rename to conf/admin.yml diff --git a/deploy/charts/dubbo-admin/templates/configmap.yaml b/deploy/charts/dubbo-admin/templates/configmap.yaml index 48616278..cb36a05b 100644 --- a/deploy/charts/dubbo-admin/templates/configmap.yaml +++ b/deploy/charts/dubbo-admin/templates/configmap.yaml @@ -13,16 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -{{- $defaultsessionTimeoutMilli := 3600000 -}} -{{- $defaulttokenTimeoutMilli := 3600000 -}} -{{- $defaultsignSecret := "86295dd0c4ef69a1036b0b0c15158d77" -}} -{{- $defaulttoken := "e16e5cd903fd0c97a116c873b448544b9d086de9" -}} -{{- $defaultname := "dubbo-admin" -}} -{{- $defaultdriverclassname := "com.mysql.jdbc.Driver" -}} -{{- $apollotoken := (coalesce .Values.apollo.token $defaulttoken) -}} -{{- $dubboname := (coalesce .Values.dubbo.application.name $defaultname) -}} -{{- $springdriverclassname := (coalesce .Values.spring.datasource.driverclassname $defaultdriverclassname) -}} ---- apiVersion: v1 kind: ConfigMap metadata: @@ -34,71 +24,10 @@ metadata: {{- toYaml . 
| nindent 4 }} {{- end }} data: - application.properties: |- - {{- with .Values.admin }} - {{- if .zookeeper.enabled }} - admin.registry.address: {{ .zookeeper.address }} - admin.metadata-report.address: {{ .report.zookeeper.address }} - admin.config-center: {{ .zookeeper.center }} - {{- end }} - {{- if .nacos.enabled }} - admin.registry.address: {{ .nacos.address }} - admin.registry.group: {{ .nacos.group }} - admin.registry.namespace: {{ .nacos.namespace }} - admin.config-center: {{ .nacos.center }} - admin.config-center.group: {{ .nacos.group }} - admin.config-center.namespace: {{ .nacos.namespace }} - admin.metadata-report.address: {{ .report.nacos.address }} - admin.metadata-report.group: {{ .report.nacos.group }} - admin.metadata-report.namespace: {{ .report.nacos.namespace }} - {{- end }} - {{- end }} - {{- with .Values.root.user }} - admin.root.user.name: {{ .name }} - admin.root.user.password: {{ .password }} - {{- end }} - {{- if .Values.check.enabled }} - {{- if $defaultsessionTimeoutMilli }} - admin.check.sessionTimeoutMilli: {{ $defaultsessionTimeoutMilli }} - {{- end }} - {{- if $defaulttokenTimeoutMilli }} - admin.check.tokenTimeoutMilli: {{ $defaulttokenTimeoutMilli }} - {{- end }} - {{- if $defaultsignSecret }} - admin.check.signSecret: {{ $defaultsignSecret }} - {{- end }} - {{- end }} - {{- with .Values.apollo }} - {{- if .enabled }} - {{- if $apollotoken }} - admin.apollo.token: {{ $apollotoken }} - {{- end }} - admin.apollo.appId: {{ .appid }} - admin.apollo.env: {{ .env }} - admin.apollo.cluster: {{ .cluster }} - admin.config-center: {{ .center }} - {{- end }} - {{- end }} - {{- with .Values.server.compression }} - server.compression.enabled: {{ .enabled }} - server.compression.mime-types: {{ .types }} - server.compression.min-response-size: {{ .size }} - {{- end }} - {{- with .Values.dubbo }} - {{- if $dubboname }} - dubbo.application.name: {{ $dubboname }} - {{- end }} - dubbo.application.logger: {{ .application.logger }} - 
dubbo.registry.address: {{ .registry.address }} - {{- end }} - {{- with .Values.spring }} - {{- if .datasource.enabled }} - spring.datasource.driver-class-name: {{ $springdriverclassname }} - spring.datasource.url: {{ .datasource.url }} - spring.datasource.username: {{ .datasource.username }} - spring.datasource.password: {{ .datasource.password }} - {{- end }} - {{- end }} - {{- with .Values.mybatis }} - mybatis-plus.global-config.db-config.id-type: {{ .type }} - {{- end }} \ No newline at end of file + # use this file to override default configuration of `dubbo-admin` + # + # see conf/admin.yml for available settings + admin.yml: |- + {{ if .Values.admin }} + {{ toYaml .Values.admin | trim | nindent 4 }} + {{ end }} \ No newline at end of file diff --git a/deploy/charts/dubbo-admin/templates/deployment.yaml b/deploy/charts/dubbo-admin/templates/deployment.yaml index 2379156e..2e05c8d9 100644 --- a/deploy/charts/dubbo-admin/templates/deployment.yaml +++ b/deploy/charts/dubbo-admin/templates/deployment.yaml @@ -113,6 +113,9 @@ spec: {{- end }} resources: {{- toYaml .Values.resources | nindent 12 }} + env: + - name: ADMIN_CONFIG_PATH + value: /config/admin.yml volumes: - name: application-properties configMap: diff --git a/deploy/charts/dubbo-admin/values.yaml b/deploy/charts/dubbo-admin/values.yaml index 4fa0565c..8ef5f252 100644 --- a/deploy/charts/dubbo-admin/values.yaml +++ b/deploy/charts/dubbo-admin/values.yaml @@ -61,26 +61,24 @@ serviceAccount: imagePullSecrets: [] # - name: secretName +## admin configuration +admin: + ## dubbo configuration when admin works as a provider + dubbo: + ## dubbo enabled + enabled: true -## @param Admin Default Enable Configuration -dubbo: - ## dubbo enabled - enabled: true - - ## dubbo application - application: - ## dubbo application name - name: ~ - - ## dubbo application logger - logger: slf4j + ## dubbo application + application: + ## dubbo application name + name: ~ - registry: - ## dubbo registry address - address: 
${admin.registry.address} + ## dubbo application logger + logger: slf4j -## admin configuration -admin: + registry: + ## dubbo registry address + address: ${admin.registry.address} ## zookeeper configuration zookeeper: ## zookeeper enabled @@ -123,8 +121,11 @@ admin: ## nacos namespace namespace: public - - + prometheus: + address: prometheus.dubbo-system.svc.cluster.local:3000 + grafana: + address: grafana.dubbo-system.svc.cluster.local + mysql-dsn: "root:password@tcp(127.0.0.1:3306)/dubbo-admin?charset=utf8&parseTime=true" ## apollo configuration apollo: diff --git a/pkg/admin/config/config.go b/pkg/admin/config/config.go index 847b4a12..460e59d1 100644 --- a/pkg/admin/config/config.go +++ b/pkg/admin/config/config.go @@ -41,7 +41,7 @@ import ( ) const ( - conf = "./conf/dubboadmin.yml" + conf = "./conf/admin.yml" confPathKey = "ADMIN_CONFIG_PATH" ) diff --git a/pkg/authority/security/server.go b/pkg/authority/security/server.go index 060f9849..34f6d10c 100644 --- a/pkg/authority/security/server.go +++ b/pkg/authority/security/server.go @@ -119,6 +119,7 @@ func (s *Server) Init() { s.registerCertificateService() s.registerObserveService() + s.registerTrafficService() reflection.Register(s.SecureServer) diff --git a/cmd/authority/app/authority.go b/pkg/authority/setup.go similarity index 50% copy from cmd/authority/app/authority.go copy to pkg/authority/setup.go index c49fa6e1..50377b55 100644 --- a/cmd/authority/app/authority.go +++ b/pkg/authority/setup.go @@ -1,34 +1,16 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app +package authority import ( - "flag" "fmt" - "os" - "os/signal" - "strings" - "syscall" - "github.com/apache/dubbo-admin/pkg/authority/config" "github.com/apache/dubbo-admin/pkg/authority/security" - "github.com/apache/dubbo-admin/pkg/logger" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/spf13/viper" + "os" + "os/signal" + "strings" + "syscall" ) var ( @@ -39,33 +21,6 @@ var ( replaceWithCamelCase = false ) -func NewAppCommand() *cobra.Command { - options := config.NewOptions() - - cmd := &cobra.Command{ - Use: "authority", - Long: `The authority app is responsible for controllers in dubbo authority`, - PersistentPreRun: func(cmd *cobra.Command, args []string) { - logger.Infof("Authority command PersistentPreRun") - initialize(cmd) - }, - Run: func(cmd *cobra.Command, args []string) { - logger.Infof("Authority command Run with options: %+v", options) - if errs := options.Validate(); len(errs) != 0 { - logger.Fatal(errs) - } - - if err := Run(options); err != nil { - logger.Fatal(err) - } - }, - } - - cmd.Flags().AddGoFlagSet(flag.CommandLine) - options.FillFlags(cmd.Flags()) - return cmd -} - func Run(options *config.Options) error { s := security.NewServer(options) @@ -82,7 +37,7 @@ func Run(options *config.Options) error { return nil } -func initialize(cmd *cobra.Command) error { +func Initialize(cmd *cobra.Command) error { v := viper.New() // For example, --webhook-port is bound to DUBBO_WEBHOOK_PORT. 
diff --git a/pkg/core/alias.go b/pkg/core/alias.go new file mode 100755 index 00000000..426b67f9 --- /dev/null +++ b/pkg/core/alias.go @@ -0,0 +1,43 @@ +package core + +import ( + "context" + "os" + "os/signal" + "syscall" + "time" + + "github.com/google/uuid" + kube_log "sigs.k8s.io/controller-runtime/pkg/log" +) + +var ( + // TODO remove dependency on kubernetes see: https://github.com/kumahq/kuma/issues/2798 + Log = kube_log.Log + SetLogger = kube_log.SetLogger + Now = time.Now + TempDir = os.TempDir + + SetupSignalHandler = func() (context.Context, context.Context) { + gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + c := make(chan os.Signal, 3) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + go func() { + s := <-c + Log.Info("Received signal, stopping instance gracefully", "signal", s.String()) + gracefulCancel() + s = <-c + Log.Info("Received second signal, stopping instance", "signal", s.String()) + cancel() + s = <-c + Log.Info("Received third signal, force exit", "signal", s.String()) + os.Exit(1) + }() + return gracefulCtx, ctx + } +) + +func NewUUID() string { + return uuid.NewString() +} diff --git a/pkg/core/cmd/helpers.go b/pkg/core/cmd/helpers.go new file mode 100755 index 00000000..d6ff1170 --- /dev/null +++ b/pkg/core/cmd/helpers.go @@ -0,0 +1,14 @@ +package cmd + +import ( + "fmt" + "strings" +) + +func UsageOptions(desc string, options ...interface{}) string { + values := make([]string, 0, len(options)) + for _, option := range options { + values = append(values, fmt.Sprintf("%v", option)) + } + return fmt.Sprintf("%s: one of %s", desc, strings.Join(values, "|")) +} diff --git a/pkg/core/cmd/util.go b/pkg/core/cmd/util.go new file mode 100755 index 00000000..7b79464d --- /dev/null +++ b/pkg/core/cmd/util.go @@ -0,0 +1,17 @@ +package cmd + +import ( + "context" + "github.com/apache/dubbo-admin/pkg/core" +) + +type RunCmdOpts struct { + // The first returned 
context is closed upon receiving first signal (SIGSTOP, SIGTERM). + // The second returned context is closed upon receiving second signal. + // We can start graceful shutdown when first context is closed and forcefully stop when the second one is closed. + SetupSignalHandler func() (context.Context, context.Context) +} + +var DefaultRunCmdOpts = RunCmdOpts{ + SetupSignalHandler: core.SetupSignalHandler, +} diff --git a/pkg/authority/security/server.go b/pkg/core/rule/server.go similarity index 99% copy from pkg/authority/security/server.go copy to pkg/core/rule/server.go index 060f9849..3c3d8210 100644 --- a/pkg/authority/security/server.go +++ b/pkg/core/rule/server.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package security +package rule import ( "crypto/tls" @@ -119,6 +119,7 @@ func (s *Server) Init() { s.registerCertificateService() s.registerObserveService() + s.registerTrafficService() reflection.Register(s.SecureServer) diff --git a/pkg/core/rule/server_test.go b/pkg/core/rule/server_test.go new file mode 100644 index 00000000..b52bec83 --- /dev/null +++ b/pkg/core/rule/server_test.go @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package rule + +import ( + "crypto/tls" + "os" + "testing" + "time" + + "github.com/apache/dubbo-admin/pkg/authority/election" + + "k8s.io/client-go/kubernetes" + + cert2 "github.com/apache/dubbo-admin/pkg/authority/cert" + "github.com/apache/dubbo-admin/pkg/authority/config" + "github.com/apache/dubbo-admin/pkg/authority/k8s" + "github.com/apache/dubbo-admin/pkg/logger" +) + +type mockKubeClient struct { + k8s.Client +} + +var ( + certPEM = "" + priPEM = "" +) + +func (s *mockKubeClient) Init(options *config.Options) bool { + return true +} + +func (s *mockKubeClient) GetAuthorityCert(namespace string) (string, string) { + return certPEM, priPEM +} + +func (s *mockKubeClient) UpdateAuthorityCert(cert string, pri string, namespace string) { +} + +func (s *mockKubeClient) UpdateAuthorityPublicKey(cert string) bool { + return true +} + +func (s *mockKubeClient) UpdateWebhookConfig(options *config.Options, storage cert2.Storage) { +} + +func (s *mockKubeClient) GetKubClient() *kubernetes.Clientset { + return nil +} + +type mockStorage struct { + cert2.Storage + origin cert2.Storage +} + +func (s *mockStorage) GetServerCert(serverName string) *tls.Certificate { + return nil +} + +func (s *mockStorage) RefreshServerCert() { +} + +func (s *mockStorage) SetAuthorityCert(cert *cert2.Cert) { + s.origin.SetAuthorityCert(cert) +} + +func (s *mockStorage) GetAuthorityCert() *cert2.Cert { + return s.origin.GetAuthorityCert() +} + +func (s *mockStorage) SetRootCert(cert *cert2.Cert) { + s.origin.SetRootCert(cert) +} + +func (s *mockStorage) GetRootCert() *cert2.Cert { + return s.origin.GetRootCert() +} + +func (s *mockStorage) AddTrustedCert(cert *cert2.Cert) { + s.origin.AddTrustedCert(cert) +} + +func (s *mockStorage) GetTrustedCerts() []*cert2.Cert { + return s.origin.GetTrustedCerts() +} + +func (s *mockStorage) GetStopChan() chan os.Signal { + return 
s.origin.GetStopChan() +} + +type mockLeaderElection struct { + election.LeaderElection +} + +func (s *mockLeaderElection) Election(storage cert2.Storage, options *config.Options, kubeClient *kubernetes.Clientset) error { + storage.SetAuthorityCert(cert2.GenerateAuthorityCert(storage.GetRootCert(), options.CaValidity)) + return nil +} + +func TestInit(t *testing.T) { + t.Parallel() + + logger.Init() + + options := &config.Options{ + IsKubernetesConnected: true, + Namespace: "dubbo-system", + PlainServerPort: 30060, + SecureServerPort: 30062, + DebugPort: 30070, + CaValidity: 30 * 24 * 60 * 60 * 1000, // 30 day + CertValidity: 1 * 60 * 60 * 1000, // 1 hour + } + + s := NewServer(options) + s.KubeClient = &mockKubeClient{} + + s.Init() + if !s.CertStorage.GetAuthorityCert().IsValid() { + t.Fatal("Authority cert is not valid") + return + } + + certPEM = s.CertStorage.GetAuthorityCert().CertPem + priPEM = cert2.EncodePrivateKey(s.CertStorage.GetAuthorityCert().PrivateKey) + + s.PlainServer.Stop() + s.SecureServer.Stop() + s.StopChan <- os.Kill + + s = NewServer(options) + s.KubeClient = &mockKubeClient{} + s.Init() + + if !s.CertStorage.GetAuthorityCert().IsValid() { + t.Fatal("Authority cert is not valid") + + return + } + + if s.CertStorage.GetAuthorityCert().CertPem != certPEM { + t.Fatal("Authority cert is not equal") + + return + } + + s.PlainServer.Stop() + s.SecureServer.Stop() + s.StopChan <- os.Kill + s.CertStorage.GetStopChan() <- os.Kill +} + +func TestRefresh(t *testing.T) { + t.Parallel() + + logger.Init() + + options := &config.Options{ + IsKubernetesConnected: false, + Namespace: "dubbo-system", + PlainServerPort: 30060, + SecureServerPort: 30062, + DebugPort: 30070, + CaValidity: 10, + } + + s := NewServer(options) + + s.KubeClient = &mockKubeClient{} + storage := &mockStorage{} + s.Elec = &mockLeaderElection{} + storage.origin = cert2.NewStorage(options) + s.CertStorage = storage + + s.Init() + + origin := s.CertStorage.GetAuthorityCert() + + for i := 
0; i < 1000; i++ { + // wait at most 100s + time.Sleep(100 * time.Millisecond) + if s.CertStorage.GetAuthorityCert() != origin { + break + } + } + + if s.CertStorage.GetAuthorityCert() == origin { + t.Fatal("Authority cert is not refreshed") + return + } + + s.PlainServer.Stop() + s.SecureServer.Stop() + s.StopChan <- os.Kill +} diff --git a/pkg/core/runtime/builder.go b/pkg/core/runtime/builder.go new file mode 100755 index 00000000..43477c87 --- /dev/null +++ b/pkg/core/runtime/builder.go @@ -0,0 +1,117 @@ +package runtime + +import ( + "context" + "fmt" + "github.com/apache/dubbo-admin/pkg/core" + "github.com/apache/dubbo-admin/pkg/core/runtime/component" + "os" + "time" + + "github.com/pkg/errors" +) + +// BuilderContext provides access to Builder's interim state. +type BuilderContext interface { + ComponentManager() component.Manager + ResourceStore() core_store.ResourceStore + SecretStore() store.SecretStore + ConfigStore() core_store.ResourceStore + ResourceManager() core_manager.CustomizableResourceManager + Config() kuma_cp.Config + DataSourceLoader() datasource.Loader + Extensions() context.Context + ConfigManager() config_manager.ConfigManager + LeaderInfo() component.LeaderInfo + Metrics() metrics.Metrics + EventReaderFactory() events.ListenerFactory + APIManager() api_server.APIManager + XDSHooks() *xds_hooks.Hooks + CAProvider() secrets.CaProvider + DpServer() *dp_server.DpServer + ResourceValidators() ResourceValidators + KDSContext() *kds_context.Context + APIServerAuthenticator() authn.Authenticator + Access() Access + TokenIssuers() builtin.TokenIssuers + MeshCache() *mesh.Cache + InterCPClientPool() *client.Pool +} + +var _ BuilderContext = &Builder{} + +// Builder represents a multi-step initialization process. 
+type Builder struct { + cfg kuma_cp.Config + cm component.Manager + rs core_store.ResourceStore + *runtimeInfo +} + +func BuilderFor(appCtx context.Context, cfg kuma_cp.Config) (*Builder, error) { + hostname, err := os.Hostname() + if err != nil { + return nil, errors.Wrap(err, "could not get hostname") + } + suffix := core.NewUUID()[0:4] + return &Builder{ + cfg: cfg, + ext: context.Background(), + cam: core_ca.Managers{}, + runtimeInfo: &runtimeInfo{ + instanceId: fmt.Sprintf("%s-%s", hostname, suffix), + startTime: time.Now(), + }, + appCtx: appCtx, + }, nil +} + +func (b *Builder) WithComponentManager(cm component.Manager) *Builder { + b.cm = cm + return b +} + +func (b *Builder) Build() (Runtime, error) { + if b.cm == nil { + return nil, errors.Errorf("ComponentManager has not been configured") + } + return &runtime{ + RuntimeInfo: b.runtimeInfo, + RuntimeContext: &runtimeContext{ + cfg: b.cfg, + rm: b.rm, + rom: b.rom, + rs: b.rs, + ss: b.ss, + cam: b.cam, + dsl: b.dsl, + ext: b.ext, + configm: b.configm, + leadInfo: b.leadInfo, + lif: b.lif, + eac: b.eac, + metrics: b.metrics, + erf: b.erf, + apim: b.apim, + xdsauth: b.xdsauth, + xdsCallbacks: b.xdsCallbacks, + xdsh: b.xdsh, + cap: b.cap, + dps: b.dps, + kdsctx: b.kdsctx, + rv: b.rv, + au: b.au, + acc: b.acc, + appCtx: b.appCtx, + extraReportsFn: b.extraReportsFn, + tokenIssuers: b.tokenIssuers, + meshCache: b.meshCache, + interCpPool: b.interCpPool, + }, + Manager: b.cm, + }, nil +} + +func (b *Builder) ComponentManager() component.Manager { + return b.cm +} diff --git a/pkg/core/runtime/component/component.go b/pkg/core/runtime/component/component.go new file mode 100755 index 00000000..d3f6a21b --- /dev/null +++ b/pkg/core/runtime/component/component.go @@ -0,0 +1,163 @@ +package component + +import ( + "sync" + + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/util/channels" +) + +var log = core.Log.WithName("bootstrap") + +// Component defines a process that will be run in the 
application +// Component should be designed in such a way that it can be stopped by stop channel and started again (for example when instance is reelected for a leader). +type Component interface { + // Start blocks until the channel is closed or an error occurs. + // The component will stop running when the channel is closed. + Start(<-chan struct{}) error + + // NeedLeaderElection indicates if component should be run only by one instance of Control Plane even with many Control Plane replicas. + NeedLeaderElection() bool +} + +// GracefulComponent is a component that supports waiting until it's finished. +// It's useful if there is cleanup logic that has to be executed before the process exits +// (i.e. sending SIGTERM signals to subprocesses started by this component). +type GracefulComponent interface { + Component + + // WaitForDone blocks until all components are done. + // If a component was not started (i.e. leader components on non-leader CP) it returns immediately. + WaitForDone() +} + +// Component of Kuma, i.e. gRPC Server, HTTP server, reconciliation loop. +var _ Component = ComponentFunc(nil) + +type ComponentFunc func(<-chan struct{}) error + +func (f ComponentFunc) NeedLeaderElection() bool { + return false +} + +func (f ComponentFunc) Start(stop <-chan struct{}) error { + return f(stop) +} + +var _ Component = LeaderComponentFunc(nil) + +type LeaderComponentFunc func(<-chan struct{}) error + +func (f LeaderComponentFunc) NeedLeaderElection() bool { + return true +} + +func (f LeaderComponentFunc) Start(stop <-chan struct{}) error { + return f(stop) +} + +type Manager interface { + + // Add registers a component, i.e. gRPC Server, HTTP server, reconciliation loop. + Add(...Component) error + + // Start starts registered components and blocks until the Stop channel is closed. + // Returns an error if there is an error starting any component. + // If there are any GracefulComponent, it waits until all components are done. 
+ Start(<-chan struct{}) error +} + +var _ Manager = &manager{} + +func NewManager(leaderElector LeaderElector) Manager { + return &manager{ + leaderElector: leaderElector, + } +} + +type manager struct { + components []Component + leaderElector LeaderElector +} + +func (cm *manager) Add(c ...Component) error { + cm.components = append(cm.components, c...) + return nil +} + +func (cm *manager) waitForDone() { + for _, c := range cm.components { + if gc, ok := c.(GracefulComponent); ok { + gc.WaitForDone() + } + } +} + +func (cm *manager) Start(stop <-chan struct{}) error { + errCh := make(chan error) + + cm.startNonLeaderComponents(stop, errCh) + cm.startLeaderComponents(stop, errCh) + + defer cm.waitForDone() + select { + case <-stop: + return nil + case err := <-errCh: + return err + } +} + +func (cm *manager) startNonLeaderComponents(stop <-chan struct{}, errCh chan error) { + for _, component := range cm.components { + if !component.NeedLeaderElection() { + go func(c Component) { + if err := c.Start(stop); err != nil { + errCh <- err + } + }(component) + } + } +} + +func (cm *manager) startLeaderComponents(stop <-chan struct{}, errCh chan error) { + // leader stop channel needs to be guarded by a mutex because it will be written by leader elector goroutine + // and read by the last goroutine in this function. 
+ // we need separate channel for leader components because they can be restarted + mutex := sync.Mutex{} + leaderStopCh := make(chan struct{}) + closeLeaderCh := func() { + mutex.Lock() + defer mutex.Unlock() + if !channels.IsClosed(leaderStopCh) { + close(leaderStopCh) + } + } + + cm.leaderElector.AddCallbacks(LeaderCallbacks{ + OnStartedLeading: func() { + log.Info("leader acquired") + mutex.Lock() + defer mutex.Unlock() + leaderStopCh = make(chan struct{}) + for _, component := range cm.components { + if component.NeedLeaderElection() { + go func(c Component) { + if err := c.Start(leaderStopCh); err != nil { + errCh <- err + } + }(component) + } + } + }, + OnStoppedLeading: func() { + log.Info("leader lost") + closeLeaderCh() + }, + }) + go cm.leaderElector.Start(stop) + go func() { + <-stop + closeLeaderCh() + }() +} diff --git a/pkg/core/runtime/component/leader.go b/pkg/core/runtime/component/leader.go new file mode 100755 index 00000000..3edbc4e9 --- /dev/null +++ b/pkg/core/runtime/component/leader.go @@ -0,0 +1,53 @@ +package component + +import "sync/atomic" + +// LeaderCallbacks defines callbacks for events from LeaderElector +// It is guaranteed that each method will be executed from the same goroutine, so only one method can be run at once. +type LeaderCallbacks struct { + OnStartedLeading func() + OnStoppedLeading func() +} + +type LeaderElector interface { + AddCallbacks(LeaderCallbacks) + // IsLeader should be used for diagnostic reasons (metrics/API info), because there may not be any leader elector for a short period of time. + // Use Callbacks to write logic to execute when Leader is elected. + IsLeader() bool + + // Start blocks until the channel is closed or an error occurs. 
+ Start(stop <-chan struct{}) +} + +type LeaderInfo interface { + IsLeader() bool +} + +var _ LeaderInfo = &LeaderInfoComponent{} +var _ Component = &LeaderInfoComponent{} + +type LeaderInfoComponent struct { + leader int32 +} + +func (l *LeaderInfoComponent) Start(stop <-chan struct{}) error { + l.setLeader(true) + <-stop + l.setLeader(false) + return nil +} + +func (l *LeaderInfoComponent) NeedLeaderElection() bool { + return true +} + +func (p *LeaderInfoComponent) setLeader(leader bool) { + var value int32 = 0 + if leader { + value = 1 + } + atomic.StoreInt32(&p.leader, value) +} +func (p *LeaderInfoComponent) IsLeader() bool { + return atomic.LoadInt32(&(p.leader)) == 1 +} diff --git a/pkg/core/runtime/component/resilient.go b/pkg/core/runtime/component/resilient.go new file mode 100755 index 00000000..8167f519 --- /dev/null +++ b/pkg/core/runtime/component/resilient.go @@ -0,0 +1,60 @@ +package component + +import ( + "time" + + "github.com/go-logr/logr" + "github.com/pkg/errors" +) + +const ( + backoffTime = 5 * time.Second +) + +type resilientComponent struct { + log logr.Logger + component Component +} + +func NewResilientComponent(log logr.Logger, component Component) Component { + return &resilientComponent{ + log: log, + component: component, + } +} + +func (r *resilientComponent) Start(stop <-chan struct{}) error { + r.log.Info("starting resilient component ...") + for generationID := uint64(1); ; generationID++ { + errCh := make(chan error, 1) + go func(errCh chan<- error) { + defer close(errCh) + // recover from a panic + defer func() { + if e := recover(); e != nil { + if err, ok := e.(error); ok { + errCh <- err + } else { + errCh <- errors.Errorf("%v", e) + } + } + }() + + errCh <- r.component.Start(stop) + }(errCh) + select { + case <-stop: + r.log.Info("done") + return nil + case err := <-errCh: + if err != nil { + r.log.WithValues("generationID", generationID).Error(err, "component terminated with an error") + } + } + <-time.After(backoffTime) 
+ } +} + +func (r *resilientComponent) NeedLeaderElection() bool { + return r.component.NeedLeaderElection() +} diff --git a/pkg/core/runtime/reports/reports.go b/pkg/core/runtime/reports/reports.go new file mode 100755 index 00000000..29219a1e --- /dev/null +++ b/pkg/core/runtime/reports/reports.go @@ -0,0 +1,277 @@ +package reports + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + + mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" + kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp" + config_core "github.com/kumahq/kuma/pkg/config/core" + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + "github.com/kumahq/kuma/pkg/core/resources/registry" + core_runtime "github.com/kumahq/kuma/pkg/core/runtime" + "github.com/kumahq/kuma/pkg/core/user" + kuma_version "github.com/kumahq/kuma/pkg/version" +) + +const ( + pingInterval = 3600 + pingHost = "kong-hf.konghq.com" + pingPort = 61832 +) + +var ( + log = core.Log.WithName("core").WithName("reports") +) + +/* + - buffer initialized upon Init call + - append adds more keys onto it +*/ + +type reportsBuffer struct { + sync.Mutex + mutable map[string]string + immutable map[string]string +} + +func fetchDataplanes(ctx context.Context, rt core_runtime.Runtime) (*mesh.DataplaneResourceList, error) { + dataplanes := mesh.DataplaneResourceList{} + if err := rt.ReadOnlyResourceManager().List(ctx, &dataplanes); err != nil { + return nil, errors.Wrap(err, "could not fetch dataplanes") + } + + return &dataplanes, nil +} + +func fetchMeshes(ctx context.Context, rt core_runtime.Runtime) (*mesh.MeshResourceList, error) { + meshes := mesh.MeshResourceList{} + if err := rt.ReadOnlyResourceManager().List(ctx, &meshes); err != nil { + return nil, errors.Wrap(err, "could not fetch meshes") + } + + return &meshes, nil +} + +func fetchZones(ctx 
context.Context, rt core_runtime.Runtime) (*system.ZoneResourceList, error) { + zones := system.ZoneResourceList{} + if err := rt.ReadOnlyResourceManager().List(ctx, &zones); err != nil { + return nil, errors.Wrap(err, "could not fetch zones") + } + return &zones, nil +} + +func fetchNumPolicies(ctx context.Context, rt core_runtime.Runtime) (map[string]string, error) { + policyCounts := map[string]string{} + + for _, descr := range registry.Global().ObjectDescriptors() { + typedList := descr.NewList() + k := "n_" + strings.ToLower(string(descr.Name)) + if err := rt.ReadOnlyResourceManager().List(ctx, typedList); err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("could not fetch %s", k)) + } + policyCounts[k] = strconv.Itoa(len(typedList.GetItems())) + } + return policyCounts, nil +} + +func fetchNumOfServices(ctx context.Context, rt core_runtime.Runtime) (int, int, error) { + insights := mesh.ServiceInsightResourceList{} + if err := rt.ReadOnlyResourceManager().List(ctx, &insights); err != nil { + return 0, 0, errors.Wrap(err, "could not fetch service insights") + } + internalServices := 0 + for _, insight := range insights.Items { + internalServices += len(insight.Spec.Services) + } + + externalServicesList := mesh.ExternalServiceResourceList{} + if err := rt.ReadOnlyResourceManager().List(ctx, &externalServicesList); err != nil { + return 0, 0, errors.Wrap(err, "could not fetch external services") + } + return internalServices, len(externalServicesList.Items), nil +} + +func (b *reportsBuffer) marshall() (string, error) { + var builder strings.Builder + + _, err := fmt.Fprintf(&builder, "<14>") + if err != nil { + return "", err + } + + for k, v := range b.immutable { + _, err := fmt.Fprintf(&builder, "%s=%s;", k, v) + if err != nil { + return "", err + } + } + + for k, v := range b.mutable { + _, err := fmt.Fprintf(&builder, "%s=%s;", k, v) + if err != nil { + return "", err + } + } + + return builder.String(), nil +} + +// XXX this function retrieves all 
dataplanes and all meshes; +// ideally, the number of dataplanes and number of meshes +// should be pushed from the outside rather than pulled +func (b *reportsBuffer) updateEntitiesReport(rt core_runtime.Runtime) error { + ctx := user.Ctx(context.Background(), user.ControlPlane) + dps, err := fetchDataplanes(ctx, rt) + if err != nil { + return err + } + b.mutable["dps_total"] = strconv.Itoa(len(dps.Items)) + + ngateways := 0 + gatewayTypes := map[string]int{} + for _, dp := range dps.Items { + spec := dp.GetSpec().(*mesh_proto.Dataplane) + gateway := spec.GetNetworking().GetGateway() + if gateway != nil { + ngateways++ + gatewayType := strings.ToLower(gateway.GetType().String()) + gatewayTypes["gateway_dp_type_"+gatewayType] += 1 + } + } + b.mutable["gateway_dps"] = strconv.Itoa(ngateways) + for gtype, n := range gatewayTypes { + b.mutable[gtype] = strconv.Itoa(n) + } + + meshes, err := fetchMeshes(ctx, rt) + if err != nil { + return err + } + b.mutable["meshes_total"] = strconv.Itoa(len(meshes.Items)) + + switch rt.Config().Mode { + case config_core.Standalone: + b.mutable["zones_total"] = strconv.Itoa(1) + case config_core.Global: + zones, err := fetchZones(ctx, rt) + if err != nil { + return err + } + b.mutable["zones_total"] = strconv.Itoa(len(zones.Items)) + } + + internalServices, externalServices, err := fetchNumOfServices(ctx, rt) + if err != nil { + return err + } + b.mutable["internal_services"] = strconv.Itoa(internalServices) + b.mutable["external_services"] = strconv.Itoa(externalServices) + b.mutable["services_total"] = strconv.Itoa(internalServices + externalServices) + + policyCounts, err := fetchNumPolicies(ctx, rt) + if err != nil { + return err + } + + for k, count := range policyCounts { + b.mutable[k] = count + } + return nil +} + +func (b *reportsBuffer) dispatch(rt core_runtime.Runtime, host string, port int, pingType string, extraFn core_runtime.ExtraReportsFn) error { + if err := b.updateEntitiesReport(rt); err != nil { + return err + } + 
b.mutable["signal"] = pingType + b.mutable["cluster_id"] = rt.GetClusterId() + b.mutable["uptime"] = strconv.FormatInt(int64(time.Since(rt.GetStartTime())/time.Second), 10) + if extraFn != nil { + if valMap, err := extraFn(rt); err != nil { + return err + } else { + b.Append(valMap) + } + } + pingData, err := b.marshall() + if err != nil { + return err + } + + conf := &tls.Config{} + conn, err := tls.Dial("tcp", net.JoinHostPort(host, + strconv.FormatUint(uint64(port), 10)), conf) + if err != nil { + return err + } + + _, err = fmt.Fprint(conn, pingData) + if err != nil { + return err + } + + return nil +} + +// Append information to the mutable portion of the reports buffer +func (b *reportsBuffer) Append(info map[string]string) { + b.Lock() + defer b.Unlock() + + for key, value := range info { + b.mutable[key] = value + } +} + +func (b *reportsBuffer) initImmutable(rt core_runtime.Runtime) { + b.immutable["version"] = kuma_version.Build.Version + b.immutable["product"] = kuma_version.Product + b.immutable["unique_id"] = rt.GetInstanceId() + b.immutable["backend"] = rt.Config().Store.Type + b.immutable["mode"] = rt.Config().Mode + + hostname, err := os.Hostname() + if err == nil { + b.immutable["hostname"] = hostname + } +} + +func startReportTicker(rt core_runtime.Runtime, buffer *reportsBuffer, extraFn core_runtime.ExtraReportsFn) { + go func() { + err := buffer.dispatch(rt, pingHost, pingPort, "start", extraFn) + if err != nil { + log.V(2).Info("failed sending usage info", "cause", err.Error()) + } + for range time.Tick(time.Second * pingInterval) { + err := buffer.dispatch(rt, pingHost, pingPort, "ping", extraFn) + if err != nil { + log.V(2).Info("failed sending usage info", "cause", err.Error()) + } + } + }() +} + +// Init core reports +func Init(rt core_runtime.Runtime, cfg kuma_cp.Config, extraFn core_runtime.ExtraReportsFn) { + var buffer reportsBuffer + buffer.immutable = make(map[string]string) + buffer.mutable = make(map[string]string) + + 
buffer.initImmutable(rt) + + if cfg.Reports.Enabled { + startReportTicker(rt, &buffer, extraFn) + } +} diff --git a/pkg/core/runtime/runtime.go b/pkg/core/runtime/runtime.go new file mode 100755 index 00000000..76f2e091 --- /dev/null +++ b/pkg/core/runtime/runtime.go @@ -0,0 +1,87 @@ +package runtime + +import ( + "github.com/apache/dubbo-admin/pkg/core/runtime/component" + "sync" + "time" +) + +// Runtime represents initialized application state. +type Runtime interface { + RuntimeInfo + RuntimeContext + component.Manager +} + +type RuntimeInfo interface { + GetInstanceId() string + SetClusterId(clusterId string) + GetClusterId() string + GetStartTime() time.Time +} + +type RuntimeContext interface { + Config() kuma_cp.Config + DataSourceLoader() datasource.Loader + ResourceManager() core_manager.ResourceManager +} + +type ExtraReportsFn func(Runtime) (map[string]string, error) + +var _ Runtime = &runtime{} + +type runtime struct { + RuntimeInfo + RuntimeContext + component.Manager +} + +var _ RuntimeInfo = &runtimeInfo{} + +type runtimeInfo struct { + mtx sync.RWMutex + + instanceId string + clusterId string + startTime time.Time +} + +func (i *runtimeInfo) GetInstanceId() string { + return i.instanceId +} + +func (i *runtimeInfo) SetClusterId(clusterId string) { + i.mtx.Lock() + defer i.mtx.Unlock() + i.clusterId = clusterId +} + +func (i *runtimeInfo) GetClusterId() string { + i.mtx.RLock() + defer i.mtx.RUnlock() + return i.clusterId +} + +func (i *runtimeInfo) GetStartTime() time.Time { + return i.startTime +} + +var _ RuntimeContext = &runtimeContext{} + +type runtimeContext struct { + cfg kuma_cp.Config + rm core_manager.ResourceManager + rs core_store.ResourceStore +} + +func (rc *runtimeContext) Config() kuma_cp.Config { + return rc.cfg +} + +func (rc *runtimeContext) ResourceManager() core_manager.ResourceManager { + return rc.rm +} + +func (rc *runtimeContext) ResourceStore() core_store.ResourceStore { + return rc.rs +} diff --git 
a/pkg/version/version.go b/pkg/version/version.go index 40ac63dc..73c9451d 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -20,6 +20,7 @@ package version import ( "encoding/json" "fmt" + "github.com/spf13/cobra" "runtime" ) @@ -60,3 +61,26 @@ func GetVersionInfo() string { result, _ := json.Marshal(version) return string(result) } + +func NewVersionCmd() *cobra.Command { + args := struct { + detailed bool + }{} + cmd := &cobra.Command{ + Use: "version", + Short: "Print version", + Long: `Print version.`, + RunE: func(cmd *cobra.Command, _ []string) error { + if args.detailed { + cmd.Println(GetVersionInfo()) + } else { + cmd.Printf("%s\n", gitVersion) + } + + return nil + }, + } + // flags + cmd.PersistentFlags().BoolVarP(&args.detailed, "detailed", "a", false, "Print detailed version") + return cmd +}
