This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 28fd03acd [MINOR] feat(operator): Add support to pass annotations to
coordinator nodeport/headless service (#2187)
28fd03acd is described below
commit 28fd03acd8aca62016d5b2f88b4e98d48a4c6e98
Author: shlomi tubul <[email protected]>
AuthorDate: Mon Oct 21 06:05:49 2024 +0300
[MINOR] feat(operator): Add support to pass annotations to coordinator
nodeport/headless service (#2187)
### What changes were proposed in this pull request?
Add support to pass a list of annotations per coordinator service (node
port/headless).
### Why are the changes needed?
Some tools use service annotations to create DNS records and currently is
not supported.
### Does this PR introduce _any_ user-facing change?
Introduce 2 new optional fields in CRD:
1. headlessServiceAnnotations
2. nodePortServiceAnnotations
### How was this patch tested?
UT + manually
---
.../uniffle/v1alpha1/remoteshuffleservice_types.go | 8 ++++
.../api/uniffle/v1alpha1/zz_generated.deepcopy.go | 26 +++++++++++++
.../uniffle.apache.org_remoteshuffleservices.yaml | 16 ++++++++
.../pkg/controller/sync/coordinator/coordinator.go | 27 +++++++++++--
.../sync/coordinator/coordinator_test.go | 45 ++++++++++++++++++++++
5 files changed, 118 insertions(+), 4 deletions(-)
diff --git
a/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
b/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
index 2903f0932..8ec293e96 100644
---
a/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
+++
b/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
@@ -106,6 +106,14 @@ type CoordinatorConfig struct {
// HTTPNodePort defines http port of node port service used for
coordinators' external access.
// +optional
HTTPNodePort []int32 `json:"httpNodePort,omitempty"`
+
+ // NodePortServiceAnnotations is a list of annotations for the NodePort
service.
+ // +optional
+ NodePortServiceAnnotations []map[string]string
`json:"nodePortServiceAnnotations,omitempty"`
+
+ // HeadlessServiceAnnotations is a list of annotations for the headless
service.
+ // +optional
+ HeadlessServiceAnnotations []map[string]string
`json:"headlessServiceAnnotations,omitempty"`
}
// ShuffleServerConfig records configuration used to generate workload of
shuffle servers.
diff --git
a/deploy/kubernetes/operator/api/uniffle/v1alpha1/zz_generated.deepcopy.go
b/deploy/kubernetes/operator/api/uniffle/v1alpha1/zz_generated.deepcopy.go
index c9e15d7a5..da70877ba 100644
--- a/deploy/kubernetes/operator/api/uniffle/v1alpha1/zz_generated.deepcopy.go
+++ b/deploy/kubernetes/operator/api/uniffle/v1alpha1/zz_generated.deepcopy.go
@@ -99,6 +99,32 @@ func (in *CoordinatorConfig) DeepCopyInto(out
*CoordinatorConfig) {
*out = make([]int32, len(*in))
copy(*out, *in)
}
+ if in.NodePortServiceAnnotations != nil {
+ in, out := &in.NodePortServiceAnnotations,
&out.NodePortServiceAnnotations
+ *out = make([]map[string]string, len(*in))
+ for i := range *in {
+ if (*in)[i] != nil {
+ in, out := &(*in)[i], &(*out)[i]
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ }
+ }
+ if in.HeadlessServiceAnnotations != nil {
+ in, out := &in.HeadlessServiceAnnotations,
&out.HeadlessServiceAnnotations
+ *out = make([]map[string]string, len(*in))
+ for i := range *in {
+ if (*in)[i] != nil {
+ in, out := &(*in)[i], &(*out)[i]
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ }
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new CoordinatorConfig.
diff --git
a/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
b/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
index 46e3e23b8..e36e46b40 100644
---
a/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
+++
b/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
@@ -1786,6 +1786,14 @@ spec:
description: ExcludeNodesFilePath indicates exclude nodes
file
path in coordinators' containers.
type: string
+ headlessServiceAnnotations:
+ description: HeadlessServiceAnnotations is a list of
annotations
+ for the headless service.
+ items:
+ additionalProperties:
+ type: string
+ type: object
+ type: array
hostNetwork:
default: true
description: HostNetwork indicates whether we need to
enable host
@@ -1827,6 +1835,14 @@ spec:
description: LogHostPath represents host path used to save
logs
of shuffle servers.
type: string
+ nodePortServiceAnnotations:
+ description: NodePortServiceAnnotations is a list of
annotations
+ for the NodePort service.
+ items:
+ additionalProperties:
+ type: string
+ type: object
+ type: array
nodeSelector:
additionalProperties:
type: string
diff --git
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
index 84b6c84a6..b134d6ca4 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
@@ -105,10 +105,19 @@ func GenerateHeadlessSvc(rss
*unifflev1alpha1.RemoteShuffleService, index int) *
name := GenerateNameByIndex(rss, index)
serviceName := appendHeadless(name)
+ annotations := map[string]string{}
+
+ if len(rss.Spec.Coordinator.HeadlessServiceAnnotations) > index {
+ for key, value := range
rss.Spec.Coordinator.HeadlessServiceAnnotations[index] {
+ annotations[key] = value
+ }
+ }
+
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
- Name: serviceName,
- Namespace: rss.Namespace,
+ Name: serviceName,
+ Namespace: rss.Namespace,
+ Annotations: annotations,
},
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
@@ -140,10 +149,20 @@ func GenerateHeadlessSvc(rss
*unifflev1alpha1.RemoteShuffleService, index int) *
// this function is skipped.
func GenerateSvc(rss *unifflev1alpha1.RemoteShuffleService, index int)
*corev1.Service {
name := GenerateNameByIndex(rss, index)
+
+ annotations := map[string]string{}
+
+ if len(rss.Spec.Coordinator.NodePortServiceAnnotations) > index {
+ for key, value := range
rss.Spec.Coordinator.NodePortServiceAnnotations[index] {
+ annotations[key] = value
+ }
+ }
+
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: rss.Namespace,
+ Name: name,
+ Namespace: rss.Namespace,
+ Annotations: annotations,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
diff --git
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
index 22caf5fc4..ebfdb3cf5 100644
---
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
+++
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
@@ -138,8 +138,24 @@ var (
"key1": "value1",
"key2": "value2",
}
+
+ testSvcAnnotationsList = []map[string]string{
+ {
+ "annotation1": "value1",
+ },
+ {
+ "annotation2": "value2",
+ },
+ }
)
+func buildRssWithSvcAnnotations() *uniffleapi.RemoteShuffleService {
+ rss := utils.BuildRSSWithDefaultValue()
+ rss.Spec.Coordinator.NodePortServiceAnnotations = testSvcAnnotationsList
+ rss.Spec.Coordinator.HeadlessServiceAnnotations = testSvcAnnotationsList
+ return rss
+}
+
func buildRssWithLabels() *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.Coordinator.Labels = testLabels
@@ -546,6 +562,35 @@ func TestGenerateSvcForCoordinator(t *testing.T) {
}
}
+func TestGenerateSvcWithAnnotationsForCoordinator(t *testing.T) {
+ for _, tt := range []struct {
+ name string
+ rss *uniffleapi.RemoteShuffleService
+ expectedAnnotations []map[string]string
+ }{
+ {
+ name: "nodeport and headless services with annotations",
+ rss: buildRssWithSvcAnnotations(),
+ expectedAnnotations: []map[string]string{
+ {"annotation1": "value1"},
+ {"annotation1": "value1"},
+ {"annotation2": "value2"},
+ {"annotation2": "value2"}},
+ },
+ } {
+ t.Run(tt.name, func(tc *testing.T) {
+ _, _, services, _ := GenerateCoordinators(tt.rss)
+
+ for i, svc := range services {
+ match :=
reflect.DeepEqual(tt.expectedAnnotations[i], svc.Annotations)
+ if !match {
+ tc.Errorf("unexpected annotations: %v,
expected: %v", svc.Annotations, tt.expectedAnnotations[i])
+ }
+ }
+ })
+ }
+}
+
func TestGenerateAddresses(t *testing.T) {
assertion := assert.New(t)
rss := buildRssWithLabels()