This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b31a5c9f [#627] fix(operator): support specifying custom ports (#629)
b31a5c9f is described below

commit b31a5c9fff01948bce159f9c95cb58cc95e2f9cb
Author: jasonawang <[email protected]>
AuthorDate: Mon Feb 20 20:27:26 2023 +0800

    [#627] fix(operator): support specifying custom ports (#629)
    
    ### What changes were proposed in this pull request?
    Set the coordinator's and shuffle server's container ports from the
    corresponding fields of the RSS spec.
    
    ### Why are the changes needed?
    Fix #627.
    
    ### Does this PR introduce _any_ user-facing change?
    RSS cluster admins can now set custom ports for shuffle servers and
    coordinators.
    
    ### How was this patch tested?
    Manually verified.
---
 .../operator/pkg/controller/constants/constants.go |   4 -
 .../pkg/controller/sync/coordinator/coordinator.go |  10 +-
 .../sync/coordinator/coordinator_test.go           | 184 ++++++++++++++++----
 .../controller/sync/shuffleserver/shuffleserver.go |  12 +-
 .../sync/shuffleserver/shuffleserver_test.go       | 188 ++++++++++++++++-----
 5 files changed, 305 insertions(+), 93 deletions(-)

diff --git a/deploy/kubernetes/operator/pkg/controller/constants/constants.go 
b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
index 7c74d8bd..8bda1ca6 100644
--- a/deploy/kubernetes/operator/pkg/controller/constants/constants.go
+++ b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
@@ -18,10 +18,6 @@
 package constants
 
 const (
-       // ContainerShuffleServerRPCPort indicates rpc port used in shuffle 
server containers.
-       ContainerShuffleServerRPCPort int32 = 19997
-       // ContainerShuffleServerHTTPPort indicates http port used in shuffle 
server containers.
-       ContainerShuffleServerHTTPPort int32 = 19996
        // ContainerCoordinatorRPCPort indicates rpc port used in coordinator 
containers.
        ContainerCoordinatorRPCPort int32 = 19997
        // ContainerCoordinatorHTTPPort indicates http port used in coordinator 
containers.
diff --git 
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go 
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
index a3ff5e58..0fccc65f 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
@@ -281,7 +281,7 @@ func GenerateAddresses(rss 
*unifflev1alpha1.RemoteShuffleService) string {
        for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ {
                name := GenerateNameByIndex(rss, i)
                serviceName := appendHeadless(name)
-               current := fmt.Sprintf("%v:%v", serviceName, 
controllerconstants.ContainerShuffleServerRPCPort)
+               current := fmt.Sprintf("%v:%v", serviceName, 
*rss.Spec.Coordinator.RPCPort)
                names = append(names, current)
        }
        return strings.Join(names, ",")
@@ -312,11 +312,11 @@ func generateMainContainer(rss 
*unifflev1alpha1.RemoteShuffleService) *corev1.Co
 func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) 
[]corev1.ContainerPort {
        ports := []corev1.ContainerPort{
                {
-                       ContainerPort: 
controllerconstants.ContainerCoordinatorRPCPort,
+                       ContainerPort: *rss.Spec.Coordinator.RPCPort,
                        Protocol:      corev1.ProtocolTCP,
                },
                {
-                       ContainerPort: 
controllerconstants.ContainerCoordinatorHTTPPort,
+                       ContainerPort: *rss.Spec.Coordinator.HTTPPort,
                        Protocol:      corev1.ProtocolTCP,
                },
        }
@@ -329,11 +329,11 @@ func generateMainContainerENV(rss 
*unifflev1alpha1.RemoteShuffleService) []corev
        env := []corev1.EnvVar{
                {
                        Name:  controllerconstants.CoordinatorRPCPortEnv,
-                       Value: 
strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10),
+                       Value: 
strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10),
                },
                {
                        Name:  controllerconstants.CoordinatorHTTPPortEnv,
-                       Value: 
strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10),
+                       Value: 
strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10),
                },
                {
                        Name:  controllerconstants.XmxSizeEnv,
diff --git 
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
 
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
index 6df2216b..361839b9 100644
--- 
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
+++ 
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
@@ -36,7 +36,9 @@ import (
 )
 
 const (
-       testRuntimeClassName = "test-runtime"
+       testRuntimeClassName       = "test-runtime"
+       testRPCPort          int32 = 19990
+       testHTTPPort         int32 = 19991
 )
 
 // IsValidDeploy checks generated deployment, returns whether it is valid and 
error message.
@@ -86,6 +88,57 @@ func buildRssWithCustomENVs() 
*uniffleapi.RemoteShuffleService {
        return rss
 }
 
+func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
+       rss := utils.BuildRSSWithDefaultValue()
+       rss.Spec.Coordinator.RPCPort = pointer.Int32(testRPCPort)
+       return rss
+}
+
+func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService {
+       rss := utils.BuildRSSWithDefaultValue()
+       rss.Spec.Coordinator.HTTPPort = pointer.Int32(testHTTPPort)
+       return rss
+}
+
+func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) 
[]corev1.EnvVar {
+       return []corev1.EnvVar{
+               {
+                       Name:  controllerconstants.CoordinatorRPCPortEnv,
+                       Value: 
strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10),
+               },
+               {
+                       Name:  controllerconstants.CoordinatorHTTPPortEnv,
+                       Value: 
strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10),
+               },
+               {
+                       Name:  controllerconstants.XmxSizeEnv,
+                       Value: rss.Spec.Coordinator.XmxSize,
+               },
+               {
+                       Name:  controllerconstants.ServiceNameEnv,
+                       Value: controllerconstants.CoordinatorServiceName,
+               },
+               {
+                       Name: controllerconstants.NodeNameEnv,
+                       ValueFrom: &corev1.EnvVarSource{
+                               FieldRef: &corev1.ObjectFieldSelector{
+                                       APIVersion: "v1",
+                                       FieldPath:  "spec.nodeName",
+                               },
+                       },
+               },
+               {
+                       Name: controllerconstants.RssIPEnv,
+                       ValueFrom: &corev1.EnvVarSource{
+                               FieldRef: &corev1.ObjectFieldSelector{
+                                       APIVersion: "v1",
+                                       FieldPath:  "status.podIP",
+                               },
+                       },
+               },
+       }
+}
+
 func TestGenerateDeploy(t *testing.T) {
        for _, tt := range []struct {
                name string
@@ -145,42 +198,7 @@ func TestGenerateDeploy(t *testing.T) {
                        rss:  buildRssWithCustomENVs(),
                        IsValidDeploy: func(deploy *appsv1.Deployment, rss 
*uniffleapi.RemoteShuffleService) (
                                valid bool, err error) {
-                               expectENVs := []corev1.EnvVar{
-                                       {
-                                               Name:  
controllerconstants.CoordinatorRPCPortEnv,
-                                               Value: 
strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10),
-                                       },
-                                       {
-                                               Name:  
controllerconstants.CoordinatorHTTPPortEnv,
-                                               Value: 
strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10),
-                                       },
-                                       {
-                                               Name:  
controllerconstants.XmxSizeEnv,
-                                               Value: 
rss.Spec.Coordinator.XmxSize,
-                                       },
-                                       {
-                                               Name:  
controllerconstants.ServiceNameEnv,
-                                               Value: 
controllerconstants.CoordinatorServiceName,
-                                       },
-                                       {
-                                               Name: 
controllerconstants.NodeNameEnv,
-                                               ValueFrom: &corev1.EnvVarSource{
-                                                       FieldRef: 
&corev1.ObjectFieldSelector{
-                                                               APIVersion: 
"v1",
-                                                               FieldPath:  
"spec.nodeName",
-                                                       },
-                                               },
-                                       },
-                                       {
-                                               Name: 
controllerconstants.RssIPEnv,
-                                               ValueFrom: &corev1.EnvVarSource{
-                                                       FieldRef: 
&corev1.ObjectFieldSelector{
-                                                               APIVersion: 
"v1",
-                                                               FieldPath:  
"status.podIP",
-                                                       },
-                                               },
-                                       },
-                               }
+                               expectENVs := buildCommonExpectedENVs(rss)
                                defaultEnvNames := sets.NewString()
                                for i := range expectENVs {
                                        
defaultEnvNames.Insert(expectENVs[i].Name)
@@ -202,6 +220,94 @@ func TestGenerateDeploy(t *testing.T) {
                                return
                        },
                },
+               {
+                       name: "set custom rpc port used by coordinator",
+                       rss:  buildRssWithCustomRPCPort(),
+                       IsValidDeploy: func(deploy *appsv1.Deployment, rss 
*uniffleapi.RemoteShuffleService) (
+                               valid bool, err error) {
+                               // check envs
+                               expectENVs := buildCommonExpectedENVs(rss)
+                               for i := range expectENVs {
+                                       if expectENVs[i].Name == 
controllerconstants.CoordinatorRPCPortEnv {
+                                               expectENVs[i].Value = 
strconv.FormatInt(int64(testRPCPort), 10)
+                                       }
+                               }
+                               actualENVs := 
deploy.Spec.Template.Spec.Containers[0].Env
+                               valid = reflect.DeepEqual(expectENVs, 
actualENVs)
+                               if !valid {
+                                       actualEnvBody, _ := 
json.Marshal(actualENVs)
+                                       expectEnvBody, _ := 
json.Marshal(expectENVs)
+                                       err = fmt.Errorf("unexpected 
ENVs:\n%v,\nexpected:\n%v",
+                                               string(actualEnvBody), 
string(expectEnvBody))
+                                       return
+                               }
+
+                               // check ports
+                               expectPorts := []corev1.ContainerPort{
+                                       {
+                                               ContainerPort: testRPCPort,
+                                               Protocol:      
corev1.ProtocolTCP,
+                                       },
+                                       {
+                                               ContainerPort: 
*rss.Spec.Coordinator.HTTPPort,
+                                               Protocol:      
corev1.ProtocolTCP,
+                                       },
+                               }
+                               actualPorts := 
deploy.Spec.Template.Spec.Containers[0].Ports
+                               valid = reflect.DeepEqual(expectPorts, 
actualPorts)
+                               if !valid {
+                                       actualPortsBody, _ := 
json.Marshal(actualPorts)
+                                       expectPortsBody, _ := 
json.Marshal(expectPorts)
+                                       err = fmt.Errorf("unexpected 
Ports:\n%v,\nexpected:\n%v",
+                                               string(actualPortsBody), 
string(expectPortsBody))
+                               }
+                               return
+                       },
+               },
+               {
+                       name: "set custom http port used by coordinator",
+                       rss:  buildRssWithCustomHTTPPort(),
+                       IsValidDeploy: func(deploy *appsv1.Deployment, rss 
*uniffleapi.RemoteShuffleService) (
+                               valid bool, err error) {
+                               // check envs
+                               expectENVs := buildCommonExpectedENVs(rss)
+                               for i := range expectENVs {
+                                       if expectENVs[i].Name == 
controllerconstants.CoordinatorHTTPPortEnv {
+                                               expectENVs[i].Value = 
strconv.FormatInt(int64(testHTTPPort), 10)
+                                       }
+                               }
+                               actualENVs := 
deploy.Spec.Template.Spec.Containers[0].Env
+                               valid = reflect.DeepEqual(expectENVs, 
actualENVs)
+                               if !valid {
+                                       actualEnvBody, _ := 
json.Marshal(actualENVs)
+                                       expectEnvBody, _ := 
json.Marshal(expectENVs)
+                                       err = fmt.Errorf("unexpected 
ENVs:\n%v,\nexpected:\n%v",
+                                               string(actualEnvBody), 
string(expectEnvBody))
+                                       return
+                               }
+
+                               // check ports
+                               expectPorts := []corev1.ContainerPort{
+                                       {
+                                               ContainerPort: 
*rss.Spec.Coordinator.RPCPort,
+                                               Protocol:      
corev1.ProtocolTCP,
+                                       },
+                                       {
+                                               ContainerPort: testHTTPPort,
+                                               Protocol:      
corev1.ProtocolTCP,
+                                       },
+                               }
+                               actualPorts := 
deploy.Spec.Template.Spec.Containers[0].Ports
+                               valid = reflect.DeepEqual(expectPorts, 
actualPorts)
+                               if !valid {
+                                       actualPortsBody, _ := 
json.Marshal(actualPorts)
+                                       expectPortsBody, _ := 
json.Marshal(expectPorts)
+                                       err = fmt.Errorf("unexpected 
Ports:\n%v,\nexpected:\n%v",
+                                               string(actualPortsBody), 
string(expectPortsBody))
+                               }
+                               return
+                       },
+               },
        } {
                t.Run(tt.name, func(tc *testing.T) {
                        deploy := GenerateDeploy(tt.rss, 0)
@@ -269,4 +375,8 @@ func TestGenerateAddresses(t *testing.T) {
        rss := buildRssWithLabels()
        quorum := GenerateAddresses(rss)
        assertion.Contains(quorum, "headless")
+
+       rss = buildRssWithCustomRPCPort()
+       quorum = GenerateAddresses(rss)
+       assertion.Contains(quorum, strconv.FormatInt(int64(testRPCPort), 10))
 }
diff --git 
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go 
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
index 43a57be8..cb77d0af 100644
--- 
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
+++ 
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
@@ -136,9 +136,9 @@ func GenerateSts(rss *unifflev1alpha1.RemoteShuffleService) 
*appsv1.StatefulSet
                                                constants.AnnotationRssName: 
rss.Name,
                                                constants.AnnotationRssUID:  
string(rss.UID),
                                                
constants.AnnotationMetricsServerPort: fmt.Sprintf("%v",
-                                                       
controllerconstants.ContainerShuffleServerHTTPPort),
+                                                       
*rss.Spec.ShuffleServer.HTTPPort),
                                                
constants.AnnotationShuffleServerPort: fmt.Sprintf("%v",
-                                                       
controllerconstants.ContainerShuffleServerRPCPort),
+                                                       
*rss.Spec.ShuffleServer.RPCPort),
                                        },
                                },
                                Spec: podSpec,
