This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new ec344806 [agent] add sail-agent proxy logic (#791)
ec344806 is described below
commit ec344806cf5aad2bbc0f435ffb572f7df98bf7b6
Author: Jian Zhong <[email protected]>
AuthorDate: Sat Sep 27 00:00:19 2025 +0800
[agent] add sail-agent proxy logic (#791)
---
go.mod | 3 +-
go.sum | 2 +
.../dubbo-discovery/files/grpc-agent.yaml | 1 +
pkg/bootstrap/config.go | 45 +++
pkg/cmd/cmd.go | 11 +
pkg/config/constants/constants.go | 8 +
pkg/config/mesh/mesh.go | 17 +-
pkg/dubbo-agent/agent.go | 388 +++++++++++++++++++++
pkg/dubbo-agent/config/config.go | 145 ++++++++
pkg/dubbo-agent/grpcxds/grpc_bootstrap.go | 109 ++++++
pkg/dubbo-agent/plugins.go | 55 +++
pkg/dubbo-agent/xds_proxy.go | 80 +++++
pkg/features/security.go | 35 ++
pkg/file/fadvise_linux.go | 20 ++
pkg/file/fadvise_unspecified.go | 12 +
pkg/file/file.go | 71 ++++
pkg/jwt/jwt.go | 6 +
pkg/model/fips.go | 34 ++
pkg/model/xds.go | 32 +-
pkg/queue/delay.go | 269 ++++++++++++++
pkg/sail-agent/agent.go | 26 --
pkg/security/retry.go | 16 +
pkg/security/security.go | 104 ++++--
pkg/uds/listener.go | 39 +++
pkg/xds/server.go | 312 +++++++++++++++++
sail/cmd/sail-agent/app/cmd.go | 43 ++-
sail/cmd/sail-agent/main.go | 8 +-
sail/cmd/sail-agent/options/agent.go | 31 ++
sail/cmd/sail-agent/options/agent_proxy.go | 9 +-
sail/cmd/sail-agent/options/options.go | 29 ++
sail/cmd/sail-agent/options/security.go | 84 +++++
sail/pkg/features/{ship.go => sail.go} | 0
sail/pkg/grpc/grpc.go | 92 +++++
sail/pkg/grpc/tls.go | 96 +++++
security/pkg/credentialfetcher/fetcher.go | 21 ++
security/pkg/credentialfetcher/plugin/token.go | 36 ++
security/pkg/nodeagent/cache/secretcache.go | 132 +++++++
security/pkg/nodeagent/caclient/credentials.go | 50 +++
.../nodeagent/caclient/providers/aegis/client.go | 149 ++++++++
security/pkg/nodeagent/sds/sdsservice.go | 231 ++++++++++++
security/pkg/nodeagent/sds/server.go | 115 ++++++
41 files changed, 2902 insertions(+), 64 deletions(-)
diff --git a/go.mod b/go.mod
index 5b397c0a..748a3e54 100644
--- a/go.mod
+++ b/go.mod
@@ -40,6 +40,7 @@ require (
github.com/golang/protobuf v1.5.4
github.com/google/go-cmp v0.7.0
github.com/google/go-containerregistry v0.20.6
+ github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
github.com/hashicorp/go-multierror v1.1.1
github.com/heroku/color v0.0.6
github.com/moby/term v0.5.2
@@ -53,6 +54,7 @@ require (
golang.org/x/crypto v0.41.0
golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6
golang.org/x/net v0.43.0
+ golang.org/x/sys v0.35.0
golang.org/x/term v0.34.0
google.golang.org/grpc v1.74.2
google.golang.org/protobuf v1.36.7
@@ -230,7 +232,6 @@ require (
golang.org/x/mod v0.27.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.16.0 // indirect
- golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/time v0.12.0 // indirect
google.golang.org/genproto/googleapis/api
v0.0.0-20250811230008-5f3141c8851a // indirect
diff --git a/go.sum b/go.sum
index c0bd9a31..beb2eaa2 100644
--- a/go.sum
+++ b/go.sum
@@ -373,6 +373,8 @@ github.com/gorilla/mux v1.8.1
h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod
h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79
h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod
h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
+github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
h1:sGm2vDRFUrQJO/Veii4h4zG2vvqG6uWNkBHSTqXOZk0=
+github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2/go.mod
h1:wd1YpapPLivG6nQgbf7ZkG1hhSOXDhhn4MLTknx2aAc=
github.com/hashicorp/errwrap v1.0.0/go.mod
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0
h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
diff --git
a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
new file mode 100644
index 00000000..69b462ca
--- /dev/null
+++ b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
@@ -0,0 +1 @@
+# inject grpc template
\ No newline at end of file
diff --git a/pkg/bootstrap/config.go b/pkg/bootstrap/config.go
new file mode 100644
index 00000000..730d395e
--- /dev/null
+++ b/pkg/bootstrap/config.go
@@ -0,0 +1,45 @@
+package bootstrap
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ "os"
+ "strconv"
+ "strings"
+)
+
// MetadataOptions carries inputs for building node metadata.
// Currently empty; fields will be added as GetNodeMetaData is implemented.
type MetadataOptions struct {
}
+
// ReadPodAnnotations reads the pod's annotations from a Kubernetes
// downward-API file and parses them into a map. An empty path falls back
// to the default mount location (constants.PodInfoAnnotationsPath).
func ReadPodAnnotations(path string) (map[string]string, error) {
	if path == "" {
		path = constants.PodInfoAnnotationsPath
	}
	b, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	return ParseDownwardAPI(string(b))
}
+
// ParseDownwardAPI parses the key="value" lines written by the Kubernetes
// downward API into a map, unquoting each value. Lines without an '='
// separator are ignored; a value that cannot be unquoted is an error.
func ParseDownwardAPI(i string) (map[string]string, error) {
	out := map[string]string{}
	for _, raw := range strings.Split(i, "\n") {
		key, quoted, found := strings.Cut(raw, "=")
		if !found {
			// Not a key=value line; skip it.
			continue
		}
		// Strip the leading/trailing quotes added by the downward API.
		val, err := strconv.Unquote(quoted)
		if err != nil {
			return nil, fmt.Errorf("failed to unquote %v: %v", quoted, err)
		}
		out[key] = val
	}
	return out, nil
}
+
// GetNodeMetaData builds node metadata from the given options.
// NOTE(review): currently a stub that always succeeds — TODO implement.
func GetNodeMetaData(options MetadataOptions) error {
	return nil
}
diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go
index 2caf64e8..0d9d4719 100644
--- a/pkg/cmd/cmd.go
+++ b/pkg/cmd/cmd.go
@@ -18,10 +18,14 @@
package cmd
import (
+ "context"
"flag"
"fmt"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
+ "os"
+ "os/signal"
+ "syscall"
)
func AddFlags(rootCmd *cobra.Command) {
@@ -33,3 +37,10 @@ func PrintFlags(flags *pflag.FlagSet) {
fmt.Printf("FLAG: --%s=%q\n", flag.Name, flag.Value)
})
}
+
// WaitSignalFunc blocks until SIGINT or SIGTERM is received, then cancels
// the given context with the signal name recorded as the cause.
func WaitSignalFunc(cancel context.CancelCauseFunc) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	sig := <-sigs
	cancel(fmt.Errorf("received signal: %v", sig.String()))
}
diff --git a/pkg/config/constants/constants.go
b/pkg/config/constants/constants.go
index f8d053b2..e9bf56fd 100644
--- a/pkg/config/constants/constants.go
+++ b/pkg/config/constants/constants.go
@@ -20,6 +20,8 @@ package constants
const (
DubboSystemNamespace = "dubbo-system"
DefaultClusterLocalDomain = "cluster.local"
+ KeyFilename = "key.pem"
+ CertChainFilename = "cert-chain.pem"
DefaultClusterName = "Kubernetes"
ServiceClusterName = "dubbo-proxy"
ConfigPathDir = "./etc/dubbo/proxy"
@@ -28,4 +30,10 @@ const (
CertProviderKubernetesSignerPrefix = "k8s.io/"
CACertNamespaceConfigMapDataName = "root-cert.pem"
+
+ PodInfoAnnotationsPath = "./etc/dubbo/pod/annotations"
+ CertProviderNone = "none"
+ CertProviderCustom = "custom"
+
+ ThirdPartyJwtPath = "./var/run/secrets/tokens/dubbo-token"
)
diff --git a/pkg/config/mesh/mesh.go b/pkg/config/mesh/mesh.go
index 14b81139..3fdf4cf4 100644
--- a/pkg/config/mesh/mesh.go
+++ b/pkg/config/mesh/mesh.go
@@ -48,9 +48,10 @@ func DefaultProxyConfig() *meshconfig.ProxyConfig {
TerminationDrainDuration: durationpb.New(5 * time.Second),
ProxyAdminPort: 15000,
// TODO authpolicy
- DiscoveryAddress: "dubbod.dubbo-system.svc:15012",
- StatNameLength: 189,
- StatusPort: 15020,
+ DiscoveryAddress: "dubbod.dubbo-system.svc:15012",
+ ControlPlaneAuthPolicy:
meshconfig.AuthenticationPolicy_MUTUAL_TLS,
+ StatNameLength: 189,
+ StatusPort: 15020,
}
}
@@ -125,6 +126,16 @@ func ApplyMeshConfig(yaml string, defaultConfig
*meshconfig.MeshConfig) (*meshco
return defaultConfig, nil
}
// ApplyProxyConfig merges a proxy-config YAML fragment into a clone of
// meshConfig.DefaultConfig and returns the cloned mesh config. The input
// meshConfig is not mutated.
func ApplyProxyConfig(yaml string, meshConfig *meshconfig.MeshConfig) (*meshconfig.MeshConfig, error) {
	mc := protomarshal.Clone(meshConfig)
	pc, err := MergeProxyConfig(yaml, mc.DefaultConfig)
	if err != nil {
		return nil, err
	}
	mc.DefaultConfig = pc
	return mc, nil
}
+
// EmptyMeshNetworks configuration with no networks
func EmptyMeshNetworks() meshconfig.MeshNetworks {
return meshconfig.MeshNetworks{
diff --git a/pkg/dubbo-agent/agent.go b/pkg/dubbo-agent/agent.go
new file mode 100644
index 00000000..409c57f7
--- /dev/null
+++ b/pkg/dubbo-agent/agent.go
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubboagent
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/bootstrap"
+ "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/grpcxds"
+ "github.com/apache/dubbo-kubernetes/pkg/filewatcher"
+ "github.com/apache/dubbo-kubernetes/pkg/model"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/cache"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ mesh "istio.io/api/mesh/v1alpha1"
+ "k8s.io/klog/v2"
+ "os"
+ "path"
+ "path/filepath"
+ "sync"
+ "time"
+)
+
const (
	// AegisCACertPath is the directory where the CA root certificate
	// config map is mounted into the pod.
	AegisCACertPath = "./var/run/secrets/dubbo"
)

const (
	// Proxy-metadata keys consulted when certificates are file-mounted
	// rather than fetched from a CA (see GetKeyCertsForXDS/FindRootCAForXDS).
	MetadataClientCertKey   = "DUBBO_META_TLS_CLIENT_KEY"
	MetadataClientCertChain = "DUBBO_META_TLS_CLIENT_CERT_CHAIN"
	MetadataClientRootCert  = "DUBBO_META_TLS_CLIENT_ROOT_CERT"
)

// SDSServiceFactory constructs an SDS service from security options, a
// secret manager, and an optional private key provider.
type SDSServiceFactory = func(_ *security.Options, _ security.SecretManager, _ *mesh.PrivateKeyProvider) SDSService

// Proxy describes the node this agent fronts.
type Proxy struct {
	Type      model.NodeType
	DNSDomain string
}

// Agent drives the sail-agent lifecycle: the SDS server, the secret cache,
// the XDS proxy, and file watching for certificate rotation.
type Agent struct {
	proxyConfig *mesh.ProxyConfig
	cfg         *AgentOptions
	secOpts     *security.Options
	sdsServer   SDSService
	secretCache *cache.SecretManagerClient

	xdsProxy    *XdsProxy
	fileWatcher filewatcher.FileWatcher

	// wg tracks agent goroutines; Run returns wg.Wait so callers can block
	// until they finish.
	wg sync.WaitGroup
}

// AgentOptions are the externally supplied knobs for the agent.
type AgentOptions struct {
	// WorkloadIdentitySocketFile must be a bare filename (validated in Run),
	// joined onto the fixed SDS socket directory.
	WorkloadIdentitySocketFile string
	// GRPCBootstrapPath, when non-empty, is where the gRPC XDS bootstrap
	// file is written.
	GRPCBootstrapPath string
	XDSHeaders        map[string]string
	// XDSRootCerts / CARootCerts override root CA discovery for the XDS and
	// CA connections respectively (see FindRootCAForXDS / FindRootCAForCA).
	XDSRootCerts      string
	MetadataDiscovery *bool
	CARootCerts       string
}
+
// NewAgent assembles an Agent from its configuration. No I/O is performed
// until Run is called; only the file watcher is allocated here.
func NewAgent(proxyConfig *mesh.ProxyConfig, agentOpts *AgentOptions, sopts *security.Options) *Agent {
	return &Agent{
		proxyConfig: proxyConfig,
		cfg:         agentOpts,
		secOpts:     sopts,
		fileWatcher: filewatcher.NewWatcher(),
	}
}
+
// Run starts the agent: it validates the SDS socket configuration, starts
// the default SDS server and the XDS proxy, optionally writes the gRPC XDS
// bootstrap file, and watches the XDS root CA file for rotation. It returns
// a wait function that blocks until agent goroutines exit.
func (a *Agent) Run(ctx context.Context) (func(), error) {
	// TODO initLocalDNSServer?

	// The socket override must be a bare filename; it is joined onto the
	// fixed SDS socket directory below.
	if a.cfg.WorkloadIdentitySocketFile != filepath.Base(a.cfg.WorkloadIdentitySocketFile) {
		return nil, fmt.Errorf("workload identity socket file override must be a filename, not a path: %s", a.cfg.WorkloadIdentitySocketFile)
	}

	configuredAgentSocketPath := security.GetWorkloadSDSSocketListenPath(a.cfg.WorkloadIdentitySocketFile)

	isDubboSDS := configuredAgentSocketPath == security.GetDubboSDSServerSocketPath()

	socketExists, err := checkSocket(ctx, configuredAgentSocketPath)
	if err != nil {
		return nil, fmt.Errorf("failed to check SDS socket: %v", err)
	}
	if socketExists {
		// Another SDS server already listens here; ours only serves files.
		klog.Infof("Existing workload SDS socket found at %s. Default Dubbo SDS Server will only serve files", configuredAgentSocketPath)
		a.secOpts.ServeOnlyFiles = true
	} else if !isDubboSDS {
		// A custom socket path was configured but nothing is serving it.
		return nil, fmt.Errorf("agent configured for non-default SDS socket path: %s but no socket found", configuredAgentSocketPath)
	}

	klog.Info("Starting default Dubbo SDS Server")
	err = a.initSdsServer()
	if err != nil {
		return nil, fmt.Errorf("failed to start default Dubbo SDS server: %v", err)
	}
	a.xdsProxy, err = initXdsProxy(a)
	if err != nil {
		return nil, fmt.Errorf("failed to start xds proxy: %v", err)
	}

	if a.cfg.GRPCBootstrapPath != "" {
		if err := a.generateGRPCBootstrap(); err != nil {
			return nil, fmt.Errorf("failed generating gRPC XDS bootstrap: %v", err)
		}
	}
	if a.proxyConfig.ControlPlaneAuthPolicy != mesh.AuthenticationPolicy_NONE {
		// Rebuild the upstream dial options whenever the root CA file
		// changes so new connections pick up rotated certificates.
		rootCAForXDS, err := a.FindRootCAForXDS()
		if err != nil {
			return nil, fmt.Errorf("failed to find root XDS CA: %v", err)
		}
		go a.startFileWatcher(ctx, rootCAForXDS, func() {
			if err := a.xdsProxy.initDubbodDialOptions(a); err != nil {
				klog.InfoS("Failed to init xds proxy dial options")
			}
		})
	}

	return a.wg.Wait, nil
}
+
// Close tears down agent components. Each field is nil-checked because Run
// may have failed part-way through startup, leaving some unset.
func (a *Agent) Close() {
	if a.xdsProxy != nil {
		a.xdsProxy.close()
	}
	if a.sdsServer != nil {
		a.sdsServer.Stop()
	}
	if a.secretCache != nil {
		a.secretCache.Close()
	}
	if a.fileWatcher != nil {
		// Best-effort: the watcher error is irrelevant during shutdown.
		_ = a.fileWatcher.Close()
	}
}
+
// FindRootCAForXDS resolves the root CA bundle used to validate the XDS
// (control plane) connection. The branches below are a strict priority
// order; an empty path with nil error means "use the system cert pool".
// The file's existence is verified before returning (except for ProvCert,
// which may be populated later).
func (a *Agent) FindRootCAForXDS() (string, error) {
	var rootCAPath string

	if a.cfg.XDSRootCerts == security.SystemRootCerts {
		// Special case input for root cert configuration to use system root certificates
		return "", nil
	} else if a.cfg.XDSRootCerts != "" {
		// Using specific platform certs or custom roots
		rootCAPath = a.cfg.XDSRootCerts
	} else if fileExists(security.DefaultRootCertFilePath) {
		// Old style - mounted cert. This is used for XDS auth only,
		// not connecting to CA_ADDR because this mode uses external
		// agent (Secret refresh, etc)
		return security.DefaultRootCertFilePath, nil
	} else if a.secOpts.ProvCert != "" {
		// This was never completely correct - PROV_CERT are only intended for auth with CA_ADDR,
		// and should not be involved in determining the root CA.
		// For VMs, the root cert file used to auth may be populated afterwards.
		// Thus, return directly here and skip checking for existence.
		return a.secOpts.ProvCert + "/root-cert.pem", nil
	} else if a.secOpts.FileMountedCerts {
		// FileMountedCerts - Load it from Proxy Metadata.
		rootCAPath = a.proxyConfig.ProxyMetadata[MetadataClientRootCert]
	} else if a.secOpts.SailCertProvider == constants.CertProviderNone {
		return "", fmt.Errorf("root CA file for XDS required but configured provider as none")
	} else {
		// Default: the CA root certificate mounted from the config map.
		rootCAPath = path.Join(AegisCACertPath, constants.CACertNamespaceConfigMapDataName)
	}

	// Additional checks for root CA cert existence. Fail early, instead of obscure envoy errors
	if fileExists(rootCAPath) {
		return rootCAPath, nil
	}

	return "", fmt.Errorf("root CA file for XDS does not exist %s", rootCAPath)
}
+
// GetKeyCertsForXDS returns the client key and cert-chain paths used to
// authenticate to the XDS server: provisioned certs take priority, then
// file-mounted certs configured via proxy metadata. Empty strings mean no
// client certificate is presented.
func (a *Agent) GetKeyCertsForXDS() (string, string) {
	var key, cert string
	if a.secOpts.ProvCert != "" {
		key, cert = getKeyCertInner(a.secOpts.ProvCert)
	} else if a.secOpts.FileMountedCerts {
		key = a.proxyConfig.ProxyMetadata[MetadataClientCertKey]
		cert = a.proxyConfig.ProxyMetadata[MetadataClientCertChain]
	}
	return key, cert
}
+
// GetKeyCertsForCA returns the client key and cert-chain paths used to
// authenticate to the CA endpoint; only provisioned certs apply here.
// Empty strings mean no client certificate is presented.
func (a *Agent) GetKeyCertsForCA() (string, string) {
	var key, cert string
	if a.secOpts.ProvCert != "" {
		key, cert = getKeyCertInner(a.secOpts.ProvCert)
	}
	return key, cert
}
+
// FindRootCAForCA resolves the root CA bundle used to validate the CA
// endpoint connection, mirroring FindRootCAForXDS but driven by CARootCerts
// and the cert provider. An empty path with nil error means "use the system
// cert pool"; existence is verified except for the ProvCert branch.
func (a *Agent) FindRootCAForCA() (string, error) {
	var rootCAPath string

	if a.cfg.CARootCerts == security.SystemRootCerts {
		return "", nil
	} else if a.cfg.CARootCerts != "" {
		rootCAPath = a.cfg.CARootCerts
	} else if a.secOpts.SailCertProvider == constants.CertProviderCustom {
		rootCAPath = security.DefaultRootCertFilePath // ./etc/certs/root-cert.pem
	} else if a.secOpts.ProvCert != "" {
		// This was never completely correct - PROV_CERT are only intended for auth with CA_ADDR,
		// and should not be involved in determining the root CA.
		// For VMs, the root cert file used to auth may be populated afterwards.
		// Thus, return directly here and skip checking for existence.
		return a.secOpts.ProvCert + "/root-cert.pem", nil
	} else if a.secOpts.SailCertProvider == constants.CertProviderNone {
		return "", fmt.Errorf("root CA file for CA required but configured provider as none")
	} else {
		// Default: the CA root certificate mounted from the config map.
		rootCAPath = path.Join(AegisCACertPath, constants.CACertNamespaceConfigMapDataName)
	}

	if fileExists(rootCAPath) {
		return rootCAPath, nil
	}

	return "", fmt.Errorf("root CA file for CA does not exist %s", rootCAPath)
}
+
// startFileWatcher invokes handler on every change event for filePath until
// ctx is cancelled. Watcher errors are logged and watching continues; a
// failure to register the watch returns immediately.
func (a *Agent) startFileWatcher(ctx context.Context, filePath string, handler func()) {
	if err := a.fileWatcher.Add(filePath); err != nil {
		klog.Warningf("Failed to add file watcher %s", filePath)
		return
	}

	klog.V(2).Infof("Add file %s watcher", filePath)
	for {
		select {
		case gotEvent := <-a.fileWatcher.Events(filePath):
			klog.V(2).Infof("Receive file %s event %v", filePath, gotEvent)
			handler()
		case err := <-a.fileWatcher.Errors(filePath):
			klog.Warningf("Watch file %s error: %v", filePath, err)
		case <-ctx.Done():
			return
		}
	}
}
+
// initSdsServer prepares the secret source backing the SDS server. When a
// complete workload certificate set is already mounted on disk, secrets are
// served from those files and no CA client is created.
func (a *Agent) initSdsServer() error {
	var err error
	if security.CheckWorkloadCertificate(security.WorkloadIdentityCertChainPath, security.WorkloadIdentityKeyPath, security.WorkloadIdentityRootCertPath) {
		klog.Info("workload certificate files detected, creating secret manager without caClient")
		a.secOpts.RootCertFilePath = security.WorkloadIdentityRootCertPath
		a.secOpts.CertChainFilePath = security.WorkloadIdentityCertChainPath
		a.secOpts.KeyFilePath = security.WorkloadIdentityKeyPath
		a.secOpts.FileMountedCerts = true
	}

	// A CA client is only needed when secrets are not purely file-based.
	createCaClient := !a.secOpts.FileMountedCerts && !a.secOpts.ServeOnlyFiles
	a.secretCache, err = a.newSecretManager(createCaClient)
	if err != nil {
		return fmt.Errorf("failed to start workload secret manager %v", err)
	}

	return nil
}
+
// generateGRPCBootstrap generates node metadata and writes the gRPC XDS
// bootstrap file to cfg.GRPCBootstrapPath, creating the parent directory
// (0700) if needed.
func (a *Agent) generateGRPCBootstrap() error {
	// generate metadata
	err := a.generateNodeMetadata()
	if err != nil {
		return fmt.Errorf("failed generating node metadata: %v", err)
	}
	if err := os.MkdirAll(filepath.Dir(a.cfg.GRPCBootstrapPath), 0o700); err != nil {
		return err
	}

	_, err = grpcxds.GenerateBootstrapFile(grpcxds.GenerateBootstrapOptions{
		DiscoveryAddress: a.proxyConfig.DiscoveryAddress,
		CertDir:          a.secOpts.OutputKeyCertToDir,
	}, a.cfg.GRPCBootstrapPath)
	if err != nil {
		return err
	}
	return nil
}
+
// newSecretManager builds the secret manager client. When createCaClient is
// false (file-mounted certs or serve-only-files mode) no CA connection is
// established and secrets come purely from disk.
func (a *Agent) newSecretManager(createCaClient bool) (*cache.SecretManagerClient, error) {
	if !createCaClient {
		klog.Info("Workload is using file mounted certificates. Skipping connecting to CA")
		return cache.NewSecretManagerClient(nil, a.secOpts)
	}
	klog.Infof("CA Endpoint %s, provider %s", a.secOpts.CAEndpoint, a.secOpts.CAProviderName)

	caClient, err := createCAClient(a.secOpts, a)
	if err != nil {
		return nil, err
	}
	return cache.NewSecretManagerClient(caClient, a.secOpts)
}
+
// generateNodeMetadata probes for the credential SDS socket (currently
// informational only — its presence is just logged) and delegates to
// bootstrap.GetNodeMetaData.
// NOTE(review): GetNodeMetaData is a stub today; confirm intended behavior.
func (a *Agent) generateNodeMetadata() error {
	credentialSocketExists, err := checkSocket(context.TODO(), security.CredentialNameSocketPath)
	if err != nil {
		return fmt.Errorf("failed to check credential SDS socket: %v", err)
	}
	if credentialSocketExists {
		klog.Info("Credential SDS socket found")
	}

	return bootstrap.GetNodeMetaData(bootstrap.MetadataOptions{})
}
+
// SDSService is the minimal contract the agent needs from an SDS server.
type SDSService interface {
	// OnSecretUpdate notifies the server that the named secret changed.
	OnSecretUpdate(resourceName string)
	// Stop shuts the server down.
	Stop()
}
+
// checkSocket reports whether a usable SDS socket exists at socketPath.
// A socket file that fails the gRPC health probe is assumed stale: it is
// removed so a fresh server can bind, and (false, nil) is returned.
func checkSocket(ctx context.Context, socketPath string) (bool, error) {
	socketExists := socketFileExists(socketPath)
	if !socketExists {
		return false, nil
	}

	err := socketHealthCheck(ctx, socketPath)
	if err != nil {
		klog.V(2).Infof("SDS socket detected but not healthy: %v", err)
		err = os.Remove(socketPath)
		if err != nil {
			return false, fmt.Errorf("existing SDS socket could not be removed: %v", err)
		}
		return false, nil
	}

	return true, nil
}
+
// socketHealthCheck verifies that a gRPC connection can be established over
// the unix socket within one second. The dial blocks (WithBlock), so a nil
// return means the listener actually accepted the connection.
// NOTE(review): grpc.DialContext, WithBlock and WithReturnConnectionError
// are deprecated in recent grpc-go releases — consider migrating.
func socketHealthCheck(ctx context.Context, socketPath string) error {
	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second))
	defer cancel()

	conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix:%s", socketPath),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.FailOnNonTempDialError(true),
		grpc.WithReturnConnectionError(),
		grpc.WithBlock(),
	)
	if err != nil {
		return err
	}
	err = conn.Close()
	if err != nil {
		// Close failure does not invalidate the successful health check.
		klog.Infof("connection is not closed: %v", err)
	}

	return nil
}
+
+func getKeyCertInner(certPath string) (string, string) {
+ key := path.Join(certPath, constants.KeyFilename)
+ cert := path.Join(certPath, constants.CertChainFilename)
+ return key, cert
+}
+
// fileExists reports whether path exists and is a regular file.
func fileExists(path string) bool {
	fi, err := os.Stat(path)
	return err == nil && fi.Mode().IsRegular()
}
+
// socketFileExists reports whether path exists and is NOT a regular file
// (e.g. a unix domain socket).
func socketFileExists(path string) bool {
	fi, err := os.Stat(path)
	return err == nil && !fi.Mode().IsRegular()
}
diff --git a/pkg/dubbo-agent/config/config.go b/pkg/dubbo-agent/config/config.go
new file mode 100644
index 00000000..1bd21a36
--- /dev/null
+++ b/pkg/dubbo-agent/config/config.go
@@ -0,0 +1,145 @@
+package config
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/bootstrap"
+ "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+ "github.com/apache/dubbo-kubernetes/pkg/env"
+ "google.golang.org/protobuf/types/known/wrapperspb"
+ "istio.io/api/annotation"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+ "k8s.io/klog/v2"
+ "os"
+ "runtime"
+ "strconv"
+ "strings"
+)
+
+// ConstructProxyConfig returns proxyConfig
+func ConstructProxyConfig(meshConfigFile, serviceCluster, proxyConfigEnv
string, concurrency int) (*meshconfig.ProxyConfig, error) {
+ annotations, err := bootstrap.ReadPodAnnotations("")
+ if err != nil {
+ if os.IsNotExist(err) {
+ klog.V(2).Infof("failed to read pod annotations: %v",
err)
+ } else {
+ klog.V(2).Infof("failed to read pod annotations: %v",
err)
+ }
+ }
+ var fileMeshContents string
+ if fileExists(meshConfigFile) {
+ contents, err := os.ReadFile(meshConfigFile)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read mesh config file
%v: %v", meshConfigFile, err)
+ }
+ fileMeshContents = string(contents)
+ }
+ meshConfig, err := getMeshConfig(fileMeshContents,
annotations[annotation.ProxyConfig.Name], proxyConfigEnv)
+ if err != nil {
+ return nil, err
+ }
+ proxyConfig := mesh.DefaultProxyConfig()
+ if meshConfig.DefaultConfig != nil {
+ proxyConfig = meshConfig.DefaultConfig
+ }
+
+ // Concurrency wasn't explicitly set
+ if proxyConfig.Concurrency == nil {
+ // We want to detect based on CPU limit configured. If we are
running on a 100 core machine, but with
+ // only 2 CPUs allocated, we want to have 2 threads, not 100,
or we will get excessively throttled.
+ if CPULimit != 0 {
+ klog.Infof("cpu limit detected as %v, setting
concurrency", CPULimit)
+ proxyConfig.Concurrency =
wrapperspb.Int32(int32(CPULimit))
+ }
+ }
+ // Respect the old flag, if they set it. This should never be set in
typical installation.
+ if concurrency != 0 {
+ klog.V(2).Infof("legacy --concurrency=%d flag detected; prefer
to use ProxyConfig", concurrency)
+ proxyConfig.Concurrency = wrapperspb.Int32(int32(concurrency))
+ }
+
+ if proxyConfig.Concurrency.GetValue() == 0 {
+ if CPULimit < runtime.NumCPU() {
+ klog.V(2).Infof("concurrency is set to 0, which will
use a thread per CPU on the host. However, CPU limit is set lower. "+
+ "This is not recommended and may lead to
performance issues. "+
+ "CPU count: %d, CPU Limit: %d.",
runtime.NumCPU(), CPULimit)
+ }
+ }
+
+ if x, ok :=
proxyConfig.GetClusterName().(*meshconfig.ProxyConfig_ServiceCluster); ok {
+ if x.ServiceCluster == "" {
+ proxyConfig.ClusterName =
&meshconfig.ProxyConfig_ServiceCluster{ServiceCluster: serviceCluster}
+ }
+ }
+ // TODO ResolveAddr
+ // TODO ValidateMeshConfigProxyConfig
+ return applyAnnotations(proxyConfig, annotations), nil
+}
+
// getMeshConfig layers mesh-config sources over the built-in defaults in
// increasing priority: file contents, then the PROXY_CONFIG environment
// value, then the pod annotation override. Empty sources are skipped.
func getMeshConfig(fileOverride, annotationOverride, proxyConfigEnv string) (*meshconfig.MeshConfig, error) {
	mc := mesh.DefaultMeshConfig()
	if fileOverride != "" {
		klog.Infof("Apply mesh config from file %v", fileOverride)
		fileMesh, err := mesh.ApplyMeshConfig(fileOverride, mc)
		if err != nil || fileMesh == nil {
			return nil, fmt.Errorf("failed to unmarshal mesh config from file [%v]: %v", fileOverride, err)
		}
		mc = fileMesh
	}

	if proxyConfigEnv != "" {
		klog.Infof("Apply proxy config from env %v", proxyConfigEnv)
		envMesh, err := mesh.ApplyProxyConfig(proxyConfigEnv, mc)
		if err != nil || envMesh == nil {
			return nil, fmt.Errorf("failed to unmarshal mesh config from environment [%v]: %v", proxyConfigEnv, err)
		}
		mc = envMesh
	}

	if annotationOverride != "" {
		klog.Infof("Apply proxy config from annotation %v", annotationOverride)
		annotationMesh, err := mesh.ApplyProxyConfig(annotationOverride, mc)
		if err != nil || annotationMesh == nil {
			return nil, fmt.Errorf("failed to unmarshal mesh config from annotation [%v]: %v", annotationOverride, err)
		}
		mc = annotationMesh
	}

	return mc, nil
}
+
+// Apply any overrides to proxy config from annotations
+func applyAnnotations(config *meshconfig.ProxyConfig, annos map[string]string)
*meshconfig.ProxyConfig {
+ if v, f := annos[annotation.SidecarDiscoveryAddress.Name]; f {
+ config.DiscoveryAddress = v
+ }
+ if v, f := annos[annotation.SidecarStatusPort.Name]; f {
+ p, err := strconv.Atoi(v)
+ if err != nil {
+ klog.Errorf("Invalid annotation %v=%v: %v",
annotation.SidecarStatusPort.Name, v, err)
+ }
+ config.StatusPort = int32(p)
+ }
+ return config
+}
+
// GetSailSan derives the expected SAN (subject alternative name) of the
// control-plane certificate from the discovery address by stripping the
// port. For local debugging the discovery address is localhost while the
// certificate is still issued for the in-cluster service, so the in-cluster
// host is substituted.
func GetSailSan(discoveryAddress string) string {
	discHost := strings.Split(discoveryAddress, ":")[0]
	// For local debugging - the discoveryAddress is set to localhost, but the cert issued for normal SA.
	if discHost == "localhost" {
		// Use the dubbo-system namespace, consistent with the default
		// DiscoveryAddress "dubbod.dubbo-system.svc:15012" and
		// constants.DubboSystemNamespace (the original said istio-system).
		discHost = "dubbod.dubbo-system.svc"
	}
	return discHost
}
+
// fileExists reports whether path exists (any file type). Stat errors other
// than "does not exist" are treated as existing, preserving the original
// best-effort behavior.
func fileExists(path string) bool {
	_, err := os.Stat(path)
	return !os.IsNotExist(err)
}
+
// CPULimit is the container CPU limit (whole cores, rounded up) read from
// the DUBBO_CPU_LIMIT environment variable; 0 means no limit configured.
var CPULimit = env.Register(
	"DUBBO_CPU_LIMIT",
	0,
	"CPU limit for the current process. Expressed as an integer value, rounded up.",
).Get()
diff --git a/pkg/dubbo-agent/grpcxds/grpc_bootstrap.go
b/pkg/dubbo-agent/grpcxds/grpc_bootstrap.go
new file mode 100644
index 00000000..6dc2df0f
--- /dev/null
+++ b/pkg/dubbo-agent/grpcxds/grpc_bootstrap.go
@@ -0,0 +1,109 @@
+package grpcxds
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/file"
+ "github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
+ core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+ "google.golang.org/protobuf/types/known/durationpb"
+ "os"
+ "path"
+ "time"
+)
+
const (
	// ServerListenerNamePrefix is the prefix of server (inbound) listener
	// resource names in the gRPC XDS bootstrap.
	ServerListenerNamePrefix = "xds.dubbo.io/grpc/lds/inbound/"
	// ServerListenerNameTemplate fills %s with the listening address.
	ServerListenerNameTemplate = ServerListenerNamePrefix + "%s"
)

