This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch branch-0.7 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 3601f45ca13aa93d30286db66d23769c70c56a36 Author: jasonawang <[email protected]> AuthorDate: Mon Feb 20 20:27:26 2023 +0800 [#627] fix(operator): support specifying custom ports (#629) ### What changes were proposed in this pull request? Set coordinator/shuffler server's container port to the fields of RSS spec ### Why are the changes needed? Fix #627. ### Does this PR introduce _any_ user-facing change? For RSS cluster admin, they can set custom ports for shuffle servers and coordinators. ### How was this patch tested? Manually verified. --- .../operator/pkg/controller/constants/constants.go | 4 - .../pkg/controller/sync/coordinator/coordinator.go | 10 +- .../sync/coordinator/coordinator_test.go | 184 ++++++++++++++++---- .../controller/sync/shuffleserver/shuffleserver.go | 12 +- .../sync/shuffleserver/shuffleserver_test.go | 188 ++++++++++++++++----- 5 files changed, 305 insertions(+), 93 deletions(-) diff --git a/deploy/kubernetes/operator/pkg/controller/constants/constants.go b/deploy/kubernetes/operator/pkg/controller/constants/constants.go index 7c74d8bd..8bda1ca6 100644 --- a/deploy/kubernetes/operator/pkg/controller/constants/constants.go +++ b/deploy/kubernetes/operator/pkg/controller/constants/constants.go @@ -18,10 +18,6 @@ package constants const ( - // ContainerShuffleServerRPCPort indicates rpc port used in shuffle server containers. - ContainerShuffleServerRPCPort int32 = 19997 - // ContainerShuffleServerHTTPPort indicates http port used in shuffle server containers. - ContainerShuffleServerHTTPPort int32 = 19996 // ContainerCoordinatorRPCPort indicates rpc port used in coordinator containers. ContainerCoordinatorRPCPort int32 = 19997 // ContainerCoordinatorHTTPPort indicates http port used in coordinator containers. diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go index a3ff5e58..0fccc65f 100644 --- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go +++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go @@ -281,7 +281,7 @@ func GenerateAddresses(rss *unifflev1alpha1.RemoteShuffleService) string { for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ { name := GenerateNameByIndex(rss, i) serviceName := appendHeadless(name) - current := fmt.Sprintf("%v:%v", serviceName, controllerconstants.ContainerShuffleServerRPCPort) + current := fmt.Sprintf("%v:%v", serviceName, *rss.Spec.Coordinator.RPCPort) names = append(names, current) } return strings.Join(names, ",") @@ -312,11 +312,11 @@ func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Co func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort { ports := []corev1.ContainerPort{ { - ContainerPort: controllerconstants.ContainerCoordinatorRPCPort, + ContainerPort: *rss.Spec.Coordinator.RPCPort, Protocol: corev1.ProtocolTCP, }, { - ContainerPort: controllerconstants.ContainerCoordinatorHTTPPort, + ContainerPort: *rss.Spec.Coordinator.HTTPPort, Protocol: corev1.ProtocolTCP, }, } @@ -329,11 +329,11 @@ func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev env := []corev1.EnvVar{ { Name: controllerconstants.CoordinatorRPCPortEnv, - Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10), + Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10), }, { Name: controllerconstants.CoordinatorHTTPPortEnv, - Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10), + Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10), }, { Name: controllerconstants.XmxSizeEnv, diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go index 6df2216b..361839b9 100644 --- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go +++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go @@ -36,7 +36,9 @@ import ( ) const ( - testRuntimeClassName = "test-runtime" + testRuntimeClassName = "test-runtime" + testRPCPort int32 = 19990 + testHTTPPort int32 = 19991 ) // IsValidDeploy checks generated deployment, returns whether it is valid and error message. @@ -86,6 +88,57 @@ func buildRssWithCustomENVs() *uniffleapi.RemoteShuffleService { return rss } +func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService { + rss := utils.BuildRSSWithDefaultValue() + rss.Spec.Coordinator.RPCPort = pointer.Int32(testRPCPort) + return rss +} + +func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService { + rss := utils.BuildRSSWithDefaultValue() + rss.Spec.Coordinator.HTTPPort = pointer.Int32(testHTTPPort) + return rss +} + +func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) []corev1.EnvVar { + return []corev1.EnvVar{ + { + Name: controllerconstants.CoordinatorRPCPortEnv, + Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10), + }, + { + Name: controllerconstants.CoordinatorHTTPPortEnv, + Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10), + }, + { + Name: controllerconstants.XmxSizeEnv, + Value: rss.Spec.Coordinator.XmxSize, + }, + { + Name: controllerconstants.ServiceNameEnv, + Value: controllerconstants.CoordinatorServiceName, + }, + { + Name: controllerconstants.NodeNameEnv, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "spec.nodeName", + }, + }, + }, + { + Name: controllerconstants.RssIPEnv, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.podIP", + }, + }, + }, + } +} + func TestGenerateDeploy(t *testing.T) { for _, tt := range []struct { name string @@ -145,42 +198,7 @@ func TestGenerateDeploy(t *testing.T) { rss: buildRssWithCustomENVs(), IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) ( valid bool, err error) { - expectENVs := []corev1.EnvVar{ - { - Name: controllerconstants.CoordinatorRPCPortEnv, - Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10), - }, - { - Name: controllerconstants.CoordinatorHTTPPortEnv, - Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10), - }, - { - Name: controllerconstants.XmxSizeEnv, - Value: rss.Spec.Coordinator.XmxSize, - }, - { - Name: controllerconstants.ServiceNameEnv, - Value: controllerconstants.CoordinatorServiceName, - }, - { - Name: controllerconstants.NodeNameEnv, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - APIVersion: "v1", - FieldPath: "spec.nodeName", - }, - }, - }, - { - Name: controllerconstants.RssIPEnv, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - APIVersion: "v1", - FieldPath: "status.podIP", - }, - }, - }, - } + expectENVs := buildCommonExpectedENVs(rss) defaultEnvNames := sets.NewString() for i := range expectENVs { defaultEnvNames.Insert(expectENVs[i].Name) @@ -202,6 +220,94 @@ func TestGenerateDeploy(t *testing.T) { return }, }, + { + name: "set custom rpc port used by coordinator", + rss: buildRssWithCustomRPCPort(), + IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) ( + valid bool, err error) { + // check envs + expectENVs := buildCommonExpectedENVs(rss) + for i := range expectENVs { + if expectENVs[i].Name == controllerconstants.CoordinatorRPCPortEnv { + expectENVs[i].Value = strconv.FormatInt(int64(testRPCPort), 10) + } + } + actualENVs := deploy.Spec.Template.Spec.Containers[0].Env + valid = reflect.DeepEqual(expectENVs, actualENVs) + if !valid { + actualEnvBody, _ := json.Marshal(actualENVs) + expectEnvBody, _ := json.Marshal(expectENVs) + err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v", + string(actualEnvBody), string(expectEnvBody)) + return + } + + // check ports + expectPorts := []corev1.ContainerPort{ + { + ContainerPort: testRPCPort, + Protocol: corev1.ProtocolTCP, + }, + { + ContainerPort: *rss.Spec.Coordinator.HTTPPort, + Protocol: corev1.ProtocolTCP, + }, + } + actualPorts := deploy.Spec.Template.Spec.Containers[0].Ports + valid = reflect.DeepEqual(expectPorts, actualPorts) + if !valid { + actualPortsBody, _ := json.Marshal(actualPorts) + expectPortsBody, _ := json.Marshal(expectPorts) + err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v", + string(actualPortsBody), string(expectPortsBody)) + } + return + }, + }, + { + name: "set custom http port used by coordinator", + rss: buildRssWithCustomHTTPPort(), + IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) ( + valid bool, err error) { + // check envs + expectENVs := buildCommonExpectedENVs(rss) + for i := range expectENVs { + if expectENVs[i].Name == controllerconstants.CoordinatorHTTPPortEnv { + expectENVs[i].Value = strconv.FormatInt(int64(testHTTPPort), 10) + } + } + actualENVs := deploy.Spec.Template.Spec.Containers[0].Env + valid = reflect.DeepEqual(expectENVs, actualENVs) + if !valid { + actualEnvBody, _ := json.Marshal(actualENVs) + expectEnvBody, _ := json.Marshal(expectENVs) + err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v", + string(actualEnvBody), string(expectEnvBody)) + return + } + + // check ports + expectPorts := []corev1.ContainerPort{ + { + ContainerPort: *rss.Spec.Coordinator.RPCPort, + Protocol: corev1.ProtocolTCP, + }, + { + ContainerPort: testHTTPPort, + Protocol: corev1.ProtocolTCP, + }, + } + actualPorts := deploy.Spec.Template.Spec.Containers[0].Ports + valid = reflect.DeepEqual(expectPorts, actualPorts) + if !valid { + actualPortsBody, _ := json.Marshal(actualPorts) + expectPortsBody, _ := json.Marshal(expectPorts) + err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v", + string(actualPortsBody), string(expectPortsBody)) + } + return + }, + }, } { t.Run(tt.name, func(tc *testing.T) { deploy := GenerateDeploy(tt.rss, 0) @@ -269,4 +375,8 @@ func TestGenerateAddresses(t *testing.T) { rss := buildRssWithLabels() quorum := GenerateAddresses(rss) assertion.Contains(quorum, "headless") + + rss = buildRssWithCustomRPCPort() + quorum = GenerateAddresses(rss) + assertion.Contains(quorum, strconv.FormatInt(int64(testRPCPort), 10)) } diff --git a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go index 43a57be8..cb77d0af 100644 --- a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go +++ b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go @@ -136,9 +136,9 @@ func GenerateSts(rss *unifflev1alpha1.RemoteShuffleService) *appsv1.StatefulSet constants.AnnotationRssName: rss.Name, constants.AnnotationRssUID: string(rss.UID), constants.AnnotationMetricsServerPort: fmt.Sprintf("%v", - controllerconstants.ContainerShuffleServerHTTPPort), + *rss.Spec.ShuffleServer.HTTPPort), constants.AnnotationShuffleServerPort: fmt.Sprintf("%v", - controllerconstants.ContainerShuffleServerRPCPort), + *rss.Spec.ShuffleServer.RPCPort), }, }, Spec: podSpec, @@ -217,11 +217,11 @@ func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Co func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort { ports := []corev1.ContainerPort{ { - ContainerPort: controllerconstants.ContainerShuffleServerRPCPort, + ContainerPort: *rss.Spec.ShuffleServer.RPCPort, Protocol: corev1.ProtocolTCP, }, { - ContainerPort: controllerconstants.ContainerShuffleServerHTTPPort, + ContainerPort: *rss.Spec.ShuffleServer.HTTPPort, Protocol: corev1.ProtocolTCP, }, } @@ -234,11 +234,11 @@ func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev env := []corev1.EnvVar{ { Name: controllerconstants.ShuffleServerRPCPortEnv, - Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10), + Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.RPCPort), 10), }, { Name: controllerconstants.ShuffleServerHTTPPortEnv, - Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 10), + Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.HTTPPort), 10), }, { Name: controllerconstants.RSSCoordinatorQuorumEnv, diff --git a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go index a1c6576c..80472b12 100644 --- a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go +++ b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go @@ -36,7 +36,9 @@ import ( ) const ( - testRuntimeClassName = "test-runtime" + testRuntimeClassName = "test-runtime" + testRPCPort int32 = 19998 + testHTTPPort int32 = 19999 ) // IsValidSts checks generated statefulSet, returns whether it is valid and error message. @@ -91,6 +93,61 @@ func buildRssWithCustomENVs() *uniffleapi.RemoteShuffleService { return rss } +func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService { + rss := utils.BuildRSSWithDefaultValue() + rss.Spec.ShuffleServer.RPCPort = pointer.Int32(testRPCPort) + return rss +} + +func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService { + rss := utils.BuildRSSWithDefaultValue() + rss.Spec.ShuffleServer.HTTPPort = pointer.Int32(testHTTPPort) + return rss +} + +func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) []corev1.EnvVar { + return []corev1.EnvVar{ + { + Name: controllerconstants.ShuffleServerRPCPortEnv, + Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.RPCPort), 10), + }, + { + Name: controllerconstants.ShuffleServerHTTPPortEnv, + Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.HTTPPort), 10), + }, + { + Name: controllerconstants.RSSCoordinatorQuorumEnv, + Value: coordinator.GenerateAddresses(rss), + }, + { + Name: controllerconstants.XmxSizeEnv, + Value: rss.Spec.ShuffleServer.XmxSize, + }, + { + Name: controllerconstants.ServiceNameEnv, + Value: controllerconstants.ShuffleServerServiceName, + }, + { + Name: controllerconstants.NodeNameEnv, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "spec.nodeName", + }, + }, + }, + { + Name: controllerconstants.RssIPEnv, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.podIP", + }, + }, + }, + } +} + func TestGenerateSts(t *testing.T) { for _, tt := range []struct { name string @@ -148,46 +205,7 @@ func TestGenerateSts(t *testing.T) { name: "set custom environment variables", rss: buildRssWithCustomENVs(), IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (valid bool, err error) { - expectENVs := []corev1.EnvVar{ - { - Name: controllerconstants.ShuffleServerRPCPortEnv, - Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10), - }, - { - Name: controllerconstants.ShuffleServerHTTPPortEnv, - Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 10), - }, - { - Name: controllerconstants.RSSCoordinatorQuorumEnv, - Value: coordinator.GenerateAddresses(rss), - }, - { - Name: controllerconstants.XmxSizeEnv, - Value: rss.Spec.ShuffleServer.XmxSize, - }, - { - Name: controllerconstants.ServiceNameEnv, - Value: controllerconstants.ShuffleServerServiceName, - }, - { - Name: controllerconstants.NodeNameEnv, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - APIVersion: "v1", - FieldPath: "spec.nodeName", - }, - }, - }, - { - Name: controllerconstants.RssIPEnv, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - APIVersion: "v1", - FieldPath: "status.podIP", - }, - }, - }, - } + expectENVs := buildCommonExpectedENVs(rss) defaultEnvNames := sets.NewString() for i := range expectENVs { defaultEnvNames.Insert(expectENVs[i].Name) @@ -209,6 +227,94 @@ func TestGenerateSts(t *testing.T) { return }, }, + { + name: "set custom rpc port used by shuffle server", + rss: buildRssWithCustomRPCPort(), + IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) ( + valid bool, err error) { + // check envs + expectENVs := buildCommonExpectedENVs(rss) + for i := range expectENVs { + if expectENVs[i].Name == controllerconstants.ShuffleServerRPCPortEnv { + expectENVs[i].Value = strconv.FormatInt(int64(testRPCPort), 10) + } + } + actualENVs := sts.Spec.Template.Spec.Containers[0].Env + valid = reflect.DeepEqual(expectENVs, actualENVs) + if !valid { + actualEnvBody, _ := json.Marshal(actualENVs) + expectEnvBody, _ := json.Marshal(expectENVs) + err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v", + string(actualEnvBody), string(expectEnvBody)) + return + } + + // check ports + expectPorts := []corev1.ContainerPort{ + { + ContainerPort: testRPCPort, + Protocol: corev1.ProtocolTCP, + }, + { + ContainerPort: *rss.Spec.ShuffleServer.HTTPPort, + Protocol: corev1.ProtocolTCP, + }, + } + actualPorts := sts.Spec.Template.Spec.Containers[0].Ports + valid = reflect.DeepEqual(expectPorts, actualPorts) + if !valid { + actualPortsBody, _ := json.Marshal(actualPorts) + expectPortsBody, _ := json.Marshal(expectPorts) + err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v", + string(actualPortsBody), string(expectPortsBody)) + } + return + }, + }, + { + name: "set custom http port used by shuffle server", + rss: buildRssWithCustomHTTPPort(), + IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) ( + valid bool, err error) { + // check envs + expectENVs := buildCommonExpectedENVs(rss) + for i := range expectENVs { + if expectENVs[i].Name == controllerconstants.ShuffleServerHTTPPortEnv { + expectENVs[i].Value = strconv.FormatInt(int64(testHTTPPort), 10) + } + } + actualENVs := sts.Spec.Template.Spec.Containers[0].Env + valid = reflect.DeepEqual(expectENVs, actualENVs) + if !valid { + actualEnvBody, _ := json.Marshal(actualENVs) + expectEnvBody, _ := json.Marshal(expectENVs) + err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v", + string(actualEnvBody), string(expectEnvBody)) + return + } + + // check ports + expectPorts := []corev1.ContainerPort{ + { + ContainerPort: *rss.Spec.ShuffleServer.RPCPort, + Protocol: corev1.ProtocolTCP, + }, + { + ContainerPort: testHTTPPort, + Protocol: corev1.ProtocolTCP, + }, + } + actualPorts := sts.Spec.Template.Spec.Containers[0].Ports + valid = reflect.DeepEqual(expectPorts, actualPorts) + if !valid { + actualPortsBody, _ := json.Marshal(actualPorts) + expectPortsBody, _ := json.Marshal(expectPorts) + err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v", + string(actualPortsBody), string(expectPortsBody)) + } + return + }, + }, } { t.Run(tt.name, func(tc *testing.T) { sts := GenerateSts(tt.rss)