@@ -217,11 +217,11 @@ func generateMainContainer(rss 
*unifflev1alpha1.RemoteShuffleService) *corev1.Co
 func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) 
[]corev1.ContainerPort {
        ports := []corev1.ContainerPort{
                {
-                       ContainerPort: 
controllerconstants.ContainerShuffleServerRPCPort,
+                       ContainerPort: *rss.Spec.ShuffleServer.RPCPort,
                        Protocol:      corev1.ProtocolTCP,
                },
                {
-                       ContainerPort: 
controllerconstants.ContainerShuffleServerHTTPPort,
+                       ContainerPort: *rss.Spec.ShuffleServer.HTTPPort,
                        Protocol:      corev1.ProtocolTCP,
                },
        }
@@ -234,11 +234,11 @@ func generateMainContainerENV(rss 
*unifflev1alpha1.RemoteShuffleService) []corev
        env := []corev1.EnvVar{
                {
                        Name:  controllerconstants.ShuffleServerRPCPortEnv,
-                       Value: 
strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10),
+                       Value: 
strconv.FormatInt(int64(*rss.Spec.ShuffleServer.RPCPort), 10),
                },
                {
                        Name:  controllerconstants.ShuffleServerHTTPPortEnv,
-                       Value: 
strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 
10),
+                       Value: 
strconv.FormatInt(int64(*rss.Spec.ShuffleServer.HTTPPort), 10),
                },
                {
                        Name:  controllerconstants.RSSCoordinatorQuorumEnv,
diff --git 
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
 
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
index a1c6576c..80472b12 100644
--- 
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
+++ 
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
@@ -36,7 +36,9 @@ import (
 )
 
 const (
-       testRuntimeClassName = "test-runtime"
+       testRuntimeClassName       = "test-runtime"
+       testRPCPort          int32 = 19998
+       testHTTPPort         int32 = 19999
 )
 
 // IsValidSts checks generated statefulSet, returns whether it is valid and 
error message.
@@ -91,6 +93,61 @@ func buildRssWithCustomENVs() 
*uniffleapi.RemoteShuffleService {
        return rss
 }
 
+func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
+       rss := utils.BuildRSSWithDefaultValue()
+       rss.Spec.ShuffleServer.RPCPort = pointer.Int32(testRPCPort)
+       return rss
+}
+
+func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService {
+       rss := utils.BuildRSSWithDefaultValue()
+       rss.Spec.ShuffleServer.HTTPPort = pointer.Int32(testHTTPPort)
+       return rss
+}
+
+func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) 
[]corev1.EnvVar {
+       return []corev1.EnvVar{
+               {
+                       Name:  controllerconstants.ShuffleServerRPCPortEnv,
+                       Value: 
strconv.FormatInt(int64(*rss.Spec.ShuffleServer.RPCPort), 10),
+               },
+               {
+                       Name:  controllerconstants.ShuffleServerHTTPPortEnv,
+                       Value: 
strconv.FormatInt(int64(*rss.Spec.ShuffleServer.HTTPPort), 10),
+               },
+               {
+                       Name:  controllerconstants.RSSCoordinatorQuorumEnv,
+                       Value: coordinator.GenerateAddresses(rss),
+               },
+               {
+                       Name:  controllerconstants.XmxSizeEnv,
+                       Value: rss.Spec.ShuffleServer.XmxSize,
+               },
+               {
+                       Name:  controllerconstants.ServiceNameEnv,
+                       Value: controllerconstants.ShuffleServerServiceName,
+               },
+               {
+                       Name: controllerconstants.NodeNameEnv,
+                       ValueFrom: &corev1.EnvVarSource{
+                               FieldRef: &corev1.ObjectFieldSelector{
+                                       APIVersion: "v1",
+                                       FieldPath:  "spec.nodeName",
+                               },
+                       },
+               },
+               {
+                       Name: controllerconstants.RssIPEnv,
+                       ValueFrom: &corev1.EnvVarSource{
+                               FieldRef: &corev1.ObjectFieldSelector{
+                                       APIVersion: "v1",
+                                       FieldPath:  "status.podIP",
+                               },
+                       },
+               },
+       }
+}
+
 func TestGenerateSts(t *testing.T) {
        for _, tt := range []struct {
                name string
@@ -148,46 +205,7 @@ func TestGenerateSts(t *testing.T) {
                        name: "set custom environment variables",
                        rss:  buildRssWithCustomENVs(),
                        IsValidSts: func(sts *appsv1.StatefulSet, rss 
*uniffleapi.RemoteShuffleService) (valid bool, err error) {
-                               expectENVs := []corev1.EnvVar{
-                                       {
-                                               Name:  
controllerconstants.ShuffleServerRPCPortEnv,
-                                               Value: 
strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10),
-                                       },
-                                       {
-                                               Name:  
controllerconstants.ShuffleServerHTTPPortEnv,
-                                               Value: 
strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 
10),
-                                       },
-                                       {
-                                               Name:  
controllerconstants.RSSCoordinatorQuorumEnv,
-                                               Value: 
coordinator.GenerateAddresses(rss),
-                                       },
-                                       {
-                                               Name:  
controllerconstants.XmxSizeEnv,
-                                               Value: 
rss.Spec.ShuffleServer.XmxSize,
-                                       },
-                                       {
-                                               Name:  
controllerconstants.ServiceNameEnv,
-                                               Value: 
controllerconstants.ShuffleServerServiceName,
-                                       },
-                                       {
-                                               Name: 
controllerconstants.NodeNameEnv,
-                                               ValueFrom: &corev1.EnvVarSource{
-                                                       FieldRef: 
&corev1.ObjectFieldSelector{
-                                                               APIVersion: 
"v1",
-                                                               FieldPath:  
"spec.nodeName",
-                                                       },
-                                               },
-                                       },
-                                       {
-                                               Name: 
controllerconstants.RssIPEnv,
-                                               ValueFrom: &corev1.EnvVarSource{
-                                                       FieldRef: 
&corev1.ObjectFieldSelector{
-                                                               APIVersion: 
"v1",
-                                                               FieldPath:  
"status.podIP",
-                                                       },
-                                               },
-                                       },
-                               }
+                               expectENVs := buildCommonExpectedENVs(rss)
                                defaultEnvNames := sets.NewString()
                                for i := range expectENVs {
                                        
defaultEnvNames.Insert(expectENVs[i].Name)
@@ -209,6 +227,94 @@ func TestGenerateSts(t *testing.T) {
                                return
                        },
                },
+               {
+                       name: "set custom rpc port used by shuffle server",
+                       rss:  buildRssWithCustomRPCPort(),
+                       IsValidSts: func(sts *appsv1.StatefulSet, rss 
*uniffleapi.RemoteShuffleService) (
+                               valid bool, err error) {
+                               // check envs
+                               expectENVs := buildCommonExpectedENVs(rss)
+                               for i := range expectENVs {
+                                       if expectENVs[i].Name == 
controllerconstants.ShuffleServerRPCPortEnv {
+                                               expectENVs[i].Value = 
strconv.FormatInt(int64(testRPCPort), 10)
+                                       }
+                               }
+                               actualENVs := 
sts.Spec.Template.Spec.Containers[0].Env
+                               valid = reflect.DeepEqual(expectENVs, 
actualENVs)
+                               if !valid {
+                                       actualEnvBody, _ := 
json.Marshal(actualENVs)
+                                       expectEnvBody, _ := 
json.Marshal(expectENVs)
+                                       err = fmt.Errorf("unexpected 
ENVs:\n%v,\nexpected:\n%v",
+                                               string(actualEnvBody), 
string(expectEnvBody))
+                                       return
+                               }
+
+                               // check ports
+                               expectPorts := []corev1.ContainerPort{
+                                       {
+                                               ContainerPort: testRPCPort,
+                                               Protocol:      
corev1.ProtocolTCP,
+                                       },
+                                       {
+                                               ContainerPort: 
*rss.Spec.ShuffleServer.HTTPPort,
+                                               Protocol:      
corev1.ProtocolTCP,
+                                       },
+                               }
+                               actualPorts := 
sts.Spec.Template.Spec.Containers[0].Ports
+                               valid = reflect.DeepEqual(expectPorts, 
actualPorts)
+                               if !valid {
+                                       actualPortsBody, _ := 
json.Marshal(actualPorts)
+                                       expectPortsBody, _ := 
json.Marshal(expectPorts)
+                                       err = fmt.Errorf("unexpected 
Ports:\n%v,\nexpected:\n%v",
+                                               string(actualPortsBody), 
string(expectPortsBody))
+                               }
+                               return
+                       },
+               },
+               {
+                       name: "set custom http port used by shuffle server",
+                       rss:  buildRssWithCustomHTTPPort(),
+                       IsValidSts: func(sts *appsv1.StatefulSet, rss 
*uniffleapi.RemoteShuffleService) (
+                               valid bool, err error) {
+                               // check envs
+                               expectENVs := buildCommonExpectedENVs(rss)
+                               for i := range expectENVs {
+                                       if expectENVs[i].Name == 
controllerconstants.ShuffleServerHTTPPortEnv {
+                                               expectENVs[i].Value = 
strconv.FormatInt(int64(testHTTPPort), 10)
+                                       }
+                               }
+                               actualENVs := 
sts.Spec.Template.Spec.Containers[0].Env
+                               valid = reflect.DeepEqual(expectENVs, 
actualENVs)
+                               if !valid {
+                                       actualEnvBody, _ := 
json.Marshal(actualENVs)
+                                       expectEnvBody, _ := 
json.Marshal(expectENVs)
+                                       err = fmt.Errorf("unexpected 
ENVs:\n%v,\nexpected:\n%v",
+                                               string(actualEnvBody), 
string(expectEnvBody))
+                                       return
+                               }
+
+                               // check ports
+                               expectPorts := []corev1.ContainerPort{
+                                       {
+                                               ContainerPort: 
*rss.Spec.ShuffleServer.RPCPort,
+                                               Protocol:      
corev1.ProtocolTCP,
+                                       },
+                                       {
+                                               ContainerPort: testHTTPPort,
+                                               Protocol:      
corev1.ProtocolTCP,
+                                       },
+                               }
+                               actualPorts := 
sts.Spec.Template.Spec.Containers[0].Ports
+                               valid = reflect.DeepEqual(expectPorts, 
actualPorts)
+                               if !valid {
+                                       actualPortsBody, _ := 
json.Marshal(actualPorts)
+                                       expectPortsBody, _ := 
json.Marshal(expectPorts)
+                                       err = fmt.Errorf("unexpected 
Ports:\n%v,\nexpected:\n%v",
+                                               string(actualPortsBody), 
string(expectPortsBody))
+                               }
+                               return
+                       },
+               },
        } {
                t.Run(tt.name, func(tc *testing.T) {
                        sts := GenerateSts(tt.rss)

Reply via email to