// FileWatcherCertProviderConfig is the config for gRPC's file_watcher
// certificate provider plugin.
type FileWatcherCertProviderConfig struct {
	CertificateFile   string          `json:"certificate_file,omitempty"`
	PrivateKeyFile    string          `json:"private_key_file,omitempty"`
	CACertificateFile string          `json:"ca_certificate_file,omitempty"`
	// RefreshDuration holds a pre-marshaled JSON duration value.
	RefreshDuration json.RawMessage `json:"refresh_interval,omitempty"`
}

// GenerateBootstrapOptions are the inputs for bootstrap generation.
type GenerateBootstrapOptions struct {
	// XdsUdsPath, when set, routes XDS through the agent's unix socket.
	XdsUdsPath       string
	DiscoveryAddress string
	// CertDir, when set, enables the file_watcher certificate provider.
	CertDir string
}

// ChannelCreds describes channel credentials for an XDS server entry.
type ChannelCreds struct {
	Type   string `json:"type,omitempty"`
	Config any    `json:"config,omitempty"`
}

// Bootstrap is the gRPC XDS bootstrap file structure.
type Bootstrap struct {
	XDSServers                 []XdsServer                    `json:"xds_servers,omitempty"`
	Node                       *core.Node                     `json:"node,omitempty"`
	CertProviders              map[string]CertificateProvider `json:"certificate_providers,omitempty"`
	ServerListenerNameTemplate string                         `json:"server_listener_resource_name_template,omitempty"`
}

