This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new ec344806 [agent] add sail-agent proxy logic (#791)
ec344806 is described below

commit ec344806cf5aad2bbc0f435ffb572f7df98bf7b6
Author: Jian Zhong <[email protected]>
AuthorDate: Sat Sep 27 00:00:19 2025 +0800

    [agent] add sail-agent proxy logic (#791)
---
 go.mod                                             |   3 +-
 go.sum                                             |   2 +
 .../dubbo-discovery/files/grpc-agent.yaml          |   1 +
 pkg/bootstrap/config.go                            |  45 +++
 pkg/cmd/cmd.go                                     |  11 +
 pkg/config/constants/constants.go                  |   8 +
 pkg/config/mesh/mesh.go                            |  17 +-
 pkg/dubbo-agent/agent.go                           | 388 +++++++++++++++++++++
 pkg/dubbo-agent/config/config.go                   | 145 ++++++++
 pkg/dubbo-agent/grpcxds/grpc_bootstrap.go          | 109 ++++++
 pkg/dubbo-agent/plugins.go                         |  55 +++
 pkg/dubbo-agent/xds_proxy.go                       |  80 +++++
 pkg/features/security.go                           |  35 ++
 pkg/file/fadvise_linux.go                          |  20 ++
 pkg/file/fadvise_unspecified.go                    |  12 +
 pkg/file/file.go                                   |  71 ++++
 pkg/jwt/jwt.go                                     |   6 +
 pkg/model/fips.go                                  |  34 ++
 pkg/model/xds.go                                   |  32 +-
 pkg/queue/delay.go                                 | 269 ++++++++++++++
 pkg/sail-agent/agent.go                            |  26 --
 pkg/security/retry.go                              |  16 +
 pkg/security/security.go                           | 104 ++++--
 pkg/uds/listener.go                                |  39 +++
 pkg/xds/server.go                                  | 312 +++++++++++++++++
 sail/cmd/sail-agent/app/cmd.go                     |  43 ++-
 sail/cmd/sail-agent/main.go                        |   8 +-
 sail/cmd/sail-agent/options/agent.go               |  31 ++
 sail/cmd/sail-agent/options/agent_proxy.go         |   9 +-
 sail/cmd/sail-agent/options/options.go             |  29 ++
 sail/cmd/sail-agent/options/security.go            |  84 +++++
 sail/pkg/features/{ship.go => sail.go}             |   0
 sail/pkg/grpc/grpc.go                              |  92 +++++
 sail/pkg/grpc/tls.go                               |  96 +++++
 security/pkg/credentialfetcher/fetcher.go          |  21 ++
 security/pkg/credentialfetcher/plugin/token.go     |  36 ++
 security/pkg/nodeagent/cache/secretcache.go        | 132 +++++++
 security/pkg/nodeagent/caclient/credentials.go     |  50 +++
 .../nodeagent/caclient/providers/aegis/client.go   | 149 ++++++++
 security/pkg/nodeagent/sds/sdsservice.go           | 231 ++++++++++++
 security/pkg/nodeagent/sds/server.go               | 115 ++++++
 41 files changed, 2902 insertions(+), 64 deletions(-)

diff --git a/go.mod b/go.mod
index 5b397c0a..748a3e54 100644
--- a/go.mod
+++ b/go.mod
@@ -40,6 +40,7 @@ require (
        github.com/golang/protobuf v1.5.4
        github.com/google/go-cmp v0.7.0
        github.com/google/go-containerregistry v0.20.6
+       github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
        github.com/hashicorp/go-multierror v1.1.1
        github.com/heroku/color v0.0.6
        github.com/moby/term v0.5.2
@@ -53,6 +54,7 @@ require (
        golang.org/x/crypto v0.41.0
        golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6
        golang.org/x/net v0.43.0
+       golang.org/x/sys v0.35.0
        golang.org/x/term v0.34.0
        google.golang.org/grpc v1.74.2
        google.golang.org/protobuf v1.36.7
@@ -230,7 +232,6 @@ require (
        golang.org/x/mod v0.27.0 // indirect
        golang.org/x/oauth2 v0.30.0 // indirect
        golang.org/x/sync v0.16.0 // indirect
-       golang.org/x/sys v0.35.0 // indirect
        golang.org/x/text v0.28.0 // indirect
        golang.org/x/time v0.12.0 // indirect
        google.golang.org/genproto/googleapis/api 
v0.0.0-20250811230008-5f3141c8851a // indirect
diff --git a/go.sum b/go.sum
index c0bd9a31..beb2eaa2 100644
--- a/go.sum
+++ b/go.sum
@@ -373,6 +373,8 @@ github.com/gorilla/mux v1.8.1 
h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
 github.com/gorilla/mux v1.8.1/go.mod 
h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 
h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod 
h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
+github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 
h1:sGm2vDRFUrQJO/Veii4h4zG2vvqG6uWNkBHSTqXOZk0=
+github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2/go.mod 
h1:wd1YpapPLivG6nQgbf7ZkG1hhSOXDhhn4MLTknx2aAc=
 github.com/hashicorp/errwrap v1.0.0/go.mod 
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/errwrap v1.1.0 
h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
 github.com/hashicorp/errwrap v1.1.0/go.mod 
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
diff --git 
a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml 
b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
new file mode 100644
index 00000000..69b462ca
--- /dev/null
+++ b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
@@ -0,0 +1 @@
+# inject grpc template
\ No newline at end of file
diff --git a/pkg/bootstrap/config.go b/pkg/bootstrap/config.go
new file mode 100644
index 00000000..730d395e
--- /dev/null
+++ b/pkg/bootstrap/config.go
@@ -0,0 +1,45 @@
+package bootstrap
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+       "os"
+       "strconv"
+       "strings"
+)
+
+type MetadataOptions struct {
+}
+
+func ReadPodAnnotations(path string) (map[string]string, error) {
+       if path == "" {
+               path = constants.PodInfoAnnotationsPath
+       }
+       b, err := os.ReadFile(path)
+       if err != nil {
+               return nil, err
+       }
+       return ParseDownwardAPI(string(b))
+}
+
+func ParseDownwardAPI(i string) (map[string]string, error) {
+       res := map[string]string{}
+       for _, line := range strings.Split(i, "\n") {
+               sl := strings.SplitN(line, "=", 2)
+               if len(sl) != 2 {
+                       continue
+               }
+               key := sl[0]
+               // Strip the leading/trailing quotes
+               val, err := strconv.Unquote(sl[1])
+               if err != nil {
+                       return nil, fmt.Errorf("failed to unquote %v: %v", 
sl[1], err)
+               }
+               res[key] = val
+       }
+       return res, nil
+}
+
+func GetNodeMetaData(options MetadataOptions) error {
+       return nil
+}
diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go
index 2caf64e8..0d9d4719 100644
--- a/pkg/cmd/cmd.go
+++ b/pkg/cmd/cmd.go
@@ -18,10 +18,14 @@
 package cmd
 
 import (
+       "context"
        "flag"
        "fmt"
        "github.com/spf13/cobra"
        "github.com/spf13/pflag"
+       "os"
+       "os/signal"
+       "syscall"
 )
 
 func AddFlags(rootCmd *cobra.Command) {
@@ -33,3 +37,10 @@ func PrintFlags(flags *pflag.FlagSet) {
                fmt.Printf("FLAG: --%s=%q\n", flag.Name, flag.Value)
        })
 }
+
+func WaitSignalFunc(cancel context.CancelCauseFunc) {
+       sigs := make(chan os.Signal, 1)
+       signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+       sig := <-sigs
+       cancel(fmt.Errorf("received signal: %v", sig.String()))
+}
diff --git a/pkg/config/constants/constants.go 
b/pkg/config/constants/constants.go
index f8d053b2..e9bf56fd 100644
--- a/pkg/config/constants/constants.go
+++ b/pkg/config/constants/constants.go
@@ -20,6 +20,8 @@ package constants
 const (
        DubboSystemNamespace      = "dubbo-system"
        DefaultClusterLocalDomain = "cluster.local"
+       KeyFilename               = "key.pem"
+       CertChainFilename         = "cert-chain.pem"
        DefaultClusterName        = "Kubernetes"
        ServiceClusterName        = "dubbo-proxy"
        ConfigPathDir             = "./etc/dubbo/proxy"
@@ -28,4 +30,10 @@ const (
        CertProviderKubernetesSignerPrefix = "k8s.io/"
 
        CACertNamespaceConfigMapDataName = "root-cert.pem"
+
+       PodInfoAnnotationsPath = "./etc/dubbo/pod/annotations"
+       CertProviderNone       = "none"
+       CertProviderCustom     = "custom"
+
+       ThirdPartyJwtPath = "./var/run/secrets/tokens/dubbo-token"
 )
diff --git a/pkg/config/mesh/mesh.go b/pkg/config/mesh/mesh.go
index 14b81139..3fdf4cf4 100644
--- a/pkg/config/mesh/mesh.go
+++ b/pkg/config/mesh/mesh.go
@@ -48,9 +48,10 @@ func DefaultProxyConfig() *meshconfig.ProxyConfig {
                TerminationDrainDuration: durationpb.New(5 * time.Second),
                ProxyAdminPort:           15000,
                // TODO authpolicy
-               DiscoveryAddress: "dubbod.dubbo-system.svc:15012",
-               StatNameLength:   189,
-               StatusPort:       15020,
+               DiscoveryAddress:       "dubbod.dubbo-system.svc:15012",
+               ControlPlaneAuthPolicy: 
meshconfig.AuthenticationPolicy_MUTUAL_TLS,
+               StatNameLength:         189,
+               StatusPort:             15020,
        }
 }
 
@@ -125,6 +126,16 @@ func ApplyMeshConfig(yaml string, defaultConfig 
*meshconfig.MeshConfig) (*meshco
        return defaultConfig, nil
 }
 
+func ApplyProxyConfig(yaml string, meshConfig *meshconfig.MeshConfig) 
(*meshconfig.MeshConfig, error) {
+       mc := protomarshal.Clone(meshConfig)
+       pc, err := MergeProxyConfig(yaml, mc.DefaultConfig)
+       if err != nil {
+               return nil, err
+       }
+       mc.DefaultConfig = pc
+       return mc, nil
+}
+
 // EmptyMeshNetworks configuration with no networks
 func EmptyMeshNetworks() meshconfig.MeshNetworks {
        return meshconfig.MeshNetworks{
diff --git a/pkg/dubbo-agent/agent.go b/pkg/dubbo-agent/agent.go
new file mode 100644
index 00000000..409c57f7
--- /dev/null
+++ b/pkg/dubbo-agent/agent.go
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubboagent
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/bootstrap"
+       "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+       "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/grpcxds"
+       "github.com/apache/dubbo-kubernetes/pkg/filewatcher"
+       "github.com/apache/dubbo-kubernetes/pkg/model"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/cache"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       mesh "istio.io/api/mesh/v1alpha1"
+       "k8s.io/klog/v2"
+       "os"
+       "path"
+       "path/filepath"
+       "sync"
+       "time"
+)
+
+const (
+       AegisCACertPath = "./var/run/secrets/dubbo"
+)
+
+const (
+       MetadataClientCertKey   = "DUBBO_META_TLS_CLIENT_KEY"
+       MetadataClientCertChain = "DUBBO_META_TLS_CLIENT_CERT_CHAIN"
+       MetadataClientRootCert  = "DUBBO_META_TLS_CLIENT_ROOT_CERT"
+)
+
+type SDSServiceFactory = func(_ *security.Options, _ security.SecretManager, _ 
*mesh.PrivateKeyProvider) SDSService
+
+type Proxy struct {
+       Type      model.NodeType
+       DNSDomain string
+}
+
+type Agent struct {
+       proxyConfig *mesh.ProxyConfig
+       cfg         *AgentOptions
+       secOpts     *security.Options
+       sdsServer   SDSService
+       secretCache *cache.SecretManagerClient
+
+       xdsProxy    *XdsProxy
+       fileWatcher filewatcher.FileWatcher
+
+       wg sync.WaitGroup
+}
+
+type AgentOptions struct {
+       WorkloadIdentitySocketFile string
+       GRPCBootstrapPath          string
+       XDSHeaders                 map[string]string
+       XDSRootCerts               string
+       MetadataDiscovery          *bool
+       CARootCerts                string
+}
+
+func NewAgent(proxyConfig *mesh.ProxyConfig, agentOpts *AgentOptions, sopts 
*security.Options) *Agent {
+       return &Agent{
+               proxyConfig: proxyConfig,
+               cfg:         agentOpts,
+               secOpts:     sopts,
+               fileWatcher: filewatcher.NewWatcher(),
+       }
+}
+
+func (a *Agent) Run(ctx context.Context) (func(), error) {
+       // TODO initLocalDNSServer?
+
+       if a.cfg.WorkloadIdentitySocketFile != 
filepath.Base(a.cfg.WorkloadIdentitySocketFile) {
+               return nil, fmt.Errorf("workload identity socket file override 
must be a filename, not a path: %s", a.cfg.WorkloadIdentitySocketFile)
+       }
+
+       configuredAgentSocketPath := 
security.GetWorkloadSDSSocketListenPath(a.cfg.WorkloadIdentitySocketFile)
+
+       isDubboSDS := configuredAgentSocketPath == 
security.GetDubboSDSServerSocketPath()
+
+       socketExists, err := checkSocket(ctx, configuredAgentSocketPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to check SDS socket: %v", err)
+       }
+       if socketExists {
+               klog.Infof("Existing workload SDS socket found at %s. Default 
Dubbo SDS Server will only serve files", configuredAgentSocketPath)
+               a.secOpts.ServeOnlyFiles = true
+       } else if !isDubboSDS {
+               return nil, fmt.Errorf("agent configured for non-default SDS 
socket path: %s but no socket found", configuredAgentSocketPath)
+       }
+
+       klog.Info("Starting default Dubbo SDS Server")
+       err = a.initSdsServer()
+       if err != nil {
+               return nil, fmt.Errorf("failed to start default Dubbo SDS 
server: %v", err)
+       }
+       a.xdsProxy, err = initXdsProxy(a)
+       if err != nil {
+               return nil, fmt.Errorf("failed to start xds proxy: %v", err)
+       }
+
+       if a.cfg.GRPCBootstrapPath != "" {
+               if err := a.generateGRPCBootstrap(); err != nil {
+                       return nil, fmt.Errorf("failed generating gRPC XDS 
bootstrap: %v", err)
+               }
+       }
+       if a.proxyConfig.ControlPlaneAuthPolicy != 
mesh.AuthenticationPolicy_NONE {
+               rootCAForXDS, err := a.FindRootCAForXDS()
+               if err != nil {
+                       return nil, fmt.Errorf("failed to find root XDS CA: 
%v", err)
+               }
+               go a.startFileWatcher(ctx, rootCAForXDS, func() {
+                       if err := a.xdsProxy.initDubbodDialOptions(a); err != 
nil {
+                               klog.InfoS("Failed to init xds proxy dial 
options")
+                       }
+               })
+       }
+
+       return a.wg.Wait, nil
+}
+
+func (a *Agent) Close() {
+       if a.xdsProxy != nil {
+               a.xdsProxy.close()
+       }
+       if a.sdsServer != nil {
+               a.sdsServer.Stop()
+       }
+       if a.secretCache != nil {
+               a.secretCache.Close()
+       }
+       if a.fileWatcher != nil {
+               _ = a.fileWatcher.Close()
+       }
+}
+
+func (a *Agent) FindRootCAForXDS() (string, error) {
+       var rootCAPath string
+
+       if a.cfg.XDSRootCerts == security.SystemRootCerts {
+               // Special case input for root cert configuration to use system 
root certificates
+               return "", nil
+       } else if a.cfg.XDSRootCerts != "" {
+               // Using specific platform certs or custom roots
+               rootCAPath = a.cfg.XDSRootCerts
+       } else if fileExists(security.DefaultRootCertFilePath) {
+               // Old style - mounted cert. This is used for XDS auth only,
+               // not connecting to CA_ADDR because this mode uses external
+               // agent (Secret refresh, etc)
+               return security.DefaultRootCertFilePath, nil
+       } else if a.secOpts.ProvCert != "" {
+               // This was never completely correct - PROV_CERT are only 
intended for auth with CA_ADDR,
+               // and should not be involved in determining the root CA.
+               // For VMs, the root cert file used to auth may be populated 
afterwards.
+               // Thus, return directly here and skip checking for existence.
+               return a.secOpts.ProvCert + "/root-cert.pem", nil
+       } else if a.secOpts.FileMountedCerts {
+               // FileMountedCerts - Load it from Proxy Metadata.
+               rootCAPath = a.proxyConfig.ProxyMetadata[MetadataClientRootCert]
+       } else if a.secOpts.SailCertProvider == constants.CertProviderNone {
+               return "", fmt.Errorf("root CA file for XDS required but 
configured provider as none")
+       } else {
+               rootCAPath = path.Join(AegisCACertPath, 
constants.CACertNamespaceConfigMapDataName)
+       }
+
+       // Additional checks for root CA cert existence. Fail early, instead of 
obscure envoy errors
+       if fileExists(rootCAPath) {
+               return rootCAPath, nil
+       }
+
+       return "", fmt.Errorf("root CA file for XDS does not exist %s", 
rootCAPath)
+}
+
+func (a *Agent) GetKeyCertsForXDS() (string, string) {
+       var key, cert string
+       if a.secOpts.ProvCert != "" {
+               key, cert = getKeyCertInner(a.secOpts.ProvCert)
+       } else if a.secOpts.FileMountedCerts {
+               key = a.proxyConfig.ProxyMetadata[MetadataClientCertKey]
+               cert = a.proxyConfig.ProxyMetadata[MetadataClientCertChain]
+       }
+       return key, cert
+}
+
+func (a *Agent) GetKeyCertsForCA() (string, string) {
+       var key, cert string
+       if a.secOpts.ProvCert != "" {
+               key, cert = getKeyCertInner(a.secOpts.ProvCert)
+       }
+       return key, cert
+}
+
+func (a *Agent) FindRootCAForCA() (string, error) {
+       var rootCAPath string
+
+       if a.cfg.CARootCerts == security.SystemRootCerts {
+               return "", nil
+       } else if a.cfg.CARootCerts != "" {
+               rootCAPath = a.cfg.CARootCerts
+       } else if a.secOpts.SailCertProvider == constants.CertProviderCustom {
+               rootCAPath = security.DefaultRootCertFilePath // 
./etc/certs/root-cert.pem
+       } else if a.secOpts.ProvCert != "" {
+               // This was never completely correct - PROV_CERT are only 
intended for auth with CA_ADDR,
+               // and should not be involved in determining the root CA.
+               // For VMs, the root cert file used to auth may be populated 
afterwards.
+               // Thus, return directly here and skip checking for existence.
+               return a.secOpts.ProvCert + "/root-cert.pem", nil
+       } else if a.secOpts.SailCertProvider == constants.CertProviderNone {
+               return "", fmt.Errorf("root CA file for CA required but 
configured provider as none")
+       } else {
+               rootCAPath = path.Join(AegisCACertPath, 
constants.CACertNamespaceConfigMapDataName)
+       }
+
+       if fileExists(rootCAPath) {
+               return rootCAPath, nil
+       }
+
+       return "", fmt.Errorf("root CA file for CA does not exist %s", 
rootCAPath)
+}
+
+func (a *Agent) startFileWatcher(ctx context.Context, filePath string, handler 
func()) {
+       if err := a.fileWatcher.Add(filePath); err != nil {
+               klog.Warningf("Failed to add file watcher %s", filePath)
+               return
+       }
+
+       klog.V(2).Infof("Add file %s watcher", filePath)
+       for {
+               select {
+               case gotEvent := <-a.fileWatcher.Events(filePath):
+                       klog.V(2).Infof("Receive file %s event %v", filePath, 
gotEvent)
+                       handler()
+               case err := <-a.fileWatcher.Errors(filePath):
+                       klog.Warningf("Watch file %s error: %v", filePath, err)
+               case <-ctx.Done():
+                       return
+               }
+       }
+}
+
+func (a *Agent) initSdsServer() error {
+       var err error
+       if 
security.CheckWorkloadCertificate(security.WorkloadIdentityCertChainPath, 
security.WorkloadIdentityKeyPath, security.WorkloadIdentityRootCertPath) {
+               klog.Info("workload certificate files detected, creating secret 
manager without caClient")
+               a.secOpts.RootCertFilePath = 
security.WorkloadIdentityRootCertPath
+               a.secOpts.CertChainFilePath = 
security.WorkloadIdentityCertChainPath
+               a.secOpts.KeyFilePath = security.WorkloadIdentityKeyPath
+               a.secOpts.FileMountedCerts = true
+       }
+
+       createCaClient := !a.secOpts.FileMountedCerts && 
!a.secOpts.ServeOnlyFiles
+       a.secretCache, err = a.newSecretManager(createCaClient)
+       if err != nil {
+               return fmt.Errorf("failed to start workload secret manager %v", 
err)
+       }
+
+       return nil
+}
+
+func (a *Agent) generateGRPCBootstrap() error {
+       // generate metadata
+       err := a.generateNodeMetadata()
+       if err != nil {
+               return fmt.Errorf("failed generating node metadata: %v", err)
+       }
+       if err := os.MkdirAll(filepath.Dir(a.cfg.GRPCBootstrapPath), 0o700); 
err != nil {
+               return err
+       }
+
+       _, err = grpcxds.GenerateBootstrapFile(grpcxds.GenerateBootstrapOptions{
+               DiscoveryAddress: a.proxyConfig.DiscoveryAddress,
+               CertDir:          a.secOpts.OutputKeyCertToDir,
+       }, a.cfg.GRPCBootstrapPath)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func (a *Agent) newSecretManager(createCaClient bool) 
(*cache.SecretManagerClient, error) {
+       if !createCaClient {
+               klog.Info("Workload is using file mounted certificates. 
Skipping connecting to CA")
+               return cache.NewSecretManagerClient(nil, a.secOpts)
+       }
+       klog.Infof("CA Endpoint %s, provider %s", a.secOpts.CAEndpoint, 
a.secOpts.CAProviderName)
+
+       caClient, err := createCAClient(a.secOpts, a)
+       if err != nil {
+               return nil, err
+       }
+       return cache.NewSecretManagerClient(caClient, a.secOpts)
+}
+
+func (a *Agent) generateNodeMetadata() error {
+       credentialSocketExists, err := checkSocket(context.TODO(), 
security.CredentialNameSocketPath)
+       if err != nil {
+               return fmt.Errorf("failed to check credential SDS socket: %v", 
err)
+       }
+       if credentialSocketExists {
+               klog.Info("Credential SDS socket found")
+       }
+
+       return bootstrap.GetNodeMetaData(bootstrap.MetadataOptions{})
+}
+
+type SDSService interface {
+       OnSecretUpdate(resourceName string)
+       Stop()
+}
+
+func checkSocket(ctx context.Context, socketPath string) (bool, error) {
+       socketExists := socketFileExists(socketPath)
+       if !socketExists {
+               return false, nil
+       }
+
+       err := socketHealthCheck(ctx, socketPath)
+       if err != nil {
+               klog.V(2).Infof("SDS socket detected but not healthy: %v", err)
+               err = os.Remove(socketPath)
+               if err != nil {
+                       return false, fmt.Errorf("existing SDS socket could not 
be removed: %v", err)
+               }
+               return false, nil
+       }
+
+       return true, nil
+}
+
+func socketHealthCheck(ctx context.Context, socketPath string) error {
+       ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second))
+       defer cancel()
+
+       conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix:%s", socketPath),
+               grpc.WithTransportCredentials(insecure.NewCredentials()),
+               grpc.FailOnNonTempDialError(true),
+               grpc.WithReturnConnectionError(),
+               grpc.WithBlock(),
+       )
+       if err != nil {
+               return err
+       }
+       err = conn.Close()
+       if err != nil {
+               klog.Infof("connection is not closed: %v", err)
+       }
+
+       return nil
+}
+
+func getKeyCertInner(certPath string) (string, string) {
+       key := path.Join(certPath, constants.KeyFilename)
+       cert := path.Join(certPath, constants.CertChainFilename)
+       return key, cert
+}
+
+func fileExists(path string) bool {
+       if fi, err := os.Stat(path); err == nil && fi.Mode().IsRegular() {
+               return true
+       }
+       return false
+}
+
+func socketFileExists(path string) bool {
+       if fi, err := os.Stat(path); err == nil && !fi.Mode().IsRegular() {
+               return true
+       }
+       return false
+}
diff --git a/pkg/dubbo-agent/config/config.go b/pkg/dubbo-agent/config/config.go
new file mode 100644
index 00000000..1bd21a36
--- /dev/null
+++ b/pkg/dubbo-agent/config/config.go
@@ -0,0 +1,145 @@
+package config
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/bootstrap"
+       "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+       "github.com/apache/dubbo-kubernetes/pkg/env"
+       "google.golang.org/protobuf/types/known/wrapperspb"
+       "istio.io/api/annotation"
+       meshconfig "istio.io/api/mesh/v1alpha1"
+       "k8s.io/klog/v2"
+       "os"
+       "runtime"
+       "strconv"
+       "strings"
+)
+
+// ConstructProxyConfig returns proxyConfig
+func ConstructProxyConfig(meshConfigFile, serviceCluster, proxyConfigEnv 
string, concurrency int) (*meshconfig.ProxyConfig, error) {
+       annotations, err := bootstrap.ReadPodAnnotations("")
+       if err != nil {
+               if os.IsNotExist(err) {
+                       klog.V(2).Infof("failed to read pod annotations: %v", 
err)
+               } else {
+                       klog.V(2).Infof("failed to read pod annotations: %v", 
err)
+               }
+       }
+       var fileMeshContents string
+       if fileExists(meshConfigFile) {
+               contents, err := os.ReadFile(meshConfigFile)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to read mesh config file 
%v: %v", meshConfigFile, err)
+               }
+               fileMeshContents = string(contents)
+       }
+       meshConfig, err := getMeshConfig(fileMeshContents, 
annotations[annotation.ProxyConfig.Name], proxyConfigEnv)
+       if err != nil {
+               return nil, err
+       }
+       proxyConfig := mesh.DefaultProxyConfig()
+       if meshConfig.DefaultConfig != nil {
+               proxyConfig = meshConfig.DefaultConfig
+       }
+
+       // Concurrency wasn't explicitly set
+       if proxyConfig.Concurrency == nil {
+               // We want to detect based on CPU limit configured. If we are 
running on a 100 core machine, but with
+               // only 2 CPUs allocated, we want to have 2 threads, not 100, 
or we will get excessively throttled.
+               if CPULimit != 0 {
+                       klog.Infof("cpu limit detected as %v, setting 
concurrency", CPULimit)
+                       proxyConfig.Concurrency = 
wrapperspb.Int32(int32(CPULimit))
+               }
+       }
+       // Respect the old flag, if they set it. This should never be set in 
typical installation.
+       if concurrency != 0 {
+               klog.V(2).Infof("legacy --concurrency=%d flag detected; prefer 
to use ProxyConfig", concurrency)
+               proxyConfig.Concurrency = wrapperspb.Int32(int32(concurrency))
+       }
+
+       if proxyConfig.Concurrency.GetValue() == 0 {
+               if CPULimit < runtime.NumCPU() {
+                       klog.V(2).Infof("concurrency is set to 0, which will 
use a thread per CPU on the host. However, CPU limit is set lower. "+
+                               "This is not recommended and may lead to 
performance issues. "+
+                               "CPU count: %d, CPU Limit: %d.", 
runtime.NumCPU(), CPULimit)
+               }
+       }
+
+       if x, ok := 
proxyConfig.GetClusterName().(*meshconfig.ProxyConfig_ServiceCluster); ok {
+               if x.ServiceCluster == "" {
+                       proxyConfig.ClusterName = 
&meshconfig.ProxyConfig_ServiceCluster{ServiceCluster: serviceCluster}
+               }
+       }
+       // TODO ResolveAddr
+       // TODO ValidateMeshConfigProxyConfig
+       return applyAnnotations(proxyConfig, annotations), nil
+}
+
+func getMeshConfig(fileOverride, annotationOverride, proxyConfigEnv string) 
(*meshconfig.MeshConfig, error) {
+       mc := mesh.DefaultMeshConfig()
+       if fileOverride != "" {
+               klog.Infof("Apply mesh config from file %v", fileOverride)
+               fileMesh, err := mesh.ApplyMeshConfig(fileOverride, mc)
+               if err != nil || fileMesh == nil {
+                       return nil, fmt.Errorf("failed to unmarshal mesh config 
from file [%v]: %v", fileOverride, err)
+               }
+               mc = fileMesh
+       }
+
+       if proxyConfigEnv != "" {
+               klog.Infof("Apply proxy config from env %v", proxyConfigEnv)
+               envMesh, err := mesh.ApplyProxyConfig(proxyConfigEnv, mc)
+               if err != nil || envMesh == nil {
+                       return nil, fmt.Errorf("failed to unmarshal mesh config 
from environment [%v]: %v", proxyConfigEnv, err)
+               }
+               mc = envMesh
+       }
+
+       if annotationOverride != "" {
+               klog.Infof("Apply proxy config from annotation %v", 
annotationOverride)
+               annotationMesh, err := 
mesh.ApplyProxyConfig(annotationOverride, mc)
+               if err != nil || annotationMesh == nil {
+                       return nil, fmt.Errorf("failed to unmarshal mesh config 
from annotation [%v]: %v", annotationOverride, err)
+               }
+               mc = annotationMesh
+       }
+
+       return mc, nil
+}
+
+// Apply any overrides to proxy config from annotations
+func applyAnnotations(config *meshconfig.ProxyConfig, annos map[string]string) 
*meshconfig.ProxyConfig {
+       if v, f := annos[annotation.SidecarDiscoveryAddress.Name]; f {
+               config.DiscoveryAddress = v
+       }
+       if v, f := annos[annotation.SidecarStatusPort.Name]; f {
+               p, err := strconv.Atoi(v)
+               if err != nil {
+                       klog.Errorf("Invalid annotation %v=%v: %v", 
annotation.SidecarStatusPort.Name, v, err)
+               }
+               config.StatusPort = int32(p)
+       }
+       return config
+}
+
+func GetSailSan(discoveryAddress string) string {
+       discHost := strings.Split(discoveryAddress, ":")[0]
+       // For local debugging - the discoveryAddress is set to localhost, but 
the cert issued for normal SA.
+       if discHost == "localhost" {
+               discHost = "dubbod.istio-system.svc"
+       }
+       return discHost
+}
+
+func fileExists(path string) bool {
+       if _, err := os.Stat(path); os.IsNotExist(err) {
+               return false
+       }
+       return true
+}
+
+var CPULimit = env.Register(
+       "DUBBO_CPU_LIMIT",
+       0,
+       "CPU limit for the current process. Expressed as an integer value, 
rounded up.",
+).Get()
diff --git a/pkg/dubbo-agent/grpcxds/grpc_bootstrap.go 
b/pkg/dubbo-agent/grpcxds/grpc_bootstrap.go
new file mode 100644
index 00000000..6dc2df0f
--- /dev/null
+++ b/pkg/dubbo-agent/grpcxds/grpc_bootstrap.go
@@ -0,0 +1,109 @@
+package grpcxds
+
+import (
+       "encoding/json"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/file"
+       "github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
+       core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+       "google.golang.org/protobuf/types/known/durationpb"
+       "os"
+       "path"
+       "time"
+)
+
// Resource-name scheme used by gRPC xDS clients for inbound (server-side)
// Listener resources.
const (
	ServerListenerNamePrefix = "xds.dubbo.io/grpc/lds/inbound/"
	// ServerListenerNameTemplate is advertised in the bootstrap's
	// server_listener_resource_name_template field; "%s" is substituted by
	// the gRPC library.
	ServerListenerNameTemplate = ServerListenerNamePrefix + "%s"
)

// FileWatcherCertProviderConfig is the JSON configuration for the gRPC
// "file_watcher" certificate-provider plugin.
type FileWatcherCertProviderConfig struct {
	CertificateFile   string `json:"certificate_file,omitempty"`
	PrivateKeyFile    string `json:"private_key_file,omitempty"`
	CACertificateFile string `json:"ca_certificate_file,omitempty"`
	// RefreshDuration is kept as raw JSON (a marshaled protobuf Duration)
	// so it round-trips without re-encoding.
	RefreshDuration json.RawMessage `json:"refresh_interval,omitempty"`
}

// GenerateBootstrapOptions are the inputs to GenerateBootstrap.
type GenerateBootstrapOptions struct {
	// XdsUdsPath, when non-empty, points the client at the agent's local
	// unix socket instead of DiscoveryAddress.
	XdsUdsPath       string
	DiscoveryAddress string
	// CertDir, when non-empty, enables the file_watcher certificate provider
	// over key.pem / cert-chain.pem / root-cert.pem in that directory.
	CertDir string
}

// ChannelCreds is one entry of an xDS server's channel_creds list.
type ChannelCreds struct {
	Type   string `json:"type,omitempty"`
	Config any    `json:"config,omitempty"`
}

// Bootstrap mirrors the gRPC xDS bootstrap JSON file format.
type Bootstrap struct {
	XDSServers                 []XdsServer                    `json:"xds_servers,omitempty"`
	Node                       *core.Node                     `json:"node,omitempty"`
	CertProviders              map[string]CertificateProvider `json:"certificate_providers,omitempty"`
	ServerListenerNameTemplate string                         `json:"server_listener_resource_name_template,omitempty"`
}

// XdsServer describes one xDS server endpoint in the bootstrap.
type XdsServer struct {
	ServerURI      string         `json:"server_uri,omitempty"`
	ChannelCreds   []ChannelCreds `json:"channel_creds,omitempty"`
	ServerFeatures []string       `json:"server_features,omitempty"`
}

// CertificateProvider is a named certificate-provider plugin instance.
type CertificateProvider struct {
	PluginName string `json:"plugin_name,omitempty"`
	Config     any    `json:"config,omitempty"`
}
+
+func GenerateBootstrap(opts GenerateBootstrapOptions) (*Bootstrap, error) {
+       // TODO direct to CP should use secure channel (most likely JWT + TLS, 
but possibly allow mTLS)
+       serverURI := opts.DiscoveryAddress
+       if opts.XdsUdsPath != "" {
+               serverURI = fmt.Sprintf("unix:///%s", opts.XdsUdsPath)
+       }
+
+       bootstrap := Bootstrap{
+               XDSServers: []XdsServer{{
+                       ServerURI: serverURI,
+                       // connect locally via agent
+                       ChannelCreds:   []ChannelCreds{{Type: "insecure"}},
+                       ServerFeatures: []string{"xds_v3"},
+               }},
+               ServerListenerNameTemplate: ServerListenerNameTemplate,
+       }
+
+       if opts.CertDir != "" {
+               // TODO use a more appropriate interval
+               refresh, err := protomarshal.Marshal(durationpb.New(15 * 
time.Minute))
+               if err != nil {
+                       return nil, err
+               }
+
+               bootstrap.CertProviders = map[string]CertificateProvider{
+                       "default": {
+                               PluginName: "file_watcher",
+                               Config: FileWatcherCertProviderConfig{
+                                       PrivateKeyFile:    
path.Join(opts.CertDir, "key.pem"),
+                                       CertificateFile:   
path.Join(opts.CertDir, "cert-chain.pem"),
+                                       CACertificateFile: 
path.Join(opts.CertDir, "root-cert.pem"),
+                                       RefreshDuration:   refresh,
+                               },
+                       },
+               }
+       }
+
+       return &bootstrap, nil
+}
+
+func GenerateBootstrapFile(opts GenerateBootstrapOptions, path string) 
(*Bootstrap, error) {
+       bootstrap, err := GenerateBootstrap(opts)
+       if err != nil {
+               return nil, err
+       }
+       jsonData, err := json.MarshalIndent(bootstrap, "", "  ")
+       if err != nil {
+               return nil, err
+       }
+       if err := file.AtomicWrite(path, jsonData, os.FileMode(0o644)); err != 
nil {
+               return nil, fmt.Errorf("failed writing to %s: %v", path, err)
+       }
+       return bootstrap, nil
+}
diff --git a/pkg/dubbo-agent/plugins.go b/pkg/dubbo-agent/plugins.go
new file mode 100644
index 00000000..29ea6cd1
--- /dev/null
+++ b/pkg/dubbo-agent/plugins.go
@@ -0,0 +1,55 @@
+package dubboagent
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       
"github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/caclient/providers/aegis"
+       "k8s.io/klog/v2"
+       "strings"
+)
+
// RootCertProvider supplies the key/cert pair and the root CA file used when
// talking to the CA itself.
type RootCertProvider interface {
	GetKeyCertsForCA() (string, string)
	FindRootCAForCA() (string, error)
}

// providers maps a CA provider name (security.Options.CAProviderName) to its
// client constructor.
var providers = make(map[string]func(*security.Options, RootCertProvider) (security.Client, error))

func init() {
	providers["Aegis"] = createAegis
}

// createAegis builds the Aegis CA client. TLS material is resolved from the
// RootCertProvider unless the endpoint ends in the debug port ":15010", in
// which case no TLS options are passed.
func createAegis(opts *security.Options, a RootCertProvider) (security.Client, error) {
	var tlsOpts *aegis.TLSOptions
	var err error
	// TODO: may add extra cases or explicit settings - but this is a rare use cases, mostly debugging
	if strings.HasSuffix(opts.CAEndpoint, ":15010") {
		klog.Warning("Debug mode or IP-secure network")
	} else {
		tlsOpts = &aegis.TLSOptions{}
		tlsOpts.RootCert, err = a.FindRootCAForCA()
		if err != nil {
			return nil, fmt.Errorf("failed to find root CA cert for CA: %v", err)
		}

		if tlsOpts.RootCert == "" {
			// Empty root cert path: rely on system certs.
			klog.Infof("Using CA %s cert with system certs", opts.CAEndpoint)
		} else if !fileExists(tlsOpts.RootCert) {
			// NOTE(review): Fatalf terminates the whole process when the
			// configured root certificate file is missing — confirm this
			// hard-fail is intended rather than returning an error.
			klog.Fatalf("invalid config - %s missing a root certificate %s", opts.CAEndpoint, tlsOpts.RootCert)
		} else {
			klog.Infof("Using CA %s cert with certs: %s", opts.CAEndpoint, tlsOpts.RootCert)
		}

		tlsOpts.Key, tlsOpts.Cert = a.GetKeyCertsForCA()
	}

	return aegis.NewAegisClient(opts, tlsOpts)
}
+
+func createCAClient(opts *security.Options, a RootCertProvider) 
(security.Client, error) {
+       provider, ok := providers[opts.CAProviderName]
+       if !ok {
+               return nil, fmt.Errorf("CA provider %q not registered", 
opts.CAProviderName)
+       }
+       return provider(opts, a)
+}
diff --git a/pkg/dubbo-agent/xds_proxy.go b/pkg/dubbo-agent/xds_proxy.go
new file mode 100644
index 00000000..e90426ae
--- /dev/null
+++ b/pkg/dubbo-agent/xds_proxy.go
@@ -0,0 +1,80 @@
+package dubboagent
+
+import (
+       "fmt"
+       dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
+       "github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/caclient"
+       "google.golang.org/grpc"
+       meshconfig "istio.io/api/mesh/v1alpha1"
+       "net"
+       "sync"
+)
+
// XdsProxy is the agent-side xDS proxy: it accepts the local downstream
// connection and holds the dial options used to reach dubbod upstream.
type XdsProxy struct {
	// stopChan signals shutdown; it is closed by close().
	stopChan             chan struct{}
	downstreamGrpcServer *grpc.Server
	downstreamListener   net.Listener
	// optsMutex guards dialOptions, which may be (re)built at runtime.
	optsMutex   sync.RWMutex
	dialOptions []grpc.DialOption
	// dubbodSAN is the subject alternative name expected on dubbod's cert.
	dubbodSAN string
}
+
+func initXdsProxy(ia *Agent) (*XdsProxy, error) {
+       proxy := &XdsProxy{}
+       return proxy, nil
+}
+
+func (p *XdsProxy) initDubbodDialOptions(agent *Agent) error {
+       opts, err := p.buildUpstreamClientDialOpts(agent)
+       if err != nil {
+               return err
+       }
+
+       p.optsMutex.Lock()
+       p.dialOptions = opts
+       p.optsMutex.Unlock()
+       return nil
+}
+
// buildUpstreamClientDialOpts assembles the gRPC dial options for the
// upstream dubbod connection: TLS settings from getTLSOptions plus, when a
// credential fetcher is configured, per-RPC token credentials.
func (p *XdsProxy) buildUpstreamClientDialOpts(sa *Agent) ([]grpc.DialOption, error) {
	tlsOpts, err := p.getTLSOptions(sa)
	if err != nil {
		return nil, fmt.Errorf("failed to get TLS options to talk to upstream: %v", err)
	}
	options, err := dubbogrpc.ClientOptions(nil, tlsOpts)
	if err != nil {
		return nil, err
	}
	if sa.secOpts.CredFetcher != nil {
		options = append(options, grpc.WithPerRPCCredentials(caclient.NewDefaultTokenProvider(sa.secOpts)))
	}
	return options, nil
}

// getTLSOptions returns the TLS settings for the upstream connection, or
// nil (plaintext) when the control-plane auth policy is NONE.
func (p *XdsProxy) getTLSOptions(agent *Agent) (*dubbogrpc.TLSOptions, error) {
	if agent.proxyConfig.ControlPlaneAuthPolicy == meshconfig.AuthenticationPolicy_NONE {
		return nil, nil
	}
	xdsCACertPath, err := agent.FindRootCAForXDS()
	if err != nil {
		return nil, fmt.Errorf("failed to find root CA cert for XDS: %v", err)
	}
	key, cert := agent.GetKeyCertsForXDS()
	return &dubbogrpc.TLSOptions{
		RootCert:      xdsCACertPath,
		Key:           key,
		Cert:          cert,
		ServerAddress: agent.proxyConfig.DiscoveryAddress,
		SAN:           p.dubbodSAN,
	}, nil
}
+
// close shuts the proxy down: signals stopChan, stops the downstream gRPC
// server, and closes the downstream listener.
// NOTE(review): stopChan must have been initialized before close is called;
// closing a nil channel panics — confirm initXdsProxy sets it.
func (p *XdsProxy) close() {
	close(p.stopChan)
	if p.downstreamGrpcServer != nil {
		p.downstreamGrpcServer.Stop()
	}
	if p.downstreamListener != nil {
		_ = p.downstreamListener.Close()
	}
}
diff --git a/pkg/features/security.go b/pkg/features/security.go
new file mode 100644
index 00000000..778a67df
--- /dev/null
+++ b/pkg/features/security.go
@@ -0,0 +1,35 @@
+package features
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/env"
+)
+
+const (
+       // FIPS_140_2 compliance policy.
+       // nolint: revive, stylecheck
+       FIPS_140_2 = "fips-140-2"
+
+       PQC = "pqc"
+)
+
+// Define common security feature flags shared among the Istio components.
+var (
+       CompliancePolicy = env.Register("COMPLIANCE_POLICY", "",
+               `If set, applies policy-specific restrictions over all existing 
TLS
+settings, including in-mesh mTLS and external TLS. Valid values are:
+
+* '' or unset places no additional restrictions.
+* 'fips-140-2' which enforces a version of the TLS protocol and a subset
+of cipher suites overriding any user preferences or defaults for all runtime
+components, including Envoy, gRPC Go SDK, and gRPC C++ SDK.
+* 'pqc' which enforces post-quantum-safe key exchange X25519MLKEM768, TLS v1.3
+and cipher suites TLS_AES_128_GCM_SHA256 and TLS_AES_256_GCM_SHA384 overriding
+any user preferences or defaults for all runtime components, including Envoy,
+gRPC Go SDK, and gRPC C++ SDK. This policy is experimental.
+
+WARNING: Setting compliance policy in the control plane is a necessary but
+not a sufficient requirement to achieve compliance. There are additional
+steps necessary to claim compliance, including using the validated
+cryptograhic modules (please consult
+https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/security/ssl#fips-140-2).`).Get()
+)
diff --git a/pkg/file/fadvise_linux.go b/pkg/file/fadvise_linux.go
new file mode 100644
index 00000000..fb4fdcaa
--- /dev/null
+++ b/pkg/file/fadvise_linux.go
@@ -0,0 +1,20 @@
+package file
+
+import (
+       "fmt"
+       "os"
+
+       "golang.org/x/sys/unix"
+)
+
// markNotNeeded advises the kernel (fadvise FADV_DONTNEED) that the file's
// page-cache contents are no longer needed.
// Interactions with large files can end up with a large page cache. This isn't really a big deal as the OS can reclaim
// this under memory pressure. However, Kubernetes counts page cache usage against the container memory usage.
// This leads to bloating up memory usage if we are just copying large files around.
func markNotNeeded(in *os.File) error {
	err := unix.Fadvise(int(in.Fd()), 0, 0, unix.FADV_DONTNEED)
	if err != nil {
		return fmt.Errorf("failed to mark file FADV_DONTNEED: %v", err)
	}
	return nil
}
diff --git a/pkg/file/fadvise_unspecified.go b/pkg/file/fadvise_unspecified.go
new file mode 100644
index 00000000..292966f8
--- /dev/null
+++ b/pkg/file/fadvise_unspecified.go
@@ -0,0 +1,12 @@
+//go:build !linux
+// +build !linux
+
+package file
+
+import (
+       "os"
+)
+
// markNotNeeded is a no-op on non-Linux platforms, where the fadvise
// optimization (see fadvise_linux.go) is not available.
func markNotNeeded(in *os.File) error {
	return nil
}
diff --git a/pkg/file/file.go b/pkg/file/file.go
new file mode 100644
index 00000000..148e7261
--- /dev/null
+++ b/pkg/file/file.go
@@ -0,0 +1,71 @@
+package file
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+       "io"
+       "io/fs"
+       "k8s.io/klog/v2"
+       "os"
+       "path/filepath"
+)
+
// AtomicWrite writes data to path via a temp file plus rename so readers
// never observe a partially written file; mode is applied to the temp file
// before the rename.
func AtomicWrite(path string, data []byte, mode os.FileMode) error {
	return AtomicWriteReader(path, bytes.NewReader(data), mode)
}
+
+func AtomicWriteReader(path string, data io.Reader, mode os.FileMode) error {
+       tmpFile, err := os.CreateTemp(filepath.Dir(path), 
filepath.Base(path)+".tmp.")
+       if err != nil {
+               return err
+       }
+       defer func() {
+               if Exists(tmpFile.Name()) {
+                       if rmErr := os.Remove(tmpFile.Name()); rmErr != nil {
+                               if err != nil {
+                                       err = fmt.Errorf("%s: %w", 
rmErr.Error(), err)
+                               } else {
+                                       err = rmErr
+                               }
+                       }
+               }
+       }()
+
+       if err := os.Chmod(tmpFile.Name(), mode); err != nil {
+               return err
+       }
+
+       n, err := io.Copy(tmpFile, data)
+       if _, err := io.Copy(tmpFile, data); err != nil {
+               if closeErr := tmpFile.Close(); closeErr != nil {
+                       err = fmt.Errorf("%s: %w", closeErr.Error(), err)
+               }
+               return err
+       }
+       tryMarkLargeFileAsNotNeeded(n, tmpFile)
+       if err := tmpFile.Close(); err != nil {
+               return err
+       }
+
+       return os.Rename(tmpFile.Name(), path)
+}
+
// Exists reports whether name exists.
// We must explicitly check if the error is due to the file not existing (as
// opposed to a permissions error): any stat failure other than
// fs.ErrNotExist is treated as "exists".
func Exists(name string) bool {
	_, statErr := os.Stat(name)
	return !errors.Is(statErr, fs.ErrNotExist)
}
+
// tryMarkLargeFileAsNotNeeded drops the page cache for files above a size
// threshold; failures are only logged since this is purely an optimization.
func tryMarkLargeFileAsNotNeeded(size int64, in *os.File) {
	// Somewhat arbitrary value to not bother with this on small files
	const largeFileThreshold = 16 * 1024
	if size < largeFileThreshold {
		return
	}
	if err := markNotNeeded(in); err != nil {
		// Error is fine, this is just an optimization anyways. Continue
		klog.Errorf("failed to mark not needed, continuing anyways: %v", err)
	}
}
diff --git a/pkg/jwt/jwt.go b/pkg/jwt/jwt.go
new file mode 100644
index 00000000..9d27495b
--- /dev/null
+++ b/pkg/jwt/jwt.go
@@ -0,0 +1,6 @@
+package jwt
+
// Supported values of the JWT policy setting.
const (
	PolicyThirdParty = "third-party-jwt"
	PolicyFirstParty = "first-party-jwt"
)
diff --git a/pkg/model/fips.go b/pkg/model/fips.go
new file mode 100644
index 00000000..10a41cba
--- /dev/null
+++ b/pkg/model/fips.go
@@ -0,0 +1,34 @@
+package model
+
+import (
+       gotls "crypto/tls"
+       common_features "github.com/apache/dubbo-kubernetes/pkg/features"
+       "k8s.io/klog/v2"
+)
+
// fipsGoCiphers is the TLS 1.2 cipher-suite allowlist enforced under the
// fips-140-2 compliance policy.
var fipsGoCiphers = []uint16{
	gotls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
	gotls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
	gotls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
	gotls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
}
+
+func EnforceGoCompliance(ctx *gotls.Config) {
+       switch common_features.CompliancePolicy {
+       case "":
+               return
+       case common_features.FIPS_140_2:
+               ctx.MinVersion = gotls.VersionTLS12
+               ctx.MaxVersion = gotls.VersionTLS12
+               ctx.CipherSuites = fipsGoCiphers
+               ctx.CurvePreferences = []gotls.CurveID{gotls.CurveP256}
+               return
+       case common_features.PQC:
+               ctx.MinVersion = gotls.VersionTLS13
+               ctx.CipherSuites = []uint16{gotls.TLS_AES_128_GCM_SHA256, 
gotls.TLS_AES_256_GCM_SHA384}
+               ctx.CurvePreferences = []gotls.CurveID{gotls.X25519MLKEM768}
+       default:
+               klog.Warningf("unknown compliance policy: %q", 
common_features.CompliancePolicy)
+               return
+       }
+}
diff --git a/pkg/model/xds.go b/pkg/model/xds.go
index c5eccbf1..72d334d3 100644
--- a/pkg/model/xds.go
+++ b/pkg/model/xds.go
@@ -1,11 +1,31 @@
 package model
 
 const (
-       APITypePrefix = "type.googleapis.com/"
-       ClusterType   = APITypePrefix + "envoy.config.cluster.v3.Cluster"
-       ListenerType  = APITypePrefix + "envoy.config.listener.v3.Listener"
-       EndpointType  = APITypePrefix + 
"envoy.config.endpoint.v3.ClusterLoadAssignment"
-       RouteType     = APITypePrefix + 
"envoy.config.route.v3.RouteConfiguration"
+       APITypePrefix              = "type.googleapis.com/"
+       ClusterType                = APITypePrefix + 
"envoy.config.cluster.v3.Cluster"
+       ListenerType               = APITypePrefix + 
"envoy.config.listener.v3.Listener"
+       EndpointType               = APITypePrefix + 
"envoy.config.endpoint.v3.ClusterLoadAssignment"
+       RouteType                  = APITypePrefix + 
"envoy.config.route.v3.RouteConfiguration"
+       SecretType                 = APITypePrefix + 
"envoy.extensions.transport_sockets.tls.v3.Secret"
+       ExtensionConfigurationType = APITypePrefix + 
"envoy.config.core.v3.TypedExtensionConfig"
 
-       DebugType = "dubbo.io/debug"
+       HealthInfoType = APITypePrefix + "dubbo.v1.HealthInformation"
+       DebugType      = "dubbo.io/debug"
 )
+
+func GetShortType(typeURL string) string {
+       switch typeURL {
+       case ClusterType:
+               return "CDS"
+       case ListenerType:
+               return "LDS"
+       case RouteType:
+               return "RDS"
+       case EndpointType:
+               return "EDS"
+       case SecretType:
+               return "SDS"
+       default:
+               return typeURL
+       }
+}
diff --git a/pkg/queue/delay.go b/pkg/queue/delay.go
new file mode 100644
index 00000000..171597a3
--- /dev/null
+++ b/pkg/queue/delay.go
@@ -0,0 +1,269 @@
+package queue
+
+import (
+       "container/heap"
+       "k8s.io/klog/v2"
+       "runtime"
+       "sync"
+       "time"
+)
+
// delayTask couples a queued task with the earliest time it may run and how
// many times it has already been retried after failure.
type delayTask struct {
	do      func() error
	runAt   time.Time
	retries int
}

// maxTaskRetry bounds how many times a failing task is re-enqueued before
// being dropped.
const maxTaskRetry = 3

var _ heap.Interface = &pq{}

// pq implements an internal priority queue so that tasks with the soonest expiry will be run first.
// Methods on pq are not threadsafe, access should be protected.
// much of this is taken from the example at https://golang.org/pkg/container/heap/
type pq []*delayTask

func (q pq) Len() int {
	return len(q)
}

// Less orders tasks by earliest runAt (a min-heap on expiry time).
func (q pq) Less(i, j int) bool {
	return q[i].runAt.Before(q[j].runAt)
}

func (q *pq) Swap(i, j int) {
	(*q)[i], (*q)[j] = (*q)[j], (*q)[i]
}

// Push appends x; intended to be called via heap.Push, which restores order.
func (q *pq) Push(x any) {
	*q = append(*q, x.(*delayTask))
}

// Pop removes and returns the last element (heap.Pop swaps the root there
// first). Returns nil when the queue is empty.
func (q *pq) Pop() any {
	old := *q
	n := len(old)
	c := cap(old)
	// Shrink the capacity of task queue.
	if n < c/2 && c > 32 {
		npq := make(pq, n, c/2)
		copy(npq, old)
		old = npq
	}
	if n == 0 {
		return nil
	}
	item := old[n-1]
	old[n-1] = nil // avoid memory leak
	*q = old[0 : n-1]
	return item
}

// Peek is not managed by the container/heap package, so we return the 0th element in the list.
func (q *pq) Peek() any {
	if q.Len() < 1 {
		return nil
	}
	return (*q)[0]
}
+
// Delayed implements queue such that tasks are executed after a specified delay.
type Delayed interface {
	baseInstance
	// PushDelayed schedules t to run once the given delay has elapsed.
	PushDelayed(t Task, delay time.Duration)
}

var _ Delayed = &delayQueue{}

// DelayQueueOption configure the behavior of the queue. Must be applied before Run.
type DelayQueueOption func(*delayQueue)

// DelayQueueBuffer sets maximum number of tasks awaiting execution. If this limit is reached, Push and PushDelayed
// will block until there is room.
func DelayQueueBuffer(bufferSize int) DelayQueueOption {
	return func(queue *delayQueue) {
		// Replace the default channel; close the old one so it is not leaked.
		if queue.enqueue != nil {
			close(queue.enqueue)
		}
		queue.enqueue = make(chan *delayTask, bufferSize)
	}
}

// DelayQueueWorkers sets the number of background worker goroutines await tasks to execute. Effectively the
// maximum number of concurrent tasks.
func DelayQueueWorkers(workers int) DelayQueueOption {
	return func(queue *delayQueue) {
		queue.workers = workers
	}
}
+
// workerChanBuf determines whether the channel of a worker should be a buffered channel
// to get the best performance.
var workerChanBuf = func() int {
	// Use blocking channel if GOMAXPROCS=1.
	// This switches context from sender to receiver immediately,
	// which results in higher performance.
	var n int
	if n = runtime.GOMAXPROCS(0); n == 1 {
		return 0
	}

	// Make channel non-blocking and set up its capacity with GOMAXPROCS if GOMAXPROCS>1,
	// otherwise the sender might be dragged down if the receiver is CPU-bound.
	//
	// GOMAXPROCS determines how many goroutines can run in parallel,
	// which makes it the best choice as the channel capacity,
	return n
}()

// NewDelayed gives a Delayed queue with maximum concurrency specified by workers.
// Defaults: one worker and a 100-task enqueue buffer; override via options.
func NewDelayed(opts ...DelayQueueOption) Delayed {
	q := &delayQueue{
		workers: 1,
		queue:   &pq{},
		execute: make(chan *delayTask, workerChanBuf),
		enqueue: make(chan *delayTask, 100),
	}
	for _, o := range opts {
		o(q)
	}
	return q
}

// delayQueue runs tasks once their scheduled time arrives, using a bounded
// pool of worker goroutines.
type delayQueue struct {
	workers int
	// workerStopped holds one done-channel per worker, closed when that
	// worker exits.
	workerStopped []chan struct{}

	// incoming
	enqueue chan *delayTask
	// outgoing
	execute chan *delayTask

	// mu guards queue, the time-ordered heap of pending tasks.
	mu    sync.Mutex
	queue *pq
}

// Push will execute the task as soon as possible
func (d *delayQueue) Push(task Task) {
	d.pushInternal(&delayTask{do: task, runAt: time.Now()})
}

// PushDelayed will execute the task after waiting for the delay
func (d *delayQueue) PushDelayed(t Task, delay time.Duration) {
	task := &delayTask{do: t, runAt: time.Now().Add(delay)}
	d.pushInternal(task)
}

// pushInternal will enqueue the delayTask with retries.
func (d *delayQueue) pushInternal(task *delayTask) {
	select {
	case d.enqueue <- task:
	// buffer has room to enqueue
	default:
		// TODO warn and resize buffer
		// if the buffer is full, we take the more expensive route of locking and pushing directly to the heap
		d.mu.Lock()
		heap.Push(d.queue, task)
		d.mu.Unlock()
	}
}
+
// Closed returns a channel that is closed once every worker goroutine has
// fully stopped.
// NOTE(review): if called before Run has started any workers, workerStopped
// is empty and the returned channel closes immediately; workerStopped is
// also read without synchronization — confirm Closed is only used after Run.
func (d *delayQueue) Closed() <-chan struct{} {
	done := make(chan struct{})
	go func() {
		for _, ch := range d.workerStopped {
			<-ch
		}
		close(done)
	}()
	return done
}
+
+func (d *delayQueue) Run(stop <-chan struct{}) {
+       for i := 0; i < d.workers; i++ {
+               d.workerStopped = append(d.workerStopped, d.work(stop))
+       }
+
+       push := func(t *delayTask) bool {
+               select {
+               case d.execute <- t:
+                       return true
+               case <-stop:
+                       return false
+               }
+       }
+
+       for {
+               var task *delayTask
+               d.mu.Lock()
+               if head := d.queue.Peek(); head != nil {
+                       task = head.(*delayTask)
+                       heap.Pop(d.queue)
+               }
+               d.mu.Unlock()
+
+               if task != nil {
+                       delay := time.Until(task.runAt)
+                       if delay <= 0 {
+                               // execute now and continue processing incoming 
enqueues/tasks
+                               if !push(task) {
+                                       return
+                               }
+                       } else {
+                               // not ready yet, don't block enqueueing
+                               await := time.NewTimer(delay)
+                               select {
+                               case t := <-d.enqueue:
+                                       d.mu.Lock()
+                                       heap.Push(d.queue, t)
+                                       // put the old "head" back on the 
queue, it may be scheduled to execute after the one
+                                       // that was just pushed
+                                       heap.Push(d.queue, task)
+                                       d.mu.Unlock()
+                               case <-await.C:
+                                       if !push(task) {
+                                               return
+                                       }
+                               case <-stop:
+                                       await.Stop()
+                                       return
+                               }
+                               await.Stop()
+                       }
+               } else {
+                       // no items, wait for Push or stop
+                       select {
+                       case t := <-d.enqueue:
+                               d.mu.Lock()
+                               d.queue.Push(t)
+                               d.mu.Unlock()
+                       case <-stop:
+                               return
+                       }
+               }
+       }
+}
+
// work takes a channel that signals to stop, and returns a channel that signals the worker has fully stopped
func (d *delayQueue) work(stop <-chan struct{}) (stopped chan struct{}) {
	stopped = make(chan struct{})
	go func() {
		defer close(stopped)
		for {
			select {
			case t := <-d.execute:
				// On failure, re-enqueue the task up to maxTaskRetry times;
				// after that it is dropped with an error log.
				if err := t.do(); err != nil {
					if t.retries < maxTaskRetry {
						t.retries++
						klog.Warningf("Work item handle failed: %v %d times, retry it", err, t.retries)
						d.pushInternal(t)
						continue
					}
					klog.Errorf("Work item handle failed: %v, reaching the maximum retry times: %d, drop it", err, maxTaskRetry)
				}
			case <-stop:
				return
			}
		}
	}()
	return
}
diff --git a/pkg/sail-agent/agent.go b/pkg/sail-agent/agent.go
deleted file mode 100644
index e79eecea..00000000
--- a/pkg/sail-agent/agent.go
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package naviagent
-
-import "github.com/apache/dubbo-kubernetes/pkg/model"
-
-// Shared properties with Pilot Proxy struct.
-type Proxy struct {
-       Type      model.NodeType
-       DNSDomain string
-}
diff --git a/pkg/security/retry.go b/pkg/security/retry.go
new file mode 100644
index 00000000..e16069c1
--- /dev/null
+++ b/pkg/security/retry.go
@@ -0,0 +1,16 @@
+package security
+
+import (
+       "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+)
+
// CARetryOptions is the default retry policy for CA gRPC calls: at most 5
// attempts, retried on status codes that indicate transient failure.
var CARetryOptions = []retry.CallOption{
	retry.WithMax(5),
	retry.WithCodes(codes.Canceled, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted, codes.Internal, codes.Unavailable),
}

// CARetryInterceptor returns a grpc.DialOption that installs a unary client
// interceptor applying CARetryOptions to every call on the connection.
func CARetryInterceptor() grpc.DialOption {
	return grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor(CARetryOptions...))
}
diff --git a/pkg/security/security.go b/pkg/security/security.go
index 84ab7b62..bcf5ac08 100644
--- a/pkg/security/security.go
+++ b/pkg/security/security.go
@@ -20,15 +20,29 @@ package security
 import (
        "context"
        "net/http"
+       "os"
+       "path/filepath"
        "time"
 )
 
// Well-known SDS resource names and on-disk paths for workload identity
// material. The "./" prefixes make the paths relative to the process's
// working directory.
const (
	// RootCertReqResourceName is resource name of discovery request for root certificate.
	RootCertReqResourceName = "ROOTCA"
	// WorkloadKeyCertResourceName is the resource name of the discovery request for workload
	// identity.
	WorkloadKeyCertResourceName       = "default"
	WorkloadIdentityPath              = "./var/run/secrets/workload-spiffe-uds"
	DefaultWorkloadIdentitySocketFile = "socket"
	// SystemRootCerts is a sentinel value selecting the system trust store.
	SystemRootCerts                 = "SYSTEM"
	DefaultRootCertFilePath         = "./etc/certs/root-cert.pem"
	CredentialNameSocketPath        = "./var/run/secrets/credential-uds/socket"
	WorkloadIdentityCredentialsPath = "./var/run/secrets/workload-spiffe-credentials"
	WorkloadIdentityCertChainPath   = WorkloadIdentityCredentialsPath + "/cert-chain.pem"
	WorkloadIdentityRootCertPath    = WorkloadIdentityCredentialsPath + "/root-cert.pem"
	WorkloadIdentityKeyPath         = WorkloadIdentityCredentialsPath + "/key.pem"
	FileCredentialNameSocketPath    = "./var/run/secrets/credential-uds/files-socket"
	JWT                             = "JWT"
)

const (
	CertSigner = "CertSigner"
)
 
 type AuthContext struct {
@@ -47,29 +61,23 @@ type Authenticator interface {
// SecretItem holds one generated secret: the workload certificate chain and
// private key, the root certificate, and lifecycle metadata.
type SecretItem struct {
	CertificateChain []byte
	PrivateKey       []byte
	RootCert         []byte
	// ResourceName passed from envoy SDS discovery request.
	// "ROOTCA" for root cert request, "default" for key/cert request.
	ResourceName string
	CreatedTime  time.Time
	ExpireTime   time.Time
}

// SecretManager defines secrets management interface which is used by SDS.
type SecretManager interface {
	// GenerateSecret generates a new secret for the given SDS resource name
	// ("ROOTCA" or "default", per the constants above).
	GenerateSecret(resourceName string) (*SecretItem, error)
}

// SdsCertificateConfig groups the on-disk paths of a file-based certificate:
// cert chain, private key, and CA root.
type SdsCertificateConfig struct {
	CertificatePath   string
	PrivateKeyPath    string
	CaCertificatePath string
}
+
 type AuthSource int
 
 type KubernetesInfo struct {
@@ -85,3 +93,57 @@ type Caller struct {
 
        KubernetesInfo KubernetesInfo
 }
+
// Options holds the security configuration used to provision workload
// certificates: certificate/key file locations, the CA endpoint and
// provider, and the credential fetcher used to authenticate to the CA.
type Options struct {
	ServeOnlyFiles bool
	// ProvCert and FileMountedCerts are mutually exclusive; this is
	// validated in SetupSecurityOptions.
	ProvCert           string
	FileMountedCerts   bool
	SailCertProvider   string
	OutputKeyCertToDir string
	CertChainFilePath  string
	KeyFilePath        string
	RootCertFilePath   string
	CARootPath         string
	// CAEndpoint is the CA address; when empty it defaults to the proxy's
	// discovery address (and CAEndpointSAN to the DUBBOD_SAN env value).
	CAEndpoint     string
	CAProviderName string
	// CredFetcher supplies platform credentials (e.g. a JWT) used to
	// authenticate to the CA.
	CredFetcher CredFetcher
	// CAHeaders are extra request headers sent to the CA, populated from
	// CA_HEADER_* environment variables.
	CAHeaders            map[string]string
	CAEndpointSAN        string
	CertSigner           string
	ClusterID            string
	CredIdentityProvider string
	TrustDomain          string
}
+
// CredFetcher obtains a platform credential used to authenticate to the CA
// and identifies the provider backing that credential.
type CredFetcher interface {
	// GetPlatformCredential returns the credential token for this platform.
	GetPlatformCredential() (string, error)
	// GetIdentityProvider returns the name of the identity provider.
	GetIdentityProvider() string
	// Stop releases any resources held by the fetcher.
	Stop()
}
+
// Client is a CA client that can sign workload CSRs and expose the CA's
// root certificate bundle.
type Client interface {
	// CSRSign submits a PEM-encoded CSR and returns the signed cert chain.
	CSRSign(csrPEM []byte, certValidTTLInSec int64) ([]string, error)
	// Close tears down the connection to the CA.
	Close()
	// GetRootCertBundle returns the CA's root certificates.
	GetRootCertBundle() ([]string, error)
}
+
// GetWorkloadSDSSocketListenPath returns the path under WorkloadIdentityPath
// at which an SDS server should listen for the given socket file name.
func GetWorkloadSDSSocketListenPath(sockfile string) string {
	return filepath.Join(WorkloadIdentityPath, sockfile)
}
+
// GetDubboSDSServerSocketPath returns the default workload identity SDS
// socket path used by the Dubbo SDS server.
func GetDubboSDSServerSocketPath() string {
	return filepath.Join(WorkloadIdentityPath, DefaultWorkloadIdentitySocketFile)
}
+
// CheckWorkloadCertificate reports whether all three workload certificate
// files (cert chain, private key, root cert) are present on disk.
func CheckWorkloadCertificate(certChainFilePath, keyFilePath, rootCertFilePath string) bool {
	for _, p := range []string{certChainFilePath, keyFilePath, rootCertFilePath} {
		if _, err := os.Stat(p); err != nil {
			return false
		}
	}
	return true
}
diff --git a/pkg/uds/listener.go b/pkg/uds/listener.go
new file mode 100644
index 00000000..9936711f
--- /dev/null
+++ b/pkg/uds/listener.go
@@ -0,0 +1,39 @@
+package uds
+
+import (
+       "fmt"
+       "k8s.io/klog/v2"
+       "net"
+       "os"
+       "path/filepath"
+)
+
+func NewListener(path string) (net.Listener, error) {
+       // Remove unix socket before use.
+       if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
+               // Anything other than "file not found" is an error.
+               return nil, fmt.Errorf("failed to remove unix://%s: %v", path, 
err)
+       }
+
+       // Attempt to create the folder in case it doesn't exist
+       if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil {
+               // If we cannot create it, just warn here - we will fail later 
if there is a real error
+               klog.Warningf("Failed to create directory for %v: %v", path, 
err)
+       }
+
+       var err error
+       listener, err := net.Listen("unix", path)
+       if err != nil {
+               return nil, fmt.Errorf("failed to listen on unix socket %q: 
%v", path, err)
+       }
+
+       // Update file permission so that istio-proxy has permission to access 
it.
+       if _, err := os.Stat(path); err != nil {
+               return nil, fmt.Errorf("uds file %q doesn't exist", path)
+       }
+       if err := os.Chmod(path, 0o666); err != nil {
+               return nil, fmt.Errorf("failed to update %q permission", path)
+       }
+
+       return listener, nil
+}
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index ab68c09d..183f05a8 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -18,12 +18,34 @@
 package xds
 
 import (
+       "github.com/apache/dubbo-kubernetes/pkg/model"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/features"
+       dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
+       core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
        discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+       "k8s.io/klog/v2"
        "time"
 )
 
 type DiscoveryStream = 
discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer
 
// Resources is the list of resources carried in a single xDS response.
type Resources = []*discovery.Resource

// WatchedResource tracks a client's subscription state for one xDS type on
// a single connection: the watched names and the nonce handshake used to
// match ACK/NACK responses.
type WatchedResource struct {
	TypeUrl string
	// ResourceNames is the set of names currently subscribed for this type.
	ResourceNames sets.String
	// Wildcard marks a subscription to all resources of this type.
	Wildcard bool
	// NonceSent/NonceAcked implement the xDS ACK handshake (see ShouldRespond).
	NonceSent  string
	NonceAcked string
	// AlwaysRespond forces a response to the next request even if it looks
	// like an ACK (used to let Envoy finish warming).
	AlwaysRespond bool
	LastSendTime  time.Time
	// LastError is the message of the last NACK, cleared on ACK.
	LastError     string
	LastResources Resources
}
+
 type Connection struct {
        peerAddr    string
        connectedAt time.Time
@@ -36,6 +58,28 @@ type Connection struct {
        errorChan   chan error
 }
 
// Watcher maintains per-type WatchedResource state for a single xDS client.
type Watcher interface {
	DeleteWatchedResource(url string)
	GetWatchedResource(url string) *WatchedResource
	NewWatchedResource(url string, names []string)
	UpdateWatchedResource(string, func(*WatchedResource) *WatchedResource)
	// GetID identifies an xDS client. This is different from a connection ID.
	GetID() string
}

// ConnectionContext binds a gRPC stream (Connection) to the handler that
// initializes it, processes its requests, and pushes updates to it.
type ConnectionContext interface {
	XdsConnection() *Connection
	Watcher() Watcher
	// Initialize checks the first request.
	Initialize(node *core.Node) error
	// Close discards the connection.
	Close()
	// Process responds to a discovery request.
	Process(req *discovery.DiscoveryRequest) error
	// Push responds to a push event queue
	Push(ev any) error
}
+
 func NewConnection(peerAddr string, stream DiscoveryStream) Connection {
        return Connection{
                pushChannel: make(chan any),
@@ -48,3 +92,271 @@ func NewConnection(peerAddr string, stream DiscoveryStream) 
Connection {
                stream:      stream,
        }
 }
+
// Stream runs the service loop for one xDS stream until it terminates. It
// starts a reader goroutine (Receive), waits for connection initialization,
// then alternates between handling client requests and push events, checking
// requests first so a wave of pushes cannot starve ACK handling.
func Stream(ctx ConnectionContext) error {
	con := ctx.XdsConnection()
	// Do not call: defer close(con.pushChannel). The push channel will be garbage collected
	// when the connection is no longer used. Closing the channel can cause subtle race conditions
	// with push. According to the spec: "It's only necessary to close a channel when it is important
	// to tell the receiving goroutines that all data have been sent."

	// Block until either a request is received or a push is triggered.
	// We need 2 go routines because 'read' blocks in Recv().
	go Receive(ctx)

	// Wait for the proxy to be fully initialized before we start serving traffic. Because
	// initialization doesn't have dependencies that will block, there is no need to add any timeout
	// here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
	// reqChannel and the connection not being enqueued for pushes to pushChannel until the
	// initialization is complete.
	<-con.initialized

	for {
		// Go select{} statements are not ordered; the same channel can be chosen many times.
		// For requests, these are higher priority (client may be blocked on startup until these are done)
		// and often very cheap to handle (simple ACK), so we check it first.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := ctx.Process(req); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case <-con.stop:
			return nil
		default:
		}
		// If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
		// amount of incoming requests, we may still send some pushes, as we do not `continue` above;
		// however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
		// cannot completely starve pushes. However, this scenario is unlikely.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := ctx.Process(req); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case pushEv := <-con.pushChannel:
			err := ctx.Push(pushEv)
			if err != nil {
				return err
			}
		case <-con.stop:
			return nil
		}
	}
}
+
// Receive reads requests from the gRPC stream and forwards them to
// con.reqChan. The first real request validates the node and runs
// ctx.Initialize; health-check probes arriving before the first xDS request
// are skipped. On exit it closes the error/request channels (and
// initialized, if still open) so Stream can unblock.
func Receive(ctx ConnectionContext) {
	con := ctx.XdsConnection()
	defer func() {
		close(con.errorChan)
		close(con.reqChan)
		// Close the initialized channel, if it's not already closed, to prevent blocking the stream.
		select {
		case <-con.initialized:
		default:
			close(con.initialized)
		}
	}()

	firstRequest := true
	for {
		req, err := con.stream.Recv()
		if err != nil {
			// Benign terminations (EOF, cancel, known transport messages) are
			// logged but not surfaced as errors.
			if dubbogrpc.GRPCErrorType(err) != dubbogrpc.UnexpectedError {
				klog.Infof("ADS: %q %s terminated", con.peerAddr, con.conID)
				return
			}
			con.errorChan <- err
			klog.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
			return
		}
		// This should be only set for the first request. The node id may not be set - for example malicious clients.
		if firstRequest {
			// probe happens before envoy sends first xDS request
			if req.TypeUrl == model.HealthInfoType {
				klog.Warningf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
				continue
			}
			firstRequest = false
			if req.Node == nil || req.Node.Id == "" {
				con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err()
				return
			}
			if err := ctx.Initialize(req.Node); err != nil {
				con.errorChan <- err
				return
			}
			// Deferred so connection teardown runs when this reader goroutine exits.
			defer ctx.Close()
			klog.Infof("ADS: new connection for node:%s", con.conID)
		}

		select {
		case con.reqChan <- req:
		case <-con.stream.Context().Done():
			klog.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID)
			return
		}
	}
}
+
+func (conn *Connection) ID() string {
+       return conn.conID
+}
+
+func (conn *Connection) Peer() string {
+       return conn.peerAddr
+}
+
+func (conn *Connection) SetID(id string) {
+       conn.conID = id
+}
+
+func (conn *Connection) MarkInitialized() {
+       close(conn.initialized)
+}
+
// ShouldRespond implements the xDS state-of-the-world ACK/NACK protocol for
// a single request: it decides whether the request requires a response and,
// for non-wildcard types, which newly subscribed resources drove it. It
// mutates the Watcher's per-type state (nonces, resource names, last error)
// as a side effect.
func ShouldRespond(w Watcher, id string, request *discovery.DiscoveryRequest) (bool, ResourceDelta) {
	stype := model.GetShortType(request.TypeUrl)

	// If there is an error in request that means previous response is erroneous.
	// We do not have to respond in that case. In this case request's version info
	// will be different from the version sent. But it is fragile to rely on that.
	if request.ErrorDetail != nil {
		errCode := codes.Code(request.ErrorDetail.Code)
		klog.Warningf("ADS:%s: ACK ERROR %s %s:%s", stype, id, errCode.String(), request.ErrorDetail.GetMessage())
		w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
			// Record the NACK message so it can be surfaced later.
			wr.LastError = request.ErrorDetail.GetMessage()
			return wr
		})
		return false, emptyResourceDelta
	}

	if shouldUnsubscribe(request) {
		klog.V(2).Infof("ADS:%s: UNSUBSCRIBE %s %s %s", stype, id, request.VersionInfo, request.ResponseNonce)
		w.DeleteWatchedResource(request.TypeUrl)
		return false, emptyResourceDelta
	}

	previousInfo := w.GetWatchedResource(request.TypeUrl)
	// This can happen in two cases:
	// 1. When Envoy starts for the first time, it sends an initial Discovery request to Istiod.
	// 2. When Envoy reconnects to a new Istiod that does not have information about this typeUrl
	// i.e. non empty response nonce.
	// We should always respond with the current resource names.
	if request.ResponseNonce == "" || previousInfo == nil {
		klog.V(2).Infof("ADS:%s: INIT/RECONNECT %s %s %s", stype, id, request.VersionInfo, request.ResponseNonce)
		w.NewWatchedResource(request.TypeUrl, request.ResourceNames)
		return true, emptyResourceDelta
	}

	// If there is mismatch in the nonce, that is a case of expired/stale nonce.
	// A nonce becomes stale following a newer nonce being sent to Envoy.
	// previousInfo.NonceSent can be empty if we previously had shouldRespond=true but didn't send any resources.
	if request.ResponseNonce != previousInfo.NonceSent {
		if features.EnableUnsafeAssertions && previousInfo.NonceSent == "" {
			// Assert we do not end up in an invalid state
			klog.V(2).Infof("ADS:%s: REQ %s Expired nonce received %s, but we never sent any nonce", stype,
				id, request.ResponseNonce)
		}
		klog.V(2).Infof("ADS:%s: REQ %s Expired nonce received %s, sent %s", stype,
			id, request.ResponseNonce, previousInfo.NonceSent)
		return false, emptyResourceDelta
	}

	// If it comes here, that means nonce match.
	var previousResources sets.String
	var cur sets.String
	var alwaysRespond bool
	w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
		// Clear last error, we got an ACK.
		wr.LastError = ""
		previousResources = wr.ResourceNames
		wr.NonceAcked = request.ResponseNonce
		wr.ResourceNames = sets.New(request.ResourceNames...)
		cur = wr.ResourceNames
		alwaysRespond = wr.AlwaysRespond
		wr.AlwaysRespond = false
		return wr
	})

	// Envoy can send two DiscoveryRequests with same version and nonce.
	// when it detects a new resource. We should respond if they change.
	removed := previousResources.Difference(cur)
	added := cur.Difference(previousResources)

	// We should always respond "alwaysRespond" marked requests to let Envoy finish warming
	// even though Nonce match and it looks like an ACK.
	if alwaysRespond {
		klog.Infof("ADS:%s: FORCE RESPONSE %s for warming.", stype, id)
		return true, emptyResourceDelta
	}

	if len(removed) == 0 && len(added) == 0 {
		klog.V(2).Infof("ADS:%s: ACK %s %s %s", stype, id, request.VersionInfo, request.ResponseNonce)
		return false, emptyResourceDelta
	}
	klog.V(2).Infof("ADS:%s: RESOURCE CHANGE added %v removed %v %s %s %s", stype,
		added, removed, id, request.VersionInfo, request.ResponseNonce)

	// For non wildcard resource, if no new resources are subscribed, it means we do not need to push.
	if !IsWildcardTypeURL(request.TypeUrl) && len(added) == 0 {
		return false, emptyResourceDelta
	}

	return true, ResourceDelta{
		Subscribed: added,
		// we do not need to set unsubscribed for StoW
	}
}
+
// Send writes a discovery response to the client's stream.
// NOTE(review): currently a stub — it always returns nil without sending
// anything; confirm whether callers expect a real implementation here.
func Send(ctx ConnectionContext, res *discovery.DiscoveryResponse) error {
	return nil
}
+
// ResourceDelta describes the change in a client's subscription set between
// two consecutive requests for the same xDS type.
type ResourceDelta struct {
	// Subscribed indicates the client requested these additional resources
	Subscribed sets.String
	// Unsubscribed indicates the client no longer requires these resources
	Unsubscribed sets.String
}

// emptyResourceDelta is returned whenever no subscription change applies.
var emptyResourceDelta = ResourceDelta{}
+
+func (rd ResourceDelta) IsEmpty() bool {
+       return len(rd.Subscribed) == 0 && len(rd.Unsubscribed) == 0
+}
+
+func shouldUnsubscribe(request *discovery.DiscoveryRequest) bool {
+       return len(request.ResourceNames) == 0 && 
!IsWildcardTypeURL(request.TypeUrl)
+}
+
+func IsWildcardTypeURL(typeURL string) bool {
+       switch typeURL {
+       case model.SecretType, model.EndpointType, model.RouteType, 
model.ExtensionConfigurationType:
+               // By XDS spec, these are not wildcard
+               return false
+       case model.ClusterType, model.ListenerType:
+               // By XDS spec, these are wildcard
+               return true
+       default:
+               // All of our internal types use wildcard semantics
+               return true
+       }
+}
+
+func (conn *Connection) PushCh() chan any {
+       return conn.pushChannel
+}
+
+func (conn *Connection) StreamDone() <-chan struct{} {
+       return conn.stream.Context().Done()
+}
diff --git a/sail/cmd/sail-agent/app/cmd.go b/sail/cmd/sail-agent/app/cmd.go
index 2356cbe9..27d97f53 100644
--- a/sail/cmd/sail-agent/app/cmd.go
+++ b/sail/cmd/sail-agent/app/cmd.go
@@ -18,19 +18,25 @@
 package app
 
 import (
+       "context"
+       "errors"
        "fmt"
        "github.com/apache/dubbo-kubernetes/pkg/cmd"
        "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+       dubboagent "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent"
+       "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/config"
        "github.com/apache/dubbo-kubernetes/pkg/model"
+       "github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
        "github.com/apache/dubbo-kubernetes/sail/cmd/sail-agent/options"
        "github.com/spf13/cobra"
+       "k8s.io/klog/v2"
 )
 
 var (
        proxyArgs options.ProxyArgs
 )
 
-func NewRootCommand() *cobra.Command {
+func NewRootCommand(sds dubboagent.SDSServiceFactory) *cobra.Command {
        rootCmd := &cobra.Command{
                Use:          "sail-agent",
                Short:        "Dubbo Sail agent.",
@@ -42,7 +48,7 @@ func NewRootCommand() *cobra.Command {
                },
        }
        cmd.AddFlags(rootCmd)
-       proxyCmd := newProxyCommand()
+       proxyCmd := newProxyCommand(sds)
        addFlags(proxyCmd)
        rootCmd.AddCommand(proxyCmd)
        rootCmd.AddCommand(waitCmd)
@@ -50,7 +56,7 @@ func NewRootCommand() *cobra.Command {
        return rootCmd
 }
 
-func newProxyCommand() *cobra.Command {
+func newProxyCommand(sds dubboagent.SDSServiceFactory) *cobra.Command {
        return &cobra.Command{
                Use:   "proxy",
                Short: "XDS proxy agent",
@@ -64,6 +70,37 @@ func newProxyCommand() *cobra.Command {
                        if err != nil {
                                return err
                        }
+
+                       proxyConfig, err := 
config.ConstructProxyConfig(proxyArgs.MeshConfigFile, proxyArgs.ServiceCluster, 
options.ProxyConfigEnv, proxyArgs.Concurrency)
+                       if err != nil {
+                               return fmt.Errorf("failed to get proxy config: 
%v", err)
+                       }
+                       if out, err := protomarshal.ToYAML(proxyConfig); err != 
nil {
+                               klog.Infof("Failed to serialize to YAML: %v", 
err)
+                       } else {
+                               klog.Infof("Effective config: \n%s", out)
+                       }
+
+                       secOpts, err := options.NewSecurityOptions(proxyConfig, 
proxyArgs.StsPort, proxyArgs.TokenManagerPlugin)
+                       if err != nil {
+                               return err
+                       }
+
+                       agentOptions := options.NewAgentOptions(&proxyArgs, 
proxyConfig, sds)
+                       agent := dubboagent.NewAgent(proxyConfig, agentOptions, 
secOpts)
+                       ctx, cancel := 
context.WithCancelCause(context.Background())
+                       defer cancel(errors.New("application shutdown"))
+                       defer agent.Close()
+
+                       // On SIGINT or SIGTERM, cancel the context, triggering 
a graceful shutdown
+                       go cmd.WaitSignalFunc(cancel)
+
+                       wait, err := agent.Run(ctx)
+                       if err != nil {
+                               return err
+                       }
+                       wait()
+
                        return nil
                },
        }
diff --git a/sail/cmd/sail-agent/main.go b/sail/cmd/sail-agent/main.go
index cf80f16d..f2edc9ab 100644
--- a/sail/cmd/sail-agent/main.go
+++ b/sail/cmd/sail-agent/main.go
@@ -18,12 +18,18 @@
 package main
 
 import (
+       dubboagent "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
        "github.com/apache/dubbo-kubernetes/sail/cmd/sail-agent/app"
+       "github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/sds"
+       meshconfig "istio.io/api/mesh/v1alpha1"
        "os"
 )
 
 func main() {
-       rootCmd := app.NewRootCommand()
+       rootCmd := app.NewRootCommand(func(options *security.Options, 
workloadSecretCache security.SecretManager, pkpConf 
*meshconfig.PrivateKeyProvider) dubboagent.SDSService {
+               return sds.NewServer(options, workloadSecretCache, pkpConf)
+       })
        if err := rootCmd.Execute(); err != nil {
                os.Exit(-1)
        }
diff --git a/sail/cmd/sail-agent/options/agent.go 
b/sail/cmd/sail-agent/options/agent.go
new file mode 100644
index 00000000..ae487025
--- /dev/null
+++ b/sail/cmd/sail-agent/options/agent.go
@@ -0,0 +1,31 @@
+package options
+
+import (
+       dubboagent "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent"
+       meshconfig "istio.io/api/mesh/v1alpha1"
+       "os"
+       "strings"
+)
+
+const xdsHeaderPrefix = "XDS_HEADER_"
+
+func NewAgentOptions(proxy *ProxyArgs, cfg *meshconfig.ProxyConfig, sds 
dubboagent.SDSServiceFactory) *dubboagent.AgentOptions {
+       o := &dubboagent.AgentOptions{
+               WorkloadIdentitySocketFile: workloadIdentitySocketFile,
+       }
+       extractXDSHeadersFromEnv(o)
+       return o
+}
+
+func extractXDSHeadersFromEnv(o *dubboagent.AgentOptions) {
+       envs := os.Environ()
+       for _, e := range envs {
+               if strings.HasPrefix(e, xdsHeaderPrefix) {
+                       parts := strings.SplitN(e, "=", 2)
+                       if len(parts) != 2 {
+                               continue
+                       }
+                       o.XDSHeaders[parts[0][len(xdsHeaderPrefix):]] = parts[1]
+               }
+       }
+}
diff --git a/sail/cmd/sail-agent/options/agent_proxy.go 
b/sail/cmd/sail-agent/options/agent_proxy.go
index d962ea57..0b6e9f36 100644
--- a/sail/cmd/sail-agent/options/agent_proxy.go
+++ b/sail/cmd/sail-agent/options/agent_proxy.go
@@ -17,13 +17,16 @@
 
 package options
 
-import sailagent "github.com/apache/dubbo-kubernetes/pkg/sail-agent"
+import sailagent "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent"
 
 // ProxyArgs provides all of the configuration parameters for the Saku proxy.
 type ProxyArgs struct {
        sailagent.Proxy
-       MeshConfigFile string
-       ServiceCluster string
+       Concurrency        int
+       StsPort            int
+       TokenManagerPlugin string
+       MeshConfigFile     string
+       ServiceCluster     string
 }
 
 func NewProxyArgs() ProxyArgs {
diff --git a/sail/cmd/sail-agent/options/options.go 
b/sail/cmd/sail-agent/options/options.go
new file mode 100644
index 00000000..6f4bdd3a
--- /dev/null
+++ b/sail/cmd/sail-agent/options/options.go
@@ -0,0 +1,29 @@
+package options
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/env"
+       "github.com/apache/dubbo-kubernetes/pkg/jwt"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+)
+
// Environment-driven configuration for the sail-agent proxy command.
var (
	// ProxyConfigEnv carries the serialized proxy configuration, normally
	// set by sidecar injection.
	ProxyConfigEnv = env.Register(
		"PROXY_CONFIG",
		"",
		"The proxy configuration. This will be set by the injection - gateways will use file mounts.",
	).Get()
	// dubbodSAN overrides the server name expected when validating the
	// control-plane certificate (used when the CA endpoint is defaulted).
	dubbodSAN = env.Register("DUBBOD_SAN", "",
		"Override the ServerName used to validate Istiod certificate. "+
			"Can be used as an alternative to setting /etc/hosts for VMs - discovery address will be an IP:port")
	jwtPolicy = env.Register("JWT_POLICY", jwt.PolicyThirdParty,
		"The JWT validation policy.")
	// workloadIdentitySocketFile names the SDS socket file expected under
	// security.WorkloadIdentityPath.
	workloadIdentitySocketFile = env.Register("WORKLOAD_IDENTITY_SOCKET_FILE", security.DefaultWorkloadIdentitySocketFile,
		fmt.Sprintf("SPIRE workload identity SDS socket filename. If set, an SDS socket with this name must exist at %s", security.WorkloadIdentityPath)).Get()
	credFetcherTypeEnv = env.Register("CREDENTIAL_FETCHER_TYPE", security.JWT,
		"The type of the credential fetcher. Currently supported types include GoogleComputeEngine").Get()
	credIdentityProvider = env.Register("CREDENTIAL_IDENTITY_PROVIDER", "GoogleComputeEngine",
		"The identity provider for credential. Currently default supported identity provider is GoogleComputeEngine").Get()
	caProviderEnv = env.Register("CA_PROVIDER", "Aegis", "name of authentication provider").Get()
	// caEndpointEnv may be empty; SetupSecurityOptions then falls back to
	// the discovery address.
	caEndpointEnv = env.Register("CA_ADDR", "", "Address of the spiffe certificate provider. Defaults to discoveryAddress").Get()
)
diff --git a/sail/cmd/sail-agent/options/security.go 
b/sail/cmd/sail-agent/options/security.go
new file mode 100644
index 00000000..2e1322fd
--- /dev/null
+++ b/sail/cmd/sail-agent/options/security.go
@@ -0,0 +1,84 @@
+package options
+
+import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+       "github.com/apache/dubbo-kubernetes/pkg/jwt"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "github.com/apache/dubbo-kubernetes/security/pkg/credentialfetcher"
+       meshconfig "istio.io/api/mesh/v1alpha1"
+       "k8s.io/klog/v2"
+       "os"
+       "strings"
+)
+
+const caHeaderPrefix = "CA_HEADER_"
+
+func NewSecurityOptions(proxyConfig *meshconfig.ProxyConfig, stsPort int, 
tokenManagerPlugin string) (*security.Options, error) {
+       o := &security.Options{
+               CAEndpoint:     caEndpointEnv,
+               CAProviderName: caProviderEnv,
+       }
+
+       o, err := SetupSecurityOptions(proxyConfig, o, jwtPolicy.Get(),
+               credFetcherTypeEnv, credIdentityProvider)
+       if err != nil {
+               return o, err
+       }
+
+       extractCAHeadersFromEnv(o)
+
+       return o, err
+}
+
// SetupSecurityOptions resolves the JWT token path from the policy, defaults
// the CA endpoint to the discovery address, installs the credential fetcher,
// and validates mutually exclusive settings. It mutates and returns secOpt.
func SetupSecurityOptions(proxyConfig *meshconfig.ProxyConfig, secOpt *security.Options, jwtPolicy,
	credFetcherTypeEnv, credIdentityProvider string,
) (*security.Options, error) {
	jwtPath := constants.ThirdPartyJwtPath
	switch jwtPolicy {
	case jwt.PolicyThirdParty:
		klog.Info("JWT policy is third-party-jwt")
		jwtPath = constants.ThirdPartyJwtPath
	case jwt.PolicyFirstParty:
		// Deliberate: first-party-jwt is deprecated and mapped onto the
		// third-party token path.
		klog.Warningf("Using deprecated JWT policy 'first-party-jwt'; treating as 'third-party-jwt'")
		jwtPath = constants.ThirdPartyJwtPath
	default:
		klog.Info("Using existing certs")
	}

	o := secOpt

	// If not set explicitly, default to the discovery address.
	if o.CAEndpoint == "" {
		o.CAEndpoint = proxyConfig.DiscoveryAddress
		o.CAEndpointSAN = dubbodSAN.Get()
	}

	o.CredIdentityProvider = credIdentityProvider
	credFetcher, err := credentialfetcher.NewCredFetcher(credFetcherTypeEnv, o.TrustDomain, jwtPath, o.CredIdentityProvider)
	if err != nil {
		return nil, fmt.Errorf("failed to create credential fetcher: %v", err)
	}
	klog.Infof("using credential fetcher of %s type in %s trust domain", credFetcherTypeEnv, o.TrustDomain)
	o.CredFetcher = credFetcher

	if o.ProvCert != "" && o.FileMountedCerts {
		return nil, fmt.Errorf("invalid options: PROV_CERT and FILE_MOUNTED_CERTS are mutually exclusive")
	}
	return o, nil
}
+
+func extractCAHeadersFromEnv(o *security.Options) {
+       envs := os.Environ()
+       for _, e := range envs {
+               if !strings.HasPrefix(e, caHeaderPrefix) {
+                       continue
+               }
+
+               parts := strings.SplitN(e, "=", 2)
+               if len(parts) != 2 {
+                       continue
+               }
+               o.CAHeaders[parts[0][len(caHeaderPrefix):]] = parts[1]
+       }
+}
diff --git a/sail/pkg/features/ship.go b/sail/pkg/features/sail.go
similarity index 100%
rename from sail/pkg/features/ship.go
rename to sail/pkg/features/sail.go
diff --git a/sail/pkg/grpc/grpc.go b/sail/pkg/grpc/grpc.go
new file mode 100644
index 00000000..0f380b77
--- /dev/null
+++ b/sail/pkg/grpc/grpc.go
@@ -0,0 +1,92 @@
+package grpc
+
import (
	"errors"
	"io"
	"math"
	"strings"

	dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/status"
)
+
// ErrorType classifies how a gRPC stream terminated, for logging purposes.
type ErrorType string

const (
	// UnexpectedError is any failure not recognized as benign.
	UnexpectedError ErrorType = "unexpectedError"
	// ExpectedError covers benign disconnects: cancellation, deadline, or
	// known transport-closing messages.
	ExpectedError ErrorType = "expectedError"
	// GracefulTermination is a clean EOF from the peer.
	GracefulTermination ErrorType = "gracefulTermination"
)

const (
	defaultClientMaxReceiveMessageSize = math.MaxInt32
	defaultInitialConnWindowSize       = 1024 * 1024 // default gRPC InitialWindowSize
	defaultInitialWindowSize           = 1024 * 1024 // default gRPC ConnWindowSize
)

// expectedGrpcFailureMessages are substrings of Unavailable errors that
// indicate an ordinary peer disconnect rather than a real failure.
var expectedGrpcFailureMessages = sets.New(
	"client disconnected",
	"error reading from server: EOF",
	"transport is closing",
)
+
+func ClientOptions(options *dubbokeepalive.Options, tlsOpts *TLSOptions) 
([]grpc.DialOption, error) {
+       if options == nil {
+               options = dubbokeepalive.DefaultOption()
+       }
+       keepaliveOption := grpc.WithKeepaliveParams(keepalive.ClientParameters{
+               Time:    options.Time,
+               Timeout: options.Timeout,
+       })
+
+       initialWindowSizeOption := 
grpc.WithInitialWindowSize(int32(defaultInitialWindowSize))
+       initialConnWindowSizeOption := 
grpc.WithInitialConnWindowSize(int32(defaultInitialConnWindowSize))
+       msgSizeOption := 
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
+       var tlsDialOpts grpc.DialOption
+       var err error
+       if tlsOpts != nil {
+               tlsDialOpts, err = getTLSDialOption(tlsOpts)
+               if err != nil {
+                       return nil, err
+               }
+       } else {
+               tlsDialOpts = 
grpc.WithTransportCredentials(insecure.NewCredentials())
+       }
+       return []grpc.DialOption{keepaliveOption, initialWindowSizeOption, 
initialConnWindowSizeOption, msgSizeOption, tlsDialOpts}, nil
+}
+
+func GRPCErrorType(err error) ErrorType {
+       if err == io.EOF {
+               return GracefulTermination
+       }
+
+       if s, ok := status.FromError(err); ok {
+               if s.Code() == codes.Canceled || s.Code() == 
codes.DeadlineExceeded {
+                       return ExpectedError
+               }
+               if s.Code() == codes.Unavailable && 
containsExpectedMessage(s.Message()) {
+                       return ExpectedError
+               }
+       }
+       // If this is not a gRPCStatus we should just error message.
+       if strings.Contains(err.Error(), "stream terminated by RST_STREAM with 
error code: NO_ERROR") {
+               return ExpectedError
+       }
+       if strings.Contains(err.Error(), "received prior goaway: code: 
NO_ERROR") {
+               return ExpectedError
+       }
+
+       return UnexpectedError
+}
+
+func containsExpectedMessage(msg string) bool {
+       for m := range expectedGrpcFailureMessages {
+               if strings.Contains(msg, m) {
+                       return true
+               }
+       }
+       return false
+}
diff --git a/sail/pkg/grpc/tls.go b/sail/pkg/grpc/tls.go
new file mode 100644
index 00000000..13078451
--- /dev/null
+++ b/sail/pkg/grpc/tls.go
@@ -0,0 +1,96 @@
+package grpc
+
+import (
+       "crypto/tls"
+       "crypto/x509"
+       "fmt"
+       sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
+       "github.com/apache/dubbo-kubernetes/security/pkg/pki/util"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "k8s.io/klog/v2"
+       "net"
+       "os"
+       "strings"
+)
+
// TLSOptions carries the certificate material and dialing metadata used to
// build TLS transport credentials for gRPC client connections.
type TLSOptions struct {
	RootCert      string // path to the root CA certificate bundle (PEM); empty means use the system pool
	Key           string // path to the client private key (PEM)
	Cert          string // path to the client certificate chain (PEM)
	ServerAddress string // host:port of the server being dialed
	SAN           string // server-name override used for certificate verification, when set
}
+
+func getTLSDialOption(opts *TLSOptions) (grpc.DialOption, error) {
+       rootCert, err := getRootCertificate(opts.RootCert)
+       if err != nil {
+               return nil, err
+       }
+
+       config := tls.Config{
+               GetClientCertificate: func(*tls.CertificateRequestInfo) 
(*tls.Certificate, error) {
+                       var certificate tls.Certificate
+                       key, cert := opts.Key, opts.Cert
+                       if key != "" && cert != "" {
+                               isExpired, err := util.IsCertExpired(opts.Cert)
+                               if err != nil {
+                                       klog.Warningf("cannot parse the cert 
chain, using token instead: %v", err)
+                                       return &certificate, nil
+                               }
+                               if isExpired {
+                                       klog.Warningf("cert expired, using 
token instead")
+                                       return &certificate, nil
+                               }
+                               // Load the certificate from disk
+                               certificate, err = tls.LoadX509KeyPair(cert, 
key)
+                               if err != nil {
+                                       return nil, err
+                               }
+                       }
+                       return &certificate, nil
+               },
+               RootCAs:    rootCert,
+               MinVersion: tls.VersionTLS12,
+       }
+
+       if host, _, err := net.SplitHostPort(opts.ServerAddress); err == nil {
+               config.ServerName = host
+       }
+       // For debugging on localhost (with port forward)
+       if strings.Contains(config.ServerName, "localhost") {
+               config.ServerName = "istiod.istio-system.svc"
+       }
+       if opts.SAN != "" {
+               config.ServerName = opts.SAN
+       }
+       // Compliance for all gRPC clients (e.g. Citadel)..
+       sec_model.EnforceGoCompliance(&config)
+       transportCreds := credentials.NewTLS(&config)
+       return grpc.WithTransportCredentials(transportCreds), nil
+}
+
// getRootCertificate returns the CA pool used to verify the server: the PEM
// bundle at rootCertFile when provided, otherwise the system certificate
// pool. Errors are wrapped with the offending file path so failures are
// diagnosable (the original returned them without context).
func getRootCertificate(rootCertFile string) (*x509.CertPool, error) {
	// No explicit root configured: trust the host's system CA bundle.
	if rootCertFile == "" {
		certPool, err := x509.SystemCertPool()
		if err != nil {
			return nil, fmt.Errorf("loading system cert pool: %w", err)
		}
		return certPool, nil
	}

	rootCert, err := os.ReadFile(rootCertFile)
	if err != nil {
		return nil, fmt.Errorf("reading root certificate %q: %w", rootCertFile, err)
	}
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(rootCert) {
		// Keep the original failure text but name the rejected file.
		return nil, fmt.Errorf("failed to create TLS dial option with root certificates from %q", rootCertFile)
	}
	return certPool, nil
}
diff --git a/security/pkg/credentialfetcher/fetcher.go 
b/security/pkg/credentialfetcher/fetcher.go
new file mode 100644
index 00000000..c95c6da5
--- /dev/null
+++ b/security/pkg/credentialfetcher/fetcher.go
@@ -0,0 +1,21 @@
+package credentialfetcher
+
+import (
+       "fmt"
+
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       
"github.com/apache/dubbo-kubernetes/security/pkg/credentialfetcher/plugin"
+)
+
+func NewCredFetcher(credtype, trustdomain, jwtPath, identityProvider string) 
(security.CredFetcher, error) {
+       switch credtype {
+       case security.JWT, "":
+               // If unset, also default to JWT for backwards compatibility
+               if jwtPath == "" {
+                       return nil, nil // no cred fetcher - using certificates 
only
+               }
+               return plugin.CreateTokenPlugin(jwtPath), nil
+       default:
+               return nil, fmt.Errorf("invalid credential fetcher type %s", 
credtype)
+       }
+}
diff --git a/security/pkg/credentialfetcher/plugin/token.go 
b/security/pkg/credentialfetcher/plugin/token.go
new file mode 100644
index 00000000..2a08602e
--- /dev/null
+++ b/security/pkg/credentialfetcher/plugin/token.go
@@ -0,0 +1,36 @@
+package plugin
+
+import (
+       "k8s.io/klog/v2"
+       "os"
+       "strings"
+)
+
+type KubernetesTokenPlugin struct {
+       path string
+}
+
+func CreateTokenPlugin(path string) *KubernetesTokenPlugin {
+       return &KubernetesTokenPlugin{
+               path: path,
+       }
+}
+
+func (t KubernetesTokenPlugin) GetPlatformCredential() (string, error) {
+       if t.path == "" {
+               return "", nil
+       }
+       tok, err := os.ReadFile(t.path)
+       if err != nil {
+               klog.Warningf("failed to fetch token from file: %v", err)
+               return "", nil
+       }
+       return strings.TrimSpace(string(tok)), nil
+}
+
+func (t KubernetesTokenPlugin) GetIdentityProvider() string {
+       return ""
+}
+
+func (t KubernetesTokenPlugin) Stop() {
+}
diff --git a/security/pkg/nodeagent/cache/secretcache.go 
b/security/pkg/nodeagent/cache/secretcache.go
new file mode 100644
index 00000000..31ce0988
--- /dev/null
+++ b/security/pkg/nodeagent/cache/secretcache.go
@@ -0,0 +1,132 @@
+package cache
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/queue"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "github.com/fsnotify/fsnotify"
+       "k8s.io/klog/v2"
+       "sync"
+)
+
// FileCert identifies one watched file-based certificate: the SDS resource
// that references it and the file being watched.
type FileCert struct {
	ResourceName string // SDS resource name that references the file
	Filename     string // path of the watched certificate file
}
+
// SecretManagerClient watches file-based certificates for changes and, via
// caClient, can obtain workload certificates; secretHandler is notified
// when a watched resource changes.
type SecretManagerClient struct {
	certWatcher             *fsnotify.Watcher // watches certificate files for changes
	caClient                security.Client   // CA client; may be nil, closed on Close
	queue                   queue.Delayed     // delayed task queue, runs until stop is closed
	configOptions           *security.Options
	existingCertificateFile security.SdsCertificateConfig // paths of pre-provisioned cert files
	stop                    chan struct{}                 // closed by Close to stop background work
	caRootPath              string
	certMutex               sync.RWMutex // guards fileCerts and secretHandler
	secretHandler           func(resourceName string)
	fileCerts               map[FileCert]struct{} // set of file certificates currently watched
}
+
+func NewSecretManagerClient(caClient security.Client, options 
*security.Options) (*SecretManagerClient, error) {
+       watcher, err := fsnotify.NewWatcher()
+       if err != nil {
+               return nil, err
+       }
+
+       ret := &SecretManagerClient{
+               queue:         queue.NewDelayed(queue.DelayQueueBuffer(0)),
+               caClient:      caClient,
+               configOptions: options,
+               existingCertificateFile: security.SdsCertificateConfig{
+                       CertificatePath:   options.CertChainFilePath,
+                       PrivateKeyPath:    options.KeyFilePath,
+                       CaCertificatePath: options.RootCertFilePath,
+               },
+               certWatcher: watcher,
+               fileCerts:   make(map[FileCert]struct{}),
+               stop:        make(chan struct{}),
+               caRootPath:  options.CARootPath,
+       }
+
+       go ret.queue.Run(ret.stop)
+       go ret.handleFileWatch()
+       return ret, nil
+}
+
+func (sc *SecretManagerClient) Close() {
+       _ = sc.certWatcher.Close()
+       if sc.caClient != nil {
+               sc.caClient.Close()
+       }
+       close(sc.stop)
+}
+
// handleFileWatch consumes events from the certificate file watcher until
// its channels are closed, triggering OnSecretUpdate for every resource
// that references a changed file.
func (sc *SecretManagerClient) handleFileWatch() {
	for {
		select {
		case event, ok := <-sc.certWatcher.Events:
			// Channel is closed.
			if !ok {
				return
			}
			// We only care about updates that change the file content
			if !(isWrite(event) || isRemove(event) || isCreate(event)) {
				continue
			}
			// Snapshot the watched set under the read lock so callbacks
			// below run without holding it.
			sc.certMutex.RLock()
			resources := make(map[FileCert]struct{})
			for k, v := range sc.fileCerts {
				resources[k] = v
			}
			sc.certMutex.RUnlock()
			klog.Infof("event for file certificate %s : %s, pushing to proxy", event.Name, event.Op.String())
			// If it is remove event - cleanup from file certs so that if it is added again, we can watch.
			// The cleanup should happen first before triggering callbacks, as the callbacks are async and
			// we may get generate call before cleanup is done and we will end up not watching the file.
			if isRemove(event) {
				sc.certMutex.Lock()
				for fc := range sc.fileCerts {
					if fc.Filename == event.Name {
						klog.V(2).Infof("removing file %s from file certs", event.Name)
						delete(sc.fileCerts, fc)
						break
					}
				}
				sc.certMutex.Unlock()
			}
			// Trigger callbacks for all resources referencing this file. This is practically always
			// a single resource.
			for k := range resources {
				if k.Filename == event.Name {
					sc.OnSecretUpdate(k.ResourceName)
				}
			}
		case err, ok := <-sc.certWatcher.Errors:
			// Channel is closed.
			if !ok {
				return
			}
			// TODO numFileSecretFailures
			klog.Errorf("certificate watch error: %v", err)
		}
	}
}
+
+func (sc *SecretManagerClient) OnSecretUpdate(resourceName string) {
+       sc.certMutex.RLock()
+       defer sc.certMutex.RUnlock()
+       if sc.secretHandler != nil {
+               sc.secretHandler(resourceName)
+       }
+}
+
+func isWrite(event fsnotify.Event) bool {
+       return event.Has(fsnotify.Write)
+}
+
+func isCreate(event fsnotify.Event) bool {
+       return event.Has(fsnotify.Create)
+}
+
+func isRemove(event fsnotify.Event) bool {
+       return event.Has(fsnotify.Remove)
+}
diff --git a/security/pkg/nodeagent/caclient/credentials.go 
b/security/pkg/nodeagent/caclient/credentials.go
new file mode 100644
index 00000000..6f946cec
--- /dev/null
+++ b/security/pkg/nodeagent/caclient/credentials.go
@@ -0,0 +1,50 @@
+package caclient
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "google.golang.org/grpc/credentials"
+)
+
+type DefaultTokenProvider struct {
+       opts *security.Options
+}
+
+func NewDefaultTokenProvider(opts *security.Options) 
credentials.PerRPCCredentials {
+       return &DefaultTokenProvider{opts}
+}
+
+func (t *DefaultTokenProvider) GetRequestMetadata(ctx context.Context, uri 
...string) (map[string]string, error) {
+       if t == nil {
+               return nil, nil
+       }
+       token, err := t.GetToken()
+       if err != nil {
+               return nil, err
+       }
+       if token == "" {
+               return nil, nil
+       }
+       return map[string]string{
+               "authorization": "Bearer " + token,
+       }, nil
+}
+
+// Allow the token provider to be used regardless of transport security; 
callers can determine whether
+// this is safe themselves.
+func (t *DefaultTokenProvider) RequireTransportSecurity() bool {
+       return false
+}
+
+func (t *DefaultTokenProvider) GetToken() (string, error) {
+       if t.opts.CredFetcher == nil {
+               return "", nil
+       }
+       token, err := t.opts.CredFetcher.GetPlatformCredential()
+       if err != nil {
+               return "", fmt.Errorf("fetch platform credential: %v", err)
+       }
+
+       return token, nil
+}
diff --git a/security/pkg/nodeagent/caclient/providers/aegis/client.go 
b/security/pkg/nodeagent/caclient/providers/aegis/client.go
new file mode 100644
index 00000000..e7f0fcdc
--- /dev/null
+++ b/security/pkg/nodeagent/caclient/providers/aegis/client.go
@@ -0,0 +1,149 @@
+package aegis
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
+       "github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/caclient"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/protobuf/types/known/structpb"
+       pb "istio.io/api/security/v1alpha1"
+       "k8s.io/klog/v2"
+)
+
// AegisClient is a CA client that requests workload certificates from the
// Istio certificate service over gRPC.
type AegisClient struct {
	// It means enable tls connection to Citadel if this is not nil.
	tlsOpts  *TLSOptions
	client   pb.IstioCertificateServiceClient // stub, rebuilt together with conn on reconnect
	conn     *grpc.ClientConn
	provider credentials.PerRPCCredentials // per-RPC token credentials attached to every call
	opts     *security.Options
}
+
// TLSOptions holds the file paths of the certificate material used for the
// TLS connection to the CA.
type TLSOptions struct {
	RootCert string // root CA certificate path (PEM)
	Key      string // client private key path (PEM)
	Cert     string // client certificate chain path (PEM)
}
+
+func NewAegisClient(opts *security.Options, tlsOpts *TLSOptions) 
(*AegisClient, error) {
+       c := &AegisClient{
+               tlsOpts:  tlsOpts,
+               opts:     opts,
+               provider: caclient.NewDefaultTokenProvider(opts),
+       }
+
+       conn, err := c.buildConnection()
+       if err != nil {
+               klog.Errorf("Failed to connect to endpoint %s: %v", 
opts.CAEndpoint, err)
+               return nil, fmt.Errorf("failed to connect to endpoint %s", 
opts.CAEndpoint)
+       }
+       c.conn = conn
+       c.client = pb.NewIstioCertificateServiceClient(conn)
+       return c, nil
+}
+
+func (c *AegisClient) Close() {
+       if c.conn != nil {
+               c.conn.Close()
+       }
+}
+
+func (c *AegisClient) getTLSOptions() *dubbogrpc.TLSOptions {
+       if c.tlsOpts != nil {
+               return &dubbogrpc.TLSOptions{
+                       RootCert:      c.tlsOpts.RootCert,
+                       Key:           c.tlsOpts.Key,
+                       Cert:          c.tlsOpts.Cert,
+                       ServerAddress: c.opts.CAEndpoint,
+                       SAN:           c.opts.CAEndpointSAN,
+               }
+       }
+       return nil
+}
+
// CSRSign sends the PEM-encoded CSR to the CA and returns the signed
// certificate chain. On any failure the gRPC connection is rebuilt (see
// the deferred handler) so the next attempt uses fresh TLS configuration.
func (c *AegisClient) CSRSign(csrPEM []byte, certValidTTLInSec int64) (res []string, err error) {
	// Attach the configured certificate signer to the request metadata.
	crMetaStruct := &structpb.Struct{
		Fields: map[string]*structpb.Value{
			security.CertSigner: {
				Kind: &structpb.Value_StringValue{StringValue: c.opts.CertSigner},
			},
		},
	}
	req := &pb.IstioCertificateRequest{
		Csr:              string(csrPEM),
		ValidityDuration: certValidTTLInSec,
		Metadata:         crMetaStruct,
	}
	// TODO(hzxuzhonghu): notify caclient rebuilding only when root cert is updated.
	// It can happen when the istiod dns certs is resigned after root cert is updated,
	// in this case, the ca grpc client can not automatically connect to istiod after the underlying network connection closed.
	// Because the grpc client still uses the old tls configuration to reconnect to istiod.
	// So here we need to rebuild the caClient in order to use the new root cert.
	defer func() {
		if err != nil {
			klog.Errorf("failed to sign CSR: %v", err)
			if err := c.reconnect(); err != nil {
				klog.Errorf("failed reconnect: %v", err)
			}
		}
	}()

	// ClusterID plus any configured CA headers travel as outgoing metadata.
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("ClusterID", c.opts.ClusterID))
	for k, v := range c.opts.CAHeaders {
		ctx = metadata.AppendToOutgoingContext(ctx, k, v)
	}

	resp, err := c.client.CreateCertificate(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("create certificate: %v", err)
	}

	// A valid response must contain at least two entries (leaf plus CA).
	if len(resp.CertChain) <= 1 {
		return nil, errors.New("invalid empty CertChain")
	}

	return resp.CertChain, nil
}
+
// GetRootCertBundle returns the CA root certificates. This client supplies
// none, so the bundle is always an empty (non-nil) slice.
func (c *AegisClient) GetRootCertBundle() ([]string, error) {
	return []string{}, nil
}
+
+func (c *AegisClient) buildConnection() (*grpc.ClientConn, error) {
+       tlsOpts := c.getTLSOptions()
+       opts, err := dubbogrpc.ClientOptions(nil, tlsOpts)
+       if err != nil {
+               return nil, err
+       }
+       opts = append(opts,
+               grpc.WithPerRPCCredentials(c.provider),
+               security.CARetryInterceptor(),
+       )
+       conn, err := grpc.Dial(c.opts.CAEndpoint, opts...)
+       if err != nil {
+               klog.Errorf("Failed to connect to endpoint %s: %v", 
c.opts.CAEndpoint, err)
+               return nil, fmt.Errorf("failed to connect to endpoint %s", 
c.opts.CAEndpoint)
+       }
+
+       return conn, nil
+}
+
// reconnect closes the current connection and replaces both the connection
// and the service stub with freshly built ones.
// NOTE(review): conn/client are swapped without synchronization; confirm
// callers never invoke CSRSign concurrently with a reconnect.
func (c *AegisClient) reconnect() error {
	if err := c.conn.Close(); err != nil {
		return fmt.Errorf("failed to close connection: %v", err)
	}

	conn, err := c.buildConnection()
	if err != nil {
		return err
	}
	c.conn = conn
	c.client = pb.NewIstioCertificateServiceClient(conn)
	klog.Info("recreated connection")
	return nil
}
diff --git a/security/pkg/nodeagent/sds/sdsservice.go 
b/security/pkg/nodeagent/sds/sdsservice.go
new file mode 100644
index 00000000..00b8b08b
--- /dev/null
+++ b/security/pkg/nodeagent/sds/sdsservice.go
@@ -0,0 +1,231 @@
+package sds
+
+import (
+       "context"
+       "github.com/apache/dubbo-kubernetes/pkg/backoff"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+       "github.com/apache/dubbo-kubernetes/pkg/xds"
+       core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+       discovery 
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+       sds "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+       mesh "istio.io/api/mesh/v1alpha1"
+       "k8s.io/klog/v2"
+       "strconv"
+       "sync"
+       "sync/atomic"
+)
+
// sdsservice implements the Envoy Secret Discovery Service (SDS) API,
// serving secrets produced by the SecretManager to connected streams.
type sdsservice struct {
	st security.SecretManager

	stop       chan struct{} // closed by Close; stops background cert warming
	rootCaPath string
	pkpConf    *mesh.PrivateKeyProvider

	sync.Mutex // guards clients
	clients    map[string]*Context
}
+
// Context ties a single SDS stream connection to the service and the
// stream's watch state.
type Context struct {
	BaseConnection xds.Connection
	s              *sdsservice
	w              *Watch
}
+
// Watch tracks the single watched resource of an SDS stream, guarded by the
// embedded mutex.
type Watch struct {
	sync.Mutex
	watch *xds.WatchedResource
}
+
// newSDSService creates the SDS service backed by st. Unless only
// file-mounted certificates are served, it also starts background warming
// of the workload and root certificates, retrying with exponential backoff
// until success or shutdown.
func newSDSService(st security.SecretManager, options *security.Options, pkpConf *mesh.PrivateKeyProvider) *sdsservice {
	ret := &sdsservice{
		st:      st,
		stop:    make(chan struct{}),
		pkpConf: pkpConf,
		clients: make(map[string]*Context),
	}

	ret.rootCaPath = options.CARootPath

	if options.FileMountedCerts || options.ServeOnlyFiles {
		return ret
	}

	// Pre-generate workload certificates to improve startup latency and ensure that for OUTPUT_CERTS
	// case we always write a certificate. A workload can technically run without any mTLS/CA
	// configured, in which case this will fail; if it becomes noisy we should disable the entire SDS
	// server in these cases.
	go func() {
		// TODO: do we need max timeout for retry, seems meaningless to retry forever if it never succeed
		b := backoff.NewExponentialBackOff(backoff.DefaultOption())
		// context for both timeout and channel, whichever stops first, the context will be done
		ctx, cancel := context.WithCancel(context.Background())
		go func() {
			select {
			case <-ret.stop:
				cancel()
			case <-ctx.Done():
			}
		}()
		defer cancel()
		_ = b.RetryWithContext(ctx, func() error {
			_, err := st.GenerateSecret(security.WorkloadKeyCertResourceName)
			if err != nil {
				klog.Warningf("failed to warm certificate: %v", err)
				return err
			}

			_, err = st.GenerateSecret(security.RootCertReqResourceName)
			if err != nil {
				klog.Warningf("failed to warm root certificate: %v", err)
				return err
			}

			return nil
		})
	}()

	return ret
}
+
+// StreamSecrets serves SDS discovery requests and SDS push requests
+func (s *sdsservice) StreamSecrets(stream 
sds.SecretDiscoveryService_StreamSecretsServer) error {
+       return xds.Stream(&Context{
+               BaseConnection: xds.NewConnection("", stream),
+               s:              s,
+               w:              &Watch{},
+       })
+}
+
// DeltaSecrets is not supported; incremental SDS is unimplemented.
func (s *sdsservice) DeltaSecrets(stream sds.SecretDiscoveryService_DeltaSecretsServer) error {
	return status.Error(codes.Unimplemented, "DeltaSecrets not implemented")
}
+
// FetchSecrets is not supported; only the streaming API is implemented.
func (s *sdsservice) FetchSecrets(ctx context.Context, discReq *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
	return nil, status.Error(codes.Unimplemented, "FetchSecrets not implemented")
}
+
// register adds the SDS handle to the grpc server.
func (s *sdsservice) register(rpcs *grpc.Server) {
	sds.RegisterSecretDiscoveryServiceServer(rpcs, s)
}
+
// Close stops the background certificate warming started in newSDSService.
func (s *sdsservice) Close() {
	close(s.stop)
}
+
// XdsConnection exposes the underlying xDS connection of this stream.
func (c *Context) XdsConnection() *xds.Connection {
	return &c.BaseConnection
}
+
// connectionNumber generates unique IDs for SDS stream connections; it is
// incremented atomically in Initialize.
var connectionNumber = int64(0)
+
+func (c *Context) Initialize(_ *core.Node) error {
+       id := atomic.AddInt64(&connectionNumber, 1)
+       con := c.XdsConnection()
+       con.SetID(strconv.FormatInt(id, 10))
+
+       c.s.Lock()
+       c.s.clients[con.ID()] = c
+       c.s.Unlock()
+
+       con.MarkInitialized()
+       return nil
+}
+
// Close unregisters this stream from the service's client table.
func (c *Context) Close() {
	c.s.Lock()
	defer c.s.Unlock()
	delete(c.s.clients, c.XdsConnection().ID())
}
+
// Watcher returns the watch state tracker for this stream.
func (c *Context) Watcher() xds.Watcher {
	return c.w
}
+
+func (c *Context) Process(req *discovery.DiscoveryRequest) error {
+       shouldRespond, delta := xds.ShouldRespond(c.Watcher(), 
c.XdsConnection().ID(), req)
+       if !shouldRespond {
+               return nil
+       }
+       resources := req.ResourceNames
+       if !delta.IsEmpty() {
+               resources = delta.Subscribed.UnsortedList()
+       }
+       res, err := c.s.generate(resources)
+       if err != nil {
+               return err
+       }
+       return xds.Send(c, res)
+}
+
+func (c *Context) Push(ev any) error {
+       secretName := ev.(string)
+       if !c.w.requested(secretName) {
+               return nil
+       }
+       res, err := c.s.generate([]string{secretName})
+       if err != nil {
+               return err
+       }
+       return xds.Send(c, res)
+}
+
+func (w *Watch) requested(secretName string) bool {
+       w.Lock()
+       defer w.Unlock()
+       if w.watch != nil {
+               return w.watch.ResourceNames.Contains(secretName)
+       }
+       return false
+}
+
// GetWatchedResource returns the stream's single watch; the type URL
// argument is ignored since only one resource type is tracked.
func (w *Watch) GetWatchedResource(string) *xds.WatchedResource {
	w.Lock()
	defer w.Unlock()
	return w.watch
}
+
// NewWatchedResource replaces the stream's watch with one for typeURL and
// the given resource names.
func (w *Watch) NewWatchedResource(typeURL string, names []string) {
	w.Lock()
	defer w.Unlock()
	w.watch = &xds.WatchedResource{TypeUrl: typeURL, ResourceNames: sets.New(names...)}
}
+
// UpdateWatchedResource applies f to the current watch under the lock and
// stores the result; the type URL argument is ignored.
func (w *Watch) UpdateWatchedResource(_ string, f func(*xds.WatchedResource) *xds.WatchedResource) {
	w.Lock()
	defer w.Unlock()
	w.watch = f(w.watch)
}
+
// DeleteWatchedResource clears the stream's watch; the type URL argument is
// ignored.
func (w *Watch) DeleteWatchedResource(string) {
	w.Lock()
	defer w.Unlock()
	w.watch = nil
}
+
// GetID returns an empty identifier.
func (w *Watch) GetID() string {
	// This always maps to the same local Envoy instance.
	return ""
}
+
// generate builds the discovery response for the requested resource names.
// NOTE(review): currently a stub returning an empty response; secrets are
// not yet populated — confirm whether this is intentional.
func (s *sdsservice) generate(resourceNames []string) (*discovery.DiscoveryResponse, error) {
	return &discovery.DiscoveryResponse{}, nil
}
+
+func (s *sdsservice) push(secretName string) {
+       s.Lock()
+       defer s.Unlock()
+       for _, client := range s.clients {
+               go func(client *Context) {
+                       select {
+                       case client.XdsConnection().PushCh() <- secretName:
+                       case <-client.XdsConnection().StreamDone():
+                       }
+               }(client)
+       }
+}
diff --git a/security/pkg/nodeagent/sds/server.go 
b/security/pkg/nodeagent/sds/server.go
new file mode 100644
index 00000000..07af70c8
--- /dev/null
+++ b/security/pkg/nodeagent/sds/server.go
@@ -0,0 +1,115 @@
+package sds
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "github.com/apache/dubbo-kubernetes/pkg/uds"
+       "go.uber.org/atomic"
+       "google.golang.org/grpc"
+       mesh "istio.io/api/mesh/v1alpha1"
+       "k8s.io/klog/v2"
+       "net"
+       "time"
+)
+
const (
	// maxStreams caps concurrent gRPC streams on the SDS server.
	maxStreams = 100000
	// maxRetryTimes bounds the SDS server start/retry loop.
	maxRetryTimes = 5
)
+
// Server hosts the workload SDS gRPC service on a Unix domain socket.
type Server struct {
	workloadSds *sdsservice // SDS service implementation

	grpcWorkloadListener net.Listener // UDS listener for workload proxies

	grpcWorkloadServer *grpc.Server
	stopped            *atomic.Bool // set in Stop; checked by the serve/retry loop
}
+
+func NewServer(options *security.Options, workloadSecretCache 
security.SecretManager, pkpConf *mesh.PrivateKeyProvider) *Server {
+       s := &Server{stopped: atomic.NewBool(false)}
+       s.workloadSds = newSDSService(workloadSecretCache, options, pkpConf)
+       s.initWorkloadSdsService(options)
+       return s
+}
+
+func (s *Server) initWorkloadSdsService(opts *security.Options) {
+       s.grpcWorkloadServer = grpc.NewServer(s.grpcServerOptions()...)
+       s.workloadSds.register(s.grpcWorkloadServer)
+       var err error
+       path := security.GetDubboSDSServerSocketPath()
+       if opts.ServeOnlyFiles {
+               path = security.FileCredentialNameSocketPath
+       }
+       s.grpcWorkloadListener, err = uds.NewListener(path)
+       go func() {
+               klog.Info("Starting SDS grpc server")
+               waitTime := time.Second
+               started := false
+               for i := 0; i < maxRetryTimes; i++ {
+                       if s.stopped.Load() {
+                               return
+                       }
+                       serverOk := true
+                       setUpUdsOK := true
+                       if s.grpcWorkloadListener == nil {
+                               if s.grpcWorkloadListener, err = 
uds.NewListener(path); err != nil {
+                                       klog.Errorf("SDS grpc server for 
workload proxies failed to set up UDS: %v", err)
+                                       setUpUdsOK = false
+                               }
+                       }
+                       if s.grpcWorkloadListener != nil {
+                               if opts.ServeOnlyFiles {
+                                       klog.Infof("Starting SDS server for 
file certificates only, will listen on %q", path)
+                               } else {
+                                       klog.Infof("Starting SDS server for 
workload certificates, will listen on %q", path)
+                               }
+                               if err = 
s.grpcWorkloadServer.Serve(s.grpcWorkloadListener); err != nil {
+                                       klog.Errorf("SDS grpc server for 
workload proxies failed to start: %v", err)
+                                       serverOk = false
+                               }
+                       }
+                       if serverOk && setUpUdsOK {
+                               started = true
+                               break
+                       }
+                       time.Sleep(waitTime)
+                       waitTime *= 2
+               }
+               if !started {
+                       klog.Warningf("SDS grpc server could not be started")
+               }
+       }()
+}
+
+func (s *Server) grpcServerOptions() []grpc.ServerOption {
+       grpcOptions := []grpc.ServerOption{
+               grpc.MaxConcurrentStreams(uint32(maxStreams)),
+       }
+
+       return grpcOptions
+}
+
// OnSecretUpdate pushes the updated secret to all connected SDS clients.
// It is a no-op when the SDS service has not been created.
func (s *Server) OnSecretUpdate(resourceName string) {
	if s.workloadSds == nil {
		return
	}

	klog.V(2).Infof("Trigger on secret update, resource name: %s", resourceName)
	s.workloadSds.push(resourceName)
}
+
+func (s *Server) Stop() {
+       if s == nil {
+               return
+       }
+       s.stopped.Store(true)
+       if s.grpcWorkloadServer != nil {
+               s.grpcWorkloadServer.Stop()
+       }
+       if s.grpcWorkloadListener != nil {
+               s.grpcWorkloadListener.Close()
+       }
+       if s.workloadSds != nil {
+               s.workloadSds.Close()
+       }
+}

Reply via email to