This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d931e8bf [ISSUE-48][FEATURE][FOLLOW UP] Add webhook component (#188)
d931e8bf is described below

commit d931e8bf937982dd6c2f9fed84d65f67659c312a
Author: jasonawang <[email protected]>
AuthorDate: Thu Sep 1 10:33:59 2022 +0800

    [ISSUE-48][FEATURE][FOLLOW UP] Add webhook component (#188)
    
    ### What changes were proposed in this pull request?
    for issue #48
    I add webhook module this time, maybe some logic is missing unit test, I 
will add in the next PR.
    
    ### Why are the changes needed?
    Support K8S
    
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, We will add the doc later..
    
    ### How was this patch tested?
    UT
---
 deploy/kubernetes/operator/PROJECT                 |   4 +-
 deploy/kubernetes/operator/cmd/webhook/main.go     |  44 ++++
 deploy/kubernetes/operator/go.mod                  |  10 +-
 deploy/kubernetes/operator/go.sum                  |  49 ++++
 deploy/kubernetes/operator/hack/revive.toml        |   1 -
 deploy/kubernetes/operator/hack/update-codegen.sh  |   2 +-
 deploy/kubernetes/operator/pkg/.gitkeep            |  16 --
 .../kubernetes/operator/pkg/constants/constants.go |  60 +++++
 deploy/kubernetes/operator/pkg/utils/certs.go      | 230 +++++++++++++++++++
 deploy/kubernetes/operator/pkg/utils/config.go     |  75 +++++++
 .../kubernetes/operator/pkg/utils/coordinator.go   |  54 +++++
 .../operator/pkg/utils/shufflerserver.go           | 101 +++++++++
 deploy/kubernetes/operator/pkg/utils/util.go       | 102 +++++++++
 .../operator/pkg/webhook/config/config.go          | 123 ++++++++++
 .../operator/pkg/webhook/constants/constants.go    |  35 +++
 .../operator/pkg/webhook/inspector/inspector.go    | 118 ++++++++++
 .../operator/pkg/webhook/inspector/pod.go          | 146 ++++++++++++
 .../operator/pkg/webhook/inspector/rss.go          | 157 +++++++++++++
 deploy/kubernetes/operator/pkg/webhook/manager.go  | 222 ++++++++++++++++++
 .../operator/pkg/webhook/manager_test.go           | 142 ++++++++++++
 .../operator/pkg/webhook/syncer/syncer.go          | 248 +++++++++++++++++++++
 .../kubernetes/operator/pkg/webhook/util/patch.go  |  25 +++
 .../kubernetes/operator/pkg/webhook/util/util.go   | 213 ++++++++++++++++++
 23 files changed, 2156 insertions(+), 21 deletions(-)

diff --git a/deploy/kubernetes/operator/PROJECT 
b/deploy/kubernetes/operator/PROJECT
index 3cb854f1..e62a3a53 100644
--- a/deploy/kubernetes/operator/PROJECT
+++ b/deploy/kubernetes/operator/PROJECT
@@ -2,7 +2,7 @@ domain: apache.org
 layout:
 - go.kubebuilder.io/v3
 projectName: operator
-repo: github.com/apache/incubator-uniffle
+repo: github.com/apache/incubator-uniffle/deploy/kubernetes/operator
 resources:
 - api:
     crdVersion: v1
@@ -10,6 +10,6 @@ resources:
   domain: apache.org
   group: uniffle
   kind: Remoteshuffleservice
-  path: github.com/apache/incubator-uniffle/api/v1alpha1
+  path: 
github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1
   version: v1alpha1
 version: "3"
diff --git a/deploy/kubernetes/operator/cmd/webhook/main.go 
b/deploy/kubernetes/operator/cmd/webhook/main.go
new file mode 100644
index 00000000..ef1735c3
--- /dev/null
+++ b/deploy/kubernetes/operator/cmd/webhook/main.go
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+       "flag"
+
+       "k8s.io/klog/v2"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/config"
+)
+
+func main() {
+       klog.InitFlags(nil)
+       cfg := &config.Config{}
+       cfg.AddFlags()
+       flag.Parse()
+
+       cfg.Complete()
+       klog.Infof("run config: %+v", cfg)
+
+       // create an admission webhook manager.
+       am := webhook.NewAdmissionManager(cfg)
+       // start the admission webhook manager.
+       if err := am.Start(cfg.RunCtx); err != nil {
+               klog.Fatalf("start admission webhook failed: %v", err)
+       }
+}
diff --git a/deploy/kubernetes/operator/go.mod 
b/deploy/kubernetes/operator/go.mod
index 2845d777..6121ae4c 100644
--- a/deploy/kubernetes/operator/go.mod
+++ b/deploy/kubernetes/operator/go.mod
@@ -1,11 +1,19 @@
-module github.com/apache/incubator-uniffle
+module github.com/apache/incubator-uniffle/deploy/kubernetes/operator
 
 go 1.16
 
 require (
+       github.com/onsi/ginkgo/v2 v2.1.4
+       github.com/onsi/gomega v1.19.0
+       github.com/parnurzeal/gorequest v0.2.16
+       golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+       gomodules.xyz/jsonpatch/v2 v2.2.0
        k8s.io/api v0.22.1
        k8s.io/apimachinery v0.22.1
        k8s.io/client-go v0.22.1
        k8s.io/code-generator v0.22.1
+       k8s.io/klog/v2 v2.9.0
+       k8s.io/utils v0.0.0-20210802155522-efc7438f0176
+       moul.io/http2curl v1.0.0 // indirect
        sigs.k8s.io/controller-runtime v0.10.0
 )
diff --git a/deploy/kubernetes/operator/go.sum 
b/deploy/kubernetes/operator/go.sum
index c1dc279b..a3e5c1ac 100644
--- a/deploy/kubernetes/operator/go.sum
+++ b/deploy/kubernetes/operator/go.sum
@@ -54,6 +54,7 @@ github.com/benbjohnson/clock v1.0.3/go.mod 
h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiU
 github.com/benbjohnson/clock v1.1.0/go.mod 
h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod 
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod 
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bgentry/speakeasy v0.1.0/go.mod 
h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
 github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod 
h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
@@ -61,7 +62,9 @@ github.com/blang/semver v3.5.1+incompatible/go.mod 
h1:kRBLl5iJ+tD4TcOOxsy/0fnweb
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod 
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod 
h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
 github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod 
h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
+github.com/cespare/xxhash v1.1.0 
h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash v1.1.0/go.mod 
h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/cespare/xxhash/v2 v2.1.1 
h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/chzyer/logex v1.1.10/go.mod 
h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod 
h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
@@ -124,6 +127,7 @@ github.com/go-logr/logr v0.1.0/go.mod 
h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7
 github.com/go-logr/logr v0.2.0/go.mod 
h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
 github.com/go-logr/logr v0.4.0 h1:K7/B1jt6fIBQVd4Owv2MqGQClcgf0R266+7C/QjRcLc=
 github.com/go-logr/logr v0.4.0/go.mod 
h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
+github.com/go-logr/zapr v0.4.0 h1:uc1uML3hRYL9/ZZPdgHS/n8Nzo+eaYL/Efxkkamf7OM=
 github.com/go-logr/zapr v0.4.0/go.mod 
h1:tabnROwaDl0UNxkVeFRbY8bwB37GwRv0P8lg6aAiEnk=
 github.com/go-openapi/jsonpointer v0.19.3/go.mod 
h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
 github.com/go-openapi/jsonpointer v0.19.5 
h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
@@ -147,6 +151,7 @@ github.com/golang/groupcache 
v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er
 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod 
h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod 
h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod 
h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da 
h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod 
h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/mock v1.1.1/go.mod 
h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
 github.com/golang/mock v1.2.0/go.mod 
h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@@ -190,6 +195,7 @@ github.com/google/pprof 
v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OI
 github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod 
h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
 github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod 
h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
 github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod 
h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod 
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/renameio v0.1.0/go.mod 
h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/google/uuid v1.1.1/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
@@ -229,7 +235,9 @@ github.com/hashicorp/memberlist v0.1.3/go.mod 
h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
 github.com/hashicorp/serf v0.8.2/go.mod 
h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
 github.com/hpcloud/tail v1.0.0/go.mod 
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod 
h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
+github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod 
h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/imdario/mergo v0.3.5/go.mod 
h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
+github.com/imdario/mergo v0.3.12 
h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
 github.com/imdario/mergo v0.3.12/go.mod 
h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod 
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/jessevdk/go-flags v1.4.0/go.mod 
h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
@@ -268,6 +276,7 @@ github.com/mailru/easyjson v0.7.6/go.mod 
h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
 github.com/mattn/go-colorable v0.0.9/go.mod 
h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-isatty v0.0.3/go.mod 
h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod 
h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/matttproud/golang_protobuf_extensions 
v1.0.2-0.20181231171920-c182affec369 
h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
 github.com/matttproud/golang_protobuf_extensions 
v1.0.2-0.20181231171920-c182affec369/go.mod 
h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
 github.com/miekg/dns v1.0.14/go.mod 
h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/mitchellh/cli v1.0.0/go.mod 
h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
@@ -303,12 +312,20 @@ github.com/onsi/ginkgo v1.12.1/go.mod 
h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108
 github.com/onsi/ginkgo v1.14.0/go.mod 
h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
 github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
 github.com/onsi/ginkgo v1.16.4/go.mod 
h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo/v2 v2.1.3/go.mod 
h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
+github.com/onsi/ginkgo/v2 v2.1.4 
h1:GNapqRSid3zijZ9H77KrgVG4/8KqiyRsxcSxe+7ApXY=
+github.com/onsi/ginkgo/v2 v2.1.4/go.mod 
h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU=
 github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod 
h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
 github.com/onsi/gomega v1.7.1/go.mod 
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
 github.com/onsi/gomega v1.10.1/go.mod 
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
 github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
 github.com/onsi/gomega v1.15.0/go.mod 
h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
+github.com/onsi/gomega v1.17.0/go.mod 
h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
+github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
+github.com/onsi/gomega v1.19.0/go.mod 
h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
 github.com/opentracing/opentracing-go v1.1.0/go.mod 
h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
+github.com/parnurzeal/gorequest v0.2.16 
h1:T/5x+/4BT+nj+3eSknXmCTnEVGSzFzPGdpqmUVVZXHQ=
+github.com/parnurzeal/gorequest v0.2.16/go.mod 
h1:3Kh2QUMJoqw3icWAecsyzkpY7UzRfDhbRdTjtNwNiUE=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod 
h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.2.0/go.mod 
h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
 github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod 
h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
@@ -324,20 +341,24 @@ github.com/prometheus/client_golang v0.9.1/go.mod 
h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
 github.com/prometheus/client_golang v0.9.3/go.mod 
h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
 github.com/prometheus/client_golang v1.0.0/go.mod 
h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.7.1/go.mod 
h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
+github.com/prometheus/client_golang v1.11.0 
h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
 github.com/prometheus/client_golang v1.11.0/go.mod 
h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod 
h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod 
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod 
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.2.0 
h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
 github.com/prometheus/client_model v0.2.0/go.mod 
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod 
h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
 github.com/prometheus/common v0.4.0/go.mod 
h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
 github.com/prometheus/common v0.4.1/go.mod 
h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
 github.com/prometheus/common v0.10.0/go.mod 
h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
+github.com/prometheus/common v0.26.0 
h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ=
 github.com/prometheus/common v0.26.0/go.mod 
h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod 
h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
 github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod 
h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.0.2/go.mod 
h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.1.3/go.mod 
h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
+github.com/prometheus/procfs v0.6.0 
h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
 github.com/prometheus/procfs v0.6.0/go.mod 
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
 github.com/prometheus/tsdb v0.7.1/go.mod 
h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod 
h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@@ -384,6 +405,7 @@ github.com/xiang90/probing 
v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
 github.com/yuin/goldmark v1.1.27/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.3.5/go.mod 
h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/yuin/goldmark v1.4.1/go.mod 
h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
 go.etcd.io/etcd/api/v3 v3.5.0/go.mod 
h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
@@ -410,12 +432,15 @@ go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod 
h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4
 go.opentelemetry.io/otel/trace v0.20.0/go.mod 
h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod 
h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.uber.org/atomic v1.4.0/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.10/go.mod 
h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/multierr v1.1.0/go.mod 
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
 go.uber.org/multierr v1.6.0/go.mod 
h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
 go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
 go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
+go.uber.org/zap v1.19.0 h1:mZQZefskPPCMIBCSEH0v2/iUqqLrYtaeqwD6FUGUnFE=
 go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -426,6 +451,7 @@ golang.org/x/crypto 
v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod 
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod 
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod 
h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod 
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod 
h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -459,6 +485,8 @@ golang.org/x/mod v0.2.0/go.mod 
h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 
h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
+golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod 
h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -489,10 +517,14 @@ golang.org/x/net 
v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
 golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod 
h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod 
h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod 
h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod 
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod 
h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod 
h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
 golang.org/x/net v0.0.0-20210520170846-37e1c6afe023 
h1:ADo5wSpq2gqaCGQWzk7S5vd//0iyyLeAratkEoG5dLE=
 golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20220225172249-27dd8689420f 
h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
+golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -507,6 +539,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c 
h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -553,13 +586,20 @@ golang.org/x/sys 
v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2 
h1:c8PlLMqBbOHoqtjteWm5/kbe6rNY2pbRfbIMVnepueo=
 golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 
h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs=
+golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod 
h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d 
h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
 golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 
h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -568,6 +608,8 @@ golang.org/x/text v0.3.3/go.mod 
h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -616,11 +658,14 @@ golang.org/x/tools 
v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod 
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA=
 golang.org/x/tools v0.1.2/go.mod 
h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20=
+golang.org/x/tools v0.1.10/go.mod 
h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 
h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gomodules.xyz/jsonpatch/v2 v2.2.0 
h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
 gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod 
h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
 google.golang.org/api v0.4.0/go.mod 
h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
 google.golang.org/api v0.7.0/go.mod 
h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
@@ -728,6 +773,7 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod 
h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod 
h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 k8s.io/api v0.22.1 h1:ISu3tD/jRhYfSW8jI/Q1e+lRxkR7w9UwQEZ7FgslrwY=
 k8s.io/api v0.22.1/go.mod h1:bh13rkTp3F1XEaLGykbyRD2QaTTzPm0e/BMd8ptFONY=
+k8s.io/apiextensions-apiserver v0.22.1 
h1:YSJYzlFNFSfUle+yeEXX0lSQyLEoxoPJySRupepb0gE=
 k8s.io/apiextensions-apiserver v0.22.1/go.mod 
h1:HeGmorjtRmRLE+Q8dJu6AYRoZccvCMsghwS8XTUYb2c=
 k8s.io/apimachinery v0.22.1 h1:DTARnyzmdHMz7bFWFDDm22AM4pLWTQECMpRTFu2d2OM=
 k8s.io/apimachinery v0.22.1/go.mod 
h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0=
@@ -736,6 +782,7 @@ k8s.io/client-go v0.22.1 
h1:jW0ZSHi8wW260FvcXHkIa0NLxFBQszTlhiAVsU5mopw=
 k8s.io/client-go v0.22.1/go.mod h1:BquC5A4UOo4qVDUtoc04/+Nxp1MeHcVc1HJm1KmG8kk=
 k8s.io/code-generator v0.22.1 h1:zAcKpn+xe9Iyc4qtZlfg4tD0f+SO2h5+e/s4pZPOVhs=
 k8s.io/code-generator v0.22.1/go.mod 
h1:eV77Y09IopzeXOJzndrDyCI88UBok2h6WxAlBwpxa+o=
+k8s.io/component-base v0.22.1 h1:SFqIXsEN3v3Kkr1bS6rstrs1wd45StJqbtgbQ4nRQdo=
 k8s.io/component-base v0.22.1/go.mod 
h1:0D+Bl8rrnsPN9v0dyYvkqFfBeAd4u7n77ze+p8CMiPo=
 k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod 
h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
 k8s.io/gengo v0.0.0-20201214224949-b6c5ce23f027 
h1:Uusb3oh8XcdzDF/ndlI4ToKTYVlkCSJP39SRY2mfRAw=
@@ -749,6 +796,8 @@ k8s.io/kube-openapi 
v0.0.0-20210421082810-95288971da7e/go.mod h1:vHXdDvt9+2spS2R
 k8s.io/utils v0.0.0-20210707171843-4b05e18ac7d9/go.mod 
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
 k8s.io/utils v0.0.0-20210802155522-efc7438f0176 
h1:Mx0aa+SUAcNRQbs5jUzV8lkDlGFU8laZsY9jrcVX5SY=
 k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod 
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
+moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8=
+moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE=
 rsc.io/binaryregexp v0.2.0/go.mod 
h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
 rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
diff --git a/deploy/kubernetes/operator/hack/revive.toml 
b/deploy/kubernetes/operator/hack/revive.toml
index 56b8ec01..2db9acdb 100644
--- a/deploy/kubernetes/operator/hack/revive.toml
+++ b/deploy/kubernetes/operator/hack/revive.toml
@@ -29,7 +29,6 @@ warningCode = 1
 [rule.if-return]
 [rule.increment-decrement]
 [rule.var-declaration]
-[rule.package-comments]
 [rule.range]
 [rule.receiver-naming]
 [rule.time-naming]
diff --git a/deploy/kubernetes/operator/hack/update-codegen.sh 
b/deploy/kubernetes/operator/hack/update-codegen.sh
index b97447c4..518fc197 100755
--- a/deploy/kubernetes/operator/hack/update-codegen.sh
+++ b/deploy/kubernetes/operator/hack/update-codegen.sh
@@ -25,7 +25,7 @@ SCRIPT_ROOT=$(dirname "${BASH_SOURCE}")/..
 
 go mod vendor
 
-MODULE="github.com/apache/incubator-uniffle"
+MODULE="github.com/apache/incubator-uniffle/deploy/kubernetes/operator"
 GENERATED_BASE="pkg"
 OUTPUT_PACKAGE="${MODULE}/pkg/generated"
 APIS_PACKAGE="${MODULE}/api"
diff --git a/deploy/kubernetes/operator/pkg/.gitkeep 
b/deploy/kubernetes/operator/pkg/.gitkeep
deleted file mode 100644
index ecb1860d..00000000
--- a/deploy/kubernetes/operator/pkg/.gitkeep
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
\ No newline at end of file
diff --git a/deploy/kubernetes/operator/pkg/constants/constants.go 
b/deploy/kubernetes/operator/pkg/constants/constants.go
new file mode 100644
index 00000000..70a9395e
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/constants/constants.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constants
+
+const (
+       // LeaderIDSuffix is the suffix of leader id used for components' 
leader election
+       LeaderIDSuffix = "uniffle.apache.org"
+
+       // PodNamespaceEnv is the name of environment variable indicates pod's 
namespace.
+       PodNamespaceEnv = "POD_NAMESPACE"
+       // DefaultNamespace is used when environment variable of 
PodNamespaceEnv is not set.
+       DefaultNamespace = "kube-system"
+
+       // AnnotationMetricsServerPort represents annotation of metrics 
servers' port.
+       AnnotationMetricsServerPort = "uniffle.apache.org/metrics-server-port"
+       // AnnotationShuffleServerPort represents annotation of port used to 
identify shuffle servers.
+       AnnotationShuffleServerPort = "uniffle.apache.org/shuffle-server-port"
+       // AnnotationRssName represents annotation of rss object name used by 
shuffle servers' pods.
+       AnnotationRssName = "uniffle.apache.org/rss-name"
+       // AnnotationRssUID represents annotation of rss object uid used by 
shuffle servers' pods.
+       AnnotationRssUID = "uniffle.apache.org/rss-uid"
+
+       // LabelCoordinator represents label of coordinators.
+       LabelCoordinator = "uniffle.apache.org/coordinator"
+       // LabelShuffleServer represents label of shuffle servers.
+       LabelShuffleServer = "uniffle.apache.org/shuffle-server"
+
+       // RSSFinalizerName represents finalizer name of rss objects.
+       RSSFinalizerName = "WaitingShuffleServer"
+
+       // RSSCoordinator represents the prefix or identifier of the 
coordinator.
+       RSSCoordinator = "rss-coordinator"
+       // RSSShuffleServer represents the prefix or identifier of the shuffle 
server.
+       RSSShuffleServer = "rss-shuffle-server"
+
+       // DefaultInitContainerImage represents default image of init container 
used to change owner of host paths.
+       DefaultInitContainerImage = "busybox:latest"
+
+       // ShuffleServerConfigKey represents shuffle server configuration key 
in configMap used by a rss object.
+       ShuffleServerConfigKey = "server.conf"
+       // CoordinatorConfigKey represents coordinator configuration key in 
configMap used by a rss object.
+       CoordinatorConfigKey = "coordinator.conf"
+       // Log4jPropertiesKey represents log4j properties key in configMap used 
by a rss object.
+       Log4jPropertiesKey = "log4j.properties"
+)
diff --git a/deploy/kubernetes/operator/pkg/utils/certs.go 
b/deploy/kubernetes/operator/pkg/utils/certs.go
new file mode 100644
index 00000000..8bd16878
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/certs.go
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+       "crypto"
+       "crypto/rand"
+       "crypto/rsa"
+       "crypto/x509"
+       "crypto/x509/pkix"
+       "encoding/pem"
+       "errors"
+       "fmt"
+       "math"
+       "math/big"
+       "net"
+       "strings"
+       "time"
+
+       "k8s.io/client-go/util/cert"
+       "k8s.io/client-go/util/keyutil"
+       "k8s.io/klog/v2"
+)
+
+const (
+       certificateBlockType = "CERTIFICATE"
+       rsaKeySize           = 2048
+       duration365d         = time.Hour * 24 * 365 * 100
+       blockPrivateType     = "RSA PRIVATE KEY"
+       blockCertType        = "CERTIFICATE"
+)
+
+// newPrivateKey generates a private key.
+func newPrivateKey() (*rsa.PrivateKey, error) {
+       return rsa.GenerateKey(rand.Reader, rsaKeySize)
+}
+
+// SetUpCaKey sets up a new ca private key
+func SetUpCaKey() ([]byte, error) {
+       signingKey, err := newPrivateKey()
+       if err != nil {
+               return nil, fmt.Errorf("failed to create CA private key %v", 
err)
+       }
+       privateSigningKeyPEM, err := keyutil.MarshalPrivateKeyToPEM(signingKey)
+       if err != nil {
+               klog.Errorf("marshall private key to pem error:%v", err)
+               return nil, err
+       }
+       return privateSigningKeyPEM, nil
+}
+
+// SetUpCaCert sets up a new ca certification.
+func SetUpCaCert(commonName string, pemCAKey []byte) ([]byte, error) {
+       caKey, err := decodePemToRSA(pemCAKey)
+       if err != nil {
+               klog.Errorf("decode pem to rsa private error %v", err)
+               return nil, err
+       }
+       signingCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: 
commonName}, caKey)
+       if err != nil {
+               klog.Errorf("self sign error %v", err)
+               return nil, fmt.Errorf("failed to create CA cert for apiserver 
%v", err)
+       }
+       return encodeCertPEM(signingCert), nil
+}
+
+// SetUpSignedCertAndKey uses the ca certificate and ca private key to 
generate certificates.
+func SetUpSignedCertAndKey(domains []string, ips []net.IP, commonName string,
+       pemCAKey, pemCACert []byte, usage []x509.ExtKeyUsage) (
+       []byte, []byte, error) {
+       // decode ca private key in PEM format.
+       caKey, err := decodePemToRSA(pemCAKey)
+       if err != nil {
+               klog.Errorf("decode pem to rsa private error %v", err)
+               return nil, nil, err
+       }
+       // decode ca certificate in PEM format.
+       caCert, err := decodePemToCert(pemCACert)
+       if err != nil {
+               klog.Errorf("decode pem to cert error %v", err)
+               return nil, nil, err
+       }
+       // create a new private key.
+       signedKey, err := newPrivateKey()
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to create private key %v", 
err)
+       }
+       // generate a new certificate by new private key, ca certificate and ca 
private key.
+       signedCert, err := newSignedCert(
+               &cert.Config{
+                       CommonName: commonName,
+                       AltNames:   cert.AltNames{DNSNames: domains, IPs: ips},
+                       Usages:     usage,
+               },
+               signedKey, caCert, caKey,
+       )
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to create cert %v", err)
+       }
+       privateSigningKeyPEM, err := keyutil.MarshalPrivateKeyToPEM(signedKey)
+       if err != nil {
+               klog.Errorf("marshall private key to pem error:%v", err)
+               return nil, nil, err
+       }
+       // return a new certificate and private key in PEM format.
+       return encodeCertPEM(signedCert), privateSigningKeyPEM, nil
+}
+
+// SetupServerCert sets up the server certificate and private key.
+func SetupServerCert(domain, commonName string) ([]byte, []byte, []byte, 
error) {
+       signingKey, err := newPrivateKey()
+       if err != nil {
+               return nil, nil, nil, fmt.Errorf("failed to create CA private 
key %v", err)
+       }
+       signingCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: 
commonName}, signingKey)
+       if err != nil {
+               return nil, nil, nil, fmt.Errorf("failed to create CA cert for 
apiserver %v", err)
+       }
+       key, err := newPrivateKey()
+       if err != nil {
+               return nil, nil, nil, fmt.Errorf("failed to create private key 
for %v", err)
+       }
+
+       signedCert, err := newSignedCert(
+               &cert.Config{
+                       CommonName: commonName,
+                       AltNames:   cert.AltNames{DNSNames: 
strings.Split(domain, ",")},
+                       Usages:     
[]x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
+               },
+               key, signingCert, signingKey,
+       )
+       if err != nil {
+               return nil, nil, nil, fmt.Errorf("failed to create cert %v", 
err)
+       }
+       privateKeyPEM, err := keyutil.MarshalPrivateKeyToPEM(key)
+       if err != nil {
+               return nil, nil, nil, fmt.Errorf("failed to marshal key %v", 
err)
+       }
+       return encodeCertPEM(signedCert), privateKeyPEM, 
encodeCertPEM(signingCert), nil
+}
+
+// newSignedCert generates s self-signed cert.
+func newSignedCert(cfg *cert.Config, key crypto.Signer, caCert 
*x509.Certificate,
+       caKey crypto.Signer) (*x509.Certificate, error) {
+       serial, err := rand.Int(rand.Reader, 
new(big.Int).SetInt64(math.MaxInt64))
+       if err != nil {
+               return nil, err
+       }
+       if len(cfg.CommonName) == 0 {
+               return nil, errors.New("must specify a CommonName")
+       }
+       if len(cfg.Usages) == 0 {
+               return nil, errors.New("must specify at least one ExtKeyUsage")
+       }
+
+       certTmpl := x509.Certificate{
+               Subject: pkix.Name{
+                       CommonName:   cfg.CommonName,
+                       Organization: cfg.Organization,
+               },
+               DNSNames:     cfg.AltNames.DNSNames,
+               IPAddresses:  cfg.AltNames.IPs,
+               SerialNumber: serial,
+               NotBefore:    caCert.NotBefore,
+               NotAfter:     time.Now().Add(duration365d).UTC(),
+               KeyUsage:     x509.KeyUsageKeyEncipherment | 
x509.KeyUsageDigitalSignature,
+               ExtKeyUsage:  cfg.Usages,
+       }
+       certDERBytes, err := x509.CreateCertificate(rand.Reader, &certTmpl, 
caCert, key.Public(), caKey)
+       if err != nil {
+               return nil, err
+       }
+       return x509.ParseCertificate(certDERBytes)
+}
+
+// encodeCertPEM encodes a certificate.
+func encodeCertPEM(cert *x509.Certificate) []byte {
+       block := pem.Block{
+               Type:  certificateBlockType,
+               Bytes: cert.Raw,
+       }
+       return pem.EncodeToMemory(&block)
+}
+
+// decodePemToRSA decodes pem key to RSA private key.
+func decodePemToRSA(pemKey []byte) (*rsa.PrivateKey, error) {
+       block, _ := pem.Decode(pemKey)
+       if block == nil || block.Type != blockPrivateType {
+               err := errors.New("block is nil or block type is wrong")
+               klog.Errorf("decode pem key error %v", err)
+               return nil, err
+       }
+       pri, err := x509.ParsePKCS1PrivateKey(block.Bytes)
+       if err != nil {
+               klog.Errorf("parse pem key error %v", err)
+               return nil, err
+       }
+       return pri, nil
+}
+
+// decodePemToCert decodes pem cert to x509 certificate.
+func decodePemToCert(pemCert []byte) (*x509.Certificate, error) {
+       block, _ := pem.Decode(pemCert)
+       if block == nil || block.Type != blockCertType {
+               err := errors.New("block is nil or block type is wrong")
+               klog.Errorf("decode pem cert error %v", err)
+               return nil, err
+       }
+       certificate, err := x509.ParseCertificate(block.Bytes)
+       if err != nil {
+               klog.Errorf("parse pem cert error %v", err)
+               return nil, err
+       }
+       return certificate, nil
+}
diff --git a/deploy/kubernetes/operator/pkg/utils/config.go 
b/deploy/kubernetes/operator/pkg/utils/config.go
new file mode 100644
index 00000000..9fbf2832
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/config.go
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+       "context"
+       "flag"
+
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/rest"
+       "k8s.io/client-go/tools/clientcmd"
+       "k8s.io/klog/v2"
+       "sigs.k8s.io/controller-runtime/pkg/manager/signals"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+)
+
+const (
+       flagKubeConfig = "kubeconfig"
+)
+
+// GenericConfig stores the basic configuration of admission webhook server.
+type GenericConfig struct {
+       KubeConfig string
+       RESTConfig *rest.Config
+       KubeClient kubernetes.Interface
+       RSSClient  versioned.Interface
+       RunCtx     context.Context
+}
+
+// AddFlags adds all configurations to the global flags.
+func (c *GenericConfig) AddFlags() {
+       if flag.Lookup(flagKubeConfig) == nil {
+               flag.StringVar(&c.KubeConfig, flagKubeConfig, "",
+                       "Paths to a kubeconfig. Only required if 
out-of-cluster.")
+       }
+}
+
+// Complete is called before the component runs.
+func (c *GenericConfig) Complete() {
+       c.KubeConfig = flag.Lookup(flagKubeConfig).Value.String()
+
+       restConfig, err := clientcmd.BuildConfigFromFlags("", c.KubeConfig)
+       if err != nil {
+               klog.Fatalf("create *rest.Config failed: %v", err)
+       }
+       c.RESTConfig = restConfig
+
+       c.KubeClient, err = kubernetes.NewForConfig(restConfig)
+       if err != nil {
+               klog.Fatalf("create kubeClient failed: %v", err)
+       }
+
+       c.RSSClient, err = versioned.NewForConfig(restConfig)
+       if err != nil {
+               klog.Fatalf("create rssClient failed: %v", err)
+       }
+
+       c.RunCtx = signals.SetupSignalHandler()
+}
diff --git a/deploy/kubernetes/operator/pkg/utils/coordinator.go 
b/deploy/kubernetes/operator/pkg/utils/coordinator.go
new file mode 100644
index 00000000..b4204694
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/coordinator.go
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+       "path/filepath"
+
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/client-go/informers"
+       "k8s.io/client-go/kubernetes"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+)
+
+// GenerateCoordinatorName returns service account or configMap name of 
coordinators.
+func GenerateCoordinatorName(rss *unifflev1alpha1.RemoteShuffleService) string 
{
+       return constants.RSSCoordinator + "-" + rss.Name
+}
+
+// GetExcludeNodesConfigMapKey returns configMap key of excluded nodes.
+func GetExcludeNodesConfigMapKey(rss *unifflev1alpha1.RemoteShuffleService) 
string {
+       return filepath.Base(rss.Spec.Coordinator.ExcludeNodesFilePath)
+}
+
+// GetExcludeNodesMountPath returns excluded nodes file's directory which is 
used as configMap
+// volume mouth path.
+func GetExcludeNodesMountPath(rss *unifflev1alpha1.RemoteShuffleService) 
string {
+       return filepath.Dir(rss.Spec.Coordinator.ExcludeNodesFilePath)
+}
+
+// BuildCoordinatorInformerFactory builds informer factory for objects related 
to coordinators.
+func BuildCoordinatorInformerFactory(kubeClient kubernetes.Interface) 
informers.SharedInformerFactory {
+       option := func(options *metav1.ListOptions) {
+               options.LabelSelector = constants.LabelCoordinator + "=true"
+       }
+       return informers.NewSharedInformerFactoryWithOptions(
+               kubeClient, 0, informers.WithTweakListOptions(option))
+}
diff --git a/deploy/kubernetes/operator/pkg/utils/shufflerserver.go 
b/deploy/kubernetes/operator/pkg/utils/shufflerserver.go
new file mode 100644
index 00000000..4e427181
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/shufflerserver.go
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+       "strings"
+
+       appsv1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/sets"
+       "k8s.io/client-go/informers"
+       "k8s.io/client-go/kubernetes"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+)
+
+// GetShuffleServerNode returns shuffle server node name by pod.
+func GetShuffleServerNode(pod *corev1.Pod) string {
+       return pod.Status.PodIP + ":" + GetShuffleServerPort(pod)
+}
+
+// GetShuffleServerPort returns shuffle server port by pod.
+func GetShuffleServerPort(pod *corev1.Pod) string {
+       return pod.Annotations[constants.AnnotationShuffleServerPort]
+}
+
+// GetMetricsServerPort returns metrics server port by pod.
+func GetMetricsServerPort(pod *corev1.Pod) string {
+       return pod.Annotations[constants.AnnotationMetricsServerPort]
+}
+
+// BuildShuffleServerKey returns shuffler server key used in rss object's 
status.
+func BuildShuffleServerKey(pod *corev1.Pod) string {
+       return GetRevisionFromPod(pod) + "/" + pod.Name + "/" + 
GetShuffleServerNode(pod)
+}
+
+// ParseShuffleServerKey parses shuffler server key used in rss object's 
status.
+func ParseShuffleServerKey(key string) (revision, podName, node string) {
+       values := strings.Split(key, "/")
+       if len(values) == 3 {
+               revision = values[0]
+               podName = values[1]
+               node = values[2]
+       }
+       return
+}
+
+// ConvertShuffleServerKeysToNodes converts shuffle server keys to nodes.
+func ConvertShuffleServerKeysToNodes(keys sets.String) sets.String {
+       values := keys.List()
+       nodes := sets.NewString()
+       for _, v := range values {
+               _, _, node := ParseShuffleServerKey(v)
+               nodes.Insert(node)
+       }
+       return nodes
+}
+
+// GetRevisionFromPod returns revision of the pod belongs to a statefulSet.
+func GetRevisionFromPod(pod *corev1.Pod) string {
+       return pod.Labels[appsv1.ControllerRevisionHashLabelKey]
+}
+
+// GenerateShuffleServerName returns workload or nodePort service name of 
shuffle servers.
+func GenerateShuffleServerName(rss *unifflev1alpha1.RemoteShuffleService) 
string {
+       return constants.RSSShuffleServer + "-" + rss.Name
+}
+
+// GenerateShuffleServerLabels returns labels used by statefulSets or pods of 
shuffle servers.
+func GenerateShuffleServerLabels(rss *unifflev1alpha1.RemoteShuffleService) 
map[string]string {
+       return map[string]string{
+               "app":                        GenerateShuffleServerName(rss),
+               constants.LabelShuffleServer: "true",
+       }
+}
+
+// BuildShuffleServerInformerFactory builds an informer factory for shuffle 
servers.
+func BuildShuffleServerInformerFactory(kubeClient kubernetes.Interface) 
informers.SharedInformerFactory {
+       option := func(options *metav1.ListOptions) {
+               options.LabelSelector = constants.LabelShuffleServer + "=true"
+       }
+       return informers.NewSharedInformerFactoryWithOptions(
+               kubeClient, 0, informers.WithTweakListOptions(option))
+}
diff --git a/deploy/kubernetes/operator/pkg/utils/util.go 
b/deploy/kubernetes/operator/pkg/utils/util.go
new file mode 100644
index 00000000..bd336f48
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/utils/util.go
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "sort"
+
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/sets"
+       "k8s.io/client-go/informers"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/util/retry"
+       "k8s.io/klog/v2"
+       "k8s.io/utils/strings/slices"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+)
+
+// CreatePodInformerFactory creates pod informer factory by label selector.
+func CreatePodInformerFactory(kubeClient kubernetes.Interface,
+       key, value string) informers.SharedInformerFactory {
+       option := func(options *metav1.ListOptions) {
+               if len(value) > 0 {
+                       options.LabelSelector = key + "=" + value
+               } else {
+                       options.LabelSelector = key
+               }
+       }
+       return informers.NewSharedInformerFactoryWithOptions(kubeClient, 0,
+               informers.WithTweakListOptions(option))
+}
+
+// GetCurrentNamespace returns current namespace.
+func GetCurrentNamespace() string {
+       namespace := os.Getenv(constants.PodNamespaceEnv)
+       if namespace == "" {
+               namespace = constants.DefaultNamespace
+       }
+       return namespace
+}
+
+// UpdateSecret updates Secret and retries when conflicts are encountered.
+func UpdateSecret(kubeClient kubernetes.Interface, namespace, secretName 
string,
+       updateFunc func(secret *corev1.Secret)) error {
+       return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+               var err error
+               var secret *corev1.Secret
+               secret, err = kubeClient.CoreV1().Secrets(namespace).
+                       Get(context.Background(), secretName, 
metav1.GetOptions{})
+               if err != nil {
+                       klog.Errorf("get secret %v/%v failed: %v", namespace, 
secretName, err)
+                       return err
+               }
+               updateFunc(secret)
+               _, err = 
kubeClient.CoreV1().Secrets(namespace).Update(context.Background(), secret,
+                       metav1.UpdateOptions{})
+               if err != nil {
+                       klog.Errorf("update configMap %v/%v failed: %v", 
namespace, secretName, err)
+               }
+               return err
+       })
+}
+
+// UniqueName returns unique name of an object.
+func UniqueName(object metav1.Object) string {
+       return fmt.Sprintf("%v/%v/%v", object.GetNamespace(), object.GetName(), 
object.GetUID())
+}
+
+// GetRssNameByPod returns rss object name from a pod.
+func GetRssNameByPod(pod *corev1.Pod) string {
+       return pod.Annotations[constants.AnnotationRssName]
+}
+
+// GetSortedList returns sorted slice.
+func GetSortedList(values sets.String) []string {
+       sorted := slices.Filter(nil, values.List(), func(v string) bool {
+               return len(v) > 0
+       })
+       sort.Slice(sorted, func(i, j int) bool {
+               return sorted[i] < sorted[j]
+       })
+       return sorted
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/config/config.go 
b/deploy/kubernetes/operator/pkg/webhook/config/config.go
new file mode 100644
index 00000000..438680f5
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/config/config.go
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+       "crypto/tls"
+       "flag"
+       "fmt"
+       "io/ioutil"
+
+       "k8s.io/klog/v2"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+       webhookconstants 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+)
+
+const (
+       flagIgnoreLastApps       = "ignore-last-apps"
+       flagIgnoreRSS            = "ignore-rss"
+       flagPort                 = "port"
+       flagExternalService      = "external-service"
+       flagServerCertFile       = "server-cert-file"
+       flagServerPrivateKeyFile = "server-private-key-file"
+       flagCACertFile           = "ca-cert-file"
+)
+
+// Config contains all configurations.
+type Config struct {
+       IgnoreLastApps bool
+       IgnoreRSS      bool
+       HTTPConfig
+       utils.GenericConfig
+}
+
+// HTTPConfig stores all the http-related configurations.
+type HTTPConfig struct {
+       Port            int
+       ExternalService string
+       ServerCertFile  string
+       ServerKeyFile   string
+       CACertFile      string
+}
+
+// AddFlags stores http-related configurations.
+func (c *HTTPConfig) AddFlags() {
+       flag.IntVar(&c.Port, flagPort, 9876, "Listening port of admission 
webhook server.")
+       flag.StringVar(&c.ExternalService, flagExternalService, 
webhookconstants.ComponentName,
+               "Service name which provides external access to admission 
webhook server.")
+       flag.StringVar(&c.ServerCertFile, flagServerCertFile, "",
+               "File containing the default x509 Certificate for HTTPS. (CA 
cert, if any, concatenated after server cert). "+
+                       "If HTTPS serving is enabled, and --server-cert-file 
and --server-private-key-file are not provided, "+
+                       "a self-signed certificate and key will be generated.")
+       flag.StringVar(&c.ServerKeyFile, flagServerPrivateKeyFile, "",
+               "File containing the default x509 private key matching 
--server-cert-file.")
+       flag.StringVar(&c.CACertFile, flagCACertFile, "", "File containing the 
ca certificate.")
+}
+
+// NeedLoadCertsFromSecret returns whether we need to load certs from specify 
secret.
+func (c *HTTPConfig) NeedLoadCertsFromSecret() bool {
+       return len(c.CACertFile) == 0 || len(c.ServerCertFile) == 0 || 
len(c.ServerKeyFile) == 0
+}
+
+// Addr returns the webhook server's listening address.
+func (c *HTTPConfig) Addr() string {
+       return fmt.Sprintf(":%v", c.Port)
+}
+
+// TLSConfig returns the TLS config.
+func (c *HTTPConfig) TLSConfig() *tls.Config {
+       cert, err := tls.LoadX509KeyPair(c.ServerCertFile, c.ServerKeyFile)
+       if err != nil {
+               klog.Fatal(err)
+       }
+       tlsConfig := &tls.Config{
+               Certificates: []tls.Certificate{cert},
+       }
+       return tlsConfig
+}
+
+// GetCaCert return contents of ca cert.
+func (c *HTTPConfig) GetCaCert() []byte {
+       caCertBody, err := ioutil.ReadFile(c.CACertFile)
+       if err != nil {
+               klog.Fatalf("read ca cert file %v failed: %v", c.CACertFile, 
err)
+       }
+       return caCertBody
+}
+
+// LeaderElectionID returns leader election ID.
+func (c *Config) LeaderElectionID() string {
+       return "rss-webhook-" + constants.LeaderIDSuffix
+}
+
+// AddFlags adds all configurations to the global flags.
+func (c *Config) AddFlags() {
+       flag.BoolVar(&c.IgnoreLastApps, flagIgnoreLastApps, false,
+               "Used when debugging, it means we will ignore checking last 
apps.")
+       flag.BoolVar(&c.IgnoreRSS, flagIgnoreRSS, false,
+               "Used when debugging, it means we will ignore checking rss 
objects.")
+       c.HTTPConfig.AddFlags()
+       c.GenericConfig.AddFlags()
+}
+
+// Complete is called before rss-webhook runs.
+func (c *Config) Complete() {
+       c.GenericConfig.Complete()
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/constants/constants.go 
b/deploy/kubernetes/operator/pkg/webhook/constants/constants.go
new file mode 100644
index 00000000..4c94865c
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/constants/constants.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constants
+
+const (
+       validatingPrefix = "/validate"
+       mutatingPrefix   = "/mutate"
+       rssPath          = "/rss"
+       podPath          = "/pod"
+
+       // ValidatingPodPath represents pod objects' validating handler's path
+       ValidatingPodPath = validatingPrefix + podPath
+       // ValidatingRssPath represents rss objects' validating handler's path
+       ValidatingRssPath = validatingPrefix + rssPath
+       // MutatingRssPath represents rss objects' mutating handler's path
+       MutatingRssPath = mutatingPrefix + rssPath
+
+       // ComponentName represents name of component.
+       ComponentName = "rss-webhook"
+)
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/inspector.go 
b/deploy/kubernetes/operator/pkg/webhook/inspector/inspector.go
new file mode 100644
index 00000000..175c4df8
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/inspector.go
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inspector
+
+import (
+       "context"
+       "crypto/tls"
+       "fmt"
+       "net/http"
+
+       "k8s.io/client-go/informers"
+       "k8s.io/client-go/kubernetes"
+       corelisters "k8s.io/client-go/listers/core/v1"
+       "k8s.io/client-go/tools/cache"
+       "k8s.io/klog/v2"
+       "sigs.k8s.io/controller-runtime/pkg/manager"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/informers/externalversions"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/listers/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/config"
+       webhookconstants 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/util"
+)
+
+var _ Inspector = &inspector{}
+
+// Inspector intercepts the request and checks whether the pod can be deleted.
+type Inspector interface {
+       manager.Runnable
+}
+
+// NewInspector creates an Inspector.
+func NewInspector(cfg *config.Config, tlsConfig *tls.Config) Inspector {
+       return newInspector(cfg, tlsConfig)
+}
+
+// newInspector creates an inspector.
+func newInspector(cfg *config.Config, tlsConfig *tls.Config) *inspector {
+       rssInformerFactory := 
externalversions.NewSharedInformerFactory(cfg.RSSClient, 0)
+       cmInformerFactory := 
utils.BuildCoordinatorInformerFactory(cfg.KubeClient)
+       i := &inspector{
+               ignoreLastApps:     cfg.IgnoreLastApps,
+               ignoreRSS:          cfg.IgnoreRSS,
+               tlsConfig:          tlsConfig,
+               kubeClient:         cfg.KubeClient,
+               rssClient:          cfg.RSSClient,
+               rssInformerFactory: rssInformerFactory,
+               rssInformer:        
rssInformerFactory.Uniffle().V1alpha1().RemoteShuffleServices().Informer(),
+               rssLister:          
rssInformerFactory.Uniffle().V1alpha1().RemoteShuffleServices().Lister(),
+               cmInformerFactory:  cmInformerFactory,
+               cmInformer:         
cmInformerFactory.Core().V1().ConfigMaps().Informer(),
+               cmLister:           
cmInformerFactory.Core().V1().ConfigMaps().Lister(),
+       }
+
+       // register handler functions for admission webhook server.
+       mux := http.NewServeMux()
+       mux.HandleFunc(webhookconstants.ValidatingPodPath,
+               
util.WithAdmissionReviewHandler(i.validateDeletingShuffleServer))
+       mux.HandleFunc(webhookconstants.ValidatingRssPath,
+               util.WithAdmissionReviewHandler(i.validateRSS))
+       mux.HandleFunc(webhookconstants.MutatingRssPath,
+               util.WithAdmissionReviewHandler(i.mutateRSS))
+       i.server = &http.Server{
+               Addr:      cfg.Addr(),
+               Handler:   mux,
+               TLSConfig: tlsConfig,
+       }
+
+       return i
+}
+
+// inspector implements the Inspector interface.
+type inspector struct {
+       ignoreLastApps     bool
+       ignoreRSS          bool
+       tlsConfig          *tls.Config
+       server             *http.Server
+       kubeClient         kubernetes.Interface
+       rssClient          versioned.Interface
+       rssInformerFactory externalversions.SharedInformerFactory
+       rssInformer        cache.SharedIndexInformer
+       rssLister          v1alpha1.RemoteShuffleServiceLister
+       cmInformerFactory  informers.SharedInformerFactory
+       cmInformer         cache.SharedIndexInformer
+       cmLister           corelisters.ConfigMapLister
+}
+
+// Start starts the Inspector.
+func (i *inspector) Start(ctx context.Context) error {
+       i.rssInformerFactory.Start(ctx.Done())
+       i.cmInformerFactory.Start(ctx.Done())
+       if !cache.WaitForCacheSync(ctx.Done(), i.rssInformer.HasSynced, 
i.cmInformer.HasSynced) {
+               return fmt.Errorf("wait for cache synced failed")
+       }
+       klog.V(2).Info("inspector started")
+       // set up the http server for listening pods and rss objects' 
validating and mutating requests.
+       if err := i.server.ListenAndServeTLS("", ""); err != nil {
+               return fmt.Errorf("listen error: %v", err)
+       }
+       return nil
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/pod.go 
b/deploy/kubernetes/operator/pkg/webhook/inspector/pod.go
new file mode 100644
index 00000000..14b7c1ef
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/pod.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inspector
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "strings"
+
+       admissionv1 "k8s.io/api/admission/v1"
+       corev1 "k8s.io/api/core/v1"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/sets"
+       "k8s.io/klog/v2"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/util"
+)
+
+// validateDeletingShuffleServer validates the delete operation towards 
shuffle server pods,
+// and update exclude nodes in configMap.
+func (i *inspector) validateDeletingShuffleServer(ar 
*admissionv1.AdmissionReview) *admissionv1.AdmissionReview {
+       pod := &corev1.Pod{}
+       if err := json.Unmarshal(ar.Request.OldObject.Raw, pod); err != nil {
+               klog.Errorf("unmarshal object of AdmissionReview (%v) failed: 
%v",
+                       string(ar.Request.Object.Raw), err)
+               return util.AdmissionReviewFailed(ar, err)
+       }
+       rssName := utils.GetRssNameByPod(pod)
+       // allow pods which are not shuffle servers or have been set deletion 
timestamp.
+       if rssName == "" || !util.NeedInspectPod(pod) {
+               klog.V(4).Infof("ignored non shuffle server or deleting pod: 
%v->(%+v/%+v/%v)",
+                       utils.UniqueName(pod), pod.Labels, pod.Annotations, 
pod.DeletionTimestamp)
+               return util.AdmissionReviewAllow(ar)
+       }
+       klog.V(4).Infof("check shuffle server pod: %v", 
utils.BuildShuffleServerKey(pod))
+       rss, err := 
i.rssLister.RemoteShuffleServices(pod.Namespace).Get(rssName)
+       if err != nil {
+               if apierrors.IsNotFound(err) {
+                       return util.AdmissionReviewAllow(ar)
+               }
+               return util.AdmissionReviewFailed(ar, err)
+       }
+       // we can only delete shuffle server pods when rss is in upgrading 
phase.
+       if rss.Status.Phase != unifflev1alpha1.RSSUpgrading && rss.Status.Phase 
!= unifflev1alpha1.RSSTerminating {
+               message := fmt.Sprintf("can not delete the shuffle server pod 
(%v) directly",
+                       utils.UniqueName(pod))
+               klog.V(4).Info(message)
+               return util.AdmissionReviewForbidden(ar, message)
+       }
+
+       // when the rss uses specific upgrade mode, we need to check whether 
the pod is specific.
+       if rss.Spec.ShuffleServer.UpgradeStrategy.Type == 
unifflev1alpha1.SpecificUpgrade {
+               specificNames := 
rss.Spec.ShuffleServer.UpgradeStrategy.SpecificNames
+               isSpecific := false
+               for _, name := range specificNames {
+                       if name == pod.Name {
+                               isSpecific = true
+                               break
+                       }
+               }
+               if !isSpecific {
+                       message := fmt.Sprintf("can not delete the shuffle 
server pod (%v) which is not specific",
+                               utils.UniqueName(pod))
+                       klog.V(4).Info(message)
+                       return util.AdmissionReviewForbidden(ar, message)
+               }
+       }
+
+       // update targetKeys field in status of rss and exclude nodes in 
configMap used by coordinators.
+       if err = i.updateTargetKeysAndExcludeNodes(rss, pod); err != nil {
+               return util.AdmissionReviewFailed(ar, err)
+       }
+
+       // check whether the shuffle server pod can be deleted.
+       if i.ignoreLastApps || util.HasZeroApps(pod) {
+               klog.V(3).Infof("shuffle server pod (%v) will be deleted", 
utils.BuildShuffleServerKey(pod))
+               return util.AdmissionReviewAllow(ar)
+       }
+       message := "there are some apps still running in shuffle server: " + 
utils.GetShuffleServerNode(pod)
+       return util.AdmissionReviewForbidden(ar, message)
+}
+
+// updateTargetKeysAndExcludeNodes updates targetKeys field in status of rss 
and exclude nodes in
+// configMap used by coordinators.
+func (i *inspector) updateTargetKeysAndExcludeNodes(rss 
*unifflev1alpha1.RemoteShuffleService,
+       pod *corev1.Pod) error {
+       targetKeys := sets.NewString(rss.Status.TargetKeys...)
+       deletedKeys := sets.NewString(rss.Status.DeletedKeys...)
+       currentKey := utils.BuildShuffleServerKey(pod)
+       if deletedKeys.Has(currentKey) {
+               klog.V(4).Infof("pod (%v) has been deleted", currentKey)
+               return nil
+       }
+
+       namespace := rss.Namespace
+       targetKeys.Insert(currentKey)
+       rssCopy := rss.DeepCopy()
+       rssCopy.Status.TargetKeys = utils.GetSortedList(targetKeys)
+       if _, err := 
i.rssClient.UniffleV1alpha1().RemoteShuffleServices(namespace).
+               UpdateStatus(context.Background(), rssCopy, 
metav1.UpdateOptions{}); err != nil {
+               klog.Errorf("update target keys in status of rss (%v) failed: 
%v",
+                       utils.UniqueName(rss), err)
+               return err
+       }
+
+       cmName := utils.GenerateCoordinatorName(rss)
+       cm, err := i.cmLister.ConfigMaps(namespace).Get(cmName)
+       if err != nil {
+               klog.Errorf("get configMap (%v/%v) of excluded nodes for rss 
(%v) failed: %v",
+                       namespace, cmName, utils.UniqueName(rss), err)
+               return err
+       }
+
+       excludeNodesFileKey := utils.GetExcludeNodesConfigMapKey(rss)
+       oldNodes := sets.NewString(strings.Split(cm.Data[excludeNodesFileKey], 
"\n")...)
+       newNodes := 
oldNodes.Difference(utils.ConvertShuffleServerKeysToNodes(deletedKeys))
+       newNodes.Insert(utils.GetShuffleServerNode(pod))
+       cmCopy := cm.DeepCopy()
+       cmCopy.Data[excludeNodesFileKey] = 
strings.Join(utils.GetSortedList(newNodes), "\n")
+       if _, err = i.kubeClient.CoreV1().ConfigMaps(namespace).
+               Update(context.Background(), cmCopy, metav1.UpdateOptions{}); 
err != nil {
+               klog.Errorf("updated exclude nodes in configMap (%v) for rss 
(%v) failed: %v",
+                       utils.UniqueName(cmCopy), utils.UniqueName(rss), err)
+               return err
+       }
+       return nil
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go 
b/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
new file mode 100644
index 00000000..31bdff1d
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inspector
+
+import (
+       "encoding/json"
+       "fmt"
+
+       "gomodules.xyz/jsonpatch/v2"
+       admissionv1 "k8s.io/api/admission/v1"
+       "k8s.io/klog/v2"
+       "k8s.io/utils/pointer"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/util"
+)
+
+// validateRSS validates the create and update operation towards rss objects.
+func (i *inspector) validateRSS(ar *admissionv1.AdmissionReview) 
*admissionv1.AdmissionReview {
+       if !i.ignoreRSS && ar.Request.Operation == admissionv1.Update {
+               oldRSS := &unifflev1alpha1.RemoteShuffleService{}
+               if err := json.Unmarshal(ar.Request.OldObject.Raw, oldRSS); err 
!= nil {
+                       klog.Errorf("unmarshal old object of rss (%v) failed: 
%v",
+                               string(ar.Request.OldObject.Raw), err)
+                       return util.AdmissionReviewFailed(ar, err)
+               }
+               // for security purposes, we forbid updating rss objects when 
they are in upgrading phase.
+               if oldRSS.Status.Phase == unifflev1alpha1.RSSUpgrading {
+                       message := "can not update upgrading rss object: " + 
utils.UniqueName(oldRSS)
+                       return util.AdmissionReviewForbidden(ar, message)
+               }
+       }
+       newRSS := &unifflev1alpha1.RemoteShuffleService{}
+       if err := json.Unmarshal(ar.Request.Object.Raw, newRSS); err != nil {
+               klog.Errorf("unmarshal object of rss (%v) failed: %v",
+                       string(ar.Request.Object.Raw), err)
+               return util.AdmissionReviewFailed(ar, err)
+       }
+       // validate configurations for coordinators.
+       coordinator := newRSS.Spec.Coordinator
+       if len(coordinator.RPCNodePort) != int(*coordinator.Count) ||
+               len(coordinator.HTTPNodePort) != int(*coordinator.Count) {
+               return util.AdmissionReviewFailed(ar,
+                       fmt.Errorf("invalid number of http or rpc node ports 
(%v/%v) <> (%v)",
+                               len(coordinator.RPCNodePort), 
len(coordinator.HTTPNodePort), coordinator.Count))
+       }
+       if len(coordinator.ExcludeNodesFilePath) == 0 {
+               return util.AdmissionReviewFailed(ar,
+                       fmt.Errorf("empty exclude nodes file path for 
coordinators"))
+       }
+       // validate configurations of logHostPath for coordinators.
+       coordinatorLogPath := newRSS.Spec.Coordinator.LogHostPath
+       if len(coordinatorLogPath) > 0 && 
len(newRSS.Spec.Coordinator.HostPathMounts[coordinatorLogPath]) == 0 {
+               return util.AdmissionReviewFailed(ar, fmt.Errorf("empty log 
volume mount path for coordinators"))
+       }
+       // validate configurations of logHostPath for shuffle servers.
+       shuffleServerLogPath := newRSS.Spec.ShuffleServer.LogHostPath
+       if len(shuffleServerLogPath) > 0 && 
len(newRSS.Spec.ShuffleServer.HostPathMounts[shuffleServerLogPath]) == 0 {
+               return util.AdmissionReviewFailed(ar, fmt.Errorf("empty log 
volume mount path for shuffle servers"))
+       }
+       // validate configurations of different upgrade modes for shuffle 
servers.
+       upgradeStrategy := newRSS.Spec.ShuffleServer.UpgradeStrategy
+       switch upgradeStrategy.Type {
+       case unifflev1alpha1.FullUpgrade:
+       case unifflev1alpha1.PartitionUpgrade:
+               var err error
+               if upgradeStrategy.Partition == nil {
+                       err = fmt.Errorf("empty partition for %v", 
upgradeStrategy.Type)
+               } else if *upgradeStrategy.Partition <= 0 {
+                       err = fmt.Errorf("invalid partition (%v) for %v", 
*upgradeStrategy.Partition,
+                               upgradeStrategy.Type)
+               }
+               if err != nil {
+                       return util.AdmissionReviewFailed(ar, err)
+               }
+       case unifflev1alpha1.SpecificUpgrade:
+               if len(upgradeStrategy.SpecificNames) == 0 {
+                       return util.AdmissionReviewFailed(ar,
+                               fmt.Errorf("empty specific copies for %v", 
upgradeStrategy.Type))
+               }
+       case unifflev1alpha1.FullRestart:
+       default:
+               return util.AdmissionReviewFailed(ar,
+                       fmt.Errorf("invalid upgrade stragety type (%v)", 
upgradeStrategy.Type))
+       }
+       return util.AdmissionReviewAllow(ar)
+}
+
+// mutateNmg mutates the rss object according to our needs.
+func (i *inspector) mutateRSS(ar *admissionv1.AdmissionReview) 
*admissionv1.AdmissionReview {
+       rss := &unifflev1alpha1.RemoteShuffleService{}
+       if err := json.Unmarshal(ar.Request.Object.Raw, rss); err != nil {
+               klog.Errorf("unmarshal object of rss (%v) failed: %v",
+                       string(ar.Request.Object.Raw), err)
+               return util.AdmissionReviewFailed(ar, err)
+       }
+       patches, err := generateRSSPatches(ar, rss)
+       if err != nil {
+               klog.Errorf("generate patches for rss (%v) failed: %v", 
utils.UniqueName(rss), err)
+               return util.AdmissionReviewFailed(ar, err)
+       }
+       // if payload is not empty, we need set patch operations in response.
+       if len(patches) > 0 {
+               return util.AdmissionReviewWithPatches(ar, patches)
+       }
+       return util.AdmissionReviewAllow(ar)
+}
+
+// generateRSSPatches generates patch payloads for mutating rss objects.
+func generateRSSPatches(ar *admissionv1.AdmissionReview,
+       rss *unifflev1alpha1.RemoteShuffleService) ([]byte, error) {
+       // TODO: add default values for RSS objects.
+       if ar.Request.Operation == admissionv1.Create {
+               rss.SetFinalizers([]string{constants.RSSFinalizerName})
+               rss.Spec.ShuffleServer.Sync = pointer.Bool(false)
+       }
+
+       original := ar.Request.Object.Raw
+       current, err := json.Marshal(rss)
+       if err != nil {
+               klog.Errorf("marshal rss (%+v) failed: %v", rss, err)
+               return nil, err
+       }
+       var patches []jsonpatch.Operation
+       // build patch payload form mutating rss objects.
+       patches, err = jsonpatch.CreatePatch(original, current)
+       if err != nil {
+               klog.Errorf("create patches for rss (%v) failed: %v", 
string(current), err)
+               return nil, err
+       }
+       var patchBody []byte
+       patchBody, err = json.Marshal(patches)
+       if err != nil {
+               klog.Errorf("marshal patches (%+v) for rss (%v) failed: %v",
+                       patches, string(current), err)
+               return nil, err
+       }
+       klog.V(4).Infof("patch body (%v) for rss (%v)", string(patchBody), 
utils.UniqueName(rss))
+       return patchBody, nil
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/manager.go 
b/deploy/kubernetes/operator/pkg/webhook/manager.go
new file mode 100644
index 00000000..1bcacd41
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/manager.go
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package webhook
+
+import (
+       "context"
+       "crypto/tls"
+       "crypto/x509"
+       "fmt"
+       "net"
+
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/klog/v2"
+       ctrl "sigs.k8s.io/controller-runtime"
+       "sigs.k8s.io/controller-runtime/pkg/manager"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/config"
+       webhookconstants 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/inspector"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/syncer"
+)
+
+const (
+       certsSecretName = "rss-webhook-certs"
+
+       serverCert = "server.crt"
+       serverKey  = "server.key"
+       caCert     = "ca.crt"
+)
+
+// AdmissionManager manages admission webhook server for shuffle servers.
+type AdmissionManager interface {
+       manager.Runnable
+}
+
+// NewAdmissionManager creates an AdmissionManager.
+func NewAdmissionManager(cfg *config.Config) AdmissionManager {
+       return newAdmissionManager(cfg)
+}
+
+// newAdmissionManager creates an admissionManager.
+func newAdmissionManager(cfg *config.Config) *admissionManager {
+       am := &admissionManager{
+               externalService: cfg.ExternalService,
+               kubeClient:      cfg.KubeClient,
+       }
+       if !cfg.NeedLoadCertsFromSecret() {
+               am.caCertBody = cfg.GetCaCert()
+               am.tlsConfig = cfg.TLSConfig()
+       } else {
+               am.loadCertsFromSecret()
+       }
+       mgr, err := ctrl.NewManager(cfg.RESTConfig, ctrl.Options{
+               LeaderElection:          true,
+               LeaderElectionID:        cfg.LeaderElectionID(),
+               LeaderElectionNamespace: utils.GetCurrentNamespace(),
+       })
+       if err != nil {
+               klog.Fatalf("build manager for admission manager failed: %v", 
err)
+       }
+       am.mgr = mgr
+       am.syncer = syncer.NewConfigSyncer(am.caCertBody, cfg.ExternalService, 
cfg.KubeClient)
+       am.inspector = inspector.NewInspector(cfg, am.tlsConfig)
+       return am
+}
+
+// admissionManager implements the AdmissionManager interface.
+type admissionManager struct {
+       externalService string
+       caCertBody      []byte
+       tlsConfig       *tls.Config
+
+       kubeClient kubernetes.Interface
+
+       mgr       manager.Manager
+       syncer    syncer.ConfigSyncer
+       inspector inspector.Inspector
+}
+
+// Start starts the AdmissionManager.
+func (am *admissionManager) Start(ctx context.Context) error {
+       stopCh := ctx.Done()
+       if err := am.mgr.Add(am.syncer); err != nil {
+               klog.Errorf("add syncer to mgr of admission manager failed: 
%v", err)
+               return err
+       }
+       go func() {
+               if err := am.mgr.Start(ctx); err != nil {
+                       klog.Fatalf("mgr of admission manager started failed: 
%v", err)
+               }
+       }()
+       go func() {
+               if err := am.inspector.Start(ctx); err != nil {
+                       klog.Fatalf("start webhook server failed: %v", err)
+               }
+       }()
+       klog.V(2).Info("admission manager started")
+       <-stopCh
+       return nil
+}
+
+// generateCerts generates certificate and privateKey and ca certificate for 
admission webhook
+// server, and they will be saved in a secret.
+func (am *admissionManager) generateCerts(create bool) (
+       serverCertificate, serverPrivateKey, caCertificate []byte,
+       err error) {
+       var caPrivateKey []byte
+       caPrivateKey, err = utils.SetUpCaKey()
+       if err != nil {
+               klog.Errorf("set up ca key failed %v", err)
+               return nil, nil, nil, err
+       }
+       caCertificate, err = utils.SetUpCaCert(webhookconstants.ComponentName, 
caPrivateKey)
+       if err != nil {
+               klog.Errorf("set up ca cert failed %v", err)
+               return nil, nil, nil, err
+       }
+       namespace := utils.GetCurrentNamespace()
+       domains, ips := subjectAltNames(namespace, am.externalService)
+       serverCertificate, serverPrivateKey, err = 
utils.SetUpSignedCertAndKey(domains, ips,
+               webhookconstants.ComponentName,
+               caPrivateKey, caCertificate, 
[]x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth})
+       if err != nil {
+               klog.Errorf("set up server cert error %v", err)
+               return nil, nil, nil, err
+       }
+       if create {
+               // try to create a new secret to save certificate and 
privateKey and ca certificate.
+               _, err = 
am.kubeClient.CoreV1().Secrets(namespace).Create(context.Background(),
+                       &corev1.Secret{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      certsSecretName,
+                                       Namespace: namespace,
+                               },
+                               Data: map[string][]byte{
+                                       serverCert: serverCertificate,
+                                       serverKey:  serverPrivateKey,
+                                       caCert:     caCertificate,
+                               },
+                       }, metav1.CreateOptions{})
+               if err != nil {
+                       klog.Errorf("create new certificate secret error %v", 
err)
+                       return nil, nil, nil, err
+               }
+       } else {
+               // try to update an old secret to save certificate and 
privateKey and ca certificate.
+               if err = utils.UpdateSecret(am.kubeClient, namespace, 
certsSecretName,
+                       func(secret *corev1.Secret) {
+                               secret.Data = map[string][]byte{
+                                       serverCert: serverCertificate,
+                                       serverKey:  serverPrivateKey,
+                                       caCert:     caCertificate,
+                               }
+                       }); err != nil {
+                       return nil, nil, nil, err
+               }
+       }
+       return caCertificate, serverCertificate, serverPrivateKey, nil
+}
+
+// loadCertsFromSecret loads certificate and privateKey and ca certificate 
from the secret.
+func (am *admissionManager) loadCertsFromSecret() {
+       namespace := utils.GetCurrentNamespace()
+       create := false
+       secret, err := 
am.kubeClient.CoreV1().Secrets(namespace).Get(context.Background(),
+               certsSecretName, metav1.GetOptions{})
+       if err != nil {
+               if !errors.IsNotFound(err) {
+                       klog.Fatalf("get secret of %v/%v failed: %v", 
namespace, certsSecretName, err)
+               }
+               create = true
+       }
+       var serverCertBody, serverKeyBody, caCertBody []byte
+       if secret == nil || secret.Data == nil || len(secret.Data[serverCert]) 
== 0 || len(secret.Data[serverKey]) == 0 ||
+               len(secret.Data[caCert]) == 0 {
+               caCertBody, serverCertBody, serverKeyBody, err = 
am.generateCerts(create)
+               if err != nil {
+                       klog.Fatalf("generate certs failed: %v", err)
+               }
+       } else {
+               caCertBody = secret.Data[caCert]
+               serverCertBody = secret.Data[serverCert]
+               serverKeyBody = secret.Data[serverKey]
+       }
+       am.caCertBody = caCertBody
+       cert, err := tls.X509KeyPair(serverCertBody, serverKeyBody)
+       if err != nil {
+               klog.Fatalf("generate key pair error :%v", err)
+       }
+       am.tlsConfig = &tls.Config{
+               Certificates: []tls.Certificate{cert},
+       }
+}
+
+// subjectAltNames builds subject alt names by namespace and service name.
+func subjectAltNames(namespace, svcName string) ([]string, []net.IP) {
+       return []string{
+               "localhost",
+               svcName,
+               fmt.Sprintf("%v.%v.svc", svcName, namespace),
+               fmt.Sprintf("%v.%v.svc.cluster.local", svcName, namespace),
+       }, []net.IP{net.ParseIP("127.0.0.1")}
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/manager_test.go 
b/deploy/kubernetes/operator/pkg/webhook/manager_test.go
new file mode 100644
index 00000000..55d75230
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/manager_test.go
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package webhook
+
+import (
+       "context"
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       admissionv1 "k8s.io/api/admissionregistration/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/wait"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/kubernetes/scheme"
+       "sigs.k8s.io/controller-runtime/pkg/envtest"
+       logf "sigs.k8s.io/controller-runtime/pkg/log"
+       "sigs.k8s.io/controller-runtime/pkg/log/zap"
+       "sigs.k8s.io/controller-runtime/pkg/manager/signals"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/config"
+       webhookconstants 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+)
+
+var (
+       testEnv    *envtest.Environment
+       kubeClient kubernetes.Interface
+       rssClient  versioned.Interface
+)
+
+func TestAdmissionManager(t *testing.T) {
+       _ = os.Setenv(constants.PodNamespaceEnv, constants.DefaultNamespace)
+       RegisterFailHandler(Fail)
+       suiteCfg, reporterCfg := GinkgoConfiguration()
+       reporterCfg.VeryVerbose = true
+       reporterCfg.FullTrace = true
+       RunSpecs(t, "admission manager suite", suiteCfg, reporterCfg)
+}
+
+var _ = BeforeSuite(
+       func() {
+               logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), 
zap.UseDevMode(true)))
+               By("bootstrapping test environment")
+               testEnv = &envtest.Environment{
+                       CRDDirectoryPaths: []string{filepath.Join("../..", 
"config", "crd", "bases")},
+               }
+               restConfig, err := testEnv.Start()
+               Expect(err).To(BeNil())
+               Expect(restConfig).ToNot(BeNil())
+
+               kubeClient, err = kubernetes.NewForConfig(restConfig)
+               Expect(err).ToNot(HaveOccurred())
+               Expect(kubeClient).ToNot(BeNil())
+
+               rssClient, err = versioned.NewForConfig(restConfig)
+               Expect(err).ToNot(HaveOccurred())
+               Expect(rssClient).ToNot(BeNil())
+
+               err = unifflev1alpha1.AddToScheme(scheme.Scheme)
+               Expect(err).NotTo(HaveOccurred())
+
+               // +kubebuilder:scaffold:scheme
+
+               cfg := &config.Config{
+                       HTTPConfig: config.HTTPConfig{
+                               Port:            9876,
+                               ExternalService: webhookconstants.ComponentName,
+                       },
+                       GenericConfig: utils.GenericConfig{
+                               RESTConfig: restConfig,
+                               KubeClient: kubeClient,
+                               RSSClient:  rssClient,
+                       },
+               }
+               am := newAdmissionManager(cfg)
+               stopCtx := signals.SetupSignalHandler()
+               go func() {
+                       err = am.Start(stopCtx)
+                       Expect(err).ToNot(HaveOccurred())
+               }()
+       },
+)
+
+var _ = AfterSuite(func() {
+       Expect(testEnv.Stop()).To(Succeed())
+})
+
+var _ = Describe("AdmissionManager", func() {
+       Context("Setup syncer", func() {
+               It("Generate validation and webhook configurations", func() {
+                       By("Wait validation configurations synced")
+                       var vwc *admissionv1.ValidatingWebhookConfiguration
+                       err := wait.Poll(time.Second, time.Second*5, func() 
(bool, error) {
+                               var getErr error
+                               vwc, getErr = 
kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().
+                                       Get(context.TODO(), 
webhookconstants.ComponentName, metav1.GetOptions{})
+                               if getErr != nil {
+                                       return false, getErr
+                               }
+                               return true, nil
+                       })
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(vwc).ToNot(BeNil())
+
+                       By("Wait mutating configurations synced")
+                       var mwc *admissionv1.MutatingWebhookConfiguration
+                       err = wait.Poll(time.Second, time.Second*5, func() 
(bool, error) {
+                               var getErr error
+                               mwc, getErr = 
kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().
+                                       Get(context.TODO(), 
webhookconstants.ComponentName, metav1.GetOptions{})
+                               if getErr != nil {
+                                       return false, getErr
+                               }
+                               return true, nil
+                       })
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(mwc).ToNot(BeNil())
+               })
+       })
+})
diff --git a/deploy/kubernetes/operator/pkg/webhook/syncer/syncer.go 
b/deploy/kubernetes/operator/pkg/webhook/syncer/syncer.go
new file mode 100644
index 00000000..86380257
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/syncer/syncer.go
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package syncer
+
+import (
+       "context"
+       "reflect"
+       "time"
+
+       "golang.org/x/sync/errgroup"
+       admissionv1 "k8s.io/api/admissionregistration/v1"
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/klog/v2"
+       "k8s.io/utils/pointer"
+       "sigs.k8s.io/controller-runtime/pkg/manager"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+       webhookconstants 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/webhook/constants"
+)
+
+var _ ConfigSyncer = &configSyncer{}
+
+// ConfigSyncer syncs ValidatingWebhookConfigurations and 
MutatingWebhookConfigurations.
+type ConfigSyncer interface {
+       manager.Runnable
+}
+
+// NewConfigSyncer creates a ConfigSyncer.
+func NewConfigSyncer(caCert []byte, externalService string,
+       kubeClient kubernetes.Interface) ConfigSyncer {
+       return newConfigSyncer(caCert, externalService, kubeClient)
+}
+
+// newConfigSyncer creates a configSyncer.
+func newConfigSyncer(caCert []byte, externalService string,
+       kubeClient kubernetes.Interface) *configSyncer {
+       return &configSyncer{
+               caCert:          caCert,
+               externalService: externalService,
+               kubeClient:      kubeClient,
+       }
+}
+
+// configSyncer implements the ConfigSyncer interface.
+type configSyncer struct {
+       caCert          []byte
+       externalService string
+       kubeClient      kubernetes.Interface
+}
+
+// Start starts the ConfigSyncer.
+func (cs *configSyncer) Start(ctx context.Context) error {
+       klog.V(2).Info("config syncer started")
+       for {
+               select {
+               case <-ctx.Done():
+                       klog.V(3).Info("stop syncing webhook configurations")
+                       return nil
+               default:
+               }
+               if err := cs.syncWebhookCfg(); err != nil {
+                       klog.Errorf("sync webhook configuration failed: %v", 
err)
+               }
+               time.Sleep(time.Minute)
+       }
+}
+
+// syncWebhookCfg synchronizes the validatingWebhookConfiguration and 
mutatingWebhookConfiguration objects.
+func (cs *configSyncer) syncWebhookCfg() error {
+       currentVWC, currentMWC := cs.generateWebhookCfg()
+       eg := errgroup.Group{}
+       eg.Go(func() error {
+               return cs.syncValidatingWebhookCfg(currentVWC)
+       })
+       eg.Go(func() error {
+               return cs.syncMutatingWebhookCfg(currentMWC)
+       })
+       return eg.Wait()
+}
+
+// syncValidatingWebhookCfg synchronizes the validatingWebhookConfiguration 
object.
+func (cs *configSyncer) syncValidatingWebhookCfg(
+       currentVWC *admissionv1.ValidatingWebhookConfiguration) error {
+       vwc, err := 
cs.kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().
+               Get(context.Background(), webhookconstants.ComponentName, 
metav1.GetOptions{})
+       if err != nil {
+               if errors.IsNotFound(err) {
+                       _, err = 
cs.kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().
+                               Create(context.Background(), currentVWC, 
metav1.CreateOptions{})
+               }
+               return err
+       }
+       if reflect.DeepEqual(vwc.Webhooks, currentVWC.Webhooks) {
+               return nil
+       }
+       vwc.Webhooks = currentVWC.Webhooks
+       _, err = 
cs.kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().
+               Update(context.Background(), vwc, metav1.UpdateOptions{})
+       return err
+}
+
+// syncMutatingWebhookCfg synchronizes the mutatingWebhookConfiguration object.
+func (cs *configSyncer) syncMutatingWebhookCfg(
+       currentMWC *admissionv1.MutatingWebhookConfiguration) error {
+       vwc, err := 
cs.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().
+               Get(context.Background(), webhookconstants.ComponentName, 
metav1.GetOptions{})
+       if err != nil {
+               if errors.IsNotFound(err) {
+                       _, err = 
cs.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().
+                               Create(context.Background(), currentMWC, 
metav1.CreateOptions{})
+               }
+               return err
+       }
+       if reflect.DeepEqual(vwc.Webhooks, currentMWC.Webhooks) {
+               return nil
+       }
+       vwc.Webhooks = currentMWC.Webhooks
+       _, err = 
cs.kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().
+               Update(context.Background(), vwc, metav1.UpdateOptions{})
+       return err
+}
+
+// generateWebhookCfg generates the validatingWebhookConfiguration and 
mutatingWebhookConfiguration objects.
+func (cs *configSyncer) generateWebhookCfg() (
+       *admissionv1.ValidatingWebhookConfiguration, 
*admissionv1.MutatingWebhookConfiguration) {
+       validatingWebhooks := cs.generateValidatingWebhooks()
+       mutatingWebhooks := cs.generateMutatingWebhooks()
+       return &admissionv1.ValidatingWebhookConfiguration{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name: webhookconstants.ComponentName,
+                       },
+                       Webhooks: validatingWebhooks,
+               }, &admissionv1.MutatingWebhookConfiguration{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name: webhookconstants.ComponentName,
+                       },
+                       Webhooks: mutatingWebhooks,
+               }
+}
+
+// generateValidatingWebhooks generates validatingWebhooks of 
validatingWebhookConfiguration.
+func (cs *configSyncer) generateValidatingWebhooks() 
[]admissionv1.ValidatingWebhook {
+       failurePolicy := admissionv1.Fail
+       sideEffects := admissionv1.SideEffectClassNone
+       return []admissionv1.ValidatingWebhook{
+               {
+                       Name: "webhook.for.shuffle.server",
+                       Rules: []admissionv1.RuleWithOperations{
+                               {
+                                       Operations: 
[]admissionv1.OperationType{admissionv1.Delete},
+                                       Rule: admissionv1.Rule{
+                                               APIGroups:   
[]string{corev1.GroupName},
+                                               APIVersions: []string{"v1"},
+                                               Resources:   []string{"pods"},
+                                       },
+                               },
+                       },
+                       FailurePolicy: &failurePolicy,
+                       ClientConfig: admissionv1.WebhookClientConfig{
+                               CABundle: cs.caCert,
+                               Service: &admissionv1.ServiceReference{
+                                       Name:      cs.externalService,
+                                       Namespace: utils.GetCurrentNamespace(),
+                                       Path:      
pointer.StringPtr(webhookconstants.ValidatingPodPath),
+                               },
+                       },
+                       ObjectSelector: &metav1.LabelSelector{
+                               MatchLabels: map[string]string{
+                                       constants.LabelShuffleServer: "true",
+                               },
+                       },
+                       SideEffects:             &sideEffects,
+                       AdmissionReviewVersions: []string{"v1", "v1beta1"},
+                       TimeoutSeconds:          pointer.Int32(30),
+               },
+               {
+                       Name:          "webhook.for.rss",
+                       Rules:         buildRules(),
+                       FailurePolicy: &failurePolicy,
+                       ClientConfig: admissionv1.WebhookClientConfig{
+                               CABundle: cs.caCert,
+                               Service: &admissionv1.ServiceReference{
+                                       Name:      cs.externalService,
+                                       Namespace: utils.GetCurrentNamespace(),
+                                       Path:      
pointer.StringPtr(webhookconstants.ValidatingRssPath),
+                               },
+                       },
+                       SideEffects:             &sideEffects,
+                       AdmissionReviewVersions: []string{"v1", "v1beta1"},
+               },
+       }
+}
+
+// generateMutatingWebhooks generates mutatingWebhooks of 
mutatingWebhookConfiguration.
+func (cs *configSyncer) generateMutatingWebhooks() 
[]admissionv1.MutatingWebhook {
+       failurePolicy := admissionv1.Fail
+       sideEffects := admissionv1.SideEffectClassNone
+       return []admissionv1.MutatingWebhook{
+               {
+                       Name:          "webhook.for.rss",
+                       Rules:         buildRules(),
+                       FailurePolicy: &failurePolicy,
+                       ClientConfig: admissionv1.WebhookClientConfig{
+                               CABundle: cs.caCert,
+                               Service: &admissionv1.ServiceReference{
+                                       Name:      cs.externalService,
+                                       Namespace: utils.GetCurrentNamespace(),
+                                       Path:      
pointer.StringPtr(webhookconstants.MutatingRssPath),
+                               },
+                       },
+                       SideEffects:             &sideEffects,
+                       AdmissionReviewVersions: []string{"v1", "v1beta1"},
+               },
+       }
+}
+
+func buildRules() []admissionv1.RuleWithOperations {
+       return []admissionv1.RuleWithOperations{
+               {
+                       Operations: 
[]admissionv1.OperationType{admissionv1.Create, admissionv1.Update},
+                       Rule: admissionv1.Rule{
+                               APIGroups:   
[]string{unifflev1alpha1.SchemeGroupVersion.Group},
+                               APIVersions: 
[]string{unifflev1alpha1.SchemeGroupVersion.Version},
+                               Resources:   []string{"remoteshuffleservices"},
+                       },
+               },
+       }
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/util/patch.go 
b/deploy/kubernetes/operator/pkg/webhook/util/patch.go
new file mode 100644
index 00000000..57f18abb
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/util/patch.go
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
+// PatchOperation defines the information of a patch's operation.
+type PatchOperation struct {
+       Op    string      `json:"op"`
+       Path  string      `json:"path"`
+       Value interface{} `json:"value,omitempty"`
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/util/util.go 
b/deploy/kubernetes/operator/pkg/webhook/util/util.go
new file mode 100644
index 00000000..f1425809
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/webhook/util/util.go
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
+import (
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "time"
+
+       "github.com/parnurzeal/gorequest"
+       admissionv1 "k8s.io/api/admission/v1"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/runtime/serializer"
+       "k8s.io/klog/v2"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+var (
+       // RuntimeScheme defines methods for serializing and deserializing API 
objects
+       runtimeScheme = runtime.NewScheme()
+       // Codecs  serializers for specific versions and content types
+       codecs = serializer.NewCodecFactory(runtimeScheme)
+       // Deserializer attempts to load an object from data
+       deserializer = codecs.UniversalDeserializer()
+)
+
+// AdmissionReviewHandler handles AdmissionReviews and set response in them.
+type AdmissionReviewHandler func(ar *admissionv1.AdmissionReview) 
*admissionv1.AdmissionReview
+
+// handleResponse write message to http response.
+func handleResponse(w http.ResponseWriter, status int, message string) {
+       w.WriteHeader(status)
+       if _, err := w.Write([]byte(message)); err != nil {
+               klog.Errorf("write message (%v) failed: %v", message, err)
+       }
+}
+
+// AdmissionReviewFailed returns error for the AdmissionReview.
+func AdmissionReviewFailed(ar *admissionv1.AdmissionReview,
+       err error) *admissionv1.AdmissionReview {
+       ar.Response = &admissionv1.AdmissionResponse{
+               UID: ar.Request.UID,
+               Result: &metav1.Status{
+                       Message: fmt.Sprintf("handle admission review failed: 
%v", err),
+               },
+       }
+       return ar
+}
+
+// AdmissionReviewAllow allows the AdmissionReview.
+func AdmissionReviewAllow(ar *admissionv1.AdmissionReview) 
*admissionv1.AdmissionReview {
+       ar.Response = &admissionv1.AdmissionResponse{
+               UID:     ar.Request.UID,
+               Allowed: true,
+       }
+       return ar
+}
+
+// AdmissionReviewForbidden forbids the AdmissionReview with delete operation.
+func AdmissionReviewForbidden(ar *admissionv1.AdmissionReview,
+       message string) *admissionv1.AdmissionReview {
+       ar.Response = &admissionv1.AdmissionResponse{
+               UID: ar.Request.UID,
+               Result: &metav1.Status{
+                       Message: message,
+               },
+       }
+       return ar
+}
+
+// AdmissionReviewWithPatches returns the AdmissionReview with patches in 
response.
+func AdmissionReviewWithPatches(ar *admissionv1.AdmissionReview,
+       patches []byte) *admissionv1.AdmissionReview {
+       ar.Response = &admissionv1.AdmissionResponse{
+               UID:     ar.Request.UID,
+               Allowed: true,
+               Patch:   patches,
+               PatchType: func() *admissionv1.PatchType {
+                       pt := admissionv1.PatchTypeJSONPatch
+                       return &pt
+               }(),
+       }
+       return ar
+}
+
+// WithAdmissionReviewHandler checks before InspectorFunc executes and creates 
a handleFunc.
+func WithAdmissionReviewHandler(handler AdmissionReviewHandler) 
http.HandlerFunc {
+       return func(w http.ResponseWriter, req *http.Request) {
+               if req.Body == nil {
+                       klog.Error("Receive an invalid ar, body is empty")
+                       handleResponse(w, http.StatusBadRequest, "ar body 
required")
+                       return
+               }
+
+               data, err := ioutil.ReadAll(req.Body)
+               if err != nil {
+                       klog.Errorf("Read ar body failed: %v", err)
+                       handleResponse(w, http.StatusInternalServerError,
+                               fmt.Sprintf("read ar body failed: %v", err))
+                       return
+               }
+
+               ar := &admissionv1.AdmissionReview{}
+               if _, _, err = deserializer.Decode(data, nil, ar); err != nil {
+                       klog.Errorf("Parse ar body failed: %s, %v", 
string(data), err)
+                       handleResponse(w, http.StatusBadRequest, 
fmt.Sprintf("parse ar failed: %v", err))
+                       return
+               }
+               klog.V(4).Infof("receive request: %v/%v/%v from %+v, verb: %+v",
+                       ar.Request.Namespace, ar.Request.Name, ar.Request.UID, 
ar.Request.UserInfo,
+                       ar.Request.Operation)
+               var respBytes []byte
+               respBytes, err = json.Marshal(handler(ar))
+               if err != nil {
+                       handleResponse(w, http.StatusInternalServerError,
+                               fmt.Sprintf("marshal response failed: %v", err))
+                       return
+               }
+               if _, err := w.Write(respBytes); err != nil {
+                       klog.Errorf("Send response failed: %v", err)
+               }
+       }
+}
+
+// NeedInspectPod returns whether we need to inspect the pod.
+func NeedInspectPod(pod *corev1.Pod) bool {
+       if pod.DeletionTimestamp != nil || pod.Labels == nil {
+               return false
+       }
+       if val, ok := pod.Labels[constants.LabelShuffleServer]; ok && val == 
"true" {
+               return true
+       }
+       return false
+}
+
+// MetricItem records an item of metric information of shuffle servers.
+type MetricItem struct {
+       Name        string   `json:"name"`
+       LabelNames  []string `json:"labelNames"`
+       LabelValues []string `json:"labelValues"`
+       Value       float32  `json:"value"`
+}
+
+// MetricList records all items of metric information of shuffle servers.
+type MetricList struct {
+       Metrics   []*MetricItem `json:"metrics"`
+       TimeStamp int64         `json:"timestamp"`
+}
+
+func getLastAppNum(body []byte) (int, error) {
+       resp := &MetricList{}
+       if err := json.Unmarshal(body, resp); err != nil {
+               klog.Errorf("unmarshal body (%v) failed: %v", string(body), err)
+               return 0, err
+       }
+       for i := range resp.Metrics {
+               if resp.Metrics[i].Name == "app_num_with_node" {
+                       return int(resp.Metrics[i].Value), nil
+               }
+       }
+       return 0, nil
+}
+
+// HasZeroApps returns whether there are zero apps in the shuffle server pod.
+func HasZeroApps(pod *corev1.Pod) bool {
+       port := utils.GetMetricsServerPort(pod)
+       if len(port) == 0 {
+               return true
+       }
+       if pod.Status.Phase != corev1.PodRunning {
+               return true
+       }
+       url := fmt.Sprintf("http://%v:%v/metrics/server";, pod.Status.PodIP, 
port)
+       req := gorequest.New().Timeout(time.Second * 15).Get(url).Type("json")
+       resp, body, errs := req.EndBytes()
+       if len(errs) > 0 {
+               klog.Errorf("send metrics server request failed: %v->%+v", url, 
errs)
+               return true
+       }
+       if resp.StatusCode != http.StatusOK {
+               klog.Errorf("heartbeat response failed: invalid status 
(%v->%v)", url, resp.Status)
+               return false
+       }
+       if num, err := getLastAppNum(body); err != nil {
+               klog.Errorf("get last app number of (%v) failed: %v", 
pod.Spec.NodeName, err)
+               return false
+       } else if num > 0 {
+               klog.V(4).Infof("last %v apps in node %v", num, 
pod.Spec.NodeName)
+               return false
+       }
+       return true
+}

Reply via email to