// XdsServer is one XDS server entry in the bootstrap.
type XdsServer struct {
	ServerURI      string         `json:"server_uri,omitempty"`
	ChannelCreds   []ChannelCreds `json:"channel_creds,omitempty"`
	ServerFeatures []string       `json:"server_features,omitempty"`
}

// CertificateProvider names a certificate-provider plugin and its config.
type CertificateProvider struct {
	PluginName string `json:"plugin_name,omitempty"`
	Config     any    `json:"config,omitempty"`
}
+
// GenerateBootstrap builds an in-memory gRPC XDS bootstrap: the XDS server
// address (routed over the agent's unix socket when XdsUdsPath is set), the
// server listener name template, and — when CertDir is set — a file_watcher
// certificate provider refreshing every 15 minutes.
func GenerateBootstrap(opts GenerateBootstrapOptions) (*Bootstrap, error) {
	// TODO direct to CP should use secure channel (most likely JWT + TLS, but possibly allow mTLS)
	serverURI := opts.DiscoveryAddress
	if opts.XdsUdsPath != "" {
		serverURI = fmt.Sprintf("unix:///%s", opts.XdsUdsPath)
	}

	bootstrap := Bootstrap{
		XDSServers: []XdsServer{{
			ServerURI: serverURI,
			// connect locally via agent
			ChannelCreds:   []ChannelCreds{{Type: "insecure"}},
			ServerFeatures: []string{"xds_v3"},
		}},
		ServerListenerNameTemplate: ServerListenerNameTemplate,
	}

	if opts.CertDir != "" {
		// TODO use a more appropriate interval
		refresh, err := protomarshal.Marshal(durationpb.New(15 * time.Minute))
		if err != nil {
			return nil, err
		}

		bootstrap.CertProviders = map[string]CertificateProvider{
			"default": {
				PluginName: "file_watcher",
				Config: FileWatcherCertProviderConfig{
					PrivateKeyFile:    path.Join(opts.CertDir, "key.pem"),
					CertificateFile:   path.Join(opts.CertDir, "cert-chain.pem"),
					CACertificateFile: path.Join(opts.CertDir, "root-cert.pem"),
					RefreshDuration:   refresh,
				},
			},
		}
	}

	return &bootstrap, nil
}
+
// GenerateBootstrapFile generates the bootstrap, serializes it as indented
// JSON, and atomically writes it to path with mode 0644. The generated
// bootstrap is returned for inspection.
func GenerateBootstrapFile(opts GenerateBootstrapOptions, path string) (*Bootstrap, error) {
	bootstrap, err := GenerateBootstrap(opts)
	if err != nil {
		return nil, err
	}
	jsonData, err := json.MarshalIndent(bootstrap, "", "  ")
	if err != nil {
		return nil, err
	}
	if err := file.AtomicWrite(path, jsonData, os.FileMode(0o644)); err != nil {
		return nil, fmt.Errorf("failed writing to %s: %v", path, err)
	}
	return bootstrap, nil
}
diff --git a/pkg/dubbo-agent/plugins.go b/pkg/dubbo-agent/plugins.go
new file mode 100644
index 00000000..29ea6cd1
--- /dev/null
+++ b/pkg/dubbo-agent/plugins.go
@@ -0,0 +1,55 @@
+package dubboagent
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+
"github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/caclient/providers/aegis"
+ "k8s.io/klog/v2"
+ "strings"
+)
+
// RootCertProvider supplies the client key/cert pair and the root CA used
// to authenticate to the CA endpoint; implemented by Agent.
type RootCertProvider interface {
	GetKeyCertsForCA() (string, string)
	FindRootCAForCA() (string, error)
}

// providers maps a CA provider name to a factory for its security client.
var providers = make(map[string]func(*security.Options, RootCertProvider) (security.Client, error))

func init() {
	// Register the built-in Aegis CA provider.
	providers["Aegis"] = createAegis
}
+
// createAegis builds an Aegis CA client. TLS is used unless the CA endpoint
// is on the well-known plaintext debug port (15010); the root CA is resolved
// through the RootCertProvider, falling back to system certs when empty.
// NOTE(review): klog.Fatalf below exits the process from library code —
// consider returning an error instead.
func createAegis(opts *security.Options, a RootCertProvider) (security.Client, error) {
	var tlsOpts *aegis.TLSOptions
	var err error
	// TODO: may add extra cases or explicit settings - but this is a rare use cases, mostly debugging
	if strings.HasSuffix(opts.CAEndpoint, ":15010") {
		klog.Warning("Debug mode or IP-secure network")
	} else {
		tlsOpts = &aegis.TLSOptions{}
		tlsOpts.RootCert, err = a.FindRootCAForCA()
		if err != nil {
			return nil, fmt.Errorf("failed to find root CA cert for CA: %v", err)
		}

		if tlsOpts.RootCert == "" {
			// Empty path means "use the system certificate pool".
			klog.Infof("Using CA %s cert with system certs", opts.CAEndpoint)
		} else if !fileExists(tlsOpts.RootCert) {
			klog.Fatalf("invalid config - %s missing a root certificate %s", opts.CAEndpoint, tlsOpts.RootCert)
		} else {
			klog.Infof("Using CA %s cert with certs: %s", opts.CAEndpoint, tlsOpts.RootCert)
		}

		tlsOpts.Key, tlsOpts.Cert = a.GetKeyCertsForCA()
	}

	return aegis.NewAegisClient(opts, tlsOpts)
}
+
+func createCAClient(opts *security.Options, a RootCertProvider)
(security.Client, error) {
+ provider, ok := providers[opts.CAProviderName]
+ if !ok {
+ return nil, fmt.Errorf("CA provider %q not registered",
opts.CAProviderName)
+ }
+ return provider(opts, a)
+}
diff --git a/pkg/dubbo-agent/xds_proxy.go b/pkg/dubbo-agent/xds_proxy.go
new file mode 100644
index 00000000..e90426ae
--- /dev/null
+++ b/pkg/dubbo-agent/xds_proxy.go
@@ -0,0 +1,80 @@
+package dubboagent
+
+import (
+ "fmt"
+ dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
+ "github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/caclient"
+ "google.golang.org/grpc"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+ "net"
+ "sync"
+)
+
// XdsProxy relays xDS traffic between the local workload (downstream) and the
// discovery server (upstream).
type XdsProxy struct {
	stopChan             chan struct{} // closed by close() to signal shutdown
	downstreamGrpcServer *grpc.Server  // serves local (downstream) xDS clients
	downstreamListener   net.Listener
	optsMutex            sync.RWMutex // guards dialOptions
	dialOptions          []grpc.DialOption
	dubbodSAN            string // SAN expected in the upstream dubbod certificate
}
+
+func initXdsProxy(ia *Agent) (*XdsProxy, error) {
+ proxy := &XdsProxy{}
+ return proxy, nil
+}
+
+func (p *XdsProxy) initDubbodDialOptions(agent *Agent) error {
+ opts, err := p.buildUpstreamClientDialOpts(agent)
+ if err != nil {
+ return err
+ }
+
+ p.optsMutex.Lock()
+ p.dialOptions = opts
+ p.optsMutex.Unlock()
+ return nil
+}
+
+func (p *XdsProxy) buildUpstreamClientDialOpts(sa *Agent) ([]grpc.DialOption,
error) {
+ tlsOpts, err := p.getTLSOptions(sa)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get TLS options to talk to
upstream: %v", err)
+ }
+ options, err := dubbogrpc.ClientOptions(nil, tlsOpts)
+ if err != nil {
+ return nil, err
+ }
+ if sa.secOpts.CredFetcher != nil {
+ options = append(options,
grpc.WithPerRPCCredentials(caclient.NewDefaultTokenProvider(sa.secOpts)))
+ }
+ return options, nil
+}
+
+func (p *XdsProxy) getTLSOptions(agent *Agent) (*dubbogrpc.TLSOptions, error) {
+ if agent.proxyConfig.ControlPlaneAuthPolicy ==
meshconfig.AuthenticationPolicy_NONE {
+ return nil, nil
+ }
+ xdsCACertPath, err := agent.FindRootCAForXDS()
+ if err != nil {
+ return nil, fmt.Errorf("failed to find root CA cert for XDS:
%v", err)
+ }
+ key, cert := agent.GetKeyCertsForXDS()
+ return &dubbogrpc.TLSOptions{
+ RootCert: xdsCACertPath,
+ Key: key,
+ Cert: cert,
+ ServerAddress: agent.proxyConfig.DiscoveryAddress,
+ SAN: p.dubbodSAN,
+ }, nil
+}
+
+func (p *XdsProxy) close() {
+ close(p.stopChan)
+ if p.downstreamGrpcServer != nil {
+ p.downstreamGrpcServer.Stop()
+ }
+ if p.downstreamListener != nil {
+ _ = p.downstreamListener.Close()
+ }
+}
diff --git a/pkg/features/security.go b/pkg/features/security.go
new file mode 100644
index 00000000..778a67df
--- /dev/null
+++ b/pkg/features/security.go
@@ -0,0 +1,35 @@
+package features
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/env"
+)
+
+const (
+ // FIPS_140_2 compliance policy.
+ // nolint: revive, stylecheck
+ FIPS_140_2 = "fips-140-2"
+
+ PQC = "pqc"
+)
+
+// Define common security feature flags shared among the Istio components.
+var (
+ CompliancePolicy = env.Register("COMPLIANCE_POLICY", "",
+ `If set, applies policy-specific restrictions over all existing
TLS
+settings, including in-mesh mTLS and external TLS. Valid values are:
+
+* '' or unset places no additional restrictions.
+* 'fips-140-2' which enforces a version of the TLS protocol and a subset
+of cipher suites overriding any user preferences or defaults for all runtime
+components, including Envoy, gRPC Go SDK, and gRPC C++ SDK.
+* 'pqc' which enforces post-quantum-safe key exchange X25519MLKEM768, TLS v1.3
+and cipher suites TLS_AES_128_GCM_SHA256 and TLS_AES_256_GCM_SHA384 overriding
+any user preferences or defaults for all runtime components, including Envoy,
+gRPC Go SDK, and gRPC C++ SDK. This policy is experimental.
+
+WARNING: Setting compliance policy in the control plane is a necessary but
+not a sufficient requirement to achieve compliance. There are additional
+steps necessary to claim compliance, including using the validated
+cryptograhic modules (please consult
+https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/security/ssl#fips-140-2).`).Get()
+)
diff --git a/pkg/file/fadvise_linux.go b/pkg/file/fadvise_linux.go
new file mode 100644
index 00000000..fb4fdcaa
--- /dev/null
+++ b/pkg/file/fadvise_linux.go
@@ -0,0 +1,20 @@
+package file
+
+import (
+ "fmt"
+ "os"
+
+ "golang.org/x/sys/unix"
+)
+
// markNotNeeded advises the kernel, via posix_fadvise(FADV_DONTNEED), that the
// file's cached pages are no longer needed.
// Interactions with large files can end up with a large page cache. This isn't really a big deal as the OS can reclaim
// this under memory pressure. However, Kubernetes counts page cache usage against the container memory usage.
// This leads to bloating up memory usage if we are just copying large files around.
func markNotNeeded(in *os.File) error {
	// offset=0, length=0 applies the advice to the entire file.
	err := unix.Fadvise(int(in.Fd()), 0, 0, unix.FADV_DONTNEED)
	if err != nil {
		return fmt.Errorf("failed to mark file FADV_DONTNEED: %v", err)
	}
	return nil
}
diff --git a/pkg/file/fadvise_unspecified.go b/pkg/file/fadvise_unspecified.go
new file mode 100644
index 00000000..292966f8
--- /dev/null
+++ b/pkg/file/fadvise_unspecified.go
@@ -0,0 +1,12 @@
+//go:build !linux
+// +build !linux
+
+package file
+
+import (
+ "os"
+)
+
// markNotNeeded is a no-op on non-Linux platforms, where posix_fadvise is not
// available.
func markNotNeeded(in *os.File) error {
	return nil
}
diff --git a/pkg/file/file.go b/pkg/file/file.go
new file mode 100644
index 00000000..148e7261
--- /dev/null
+++ b/pkg/file/file.go
@@ -0,0 +1,71 @@
+package file
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "io/fs"
+ "k8s.io/klog/v2"
+ "os"
+ "path/filepath"
+)
+
// AtomicWrite writes data to path atomically: the bytes go to a temporary file
// in the same directory, which is then renamed into place.
func AtomicWrite(path string, data []byte, mode os.FileMode) error {
	return AtomicWriteReader(path, bytes.NewReader(data), mode)
}
+
+func AtomicWriteReader(path string, data io.Reader, mode os.FileMode) error {
+ tmpFile, err := os.CreateTemp(filepath.Dir(path),
filepath.Base(path)+".tmp.")
+ if err != nil {
+ return err
+ }
+ defer func() {
+ if Exists(tmpFile.Name()) {
+ if rmErr := os.Remove(tmpFile.Name()); rmErr != nil {
+ if err != nil {
+ err = fmt.Errorf("%s: %w",
rmErr.Error(), err)
+ } else {
+ err = rmErr
+ }
+ }
+ }
+ }()
+
+ if err := os.Chmod(tmpFile.Name(), mode); err != nil {
+ return err
+ }
+
+ n, err := io.Copy(tmpFile, data)
+ if _, err := io.Copy(tmpFile, data); err != nil {
+ if closeErr := tmpFile.Close(); closeErr != nil {
+ err = fmt.Errorf("%s: %w", closeErr.Error(), err)
+ }
+ return err
+ }
+ tryMarkLargeFileAsNotNeeded(n, tmpFile)
+ if err := tmpFile.Close(); err != nil {
+ return err
+ }
+
+ return os.Rename(tmpFile.Name(), path)
+}
+
// Exists reports whether name exists on disk. Only a definite "file not
// found" counts as non-existence; other stat failures (e.g. permission
// errors) are treated as existing so callers do not mistake an unreadable
// file for a missing one.
func Exists(name string) bool {
	if _, err := os.Stat(name); errors.Is(err, fs.ErrNotExist) {
		return false
	}
	return true
}
+
// tryMarkLargeFileAsNotNeeded drops the page cache for files above a small
// size threshold; failures are only logged since this is an optimization.
func tryMarkLargeFileAsNotNeeded(size int64, in *os.File) {
	// Somewhat arbitrary value to not bother with this on small files
	const largeFileThreshold = 16 * 1024
	if size < largeFileThreshold {
		return
	}
	if err := markNotNeeded(in); err != nil {
		// Error is fine, this is just an optimization anyways. Continue
		klog.Errorf("failed to mark not needed, continuing anyways: %v", err)
	}
}
diff --git a/pkg/jwt/jwt.go b/pkg/jwt/jwt.go
new file mode 100644
index 00000000..9d27495b
--- /dev/null
+++ b/pkg/jwt/jwt.go
@@ -0,0 +1,6 @@
+package jwt
+
const (
	// PolicyThirdParty selects the third-party JWT policy.
	PolicyThirdParty = "third-party-jwt"
	// PolicyFirstParty selects the first-party JWT policy.
	PolicyFirstParty = "first-party-jwt"
)
diff --git a/pkg/model/fips.go b/pkg/model/fips.go
new file mode 100644
index 00000000..10a41cba
--- /dev/null
+++ b/pkg/model/fips.go
@@ -0,0 +1,34 @@
+package model
+
+import (
+ gotls "crypto/tls"
+ common_features "github.com/apache/dubbo-kubernetes/pkg/features"
+ "k8s.io/klog/v2"
+)
+
// fipsGoCiphers is the TLS 1.2 cipher-suite allowlist applied under the
// FIPS 140-2 compliance policy.
var fipsGoCiphers = []uint16{
	gotls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
	gotls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
	gotls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
	gotls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
}

