This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5cba4e77 [ISSUE #525][operator] refine svc creations (#530)
5cba4e77 is described below
commit 5cba4e77618ebb0d7dd057ca6e23513c33d8a918
Author: advancedxy <[email protected]>
AuthorDate: Mon Feb 6 15:08:36 2023 +0800
[ISSUE #525][operator] refine svc creations (#530)
### What changes were proposed in this pull request?
1. Generate a headless service for each coordinator and pass the headless
services' addresses to shuffle servers.
2. Make RPCNodePort/HTTPNodePort optional for coordinators.
3. Remove service creation for shuffle servers.
### Why are the changes needed?
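This fixes #525. Shuffle servers are accessed directly through the coordinators'
assignments, so dedicated services for shuffle servers are unnecessary.
Coordinators now always get a headless service for in-cluster access, and
NodePort services are created only when node ports are configured.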
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The changes were verified manually. This patch also adds unit tests for
coordinator service generation and coordinator validation; more unit tests
will follow.
---
.../uniffle/v1alpha1/remoteshuffleservice_types.go | 2 +
.../uniffle.apache.org_remoteshuffleservices.yaml | 2 -
.../operator/pkg/controller/controller/rss.go | 11 ++-
.../pkg/controller/sync/coordinator/coordinator.go | 58 ++++++++++--
.../sync/coordinator/coordinator_test.go | 60 ++++++++++++
.../controller/sync/shuffleserver/shuffleserver.go | 103 +--------------------
.../operator/pkg/webhook/inspector/rss.go | 6 +-
.../operator/pkg/webhook/inspector/rss_test.go | 61 ++++++++++++
8 files changed, 187 insertions(+), 116 deletions(-)
diff --git
a/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
b/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
index 041dec9b..adf8aab8 100644
---
a/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
+++
b/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
@@ -96,9 +96,11 @@ type CoordinatorConfig struct {
ExcludeNodesFilePath string `json:"excludeNodesFilePath,omitempty"`
// RPCNodePort defines rpc port of node port service used for
coordinators' external access.
+ // +optional
RPCNodePort []int32 `json:"rpcNodePort"`
// HTTPNodePort defines http port of node port service used for
coordinators' external access.
+ // +optional
HTTPNodePort []int32 `json:"httpNodePort"`
}
diff --git
a/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
b/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
index ba86981d..6cf6234d 100644
---
a/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
+++
b/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
@@ -3279,9 +3279,7 @@ spec:
type: string
required:
- configDir
- - httpNodePort
- image
- - rpcNodePort
- xmxSize
type: object
shuffleServer:
diff --git a/deploy/kubernetes/operator/pkg/controller/controller/rss.go
b/deploy/kubernetes/operator/pkg/controller/controller/rss.go
index 78834845..4083e082 100644
--- a/deploy/kubernetes/operator/pkg/controller/controller/rss.go
+++ b/deploy/kubernetes/operator/pkg/controller/controller/rss.go
@@ -618,16 +618,17 @@ func (r *rssController) syncShuffleServer(rss
*unifflev1alpha1.RemoteShuffleServ
if rss.Status.Phase == unifflev1alpha1.RSSRunning &&
!*rss.Spec.ShuffleServer.Sync {
return nil
}
- serviceAccount, services, statefulSet :=
shuffleserver.GenerateShuffleServers(rss)
+ // we don't need to generate svc for shuffle servers:
+ // shuffle servers are access directly through coordinator's shuffler
assignments. service for shuffle server is
+ // pointless. For spark apps running in the cluster, executor
containers could access shuffler server via container
+ // network(overlay or host network). If shuffle servers should be
exposed to external, host network should be used
+ // and external executor should access the host node ip:port directly.
+ serviceAccount, statefulSet := shuffleserver.GenerateShuffleServers(rss)
if err := kubeutil.SyncServiceAccount(r.kubeClient, serviceAccount);
err != nil {
klog.Errorf("sync SA (%v) for rss (%v) failed: %v",
utils.UniqueName(serviceAccount),
utils.UniqueName(rss), err)
return err
}
- if err := kubeutil.SyncServices(r.kubeClient, services); err != nil {
- klog.Errorf("sync SVCs for rss (%v) failed: %v",
utils.UniqueName(rss), err)
- return err
- }
if _, _, err := kubeutil.SyncStatefulSet(r.kubeClient, statefulSet,
true); err != nil {
klog.Errorf("sync StatefulSet for rss (%v) failed: %v",
utils.UniqueName(rss), err)
return err
diff --git
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
index 47e2d3ac..a3ff5e58 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
@@ -54,13 +54,18 @@ func GenerateCoordinators(rss
*unifflev1alpha1.RemoteShuffleService) (
sa := GenerateSA(rss)
cm := GenerateCM(rss)
count := *rss.Spec.Coordinator.Count
- services := make([]*corev1.Service, count)
+ services := make([]*corev1.Service, 0)
deployments := make([]*appsv1.Deployment, count)
for i := 0; i < int(count); i++ {
- svc := GenerateSvc(rss, i)
+ // only generate svc when nodePorts are specified
+ if len(rss.Spec.Coordinator.RPCNodePort) > 0 {
+ svc := GenerateSvc(rss, i)
+ services = append(services, svc)
+ }
+ headlessSvc := GenerateHeadlessSvc(rss, i)
deploy := GenerateDeploy(rss, i)
- services[i] = svc
deployments[i] = deploy
+ services = append(services, headlessSvc)
}
return sa, cm, services, deployments
}
@@ -95,7 +100,43 @@ func GenerateCM(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.ConfigMap {
return cm
}
-// GenerateSvc generates service used by specific coordinator.
+// GenerateHeadlessSvc generates a headless service for corresponding
coordinator.
+func GenerateHeadlessSvc(rss *unifflev1alpha1.RemoteShuffleService, index int)
*corev1.Service {
+ name := GenerateNameByIndex(rss, index)
+ serviceName := appendHeadless(name)
+
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: serviceName,
+ Namespace: rss.Namespace,
+ },
+ Spec: corev1.ServiceSpec{
+ ClusterIP: corev1.ClusterIPNone,
+ Selector: map[string]string{
+ "app": name,
+ },
+ Ports: []corev1.ServicePort{
+ {
+ Name: "rpc",
+ Protocol: corev1.ProtocolTCP,
+ Port:
controllerconstants.ContainerCoordinatorRPCPort,
+ TargetPort:
intstr.FromInt(int(*rss.Spec.Coordinator.RPCPort)),
+ },
+ {
+ Name: "http",
+ Protocol: corev1.ProtocolTCP,
+ Port:
controllerconstants.ContainerCoordinatorHTTPPort,
+ TargetPort:
intstr.FromInt(int(*rss.Spec.Coordinator.HTTPPort)),
+ },
+ },
+ },
+ }
+ util.AddOwnerReference(&svc.ObjectMeta, rss)
+ return svc
+}
+
+// GenerateSvc generates NodePort service used by specific coordinator. If no
RPCNodePort/HTTPNodePort is specified,
+// this function is skipped.
func GenerateSvc(rss *unifflev1alpha1.RemoteShuffleService, index int)
*corev1.Service {
name := GenerateNameByIndex(rss, index)
svc := &corev1.Service{
@@ -230,12 +271,17 @@ func GenerateNameByIndex(rss
*unifflev1alpha1.RemoteShuffleService, index int) s
return fmt.Sprintf("%v-%v-%v", constants.RSSCoordinator, rss.Name,
index)
}
+func appendHeadless(name string) string {
+ return name + "-headless"
+}
+
// GenerateAddresses returns addresses of coordinators accessed by shuffle
servers.
func GenerateAddresses(rss *unifflev1alpha1.RemoteShuffleService) string {
var names []string
for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ {
- current := fmt.Sprintf("%v:%v", GenerateNameByIndex(rss, i),
- controllerconstants.ContainerShuffleServerRPCPort)
+ name := GenerateNameByIndex(rss, i)
+ serviceName := appendHeadless(name)
+ current := fmt.Sprintf("%v:%v", serviceName,
controllerconstants.ContainerShuffleServerRPCPort)
names = append(names, current)
}
return strings.Join(names, ",")
diff --git
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
index 6c16eebc..6df2216b 100644
---
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
+++
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
@@ -24,6 +24,7 @@ import (
"strconv"
"testing"
+ "github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
@@ -210,3 +211,62 @@ func TestGenerateDeploy(t *testing.T) {
})
}
}
+
+// generateServiceCuntMap generates a map with service type and its
corresponding count. The headless svc is treated
+// differently: the service type for headless is treated as an empty service.
+func generateServiceCountMap(services []*corev1.Service)
map[corev1.ServiceType]int {
+ result := make(map[corev1.ServiceType]int)
+ var empty corev1.ServiceType
+ for _, service := range services {
+ sType := service.Spec.Type
+ if (sType == corev1.ServiceTypeClusterIP || sType == empty) &&
service.Spec.ClusterIP == corev1.ClusterIPNone {
+ result[empty]++
+ } else {
+ result[service.Spec.Type]++
+ }
+ }
+ return result
+}
+
+func TestGenerateSvcForCoordinator(t *testing.T) {
+ for _, tt := range []struct {
+ name string
+ rss *uniffleapi.RemoteShuffleService
+ serviceCntMap map[corev1.ServiceType]int
+ }{
+ {
+ name: "with RPCNodePort",
+ rss: buildRssWithLabels(),
+ serviceCntMap: map[corev1.ServiceType]int{
+ "": 2, // defaults to
headless service
+ corev1.ServiceTypeNodePort: 2,
+ },
+ },
+ {
+ name: "without RPCNodePort",
+ rss: func() *uniffleapi.RemoteShuffleService {
+ withoutRPCNodePortRss := buildRssWithLabels()
+
withoutRPCNodePortRss.Spec.Coordinator.RPCNodePort = make([]int32, 0)
+
withoutRPCNodePortRss.Spec.Coordinator.HTTPNodePort = make([]int32, 0)
+ return withoutRPCNodePortRss
+ }(),
+ serviceCntMap: map[corev1.ServiceType]int{
+ "": 2,
+ },
+ },
+ } {
+ t.Run(tt.name, func(t *testing.T) {
+ assertion := assert.New(t)
+ _, _, services, _ := GenerateCoordinators(tt.rss)
+ result := generateServiceCountMap(services)
+ assertion.Equal(tt.serviceCntMap, result)
+ })
+ }
+}
+
+func TestGenerateAddresses(t *testing.T) {
+ assertion := assert.New(t)
+ rss := buildRssWithLabels()
+ quorum := GenerateAddresses(rss)
+ assertion.Contains(quorum, "headless")
+}
diff --git
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
index e590f756..43a57be8 100644
---
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
+++
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
@@ -26,7 +26,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/pointer"
@@ -52,18 +51,10 @@ func init() {
}
// GenerateShuffleServers generates objects related to shuffle servers.
-func GenerateShuffleServers(rss *unifflev1alpha1.RemoteShuffleService) (
- *corev1.ServiceAccount, []*corev1.Service, *appsv1.StatefulSet) {
+func GenerateShuffleServers(rss *unifflev1alpha1.RemoteShuffleService)
(*corev1.ServiceAccount, *appsv1.StatefulSet) {
sa := GenerateSA(rss)
- var services []*corev1.Service
- if needGenerateHeadlessSVC(rss) {
- services = append(services, GenerateHeadlessSVC(rss))
- }
- if needGenerateNodePortSVC(rss) {
- services = append(services, GenerateNodePortSVC(rss))
- }
sts := GenerateSts(rss)
- return sa, services, sts
+ return sa, sts
}
// GenerateSA generates service account of shuffle servers.
@@ -78,76 +69,6 @@ func GenerateSA(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.ServiceAccoun
return sa
}
-// GenerateHeadlessSVC generates headless service used by shuffle servers.
-func GenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.Service {
- name := generateHeadlessSVCName(rss)
- svc := &corev1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: rss.Namespace,
- },
- Spec: corev1.ServiceSpec{
- ClusterIP: corev1.ClusterIPNone,
- Selector: map[string]string{
- "app": GenerateName(rss),
- },
- },
- }
- if rss.Spec.ShuffleServer.RPCPort != nil {
- svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
- Name: "rpc",
- Protocol: corev1.ProtocolTCP,
- Port:
controllerconstants.ContainerShuffleServerRPCPort,
- TargetPort:
intstr.FromInt(int(*rss.Spec.ShuffleServer.RPCPort)),
- })
- }
- if rss.Spec.ShuffleServer.HTTPPort != nil {
- svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
- Name: "http",
- Protocol: corev1.ProtocolTCP,
- Port:
controllerconstants.ContainerShuffleServerHTTPPort,
- TargetPort:
intstr.FromInt(int(*rss.Spec.ShuffleServer.HTTPPort)),
- })
- }
- util.AddOwnerReference(&svc.ObjectMeta, rss)
- return svc
-}
-
-// GenerateNodePortSVC generates nodePort service used by shuffle servers.
-func GenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.Service {
- name := GenerateName(rss)
- svc := &corev1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: rss.Namespace,
- },
- Spec: corev1.ServiceSpec{
- Type: corev1.ServiceTypeNodePort,
- Selector: map[string]string{
- "app": name,
- },
- },
- }
- if needNodePortForRPC(rss) {
- svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
- Protocol: corev1.ProtocolTCP,
- Port:
controllerconstants.ContainerShuffleServerRPCPort,
- TargetPort:
intstr.FromInt(int(*rss.Spec.ShuffleServer.RPCPort)),
- NodePort: *rss.Spec.ShuffleServer.RPCNodePort,
- })
- }
- if needNodePortForHTTP(rss) {
- svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
- Protocol: corev1.ProtocolTCP,
- Port:
controllerconstants.ContainerShuffleServerHTTPPort,
- TargetPort:
intstr.FromInt(int(*rss.Spec.ShuffleServer.HTTPPort)),
- NodePort: *rss.Spec.ShuffleServer.HTTPNodePort,
- })
- }
- util.AddOwnerReference(&svc.ObjectMeta, rss)
- return svc
-}
-
// getReplicas returns replicas of shuffle servers.
func getReplicas(rss *unifflev1alpha1.RemoteShuffleService) *int32 {
// TODO: we will support hpa for rss object,
@@ -357,23 +278,3 @@ func generateMainContainerENV(rss
*unifflev1alpha1.RemoteShuffleService) []corev
}
return env
}
-
-// needGenerateNodePortSVC returns whether we need node port service for
shuffle servers.
-func needGenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
- return needNodePortForRPC(rss) || needNodePortForHTTP(rss)
-}
-
-// needGenerateHeadlessSVC returns whether we need headless service for
shuffle servers.
-func needGenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
- return rss.Spec.ShuffleServer.RPCPort != nil ||
rss.Spec.ShuffleServer.HTTPPort != nil
-}
-
-// needNodePortForRPC returns whether we need node port service for rpc
service of shuffle servers.
-func needNodePortForRPC(rss *unifflev1alpha1.RemoteShuffleService) bool {
- return rss.Spec.ShuffleServer.RPCPort != nil &&
rss.Spec.ShuffleServer.RPCNodePort != nil
-}
-
-// needNodePortForRPC returns whether we need node port service for http
service of shuffle servers.
-func needNodePortForHTTP(rss *unifflev1alpha1.RemoteShuffleService) bool {
- return rss.Spec.ShuffleServer.HTTPPort != nil &&
rss.Spec.ShuffleServer.HTTPNodePort != nil
-}
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
b/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
index ce1033c4..c641fba5 100644
--- a/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
@@ -156,8 +156,10 @@ func generateRSSPatches(ar *admissionv1.AdmissionReview,
// validateCoordinator validates configurations for coordinators.
func validateCoordinator(coordinator *unifflev1alpha1.CoordinatorConfig) error
{
- if len(coordinator.RPCNodePort) != int(*coordinator.Count) ||
- len(coordinator.HTTPNodePort) != int(*coordinator.Count) {
+ // number of RPCNodePort must equal with number of HTTPNodePort
+ if len(coordinator.RPCNodePort) != len(coordinator.HTTPNodePort) ||
+ // RPCNodePort/HTTPNodePort could be zero
+ (len(coordinator.HTTPNodePort) > 0 &&
len(coordinator.HTTPNodePort) != int(*coordinator.Count)) {
return fmt.Errorf("invalid number of http or rpc node ports
(%v/%v) <> (%v)",
len(coordinator.RPCNodePort),
len(coordinator.HTTPNodePort), *coordinator.Count)
}
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/rss_test.go
b/deploy/kubernetes/operator/pkg/webhook/inspector/rss_test.go
index 101a3e4c..e0f3dce6 100644
--- a/deploy/kubernetes/operator/pkg/webhook/inspector/rss_test.go
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/rss_test.go
@@ -21,6 +21,7 @@ import (
"encoding/json"
"testing"
+ "github.com/stretchr/testify/assert"
admissionv1 "k8s.io/api/admission/v1"
nodev1 "k8s.io/api/node/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -294,3 +295,63 @@ func buildTestRuntimeClass() *nodev1.RuntimeClass {
Handler: "/etc/runtime/bin",
}
}
+
+func TestValidateCoordinator(t *testing.T) {
+ for _, tt := range []struct {
+ name string
+ coordinator *uniffleapi.CoordinatorConfig
+ allowed bool
+ }{
+ {
+ name: "empty RPCNodePort",
+ coordinator: &uniffleapi.CoordinatorConfig{
+ Count: pointer.Int32(2),
+ },
+ allowed: true,
+ },
+ {
+ name: "same number of RPCNodePort and HTTPNodePort",
+ coordinator: &uniffleapi.CoordinatorConfig{
+ Count: pointer.Int32(2),
+ RPCNodePort: []int32{19996, 19997},
+ HTTPNodePort: []int32{19996, 19997},
+ },
+ allowed: true,
+ },
+ {
+ name: "different number of RPCNodePort and
HTTPNodePort",
+ coordinator: &uniffleapi.CoordinatorConfig{
+ Count: pointer.Int32(2),
+ RPCNodePort: []int32{19996, 19997, 19998},
+ HTTPNodePort: []int32{19991, 19992},
+ },
+ allowed: false,
+ },
+ {
+ name: "same number of RPCNodePort and HTTPNodePort but
with different coordinator count",
+ coordinator: &uniffleapi.CoordinatorConfig{
+ Count: pointer.Int32(1),
+ RPCNodePort: []int32{19996, 19997},
+ HTTPNodePort: []int32{19991, 19992},
+ },
+ allowed: false,
+ },
+ } {
+ t.Run(tt.name, func(t *testing.T) {
+ assertion := assert.New(t)
+ tt.coordinator.ExcludeNodesFilePath = "/exclude_nodes"
+ tt.coordinator.CommonConfig = &uniffleapi.CommonConfig{
+ RSSPodSpec: &uniffleapi.RSSPodSpec{
+ LogHostPath: "",
+ HostPathMounts: map[string]string{},
+ },
+ }
+ err := validateCoordinator(tt.coordinator)
+ if tt.allowed {
+ assertion.Nil(err, "expected allowed, but got
error: %v", err)
+ } else {
+ assertion.Error(err, "expected denied, but got
accepted")
+ }
+ })
+ }
+}