This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 49e4c522 [discovery] add aggregate and cert controller logic (#792)
49e4c522 is described below
commit 49e4c52295d2ff16da45214a5fcbb2b8972f2142
Author: Jian Zhong <[email protected]>
AuthorDate: Mon Sep 29 02:33:49 2025 +0800
[discovery] add aggregate and cert controller logic (#792)
---
dubboctl/pkg/validate/validate.go | 2 +-
go.mod | 6 +
go.sum | 19 ++
operator/cmd/cluster/manifest.go | 2 +-
operator/pkg/helm/helm.go | 2 +-
operator/pkg/install/installer.go | 2 +-
pkg/config/constants/constants.go | 10 +
pkg/config/labels/instance.go | 2 +-
pkg/config/model.go | 2 +-
pkg/config/protocol/instance.go | 146 +++++++++
pkg/config/schema/collection/schemas.go | 2 +-
.../schema/gvk/{resources.gen.go => resources.go} | 15 +-
.../schema/gvr/{resource.gen.go => resources.go} | 9 +
pkg/config/schema/kind/kind.go | 3 +
pkg/config/schema/kind/resource.go | 60 ++++
pkg/config/schema/kubetypes/common.go | 2 +-
.../kubetypes/{resources.gen.go => resources.go} | 2 +-
pkg/config/visibility/visibility.go | 28 ++
pkg/kube/kclient/client.go | 2 +-
pkg/kube/krt/collection.go | 2 +-
pkg/kube/krt/fetch.go | 2 +-
pkg/kube/krt/files/files.go | 2 +-
pkg/kube/krt/index.go | 2 +-
pkg/kube/krt/informer.go | 2 +-
pkg/kube/krt/processor.go | 2 +-
pkg/kube/krt/singleton.go | 3 +-
pkg/kube/krt/static.go | 2 +-
pkg/kube/namespace/filter.go | 2 +-
pkg/network/id.go | 14 +
pkg/{util => }/slices/slices.go | 0
pkg/util/sets/set.go | 5 +-
pkg/util/smallset/smallset.go | 2 +-
pkg/xds/server.go | 12 +
sail/pkg/bootstrap/certcontroller.go | 201 ++++++++++++-
sail/pkg/bootstrap/options.go | 10 +
sail/pkg/bootstrap/server.go | 334 ++++++++++++++++++++-
sail/pkg/bootstrap/servicecontroller.go | 2 -
sail/pkg/config/aggregate/config.go | 2 +-
sail/pkg/config/memory/controller.go | 2 +-
sail/pkg/features/sail.go | 7 +
sail/pkg/features/security.go | 5 +
sail/pkg/features/tuning.go | 7 +
sail/pkg/grpc/grpc.go | 26 ++
sail/pkg/model/addressmap.go | 75 +++++
sail/pkg/model/authentication.go | 119 ++++++++
sail/pkg/model/authorization.go | 48 +++
sail/pkg/model/cluster_local.go | 14 +
sail/pkg/model/config.go | 19 ++
sail/pkg/model/context.go | 33 +-
sail/pkg/model/controller.go | 6 +-
sail/pkg/model/network.go | 4 +
sail/pkg/model/push_context.go | 230 +++++++++++++-
sail/pkg/model/service.go | 144 +++++++++
sail/pkg/serviceregistry/aggregate/controller.go | 102 +++++++
.../serviceregistry/kube/controller/controller.go | 94 +++++-
sail/pkg/trustbundle/trustbundle.go | 2 +-
sail/pkg/xds/ads.go | 61 +++-
sail/pkg/xds/auth.go | 9 +
sail/pkg/xds/delta.go | 198 ++++++++++++
sail/pkg/xds/discovery.go | 14 +
sail/pkg/xds/v3/model.go | 15 +-
security/pkg/pki/ra/common.go | 2 +-
62 files changed, 2079 insertions(+), 74 deletions(-)
diff --git a/dubboctl/pkg/validate/validate.go b/dubboctl/pkg/validate/validate.go
index dbcc4708..debd997c 100644
--- a/dubboctl/pkg/validate/validate.go
+++ b/dubboctl/pkg/validate/validate.go
@@ -25,7 +25,7 @@ import (
operator "github.com/apache/dubbo-kubernetes/operator/pkg/apis"
operatorvalidate "github.com/apache/dubbo-kubernetes/operator/pkg/apis/validation"
"github.com/apache/dubbo-kubernetes/pkg/config/validation"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/hashicorp/go-multierror"
"github.com/spf13/cobra"
"io"
diff --git a/go.mod b/go.mod
index 748a3e54..2a974281 100644
--- a/go.mod
+++ b/go.mod
@@ -40,7 +40,9 @@ require (
github.com/golang/protobuf v1.5.4
github.com/google/go-cmp v0.7.0
github.com/google/go-containerregistry v0.20.6
+ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
+ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-multierror v1.1.1
github.com/heroku/color v0.0.6
github.com/moby/term v0.5.2
@@ -112,10 +114,12 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.28.7 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/awslabs/amazon-ecr-credential-helper/ecr-login v0.0.0-20230522190001-adf1bafd791a // indirect
+ github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/buildpacks/imgutil v0.0.0-20230626185301-726f02e4225c // indirect
github.com/buildpacks/lifecycle v0.17.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
+ github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chrismellard/docker-credential-acr-env v0.0.0-20230304212654-82a0ddb27589 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
@@ -203,6 +207,8 @@ require (
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240409071808-615f978279ca // indirect
github.com/prometheus/client_golang v1.23.0 // indirect
+ github.com/prometheus/client_model v0.6.2 // indirect
+ github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
github.com/rivo/tview v0.0.0-20220307222120-9994674d60a8 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
diff --git a/go.sum b/go.sum
index beb2eaa2..bfe17708 100644
--- a/go.sum
+++ b/go.sum
@@ -186,6 +186,7 @@ github.com/chzyer/test v1.0.0/go.mod
h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38
github.com/client9/misspell v0.3.4/go.mod
h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/circl v1.3.7
h1:qlCDlTPz2n9fu58M0Nh1J/JzcFpfgkFHHX3O35r5vcU=
github.com/cloudflare/circl v1.3.7/go.mod
h1:sRTcRWXGLrKw6yIGJ+l7amYJFfAXbZG0kBSc8r4zxgA=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod
h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443
h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod
h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/containerd/containerd v1.7.27
h1:yFyEyojddO3MIGVER2xJLWoCIn+Up4GaHFquP7hsFII=
@@ -244,7 +245,9 @@ github.com/emicklei/go-restful/v3 v3.12.2
h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf
github.com/emicklei/go-restful/v3 v3.12.2/go.mod
h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emirpasic/gods v1.18.1
h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod
h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod
h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane
v0.9.1-0.20191026205805-5f8ba28d4473/go.mod
h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod
h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane/envoy
v1.32.5-0.20250627145903-197b96a9c7f8
h1:/F9jLyfDeNr4iZxyibtKlZxCDqCFEhoYiLdc9VOZT2E=
github.com/envoyproxy/go-control-plane/envoy
v1.32.5-0.20250627145903-197b96a9c7f8/go.mod
h1:09qwbGVuSWWAyN5t/b3iyVfz5+z8QWGrzkoqm/8SbEs=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
@@ -287,6 +290,7 @@ github.com/go-git/go-git/v5 v5.13.1
h1:DAQ9APonnlvSWpvolXWIuV6Q6zXy2wHbN4cVlNR5Q
github.com/go-git/go-git/v5 v5.13.1/go.mod
h1:qryJB4cSBoq3FRoBRf5A77joojuBcmPJ0qu3XXXVixc=
github.com/go-jose/go-jose/v4 v4.0.5
h1:M6T8+mKZl/+fNNuFHvGIzDz7BTLQPIounk/b9dw3AaE=
github.com/go-jose/go-jose/v4 v4.0.5/go.mod
h1:s3P1lRrkT8igV8D9OjyL4WRyHvjB6a4JSllnOrmmBOA=
+github.com/go-kit/kit v0.9.0/go.mod
h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.4.0/go.mod
h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logr/logr v0.1.0/go.mod
h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
@@ -301,6 +305,7 @@ github.com/go-openapi/swag v0.23.1
h1:lpsStH0n2ittzTnbaSloVZLuB5+fvSY/+hnagBjSNZ
github.com/go-openapi/swag v0.23.1/go.mod
h1:STZs8TbRvEQQKUA+JZNAm3EWlgaOBGpyFDqQnDHMef0=
github.com/go-sql-driver/mysql v1.7.1
h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod
h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/go-stack/stack v1.8.0/go.mod
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig/v3 v3.0.0
h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod
h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
@@ -322,6 +327,7 @@ github.com/golang/mock v1.6.0/go.mod
h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+Licev
github.com/golang/protobuf v1.2.0/go.mod
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.3/go.mod
h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod
h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod
h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod
h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@@ -373,8 +379,12 @@ github.com/gorilla/mux v1.8.1
h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod
h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79
h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod
h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
+github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
+github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod
h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
h1:sGm2vDRFUrQJO/Veii4h4zG2vvqG6uWNkBHSTqXOZk0=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2/go.mod
h1:wd1YpapPLivG6nQgbf7ZkG1hhSOXDhhn4MLTknx2aAc=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod
h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/hashicorp/errwrap v1.0.0/go.mod
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0
h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
@@ -507,6 +517,7 @@ github.com/opencontainers/runtime-spec v1.2.0
h1:z97+pHb3uELt/yiAWD691HNHQIF07bE
github.com/opencontainers/runtime-spec v1.2.0/go.mod
h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opencontainers/selinux v1.11.1
h1:nHFvthhM0qY8/m+vfhJylliSshm8G1jJ2jDMcgULaH8=
github.com/opencontainers/selinux v1.11.1/go.mod
h1:E5dMC3VPuVvVHDYmi78qvhJp8+M586T4DlDRYpFkyec=
+github.com/opentracing/opentracing-go v1.1.0/go.mod
h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/ory/viper v1.7.5 h1:+xVdq7SU3e1vNaCsk/ixsfxE4zylk1TJUiJrY647jUE=
github.com/ory/viper v1.7.5/go.mod
h1:ypOuyJmEUb3oENywQZRgeAMwqgOyDqwboO1tj3DjTaM=
github.com/pelletier/go-toml v1.2.0/go.mod
h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -563,6 +574,7 @@ github.com/sergi/go-diff
v1.3.2-0.20230802210424-5b0b94c5c0d3/go.mod h1:A0bzQcvG
github.com/shopspring/decimal v1.4.0
h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod
h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sirupsen/logrus v1.2.0/go.mod
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2/go.mod
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.7.0/go.mod
h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.0/go.mod
h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/sirupsen/logrus v1.9.3
h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
@@ -667,12 +679,15 @@ go.opentelemetry.io/otel/trace v1.37.0
h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mx
go.opentelemetry.io/otel/trace v1.37.0/go.mod
h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
go.starlark.net v0.0.0-20230302034142-4b1e35fe2254
h1:Ss6D3hLXTM0KobyBYEAygXzFfGcjnmfEJOBgSbemCtg=
go.starlark.net v0.0.0-20230302034142-4b1e35fe2254/go.mod
h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds=
+go.uber.org/atomic v1.4.0/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod
h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod
h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod
h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+go.uber.org/multierr v1.1.0/go.mod
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod
h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
@@ -733,6 +748,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod
h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -794,6 +810,7 @@ google.golang.org/appengine v1.1.0/go.mod
h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl
google.golang.org/appengine v1.4.0/go.mod
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod
h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod
h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod
h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod
h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20240528184218-531527333157
h1:u7WMYrIrVvs0TF5yaKwKNbcJyySYf+HAIFXxWltJOXE=
google.golang.org/genproto v0.0.0-20240528184218-531527333157/go.mod
h1:ubQlAQnzejB8uZzszhrTCU2Fyp6Vi7ZE5nn0c3W8+qQ=
@@ -803,7 +820,9 @@ google.golang.org/genproto/googleapis/rpc
v0.0.0-20250811230008-5f3141c8851a h1:
google.golang.org/genproto/googleapis/rpc
v0.0.0-20250811230008-5f3141c8851a/go.mod
h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo=
google.golang.org/grpc v1.19.0/go.mod
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod
h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod
h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.29.1/go.mod
h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4=
google.golang.org/grpc v1.74.2/go.mod
h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
diff --git a/operator/cmd/cluster/manifest.go b/operator/cmd/cluster/manifest.go
index 4bd44f00..df5ec605 100644
--- a/operator/cmd/cluster/manifest.go
+++ b/operator/cmd/cluster/manifest.go
@@ -25,7 +25,7 @@ import (
"github.com/apache/dubbo-kubernetes/operator/pkg/render"
"github.com/apache/dubbo-kubernetes/operator/pkg/util/clog"
"github.com/apache/dubbo-kubernetes/pkg/kube"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/spf13/cobra"
"sigs.k8s.io/yaml"
"strings"
diff --git a/operator/pkg/helm/helm.go b/operator/pkg/helm/helm.go
index 60429cbc..d9ab3f9e 100644
--- a/operator/pkg/helm/helm.go
+++ b/operator/pkg/helm/helm.go
@@ -23,7 +23,7 @@ import (
"github.com/apache/dubbo-kubernetes/operator/pkg/manifest"
"github.com/apache/dubbo-kubernetes/operator/pkg/util"
"github.com/apache/dubbo-kubernetes/operator/pkg/values"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"
diff --git a/operator/pkg/install/installer.go b/operator/pkg/install/installer.go
index df6450fe..9b74c0e9 100644
--- a/operator/pkg/install/installer.go
+++ b/operator/pkg/install/installer.go
@@ -29,8 +29,8 @@ import (
"github.com/apache/dubbo-kubernetes/operator/pkg/util/progress"
"github.com/apache/dubbo-kubernetes/operator/pkg/values"
"github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/hashicorp/go-multierror"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
diff --git a/pkg/config/constants/constants.go b/pkg/config/constants/constants.go
index e9bf56fd..d558fafa 100644
--- a/pkg/config/constants/constants.go
+++ b/pkg/config/constants/constants.go
@@ -36,4 +36,14 @@ const (
CertProviderCustom = "custom"
ThirdPartyJwtPath = "./var/run/secrets/tokens/dubbo-token"
+
+ CertProviderKubernetes = "kubernetes"
+
+ SailWellKnownDNSCertPath = "./var/run/secrets/dubbod/tls/"
+ SailWellKnownDNSCaCertPath = "./var/run/secrets/dubbod/ca/"
+
+ DefaultSailTLSCert = SailWellKnownDNSCertPath + "tls.crt"
+ DefaultSailTLSKey = SailWellKnownDNSCertPath + "tls.key"
+ DefaultSailTLSCaCert = SailWellKnownDNSCaCertPath + "root-cert.pem"
+ DefaultSailTLSCaCertAlternatePath = SailWellKnownDNSCertPath + "ca.crt"
)
diff --git a/pkg/config/labels/instance.go b/pkg/config/labels/instance.go
index 845ebbe9..6b8fdec8 100644
--- a/pkg/config/labels/instance.go
+++ b/pkg/config/labels/instance.go
@@ -19,13 +19,13 @@ package labels
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"regexp"
"strings"
"github.com/hashicorp/go-multierror"
"github.com/apache/dubbo-kubernetes/pkg/maps"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
)
const (
diff --git a/pkg/config/model.go b/pkg/config/model.go
index 76e8265a..204feb77 100644
--- a/pkg/config/model.go
+++ b/pkg/config/model.go
@@ -21,6 +21,7 @@ import (
"bytes"
"encoding/json"
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"reflect"
"time"
@@ -42,7 +43,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/util/gogoprotomarshal"
"github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/apache/dubbo-kubernetes/sail/pkg/util/protoconv"
"istio.io/api/label"
)
diff --git a/pkg/config/protocol/instance.go b/pkg/config/protocol/instance.go
new file mode 100644
index 00000000..3748f919
--- /dev/null
+++ b/pkg/config/protocol/instance.go
@@ -0,0 +1,146 @@
+package protocol
+
+import "strings"
+
+// Instance defines network protocols for ports
+type Instance string
+
+func (i Instance) String() string {
+ return string(i)
+}
+
+const (
+ // GRPC declares that the port carries gRPC traffic.
+ GRPC Instance = "GRPC"
+ // GRPCWeb declares that the port carries gRPC traffic.
+ GRPCWeb Instance = "GRPC-Web"
+ // HTTP declares that the port carries HTTP/1.1 traffic.
+ // Note that HTTP/1.0 or earlier may not be supported by the proxy.
+ HTTP Instance = "HTTP"
+ // HTTP_PROXY declares that the port is a generic outbound proxy port.
+ // Note that this is currently applicable only for defining sidecar egress listeners.
+ // nolint
+ HTTP_PROXY Instance = "HTTP_PROXY"
+ // HTTP2 declares that the port carries HTTP/2 traffic.
+ HTTP2 Instance = "HTTP2"
+ // HTTPS declares that the port carries HTTPS traffic.
+ HTTPS Instance = "HTTPS"
+ // TCP declares the port uses TCP.
+ // This is the default protocol for a service port.
+ TCP Instance = "TCP"
+ // TLS declares that the port carries TLS traffic.
+ // TLS traffic is assumed to contain SNI as part of the handshake.
+ TLS Instance = "TLS"
+ // UDP declares that the port uses UDP.
+ // Note that UDP protocol is not currently supported by the proxy.
+ UDP Instance = "UDP"
+ // Unsupported - value to signify that the protocol is unsupported.
+ Unsupported Instance = "UnsupportedProtocol"
+)
+
+// Parse from string ignoring case
+func Parse(s string) Instance {
+ switch strings.ToLower(s) {
+ case "tcp":
+ return TCP
+ case "udp":
+ return UDP
+ case "grpc":
+ return GRPC
+ case "grpc-web":
+ return GRPCWeb
+ case "http":
+ return HTTP
+ case "http_proxy":
+ return HTTP_PROXY
+ case "http2":
+ return HTTP2
+ case "https":
+ return HTTPS
+ case "tls":
+ return TLS
+ }
+
+ return Unsupported
+}
+
+// IsHTTP2 is true for protocols that use HTTP/2 as transport protocol
+func (i Instance) IsHTTP2() bool {
+ switch i {
+ case HTTP2, GRPC, GRPCWeb:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsHTTPOrSniffed is true for protocols that use HTTP as transport protocol, or *can* use it if sniffed to be HTTP
+func (i Instance) IsHTTPOrSniffed() bool {
+ return i.IsHTTP() || i.IsUnsupported()
+}
+
+// IsHTTP is true for protocols that use HTTP as transport protocol
+func (i Instance) IsHTTP() bool {
+ switch i {
+ case HTTP, HTTP2, HTTP_PROXY, GRPC, GRPCWeb:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsTCP is true for protocols that use TCP as transport protocol
+func (i Instance) IsTCP() bool {
+ switch i {
+ case TCP, HTTPS, TLS:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsTLS is true for protocols on top of TLS (e.g. HTTPS)
+func (i Instance) IsTLS() bool {
+ switch i {
+ case HTTPS, TLS:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsHTTPS is true if protocol is HTTPS
+func (i Instance) IsHTTPS() bool {
+ switch i {
+ case HTTPS:
+ return true
+ default:
+ return false
+ }
+}
+
+// IsGRPC is true for GRPC protocols.
+func (i Instance) IsGRPC() bool {
+ switch i {
+ case GRPC, GRPCWeb:
+ return true
+ default:
+ return false
+ }
+}
+
+func (i Instance) IsUnsupported() bool {
+ return i == Unsupported
+}
+
+// AfterTLSTermination returns the protocol that will be used if TLS is terminated on the current protocol.
+func (i Instance) AfterTLSTermination() Instance {
+ switch i {
+ case HTTPS:
+ return HTTP
+ case TLS:
+ return TCP
+ default:
+ return i
+ }
+}
diff --git a/pkg/config/schema/collection/schemas.go b/pkg/config/schema/collection/schemas.go
index bbf9e618..d5cadbf2 100644
--- a/pkg/config/schema/collection/schemas.go
+++ b/pkg/config/schema/collection/schemas.go
@@ -2,6 +2,7 @@ package collection
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/go-multierror"
@@ -10,7 +11,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
)
// Schemas contains metadata about configuration resources.
diff --git a/pkg/config/schema/gvk/resources.gen.go b/pkg/config/schema/gvk/resources.go
similarity index 83%
rename from pkg/config/schema/gvk/resources.gen.go
rename to pkg/config/schema/gvk/resources.go
index 08d09b97..1e5d9e43 100644
--- a/pkg/config/schema/gvk/resources.gen.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -18,7 +18,7 @@
package gvk
import (
- "github.com/apache/dubbo-kubernetes/operator/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
"k8s.io/apimachinery/pkg/runtime/schema"
)
@@ -37,6 +37,9 @@ var (
Service = config.GroupVersionKind{Group: "", Version: "v1", Kind: "Service"}
ServiceAccount = config.GroupVersionKind{Group: "", Version: "v1", Kind: "ServiceAccount"}
MeshConfig = config.GroupVersionKind{Group: "", Version: "v1alpha1", Kind: "MeshConfig"}
+ RequestAuthentication = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "RequestAuthentication"}
+ PeerAuthentication = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "PeerAuthentication"}
+ AuthorizationPolicy = config.GroupVersionKind{Group: "security.dubbo.io", Version: "v1", Kind: "AuthorizationPolicy"}
)
func ToGVR(g config.GroupVersionKind) (schema.GroupVersionResource, bool) {
@@ -67,6 +70,12 @@ func ToGVR(g config.GroupVersionKind) (schema.GroupVersionResource, bool) {
return gvr.Job, true
case MeshConfig:
return gvr.MeshConfig, true
+ case RequestAuthentication:
+ return gvr.RequestAuthentication, true
+ case PeerAuthentication:
+ return gvr.PeerAuthentication, true
+ case AuthorizationPolicy:
+ return gvr.AuthorizationPolicy, true
}
return schema.GroupVersionResource{}, false
}
@@ -92,6 +101,10 @@ func FromGVR(g schema.GroupVersionResource) (config.GroupVersionKind, bool) {
return DaemonSet, true
case gvr.Job:
return Job, true
+ case gvr.PeerAuthentication:
+ return PeerAuthentication, true
+ case gvr.RequestAuthentication:
+ return RequestAuthentication, true
}
return config.GroupVersionKind{}, false
}
diff --git a/pkg/config/schema/gvr/resource.gen.go b/pkg/config/schema/gvr/resources.go
similarity index 83%
rename from pkg/config/schema/gvr/resource.gen.go
rename to pkg/config/schema/gvr/resources.go
index 1fd1e2e0..a257a6a6 100644
--- a/pkg/config/schema/gvr/resource.gen.go
+++ b/pkg/config/schema/gvr/resources.go
@@ -35,6 +35,9 @@ var (
Service = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
ServiceAccount = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "serviceaccounts"}
MeshConfig = schema.GroupVersionResource{Group: "", Version: "v1alpha1", Resource: "meshconfigs"}
+ RequestAuthentication = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "requestauthentications"}
+ PeerAuthentication = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "peerauthentications"}
+ AuthorizationPolicy = schema.GroupVersionResource{Group: "security.dubbo.io", Version: "v1", Resource: "authorizationpolicies"}
)
func IsClusterScoped(g schema.GroupVersionResource) bool {
@@ -55,6 +58,12 @@ func IsClusterScoped(g schema.GroupVersionResource) bool {
return false
case ServiceAccount:
return false
+ case RequestAuthentication:
+ return false
+ case PeerAuthentication:
+ return false
+ case AuthorizationPolicy:
+ return false
}
return false
}
diff --git a/pkg/config/schema/kind/kind.go b/pkg/config/schema/kind/kind.go
new file mode 100644
index 00000000..6c29e6ef
--- /dev/null
+++ b/pkg/config/schema/kind/kind.go
@@ -0,0 +1,3 @@
+package kind
+
+type Kind uint8
diff --git a/pkg/config/schema/kind/resource.go
b/pkg/config/schema/kind/resource.go
new file mode 100644
index 00000000..fdbb42ac
--- /dev/null
+++ b/pkg/config/schema/kind/resource.go
@@ -0,0 +1,60 @@
+package kind
+
+const (
+ Unknown Kind = iota
+ AuthorizationPolicy
+ CustomResourceDefinition
+ DestinationRule
+ MeshConfig
+ MeshNetworks
+ MutatingWebhookConfiguration
+ Namespace
+ PeerAuthentication
+ Pod
+ RequestAuthentication
+ Secret
+ Service
+ ServiceAccount
+ StatefulSet
+ ValidatingWebhookConfiguration
+ VirtualService
+)
+
+func (k Kind) String() string {
+ switch k {
+ case AuthorizationPolicy:
+ return "AuthorizationPolicy"
+ case CustomResourceDefinition:
+ return "CustomResourceDefinition"
+ case DestinationRule:
+ return "DestinationRule"
+ case MeshConfig:
+ return "MeshConfig"
+ case MeshNetworks:
+ return "MeshNetworks"
+ case MutatingWebhookConfiguration:
+ return "MutatingWebhookConfiguration"
+ case Namespace:
+ return "Namespace"
+ case PeerAuthentication:
+ return "PeerAuthentication"
+ case Pod:
+ return "Pod"
+ case RequestAuthentication:
+ return "RequestAuthentication"
+ case Secret:
+ return "Secret"
+ case Service:
+ return "Service"
+ case ServiceAccount:
+ return "ServiceAccount"
+ case StatefulSet:
+ return "StatefulSet"
+ case ValidatingWebhookConfiguration:
+ return "ValidatingWebhookConfiguration"
+ case VirtualService:
+ return "VirtualService"
+ default:
+ return "Unknown"
+ }
+}
diff --git a/pkg/config/schema/kubetypes/common.go
b/pkg/config/schema/kubetypes/common.go
index e091b711..bff043c7 100644
--- a/pkg/config/schema/kubetypes/common.go
+++ b/pkg/config/schema/kubetypes/common.go
@@ -18,7 +18,7 @@
package kubetypes
import (
- "github.com/apache/dubbo-kubernetes/operator/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/pkg/typemap"
diff --git a/pkg/config/schema/kubetypes/resources.gen.go
b/pkg/config/schema/kubetypes/resources.go
similarity index 95%
rename from pkg/config/schema/kubetypes/resources.gen.go
rename to pkg/config/schema/kubetypes/resources.go
index 0eb976bf..7090e191 100644
--- a/pkg/config/schema/kubetypes/resources.gen.go
+++ b/pkg/config/schema/kubetypes/resources.go
@@ -18,7 +18,7 @@
package kubetypes
import (
- "github.com/apache/dubbo-kubernetes/operator/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
istioioapimeshv1alpha1 "istio.io/api/mesh/v1alpha1"
k8sioapicorev1 "k8s.io/api/core/v1"
diff --git a/pkg/config/visibility/visibility.go
b/pkg/config/visibility/visibility.go
new file mode 100644
index 00000000..49fe4f6d
--- /dev/null
+++ b/pkg/config/visibility/visibility.go
@@ -0,0 +1,28 @@
+package visibility
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+)
+
+type Instance string
+
+const (
+ Private Instance = "."
+ Public Instance = "*"
+ None Instance = "~"
+)
+
+func (v Instance) Validate() (errs error) {
+ switch v {
+ case Private, Public:
+ return nil
+ case None:
+ return fmt.Errorf("exportTo ~ (none) is not allowed for Istio configuration objects")
+ default:
+ if !labels.IsDNS1123Label(string(v)) {
+ return fmt.Errorf("only .,*, or a valid DNS 1123 label is allowed as exportTo entry")
+ }
+ }
+ return nil
+}
diff --git a/pkg/kube/kclient/client.go b/pkg/kube/kclient/client.go
index 8ee75959..2a340d73 100644
--- a/pkg/kube/kclient/client.go
+++ b/pkg/kube/kclient/client.go
@@ -28,8 +28,8 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
diff --git a/pkg/kube/krt/collection.go b/pkg/kube/krt/collection.go
index de44a063..be30cceb 100644
--- a/pkg/kube/krt/collection.go
+++ b/pkg/kube/krt/collection.go
@@ -23,8 +23,8 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/maps"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/pkg/queue"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"k8s.io/klog/v2"
"sync"
)
diff --git a/pkg/kube/krt/fetch.go b/pkg/kube/krt/fetch.go
index 933c9919..b2eaf6b6 100644
--- a/pkg/kube/krt/fetch.go
+++ b/pkg/kube/krt/fetch.go
@@ -18,7 +18,7 @@
package krt
import (
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
)
func FetchOne[T any](ctx HandlerContext, c Collection[T], opts ...FetchOption)
*T {
diff --git a/pkg/kube/krt/files/files.go b/pkg/kube/krt/files/files.go
index 9f4e6d3e..743b8ce5 100644
--- a/pkg/kube/krt/files/files.go
+++ b/pkg/kube/krt/files/files.go
@@ -21,8 +21,8 @@ import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/fsnotify/fsnotify"
"go.uber.org/atomic"
"k8s.io/klog/v2"
diff --git a/pkg/kube/krt/index.go b/pkg/kube/krt/index.go
index e94b355c..96788c0e 100644
--- a/pkg/kube/krt/index.go
+++ b/pkg/kube/krt/index.go
@@ -21,8 +21,8 @@ import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"k8s.io/client-go/tools/cache"
)
diff --git a/pkg/kube/krt/informer.go b/pkg/kube/krt/informer.go
index 2dd05260..f7d2647b 100644
--- a/pkg/kube/krt/informer.go
+++ b/pkg/kube/krt/informer.go
@@ -23,7 +23,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
diff --git a/pkg/kube/krt/processor.go b/pkg/kube/krt/processor.go
index 636443f6..e4d0faca 100644
--- a/pkg/kube/krt/processor.go
+++ b/pkg/kube/krt/processor.go
@@ -18,7 +18,7 @@
package krt
import (
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"sync"
"sync/atomic"
diff --git a/pkg/kube/krt/singleton.go b/pkg/kube/krt/singleton.go
index 758ef3f1..05186de1 100644
--- a/pkg/kube/krt/singleton.go
+++ b/pkg/kube/krt/singleton.go
@@ -21,8 +21,7 @@ import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
-
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"sync/atomic"
)
diff --git a/pkg/kube/krt/static.go b/pkg/kube/krt/static.go
index a0b386be..8698c852 100644
--- a/pkg/kube/krt/static.go
+++ b/pkg/kube/krt/static.go
@@ -5,8 +5,8 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/maps"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"sync"
)
diff --git a/pkg/kube/namespace/filter.go b/pkg/kube/namespace/filter.go
index e9baa49b..7c7e6236 100644
--- a/pkg/kube/namespace/filter.go
+++ b/pkg/kube/namespace/filter.go
@@ -19,6 +19,7 @@ package namespace
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"k8s.io/klog/v2"
"sync"
@@ -33,7 +34,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
meshapi "istio.io/api/mesh/v1alpha1"
)
diff --git a/pkg/network/id.go b/pkg/network/id.go
new file mode 100644
index 00000000..23291ab3
--- /dev/null
+++ b/pkg/network/id.go
@@ -0,0 +1,14 @@
+package network
+
+import "github.com/apache/dubbo-kubernetes/pkg/util/identifier"
+
+// ID is the unique identifier for a network.
+type ID string
+
+func (id ID) Equals(other ID) bool {
+ return identifier.IsSameOrEmpty(string(id), string(other))
+}
+
+func (id ID) String() string {
+ return string(id)
+}
diff --git a/pkg/util/slices/slices.go b/pkg/slices/slices.go
similarity index 100%
rename from pkg/util/slices/slices.go
rename to pkg/slices/slices.go
diff --git a/pkg/util/sets/set.go b/pkg/util/sets/set.go
index ca63142f..d9b0ddd5 100644
--- a/pkg/util/sets/set.go
+++ b/pkg/util/sets/set.go
@@ -19,16 +19,13 @@ package sets
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
)
import (
"golang.org/x/exp/constraints"
)
-import (
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
-)
-
type Set[T comparable] map[T]struct{}
type String = Set[string]
diff --git a/pkg/util/smallset/smallset.go b/pkg/util/smallset/smallset.go
index 9e84cdca..753762e2 100644
--- a/pkg/util/smallset/smallset.go
+++ b/pkg/util/smallset/smallset.go
@@ -20,7 +20,7 @@ package smallset
import (
"cmp"
"fmt"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
)
// Set is an immutable set optimized for *small number of items*. For general
purposes, Sets is likely better
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 183f05a8..814630db 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -360,3 +360,15 @@ func (conn *Connection) PushCh() chan any {
func (conn *Connection) StreamDone() <-chan struct{} {
return conn.stream.Context().Done()
}
+
+func (conn *Connection) InitializedCh() chan struct{} {
+ return conn.initialized
+}
+
+func (conn *Connection) ErrorCh() chan error {
+ return conn.errorChan
+}
+
+func (conn *Connection) StopCh() chan struct{} {
+ return conn.stop
+}
diff --git a/sail/pkg/bootstrap/certcontroller.go
b/sail/pkg/bootstrap/certcontroller.go
index b0d28690..0e61cf31 100644
--- a/sail/pkg/bootstrap/certcontroller.go
+++ b/sail/pkg/bootstrap/certcontroller.go
@@ -18,13 +18,26 @@
package bootstrap
import (
+ "bytes"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ "github.com/apache/dubbo-kubernetes/pkg/sleep"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
+ "github.com/apache/dubbo-kubernetes/security/pkg/k8s/chiron"
+ "github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
+ certutil "github.com/apache/dubbo-kubernetes/security/pkg/util"
"k8s.io/klog/v2"
+ "os"
+ "path"
+ "strings"
+ "time"
)
const (
- defaultCACertPath =
"./var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+ defaultCertGracePeriodRatio = 0.5
+ rootCertPollingInterval = 60 * time.Second
+ defaultCACertPath =
"./var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)
func (s *Server) updateRootCertAndGenKeyCert() error {
@@ -51,3 +64,189 @@ func (s *Server) updateRootCertAndGenKeyCert() error {
s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
return nil
}
+
+func (s *Server) initFileCertificateWatches(tlsOptions TLSOptions) error {
+ if err :=
s.dubbodCertBundleWatcher.SetFromFilesAndNotify(tlsOptions.KeyFile,
tlsOptions.CertFile, tlsOptions.CaCertFile); err != nil {
+ return fmt.Errorf("set keyCertBundle failed: %v", err)
+ }
+ // TODO: Setup watcher for root and restart server if it changes.
+ for _, file := range []string{tlsOptions.CertFile, tlsOptions.KeyFile} {
+ klog.Infof("adding watcher for certificate %s", file)
+ if err := s.fileWatcher.Add(file); err != nil {
+ return fmt.Errorf("could not watch %v: %v", file, err)
+ }
+ }
+ s.addStartFunc("certificate rotation", func(stop <-chan struct{}) error
{
+ go func() {
+ var keyCertTimerC <-chan time.Time
+ for {
+ select {
+ case <-keyCertTimerC:
+ keyCertTimerC = nil
+ if err :=
s.dubbodCertBundleWatcher.SetFromFilesAndNotify(tlsOptions.KeyFile,
tlsOptions.CertFile, tlsOptions.CaCertFile); err != nil {
+ klog.Errorf("Setting
keyCertBundle failed: %v", err)
+ }
+ case
<-s.fileWatcher.Events(tlsOptions.CertFile):
+ if keyCertTimerC == nil {
+ keyCertTimerC =
time.After(watchDebounceDelay)
+ }
+ case <-s.fileWatcher.Events(tlsOptions.KeyFile):
+ if keyCertTimerC == nil {
+ keyCertTimerC =
time.After(watchDebounceDelay)
+ }
+ case err :=
<-s.fileWatcher.Errors(tlsOptions.CertFile):
+ klog.Errorf("error watching %v: %v",
tlsOptions.CertFile, err)
+ case err :=
<-s.fileWatcher.Errors(tlsOptions.KeyFile):
+ klog.Errorf("error watching %v: %v",
tlsOptions.KeyFile, err)
+ case <-stop:
+ return
+ }
+ }
+ }()
+ return nil
+ })
+ return nil
+}
+
+func (s *Server) RotateDNSCertForK8sCA(stop <-chan struct{},
+ defaultCACertPath string,
+ signerName string,
+ approveCsr bool,
+ requestedLifetime time.Duration,
+) {
+ certUtil := certutil.NewCertUtil(int(defaultCertGracePeriodRatio * 100))
+ for {
+ waitTime, _ :=
certUtil.GetWaitTime(s.dubbodCertBundleWatcher.GetKeyCertBundle().CertPem,
time.Now())
+ if !sleep.Until(stop, waitTime) {
+ return
+ }
+ certChain, keyPEM, _, err :=
chiron.GenKeyCertK8sCA(s.kubeClient.Kube(),
+ strings.Join(s.dnsNames, ","), defaultCACertPath,
signerName, approveCsr, requestedLifetime)
+ if err != nil {
+ klog.Errorf("failed regenerating key and cert for
dubbod by kubernetes: %v", err)
+ continue
+ }
+ s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain,
s.dubbodCertBundleWatcher.GetCABundle())
+ }
+}
+func (s *Server) initDNSCertsK8SRA() error {
+ var certChain, keyPEM, caBundle []byte
+ var err error
+ pilotCertProviderName := features.SailCertProvider
+
+ signerName := strings.TrimPrefix(pilotCertProviderName,
constants.CertProviderKubernetesSignerPrefix)
+ klog.Infof("Generating K8S-signed cert for %v using signer %v",
s.dnsNames, signerName)
+ certChain, keyPEM, _, err = chiron.GenKeyCertK8sCA(s.kubeClient.Kube(),
+ strings.Join(s.dnsNames, ","), "", signerName, true,
SelfSignedCACertTTL.Get())
+ if err != nil {
+ return fmt.Errorf("failed generating key and cert by
kubernetes: %v", err)
+ }
+ caBundle, err = s.RA.GetRootCertFromMeshConfig(signerName)
+ if err != nil {
+ return err
+ }
+
+ // MeshConfig: add a callback that runs on mesh config updates
+ s.environment.AddMeshHandler(func() {
+ newCaBundle, _ := s.RA.GetRootCertFromMeshConfig(signerName)
+ if newCaBundle != nil && !bytes.Equal(newCaBundle,
s.dubbodCertBundleWatcher.GetKeyCertBundle().CABundle) {
+ newCertChain, newKeyPEM, _, err :=
chiron.GenKeyCertK8sCA(s.kubeClient.Kube(),
+ strings.Join(s.dnsNames, ","), "", signerName,
true, SelfSignedCACertTTL.Get())
+ if err != nil {
+ klog.Errorf("failed regenerating key and cert
for istiod by kubernetes: %v", err)
+ }
+ s.dubbodCertBundleWatcher.SetAndNotify(newKeyPEM,
newCertChain, newCaBundle)
+ }
+ })
+
+ s.addStartFunc("istiod server certificate rotation", func(stop <-chan
struct{}) error {
+ go func() {
+ // Track TTL of DNS cert and renew cert in accordance
with the grace period.
+ s.RotateDNSCertForK8sCA(stop, "", signerName, true,
SelfSignedCACertTTL.Get())
+ }()
+ return nil
+ })
+ s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
+ return nil
+}
+
+func (s *Server) initDNSCertsDubbod() error {
+ var certChain, keyPEM, caBundle []byte
+ var err error
+ // Generate certificates for Dubbod DNS names, signed by Dubbod CA
+ certChain, keyPEM, err = s.CA.GenKeyCert(s.dnsNames,
SelfSignedCACertTTL.Get(), false)
+ if err != nil {
+ return fmt.Errorf("failed generating dubbod key cert %v", err)
+ }
+ klog.Infof("Generating dubbod-signed cert for %v:\n %s", s.dnsNames,
certChain)
+
+ fileBundle, err := detectSigningCABundleAndCRL()
+ if err != nil {
+ return fmt.Errorf("unable to determine signing file format %v",
err)
+ }
+
+ dubboGenerated, detectedSigningCABundle := false, false
+ if _, err := os.Stat(fileBundle.SigningKeyFile); err == nil {
+ detectedSigningCABundle = true
+ if _, err := os.Stat(path.Join(LocalCertDir.Get(),
ca.DubboGenerated)); err == nil {
+ dubboGenerated = true
+ }
+ }
+
+ // check if the signing key file exists in the cert dir and if the
dubbo-generated file
+ // exists (only if USE_CACERTS_FOR_SELF_SIGNED_CA is enabled)
+ if !detectedSigningCABundle {
+ klog.Infof("Use roots from dubbo-ca-secret")
+
+ caBundle = s.CA.GetCAKeyCertBundle().GetRootCertPem()
+ s.addStartFunc("dubbod server certificate rotation", func(stop
<-chan struct{}) error {
+ go func() {
+ // regenerate dubbod key cert when root cert
changes.
+ s.watchRootCertAndGenKeyCert(stop)
+ }()
+ return nil
+ })
+ } else if features.UseCacertsForSelfSignedCA && dubboGenerated {
+ klog.Infof("Use roots from %v and watch",
fileBundle.RootCertFile)
+
+ caBundle = s.CA.GetCAKeyCertBundle().GetRootCertPem()
+ // Similar code to dubbo-ca-secret: refresh the root cert, but
in cacerts
+ s.addStartFunc("dubbod server certificate rotation", func(stop
<-chan struct{}) error {
+ go func() {
+ // regenerate dubbod key cert when root cert
changes.
+ s.watchRootCertAndGenKeyCert(stop)
+ }()
+ return nil
+ })
+
+ } else {
+ klog.Infof("Use root cert from %v", fileBundle.RootCertFile)
+
+ caBundle, err = os.ReadFile(fileBundle.RootCertFile)
+ if err != nil {
+ return fmt.Errorf("failed reading %s: %v",
fileBundle.RootCertFile, err)
+ }
+ }
+ s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
+ return nil
+}
+
+func (s *Server) watchRootCertAndGenKeyCert(stop <-chan struct{}) {
+ caBundle := s.CA.GetCAKeyCertBundle().GetRootCertPem()
+ for {
+ if !sleep.Until(stop, rootCertPollingInterval) {
+ return
+ }
+ newRootCert := s.CA.GetCAKeyCertBundle().GetRootCertPem()
+ if !bytes.Equal(caBundle, newRootCert) {
+ caBundle = newRootCert
+ certChain, keyPEM, err := s.CA.GenKeyCert(s.dnsNames,
SelfSignedCACertTTL.Get(), false)
+ if err != nil {
+ klog.Errorf("failed generating dubbod key cert
%v", err)
+ } else {
+ s.dubbodCertBundleWatcher.SetAndNotify(keyPEM,
certChain, caBundle)
+ klog.Infof("regenerated dubbod dns cert: %s",
certChain)
+ }
+ }
+ }
+}
diff --git a/sail/pkg/bootstrap/options.go b/sail/pkg/bootstrap/options.go
index 83c9baaa..a45e9b11 100644
--- a/sail/pkg/bootstrap/options.go
+++ b/sail/pkg/bootstrap/options.go
@@ -56,6 +56,16 @@ type DiscoveryServerOptions struct {
HTTPSAddr string
GRPCAddr string
SecureGRPCAddr string
+ TLSOptions TLSOptions
+}
+
+type TLSOptions struct {
+ // CaCertFile and related are set using CLI flags.
+ CaCertFile string
+ CertFile string
+ KeyFile string
+ TLSCipherSuites []string
+ CipherSuits []uint16 // This is the parsed cipher suites
}
func NewSailArgs(initFuncs ...func(*SailArgs)) *SailArgs {
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index f95cc434..89e49947 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -19,6 +19,8 @@ package bootstrap
import (
"context"
+ "crypto/tls"
+ "crypto/x509"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
@@ -29,19 +31,27 @@ import (
kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/kube/namespace"
+ sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/network"
+ "github.com/apache/dubbo-kubernetes/pkg/spiffe"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
+ dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
"github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/server"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
"github.com/apache/dubbo-kubernetes/sail/pkg/xds"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ra"
"github.com/fsnotify/fsnotify"
+ grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"golang.org/x/net/http2"
"google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/reflection"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/klog"
@@ -49,9 +59,15 @@ import (
"net/http"
"os"
"strings"
+ "sync"
"time"
)
+const (
+ // debounce file watcher events to minimize noise in logs
+ watchDebounceDelay = 100 * time.Millisecond
+)
+
type Server struct {
XDSServer *xds.DiscoveryServer
clusterID cluster.ID
@@ -84,18 +100,30 @@ type Server struct {
CA *ca.DubboCA
dnsNames []string
+ certMu sync.RWMutex
+ dubbodCert *tls.Certificate
+
dubbodCertBundleWatcher *keycertbundle.Watcher
}
func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
e := model.NewEnvironment()
+ e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffix
+
+ ac := aggregate.NewController(aggregate.Options{
+ MeshHolder: e,
+ ConfigClusterID: getClusterID(args),
+ })
+ e.ServiceDiscovery = ac
+
s := &Server{
- environment: e,
- server: server.New(),
- clusterID: getClusterID(args),
- httpMux: http.NewServeMux(),
- fileWatcher: filewatcher.NewWatcher(),
- internalStop: make(chan struct{}),
+ environment: e,
+ server: server.New(),
+ clusterID: getClusterID(args),
+ httpMux: http.NewServeMux(),
+ dubbodCertBundleWatcher: keycertbundle.NewWatcher(),
+ fileWatcher: filewatcher.NewWatcher(),
+ internalStop: make(chan struct{}),
}
for _, fn := range initFuncs {
fn(s)
@@ -157,6 +185,20 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server))
(*Server, error) {
return nil, err
}
+ dubbodHost, _, err := e.GetDiscoveryAddress()
+ if err != nil {
+ return nil, err
+ }
+
+ if err := s.initDubbodCerts(args, string(dubbodHost)); err != nil {
+ return nil, err
+ }
+
+ // Secure gRPC Server must be initialized after CA is created as it may
use an Aegis generated cert.
+ if err := s.initSecureDiscoveryService(args,
s.environment.Mesh().GetTrustDomain()); err != nil {
+ return nil, fmt.Errorf("error initializing secure gRPC
Listener: %v", err)
+ }
+
return s, nil
}
@@ -298,14 +340,21 @@ func (s *Server) initServers(args *SailArgs) {
}
func (s *Server) initGrpcServer(options *dubbokeepalive.Options) {
+ interceptors := []grpc.UnaryServerInterceptor{
+ // setup server prometheus monitoring (as final interceptor in
chain)
+ grpcprom.UnaryServerInterceptor,
+ }
+ grpcOptions := dubbogrpc.ServerOptions(options, interceptors...)
+ s.grpcServer = grpc.NewServer(grpcOptions...)
+ s.XDSServer.Register(s.grpcServer)
+ reflection.Register(s.grpcServer)
}
-// initControllers initializes the controllers.
func (s *Server) initControllers(args *SailArgs) error {
klog.Info("initializing controllers")
// TODO initMulticluster
- // TODO initSDSServer
+ s.initSDSServer()
if err := s.initConfigController(args); err != nil {
return fmt.Errorf("error initializing config controller: %v",
err)
@@ -316,6 +365,114 @@ func (s *Server) initControllers(args *SailArgs) error {
return nil
}
+func (s *Server) initSecureDiscoveryService(args *SailArgs, trustDomain
string) error {
+ if args.ServerOptions.SecureGRPCAddr == "" {
+ klog.Info("The secure discovery port is disabled, multiplexing
on httpAddr ")
+ return nil
+ }
+
+ peerCertVerifier, err :=
s.createPeerCertVerifier(args.ServerOptions.TLSOptions, trustDomain)
+ if err != nil {
+ return err
+ }
+ if peerCertVerifier == nil {
+ // Running locally without configured certs - no TLS mode
+ klog.Warningf("The secure discovery service is disabled")
+ return nil
+ }
+ klog.Info("initializing secure discovery service")
+
+ cfg := &tls.Config{
+ GetCertificate: s.getDubbodCertificate,
+ ClientAuth: tls.VerifyClientCertIfGiven,
+ ClientCAs: peerCertVerifier.GetGeneralCertPool(),
+ VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains
[][]*x509.Certificate) error {
+ err := peerCertVerifier.VerifyPeerCert(rawCerts,
verifiedChains)
+ if err != nil {
+ klog.Infof("Could not verify certificate: %v",
err)
+ }
+ return err
+ },
+ MinVersion: tls.VersionTLS12,
+ CipherSuites: args.ServerOptions.TLSOptions.CipherSuits,
+ }
+ // Compliance for xDS server TLS.
+ sec_model.EnforceGoCompliance(cfg)
+
+ tlsCreds := credentials.NewTLS(cfg)
+
+ s.secureGrpcAddress = args.ServerOptions.SecureGRPCAddr
+
+ interceptors := []grpc.UnaryServerInterceptor{
+ // setup server prometheus monitoring (as final interceptor in
chain)
+ grpcprom.UnaryServerInterceptor,
+ }
+ opts := dubbogrpc.ServerOptions(args.KeepaliveOptions, interceptors...)
+ opts = append(opts, grpc.Creds(tlsCreds))
+
+ s.secureGrpcServer = grpc.NewServer(opts...)
+ s.XDSServer.Register(s.secureGrpcServer)
+ reflection.Register(s.secureGrpcServer)
+
+ s.addStartFunc("secure gRPC", func(stop <-chan struct{}) error {
+ go func() {
+ <-stop
+ s.secureGrpcServer.Stop()
+ }()
+ return nil
+ })
+
+ return nil
+}
+
+func (s *Server) createPeerCertVerifier(tlsOptions TLSOptions, trustDomain
string) (*spiffe.PeerCertVerifier, error) {
+ customTLSCertsExists, _, _, caCertPath := hasCustomTLSCerts(tlsOptions)
+ if !customTLSCertsExists && s.CA == nil && !s.isK8SSigning() {
+ // Running locally without configured certs - no TLS mode
+ return nil, nil
+ }
+ peerCertVerifier := spiffe.NewPeerCertVerifier()
+ var rootCertBytes []byte
+ var err error
+ if caCertPath != "" {
+ if rootCertBytes, err = os.ReadFile(caCertPath); err != nil {
+ return nil, err
+ }
+ } else {
+ if s.RA != nil {
+ if strings.HasPrefix(features.SailCertProvider,
constants.CertProviderKubernetesSignerPrefix) {
+ signerName :=
strings.TrimPrefix(features.SailCertProvider,
constants.CertProviderKubernetesSignerPrefix)
+ caBundle, _ :=
s.RA.GetRootCertFromMeshConfig(signerName)
+ rootCertBytes = append(rootCertBytes,
caBundle...)
+ } else {
+ rootCertBytes = append(rootCertBytes,
s.RA.GetCAKeyCertBundle().GetRootCertPem()...)
+ }
+ }
+ if s.CA != nil {
+ rootCertBytes = append(rootCertBytes,
s.CA.GetCAKeyCertBundle().GetRootCertPem()...)
+ }
+ }
+
+ if len(rootCertBytes) != 0 {
+ // TODO: trustDomain here is static and will not update if it
dynamically changes in mesh config
+ err := peerCertVerifier.AddMappingFromPEM(trustDomain,
rootCertBytes)
+ if err != nil {
+ return nil, fmt.Errorf("add root CAs into
peerCertVerifier failed: %v", err)
+ }
+ }
+
+ return peerCertVerifier, nil
+}
+
+func (s *Server) getDubbodCertificate(*tls.ClientHelloInfo) (*tls.Certificate,
error) {
+ s.certMu.RLock()
+ defer s.certMu.RUnlock()
+ if s.dubbodCert != nil {
+ return s.dubbodCert, nil
+ }
+ return nil, fmt.Errorf("cert not initialized")
+}
+
func (s *Server) serveHTTP() error {
// At this point we are ready - start Http Listener so that it can
respond to readiness events.
httpListener, err := net.Listen("tcp", s.httpServer.Addr)
@@ -376,7 +533,17 @@ func getClusterID(args *SailArgs) cluster.ID {
return clusterID
}
-// isK8SSigning returns whether K8S (as a RA) is used to sign certs instead of
private keys known by Istiod
+func (s *Server) initSDSServer() {
+ if s.kubeClient == nil {
+ return
+ }
+ if !features.EnableXDSIdentityCheck {
+ // Make sure we have security
+ klog.Warningf("skipping Kubernetes credential reader;
SAIL_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
+ }
+}
+
+// isK8SSigning returns whether K8S (as a RA) is used to sign certs instead of
private keys known by Dubbod
func (s *Server) isK8SSigning() bool {
return s.RA != nil && strings.HasPrefix(features.NaviCertProvider,
constants.CertProviderKubernetesSignerPrefix)
}
@@ -443,8 +610,12 @@ func (s *Server) waitForShutdown(stop <-chan struct{}) {
func (s *Server) cachesSynced() bool {
// TODO multiclusterController HasSynced
- // TODO ServiceController().HasSynced
- // TODO configController.HasSynced
+ if !s.ServiceController().HasSynced() {
+ return false
+ }
+ if !s.configController.HasSynced() {
+ return false
+ }
return true
}
@@ -468,3 +639,144 @@ func (s *Server) waitForCacheSync(stop <-chan struct{})
bool {
expected := s.XDSServer.InboundUpdates.Load()
return kubelib.WaitForCacheSync("push context", stop, func() bool {
return s.pushContextReady(expected) })
}
+
+func (s *Server) initDubbodCerts(args *SailArgs, host string) error {
+ // err holds the result of whichever certificate source is initialized below
+ var err error
+
+ s.dnsNames = getDNSNames(args, host)
+ if hasCustomCertArgsOrWellKnown, tlsCertPath, tlsKeyPath, caCertPath :=
hasCustomTLSCerts(args.ServerOptions.TLSOptions); hasCustomCertArgsOrWellKnown {
+ // Use the DNS certificate provided via args or in a well-known
location.
+ err = s.initFileCertificateWatches(TLSOptions{
+ CaCertFile: caCertPath,
+ KeyFile: tlsKeyPath,
+ CertFile: tlsCertPath,
+ })
+ if err != nil {
+ // Not crashing dubbod - This typically happens if
certs are missing and in tests.
+ klog.Errorf("error initializing certificate watches:
%v", err)
+ return nil
+ }
+ } else if features.EnableCAServer && features.SailCertProvider ==
constants.CertProviderDubbod {
+ klog.Infof("initializing Dubbod DNS certificates host: %s,
custom host: %s", host, features.DubbodServiceCustomHost)
+ err = s.initDNSCertsDubbod()
+ } else if features.SailCertProvider == constants.CertProviderKubernetes
{
+ klog.Warningf("SAIL_CERT_PROVIDER=kubernetes is no longer
supported by upstream K8S")
+ } else if strings.HasPrefix(features.SailCertProvider,
constants.CertProviderKubernetesSignerPrefix) {
+ klog.Infof("initializing Dubbod DNS certificates using K8S
RA:%s host: %s, custom host: %s", features.SailCertProvider,
+ host, features.DubbodServiceCustomHost)
+ err = s.initDNSCertsK8SRA()
+ } else {
+ klog.Warningf("SAIL_CERT_PROVIDER=%s is not implemented",
features.SailCertProvider)
+ }
+
+ if err == nil {
+ err = s.initDubbodCertLoader()
+ }
+
+ return err
+}
+
+func getDNSNames(args *SailArgs, host string) []string {
+ // Append custom hostname if there is any
+ customHost := features.DubbodServiceCustomHost
+ var cHosts []string
+
+ if customHost != "" {
+ cHosts = strings.Split(customHost, ",")
+ }
+ sans := sets.New(cHosts...)
+ sans.Insert(host)
+ dnsNames := sets.SortedList(sans)
+ klog.Infof("Discover server subject alt names: %v", dnsNames)
+ return dnsNames
+}
+
+func hasCustomTLSCerts(tlsOptions TLSOptions) (ok bool, tlsCertPath,
tlsKeyPath, caCertPath string) {
+ // load from tls args as priority
+ if hasCustomTLSCertArgs(tlsOptions) {
+ return true, tlsOptions.CertFile, tlsOptions.KeyFile,
tlsOptions.CaCertFile
+ }
+
+ if ok = checkPathsExist(constants.DefaultSailTLSCert,
constants.DefaultSailTLSKey, constants.DefaultSailTLSCaCert); ok {
+ tlsCertPath = constants.DefaultSailTLSCert
+ tlsKeyPath = constants.DefaultSailTLSKey
+ caCertPath = constants.DefaultSailTLSCaCert
+ return
+ }
+
+ if ok = checkPathsExist(constants.DefaultSailTLSCert,
constants.DefaultSailTLSKey, constants.DefaultSailTLSCaCertAlternatePath); ok {
+ tlsCertPath = constants.DefaultSailTLSCert
+ tlsKeyPath = constants.DefaultSailTLSKey
+ caCertPath = constants.DefaultSailTLSCaCertAlternatePath
+ return
+ }
+
+ return
+}
+
+func checkPathsExist(paths ...string) bool {
+ for _, path := range paths {
+ fInfo, err := os.Stat(path)
+
+ if err != nil || fInfo.IsDir() {
+ return false
+ }
+ }
+ return true
+}
+
+func hasCustomTLSCertArgs(tlsOptions TLSOptions) bool {
+ return tlsOptions.CaCertFile != "" && tlsOptions.CertFile != "" &&
tlsOptions.KeyFile != ""
+}
+
+func (s *Server) initDubbodCertLoader() error {
+ if err := s.loadDubbodCert(); err != nil {
+ return fmt.Errorf("first time load DubbodCert failed: %v", err)
+ }
+ _, watchCh := s.dubbodCertBundleWatcher.AddWatcher()
+ s.addStartFunc("reload certs", func(stop <-chan struct{}) error {
+ go s.reloadDubbodCert(watchCh, stop)
+ return nil
+ })
+ return nil
+}
+
+func (s *Server) loadDubbodCert() error {
+ keyCertBundle := s.dubbodCertBundleWatcher.GetKeyCertBundle()
+ keyPair, err := tls.X509KeyPair(keyCertBundle.CertPem,
keyCertBundle.KeyPem)
+ if err != nil {
+ return fmt.Errorf("dubbod loading x509 key pairs failed: %v",
err)
+ }
+ for _, c := range keyPair.Certificate {
+ x509Cert, err := x509.ParseCertificates(c)
+ if err != nil {
+ // This can rarely happen, just in case.
+ return fmt.Errorf("x509 cert - ParseCertificates()
error: %v", err)
+ }
+ for _, c := range x509Cert {
+ klog.Infof("x509 cert - Issuer: %q, Subject: %q, SN:
%x, NotBefore: %q, NotAfter: %q",
+ c.Issuer, c.Subject, c.SerialNumber,
+ c.NotBefore.Format(time.RFC3339),
c.NotAfter.Format(time.RFC3339))
+ }
+ }
+
+ klog.Info("Dubbod certificates are reloaded")
+ s.certMu.Lock()
+ s.dubbodCert = &keyPair
+ s.certMu.Unlock()
+ return nil
+}
+
+func (s *Server) reloadDubbodCert(watchCh <-chan struct{}, stopCh <-chan
struct{}) {
+ for {
+ select {
+ case <-stopCh:
+ return
+ case <-watchCh:
+ if err := s.loadDubbodCert(); err != nil {
+ klog.Errorf("reload dubbod cert failed: %v",
err)
+ }
+ }
+ }
+}
diff --git a/sail/pkg/bootstrap/servicecontroller.go
b/sail/pkg/bootstrap/servicecontroller.go
index 44e493a2..f324fc9c 100644
--- a/sail/pkg/bootstrap/servicecontroller.go
+++ b/sail/pkg/bootstrap/servicecontroller.go
@@ -16,8 +16,6 @@ func (s *Server) ServiceController() *aggregate.Controller {
func (s *Server) initServiceControllers(args *SailArgs) error {
serviceControllers := s.ServiceController()
- // TODO service entry controller
-
registered := sets.New[provider.ID]()
for _, r := range args.RegistryOptions.Registries {
serviceRegistry := provider.ID(r)
diff --git a/sail/pkg/config/aggregate/config.go
b/sail/pkg/config/aggregate/config.go
index 16dfd442..8a8478ac 100644
--- a/sail/pkg/config/aggregate/config.go
+++ b/sail/pkg/config/aggregate/config.go
@@ -3,13 +3,13 @@ package aggregate
import (
"errors"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"k8s.io/apimachinery/pkg/types"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
)
diff --git a/sail/pkg/config/memory/controller.go
b/sail/pkg/config/memory/controller.go
index 06c121d8..5119bb6a 100644
--- a/sail/pkg/config/memory/controller.go
+++ b/sail/pkg/config/memory/controller.go
@@ -4,7 +4,7 @@ import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"k8s.io/apimachinery/pkg/types"
)
diff --git a/sail/pkg/features/sail.go b/sail/pkg/features/sail.go
index a3fe8e72..08213a17 100644
--- a/sail/pkg/features/sail.go
+++ b/sail/pkg/features/sail.go
@@ -55,4 +55,11 @@ var (
"If set to false, Dubbo will not watch for the ca-crl.pem file
in the /etc/cacerts directory "+
"and will not distribute CRL data to namespaces for
proxies to consume.",
).Get()
+ SailCertProvider = env.Register("SAIL_CERT_PROVIDER",
constants.CertProviderDubbod,
+ "The provider of Pilot DNS certificate. K8S RA will be used for
k8s.io/NAME. 'dubbod' value will sign"+
+ " using Dubbo build in CA. Other values will not not
generate TLS certs, but still "+
+ " distribute ./etc/certs/root-cert.pem. Only used if
custom certificates are not mounted.").Get()
+ DubbodServiceCustomHost = env.Register("DUBBOD_CUSTOM_HOST", "",
+ "Custom host name of dubbod that dubbod signs the server cert.
"+
+ "Multiple custom host names are supported, and multiple
values are separated by commas.").Get()
)
diff --git a/sail/pkg/features/security.go b/sail/pkg/features/security.go
index 859f0ffc..fb2284ea 100644
--- a/sail/pkg/features/security.go
+++ b/sail/pkg/features/security.go
@@ -24,4 +24,9 @@ var (
UseCacertsForSelfSignedCA =
env.Register("USE_CACERTS_FOR_SELF_SIGNED_CA", false,
"If enabled, dubbod will use a secret named cacerts to store
its self-signed dubbo-"+
"generated root certificate.").Get()
+ // EnableXDSIdentityCheck is read from SAIL_ENABLE_XDS_IDENTITY_CHECK and
+ // defaults to true. Per its help text it turns on authorization of XDS
+ // clients against the namespaces they are permitted to act as.
+ EnableXDSIdentityCheck = env.Register(
+ "SAIL_ENABLE_XDS_IDENTITY_CHECK",
+ true,
+ "If enabled, sail will authorize XDS clients, to ensure they
are acting only as namespaces they have permissions for.",
+ ).Get()
)
diff --git a/sail/pkg/features/tuning.go b/sail/pkg/features/tuning.go
index ca91a465..a99ba302 100644
--- a/sail/pkg/features/tuning.go
+++ b/sail/pkg/features/tuning.go
@@ -25,4 +25,11 @@ var (
100000,
"Sets the maximum number of concurrent grpc streams.",
).Get()
+
+ // MaxRecvMsgSize is the value, in bytes, handed to grpc.MaxRecvMsgSize when
+ // the server options are built. It defaults to 4 MiB (4*1024*1024).
+ // NOTE(review): the environment variable name spells "GPRC" rather than
+ // "GRPC" — confirm whether that is intentional before documenting it.
+ MaxRecvMsgSize = env.Register(
+ "DUBBO_GPRC_MAXRECVMSGSIZE",
+ 4*1024*1024,
+ "Sets the max receive buffer size of gRPC stream in bytes.",
+ ).Get()
)
diff --git a/sail/pkg/grpc/grpc.go b/sail/pkg/grpc/grpc.go
index 0f380b77..3bf677e4 100644
--- a/sail/pkg/grpc/grpc.go
+++ b/sail/pkg/grpc/grpc.go
@@ -3,6 +3,8 @@ package grpc
import (
dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/features"
+ middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
@@ -58,6 +60,30 @@ func ClientOptions(options *dubbokeepalive.Options, tlsOpts
*TLSOptions) ([]grpc
return []grpc.DialOption{keepaliveOption, initialWindowSizeOption,
initialConnWindowSizeOption, msgSizeOption, tlsDialOpts}, nil
}
+// ServerOptions assembles the gRPC server options used by the discovery
+// server: the chained unary interceptors, the stream and message-size limits
+// taken from the features package, and keepalive settings derived from options.
+func ServerOptions(options *dubbokeepalive.Options, interceptors ...grpc.UnaryServerInterceptor) []grpc.ServerOption {
+	// Clients must be allowed to send keepalives often enough. If the minimum
+	// enforced here were higher than the client's keepalive setting, the
+	// client would prematurely be sent a GOAWAY.
+	enforcement := keepalive.EnforcementPolicy{
+		MinTime: options.Time / 2,
+	}
+	serverParams := keepalive.ServerParameters{
+		Time:                  options.Time,
+		Timeout:               options.Timeout,
+		MaxConnectionAge:      options.MaxServerConnectionAge,
+		MaxConnectionAgeGrace: options.MaxServerConnectionAgeGrace,
+	}
+	chained := middleware.ChainUnaryServer(interceptors...)
+
+	return []grpc.ServerOption{
+		grpc.UnaryInterceptor(chained),
+		grpc.MaxConcurrentStreams(uint32(features.MaxConcurrentStreams)),
+		grpc.MaxRecvMsgSize(features.MaxRecvMsgSize),
+		grpc.KeepaliveEnforcementPolicy(enforcement),
+		grpc.KeepaliveParams(serverParams),
+	}
+}
+
func GRPCErrorType(err error) ErrorType {
if err == io.EOF {
return GracefulTermination
diff --git a/sail/pkg/model/addressmap.go b/sail/pkg/model/addressmap.go
new file mode 100644
index 00000000..c8b166f3
--- /dev/null
+++ b/sail/pkg/model/addressmap.go
@@ -0,0 +1,75 @@
+package model
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
+)
+
+// GetAddresses returns a copy of the whole cluster -> addresses mapping, taken
+// under the read lock. It returns nil for a nil receiver or when no mapping
+// has been created yet.
+func (m *AddressMap) GetAddresses() map[cluster.ID][]string {
+	if m == nil {
+		return nil
+	}
+
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+
+	if m.Addresses == nil {
+		return nil
+	}
+
+	// Clone each slice so the caller cannot reach the guarded storage.
+	snapshot := make(map[cluster.ID][]string)
+	for id := range m.Addresses {
+		snapshot[id] = slices.Clone(m.Addresses[id])
+	}
+	return snapshot
+}
+
+// GetAddressesFor returns a copy of the addresses stored for cluster c. It
+// returns nil for a nil receiver or when no mapping exists at all, and an
+// empty (non-nil) slice when the mapping exists but has no entry for c.
+func (m *AddressMap) GetAddressesFor(c cluster.ID) []string {
+	if m == nil {
+		return nil
+	}
+
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+
+	if m.Addresses == nil {
+		return nil
+	}
+
+	// Hand back a detached copy rather than the stored slice.
+	stored := m.Addresses[c]
+	copied := make([]string, len(stored))
+	copy(copied, stored)
+	return copied
+}
+
+// SetAddressesFor replaces the addresses stored for cluster c and returns m so
+// calls can be chained. Passing an empty or nil slice removes the entry for c,
+// and the map itself is dropped once its last entry is gone.
+//
+// The slice is copied before it is stored, so later changes the caller makes
+// to addresses cannot modify the lock-protected state. This matches the
+// getters, which also return copies.
+func (m *AddressMap) SetAddressesFor(c cluster.ID, addresses []string) *AddressMap {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if len(addresses) == 0 {
+		// Deleting from a nil map is a no-op, so no existence check is needed.
+		delete(m.Addresses, c)
+
+		// Delete the map if there's nothing left.
+		if len(m.Addresses) == 0 {
+			m.Addresses = nil
+		}
+		return m
+	}
+
+	// Create the map if it doesn't already exist.
+	if m.Addresses == nil {
+		m.Addresses = make(map[cluster.ID][]string)
+	}
+	m.Addresses[c] = slices.Clone(addresses)
+	return m
+}
+
+// Len reports how many clusters currently have addresses stored. A nil
+// receiver has length zero.
+func (m *AddressMap) Len() int {
+	if m != nil {
+		m.mutex.RLock()
+		defer m.mutex.RUnlock()
+		return len(m.Addresses)
+	}
+	return 0
+}
diff --git a/sail/pkg/model/authentication.go b/sail/pkg/model/authentication.go
new file mode 100644
index 00000000..f1c421af
--- /dev/null
+++ b/sail/pkg/model/authentication.go
@@ -0,0 +1,119 @@
+package model
+
+import (
+ "crypto/md5"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+ "istio.io/api/security/v1beta1"
+ "k8s.io/klog/v2"
+ "strings"
+ "time"
+)
+
+// MutualTLSMode is this package's enumeration of mutual-TLS modes. Values are
+// produced from the API enum by ConvertToMutualTLSMode.
+type MutualTLSMode int
+
+const (
+ // MTLSUnknown is the zero value. ConvertToMutualTLSMode returns it for any
+ // API mode other than DISABLE, PERMISSIVE or STRICT.
+ MTLSUnknown MutualTLSMode = iota
+ // MTLSDisable corresponds to PeerAuthentication_MutualTLS_DISABLE.
+ MTLSDisable
+ // MTLSPermissive corresponds to PeerAuthentication_MutualTLS_PERMISSIVE.
+ MTLSPermissive
+ // MTLSStrict corresponds to PeerAuthentication_MutualTLS_STRICT.
+ MTLSStrict
+)
+
+// AuthenticationPolicies groups the RequestAuthentication and
+// PeerAuthentication configs read from the config store, together with the
+// mutual-TLS modes derived from the peer policies.
+type AuthenticationPolicies struct {
+ // requestAuthentications holds RequestAuthentication configs keyed by namespace.
+ requestAuthentications map[string][]config.Config
+ // peerAuthentications holds PeerAuthentication configs keyed by namespace.
+ peerAuthentications map[string][]config.Config
+ // globalMutualTLSMode comes from the selector-less peer policy in
+ // rootNamespace, if there is one; otherwise it stays MTLSUnknown.
+ globalMutualTLSMode MutualTLSMode
+ // rootNamespace is the mesh config's root namespace.
+ rootNamespace string
+ // namespaceMutualTLSMode holds the mode for each non-root namespace that
+ // has a selector-less peer policy.
+ namespaceMutualTLSMode map[string]MutualTLSMode
+ // aggregateVersion is a hex MD5 digest over the UID and resource version
+ // of every peer policy that was processed.
+ aggregateVersion string
+}
+
+// initAuthenticationPolicies builds an AuthenticationPolicies from every
+// RequestAuthentication and PeerAuthentication config in env, across all
+// namespaces, feeding each list in creation-time order.
+func initAuthenticationPolicies(env *Environment) *AuthenticationPolicies {
+	policy := &AuthenticationPolicies{
+		requestAuthentications: make(map[string][]config.Config),
+		peerAuthentications:    make(map[string][]config.Config),
+		globalMutualTLSMode:    MTLSUnknown,
+		rootNamespace:          env.Mesh().GetRootNamespace(),
+	}
+
+	requestAuthn := env.List(gvk.RequestAuthentication, NamespaceAll)
+	policy.addRequestAuthentication(sortConfigByCreationTime(requestAuthn))
+
+	peerAuthn := env.List(gvk.PeerAuthentication, NamespaceAll)
+	policy.addPeerAuthentication(sortConfigByCreationTime(peerAuthn))
+
+	return policy
+}
+
+// addRequestAuthentication appends each config to the per-namespace list of
+// request authentication policies, preserving the order of configs.
+func (policy *AuthenticationPolicies) addRequestAuthentication(configs []config.Config) {
+	for i := range configs {
+		ns := configs[i].Namespace
+		policy.requestAuthentications[ns] = append(policy.requestAuthentications[ns], configs[i])
+	}
+}
+
+// addPeerAuthentication records the given PeerAuthentication configs and
+// derives the mesh-wide and per-namespace mutual-TLS modes from them.
+//
+// Configs are processed in creation-time order. For each namespace only the
+// first policy without a workload selector counts; later selector-less
+// policies in the same namespace are logged and dropped. A selector-less
+// policy in the root namespace sets the global mode. Namespaces whose policy
+// leaves the mode unset inherit the global mode, or PERMISSIVE when the global
+// mode is unknown. Each Spec is assumed to be a *v1beta1.PeerAuthentication;
+// the type assertion panics otherwise.
+func (policy *AuthenticationPolicies) addPeerAuthentication(configs []config.Config) {
+	sortConfigByCreationTime(configs)
+
+	modeByNamespace := make(map[string]v1beta1.PeerAuthentication_MutualTLS_Mode)
+	firstSeenAt := make(map[string]time.Time)
+	versions := make([]string, 0, len(configs))
+
+	for _, cfg := range configs {
+		versions = append(versions, cfg.UID+"."+cfg.ResourceVersion)
+		spec := cfg.Spec.(*v1beta1.PeerAuthentication)
+
+		hasSelector := spec.Selector != nil && len(spec.Selector.MatchLabels) != 0
+		if !hasSelector {
+			// Namespace-level or mesh-level policy: only the oldest one wins.
+			if earlier, dup := firstSeenAt[cfg.Namespace]; dup {
+				klog.Warningf(
+					"Namespace/mesh-level PeerAuthentication is already defined for %q at time %v. Ignore %q which was created at time %v",
+					cfg.Namespace, earlier, cfg.Name, cfg.CreationTimestamp)
+				continue
+			}
+			firstSeenAt[cfg.Namespace] = cfg.CreationTimestamp
+
+			mode := v1beta1.PeerAuthentication_MutualTLS_UNSET
+			if spec.Mtls != nil {
+				mode = spec.Mtls.Mode
+			}
+
+			switch {
+			case cfg.Namespace != policy.rootNamespace:
+				modeByNamespace[cfg.Namespace] = mode
+			case mode == v1beta1.PeerAuthentication_MutualTLS_UNSET:
+				policy.globalMutualTLSMode = MTLSPermissive
+			default:
+				policy.globalMutualTLSMode = ConvertToMutualTLSMode(mode)
+			}
+		}
+
+		policy.peerAuthentications[cfg.Namespace] = append(policy.peerAuthentications[cfg.Namespace], cfg)
+	}
+
+	// nolint: gosec
+	// Not security sensitive code
+	digest := md5.Sum([]byte(strings.Join(versions, ";")))
+	policy.aggregateVersion = fmt.Sprintf("%x", digest)
+
+	// Namespaces that did not pick a mode inherit the mesh-wide one.
+	inherited := policy.globalMutualTLSMode
+	if inherited == MTLSUnknown {
+		inherited = MTLSPermissive
+	}
+
+	policy.namespaceMutualTLSMode = make(map[string]MutualTLSMode, len(modeByNamespace))
+	for ns, mode := range modeByNamespace {
+		if mode != v1beta1.PeerAuthentication_MutualTLS_UNSET {
+			policy.namespaceMutualTLSMode[ns] = ConvertToMutualTLSMode(mode)
+			continue
+		}
+		policy.namespaceMutualTLSMode[ns] = inherited
+	}
+}
+
+// ConvertToMutualTLSMode maps the API's PeerAuthentication mTLS mode onto this
+// package's MutualTLSMode. Any mode other than DISABLE, PERMISSIVE or STRICT
+// yields MTLSUnknown.
+func ConvertToMutualTLSMode(mode v1beta1.PeerAuthentication_MutualTLS_Mode) MutualTLSMode {
+	if mode == v1beta1.PeerAuthentication_MutualTLS_DISABLE {
+		return MTLSDisable
+	}
+	if mode == v1beta1.PeerAuthentication_MutualTLS_PERMISSIVE {
+		return MTLSPermissive
+	}
+	if mode == v1beta1.PeerAuthentication_MutualTLS_STRICT {
+		return MTLSStrict
+	}
+	return MTLSUnknown
+}
diff --git a/sail/pkg/model/authorization.go b/sail/pkg/model/authorization.go
new file mode 100644
index 00000000..9789ac32
--- /dev/null
+++ b/sail/pkg/model/authorization.go
@@ -0,0 +1,48 @@
+package model
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+ authpb "istio.io/api/security/v1beta1"
+)
+
+// AuthorizationPolicy is a flattened view of one AuthorizationPolicy config:
+// its identifying metadata plus the typed spec.
+type AuthorizationPolicy struct {
+ Name string `json:"name"`
+ Namespace string `json:"namespace"`
+ Annotations map[string]string `json:"annotations"`
+ Spec *authpb.AuthorizationPolicy `json:"spec"`
+}
+
+// AuthorizationPolicies groups authorization policies by namespace and
+// records the mesh root namespace alongside them.
+type AuthorizationPolicies struct {
+ // NamespaceToPolicies maps a namespace to the policies defined in it.
+ NamespaceToPolicies map[string][]AuthorizationPolicy
`json:"namespace_to_policies"`
+ // RootNamespace is the mesh config's root namespace.
+ RootNamespace string
`json:"root_namespace"`
+}
+
+// GetAuthorizationPolicies lists every AuthorizationPolicy config in env,
+// across all namespaces, and groups them by namespace in creation-time order.
+// Each Spec is assumed to be an *authpb.AuthorizationPolicy; the type
+// assertion panics otherwise.
+func GetAuthorizationPolicies(env *Environment) *AuthorizationPolicies {
+	result := &AuthorizationPolicies{
+		NamespaceToPolicies: map[string][]AuthorizationPolicy{},
+		RootNamespace:       env.Mesh().GetRootNamespace(),
+	}
+
+	all := env.List(gvk.AuthorizationPolicy, NamespaceAll)
+	sortConfigByCreationTime(all)
+
+	// Count per namespace first so each slice is allocated once at full size.
+	perNamespace := make(map[string]int)
+	for i := range all {
+		perNamespace[all[i].Namespace]++
+	}
+
+	for _, cfg := range all {
+		bucket, exists := result.NamespaceToPolicies[cfg.Namespace]
+		if !exists {
+			bucket = make([]AuthorizationPolicy, 0, perNamespace[cfg.Namespace])
+		}
+		result.NamespaceToPolicies[cfg.Namespace] = append(bucket, AuthorizationPolicy{
+			Name:        cfg.Name,
+			Namespace:   cfg.Namespace,
+			Annotations: cfg.Annotations,
+			Spec:        cfg.Spec.(*authpb.AuthorizationPolicy),
+		})
+	}
+
+	return result
+}
diff --git a/sail/pkg/model/cluster_local.go b/sail/pkg/model/cluster_local.go
new file mode 100644
index 00000000..9bb6b429
--- /dev/null
+++ b/sail/pkg/model/cluster_local.go
@@ -0,0 +1,14 @@
+package model
+
+import "github.com/apache/dubbo-kubernetes/pkg/config/host"
+
+// ClusterLocalHosts holds two sets of host names, one for specific hosts and
+// one for wildcard hosts.
+// NOTE(review): nothing in this change populates or reads the two maps —
+// confirm the intended meaning of the bool values.
+type ClusterLocalHosts struct {
+ specific map[host.Name]bool
+ wildcard map[host.Name]bool
+}
+
+// ClusterLocalProvider supplies the current set of cluster-local hosts.
+type ClusterLocalProvider interface {
+ // GetClusterLocalHosts returns the cluster-local hosts as a
+ // ClusterLocalHosts value. The caller must not modify the returned value.
+ // NOTE(review): earlier wording described a list sorted in ascending
+ // order, which does not match the map-based return type — confirm.
+ GetClusterLocalHosts() ClusterLocalHosts
+}
diff --git a/sail/pkg/model/config.go b/sail/pkg/model/config.go
index 78e5b91e..6619a5d9 100644
--- a/sail/pkg/model/config.go
+++ b/sail/pkg/model/config.go
@@ -1,8 +1,14 @@
package model
import (
+ "cmp"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "sort"
+)
+
+const (
+ NamespaceAll = ""
)
type ConfigStore interface {
@@ -24,3 +30,16 @@ type ConfigStoreController interface {
Run(stop <-chan struct{})
HasSynced() bool
}
+
+// sortConfigByCreationTime sorts configs in place, oldest first, and returns
+// the same slice. Ties on creation time are broken by name and then by
+// namespace so the order is deterministic.
+func sortConfigByCreationTime(configs []config.Config) []config.Config {
+	sort.Slice(configs, func(i, j int) bool {
+		a, b := &configs[i], &configs[j]
+		if byTime := a.CreationTimestamp.Compare(b.CreationTimestamp); byTime != 0 {
+			return byTime < 0
+		}
+		if byName := cmp.Compare(a.Name, b.Name); byName != 0 {
+			return byName < 0
+		}
+		return cmp.Compare(a.Namespace, b.Namespace) < 0
+	})
+	return configs
+}
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index 49705d61..e6a2f4f1 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -22,6 +22,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+ "github.com/apache/dubbo-kubernetes/pkg/xds"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
meshconfig "istio.io/api/mesh/v1alpha1"
"net"
@@ -31,14 +32,19 @@ import (
type Watcher = meshwatcher.WatcherCollection
+// WatchedResource is an alias for xds.WatchedResource, so the type can be
+// referred to through this package.
+type WatchedResource = xds.WatchedResource
+
type Environment struct {
ServiceDiscovery
Watcher
ConfigStore
- mutex sync.RWMutex
- pushContext *PushContext
- Cache XdsCache
- NetworksWatcher mesh.NetworksWatcher
+ mutex sync.RWMutex
+ pushContext *PushContext
+ Cache XdsCache
+ NetworksWatcher mesh.NetworksWatcher
+ NetworkManager *NetworkManager
+ clusterLocalServices ClusterLocalProvider
+ DomainSuffix string
}
type XdsCacheImpl struct {
@@ -56,24 +62,25 @@ func NewEnvironment() *Environment {
cache = DisabledCache{}
}
return &Environment{
- Cache: cache,
+ pushContext: NewPushContext(),
+ Cache: cache,
}
}
var _ mesh.Holder = &Environment{}
-func (e *Environment) SetPushContext(pc *PushContext) {
- e.mutex.Lock()
- defer e.mutex.Unlock()
- e.pushContext = pc
-}
-
func (e *Environment) PushContext() *PushContext {
e.mutex.RLock()
defer e.mutex.RUnlock()
return e.pushContext
}
+// SetPushContext replaces the environment's current PushContext while holding
+// the write lock.
+func (e *Environment) SetPushContext(pc *PushContext) {
+ e.mutex.Lock()
+ defer e.mutex.Unlock()
+ e.pushContext = pc
+}
+
func (e *Environment) Mesh() *meshconfig.MeshConfig {
if e != nil && e.Watcher != nil {
return e.Watcher.Mesh()
@@ -109,4 +116,8 @@ func (e *Environment) GetDiscoveryAddress() (host.Name,
string, error) {
return host.Name(hostname), port, nil
}
+// ClusterLocal returns the environment's ClusterLocalProvider. The result is
+// nil if the clusterLocalServices field was never assigned.
+func (e *Environment) ClusterLocal() ClusterLocalProvider {
+ return e.clusterLocalServices
+}
+
type Proxy struct{}
diff --git a/sail/pkg/model/controller.go b/sail/pkg/model/controller.go
index a79c32f7..72e2dd10 100644
--- a/sail/pkg/model/controller.go
+++ b/sail/pkg/model/controller.go
@@ -1,8 +1,12 @@
package model
type Controller interface {
- // Run until a signal is received
Run(stop <-chan struct{})
+ HasSynced() bool
+}
+
+// AggregateController is a Controller; it currently adds no methods of its own.
+type AggregateController interface {
+ Controller
+}
type Event int
diff --git a/sail/pkg/model/network.go b/sail/pkg/model/network.go
new file mode 100644
index 00000000..66f7ce2c
--- /dev/null
+++ b/sail/pkg/model/network.go
@@ -0,0 +1,4 @@
+package model
+
+// NetworkManager is currently an empty placeholder type with no fields.
+type NetworkManager struct {
+}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index 76870e34..4e941844 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -17,10 +17,78 @@
package model
-import meshconfig "istio.io/api/mesh/v1alpha1"
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
+ "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "go.uber.org/atomic"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+ "k8s.io/apimachinery/pkg/types"
+ "sync"
+)
type PushContext struct {
- Mesh *meshconfig.MeshConfig `json:"-"`
+ Mesh *meshconfig.MeshConfig `json:"-"`
+ initializeMutex sync.Mutex
+ InitDone atomic.Bool
+ Networks *meshconfig.MeshNetworks
+ networkMgr *NetworkManager
+ clusterLocalHosts ClusterLocalHosts
+ exportToDefaults exportToDefaults
+ AuthnPolicies *AuthenticationPolicies `json:"-"`
+ AuthzPolicies *AuthorizationPolicies `json:"-"`
+ virtualServiceIndex virtualServiceIndex
+ destinationRuleIndex destinationRuleIndex
+ ServiceIndex serviceIndex
+ serviceAccounts map[serviceAccountKey][]string
+}
+
+// serviceAccountKey identifies a service by hostname and namespace. It is the
+// key type of PushContext.serviceAccounts.
+type serviceAccountKey struct {
+ hostname host.Name
+ namespace string
+}
+
+// virtualServiceIndex groups virtual service configs by visibility and gateway.
+// NOTE(review): nothing in this change populates these maps yet.
+type virtualServiceIndex struct {
+ exportedToNamespaceByGateway map[types.NamespacedName][]config.Config
+ // Contains all the virtual services with exportTo "." and the current
+ // namespace. The keys are namespace,gateway.
+ privateByNamespaceAndGateway map[types.NamespacedName][]config.Config
+ // Contains all virtual services whose exportTo is "*", keyed by gateway.
+ publicByGateway map[string][]config.Config
+ // Root virtual service namespace/name -> delegate virtual service
+ // gvk/namespace/name.
+ delegates map[ConfigKey][]ConfigKey
+
+ // Contains destination hosts of virtual services, keyed by the gateway's
+ // namespace/name; only used when PILOT_FILTER_GATEWAY_CLUSTER_CONFIG is
+ // enabled.
+ destinationsByGateway map[string]sets.String
+
+ // Map of virtual service hostname -> referenced hostnames.
+ referencedDestinations map[string]sets.String
+}
+
+// destinationRuleIndex groups consolidated destination rules by scope.
+// NOTE(review): nothing in this change populates these fields yet.
+type destinationRuleIndex struct {
+ namespaceLocal map[string]*consolidatedDestRules
+ exportedByNamespace map[string]*consolidatedDestRules
+ rootNamespaceLocal *consolidatedDestRules
+}
+
+// consolidatedDestRules holds destination rules keyed by host name, with
+// specific and wildcard hosts kept in separate maps.
+type consolidatedDestRules struct {
+ specificDestRules map[host.Name][]*ConsolidatedDestRule
+ wildcardDestRules map[host.Name][]*ConsolidatedDestRule
+}
+
+// serviceIndex groups services by visibility and by hostname/namespace.
+// NOTE(review): nothing in this change populates these fields yet.
+type serviceIndex struct {
+ privateByNamespace map[string][]*Service
+ public []*Service
+ exportedToNamespace map[string][]*Service
+ HostnameAndNamespace map[host.Name]map[string]*Service `json:"-"`
+}
+
+// ConsolidatedDestRule pairs a destination rule config with its exportTo set
+// and a list of namespaced names.
+// NOTE(review): `from` presumably lists the source configs that were merged
+// into `rule` — confirm.
+type ConsolidatedDestRule struct {
+ exportTo sets.Set[visibility.Instance]
+ rule *config.Config
+ from []types.NamespacedName
}
type TriggerReason string
@@ -28,13 +96,30 @@ type TriggerReason string
type ReasonStats map[TriggerReason]int
type PushRequest struct {
- Reason ReasonStats
+ Reason ReasonStats
+ ConfigsUpdated sets.Set[ConfigKey]
+ Forced bool
+ Full bool
+ Push *PushContext
+ LastPushContext *PushContext
}
func NewPushContext() *PushContext {
return &PushContext{}
}
+// ConfigKey identifies a config by kind, name and namespace. It is the
+// element type of PushRequest.ConfigsUpdated.
+type ConfigKey struct {
+ Kind kind.Kind
+ Name string
+ Namespace string
+}
+
+// exportToDefaults holds the default visibility sets for services, virtual
+// services and destination rules. initDefaultExportMaps fills it from the mesh
+// config.
+type exportToDefaults struct {
+ service sets.Set[visibility.Instance]
+ virtualService sets.Set[visibility.Instance]
+ destinationRule sets.Set[visibility.Instance]
+}
+
func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest {
if pr == nil {
return other
@@ -48,4 +133,141 @@ func (pr *PushRequest) CopyMerge(other *PushRequest)
*PushRequest {
}
type XDSUpdater interface {
-}
\ No newline at end of file
+}
+
+// InitContext populates ps from env exactly once; later calls return
+// immediately because InitDone is already set. The work happens under
+// initializeMutex.
+//
+// A full rebuild (createNewContext) is done when there is no push request, no
+// previous context, the previous context was never initialized, or the
+// request is Forced. Otherwise only the parts named in
+// pushReq.ConfigsUpdated are rebuilt and the rest is carried over from
+// oldPushContext.
+func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
+	ps.initializeMutex.Lock()
+	defer ps.initializeMutex.Unlock()
+	if ps.InitDone.Load() {
+		return
+	}
+
+	ps.Mesh = env.Mesh()
+	ps.Networks = env.MeshNetworks()
+
+	ps.initDefaultExportMaps()
+
+	if pushReq == nil || oldPushContext == nil || !oldPushContext.InitDone.Load() || pushReq.Forced {
+		ps.createNewContext(env)
+	} else {
+		ps.updateContext(env, oldPushContext, pushReq)
+	}
+
+	ps.networkMgr = env.NetworkManager
+
+	// The provider is an interface field that may never have been assigned.
+	// Calling a method on a nil interface panics, so leave clusterLocalHosts
+	// at its zero value in that case.
+	if clusterLocal := env.ClusterLocal(); clusterLocal != nil {
+		ps.clusterLocalHosts = clusterLocal.GetClusterLocalHosts()
+	}
+
+	ps.InitDone.Store(true)
+}
+
+// initDefaultExportMaps fills exportToDefaults from the mesh config. For each
+// of destination rules, services and virtual services, a non-nil configured
+// list is used verbatim (even when empty); a nil list defaults to public ("*").
+func (ps *PushContext) initDefaultExportMaps() {
+	resolve := func(configured []string) sets.Set[visibility.Instance] {
+		defaults := sets.New[visibility.Instance]()
+		if configured == nil {
+			// default to *
+			defaults.Insert(visibility.Public)
+			return defaults
+		}
+		for _, entry := range configured {
+			defaults.Insert(visibility.Instance(entry))
+		}
+		return defaults
+	}
+
+	ps.exportToDefaults.destinationRule = resolve(ps.Mesh.DefaultDestinationRuleExportTo)
+	ps.exportToDefaults.service = resolve(ps.Mesh.DefaultServiceExportTo)
+	ps.exportToDefaults.virtualService = resolve(ps.Mesh.DefaultVirtualServiceExportTo)
+}
+
+// initServiceRegistry is currently an empty placeholder: it does nothing.
+func (ps *PushContext) initServiceRegistry(env *Environment, configsUpdate
sets.Set[ConfigKey]) {
+
+}
+
+// initServiceAccounts is currently an empty placeholder: it does nothing.
+func (ps *PushContext) initServiceAccounts(env *Environment, services
[]*Service) {
+}
+
+// initVirtualServices is currently an empty placeholder: it does nothing.
+func (ps *PushContext) initVirtualServices(env *Environment) {
+}
+
+// initDestinationRules is currently an empty placeholder: it does nothing.
+func (ps *PushContext) initDestinationRules(env *Environment) {
+}
+
+// initAuthnPolicies rebuilds AuthnPolicies from env.
+func (ps *PushContext) initAuthnPolicies(env *Environment) {
+ ps.AuthnPolicies = initAuthenticationPolicies(env)
+}
+
+// initAuthorizationPolicies rebuilds AuthzPolicies from env.
+func (ps *PushContext) initAuthorizationPolicies(env *Environment) {
+ ps.AuthzPolicies = GetAuthorizationPolicies(env)
+}
+
+// createNewContext rebuilds every part of the context from env, in order:
+// service registry, virtual services, destination rules, authentication
+// policies and authorization policies.
+func (ps *PushContext) createNewContext(env *Environment) {
+ ps.initServiceRegistry(env, nil)
+ ps.initVirtualServices(env)
+ ps.initDestinationRules(env)
+ ps.initAuthnPolicies(env)
+ ps.initAuthorizationPolicies(env)
+}
+
+// updateContext performs an incremental rebuild. It inspects the kinds named
+// in pushReq.ConfigsUpdated, re-initializes only the affected parts of the
+// context, and copies everything else from oldPushContext.
+//
+// NOTE(review): no case in the switch below sets servicesChanged, so the
+// service index and service accounts are always carried over from
+// oldPushContext. Confirm which kinds should trigger a service registry
+// rebuild.
+func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
+	var (
+		servicesChanged         bool
+		virtualServicesChanged  bool
+		destinationRulesChanged bool
+		authnChanged            bool
+		authzChanged            bool
+	)
+
+	// We do not need to watch Ingress or Gateway API changes. Both of these
+	// have their own controllers which will send events for Istio types
+	// (Gateway and VirtualService).
+	for updated := range pushReq.ConfigsUpdated {
+		switch updated.Kind {
+		case kind.DestinationRule:
+			destinationRulesChanged = true
+		case kind.VirtualService:
+			virtualServicesChanged = true
+		case kind.AuthorizationPolicy:
+			authzChanged = true
+		case kind.RequestAuthentication, kind.PeerAuthentication:
+			authnChanged = true
+		}
+	}
+
+	if !servicesChanged {
+		// Carry over what initServiceRegistry would otherwise generate.
+		ps.ServiceIndex = oldPushContext.ServiceIndex
+		ps.serviceAccounts = oldPushContext.serviceAccounts
+	} else {
+		// Services have changed: initialize the service registry.
+		ps.initServiceRegistry(env, pushReq.ConfigsUpdated)
+	}
+
+	if !virtualServicesChanged {
+		ps.virtualServiceIndex = oldPushContext.virtualServiceIndex
+	} else {
+		ps.initVirtualServices(env)
+	}
+
+	if !destinationRulesChanged {
+		ps.destinationRuleIndex = oldPushContext.destinationRuleIndex
+	} else {
+		ps.initDestinationRules(env)
+	}
+
+	if !authnChanged {
+		ps.AuthnPolicies = oldPushContext.AuthnPolicies
+	} else {
+		ps.initAuthnPolicies(env)
+	}
+
+	if !authzChanged {
+		ps.AuthzPolicies = oldPushContext.AuthzPolicies
+	} else {
+		ps.initAuthorizationPolicies(env)
+	}
+}
diff --git a/sail/pkg/model/service.go b/sail/pkg/model/service.go
index 6573e2fb..7b6c26de 100644
--- a/sail/pkg/model/service.go
+++ b/sail/pkg/model/service.go
@@ -1,4 +1,148 @@
package model
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
+ "github.com/apache/dubbo-kubernetes/pkg/config/protocol"
+ "github.com/apache/dubbo-kubernetes/pkg/config/visibility"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+ "sync"
+ "time"
+)
+
+// NamespacedHostname pairs a hostname with a namespace.
+type NamespacedHostname struct {
+ Hostname host.Name
+ Namespace string
+}
+
+// ServiceAttributes carries descriptive metadata attached to a Service.
+// DeepCopy clones every map, slice and pointer field declared here.
+type ServiceAttributes struct {
+ Labels map[string]string
+ LabelSelectors map[string]string
+ ExportTo sets.Set[visibility.Instance]
+ ClusterExternalAddresses *AddressMap
+ ClusterExternalPorts map[cluster.ID]map[uint32]uint32
+ Aliases []NamespacedHostname
+ PassthroughTargetPorts map[uint32]uint32
+ // Name is "destination.service.name" attribute
+ Name string
+ // Namespace is "destination.service.namespace" attribute
+ Namespace string
+ ServiceRegistry provider.ID
+}
+
+// AddressMap stores a list of addresses per cluster. Its methods take the
+// embedded mutex before touching Addresses.
+type AddressMap struct {
+ Addresses map[cluster.ID][]string
+
+ // NOTE: The copystructure library is not able to copy unexported fields,
+ // so the mutex will not be copied.
+ mutex sync.RWMutex
+}
+
+// DeepCopy returns a new AddressMap holding a copy of m's addresses and a
+// fresh, unlocked mutex. A nil receiver yields nil.
+func (m *AddressMap) DeepCopy() *AddressMap {
+	if m != nil {
+		return &AddressMap{Addresses: m.GetAddresses()}
+	}
+	return nil
+}
+
+// Service describes one service known to the registry: its hostname, ports,
+// service accounts, per-cluster VIPs and creation time, plus descriptive
+// attributes.
+type Service struct {
+ Attributes ServiceAttributes
+ Hostname host.Name `json:"hostname"`
+ Ports PortList `json:"ports,omitempty"`
+ ServiceAccounts []string `json:"serviceAccounts,omitempty"`
+ // ClusterVIPs is held by value and contains a mutex, so a Service should
+ // be duplicated with DeepCopy rather than by plain assignment.
+ ClusterVIPs AddressMap `json:"clusterVIPs,omitempty"`
+ CreationTime time.Time `json:"creationTime,omitempty"`
+}
+
+// DeepCopy returns an independent copy of s. Attributes, Ports,
+// ServiceAccounts and ClusterVIPs are all duplicated, so changes to the copy
+// do not affect s. The receiver must not be nil.
+func (s *Service) DeepCopy() *Service {
+	// The shallow copy also copies the mutex inside ClusterVIPs; that field
+	// is overwritten with a fresh AddressMap below.
+	// nolint: govet
+	clone := *s
+	clone.Attributes = s.Attributes.DeepCopy()
+	clone.ServiceAccounts = slices.Clone(s.ServiceAccounts)
+	clone.ClusterVIPs = *s.ClusterVIPs.DeepCopy()
+
+	if s.Ports == nil {
+		return &clone
+	}
+
+	clone.Ports = make(PortList, len(s.Ports))
+	for i, p := range s.Ports {
+		if p == nil {
+			// make() already left this slot nil.
+			continue
+		}
+		clone.Ports[i] = &Port{
+			Name:     p.Name,
+			Port:     p.Port,
+			Protocol: p.Protocol,
+		}
+	}
+	return &clone
+}
+
+// Key returns "<namespace>/<hostname>" for the service, or the empty string
+// for a nil receiver.
+func (s *Service) Key() string {
+	if s != nil {
+		return s.Attributes.Namespace + "/" + string(s.Hostname)
+	}
+	return ""
+}
+
+// Port describes one service port: an optional name, the port number and the
+// protocol.
+type Port struct {
+ Name string `json:"name,omitempty"`
+ Port int `json:"port"`
+ Protocol protocol.Instance `json:"protocol,omitempty"`
+}
+
+// PortList is a list of ports; entries may be nil.
+type PortList []*Port
+
+// Equals reports whether p and other describe the same port. Two nil ports
+// are equal; a nil port never equals a non-nil one. Otherwise name, number
+// and protocol must all match.
+func (p *Port) Equals(other *Port) bool {
+	if p == nil || other == nil {
+		return p == nil && other == nil
+	}
+	return p.Name == other.Name && p.Port == other.Port && p.Protocol == other.Protocol
+}
+
+// Equals reports whether the two lists have the same length and pairwise
+// equal ports, compared with Port.Equals.
+func (ports PortList) Equals(other PortList) bool {
+	samePort := func(a, b *Port) bool { return a.Equals(b) }
+	return slices.EqualFunc(ports, other, samePort)
+}
+
type ServiceDiscovery interface {
+ Services() []*Service
+ GetService(hostname host.Name) *Service
+}
+
+// DeepCopy returns a copy of s in which every map, slice and pointer field is
+// duplicated, so the result shares no mutable state with s.
+func (s *ServiceAttributes) DeepCopy() ServiceAttributes {
+	// Start from a shallow copy, then replace each reference-typed field.
+	// nolint: govet
+	dup := *s
+
+	dup.Labels = maps.Clone(s.Labels)
+	dup.LabelSelectors = maps.Clone(s.LabelSelectors)
+	dup.PassthroughTargetPorts = maps.Clone(s.PassthroughTargetPorts)
+	dup.Aliases = slices.Clone(s.Aliases)
+	dup.ClusterExternalAddresses = s.ClusterExternalAddresses.DeepCopy()
+
+	if s.ExportTo != nil {
+		dup.ExportTo = s.ExportTo.Copy()
+	}
+
+	if external := s.ClusterExternalPorts; external != nil {
+		dup.ClusterExternalPorts = make(map[cluster.ID]map[uint32]uint32, len(external))
+		for id, mapping := range external {
+			dup.ClusterExternalPorts[id] = maps.Clone(mapping)
+		}
+	}
+
+	// nolint: govet
+	return dup
}
diff --git a/sail/pkg/serviceregistry/aggregate/controller.go
b/sail/pkg/serviceregistry/aggregate/controller.go
index 0b220755..2179a5b7 100644
--- a/sail/pkg/serviceregistry/aggregate/controller.go
+++ b/sail/pkg/serviceregistry/aggregate/controller.go
@@ -2,12 +2,20 @@ package aggregate
import (
"github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
"k8s.io/klog/v2"
"sync"
)
+var (
+ _ model.ServiceDiscovery = &Controller{}
+ _ model.AggregateController = &Controller{}
+)
+
type Controller struct {
registries []*registryEntry
storeLock sync.RWMutex
@@ -53,3 +61,97 @@ func (c *Controller) Run(stop <-chan struct{}) {
<-stop
klog.Info("Registry Aggregator terminated")
}
+
+// HasSynced reports whether every registered registry has synced. It stops
+// at, and logs, the first registry that has not.
+func (c *Controller) HasSynced() bool {
+	registries := c.GetRegistries()
+	for i := range registries {
+		if registries[i].HasSynced() {
+			continue
+		}
+		klog.V(2).Infof("registry %s is syncing", registries[i].Cluster())
+		return false
+	}
+	return true
+}
+
+// Services returns the services of all registries. Services from
+// non-Kubernetes registries are appended as-is. Services from Kubernetes
+// registries are deduplicated by hostname: the first occurrence is kept, and
+// later occurrences from other clusters only contribute their cluster VIPs.
+func (c *Controller) Services() []*model.Service {
+	// positionByHost maps a hostname to its index in merged. It identifies
+	// services that are installed in multiple clusters.
+	positionByHost := make(map[host.Name]int)
+	merged := make([]*model.Service, 0)
+
+	// Walk a snapshot of the registry list so the result is consistent.
+	for _, registry := range c.GetRegistries() {
+		found := registry.Services()
+		if registry.Provider() != provider.Kubernetes {
+			merged = append(merged, found...)
+			continue
+		}
+
+		for _, svc := range found {
+			at, seen := positionByHost[svc.Hostname]
+			if !seen {
+				// First time we see this service. The result will have a
+				// single service per hostname. The first cluster is listed
+				// first, so services in the primary cluster are used for
+				// default settings. If a service appears in multiple
+				// clusters, the order is less clear.
+				positionByHost[svc.Hostname] = len(merged)
+				merged = append(merged, svc)
+				continue
+			}
+
+			// Deep copy before the first merge; otherwise a service deleted
+			// from a remote cluster could leave its ClusterIP behind on the
+			// registry's own object. After a merge ClusterVIPs has >= 2
+			// entries, which is used to avoid copying more than once.
+			if merged[at].ClusterVIPs.Len() < 2 {
+				merged[at] = merged[at].DeepCopy()
+			}
+			// Seen a second time, so it comes from a different cluster:
+			// update the cluster VIPs.
+			mergeService(merged[at], svc, registry)
+		}
+	}
+	return merged
+}
+
+// GetService looks the hostname up in every registry. A hit in a
+// non-Kubernetes registry is returned immediately. Hits in Kubernetes
+// registries are merged: the first is deep-copied and later ones contribute
+// their cluster VIPs. It returns nil when no registry knows the hostname.
+func (c *Controller) GetService(hostname host.Name) *model.Service {
+	var merged *model.Service
+	for _, registry := range c.GetRegistries() {
+		found := registry.GetService(hostname)
+		switch {
+		case found == nil:
+			continue
+		case registry.Provider() != provider.Kubernetes:
+			return found
+		case merged == nil:
+			merged = found.DeepCopy()
+		default:
+			// Seeing the service a second time means it is available in
+			// multiple clusters.
+			mergeService(merged, found, registry)
+		}
+	}
+	return merged
+}
+
+// GetRegistries returns a snapshot of the registered registries, taken under
+// the read lock. The slice is new; the registries themselves are shared.
+func (c *Controller) GetRegistries() []serviceregistry.Instance {
+	c.storeLock.RLock()
+	defer c.storeLock.RUnlock()
+
+	// Copy the list to prevent races; a deep copy is not needed.
+	snapshot := make([]serviceregistry.Instance, 0, len(c.registries))
+	for _, entry := range c.registries {
+		snapshot = append(snapshot, entry)
+	}
+	return snapshot
+}
+
+// mergeService folds src, as seen in srcRegistry's cluster, into dst. A port
+// mismatch is only logged. The source cluster's VIPs are copied into dst
+// unless dst already has addresses for that cluster.
+func mergeService(dst, src *model.Service, srcRegistry serviceregistry.Instance) {
+	if !src.Ports.Equals(dst.Ports) {
+		klog.V(2).Infof("service %s defined from cluster %s is different from others", src.Hostname, srcRegistry.Cluster())
+	}
+
+	// Prefer the k8s HostVIPs where possible.
+	clusterID := srcRegistry.Cluster()
+	if len(dst.ClusterVIPs.GetAddressesFor(clusterID)) != 0 {
+		return
+	}
+	dst.ClusterVIPs.SetAddressesFor(clusterID, src.ClusterVIPs.GetAddressesFor(clusterID))
+}
diff --git a/sail/pkg/serviceregistry/kube/controller/controller.go
b/sail/pkg/serviceregistry/kube/controller/controller.go
index dac2cf31..893ec60f 100644
--- a/sail/pkg/serviceregistry/kube/controller/controller.go
+++ b/sail/pkg/serviceregistry/kube/controller/controller.go
@@ -19,26 +19,104 @@ package controller
import (
"github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
+ kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/pkg/queue"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
+ "go.uber.org/atomic"
+ "k8s.io/klog/v2"
+ "sort"
+ "sync"
+ "time"
)
+var (
+ _ serviceregistry.Instance = &Controller{}
+)
+
+// Controller is the Kubernetes service registry. It is asserted above to
+// implement serviceregistry.Instance.
+type Controller struct {
+ opts Options
+ client kubelib.Client
+ // The embedded RWMutex is taken by Services and GetService when reading
+ // servicesMap.
+ sync.RWMutex
+ servicesMap map[host.Name]*model.Service
+ queue queue.Instance
+ // initialSyncTimedout is set by Run when opts.SyncTimeout elapses before
+ // the queue has synced.
+ initialSyncTimedout *atomic.Bool
+}
+
type Options struct {
- KubernetesAPIQPS float32
- KubernetesAPIBurst int
- DomainSuffix string
- // XDSUpdater will push changes to the xDS server.
- XDSUpdater model.XDSUpdater
- // MeshNetworksWatcher observes changes to the mesh networks config.
- MeshNetworksWatcher mesh.NetworksWatcher
- // MeshWatcher observes changes to the mesh config
+ KubernetesAPIQPS float32
+ KubernetesAPIBurst int
+ DomainSuffix string
+ // XDSUpdater will push changes to the xDS server.
+ XDSUpdater model.XDSUpdater
+ // MeshNetworksWatcher observes changes to the mesh networks config.
+ MeshNetworksWatcher mesh.NetworksWatcher
+ // MeshWatcher observes changes to the mesh config
MeshWatcher meshwatcher.WatcherCollection
ClusterID cluster.ID
ClusterAliases map[string]string
SystemNamespace string
MeshServiceController *aggregate.Controller
KrtDebugger *krt.DebugHandler
+ // SyncTimeout, when non-zero, is how long Controller.Run waits before
+ // flagging the initial sync as timed out if the queue has not synced.
+ SyncTimeout time.Duration
+}
+
+// Services returns a snapshot of every service currently held in servicesMap,
+// ordered by hostname. The read lock is released before sorting.
+func (c *Controller) Services() []*model.Service {
+	c.RLock()
+	svcs := make([]*model.Service, 0, len(c.servicesMap))
+	for _, s := range c.servicesMap {
+		svcs = append(svcs, s)
+	}
+	c.RUnlock()
+
+	byHostname := func(a, b int) bool {
+		return svcs[a].Hostname < svcs[b].Hostname
+	}
+	sort.Slice(svcs, byHostname)
+	return svcs
+}
+
+// GetService looks up a service by hostname under the read lock. It returns
+// nil when the hostname is not present in servicesMap.
+func (c *Controller) GetService(hostname host.Name) *model.Service {
+	c.RLock()
+	defer c.RUnlock()
+	return c.servicesMap[hostname]
+}
+
+// Provider identifies this registry as Kubernetes-backed (provider.Kubernetes).
+func (c *Controller) Provider() provider.ID {
+ return provider.Kubernetes
+}
+
+// Cluster returns the cluster ID the controller was configured with
+// (opts.ClusterID).
+func (c *Controller) Cluster() cluster.ID {
+ return c.opts.ClusterID
+}
+
+func (c *Controller) Run(stop <-chan struct{}) {
+ if c.opts.SyncTimeout != 0 {
+ time.AfterFunc(c.opts.SyncTimeout, func() {
+ if !c.queue.HasSynced() {
+ klog.Warningf("kube controller for %s initial
sync timed out", c.opts.ClusterID)
+ c.initialSyncTimedout.Store(true)
+ }
+ })
+ }
+ st := time.Now()
+
+ kubelib.WaitForCacheSync("kube controller", stop, c.informersSynced)
+ klog.Infof("kube controller for %s synced after %v", c.opts.ClusterID,
time.Since(st))
+
+ // after the in-order sync we can start processing the queue
+ c.queue.Run(stop)
+ klog.Infof("Controller terminated")
+}
+
+// HasSynced reports true once the initial sync has been flagged as timed out,
+// and otherwise defers to the queue's own HasSynced.
+func (c *Controller) HasSynced() bool {
+	return c.initialSyncTimedout.Load() || c.queue.HasSynced()
+}
+
+// informersSynced is the readiness predicate Run hands to
+// kubelib.WaitForCacheSync.
+// NOTE(review): it is hard-coded to false, so presumably WaitForCacheSync
+// cannot report success before stop fires and Run is held back from starting
+// the queue — confirm against WaitForCacheSync and wire in real informer checks.
+func (c *Controller) informersSynced() bool {
+ return false
}
diff --git a/sail/pkg/trustbundle/trustbundle.go
b/sail/pkg/trustbundle/trustbundle.go
index 202e9011..33964cf1 100644
--- a/sail/pkg/trustbundle/trustbundle.go
+++ b/sail/pkg/trustbundle/trustbundle.go
@@ -21,6 +21,7 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"k8s.io/klog/v2"
"sort"
"strings"
@@ -30,7 +31,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/spiffe"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
meshconfig "istio.io/api/mesh/v1alpha1"
)
diff --git a/sail/pkg/xds/ads.go b/sail/pkg/xds/ads.go
index 5a71bd04..5591c2e6 100644
--- a/sail/pkg/xds/ads.go
+++ b/sail/pkg/xds/ads.go
@@ -18,13 +18,19 @@
package xds
import (
+ "context"
"github.com/apache/dubbo-kubernetes/pkg/xds"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "time"
)
-type DeltaDiscoveryStream =
discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
+// Aliases for the gRPC stream types used by the discovery server.
+type (
+ // DiscoveryStream is the stream type accepted by Stream and
+ // StreamAggregatedResources.
+ DiscoveryStream = xds.DiscoveryStream
+ // DeltaDiscoveryStream is the server side of the delta stream, as accepted
+ // by StreamDeltas.
+ DeltaDiscoveryStream =
discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
+ // DeltaDiscoveryClient aliases the generated client type for the same RPC.
+ DeltaDiscoveryClient =
discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient
+)
type Connection struct {
xds.Connection
@@ -36,6 +42,11 @@ type Connection struct {
ids []string
}
+// Event is the item StreamDeltas receives from a connection's push channel.
+type Event struct {
+ // pushRequest is the request handed to pushConnectionDelta.
+ pushRequest *model.PushRequest
+ // done is invoked by StreamDeltas after pushConnectionDelta returns,
+ // whether or not the push failed.
+ done func()
+}
+
func (conn *Connection) XdsConnection() *xds.Connection {
return &conn.Connection
}
@@ -43,3 +54,51 @@ func (conn *Connection) XdsConnection() *xds.Connection {
func (conn *Connection) Proxy() *model.Proxy {
return conn.proxy
}
+
+// DeltaAggregatedResources hands the incoming delta stream to StreamDeltas and
+// returns its result.
+func (s *DiscoveryServer) DeltaAggregatedResources(stream
discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
+ return s.StreamDeltas(stream)
+}
+
+// StreamAggregatedResources hands the incoming stream to Stream and returns
+// its result.
+func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream)
error {
+ return s.Stream(stream)
+}
+
+// initProxyMetadata is currently a placeholder: it ignores node and returns a
+// nil proxy together with a nil error.
+// NOTE(review): callers cannot tell "no proxy" from success — presumably to be
+// replaced by real metadata parsing; confirm before dereferencing the result.
+func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy,
error) {
+ return nil, nil
+}
+
+// initConnection is currently a placeholder that performs no initialization
+// and always returns nil. receiveDelta calls it for the first request on a
+// stream and aborts the stream if it returns an error.
+func (s *DiscoveryServer) initConnection(node *core.Node, con *Connection,
identities []string) error {
+ return nil
+}
+
+// closeConnection is deferred by receiveDelta once a connection has been
+// initialized. It currently does nothing: it returns early when the connection
+// has no ID and has no further teardown logic after that check.
+func (s *DiscoveryServer) closeConnection(con *Connection) {
+ if con.ID() == "" {
+ return
+ }
+}
+
+// Stream is currently a placeholder: it does not read from or write to the
+// stream and returns nil immediately.
+func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
+ return nil
+}
+
+// WaitForRequestLimit blocks until the request rate limiter admits one more
+// request, giving up after at most one second (or earlier if ctx ends).
+//
+// It returns nil when rate limiting is disabled, meaning either no limiter is
+// configured (RequestRateLimit is nil) or its limit is 0qps. Otherwise it
+// returns the error, if any, from the limiter's Wait.
+func (s *DiscoveryServer) WaitForRequestLimit(ctx context.Context) error {
+	limiter := s.RequestRateLimit
+	// A nil limiter is treated like 0qps; calling Limit on it would panic.
+	if limiter == nil || limiter.Limit() == 0 {
+		// Allow opt out when rate limiting is set to 0qps
+		return nil
+	}
+	// Give a bit of time for queue to clear out, but if not fail fast. Client will connect to another
+	// instance in best case, or retry with backoff.
+	waitCtx, cancel := context.WithTimeout(ctx, time.Second)
+	defer cancel()
+	return limiter.Wait(waitCtx)
+}
+
+// newConnection builds a Connection whose embedded xds.Connection is created
+// from peerAddr and stream; all other fields are left at their zero values.
+func newConnection(peerAddr string, stream DiscoveryStream) *Connection {
+ return &Connection{
+ Connection: xds.NewConnection(peerAddr, stream),
+ }
+}
+
+// watchedResourcesByOrder is currently a placeholder that always returns nil,
+// so pushConnectionDelta iterates over no watched resources.
+func (conn *Connection) watchedResourcesByOrder() []*model.WatchedResource {
+ return nil
+}
diff --git a/sail/pkg/xds/auth.go b/sail/pkg/xds/auth.go
new file mode 100644
index 00000000..02780d17
--- /dev/null
+++ b/sail/pkg/xds/auth.go
@@ -0,0 +1,9 @@
+package xds
+
+import (
+ "context"
+)
+
+// authenticate is currently a placeholder: it ignores ctx and returns no
+// identities and no error. StreamDeltas logs a nil identity list as
+// "Unauthenticated XDS" and still proceeds with the stream.
+func (s *DiscoveryServer) authenticate(ctx context.Context) ([]string, error) {
+ return nil, nil
+}
diff --git a/sail/pkg/xds/delta.go b/sail/pkg/xds/delta.go
new file mode 100644
index 00000000..82064a64
--- /dev/null
+++ b/sail/pkg/xds/delta.go
@@ -0,0 +1,198 @@
+package xds
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/xds"
+ dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ v3 "github.com/apache/dubbo-kubernetes/sail/pkg/xds/v3"
+ discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/peer"
+ "google.golang.org/grpc/status"
+ "k8s.io/klog/v2"
+)
+
+// StreamDeltas drives a single delta xDS stream. It applies the request rate
+// limit, authenticates the peer, starts receiveDelta in its own goroutine to
+// read requests, waits for the connection's initialized channel, and then
+// loops handling requests and push events.
+//
+// It returns a ResourceExhausted status when the rate-limit wait fails, an
+// Unauthenticated status when authenticate fails, the first error produced by
+// processDeltaRequest or pushConnectionDelta, the value read from the error
+// channel once the request channel is closed, or nil when the stop channel
+// fires.
+func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error {
+ ctx := stream.Context()
+ peerAddr := "0.0.0.0"
+ if peerInfo, ok := peer.FromContext(ctx); ok {
+ peerAddr = peerInfo.Addr.String()
+ }
+
+ if err := s.WaitForRequestLimit(stream.Context()); err != nil {
+ klog.Warningf("ADS: %q exceeded rate limit: %v", peerAddr, err)
+ return status.Errorf(codes.ResourceExhausted, "request rate
limit exceeded: %v", err)
+ }
+
+ ids, err := s.authenticate(ctx)
+ if err != nil {
+ return status.Error(codes.Unauthenticated, err.Error())
+ }
+ if ids != nil {
+ klog.V(2).Infof("Authenticated XDS: %v with identity %v",
peerAddr, ids)
+ } else {
+ klog.V(2).Infof("Unauthenticated XDS: %v", peerAddr)
+ }
+
+ // InitContext returns immediately if the context was already
initialized.
+ s.globalPushContext().InitContext(s.Env, nil, nil)
+ con := newDeltaConnection(peerAddr, stream)
+
+ // Do not call: defer close(con.pushChannel). The push channel will be
garbage collected
+ // when the connection is no longer used. Closing the channel can cause
subtle race conditions
+ // with push. According to the spec: "It's only necessary to close a
channel when it is important
+ // to tell the receiving goroutines that all data have been sent."
+
+ // Block until either a request is received or a push is triggered.
+ // We need 2 go routines because 'read' blocks in Recv().
+ go s.receiveDelta(con, ids)
+
+ // Wait for the proxy to be fully initialized before we start serving
traffic. Because
+ // initialization doesn't have dependencies that will block, there is
no need to add any timeout
+ // here. Prior to this explicit wait, we were implicitly waiting by
receive() not sending to
+ // reqChannel and the connection not being enqueued for pushes to
pushChannel until the
+ // initialization is complete.
+ <-con.InitializedCh()
+
+ for {
+ // Go select{} statements are not ordered; the same channel can
be chosen many times.
+ // For requests, these are higher priority (client may be
blocked on startup until these are done)
+ // and often very cheap to handle (simple ACK), so we check it
first.
+ select {
+ case req, ok := <-con.deltaReqChan:
+ if ok {
+ if err := s.processDeltaRequest(req, con); err
!= nil {
+ return err
+ }
+ } else {
+ // Remote side closed connection or error
processing the request.
+ return <-con.ErrorCh()
+ }
+ case <-con.StopCh():
+ return nil
+ default:
+ }
+ // If there wasn't already a request, poll for requests and
pushes. Note: if we have a huge
+ // amount of incoming requests, we may still send some pushes,
as we do not `continue` above;
+ // however, requests will be handled ~2x as much as pushes.
This ensures a wave of requests
+ // cannot completely starve pushes. However, this scenario is
unlikely.
+ select {
+ case req, ok := <-con.deltaReqChan:
+ if ok {
+ if err := s.processDeltaRequest(req, con); err
!= nil {
+ return err
+ }
+ } else {
+ // Remote side closed connection or error
processing the request.
+ return <-con.ErrorCh()
+ }
+ case ev := <-con.PushCh():
+ // NOTE(review): unchecked type assertion — this panics if
+ // anything other than *Event is put on the push channel;
+ // confirm against the producers.
+ pushEv := ev.(*Event)
+ err := s.pushConnectionDelta(con, pushEv)
+ // done is called even when the push failed.
+ pushEv.done()
+ if err != nil {
+ return err
+ }
+ case <-con.StopCh():
+ return nil
+ }
+ }
+}
+
+// receiveDelta reads requests from the delta stream and forwards them on
+// con.deltaReqChan until Recv fails, the first real request is rejected, or
+// the stream context is done. identities is passed to initConnection when the
+// first non-health-probe request arrives.
+//
+// On exit it closes deltaReqChan and the error channel, and closes the
+// initialized channel if it is still open, so StreamDeltas is never left
+// waiting.
+// NOTE(review): the sends on con.ErrorCh() below block unless that channel is
+// buffered, and StreamDeltas may still be waiting on InitializedCh at that
+// point — confirm xds.NewConnection allocates a buffered error channel.
+func (s *DiscoveryServer) receiveDelta(con *Connection, identities []string) {
+ defer func() {
+ close(con.deltaReqChan)
+ close(con.ErrorCh())
+ // Close the initialized channel, if it's not already closed, to
prevent blocking the stream
+ select {
+ case <-con.InitializedCh():
+ default:
+ close(con.InitializedCh())
+ }
+ }()
+ firstRequest := true
+ for {
+ req, err := con.deltaStream.Recv()
+ if err != nil {
+ // Errors not classified as UnexpectedError end the stream
+ // without putting anything on the error channel.
+ if dubbogrpc.GRPCErrorType(err) !=
dubbogrpc.UnexpectedError {
+ klog.Infof("ADS: %q %s terminated", con.Peer(),
con.ID())
+ return
+ }
+ con.ErrorCh() <- err
+ klog.Errorf("ADS: %q %s terminated with error: %v",
con.Peer(), con.ID(), err)
+ return
+ }
+ // This should be only set for the first request. The node id
may not be set - for example malicious clients.
+ if firstRequest {
+ // probe happens before envoy sends first xDS request
+ if req.TypeUrl == v3.HealthInfoType {
+ klog.Warningf("ADS: %q %s send health check
probe before normal xDS request", con.Peer(), con.ID())
+ continue
+ }
+ firstRequest = false
+ if req.Node == nil || req.Node.Id == "" {
+ con.ErrorCh() <-
status.New(codes.InvalidArgument, "missing node information").Err()
+ return
+ }
+ if err := s.initConnection(req.Node, con, identities);
err != nil {
+ con.ErrorCh() <- err
+ return
+ }
+ // Registered at most once (firstRequest is now false); runs
+ // when receiveDelta returns.
+ defer s.closeConnection(con)
+ klog.Infof("ADS: new delta connection for node:%s",
con.ID())
+ }
+
+ select {
+ case con.deltaReqChan <- req:
+ case <-con.deltaStream.Context().Done():
+ klog.Infof("ADS: %q %s terminated with stream closed",
con.Peer(), con.ID())
+ return
+ }
+ }
+}
+
+// pushConnectionDelta handles one push event for a delta connection.
+//
+// For a full push the proxy state is recomputed first. The ProxyNeedsPush hook
+// may then replace the request or veto the push entirely; when no hook is
+// installed the push proceeds with the request unchanged. Each watched
+// resource is then pushed in order, and the first error stops the loop and is
+// returned. It returns nil when the push is skipped or every push succeeds.
+func (s *DiscoveryServer) pushConnectionDelta(con *Connection, pushEv *Event) error {
+	pushRequest := pushEv.pushRequest
+
+	if pushRequest.Full {
+		// Update Proxy with current information.
+		s.computeProxyState(con.proxy, pushRequest)
+	}
+
+	// Guard the hook: ProxyNeedsPush is a func field and invoking it while
+	// nil would panic.
+	if s.ProxyNeedsPush != nil {
+		filtered, needsPush := s.ProxyNeedsPush(con.proxy, pushRequest)
+		if !needsPush {
+			klog.V(2).Infof("Skipping push to %v, no updates required", con.ID())
+			return nil
+		}
+		pushRequest = filtered
+	}
+
+	// Send pushes to all generators
+	// Each Generator is responsible for determining if the push event requires a push
+	for _, w := range con.watchedResourcesByOrder() {
+		if err := s.pushDeltaXds(con, w, pushRequest); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// processDeltaRequest is currently a placeholder: it ignores req and con and
+// returns nil, so StreamDeltas keeps looping after every request.
+func (s *DiscoveryServer) processDeltaRequest(req
*discovery.DeltaDiscoveryRequest, con *Connection) error {
+ return nil
+}
+
+// computeProxyState is called by pushConnectionDelta before a full push. It is
+// currently a placeholder and leaves proxy and request untouched.
+func (s *DiscoveryServer) computeProxyState(proxy *model.Proxy, request *model.PushRequest) {
+	// No-op for now.
+}
+
+// pushDeltaXds is currently a placeholder: it sends nothing for the watched
+// resource w and returns nil.
+func (s *DiscoveryServer) pushDeltaXds(con *Connection, w
*model.WatchedResource, req *model.PushRequest) error {
+ return nil
+}
+
+// newDeltaConnection builds a Connection for a delta stream. The embedded
+// xds.Connection is created with a nil stream; the delta stream itself is
+// stored in deltaStream, and deltaReqChan is a channel with capacity 1 that
+// receiveDelta writes to and StreamDeltas reads from.
+func newDeltaConnection(peerAddr string, stream DeltaDiscoveryStream)
*Connection {
+ return &Connection{
+ Connection: xds.NewConnection(peerAddr, nil),
+ deltaStream: stream,
+ deltaReqChan: make(chan *discovery.DeltaDiscoveryRequest, 1),
+ }
+}
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index 9b51f9e5..a4c58ed8 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -21,7 +21,10 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"go.uber.org/atomic"
+ "golang.org/x/time/rate"
+ "google.golang.org/grpc"
"k8s.io/klog/v2"
"time"
)
@@ -36,6 +39,8 @@ type DiscoveryServer struct {
krtDebugger *krt.DebugHandler
InboundUpdates *atomic.Int64
CommittedUpdates *atomic.Int64
+ RequestRateLimit *rate.Limiter
+ ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest)
(*model.PushRequest, bool)
}
func NewDiscoveryServer(env *model.Environment, clusterAliases
map[string]string, debugger *krt.DebugHandler) *DiscoveryServer {
@@ -51,6 +56,11 @@ func NewDiscoveryServer(env *model.Environment,
clusterAliases map[string]string
return out
}
+// Register registers s on rpcs as the AggregatedDiscoveryService server.
+func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
+ // Register v3 server
+ discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
+}
+
func (s *DiscoveryServer) CachesSynced() {
klog.Infof("All caches have been synced up in %v, marking server
ready", time.Since(s.DiscoveryStartTime))
s.serverReady.Store(true)
@@ -59,3 +69,7 @@ func (s *DiscoveryServer) CachesSynced() {
func (s *DiscoveryServer) Shutdown() {
s.pushQueue.ShutDown()
}
+
+// globalPushContext returns the push context currently held by s.Env.
+func (s *DiscoveryServer) globalPushContext() *model.PushContext {
+ return s.Env.PushContext()
+}
diff --git a/sail/pkg/xds/v3/model.go b/sail/pkg/xds/v3/model.go
index 790de20c..38aad5fb 100644
--- a/sail/pkg/xds/v3/model.go
+++ b/sail/pkg/xds/v3/model.go
@@ -3,9 +3,14 @@ package v3
import "github.com/apache/dubbo-kubernetes/pkg/model"
+// Re-exports of the identically named constants in pkg/model. receiveDelta
+// compares HealthInfoType against a request's TypeUrl; presumably the others
+// are xDS type URLs as well — confirm in pkg/model.
const (
- ClusterType = model.ClusterType
- ListenerType = model.ListenerType
- EndpointType = model.EndpointType
- RouteType = model.RouteType
- DebugType = model.DebugType
+ ClusterType = model.ClusterType
+ ListenerType = model.ListenerType
+ EndpointType = model.EndpointType
+ RouteType = model.RouteType
+ DebugType = model.DebugType
+ HealthInfoType = model.HealthInfoType
)
+
+// GetShortType forwards typeURL to model.GetShortType and returns its result.
+func GetShortType(typeURL string) string {
+ return model.GetShortType(typeURL)
+}
diff --git a/security/pkg/pki/ra/common.go b/security/pkg/pki/ra/common.go
index 3ce5b20a..41e6da3e 100644
--- a/security/pkg/pki/ra/common.go
+++ b/security/pkg/pki/ra/common.go
@@ -22,7 +22,7 @@ import (
"crypto/x509"
"encoding/asn1"
"fmt"
- "github.com/apache/dubbo-kubernetes/pkg/util/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
raerror "github.com/apache/dubbo-kubernetes/security/pkg/pki/error"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/util"