// EnforceGoCompliance mutates ctx in place to satisfy the configured
// compliance policy: FIPS 140-2 pins TLS 1.2, the FIPS cipher list, and P-256;
// PQC pins TLS 1.3, the AES-GCM suites, and the X25519MLKEM768 key exchange.
// An empty policy leaves ctx untouched; unknown policies are logged and ignored.
func EnforceGoCompliance(ctx *gotls.Config) {
	switch common_features.CompliancePolicy {
	case "":
		return
	case common_features.FIPS_140_2:
		ctx.MinVersion = gotls.VersionTLS12
		ctx.MaxVersion = gotls.VersionTLS12
		ctx.CipherSuites = fipsGoCiphers
		ctx.CurvePreferences = []gotls.CurveID{gotls.CurveP256}
		return
	case common_features.PQC:
		ctx.MinVersion = gotls.VersionTLS13
		ctx.CipherSuites = []uint16{gotls.TLS_AES_128_GCM_SHA256, gotls.TLS_AES_256_GCM_SHA384}
		ctx.CurvePreferences = []gotls.CurveID{gotls.X25519MLKEM768}
	default:
		klog.Warningf("unknown compliance policy: %q", common_features.CompliancePolicy)
		return
	}
}
diff --git a/pkg/model/xds.go b/pkg/model/xds.go
index c5eccbf1..72d334d3 100644
--- a/pkg/model/xds.go
+++ b/pkg/model/xds.go
@@ -1,11 +1,31 @@
package model
const (
	// APITypePrefix is the type.googleapis.com prefix shared by xDS type URLs.
	APITypePrefix              = "type.googleapis.com/"
	ClusterType                = APITypePrefix + "envoy.config.cluster.v3.Cluster"
	ListenerType               = APITypePrefix + "envoy.config.listener.v3.Listener"
	EndpointType               = APITypePrefix + "envoy.config.endpoint.v3.ClusterLoadAssignment"
	RouteType                  = APITypePrefix + "envoy.config.route.v3.RouteConfiguration"
	SecretType                 = APITypePrefix + "envoy.extensions.transport_sockets.tls.v3.Secret"
	ExtensionConfigurationType = APITypePrefix + "envoy.config.core.v3.TypedExtensionConfig"
	HealthInfoType             = APITypePrefix + "dubbo.v1.HealthInformation"
	DebugType                  = "dubbo.io/debug"
)

// shortTypeNames maps full xDS type URLs to their conventional abbreviations.
var shortTypeNames = map[string]string{
	ClusterType:  "CDS",
	ListenerType: "LDS",
	RouteType:    "RDS",
	EndpointType: "EDS",
	SecretType:   "SDS",
}

// GetShortType returns the conventional short name (CDS, LDS, ...) for a known
// xDS type URL, or the URL itself when it has no abbreviation.
func GetShortType(typeURL string) string {
	if short, ok := shortTypeNames[typeURL]; ok {
		return short
	}
	return typeURL
}
diff --git a/pkg/queue/delay.go b/pkg/queue/delay.go
new file mode 100644
index 00000000..171597a3
--- /dev/null
+++ b/pkg/queue/delay.go
@@ -0,0 +1,269 @@
+package queue
+
+import (
+ "container/heap"
+ "k8s.io/klog/v2"
+ "runtime"
+ "sync"
+ "time"
+)
+
+type delayTask struct {
+ do func() error
+ runAt time.Time
+ retries int
+}
+
+const maxTaskRetry = 3
+
+var _ heap.Interface = &pq{}
+
+// pq implements an internal priority queue so that tasks with the soonest
expiry will be run first.
+// Methods on pq are not threadsafe, access should be protected.
+// much of this is taken from the example at
https://golang.org/pkg/container/heap/
+type pq []*delayTask
+
+func (q pq) Len() int {
+ return len(q)
+}
+
+func (q pq) Less(i, j int) bool {
+ return q[i].runAt.Before(q[j].runAt)
+}
+
+func (q *pq) Swap(i, j int) {
+ (*q)[i], (*q)[j] = (*q)[j], (*q)[i]
+}
+
+func (q *pq) Push(x any) {
+ *q = append(*q, x.(*delayTask))
+}
+
+func (q *pq) Pop() any {
+ old := *q
+ n := len(old)
+ c := cap(old)
+ // Shrink the capacity of task queue.
+ if n < c/2 && c > 32 {
+ npq := make(pq, n, c/2)
+ copy(npq, old)
+ old = npq
+ }
+ if n == 0 {
+ return nil
+ }
+ item := old[n-1]
+ old[n-1] = nil // avoid memory leak
+ *q = old[0 : n-1]
+ return item
+}
+
+// Peek is not managed by the container/heap package, so we return the 0th
element in the list.
+func (q *pq) Peek() any {
+ if q.Len() < 1 {
+ return nil
+ }
+ return (*q)[0]
+}
+
// Delayed implements queue such that tasks are executed after a specified delay.
type Delayed interface {
	baseInstance
	// PushDelayed schedules t to run once delay has elapsed.
	PushDelayed(t Task, delay time.Duration)
}

var _ Delayed = &delayQueue{}

// DelayQueueOption configure the behavior of the queue. Must be applied before Run.
type DelayQueueOption func(*delayQueue)

// DelayQueueBuffer sets maximum number of tasks awaiting execution. If this limit is reached, Push and PushDelayed
// will block until there is room.
// NOTE(review): replacing the channel discards any tasks already buffered in
// the old one — apply this option before anything is enqueued.
func DelayQueueBuffer(bufferSize int) DelayQueueOption {
	return func(queue *delayQueue) {
		if queue.enqueue != nil {
			close(queue.enqueue)
		}
		queue.enqueue = make(chan *delayTask, bufferSize)
	}
}

// DelayQueueWorkers sets the number of background worker goroutines await tasks to execute. Effectively the
// maximum number of concurrent tasks.
func DelayQueueWorkers(workers int) DelayQueueOption {
	return func(queue *delayQueue) {
		queue.workers = workers
	}
}

// workerChanBuf determines whether the channel of a worker should be a buffered channel
// to get the best performance.
var workerChanBuf = func() int {
	// Use blocking channel if GOMAXPROCS=1.
	// This switches context from sender to receiver immediately,
	// which results in higher performance.
	var n int
	if n = runtime.GOMAXPROCS(0); n == 1 {
		return 0
	}

	// Make channel non-blocking and set up its capacity with GOMAXPROCS if GOMAXPROCS>1,
	// otherwise the sender might be dragged down if the receiver is CPU-bound.
	//
	// GOMAXPROCS determines how many goroutines can run in parallel,
	// which makes it the best choice as the channel capacity,
	return n
}()

// NewDelayed gives a Delayed queue with maximum concurrency specified by workers.
func NewDelayed(opts ...DelayQueueOption) Delayed {
	q := &delayQueue{
		workers: 1,
		queue:   &pq{},
		execute: make(chan *delayTask, workerChanBuf),
		enqueue: make(chan *delayTask, 100),
	}
	for _, o := range opts {
		o(q)
	}
	return q
}

// delayQueue schedules tasks on a min-heap plus a buffered intake channel and
// dispatches due tasks to a fixed pool of workers.
type delayQueue struct {
	workers int
	// workerStopped holds one done-channel per worker; consumed by Closed.
	workerStopped []chan struct{}

	// incoming
	enqueue chan *delayTask
	// outgoing
	execute chan *delayTask

	mu sync.Mutex
	// queue holds tasks that are not yet due; guarded by mu.
	queue *pq
}

// Push will execute the task as soon as possible
func (d *delayQueue) Push(task Task) {
	d.pushInternal(&delayTask{do: task, runAt: time.Now()})
}

// PushDelayed will execute the task after waiting for the delay
func (d *delayQueue) PushDelayed(t Task, delay time.Duration) {
	task := &delayTask{do: t, runAt: time.Now().Add(delay)}
	d.pushInternal(task)
}

// pushInternal will enqueue the delayTask with retries.
func (d *delayQueue) pushInternal(task *delayTask) {
	select {
	case d.enqueue <- task:
		// buffer has room to enqueue
	default:
		// TODO warn and resize buffer
		// if the buffer is full, we take the more expensive route of locking and pushing directly to the heap
		d.mu.Lock()
		heap.Push(d.queue, task)
		d.mu.Unlock()
	}
}

