This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 570bf720 [discovery] delete xds cache and webhook code (#797)
570bf720 is described below
commit 570bf72010e0446174d3849cb75626e72b87ea08
Author: Jian Zhong <[email protected]>
AuthorDate: Tue Sep 30 21:08:12 2025 +0800
[discovery] delete xds cache and webhook code (#797)
---
go.mod | 1 +
go.sum | 2 +
pkg/kube/inject/webhook.go | 4 +
pkg/util/hash/hash.go | 45 ++++++++
pkg/xds/server.go | 27 -----
sail/pkg/bootstrap/server.go | 19 +++-
sail/pkg/config/kube/crdclient/client.go | 3 +
sail/pkg/features/tuning.go | 10 +-
sail/pkg/model/config.go | 29 +++++
sail/pkg/model/context.go | 16 ---
sail/pkg/model/push_context.go | 139 +++++++++++++++++++-----
sail/pkg/model/typed_xds_cache.go | 35 ------
sail/pkg/model/xds_cache.go | 53 ---------
sail/pkg/xds/ads.go | 38 +++++++
sail/pkg/xds/discovery.go | 179 ++++++++++++++++++++++++++++++-
15 files changed, 435 insertions(+), 165 deletions(-)
diff --git a/go.mod b/go.mod
index c9d60e36..a6b993a4 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-multierror v1.1.1
+ github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/heroku/color v0.0.6
github.com/moby/term v0.5.2
github.com/ory/viper v1.7.5
diff --git a/go.sum b/go.sum
index 310ea56b..5fc89569 100644
--- a/go.sum
+++ b/go.sum
@@ -392,6 +392,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
+github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
+github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/heroku/color v0.0.6 h1:UTFFMrmMLFcL3OweqP1lAdp8i1y/9oHqkeHjQ/b/Ny0=
diff --git a/pkg/kube/inject/webhook.go b/pkg/kube/inject/webhook.go
new file mode 100644
index 00000000..5f8858f9
--- /dev/null
+++ b/pkg/kube/inject/webhook.go
@@ -0,0 +1,4 @@
+package inject
+
+type Webhook struct {
+}
diff --git a/pkg/util/hash/hash.go b/pkg/util/hash/hash.go
new file mode 100644
index 00000000..da7a02bd
--- /dev/null
+++ b/pkg/util/hash/hash.go
@@ -0,0 +1,45 @@
+package hash
+
+import (
+ "encoding/hex"
+
+ "github.com/cespare/xxhash/v2"
+)
+
+type Hash interface {
+ Write(p []byte) (n int)
+ WriteString(s string) (n int)
+ Sum() string
+ Sum64() uint64
+}
+
+type instance struct {
+ hash *xxhash.Digest
+}
+
+var _ Hash = &instance{}
+
+func New() Hash {
+ return &instance{
+ hash: xxhash.New(),
+ }
+}
+
+func (i *instance) Write(p []byte) (n int) {
+ n, _ = i.hash.Write(p)
+ return
+}
+
+func (i *instance) WriteString(s string) (n int) {
+ n, _ = i.hash.WriteString(s)
+ return
+}
+
+func (i *instance) Sum64() uint64 {
+ return i.hash.Sum64()
+}
+
+func (i *instance) Sum() string {
+ sum := i.hash.Sum(nil)
+ return hex.EncodeToString(sum)
+}
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 814630db..3a213c9d 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -95,26 +95,10 @@ func NewConnection(peerAddr string, stream DiscoveryStream) Connection {
func Stream(ctx ConnectionContext) error {
con := ctx.XdsConnection()
- // Do not call: defer close(con.pushChannel). The push channel will be garbage collected
- // when the connection is no longer used. Closing the channel can cause subtle race conditions
- // with push. According to the spec: "It's only necessary to close a channel when it is important
- // to tell the receiving goroutines that all data have been sent."
-
- // Block until either a request is received or a push is triggered.
- // We need 2 go routines because 'read' blocks in Recv().
go Receive(ctx)
-
- // Wait for the proxy to be fully initialized before we start serving traffic. Because
- // initialization doesn't have dependencies that will block, there is no need to add any timeout
- // here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
- // reqChannel and the connection not being enqueued for pushes to pushChannel until the
- // initialization is complete.
<-con.initialized
for {
- // Go select{} statements are not ordered; the same channel can be chosen many times.
- // For requests, these are higher priority (client may be blocked on startup until these are done)
- // and often very cheap to handle (simple ACK), so we check it first.
select {
case req, ok := <-con.reqChan:
if ok {
@@ -129,10 +113,6 @@ func Stream(ctx ConnectionContext) error {
return nil
default:
}
- // If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
- // amount of incoming requests, we may still send some pushes, as we do not `continue` above;
- // however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
- // cannot completely starve pushes. However, this scenario is unlikely.
select {
case req, ok := <-con.reqChan:
if ok {
@@ -140,7 +120,6 @@ func Stream(ctx ConnectionContext) error {
return err
}
} else {
- // Remote side closed connection or error processing the request.
return <-con.errorChan
}
case pushEv := <-con.pushChannel:
@@ -159,7 +138,6 @@ func Receive(ctx ConnectionContext) {
defer func() {
close(con.errorChan)
close(con.reqChan)
- // Close the initialized channel, if its not already closed, to prevent blocking the stream.
select {
case <-con.initialized:
default:
@@ -179,9 +157,7 @@ func Receive(ctx ConnectionContext) {
klog.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
return
}
- // This should be only set for the first request. The node id may not be set - for example malicious clients.
if firstRequest {
- // probe happens before envoy sends first xDS request
if req.TypeUrl == model.HealthInfoType {
klog.Warningf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
continue
@@ -227,9 +203,6 @@ func (conn *Connection) MarkInitialized() {
func ShouldRespond(w Watcher, id string, request *discovery.DiscoveryRequest) (bool, ResourceDelta) {
stype := model.GetShortType(request.TypeUrl)
- // If there is an error in request that means previous response is erroneous.
- // We do not have to respond in that case. In this case request's version info
- // will be different from the version sent. But it is fragile to rely on that.
if request.ErrorDetail != nil {
errCode := codes.Code(request.ErrorDetail.Code)
klog.Warningf("ADS:%s: ACK ERROR %s %s:%s", stype, id, errCode.String(), request.ErrorDetail.GetMessage())
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index fed67bdb..1a60f80d 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/h2c"
dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/inject"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/kube/namespace"
sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
@@ -109,6 +110,13 @@ type Server struct {
dubbodCertBundleWatcher *keycertbundle.Watcher
readinessProbes map[string]readinessProbe
+
+ webhookInfo *webhookInfo
+}
+
+type webhookInfo struct {
+ mu sync.RWMutex
+ wh *inject.Webhook
}
type readinessProbe func() bool
@@ -215,7 +223,7 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
// TODO initRegistryEventHandlers?
- // TODO initDiscoveryService?
+ s.initDiscoveryService()
s.startCA(caOpts)
@@ -286,6 +294,15 @@ func (s *Server) Start(stop <-chan struct{}) error {
return nil
}
+func (s *Server) initDiscoveryService() {
+ klog.Infof("starting discovery service")
+ s.addStartFunc("xds server", func(stop <-chan struct{}) error {
+ klog.Infof("Starting ADS server")
+ s.XDSServer.Start(stop)
+ return nil
+ })
+}
+
func (s *Server) startCA(caOpts *caOptions) {
if s.CA == nil && s.RA == nil {
return
diff --git a/sail/pkg/config/kube/crdclient/client.go b/sail/pkg/config/kube/crdclient/client.go
new file mode 100644
index 00000000..da7070cb
--- /dev/null
+++ b/sail/pkg/config/kube/crdclient/client.go
@@ -0,0 +1,3 @@
+package crdclient
+
+type Client struct{}
diff --git a/sail/pkg/features/tuning.go b/sail/pkg/features/tuning.go
index a99ba302..1b29b630 100644
--- a/sail/pkg/features/tuning.go
+++ b/sail/pkg/features/tuning.go
@@ -17,7 +17,10 @@
package features
-import "github.com/apache/dubbo-kubernetes/pkg/env"
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/env"
+ "time"
+)
var (
MaxConcurrentStreams = env.Register(
@@ -32,4 +35,9 @@ var (
4*1024*1024,
"Sets the max receive buffer size of gRPC stream in bytes.",
).Get()
+
+ XDSCacheMaxSize = env.Register("SAIL_XDS_CACHE_SIZE", 60000,
+ "The maximum number of cache entries for the XDS cache.").Get()
+ XDSCacheIndexClearInterval = env.Register("SAIL_XDS_CACHE_INDEX_CLEAR_INTERVAL", 5*time.Second,
+ "The interval for xds cache index clearing.").Get()
)
diff --git a/sail/pkg/model/config.go b/sail/pkg/model/config.go
index 6619a5d9..9cd803ae 100644
--- a/sail/pkg/model/config.go
+++ b/sail/pkg/model/config.go
@@ -4,9 +4,14 @@ import (
"cmp"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+ "github.com/apache/dubbo-kubernetes/pkg/util/hash"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
"sort"
)
+type ConfigHash uint64
+
const (
NamespaceAll = ""
)
@@ -43,3 +48,27 @@ func sortConfigByCreationTime(configs []config.Config) []config.Config {
})
return configs
}
+
+func (key ConfigKey) String() string {
+ return key.Kind.String() + "/" + key.Namespace + "/" + key.Name
+}
+
+func HasConfigsOfKind(configs sets.Set[ConfigKey], kind kind.Kind) bool {
+ for c := range configs {
+ if c.Kind == kind {
+ return true
+ }
+ }
+ return false
+}
+
+func (key ConfigKey) HashCode() ConfigHash {
+ h := hash.New()
+ h.Write([]byte{byte(key.Kind)})
+ // Add separator / to avoid collision.
+ h.WriteString("/")
+ h.WriteString(key.Namespace)
+ h.WriteString("/")
+ h.WriteString(key.Name)
+ return ConfigHash(h.Sum64())
+}
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index e6a2f4f1..98c0bd75 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -23,7 +23,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
"github.com/apache/dubbo-kubernetes/pkg/xds"
- "github.com/apache/dubbo-kubernetes/sail/pkg/features"
meshconfig "istio.io/api/mesh/v1alpha1"
"net"
"strconv"
@@ -40,30 +39,15 @@ type Environment struct {
ConfigStore
mutex sync.RWMutex
pushContext *PushContext
- Cache XdsCache
NetworksWatcher mesh.NetworksWatcher
NetworkManager *NetworkManager
clusterLocalServices ClusterLocalProvider
DomainSuffix string
}
-type XdsCacheImpl struct {
- cds typedXdsCache[uint64]
- eds typedXdsCache[uint64]
- rds typedXdsCache[uint64]
- sds typedXdsCache[string]
-}
-
func NewEnvironment() *Environment {
- var cache XdsCache
- if features.EnableXDSCaching {
- cache = NewXdsCache()
- } else {
- cache = DisabledCache{}
- }
return &Environment{
pushContext: NewPushContext(),
- Cache: cache,
}
}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index 4e941844..e1e48814 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -27,6 +27,13 @@ import (
meshconfig "istio.io/api/mesh/v1alpha1"
"k8s.io/apimachinery/pkg/types"
"sync"
+ "time"
+)
+
+type TriggerReason string
+
+const (
+ UnknownTrigger TriggerReason = "unknown"
)
type PushContext struct {
@@ -51,26 +58,9 @@ type serviceAccountKey struct {
}
type virtualServiceIndex struct {
- exportedToNamespaceByGateway map[types.NamespacedName][]config.Config
- // this contains all the virtual services with exportTo "." and current namespace. The keys are namespace,gateway.
- privateByNamespaceAndGateway map[types.NamespacedName][]config.Config
- // This contains all virtual services whose exportTo is "*", keyed by gateway
- publicByGateway map[string][]config.Config
- // root vs namespace/name ->delegate vs virtualservice gvk/namespace/name
- delegates map[ConfigKey][]ConfigKey
-
- // This contains destination hosts of virtual services, keyed by gateway's namespace/name,
- // only used when PILOT_FILTER_GATEWAY_CLUSTER_CONFIG is enabled
- destinationsByGateway map[string]sets.String
-
- // Map of VS hostname -> referenced hostnames
- referencedDestinations map[string]sets.String
}
type destinationRuleIndex struct {
- namespaceLocal map[string]*consolidatedDestRules
- exportedByNamespace map[string]*consolidatedDestRules
- rootNamespaceLocal *consolidatedDestRules
}
type consolidatedDestRules struct {
@@ -91,17 +81,17 @@ type ConsolidatedDestRule struct {
from []types.NamespacedName
}
-type TriggerReason string
-
type ReasonStats map[TriggerReason]int
type PushRequest struct {
- Reason ReasonStats
- ConfigsUpdated sets.Set[ConfigKey]
- Forced bool
- Full bool
- Push *PushContext
- LastPushContext *PushContext
+ Reason ReasonStats
+ ConfigsUpdated sets.Set[ConfigKey]
+ Forced bool
+ Full bool
+ Push *PushContext
+ LastPushContext *PushContext
+ AddressesUpdated sets.Set[string]
+ Start time.Time
}
func NewPushContext() *PushContext {
@@ -132,8 +122,7 @@ func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest {
return merged
}
-type XDSUpdater interface {
-}
+type XDSUpdater interface{}
func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
ps.initializeMutex.Lock()
@@ -271,3 +260,99 @@ func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushConte
ps.AuthzPolicies = oldPushContext.AuthzPolicies
}
}
+
+func (pr *PushRequest) Merge(other *PushRequest) *PushRequest {
+ if pr == nil {
+ return other
+ }
+ if other == nil {
+ return pr
+ }
+
+ // Keep the first (older) start time
+
+ // Merge the two reasons. Note that we shouldn't deduplicate here, or we would under count
+ if len(other.Reason) > 0 {
+ if pr.Reason == nil {
+ pr.Reason = make(map[TriggerReason]int)
+ }
+ pr.Reason.Merge(other.Reason)
+ }
+
+ // If either is full we need a full push
+ pr.Full = pr.Full || other.Full
+
+ // If either is forced we need a forced push
+ pr.Forced = pr.Forced || other.Forced
+
+ // The other push context is presumed to be later and more up to date
+ if other.Push != nil {
+ pr.Push = other.Push
+ }
+
+ if pr.ConfigsUpdated == nil {
+ pr.ConfigsUpdated = other.ConfigsUpdated
+ } else {
+ pr.ConfigsUpdated.Merge(other.ConfigsUpdated)
+ }
+
+ if pr.AddressesUpdated == nil {
+ pr.AddressesUpdated = other.AddressesUpdated
+ } else {
+ pr.AddressesUpdated.Merge(other.AddressesUpdated)
+ }
+
+ return pr
+}
+
+func NewReasonStats(reasons ...TriggerReason) ReasonStats {
+ ret := make(ReasonStats)
+ for _, reason := range reasons {
+ ret.Add(reason)
+ }
+ return ret
+}
+
+func (r ReasonStats) Add(reason TriggerReason) {
+ r[reason]++
+}
+
+func (r ReasonStats) Merge(other ReasonStats) {
+ for reason, count := range other {
+ r[reason] += count
+ }
+}
+
+func (r ReasonStats) Count() int {
+ var ret int
+ for _, count := range r {
+ ret += count
+ }
+ return ret
+}
+
+func (ps *PushContext) GetAllServices() []*Service {
+ return ps.servicesExportedToNamespace(NamespaceAll)
+}
+
+func (ps *PushContext) servicesExportedToNamespace(ns string) []*Service {
+ var out []*Service
+
+ // First add private services and explicitly exportedTo services
+ if ns == NamespaceAll {
+ out = make([]*Service, 0, len(ps.ServiceIndex.privateByNamespace)+len(ps.ServiceIndex.public))
+ for _, privateServices := range ps.ServiceIndex.privateByNamespace {
+ out = append(out, privateServices...)
+ }
+ } else {
+ out = make([]*Service, 0, len(ps.ServiceIndex.privateByNamespace[ns])+
+ len(ps.ServiceIndex.exportedToNamespace[ns])+len(ps.ServiceIndex.public))
+ out = append(out, ps.ServiceIndex.privateByNamespace[ns]...)
+ out = append(out, ps.ServiceIndex.exportedToNamespace[ns]...)
+ }
+
+ // Second add public services
+ out = append(out, ps.ServiceIndex.public...)
+
+ return out
+}
diff --git a/sail/pkg/model/typed_xds_cache.go b/sail/pkg/model/typed_xds_cache.go
deleted file mode 100644
index 90ee70ca..00000000
--- a/sail/pkg/model/typed_xds_cache.go
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-type typedXdsCache[K comparable] interface {
-}
-
-type lruCache[K comparable] struct {
-}
-
-var _ typedXdsCache[uint64] = &lruCache[uint64]{}
-
-func newTypedXdsCache[K comparable]() typedXdsCache[K] {
- cache := &lruCache[K]{}
- return cache
-}
-
-type disabledCache[K comparable] struct{}
-
-var _ typedXdsCache[uint64] = &disabledCache[uint64]{}
diff --git a/sail/pkg/model/xds_cache.go b/sail/pkg/model/xds_cache.go
deleted file mode 100644
index 10e53ef6..00000000
--- a/sail/pkg/model/xds_cache.go
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "github.com/apache/dubbo-kubernetes/sail/pkg/features"
-)
-
-type XdsCache interface {
- Run(stop <-chan struct{})
-}
-
-type DisabledCache struct{}
-
-func NewXdsCache() XdsCache {
- cache := XdsCacheImpl{
- eds: newTypedXdsCache[uint64](),
- }
- if features.EnableCDSCaching {
- cache.cds = newTypedXdsCache[uint64]()
- } else {
- cache.cds = disabledCache[uint64]{}
- }
- if features.EnableRDSCaching {
- cache.rds = newTypedXdsCache[uint64]()
- } else {
- cache.rds = disabledCache[uint64]{}
- }
-
- cache.sds = newTypedXdsCache[string]()
-
- return cache
-}
-
-func (x XdsCacheImpl) Run(stop <-chan struct{}) {}
-
-func (d DisabledCache) Run(stop <-chan struct{}) {
-}
diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go
index 5591c2e6..79d5fa9d 100644
--- a/sail/pkg/xds/ads.go
+++ b/sail/pkg/xds/ads.go
@@ -19,10 +19,13 @@ package xds
import (
"context"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/pkg/xds"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "k8s.io/klog/v2"
"time"
)
@@ -63,6 +66,41 @@ func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) erro
return s.Stream(stream)
}
+func (s *DiscoveryServer) StartPush(req *model.PushRequest) {
+ req.Start = time.Now()
+ for _, p := range s.AllClients() {
+ s.pushQueue.Enqueue(p, req)
+ }
+}
+
+func (s *DiscoveryServer) AllClients() []*Connection {
+ s.adsClientsMutex.RLock()
+ defer s.adsClientsMutex.RUnlock()
+ return maps.Values(s.adsClients)
+}
+
+func (s *DiscoveryServer) AdsPushAll(req *model.PushRequest) {
+ if !req.Full {
+ klog.Infof("XDS: Incremental Pushing ConnectedEndpoints:%d", s.adsClientCount())
+ } else {
+ totalService := len(req.Push.GetAllServices())
+ klog.Infof("XDS: Pushing Services:%d ConnectedEndpoints:%d", totalService, s.adsClientCount())
+
+ // Make sure the ConfigsUpdated map exists
+ if req.ConfigsUpdated == nil {
+ req.ConfigsUpdated = make(sets.Set[model.ConfigKey])
+ }
+ }
+
+ s.StartPush(req)
+}
+
+func (s *DiscoveryServer) adsClientCount() int {
+ s.adsClientsMutex.RLock()
+ defer s.adsClientsMutex.RUnlock()
+ return len(s.adsClients)
+}
+
func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy, error) {
return nil, nil
}
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index 57d9d4ba..7b00ce4f 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -18,6 +18,7 @@
package xds
import (
+ "fmt"
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
@@ -26,6 +27,8 @@ import (
"golang.org/x/time/rate"
"google.golang.org/grpc"
"k8s.io/klog/v2"
+ "strconv"
+ "sync"
"time"
)
@@ -40,7 +43,6 @@ type DiscoveryServer struct {
serverReady atomic.Bool
DiscoveryStartTime time.Time
ClusterAliases map[cluster.ID]cluster.ID
- Cache model.XdsCache
pushQueue *PushQueue
krtDebugger *krt.DebugHandler
InboundUpdates *atomic.Int64
@@ -50,12 +52,13 @@ type DiscoveryServer struct {
pushChannel chan *model.PushRequest
DebounceOptions DebounceOptions
concurrentPushLimit chan struct{}
+ adsClientsMutex sync.RWMutex
+ adsClients map[string]*Connection
}
func NewDiscoveryServer(env *model.Environment, clusterAliases map[string]string, debugger *krt.DebugHandler) *DiscoveryServer {
out := &DiscoveryServer{
Env: env,
- Cache: env.Cache,
krtDebugger: debugger,
InboundUpdates: atomic.NewInt64(0),
CommittedUpdates: atomic.NewInt64(0),
@@ -75,7 +78,6 @@ func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
go s.handleUpdates(stopCh)
go s.sendPushes(stopCh)
- go s.Cache.Run(stopCh)
}
func (s *DiscoveryServer) CachesSynced() {
@@ -87,7 +89,13 @@ func (s *DiscoveryServer) Shutdown() {
s.pushQueue.ShutDown()
}
-func (s *DiscoveryServer) Push(req *model.PushRequest) {}
+func (s *DiscoveryServer) Push(req *model.PushRequest) {
+ if !req.Full {
+ req.Push = s.globalPushContext()
+ s.AdsPushAll(req)
+ return
+ }
+}
func (s *DiscoveryServer) globalPushContext() *model.PushContext {
return s.Env.PushContext()
@@ -102,6 +110,167 @@ func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
}
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts DebounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
+ var timeChan <-chan time.Time
+ var startDebounce time.Time
+ var lastConfigUpdateTime time.Time
+
+ pushCounter := 0
+ debouncedEvents := 0
+
+ var req *model.PushRequest
+
+ free := true
+ freeCh := make(chan struct{}, 1)
+
+ push := func(req *model.PushRequest, debouncedEvents int, startDebounce time.Time) {
+ pushFn(req)
+ updateSent.Add(int64(debouncedEvents))
+ freeCh <- struct{}{}
+ }
+
+ pushWorker := func() {
+ eventDelay := time.Since(startDebounce)
+ quietTime := time.Since(lastConfigUpdateTime)
+ // it has been too long or quiet enough
+ if eventDelay >= opts.debounceMax || quietTime >= opts.DebounceAfter {
+ if req != nil {
+ pushCounter++
+ if req.ConfigsUpdated == nil {
+ klog.Infof("Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full=%v",
+ pushCounter, debouncedEvents, reasonsUpdated(req),
+ quietTime, eventDelay, req.Full)
+ } else {
+ klog.Infof("Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full=%v",
+ pushCounter, debouncedEvents, configsUpdated(req),
+ quietTime, eventDelay, req.Full)
+ }
+ free = false
+ go push(req, debouncedEvents, startDebounce)
+ req = nil
+ debouncedEvents = 0
+ }
+ } else {
+ timeChan = time.After(opts.DebounceAfter - quietTime)
+ }
+ }
+
+ for {
+ select {
+ case <-freeCh:
+ free = true
+ pushWorker()
+ case r := <-ch:
+ // If reason is not set, record it as an unknown reason
+ if len(r.Reason) == 0 {
+ r.Reason = model.NewReasonStats(model.UnknownTrigger)
+ }
+ if !opts.enableEDSDebounce && !r.Full {
+ // trigger push now, just for EDS
+ go func(req *model.PushRequest) {
+ pushFn(req)
+ updateSent.Inc()
+ }(r)
+ continue
+ }
+
+ lastConfigUpdateTime = time.Now()
+ if debouncedEvents == 0 {
+ timeChan = time.After(opts.DebounceAfter)
+ startDebounce = lastConfigUpdateTime
+ }
+ debouncedEvents++
+
+ req = req.Merge(r)
+ case <-timeChan:
+ if free {
+ pushWorker()
+ }
+ case <-stopCh:
+ return
+ }
+ }
}
-func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {}
+func reasonsUpdated(req *model.PushRequest) string {
+ var (
+ reason0, reason1 model.TriggerReason
+ reason0Cnt, reason1Cnt, idx int
+ )
+ for r, cnt := range req.Reason {
+ if idx == 0 {
+ reason0, reason0Cnt = r, cnt
+ } else if idx == 1 {
+ reason1, reason1Cnt = r, cnt
+ } else {
+ break
+ }
+ idx++
+ }
+
+ switch len(req.Reason) {
+ case 0:
+ return "unknown"
+ case 1:
+ return fmt.Sprintf("%s:%d", reason0, reason0Cnt)
+ case 2:
+ return fmt.Sprintf("%s:%d and %s:%d", reason0, reason0Cnt, reason1, reason1Cnt)
+ default:
+ return fmt.Sprintf("%s:%d and %d(%d) more reasons", reason0, reason0Cnt, len(req.Reason)-1,
+ req.Reason.Count()-reason0Cnt)
+ }
+}
+
+func configsUpdated(req *model.PushRequest) string {
+ configs := ""
+ for key := range req.ConfigsUpdated {
+ configs += key.String()
+ break
+ }
+ if len(req.ConfigsUpdated) > 1 {
+ more := " and " + strconv.Itoa(len(req.ConfigsUpdated)-1) + " more configs"
+ configs += more
+ }
+ return configs
+}
+
+func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ semaphore <- struct{}{}
+
+ // Get the next proxy to push. This will block if there are no updates required.
+ client, push, shuttingdown := queue.Dequeue()
+ if shuttingdown {
+ return
+ }
+ doneFunc := func() {
+ queue.MarkDone(client)
+ <-semaphore
+ }
+
+ var closed <-chan struct{}
+ if client.deltaStream != nil {
+ closed = client.deltaStream.Context().Done()
+ } else {
+ closed = client.StreamDone()
+ }
+ go func() {
+ pushEv := &Event{
+ pushRequest: push,
+ done: doneFunc,
+ }
+
+ select {
+ case client.PushCh() <- pushEv:
+ return
+ case <-closed: // grpc stream was closed
+ doneFunc()
+ klog.Infof("Client closed connection %v", client.ID())
+ }
+ }()
+ }
+ }
+}