// Closed returns a channel closed once every worker has observed the stop
// signal and fully exited.
// NOTE(review): reads workerStopped without synchronization; call only after
// Run has started the workers.
func (d *delayQueue) Closed() <-chan struct{} {
	done := make(chan struct{})
	go func() {
		for _, ch := range d.workerStopped {
			<-ch
		}
		close(done)
	}()
	return done
}
+
+func (d *delayQueue) Run(stop <-chan struct{}) {
+ for i := 0; i < d.workers; i++ {
+ d.workerStopped = append(d.workerStopped, d.work(stop))
+ }
+
+ push := func(t *delayTask) bool {
+ select {
+ case d.execute <- t:
+ return true
+ case <-stop:
+ return false
+ }
+ }
+
+ for {
+ var task *delayTask
+ d.mu.Lock()
+ if head := d.queue.Peek(); head != nil {
+ task = head.(*delayTask)
+ heap.Pop(d.queue)
+ }
+ d.mu.Unlock()
+
+ if task != nil {
+ delay := time.Until(task.runAt)
+ if delay <= 0 {
+ // execute now and continue processing incoming
enqueues/tasks
+ if !push(task) {
+ return
+ }
+ } else {
+ // not ready yet, don't block enqueueing
+ await := time.NewTimer(delay)
+ select {
+ case t := <-d.enqueue:
+ d.mu.Lock()
+ heap.Push(d.queue, t)
+ // put the old "head" back on the
queue, it may be scheduled to execute after the one
+ // that was just pushed
+ heap.Push(d.queue, task)
+ d.mu.Unlock()
+ case <-await.C:
+ if !push(task) {
+ return
+ }
+ case <-stop:
+ await.Stop()
+ return
+ }
+ await.Stop()
+ }
+ } else {
+ // no items, wait for Push or stop
+ select {
+ case t := <-d.enqueue:
+ d.mu.Lock()
+ d.queue.Push(t)
+ d.mu.Unlock()
+ case <-stop:
+ return
+ }
+ }
+ }
+}
+
// work takes a channel that signals to stop, and returns a channel that signals the worker has fully stopped.
// Each worker executes tasks from d.execute; failed tasks are re-enqueued up
// to maxTaskRetry times before being dropped.
func (d *delayQueue) work(stop <-chan struct{}) (stopped chan struct{}) {
	stopped = make(chan struct{})
	go func() {
		defer close(stopped)
		for {
			select {
			case t := <-d.execute:
				if err := t.do(); err != nil {
					if t.retries < maxTaskRetry {
						t.retries++
						klog.Warningf("Work item handle failed: %v %d times, retry it", err, t.retries)
						// Re-enqueue immediately (runAt unchanged).
						d.pushInternal(t)
						continue
					}
					klog.Errorf("Work item handle failed: %v, reaching the maximum retry times: %d, drop it", err, maxTaskRetry)
				}
			case <-stop:
				return
			}
		}
	}()
	return
}
diff --git a/pkg/sail-agent/agent.go b/pkg/sail-agent/agent.go
deleted file mode 100644
index e79eecea..00000000
--- a/pkg/sail-agent/agent.go
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package naviagent
-
-import "github.com/apache/dubbo-kubernetes/pkg/model"
-
-// Shared properties with Pilot Proxy struct.
-type Proxy struct {
- Type model.NodeType
- DNSDomain string
-}
diff --git a/pkg/security/retry.go b/pkg/security/retry.go
new file mode 100644
index 00000000..e16069c1
--- /dev/null
+++ b/pkg/security/retry.go
@@ -0,0 +1,16 @@
+package security
+
+import (
+ "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+)
+
// CARetryOptions is the default retry policy for calls to the CA: up to 5
// attempts, retried on transient gRPC status codes.
var CARetryOptions = []retry.CallOption{
	retry.WithMax(5),
	retry.WithCodes(codes.Canceled, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted, codes.Internal, codes.Unavailable),
}

// CARetryInterceptor returns a grpc.DialOption installing a unary client
// interceptor that applies CARetryOptions to every call on the connection.
func CARetryInterceptor() grpc.DialOption {
	return grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor(CARetryOptions...))
}
diff --git a/pkg/security/security.go b/pkg/security/security.go
index 84ab7b62..bcf5ac08 100644
--- a/pkg/security/security.go
+++ b/pkg/security/security.go
@@ -20,15 +20,29 @@ package security
import (
"context"
"net/http"
+ "os"
+ "path/filepath"
"time"
)
const (
	// RootCertReqResourceName is resource name of discovery request for root certificate.
	RootCertReqResourceName = "ROOTCA"
	// WorkloadKeyCertResourceName is the resource name of the discovery request for workload identity.
	WorkloadKeyCertResourceName = "default"
	// WorkloadIdentityPath is the directory hosting the workload SDS unix socket.
	WorkloadIdentityPath = "./var/run/secrets/workload-spiffe-uds"
	// DefaultWorkloadIdentitySocketFile is the socket file name inside WorkloadIdentityPath.
	DefaultWorkloadIdentitySocketFile = "socket"
	// SystemRootCerts is a sentinel value; presumably it selects the OS trust store — confirm with callers.
	SystemRootCerts         = "SYSTEM"
	DefaultRootCertFilePath = "./etc/certs/root-cert.pem"
	// CredentialNameSocketPath is the unix socket path for the credential SDS service.
	CredentialNameSocketPath = "./var/run/secrets/credential-uds/socket"
	// WorkloadIdentityCredentialsPath holds file-mounted workload certificates.
	WorkloadIdentityCredentialsPath = "./var/run/secrets/workload-spiffe-credentials"
	WorkloadIdentityCertChainPath   = WorkloadIdentityCredentialsPath + "/cert-chain.pem"
	WorkloadIdentityRootCertPath    = WorkloadIdentityCredentialsPath + "/root-cert.pem"
	WorkloadIdentityKeyPath         = WorkloadIdentityCredentialsPath + "/key.pem"
	FileCredentialNameSocketPath    = "./var/run/secrets/credential-uds/files-socket"
	JWT                             = "JWT"
)

const (
	// CertSigner names the certificate-signer key.
	CertSigner = "CertSigner"
)
type AuthContext struct {
@@ -47,29 +61,23 @@ type Authenticator interface {
// SecretItem holds a generated secret: either a workload key/cert pair or a
// root certificate, plus its lifecycle timestamps.
type SecretItem struct {
	CertificateChain []byte
	PrivateKey       []byte
	// RootCert is the root certificate bundle.
	RootCert []byte
	// ResourceName passed from envoy SDS discovery request.
	// "ROOTCA" for root cert request, "default" for key/cert request.
	ResourceName string
	CreatedTime  time.Time
	ExpireTime   time.Time
}
// SecretManager defines secrets management interface which is used by SDS.
type SecretManager interface {
	// GenerateSecret generates a new secret for the given SDS resource name
	// ("ROOTCA" or "default"; see the resource-name constants above).
	GenerateSecret(resourceName string) (*SecretItem, error)
}
// SdsCertificateConfig holds the file paths for a file-based certificate: the
// certificate chain, its private key, and the CA bundle used for validation.
type SdsCertificateConfig struct {
	CertificatePath   string
	PrivateKeyPath    string
	CaCertificatePath string
}
+
type AuthSource int
type KubernetesInfo struct {
@@ -85,3 +93,57 @@ type Caller struct {
KubernetesInfo KubernetesInfo
}
+
// Options configures the security (CA and SDS) behavior of the agent.
type Options struct {
	ServeOnlyFiles   bool
	ProvCert         string
	FileMountedCerts bool
	SailCertProvider string
	// OutputKeyCertToDir, when set, is where generated key/cert material is written.
	OutputKeyCertToDir string
	CertChainFilePath  string
	KeyFilePath        string
	RootCertFilePath   string
	CARootPath         string
	// CAEndpoint is the address of the CA server; CAEndpointSAN is the SAN
	// expected in its certificate.
	CAEndpoint    string
	CAProviderName string
	// CredFetcher supplies platform credentials (e.g. tokens) for CA requests.
	CredFetcher          CredFetcher
	CAHeaders            map[string]string
	CAEndpointSAN        string
	CertSigner           string
	ClusterID            string
	CredIdentityProvider string
	TrustDomain          string
}

// CredFetcher fetches platform credentials used to authenticate to the CA.
type CredFetcher interface {
	// GetPlatformCredential returns the credential (e.g. a token) to attach to requests.
	GetPlatformCredential() (string, error)
	// GetIdentityProvider names the identity provider backing the credential.
	GetIdentityProvider() string
	// Stop releases any resources held by the fetcher.
	Stop()
}

// Client is a CA client: it signs CSRs and exposes the CA root bundle.
type Client interface {
	// CSRSign signs csrPEM and returns the certificate chain.
	CSRSign(csrPEM []byte, certValidTTLInSec int64) ([]string, error)
	Close()
	// GetRootCertBundle returns the CA's root certificates.
	GetRootCertBundle() ([]string, error)
}

// GetWorkloadSDSSocketListenPath returns the workload SDS socket path for the
// given socket file name.
func GetWorkloadSDSSocketListenPath(sockfile string) string {
	return filepath.Join(WorkloadIdentityPath, sockfile)
}

// GetDubboSDSServerSocketPath returns the default workload SDS socket path.
func GetDubboSDSServerSocketPath() string {
	return filepath.Join(WorkloadIdentityPath, DefaultWorkloadIdentitySocketFile)
}
+
// CheckWorkloadCertificate reports whether all three workload certificate
// files (cert chain, key, root cert) are present on disk.
func CheckWorkloadCertificate(certChainFilePath, keyFilePath, rootCertFilePath string) bool {
	for _, path := range []string{certChainFilePath, keyFilePath, rootCertFilePath} {
		if _, err := os.Stat(path); err != nil {
			return false
		}
	}
	return true
}
diff --git a/pkg/uds/listener.go b/pkg/uds/listener.go
new file mode 100644
index 00000000..9936711f
--- /dev/null
+++ b/pkg/uds/listener.go
@@ -0,0 +1,39 @@
+package uds
+
+import (
+ "fmt"
+ "k8s.io/klog/v2"
+ "net"
+ "os"
+ "path/filepath"
+)
+
+func NewListener(path string) (net.Listener, error) {
+ // Remove unix socket before use.
+ if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
+ // Anything other than "file not found" is an error.
+ return nil, fmt.Errorf("failed to remove unix://%s: %v", path,
err)
+ }
+
+ // Attempt to create the folder in case it doesn't exist
+ if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil {
+ // If we cannot create it, just warn here - we will fail later
if there is a real error
+ klog.Warningf("Failed to create directory for %v: %v", path,
err)
+ }
+
+ var err error
+ listener, err := net.Listen("unix", path)
+ if err != nil {
+ return nil, fmt.Errorf("failed to listen on unix socket %q:
%v", path, err)
+ }
+
+ // Update file permission so that istio-proxy has permission to access
it.
+ if _, err := os.Stat(path); err != nil {
+ return nil, fmt.Errorf("uds file %q doesn't exist", path)
+ }
+ if err := os.Chmod(path, 0o666); err != nil {
+ return nil, fmt.Errorf("failed to update %q permission", path)
+ }
+
+ return listener, nil
+}
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index ab68c09d..183f05a8 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -18,12 +18,34 @@
package xds
import (
+ "github.com/apache/dubbo-kubernetes/pkg/model"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/features"
+ dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
+ core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "k8s.io/klog/v2"
"time"
)
type DiscoveryStream =
discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer
// Resources is a list of xDS discovery resources.
type Resources = []*discovery.Resource

// WatchedResource tracks one xDS client's subscription state for a single type URL.
type WatchedResource struct {
	TypeUrl string
	// ResourceNames is the set of resource names currently subscribed to.
	ResourceNames sets.String
	// Wildcard marks a subscription to all resources of this type.
	Wildcard bool
	// NonceSent is the nonce of the last response sent; NonceAcked the last one ACKed.
	NonceSent  string
	NonceAcked string
	// AlwaysRespond forces a response to the next request even when it looks like an ACK.
	AlwaysRespond bool
	LastSendTime  time.Time
	// LastError records the message of the most recent NACK, cleared on ACK.
	LastError     string
	LastResources Resources
}
+
type Connection struct {
peerAddr string
connectedAt time.Time
@@ -36,6 +58,28 @@ type Connection struct {
errorChan chan error
}
// Watcher maintains per-type-URL subscription state for an xDS client.
type Watcher interface {
	DeleteWatchedResource(url string)
	GetWatchedResource(url string) *WatchedResource
	NewWatchedResource(url string, names []string)
	// UpdateWatchedResource applies the given mutation to the watched resource for the type URL.
	UpdateWatchedResource(string, func(*WatchedResource) *WatchedResource)
	// GetID identifies an xDS client. This is different from a connection ID.
	GetID() string
}

// ConnectionContext binds a Connection to the handlers that drive one xDS stream.
type ConnectionContext interface {
	XdsConnection() *Connection
	Watcher() Watcher
	// Initialize checks the first request.
	Initialize(node *core.Node) error
	// Close discards the connection.
	Close()
	// Process responds to a discovery request.
	Process(req *discovery.DiscoveryRequest) error
	// Push responds to a push event queue
	Push(ev any) error
}
+
func NewConnection(peerAddr string, stream DiscoveryStream) Connection {
return Connection{
pushChannel: make(chan any),
@@ -48,3 +92,271 @@ func NewConnection(peerAddr string, stream DiscoveryStream)
Connection {
stream: stream,
}
}
+
// Stream drives one ADS stream: it starts Receive to read requests, waits for
// initialization, then loops handling requests (checked first, so they get
// roughly double the priority of pushes) and push events until stop or error.
func Stream(ctx ConnectionContext) error {
	con := ctx.XdsConnection()
	// Do not call: defer close(con.pushChannel). The push channel will be garbage collected
	// when the connection is no longer used. Closing the channel can cause subtle race conditions
	// with push. According to the spec: "It's only necessary to close a channel when it is important
	// to tell the receiving goroutines that all data have been sent."

	// Block until either a request is received or a push is triggered.
	// We need 2 go routines because 'read' blocks in Recv().
	go Receive(ctx)

	// Wait for the proxy to be fully initialized before we start serving traffic. Because
	// initialization doesn't have dependencies that will block, there is no need to add any timeout
	// here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
	// reqChannel and the connection not being enqueued for pushes to pushChannel until the
	// initialization is complete.
	<-con.initialized

	for {
		// Go select{} statements are not ordered; the same channel can be chosen many times.
		// For requests, these are higher priority (client may be blocked on startup until these are done)
		// and often very cheap to handle (simple ACK), so we check it first.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := ctx.Process(req); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case <-con.stop:
			return nil
		default:
		}
		// If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
		// amount of incoming requests, we may still send some pushes, as we do not `continue` above;
		// however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
		// cannot completely starve pushes. However, this scenario is unlikely.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := ctx.Process(req); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case pushEv := <-con.pushChannel:
			err := ctx.Push(pushEv)
			if err != nil {
				return err
			}
		case <-con.stop:
			return nil
		}
	}
}
+
// Receive reads discovery requests from the stream until it terminates,
// forwarding them to con.reqChan. The first real request triggers connection
// initialization; failures are reported on con.errorChan.
func Receive(ctx ConnectionContext) {
	con := ctx.XdsConnection()
	defer func() {
		close(con.errorChan)
		close(con.reqChan)
		// Close the initialized channel, if its not already closed, to prevent blocking the stream.
		select {
		case <-con.initialized:
		default:
			close(con.initialized)
		}
	}()

	firstRequest := true
	for {
		req, err := con.stream.Recv()
		if err != nil {
			// Expected terminations (e.g. clean stream close) are logged at info;
			// unexpected errors are also surfaced on errorChan.
			if dubbogrpc.GRPCErrorType(err) != dubbogrpc.UnexpectedError {
				klog.Infof("ADS: %q %s terminated", con.peerAddr, con.conID)
				return
			}
			con.errorChan <- err
			klog.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
			return
		}
		// This should be only set for the first request. The node id may not be set - for example malicious clients.
		if firstRequest {
			// probe happens before envoy sends first xDS request
			if req.TypeUrl == model.HealthInfoType {
				klog.Warningf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
				continue
			}
			firstRequest = false
			if req.Node == nil || req.Node.Id == "" {
				con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err()
				return
			}
			if err := ctx.Initialize(req.Node); err != nil {
				con.errorChan <- err
				return
			}
			// Close runs when Receive exits, after a successful Initialize.
			defer ctx.Close()
			klog.Infof("ADS: new connection for node:%s", con.conID)
		}

		select {
		case con.reqChan <- req:
		case <-con.stream.Context().Done():
			klog.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID)
			return
		}
	}
}
+
// ID returns the connection identifier.
func (conn *Connection) ID() string {
	return conn.conID
}

// Peer returns the remote peer address of the connection.
func (conn *Connection) Peer() string {
	return conn.peerAddr
}

// SetID sets the connection identifier.
func (conn *Connection) SetID(id string) {
	conn.conID = id
}

// MarkInitialized signals, by closing the initialized channel, that the
// connection has finished initialization; Stream blocks on this before serving.
func (conn *Connection) MarkInitialized() {
	close(conn.initialized)
}
+
// ShouldRespond implements the xDS ACK/NACK protocol for a request: NACKs,
// unsubscribes, and stale nonces get no response; initial requests, warming
// requests, and resource-set changes do. It updates the watcher's per-type
// state and returns the delta of newly subscribed resources.
func ShouldRespond(w Watcher, id string, request *discovery.DiscoveryRequest) (bool, ResourceDelta) {
	stype := model.GetShortType(request.TypeUrl)

	// If there is an error in request that means previous response is erroneous.
	// We do not have to respond in that case. In this case request's version info
	// will be different from the version sent. But it is fragile to rely on that.
	if request.ErrorDetail != nil {
		errCode := codes.Code(request.ErrorDetail.Code)
		klog.Warningf("ADS:%s: ACK ERROR %s %s:%s", stype, id, errCode.String(), request.ErrorDetail.GetMessage())
		w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
			wr.LastError = request.ErrorDetail.GetMessage()
			return wr
		})
		return false, emptyResourceDelta
	}

	if shouldUnsubscribe(request) {
		klog.V(2).Infof("ADS:%s: UNSUBSCRIBE %s %s %s", stype, id, request.VersionInfo, request.ResponseNonce)
		w.DeleteWatchedResource(request.TypeUrl)
		return false, emptyResourceDelta
	}

	previousInfo := w.GetWatchedResource(request.TypeUrl)
	// This can happen in two cases:
	// 1. When Envoy starts for the first time, it sends an initial Discovery request to Istiod.
	// 2. When Envoy reconnects to a new Istiod that does not have information about this typeUrl
	// i.e. non empty response nonce.
	// We should always respond with the current resource names.
	if request.ResponseNonce == "" || previousInfo == nil {
		klog.V(2).Infof("ADS:%s: INIT/RECONNECT %s %s %s", stype, id, request.VersionInfo, request.ResponseNonce)
		w.NewWatchedResource(request.TypeUrl, request.ResourceNames)
		return true, emptyResourceDelta
	}

	// If there is mismatch in the nonce, that is a case of expired/stale nonce.
	// A nonce becomes stale following a newer nonce being sent to Envoy.
	// previousInfo.NonceSent can be empty if we previously had shouldRespond=true but didn't send any resources.
	if request.ResponseNonce != previousInfo.NonceSent {
		if features.EnableUnsafeAssertions && previousInfo.NonceSent == "" {
			// Assert we do not end up in an invalid state
			klog.V(2).Infof("ADS:%s: REQ %s Expired nonce received %s, but we never sent any nonce", stype,
				id, request.ResponseNonce)
		}
		klog.V(2).Infof("ADS:%s: REQ %s Expired nonce received %s, sent %s", stype,
			id, request.ResponseNonce, previousInfo.NonceSent)
		return false, emptyResourceDelta
	}

	// If it comes here, that means nonce match. Record the ACK and compute the
	// subscription change in one state update.
	var previousResources sets.String
	var cur sets.String
	var alwaysRespond bool
	w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
		// Clear last error, we got an ACK.
		wr.LastError = ""
		previousResources = wr.ResourceNames
		wr.NonceAcked = request.ResponseNonce
		wr.ResourceNames = sets.New(request.ResourceNames...)
		cur = wr.ResourceNames
		alwaysRespond = wr.AlwaysRespond
		wr.AlwaysRespond = false
		return wr
	})

	// Envoy can send two DiscoveryRequests with same version and nonce.
	// when it detects a new resource. We should respond if they change.
	removed := previousResources.Difference(cur)
	added := cur.Difference(previousResources)

	// We should always respond "alwaysRespond" marked requests to let Envoy finish warming
	// even though Nonce match and it looks like an ACK.
	if alwaysRespond {
		klog.Infof("ADS:%s: FORCE RESPONSE %s for warming.", stype, id)
		return true, emptyResourceDelta
	}

	if len(removed) == 0 && len(added) == 0 {
		klog.V(2).Infof("ADS:%s: ACK %s %s %s", stype, id, request.VersionInfo, request.ResponseNonce)
		return false, emptyResourceDelta
	}
	klog.V(2).Infof("ADS:%s: RESOURCE CHANGE added %v removed %v %s %s %s", stype,
		added, removed, id, request.VersionInfo, request.ResponseNonce)

	// For non wildcard resource, if no new resources are subscribed, it means we do not need to push.
	if !IsWildcardTypeURL(request.TypeUrl) && len(added) == 0 {
		return false, emptyResourceDelta
	}

	return true, ResourceDelta{
		Subscribed: added,
		// we do not need to set unsubscribed for StoW
	}
}
+
// Send writes a discovery response to the client.
// NOTE(review): currently a stub that drops the response and returns nil.
func Send(ctx ConnectionContext, res *discovery.DiscoveryResponse) error {
	return nil
}

// ResourceDelta records the subscription changes carried by a request.
type ResourceDelta struct {
	// Subscribed indicates the client requested these additional resources
	Subscribed sets.String
	// Unsubscribed indicates the client no longer requires these resources
	Unsubscribed sets.String
}

// emptyResourceDelta is the zero delta returned when nothing changed.
var emptyResourceDelta = ResourceDelta{}

// IsEmpty reports whether the delta carries no subscription changes.
func (rd ResourceDelta) IsEmpty() bool {
	return len(rd.Subscribed) == 0 && len(rd.Unsubscribed) == 0
}

// shouldUnsubscribe reports whether an empty resource list on a non-wildcard
// type means the client is unsubscribing from the type entirely.
func shouldUnsubscribe(request *discovery.DiscoveryRequest) bool {
	return len(request.ResourceNames) == 0 && !IsWildcardTypeURL(request.TypeUrl)
}

// IsWildcardTypeURL reports whether the xDS type uses wildcard subscription semantics.
func IsWildcardTypeURL(typeURL string) bool {
	switch typeURL {
	case model.SecretType, model.EndpointType, model.RouteType, model.ExtensionConfigurationType:
		// By XDS spec, these are not wildcard
		return false
	case model.ClusterType, model.ListenerType:
		// By XDS spec, these are wildcard
		return true
	default:
		// All of our internal types use wildcard semantics
		return true
	}
}

// PushCh exposes the channel used to deliver push events to Stream.
func (conn *Connection) PushCh() chan any {
	return conn.pushChannel
}

// StreamDone returns a channel that is closed when the underlying gRPC stream
// context ends.
func (conn *Connection) StreamDone() <-chan struct{} {
	return conn.stream.Context().Done()
}
diff --git a/sail/cmd/sail-agent/app/cmd.go b/sail/cmd/sail-agent/app/cmd.go
index 2356cbe9..27d97f53 100644
--- a/sail/cmd/sail-agent/app/cmd.go
+++ b/sail/cmd/sail-agent/app/cmd.go
@@ -18,19 +18,25 @@
package app
import (
+ "context"
+ "errors"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/cmd"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ dubboagent "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent"
+ "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/config"
"github.com/apache/dubbo-kubernetes/pkg/model"
+ "github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
"github.com/apache/dubbo-kubernetes/sail/cmd/sail-agent/options"
"github.com/spf13/cobra"
+ "k8s.io/klog/v2"
)
var (
proxyArgs options.ProxyArgs
)
-func NewRootCommand() *cobra.Command {
+func NewRootCommand(sds dubboagent.SDSServiceFactory) *cobra.Command {
rootCmd := &cobra.Command{
Use: "sail-agent",
Short: "Dubbo Sail agent.",
@@ -42,7 +48,7 @@ func NewRootCommand() *cobra.Command {
},
}
cmd.AddFlags(rootCmd)
- proxyCmd := newProxyCommand()
+ proxyCmd := newProxyCommand(sds)
addFlags(proxyCmd)
rootCmd.AddCommand(proxyCmd)
rootCmd.AddCommand(waitCmd)
@@ -50,7 +56,7 @@ func NewRootCommand() *cobra.Command {
return rootCmd
}
-func newProxyCommand() *cobra.Command {
+func newProxyCommand(sds dubboagent.SDSServiceFactory) *cobra.Command {
return &cobra.Command{
Use: "proxy",
Short: "XDS proxy agent",
@@ -64,6 +70,37 @@ func newProxyCommand() *cobra.Command {
if err != nil {
return err
}
+
+ proxyConfig, err :=
config.ConstructProxyConfig(proxyArgs.MeshConfigFile, proxyArgs.ServiceCluster,
options.ProxyConfigEnv, proxyArgs.Concurrency)
+ if err != nil {
+ return fmt.Errorf("failed to get proxy config:
%v", err)
+ }
+ if out, err := protomarshal.ToYAML(proxyConfig); err !=
nil {
+ klog.Infof("Failed to serialize to YAML: %v",
err)
+ } else {
+ klog.Infof("Effective config: \n%s", out)
+ }
+
+ secOpts, err := options.NewSecurityOptions(proxyConfig,
proxyArgs.StsPort, proxyArgs.TokenManagerPlugin)
+ if err != nil {
+ return err
+ }
+
+ agentOptions := options.NewAgentOptions(&proxyArgs,
proxyConfig, sds)
+ agent := dubboagent.NewAgent(proxyConfig, agentOptions,
secOpts)
+ ctx, cancel :=
context.WithCancelCause(context.Background())
+ defer cancel(errors.New("application shutdown"))
+ defer agent.Close()
+
+ // On SIGINT or SIGTERM, cancel the context, triggering
a graceful shutdown
+ go cmd.WaitSignalFunc(cancel)
+
+ wait, err := agent.Run(ctx)
+ if err != nil {
+ return err
+ }
+ wait()
+
return nil
},
}
diff --git a/sail/cmd/sail-agent/main.go b/sail/cmd/sail-agent/main.go
index cf80f16d..f2edc9ab 100644
--- a/sail/cmd/sail-agent/main.go
+++ b/sail/cmd/sail-agent/main.go
@@ -18,12 +18,18 @@
package main
import (
+ dubboagent "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
"github.com/apache/dubbo-kubernetes/sail/cmd/sail-agent/app"
+ "github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/sds"
+ meshconfig "istio.io/api/mesh/v1alpha1"
"os"
)
func main() {
- rootCmd := app.NewRootCommand()
+ rootCmd := app.NewRootCommand(func(options *security.Options,
workloadSecretCache security.SecretManager, pkpConf
*meshconfig.PrivateKeyProvider) dubboagent.SDSService {
+ return sds.NewServer(options, workloadSecretCache, pkpConf)
+ })
if err := rootCmd.Execute(); err != nil {
os.Exit(-1)
}
diff --git a/sail/cmd/sail-agent/options/agent.go
b/sail/cmd/sail-agent/options/agent.go
new file mode 100644
index 00000000..ae487025
--- /dev/null
+++ b/sail/cmd/sail-agent/options/agent.go
@@ -0,0 +1,31 @@
+package options
+
+import (
+ dubboagent "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+ "os"
+ "strings"
+)
+
+const xdsHeaderPrefix = "XDS_HEADER_"
+
+func NewAgentOptions(proxy *ProxyArgs, cfg *meshconfig.ProxyConfig, sds
dubboagent.SDSServiceFactory) *dubboagent.AgentOptions {
+ o := &dubboagent.AgentOptions{
+ WorkloadIdentitySocketFile: workloadIdentitySocketFile,
+ }
+ extractXDSHeadersFromEnv(o)
+ return o
+}
+
+func extractXDSHeadersFromEnv(o *dubboagent.AgentOptions) {
+ envs := os.Environ()
+ for _, e := range envs {
+ if strings.HasPrefix(e, xdsHeaderPrefix) {
+ parts := strings.SplitN(e, "=", 2)
+ if len(parts) != 2 {
+ continue
+ }
+ o.XDSHeaders[parts[0][len(xdsHeaderPrefix):]] = parts[1]
+ }
+ }
+}
diff --git a/sail/cmd/sail-agent/options/agent_proxy.go
b/sail/cmd/sail-agent/options/agent_proxy.go
index d962ea57..0b6e9f36 100644
--- a/sail/cmd/sail-agent/options/agent_proxy.go
+++ b/sail/cmd/sail-agent/options/agent_proxy.go
@@ -17,13 +17,16 @@
package options
-import sailagent "github.com/apache/dubbo-kubernetes/pkg/sail-agent"
+import sailagent "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent"
// ProxyArgs provides all of the configuration parameters for the Sail proxy.
type ProxyArgs struct {
sailagent.Proxy
- MeshConfigFile string
- ServiceCluster string
+ Concurrency int
+ StsPort int
+ TokenManagerPlugin string
+ MeshConfigFile string
+ ServiceCluster string
}
func NewProxyArgs() ProxyArgs {
diff --git a/sail/cmd/sail-agent/options/options.go
b/sail/cmd/sail-agent/options/options.go
new file mode 100644
index 00000000..6f4bdd3a
--- /dev/null
+++ b/sail/cmd/sail-agent/options/options.go
@@ -0,0 +1,29 @@
+package options
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/env"
+ "github.com/apache/dubbo-kubernetes/pkg/jwt"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+)
+
+var (
+ ProxyConfigEnv = env.Register(
+ "PROXY_CONFIG",
+ "",
+ "The proxy configuration. This will be set by the injection -
gateways will use file mounts.",
+ ).Get()
+ dubbodSAN = env.Register("DUBBOD_SAN", "",
+ "Override the ServerName used to validate Istiod certificate. "+
+ "Can be used as an alternative to setting /etc/hosts
for VMs - discovery address will be an IP:port")
+ jwtPolicy = env.Register("JWT_POLICY", jwt.PolicyThirdParty,
+ "The JWT validation policy.")
+ workloadIdentitySocketFile =
env.Register("WORKLOAD_IDENTITY_SOCKET_FILE",
security.DefaultWorkloadIdentitySocketFile,
+ fmt.Sprintf("SPIRE workload identity SDS socket filename. If
set, an SDS socket with this name must exist at %s",
security.WorkloadIdentityPath)).Get()
+ credFetcherTypeEnv = env.Register("CREDENTIAL_FETCHER_TYPE",
security.JWT,
+ "The type of the credential fetcher. Currently supported types
include GoogleComputeEngine").Get()
+ credIdentityProvider = env.Register("CREDENTIAL_IDENTITY_PROVIDER",
"GoogleComputeEngine",
+ "The identity provider for credential. Currently default
supported identity provider is GoogleComputeEngine").Get()
+ caProviderEnv = env.Register("CA_PROVIDER", "Aegis", "name of
authentication provider").Get()
+ caEndpointEnv = env.Register("CA_ADDR", "", "Address of the spiffe
certificate provider. Defaults to discoveryAddress").Get()
+)
diff --git a/sail/cmd/sail-agent/options/security.go
b/sail/cmd/sail-agent/options/security.go
new file mode 100644
index 00000000..2e1322fd
--- /dev/null
+++ b/sail/cmd/sail-agent/options/security.go
@@ -0,0 +1,84 @@
+package options
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ "github.com/apache/dubbo-kubernetes/pkg/jwt"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "github.com/apache/dubbo-kubernetes/security/pkg/credentialfetcher"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+ "k8s.io/klog/v2"
+ "os"
+ "strings"
+)
+
+const caHeaderPrefix = "CA_HEADER_"
+
+func NewSecurityOptions(proxyConfig *meshconfig.ProxyConfig, stsPort int,
tokenManagerPlugin string) (*security.Options, error) {
+ o := &security.Options{
+ CAEndpoint: caEndpointEnv,
+ CAProviderName: caProviderEnv,
+ }
+
+ o, err := SetupSecurityOptions(proxyConfig, o, jwtPolicy.Get(),
+ credFetcherTypeEnv, credIdentityProvider)
+ if err != nil {
+ return o, err
+ }
+
+ extractCAHeadersFromEnv(o)
+
+ return o, err
+}
+
+func SetupSecurityOptions(proxyConfig *meshconfig.ProxyConfig, secOpt
*security.Options, jwtPolicy,
+ credFetcherTypeEnv, credIdentityProvider string,
+) (*security.Options, error) {
+ jwtPath := constants.ThirdPartyJwtPath
+ switch jwtPolicy {
+ case jwt.PolicyThirdParty:
+ klog.Info("JWT policy is third-party-jwt")
+ jwtPath = constants.ThirdPartyJwtPath
+ case jwt.PolicyFirstParty:
+ klog.Warningf("Using deprecated JWT policy 'first-party-jwt';
treating as 'third-party-jwt'")
+ jwtPath = constants.ThirdPartyJwtPath
+ default:
+ klog.Info("Using existing certs")
+ }
+
+ o := secOpt
+
+ // If not set explicitly, default to the discovery address.
+ if o.CAEndpoint == "" {
+ o.CAEndpoint = proxyConfig.DiscoveryAddress
+ o.CAEndpointSAN = dubbodSAN.Get()
+ }
+
+ o.CredIdentityProvider = credIdentityProvider
+ credFetcher, err :=
credentialfetcher.NewCredFetcher(credFetcherTypeEnv, o.TrustDomain, jwtPath,
o.CredIdentityProvider)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create credential fetcher:
%v", err)
+ }
+ klog.Infof("using credential fetcher of %s type in %s trust domain",
credFetcherTypeEnv, o.TrustDomain)
+ o.CredFetcher = credFetcher
+
+ if o.ProvCert != "" && o.FileMountedCerts {
+ return nil, fmt.Errorf("invalid options: PROV_CERT and
FILE_MOUNTED_CERTS are mutually exclusive")
+ }
+ return o, nil
+}
+
+func extractCAHeadersFromEnv(o *security.Options) {
+ envs := os.Environ()
+ for _, e := range envs {
+ if !strings.HasPrefix(e, caHeaderPrefix) {
+ continue
+ }
+
+ parts := strings.SplitN(e, "=", 2)
+ if len(parts) != 2 {
+ continue
+ }
+ o.CAHeaders[parts[0][len(caHeaderPrefix):]] = parts[1]
+ }
+}
diff --git a/sail/pkg/features/ship.go b/sail/pkg/features/sail.go
similarity index 100%
rename from sail/pkg/features/ship.go
rename to sail/pkg/features/sail.go
diff --git a/sail/pkg/grpc/grpc.go b/sail/pkg/grpc/grpc.go
new file mode 100644
index 00000000..0f380b77
--- /dev/null
+++ b/sail/pkg/grpc/grpc.go
@@ -0,0 +1,92 @@
+package grpc
+
+import (
+ dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/status"
+ "io"
+ "math"
+ "strings"
+)
+
+type ErrorType string
+
+const (
+ UnexpectedError ErrorType = "unexpectedError"
+ ExpectedError ErrorType = "expectedError"
+ GracefulTermination ErrorType = "gracefulTermination"
+)
+
+const (
+ defaultClientMaxReceiveMessageSize = math.MaxInt32
+	defaultInitialConnWindowSize = 1024 * 1024 // default gRPC
ConnWindowSize
+	defaultInitialWindowSize = 1024 * 1024 // default gRPC
InitialWindowSize
+)
+
+var expectedGrpcFailureMessages = sets.New(
+ "client disconnected",
+ "error reading from server: EOF",
+ "transport is closing",
+)
+
+func ClientOptions(options *dubbokeepalive.Options, tlsOpts *TLSOptions)
([]grpc.DialOption, error) {
+ if options == nil {
+ options = dubbokeepalive.DefaultOption()
+ }
+ keepaliveOption := grpc.WithKeepaliveParams(keepalive.ClientParameters{
+ Time: options.Time,
+ Timeout: options.Timeout,
+ })
+
+ initialWindowSizeOption :=
grpc.WithInitialWindowSize(int32(defaultInitialWindowSize))
+ initialConnWindowSizeOption :=
grpc.WithInitialConnWindowSize(int32(defaultInitialConnWindowSize))
+ msgSizeOption :=
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
+ var tlsDialOpts grpc.DialOption
+ var err error
+ if tlsOpts != nil {
+ tlsDialOpts, err = getTLSDialOption(tlsOpts)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ tlsDialOpts =
grpc.WithTransportCredentials(insecure.NewCredentials())
+ }
+ return []grpc.DialOption{keepaliveOption, initialWindowSizeOption,
initialConnWindowSizeOption, msgSizeOption, tlsDialOpts}, nil
+}
+
+func GRPCErrorType(err error) ErrorType {
+ if err == io.EOF {
+ return GracefulTermination
+ }
+
+ if s, ok := status.FromError(err); ok {
+ if s.Code() == codes.Canceled || s.Code() ==
codes.DeadlineExceeded {
+ return ExpectedError
+ }
+ if s.Code() == codes.Unavailable &&
containsExpectedMessage(s.Message()) {
+ return ExpectedError
+ }
+ }
+	// If this is not a gRPC status, we should just check the error message.
+ if strings.Contains(err.Error(), "stream terminated by RST_STREAM with
error code: NO_ERROR") {
+ return ExpectedError
+ }
+ if strings.Contains(err.Error(), "received prior goaway: code:
NO_ERROR") {
+ return ExpectedError
+ }
+
+ return UnexpectedError
+}
+
+func containsExpectedMessage(msg string) bool {
+ for m := range expectedGrpcFailureMessages {
+ if strings.Contains(msg, m) {
+ return true
+ }
+ }
+ return false
+}
diff --git a/sail/pkg/grpc/tls.go b/sail/pkg/grpc/tls.go
new file mode 100644
index 00000000..13078451
--- /dev/null
+++ b/sail/pkg/grpc/tls.go
@@ -0,0 +1,96 @@
+package grpc
+
+import (
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
+ "github.com/apache/dubbo-kubernetes/security/pkg/pki/util"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "k8s.io/klog/v2"
+ "net"
+ "os"
+ "strings"
+)
+
+type TLSOptions struct {
+ RootCert string
+ Key string
+ Cert string
+ ServerAddress string
+ SAN string
+}
+
+func getTLSDialOption(opts *TLSOptions) (grpc.DialOption, error) {
+ rootCert, err := getRootCertificate(opts.RootCert)
+ if err != nil {
+ return nil, err
+ }
+
+ config := tls.Config{
+ GetClientCertificate: func(*tls.CertificateRequestInfo)
(*tls.Certificate, error) {
+ var certificate tls.Certificate
+ key, cert := opts.Key, opts.Cert
+ if key != "" && cert != "" {
+ isExpired, err := util.IsCertExpired(opts.Cert)
+ if err != nil {
+ klog.Warningf("cannot parse the cert
chain, using token instead: %v", err)
+ return &certificate, nil
+ }
+ if isExpired {
+ klog.Warningf("cert expired, using
token instead")
+ return &certificate, nil
+ }
+ // Load the certificate from disk
+ certificate, err = tls.LoadX509KeyPair(cert,
key)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return &certificate, nil
+ },
+ RootCAs: rootCert,
+ MinVersion: tls.VersionTLS12,
+ }
+
+ if host, _, err := net.SplitHostPort(opts.ServerAddress); err == nil {
+ config.ServerName = host
+ }
+ // For debugging on localhost (with port forward)
+ if strings.Contains(config.ServerName, "localhost") {
+ config.ServerName = "istiod.istio-system.svc"
+ }
+ if opts.SAN != "" {
+ config.ServerName = opts.SAN
+ }
+	// Compliance for all gRPC clients (e.g. Citadel).
+ sec_model.EnforceGoCompliance(&config)
+ transportCreds := credentials.NewTLS(&config)
+ return grpc.WithTransportCredentials(transportCreds), nil
+}
+
+func getRootCertificate(rootCertFile string) (*x509.CertPool, error) {
+ var certPool *x509.CertPool
+ var rootCert []byte
+ var err error
+
+ if rootCertFile != "" {
+ rootCert, err = os.ReadFile(rootCertFile)
+ if err != nil {
+ return nil, err
+ }
+
+ certPool = x509.NewCertPool()
+ ok := certPool.AppendCertsFromPEM(rootCert)
+ if !ok {
+ return nil, fmt.Errorf("failed to create TLS dial
option with root certificates")
+ }
+ } else {
+ certPool, err = x509.SystemCertPool()
+ if err != nil {
+ return nil, err
+ }
+ }
+ return certPool, nil
+}
diff --git a/security/pkg/credentialfetcher/fetcher.go
b/security/pkg/credentialfetcher/fetcher.go
new file mode 100644
index 00000000..c95c6da5
--- /dev/null
+++ b/security/pkg/credentialfetcher/fetcher.go
@@ -0,0 +1,21 @@
+package credentialfetcher
+
+import (
+ "fmt"
+
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+
"github.com/apache/dubbo-kubernetes/security/pkg/credentialfetcher/plugin"
+)
+
+func NewCredFetcher(credtype, trustdomain, jwtPath, identityProvider string)
(security.CredFetcher, error) {
+ switch credtype {
+ case security.JWT, "":
+ // If unset, also default to JWT for backwards compatibility
+ if jwtPath == "" {
+ return nil, nil // no cred fetcher - using certificates
only
+ }
+ return plugin.CreateTokenPlugin(jwtPath), nil
+ default:
+ return nil, fmt.Errorf("invalid credential fetcher type %s",
credtype)
+ }
+}
diff --git a/security/pkg/credentialfetcher/plugin/token.go
b/security/pkg/credentialfetcher/plugin/token.go
new file mode 100644
index 00000000..2a08602e
--- /dev/null
+++ b/security/pkg/credentialfetcher/plugin/token.go
@@ -0,0 +1,36 @@
+package plugin
+
+import (
+ "k8s.io/klog/v2"
+ "os"
+ "strings"
+)
+
+type KubernetesTokenPlugin struct {
+ path string
+}
+
+func CreateTokenPlugin(path string) *KubernetesTokenPlugin {
+ return &KubernetesTokenPlugin{
+ path: path,
+ }
+}
+
+func (t KubernetesTokenPlugin) GetPlatformCredential() (string, error) {
+ if t.path == "" {
+ return "", nil
+ }
+ tok, err := os.ReadFile(t.path)
+ if err != nil {
+ klog.Warningf("failed to fetch token from file: %v", err)
+ return "", nil
+ }
+ return strings.TrimSpace(string(tok)), nil
+}
+
+func (t KubernetesTokenPlugin) GetIdentityProvider() string {
+ return ""
+}
+
+func (t KubernetesTokenPlugin) Stop() {
+}
diff --git a/security/pkg/nodeagent/cache/secretcache.go
b/security/pkg/nodeagent/cache/secretcache.go
new file mode 100644
index 00000000..31ce0988
--- /dev/null
+++ b/security/pkg/nodeagent/cache/secretcache.go
@@ -0,0 +1,132 @@
+package cache
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/queue"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "github.com/fsnotify/fsnotify"
+ "k8s.io/klog/v2"
+ "sync"
+)
+
+type FileCert struct {
+ ResourceName string
+ Filename string
+}
+
+type SecretManagerClient struct {
+ certWatcher *fsnotify.Watcher
+ caClient security.Client
+ queue queue.Delayed
+ configOptions *security.Options
+ existingCertificateFile security.SdsCertificateConfig
+ stop chan struct{}
+ caRootPath string
+ certMutex sync.RWMutex
+ secretHandler func(resourceName string)
+ fileCerts map[FileCert]struct{}
+}
+
+func NewSecretManagerClient(caClient security.Client, options
*security.Options) (*SecretManagerClient, error) {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ return nil, err
+ }
+
+ ret := &SecretManagerClient{
+ queue: queue.NewDelayed(queue.DelayQueueBuffer(0)),
+ caClient: caClient,
+ configOptions: options,
+ existingCertificateFile: security.SdsCertificateConfig{
+ CertificatePath: options.CertChainFilePath,
+ PrivateKeyPath: options.KeyFilePath,
+ CaCertificatePath: options.RootCertFilePath,
+ },
+ certWatcher: watcher,
+ fileCerts: make(map[FileCert]struct{}),
+ stop: make(chan struct{}),
+ caRootPath: options.CARootPath,
+ }
+
+ go ret.queue.Run(ret.stop)
+ go ret.handleFileWatch()
+ return ret, nil
+}
+
+func (sc *SecretManagerClient) Close() {
+ _ = sc.certWatcher.Close()
+ if sc.caClient != nil {
+ sc.caClient.Close()
+ }
+ close(sc.stop)
+}
+
+func (sc *SecretManagerClient) handleFileWatch() {
+ for {
+ select {
+ case event, ok := <-sc.certWatcher.Events:
+ // Channel is closed.
+ if !ok {
+ return
+ }
+ // We only care about updates that change the file
content
+ if !(isWrite(event) || isRemove(event) ||
isCreate(event)) {
+ continue
+ }
+ sc.certMutex.RLock()
+ resources := make(map[FileCert]struct{})
+ for k, v := range sc.fileCerts {
+ resources[k] = v
+ }
+ sc.certMutex.RUnlock()
+ klog.Infof("event for file certificate %s : %s, pushing
to proxy", event.Name, event.Op.String())
+ // If it is remove event - cleanup from file certs so
that if it is added again, we can watch.
+ // The cleanup should happen first before triggering
callbacks, as the callbacks are async and
+		// we may get a generate call before cleanup is done and
we will end up not watching the file.
+ if isRemove(event) {
+ sc.certMutex.Lock()
+ for fc := range sc.fileCerts {
+ if fc.Filename == event.Name {
+ klog.V(2).Infof("removing file
%s from file certs", event.Name)
+ delete(sc.fileCerts, fc)
+ break
+ }
+ }
+ sc.certMutex.Unlock()
+ }
+ // Trigger callbacks for all resources referencing this
file. This is practically always
+ // a single resource.
+ for k := range resources {
+ if k.Filename == event.Name {
+ sc.OnSecretUpdate(k.ResourceName)
+ }
+ }
+ case err, ok := <-sc.certWatcher.Errors:
+ // Channel is closed.
+ if !ok {
+ return
+ }
+ // TODO numFileSecretFailures
+ klog.Errorf("certificate watch error: %v", err)
+ }
+ }
+}
+
+func (sc *SecretManagerClient) OnSecretUpdate(resourceName string) {
+ sc.certMutex.RLock()
+ defer sc.certMutex.RUnlock()
+ if sc.secretHandler != nil {
+ sc.secretHandler(resourceName)
+ }
+}
+
+func isWrite(event fsnotify.Event) bool {
+ return event.Has(fsnotify.Write)
+}
+
+func isCreate(event fsnotify.Event) bool {
+ return event.Has(fsnotify.Create)
+}
+
+func isRemove(event fsnotify.Event) bool {
+ return event.Has(fsnotify.Remove)
+}
diff --git a/security/pkg/nodeagent/caclient/credentials.go
b/security/pkg/nodeagent/caclient/credentials.go
new file mode 100644
index 00000000..6f946cec
--- /dev/null
+++ b/security/pkg/nodeagent/caclient/credentials.go
@@ -0,0 +1,50 @@
+package caclient
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "google.golang.org/grpc/credentials"
+)
+
+type DefaultTokenProvider struct {
+ opts *security.Options
+}
+
+func NewDefaultTokenProvider(opts *security.Options)
credentials.PerRPCCredentials {
+ return &DefaultTokenProvider{opts}
+}
+
+func (t *DefaultTokenProvider) GetRequestMetadata(ctx context.Context, uri
...string) (map[string]string, error) {
+ if t == nil {
+ return nil, nil
+ }
+ token, err := t.GetToken()
+ if err != nil {
+ return nil, err
+ }
+ if token == "" {
+ return nil, nil
+ }
+ return map[string]string{
+ "authorization": "Bearer " + token,
+ }, nil
+}
+
+// Allow the token provider to be used regardless of transport security;
callers can determine whether
+// this is safe themselves.
+func (t *DefaultTokenProvider) RequireTransportSecurity() bool {
+ return false
+}
+
+func (t *DefaultTokenProvider) GetToken() (string, error) {
+ if t.opts.CredFetcher == nil {
+ return "", nil
+ }
+ token, err := t.opts.CredFetcher.GetPlatformCredential()
+ if err != nil {
+ return "", fmt.Errorf("fetch platform credential: %v", err)
+ }
+
+ return token, nil
+}
diff --git a/security/pkg/nodeagent/caclient/providers/aegis/client.go
b/security/pkg/nodeagent/caclient/providers/aegis/client.go
new file mode 100644
index 00000000..e7f0fcdc
--- /dev/null
+++ b/security/pkg/nodeagent/caclient/providers/aegis/client.go
@@ -0,0 +1,149 @@
+package aegis
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
+ "github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/caclient"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/protobuf/types/known/structpb"
+ pb "istio.io/api/security/v1alpha1"
+ "k8s.io/klog/v2"
+)
+
+type AegisClient struct {
+ // It means enable tls connection to Citadel if this is not nil.
+ tlsOpts *TLSOptions
+ client pb.IstioCertificateServiceClient
+ conn *grpc.ClientConn
+ provider credentials.PerRPCCredentials
+ opts *security.Options
+}
+
+type TLSOptions struct {
+ RootCert string
+ Key string
+ Cert string
+}
+
+func NewAegisClient(opts *security.Options, tlsOpts *TLSOptions)
(*AegisClient, error) {
+ c := &AegisClient{
+ tlsOpts: tlsOpts,
+ opts: opts,
+ provider: caclient.NewDefaultTokenProvider(opts),
+ }
+
+ conn, err := c.buildConnection()
+ if err != nil {
+ klog.Errorf("Failed to connect to endpoint %s: %v",
opts.CAEndpoint, err)
+ return nil, fmt.Errorf("failed to connect to endpoint %s",
opts.CAEndpoint)
+ }
+ c.conn = conn
+ c.client = pb.NewIstioCertificateServiceClient(conn)
+ return c, nil
+}
+
+func (c *AegisClient) Close() {
+ if c.conn != nil {
+ c.conn.Close()
+ }
+}
+
+func (c *AegisClient) getTLSOptions() *dubbogrpc.TLSOptions {
+ if c.tlsOpts != nil {
+ return &dubbogrpc.TLSOptions{
+ RootCert: c.tlsOpts.RootCert,
+ Key: c.tlsOpts.Key,
+ Cert: c.tlsOpts.Cert,
+ ServerAddress: c.opts.CAEndpoint,
+ SAN: c.opts.CAEndpointSAN,
+ }
+ }
+ return nil
+}
+
+func (c *AegisClient) CSRSign(csrPEM []byte, certValidTTLInSec int64) (res
[]string, err error) {
+ crMetaStruct := &structpb.Struct{
+ Fields: map[string]*structpb.Value{
+ security.CertSigner: {
+ Kind: &structpb.Value_StringValue{StringValue:
c.opts.CertSigner},
+ },
+ },
+ }
+ req := &pb.IstioCertificateRequest{
+ Csr: string(csrPEM),
+ ValidityDuration: certValidTTLInSec,
+ Metadata: crMetaStruct,
+ }
+ // TODO(hzxuzhonghu): notify caclient rebuilding only when root cert is
updated.
+ // It can happen when the istiod dns certs is resigned after root cert
is updated,
+ // in this case, the ca grpc client can not automatically connect to
istiod after the underlying network connection closed.
+	// Because the grpc client still uses the old tls configuration to
reconnect to istiod.
+ // So here we need to rebuild the caClient in order to use the new root
cert.
+ defer func() {
+ if err != nil {
+ klog.Errorf("failed to sign CSR: %v", err)
+ if err := c.reconnect(); err != nil {
+ klog.Errorf("failed reconnect: %v", err)
+ }
+ }
+ }()
+
+ ctx := metadata.NewOutgoingContext(context.Background(),
metadata.Pairs("ClusterID", c.opts.ClusterID))
+ for k, v := range c.opts.CAHeaders {
+ ctx = metadata.AppendToOutgoingContext(ctx, k, v)
+ }
+
+ resp, err := c.client.CreateCertificate(ctx, req)
+ if err != nil {
+ return nil, fmt.Errorf("create certificate: %v", err)
+ }
+
+ if len(resp.CertChain) <= 1 {
+ return nil, errors.New("invalid empty CertChain")
+ }
+
+ return resp.CertChain, nil
+}
+
+func (c *AegisClient) GetRootCertBundle() ([]string, error) {
+ return []string{}, nil
+}
+
+func (c *AegisClient) buildConnection() (*grpc.ClientConn, error) {
+ tlsOpts := c.getTLSOptions()
+ opts, err := dubbogrpc.ClientOptions(nil, tlsOpts)
+ if err != nil {
+ return nil, err
+ }
+ opts = append(opts,
+ grpc.WithPerRPCCredentials(c.provider),
+ security.CARetryInterceptor(),
+ )
+ conn, err := grpc.Dial(c.opts.CAEndpoint, opts...)
+ if err != nil {
+ klog.Errorf("Failed to connect to endpoint %s: %v",
c.opts.CAEndpoint, err)
+ return nil, fmt.Errorf("failed to connect to endpoint %s",
c.opts.CAEndpoint)
+ }
+
+ return conn, nil
+}
+
+func (c *AegisClient) reconnect() error {
+ if err := c.conn.Close(); err != nil {
+ return fmt.Errorf("failed to close connection: %v", err)
+ }
+
+ conn, err := c.buildConnection()
+ if err != nil {
+ return err
+ }
+ c.conn = conn
+ c.client = pb.NewIstioCertificateServiceClient(conn)
+ klog.Info("recreated connection")
+ return nil
+}
diff --git a/security/pkg/nodeagent/sds/sdsservice.go
b/security/pkg/nodeagent/sds/sdsservice.go
new file mode 100644
index 00000000..00b8b08b
--- /dev/null
+++ b/security/pkg/nodeagent/sds/sdsservice.go
@@ -0,0 +1,231 @@
+package sds
+
+import (
+ "context"
+ "github.com/apache/dubbo-kubernetes/pkg/backoff"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/pkg/xds"
+ core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+ discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ sds "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ mesh "istio.io/api/mesh/v1alpha1"
+ "k8s.io/klog/v2"
+ "strconv"
+ "sync"
+ "sync/atomic"
+)
+
+type sdsservice struct {
+ st security.SecretManager
+
+ stop chan struct{}
+ rootCaPath string
+ pkpConf *mesh.PrivateKeyProvider
+
+ sync.Mutex
+ clients map[string]*Context
+}
+
+type Context struct {
+ BaseConnection xds.Connection
+ s *sdsservice
+ w *Watch
+}
+
+type Watch struct {
+ sync.Mutex
+ watch *xds.WatchedResource
+}
+
+func newSDSService(st security.SecretManager, options *security.Options,
pkpConf *mesh.PrivateKeyProvider) *sdsservice {
+ ret := &sdsservice{
+ st: st,
+ stop: make(chan struct{}),
+ pkpConf: pkpConf,
+ clients: make(map[string]*Context),
+ }
+
+ ret.rootCaPath = options.CARootPath
+
+ if options.FileMountedCerts || options.ServeOnlyFiles {
+ return ret
+ }
+
+ // Pre-generate workload certificates to improve startup latency and
ensure that for OUTPUT_CERTS
+ // case we always write a certificate. A workload can technically run
without any mTLS/CA
+ // configured, in which case this will fail; if it becomes noisy we
should disable the entire SDS
+ // server in these cases.
+ go func() {
+		// TODO: do we need max timeout for retry, seems meaningless to
retry forever if it never succeeds
+ b := backoff.NewExponentialBackOff(backoff.DefaultOption())
+ // context for both timeout and channel, whichever stops first,
the context will be done
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ select {
+ case <-ret.stop:
+ cancel()
+ case <-ctx.Done():
+ }
+ }()
+ defer cancel()
+ _ = b.RetryWithContext(ctx, func() error {
+ _, err :=
st.GenerateSecret(security.WorkloadKeyCertResourceName)
+ if err != nil {
+ klog.Warningf("failed to warm certificate: %v",
err)
+ return err
+ }
+
+ _, err =
st.GenerateSecret(security.RootCertReqResourceName)
+ if err != nil {
+ klog.Warningf("failed to warm root certificate:
%v", err)
+ return err
+ }
+
+ return nil
+ })
+ }()
+
+ return ret
+}
+
+// StreamSecrets serves SDS discovery requests and SDS push requests
+func (s *sdsservice) StreamSecrets(stream
sds.SecretDiscoveryService_StreamSecretsServer) error {
+ return xds.Stream(&Context{
+ BaseConnection: xds.NewConnection("", stream),
+ s: s,
+ w: &Watch{},
+ })
+}
+
+func (s *sdsservice) DeltaSecrets(stream
sds.SecretDiscoveryService_DeltaSecretsServer) error {
+ return status.Error(codes.Unimplemented, "DeltaSecrets not implemented")
+}
+
+func (s *sdsservice) FetchSecrets(ctx context.Context, discReq
*discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "FetchSecrets not
implemented")
+}
+
+// register adds the SDS handle to the grpc server
+func (s *sdsservice) register(rpcs *grpc.Server) {
+ sds.RegisterSecretDiscoveryServiceServer(rpcs, s)
+}
+
+func (s *sdsservice) Close() {
+ close(s.stop)
+}
+
+func (c *Context) XdsConnection() *xds.Connection {
+ return &c.BaseConnection
+}
+
+var connectionNumber = int64(0)
+
+func (c *Context) Initialize(_ *core.Node) error {
+ id := atomic.AddInt64(&connectionNumber, 1)
+ con := c.XdsConnection()
+ con.SetID(strconv.FormatInt(id, 10))
+
+ c.s.Lock()
+ c.s.clients[con.ID()] = c
+ c.s.Unlock()
+
+ con.MarkInitialized()
+ return nil
+}
+
+func (c *Context) Close() {
+ c.s.Lock()
+ defer c.s.Unlock()
+ delete(c.s.clients, c.XdsConnection().ID())
+}
+
+func (c *Context) Watcher() xds.Watcher {
+ return c.w
+}
+
+func (c *Context) Process(req *discovery.DiscoveryRequest) error {
+ shouldRespond, delta := xds.ShouldRespond(c.Watcher(),
c.XdsConnection().ID(), req)
+ if !shouldRespond {
+ return nil
+ }
+ resources := req.ResourceNames
+ if !delta.IsEmpty() {
+ resources = delta.Subscribed.UnsortedList()
+ }
+ res, err := c.s.generate(resources)
+ if err != nil {
+ return err
+ }
+ return xds.Send(c, res)
+}
+
+func (c *Context) Push(ev any) error {
+ secretName := ev.(string)
+ if !c.w.requested(secretName) {
+ return nil
+ }
+ res, err := c.s.generate([]string{secretName})
+ if err != nil {
+ return err
+ }
+ return xds.Send(c, res)
+}
+
+func (w *Watch) requested(secretName string) bool {
+ w.Lock()
+ defer w.Unlock()
+ if w.watch != nil {
+ return w.watch.ResourceNames.Contains(secretName)
+ }
+ return false
+}
+
+func (w *Watch) GetWatchedResource(string) *xds.WatchedResource {
+ w.Lock()
+ defer w.Unlock()
+ return w.watch
+}
+
+func (w *Watch) NewWatchedResource(typeURL string, names []string) {
+ w.Lock()
+ defer w.Unlock()
+ w.watch = &xds.WatchedResource{TypeUrl: typeURL, ResourceNames:
sets.New(names...)}
+}
+
+func (w *Watch) UpdateWatchedResource(_ string, f func(*xds.WatchedResource)
*xds.WatchedResource) {
+ w.Lock()
+ defer w.Unlock()
+ w.watch = f(w.watch)
+}
+
+func (w *Watch) DeleteWatchedResource(string) {
+ w.Lock()
+ defer w.Unlock()
+ w.watch = nil
+}
+
+func (w *Watch) GetID() string {
+ // This always maps to the same local Envoy instance.
+ return ""
+}
+
+func (s *sdsservice) generate(resourceNames []string)
(*discovery.DiscoveryResponse, error) {
+ return &discovery.DiscoveryResponse{}, nil
+}
+
+func (s *sdsservice) push(secretName string) {
+ s.Lock()
+ defer s.Unlock()
+ for _, client := range s.clients {
+ go func(client *Context) {
+ select {
+ case client.XdsConnection().PushCh() <- secretName:
+ case <-client.XdsConnection().StreamDone():
+ }
+ }(client)
+ }
+}
diff --git a/security/pkg/nodeagent/sds/server.go
b/security/pkg/nodeagent/sds/server.go
new file mode 100644
index 00000000..07af70c8
--- /dev/null
+++ b/security/pkg/nodeagent/sds/server.go
@@ -0,0 +1,115 @@
+package sds
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "github.com/apache/dubbo-kubernetes/pkg/uds"
+ "go.uber.org/atomic"
+ "google.golang.org/grpc"
+ mesh "istio.io/api/mesh/v1alpha1"
+ "k8s.io/klog/v2"
+ "net"
+ "time"
+)
+
+const (
+	// maxStreams caps the number of concurrent gRPC streams the SDS
+	// server will accept.
+	maxStreams = 100000
+	// maxRetryTimes bounds the listen/serve attempts made before giving
+	// up on starting the SDS server.
+	maxRetryTimes = 5
+)
+
+// Server hosts the workload SDS gRPC service over a Unix domain socket.
+type Server struct {
+	// workloadSds implements the SDS API backed by the workload secret cache.
+	workloadSds *sdsservice
+
+	// grpcWorkloadListener is the UDS listener the gRPC server accepts on.
+	grpcWorkloadListener net.Listener
+
+	grpcWorkloadServer *grpc.Server
+	// stopped flags shutdown so the background serve/retry goroutine exits.
+	stopped *atomic.Bool
+}
+
+// NewServer creates the workload SDS server and begins serving it
+// asynchronously on the SDS Unix domain socket.
+func NewServer(options *security.Options, workloadSecretCache security.SecretManager, pkpConf *mesh.PrivateKeyProvider) *Server {
+	srv := &Server{
+		stopped:     atomic.NewBool(false),
+		workloadSds: newSDSService(workloadSecretCache, options, pkpConf),
+	}
+	srv.initWorkloadSdsService(options)
+	return srv
+}
+
+// initWorkloadSdsService creates the gRPC server, binds it to the SDS Unix
+// domain socket, and serves it from a background goroutine that retries up
+// to maxRetryTimes with exponential backoff on listen/serve failure.
+func (s *Server) initWorkloadSdsService(opts *security.Options) {
+	s.grpcWorkloadServer = grpc.NewServer(s.grpcServerOptions()...)
+	s.workloadSds.register(s.grpcWorkloadServer)
+	var err error
+	path := security.GetDubboSDSServerSocketPath()
+	if opts.ServeOnlyFiles {
+		path = security.FileCredentialNameSocketPath
+	}
+	s.grpcWorkloadListener, err = uds.NewListener(path)
+	if err != nil {
+		// BUGFIX: previously this error was silently dropped. The retry
+		// loop below recreates a nil listener, but the initial failure
+		// must still be surfaced for diagnosability.
+		klog.Errorf("SDS grpc server for workload proxies failed to set up UDS: %v", err)
+	}
+	go func() {
+		klog.Info("Starting SDS grpc server")
+		waitTime := time.Second
+		started := false
+		for i := 0; i < maxRetryTimes; i++ {
+			if s.stopped.Load() {
+				return
+			}
+			serverOk := true
+			setUpUdsOK := true
+			// Recreate the listener if the initial (or a prior) attempt failed.
+			if s.grpcWorkloadListener == nil {
+				if s.grpcWorkloadListener, err = uds.NewListener(path); err != nil {
+					klog.Errorf("SDS grpc server for workload proxies failed to set up UDS: %v", err)
+					setUpUdsOK = false
+				}
+			}
+			if s.grpcWorkloadListener != nil {
+				if opts.ServeOnlyFiles {
+					klog.Infof("Starting SDS server for file certificates only, will listen on %q", path)
+				} else {
+					klog.Infof("Starting SDS server for workload certificates, will listen on %q", path)
+				}
+				// Serve blocks until the server stops or fails.
+				if err = s.grpcWorkloadServer.Serve(s.grpcWorkloadListener); err != nil {
+					klog.Errorf("SDS grpc server for workload proxies failed to start: %v", err)
+					serverOk = false
+				}
+			}
+			if serverOk && setUpUdsOK {
+				started = true
+				break
+			}
+			time.Sleep(waitTime)
+			waitTime *= 2 // exponential backoff between attempts
+		}
+		if !started {
+			klog.Warningf("SDS grpc server could not be started")
+		}
+	}()
+}
+
+// grpcServerOptions returns the options used to construct the SDS gRPC
+// server; currently only the concurrent-stream cap.
+func (s *Server) grpcServerOptions() []grpc.ServerOption {
+	return []grpc.ServerOption{
+		grpc.MaxConcurrentStreams(uint32(maxStreams)),
+	}
+}
+
+// OnSecretUpdate fans a secret-rotation event out to all connected SDS
+// clients. It is a no-op if the SDS service was never initialized.
+func (s *Server) OnSecretUpdate(resourceName string) {
+	if s.workloadSds == nil {
+		return
+	}
+
+	klog.V(2).Infof("Trigger on secret update, resource name: %s", resourceName)
+	s.workloadSds.push(resourceName)
+}
+
+// Stop shuts down the SDS server: the gRPC server, its UDS listener, and
+// the underlying SDS service. Safe to call on a nil receiver.
+func (s *Server) Stop() {
+	if s == nil {
+		return
+	}
+	// Flag shutdown first so the serve/retry goroutine stops retrying.
+	s.stopped.Store(true)
+	if srv := s.grpcWorkloadServer; srv != nil {
+		srv.Stop()
+	}
+	if lis := s.grpcWorkloadListener; lis != nil {
+		lis.Close()
+	}
+	if sds := s.workloadSds; sds != nil {
+		sds.Close()
+	}
+}