This is an automated email from the ASF dual-hosted git repository.
zhouquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 9948133 SUBMARINE-361. Run submarine operator in k8s
9948133 is described below
commit 9948133683d737739006ca89ec06be65137d0c28
Author: Xun Liu <[email protected]>
AuthorDate: Mon Jan 27 19:36:55 2020 +0800
SUBMARINE-361. Run submarine operator in k8s
### What is this PR for?
Run the submarine operator image in k8s as a pod. Provide the service of
the submarine server.
1. Create submarine cluster CRD automatically
2. Support creating the submarine server in k8s, detecting its running
status, and providing high availability
### What type of PR is it?
[Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
* Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE/
* Put link here, and add [SUBMARINE-*Jira number*] in PR title, eg.
[SUBMARINE-23]
### How should this be tested?
* https://travis-ci.org/liuxunorg/submarine/builds/642231831
### Screenshots (if appropriate)


### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Xun Liu <[email protected]>
Closes #164 from liuxunorg/SUBMARINE-361 and squashes the following commits:
444d1ac [Xun Liu] Fixed
0b4ec8b [Xun Liu] Revert bin/common.sh & bin/submarine-daemon.sh
ec5a1d6 [Xun Liu] exclude go.mod & go.sum
23cfdaa [Xun Liu] Add license head
5435b03 [Xun Liu] SUBMARINE-361. Run submarine operator in k8s
---
pom.xml | 5 +-
submarine-cloud/Makefile | 19 +-
submarine-cloud/go.mod | 11 +-
submarine-cloud/go.sum | 43 ++-
submarine-cloud/manifests/crd.yaml | 62 +++-
.../pkg/apis/submarine/v1alpha1/const.go | 24 ++
submarine-cloud/pkg/controller/actions.go | 256 +++++++++++++
submarine-cloud/pkg/controller/checks.go | 246 +++++++++++++
.../pkg/controller/clustering/cluster-migration.go | 61 ++++
.../pkg/controller/clustering/cluster-placement.go | 237 ++++++++++++
.../pkg/controller/clustering/cluster-roles.go | 44 +++
submarine-cloud/pkg/controller/condition.go | 100 +++++
submarine-cloud/pkg/controller/controller.go | 405 ++++++++++++++++++++-
submarine-cloud/pkg/controller/pod/control.go | 87 ++++-
submarine-cloud/pkg/controller/pod/utils.go | 62 ++++
.../pkg/controller/sanitycheck/process.go | 63 ++++
submarine-cloud/pkg/controller/services_control.go | 8 +
submarine-cloud/pkg/controller/utils.go | 82 +++++
submarine-cloud/pkg/operator/config.go | 2 +-
submarine-cloud/pkg/operator/operator.go | 53 +++
submarine-cloud/pkg/submarine/admin.go | 209 +++++++++++
submarine-cloud/pkg/submarine/client.go | 119 ++++++
submarine-cloud/pkg/submarine/cluster.go | 43 +++
submarine-cloud/pkg/submarine/clusterinfo.go | 168 +++++++++
submarine-cloud/pkg/submarine/connections.go | 281 ++++++++++++++
submarine-cloud/pkg/submarine/errors.go | 70 ++++
submarine-cloud/pkg/submarine/node.go | 192 ++++++++++
submarine-cloud/pkg/utils/build.go | 29 +-
submarine-cloud/pom.xml | 2 +-
submarine-cloud/submarine-operator.md | 11 +-
30 files changed, 2936 insertions(+), 58 deletions(-)
diff --git a/pom.xml b/pom.xml
index b912f46..99f1b60 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,6 +138,7 @@
<modules>
<module>submarine-commons</module>
<module>submarine-client</module>
+ <module>submarine-cloud</module>
<module>submodules/tony</module>
<module>submarine-server</module>
<module>submarine-all</module>
@@ -491,8 +492,8 @@
<exclude>licenses/**</exclude>
<exclude>licenses-binary/**</exclude>
<exclude>NOTICE-binary</exclude>
- <exclude>submarine-cloud/go.mod</exclude>
- <exclude>submarine-cloud/go.sum</exclude>
+ <exclude>**/go.mod</exclude>
+ <exclude>**/go.sum</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/submarine-cloud/Makefile b/submarine-cloud/Makefile
index 830c7b7..8cedcf5 100644
--- a/submarine-cloud/Makefile
+++ b/submarine-cloud/Makefile
@@ -21,17 +21,20 @@ GOTEST=$(GOCMD) test
GOGET=$(GOCMD) get
BINARY_NAME=submarine-operator
BINARY_UNIX=$(BINARY_NAME)_unix
-VERSION=0.3.0-SNAPSHOT
+VERSION := 0.3.0-SNAPSHOT
+BuildGitBranch := $(shell git rev-parse --abbrev-ref HEAD)
+BuildGitRev := $(shell git rev-list --count HEAD)
+BuildGitCommit := $(shell git rev-parse HEAD)
LDFLAGS := -s -w \
- -X 'main.BuildVersion=$(VERSION)' \
- -X 'main.BuildGitBranch=$(shell git describe --all)'
\
- -X 'main.BuildGitRev=$(shell git rev-list --count
HEAD)' \
- -X 'main.BuildGitCommit=$(shell git rev-parse HEAD)'
\
- -X 'main.BuildDate=$(shell /bin/date "+%F %T")'
+ -X
'github.com/apache/submarine/submarine-cloud/pkg/utils.VERSION=$(VERSION)' \
+ -X
'github.com/apache/submarine/submarine-cloud/pkg/utils.BuildGitBranch=$(BuildGitBranch)'
\
+ -X
'github.com/apache/submarine/submarine-cloud/pkg/utils.BuildGitRev=$(BuildGitRev)'
\
+ -X
'github.com/apache/submarine/submarine-cloud/pkg/utils.BuildGitCommit=$(BuildGitCommit)'
\
+ -X
'github.com/apache/submarine/submarine-cloud/pkg/utils.BuildTime=$(shell /bin/date
"+%F %T")'
.PHONY: build
build:
- $(GOBUILD) -o ./$(BINARY_NAME) -v cmd/operator/main.go
+ $(GOBUILD) -o ./bin/$(BINARY_NAME) -v cmd/operator/main.go
test:
$(GOTEST) -v ./...
@@ -47,4 +50,4 @@ fmt:
@go fmt $(CURDIR)/...
release:
- CGO_ENABLED=0 GOOS=linux GOARCH=amd64 $(GOBUILD) -a -installsuffix cgo
-ldflags "$(LDFLAGS)" -o ./$(BINARY_UNIX)-linux-amd64-$(VERSION) -v
cmd/operator/main.go
+ CGO_ENABLED=0 GOOS=${GOOS} GOARCH=amd64 $(GOBUILD) -a -installsuffix
cgo -ldflags "$(LDFLAGS)" -o ./$(BINARY_UNIX)-linux-amd64-$(VERSION) -v
cmd/operator/main.go
diff --git a/submarine-cloud/go.mod b/submarine-cloud/go.mod
index f85ba40..1a0ec08 100644
--- a/submarine-cloud/go.mod
+++ b/submarine-cloud/go.mod
@@ -6,15 +6,16 @@ go 1.12
require (
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
+ github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40
github.com/mitchellh/go-homedir v1.1.0
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.5.0
- k8s.io/api v0.0.0-20191121015604-11707872ac1c
- k8s.io/apiextensions-apiserver v0.0.0-20191204090421-cd61debedab5
- k8s.io/apimachinery v0.0.0-20191203211716-adc6f4cd9e7d
- k8s.io/client-go v0.0.0-20191204082520-bc9b51d240b2
- k8s.io/code-generator v0.0.0-20191121015212-c4c8f8345c7e
+ k8s.io/api v0.17.0
+ k8s.io/apiextensions-apiserver v0.17.0
+ k8s.io/apimachinery v0.17.0
+ k8s.io/client-go v0.17.0
+ k8s.io/code-generator v0.17.0
)
replace (
diff --git a/submarine-cloud/go.sum b/submarine-cloud/go.sum
index ce4a608..b788913 100644
--- a/submarine-cloud/go.sum
+++ b/submarine-cloud/go.sum
@@ -11,6 +11,7 @@ github.com/Azure/go-autorest/logger v0.1.0/go.mod
h1:oExouG+K6PryycPJfVSxi/koC6L
github.com/Azure/go-autorest/tracing v0.5.0/go.mod
h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/BurntSushi/toml v0.3.1/go.mod
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod
h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46
h1:lsxEuwrXEAokXB9qhlbKWPpo3KMLZQ5WB5WLQRW1uq0=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod
h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/OneOfOne/xxhash v1.2.2/go.mod
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/PuerkitoBio/purell v1.0.0/go.mod
h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
@@ -24,14 +25,16 @@ github.com/agnivade/levenshtein v1.0.1/go.mod
h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4Rq
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod
h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
-github.com/apache/submarine v0.0.0-20191203011414-d394781a2c7d
h1:jX4hWmypNtuNhMONSBeT+UvxhOSNSr4hq9PgGLwFI20=
-github.com/apache/submarine v0.0.0-20191210121440-855c010715db
h1:ZLSnW9MFe1alw+vfwrGx6n326f9Yj/zpwLPTRp5x1CA=
+github.com/apache/submarine v0.0.0-20191217064858-fc7e722da84f
h1:eQ/mULGQcKeW9CMan+NzSpoMpiPEG5YckWyInzxuUWI=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod
h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod
h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
+github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod
h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/bgentry/speakeasy v0.1.0/go.mod
h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/blang/semver v3.5.0+incompatible
h1:CGxCgetQ64DKk7rdZ++Vfnb1+ogGNnB17OJKJXD2Cfs=
github.com/blang/semver v3.5.0+incompatible/go.mod
h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/cespare/xxhash v1.1.0/go.mod
h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod
h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@@ -43,9 +46,11 @@ github.com/coreos/go-oidc v2.1.0+incompatible/go.mod
h1:CgnwVTmzoESiwO9qyAFEMiHo
github.com/coreos/go-semver v0.2.0/go.mod
h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod
h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod
h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
+github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod
h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod
h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod
h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f
h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod
h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod
h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/creack/pty v1.1.7/go.mod
h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
@@ -82,9 +87,11 @@ github.com/go-openapi/analysis
v0.0.0-20180825180245-b006789cd277/go.mod h1:k70t
github.com/go-openapi/analysis v0.17.0/go.mod
h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
github.com/go-openapi/analysis v0.18.0/go.mod
h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
github.com/go-openapi/analysis v0.19.2/go.mod
h1:3P1osvZa9jKjb8ed2TPng3f0i/UY9snX6gxi44djMjk=
+github.com/go-openapi/analysis v0.19.5
h1:8b2ZgKfKIUTVQpTb77MoRDIMEIwvDVw40o3aOXdfYzI=
github.com/go-openapi/analysis v0.19.5/go.mod
h1:hkEAkxagaIvIP7VTn8ygJNkd4kAYON2rCu0v0ObL0AU=
github.com/go-openapi/errors v0.17.0/go.mod
h1:LcZQpmvG4wyF5j4IhA73wkLFQg+QJXOQHVjmcZxhka0=
github.com/go-openapi/errors v0.18.0/go.mod
h1:LcZQpmvG4wyF5j4IhA73wkLFQg+QJXOQHVjmcZxhka0=
+github.com/go-openapi/errors v0.19.2
h1:a2kIyV3w+OS3S97zxUndRVD46+FhGOUBDFY7nmu4CsY=
github.com/go-openapi/errors v0.19.2/go.mod
h1:qX0BLWsyaKfvhluLejVpVNwNRdXZhEbTA4kxxpKBC94=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod
h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
github.com/go-openapi/jsonpointer v0.17.0/go.mod
h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M=
@@ -102,9 +109,11 @@ github.com/go-openapi/loads v0.17.0/go.mod
h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf
github.com/go-openapi/loads v0.18.0/go.mod
h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU=
github.com/go-openapi/loads v0.19.0/go.mod
h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU=
github.com/go-openapi/loads v0.19.2/go.mod
h1:QAskZPMX5V0C2gvfkGZzJlINuP7Hx/4+ix5jWFxsNPs=
+github.com/go-openapi/loads v0.19.4
h1:5I4CCSqoWzT+82bBkNIvmLc0UOsoKKQ4Fz+3VxOB7SY=
github.com/go-openapi/loads v0.19.4/go.mod
h1:zZVHonKd8DXyxyw4yfnVjPzBjIQcLt0CCsn0N0ZrQsk=
github.com/go-openapi/runtime v0.0.0-20180920151709-4f900dc2ade9/go.mod
h1:6v9a6LTXWQCdL8k1AO3cvqx5OtZY/Y9wKTgaoP6YRfA=
github.com/go-openapi/runtime v0.19.0/go.mod
h1:OwNfisksmmaZse4+gpV3Ne9AyMOlP1lt4sK4FXt0O64=
+github.com/go-openapi/runtime v0.19.4
h1:csnOgcgAiuGoM/Po7PEpKDoNulCcF3FGbSnbHfxgjMI=
github.com/go-openapi/runtime v0.19.4/go.mod
h1:X277bwSUBxVlCYR3r7xgZZGKVvBd/29gLDlFGtJ8NL4=
github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod
h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc=
github.com/go-openapi/spec v0.17.0/go.mod
h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI=
@@ -115,6 +124,7 @@ github.com/go-openapi/spec v0.19.3/go.mod
h1:FpwSN1ksY1eteniUU7X0N/BgJ7a4WvBFVA8
github.com/go-openapi/strfmt v0.17.0/go.mod
h1:P82hnJI0CXkErkXi8IKjPbNBM6lV6+5pLP5l494TcyU=
github.com/go-openapi/strfmt v0.18.0/go.mod
h1:P82hnJI0CXkErkXi8IKjPbNBM6lV6+5pLP5l494TcyU=
github.com/go-openapi/strfmt v0.19.0/go.mod
h1:+uW+93UVvGGq2qGaZxdDeJqSAqBqBdl+ZPMF/cC8nDY=
+github.com/go-openapi/strfmt v0.19.3
h1:eRfyY5SkaNJCAwmmMcADjY31ow9+N7MCLW7oRkbsINA=
github.com/go-openapi/strfmt v0.19.3/go.mod
h1:0yX7dbo8mKIvc3XSKp7MNfxw4JytCfCD6+bY1AVL9LU=
github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod
h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I=
github.com/go-openapi/swag v0.17.0/go.mod
h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg=
@@ -124,7 +134,9 @@ github.com/go-openapi/swag v0.19.5
h1:lTz6Ys4CmqqCQmZPBlbQENR1/GucA2bzYTE12Pw4tF
github.com/go-openapi/swag v0.19.5/go.mod
h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/validate v0.18.0/go.mod
h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
github.com/go-openapi/validate v0.19.2/go.mod
h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
+github.com/go-openapi/validate v0.19.5
h1:QhCBKRYqZR+SKo4gl1lPhPahope8/RLt6EVgY8X80w0=
github.com/go-openapi/validate v0.19.5/go.mod
h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4=
+github.com/go-stack/stack v1.8.0
h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod
h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -154,6 +166,7 @@ github.com/google/gofuzz v1.0.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/martian v2.1.0+incompatible/go.mod
h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod
h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/uuid v1.0.0/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod
h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d
h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
@@ -164,6 +177,7 @@ github.com/gorilla/websocket v1.4.0/go.mod
h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoA
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod
h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod
h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware
v1.0.1-0.20190118093823-f849b5445de4/go.mod
h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod
h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod
h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod
h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
@@ -172,6 +186,8 @@ github.com/hashicorp/golang-lru v0.5.1
h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+
github.com/hashicorp/golang-lru v0.5.1/go.mod
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod
h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40
h1:GT4RsKmHh1uZyhmTkWJTDALRjSHYQp6FRKrotf0zhAs=
+github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40/go.mod
h1:NtmN9h8vrTveVQRLHcX2HQ5wIPBDCsZ351TGbZWgg38=
github.com/hpcloud/tail v1.0.0/go.mod
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod
h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
@@ -207,6 +223,7 @@ github.com/mailru/easyjson v0.7.0/go.mod
h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7
github.com/mattn/go-colorable v0.0.9/go.mod
h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod
h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.2/go.mod
h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
+github.com/matttproud/golang_protobuf_extensions v1.0.1
h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod
h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/go-homedir v1.1.0
h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod
h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
@@ -220,6 +237,7 @@ github.com/modern-go/reflect2
v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1
h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod
h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod
h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod
h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod
h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
@@ -241,14 +259,18 @@ github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021/go.mod
h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA=
github.com/prometheus/client_golang v0.9.1/go.mod
h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod
h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
+github.com/prometheus/client_golang v1.0.0
h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
github.com/prometheus/client_golang v1.0.0/go.mod
h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod
h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod
h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0/go.mod
h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.4.1
h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw=
github.com/prometheus/common v0.4.1/go.mod
h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod
h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod
h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/procfs v0.0.2
h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
github.com/prometheus/procfs v0.0.2/go.mod
h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod
h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod
h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
@@ -297,14 +319,19 @@ github.com/xiang90/probing
v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod
h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
+go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
h1:VcrIfasaLFkyjk6KNlXQSzO+B0fZcnECiDrKJsfxka0=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod
h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.mongodb.org/mongo-driver v1.0.3/go.mod
h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.1/go.mod
h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
+go.mongodb.org/mongo-driver v1.1.2
h1:jxcFYjlkl8xaERsgLo+RNquI0epW6zuy/ZRQs6jnrFA=
go.mongodb.org/mongo-driver v1.1.2/go.mod
h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.uber.org/atomic v1.3.2/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -378,9 +405,11 @@ google.golang.org/appengine v1.5.0/go.mod
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod
h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod
h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod
h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873
h1:nfPFGzJkUDX6uBmpN/pSw7MbOAWegH5QDQuoXFHedLg=
google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod
h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.19.0/go.mod
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.21.0/go.mod
h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
+google.golang.org/grpc v1.23.1 h1:q4XQuHFC6I28BKZpo6IYyb3mNO+l7lSOxRuYTCiDfXk=
google.golang.org/grpc v1.23.1/go.mod
h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -389,6 +418,7 @@ gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod
h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qS
gopkg.in/fsnotify.v1 v1.4.7/go.mod
h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0
h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod
h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.2.2/go.mod
h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
@@ -406,15 +436,23 @@ k8s.io/api v0.0.0-20191121015604-11707872ac1c
h1:Z87my3sF4WhG0OMxzARkWY/IKBtOr+M
k8s.io/api v0.0.0-20191121015604-11707872ac1c/go.mod
h1:R/s4gKT0V/cWEnbQa9taNRJNbWUK57/Dx6cPj6MD3A0=
k8s.io/apiextensions-apiserver v0.0.0-20191204090421-cd61debedab5
h1:g+GvnbGqLU1Jxb/9iFm/BFcmkqG9HdsGh52+wHirpsM=
k8s.io/apiextensions-apiserver v0.0.0-20191204090421-cd61debedab5/go.mod
h1:CPw0IHz1YrWGy0+8mG/76oTHXvChlgCb3EAezKQKB2I=
+k8s.io/apiextensions-apiserver v0.17.0
h1:+XgcGxqaMztkbbvsORgCmHIb4uImHKvTjNyu7b8gRnA=
+k8s.io/apiextensions-apiserver v0.17.0/go.mod
h1:XiIFUakZywkUl54fVXa7QTEHcqQz9HG55nHd1DCoHj8=
k8s.io/apimachinery v0.0.0-20191121015412-41065c7a8c2a
h1:9V03T5lHv/iF4fSgvMCd+iB86AgEgmzLpheMqIJy7hs=
k8s.io/apimachinery v0.0.0-20191121015412-41065c7a8c2a/go.mod
h1:b9qmWdKlLuU9EBh+06BtLcSf/Mu89rWL33naRxs1uZg=
+k8s.io/apiserver v0.0.0-20191204084332-137a9d3b886b
h1:QCZdKeWUjPS7uv9ewmHn1GFCevRLKyTURPbvi1kFFHM=
k8s.io/apiserver v0.0.0-20191204084332-137a9d3b886b/go.mod
h1:itgfam5HJbT/4b2BGfpUkkxfheMmDH+Ix+tEAP3uqZk=
+k8s.io/apiserver v0.17.0 h1:XhUix+FKFDcBygWkQNp7wKKvZL030QUlH1o8vFeSgZA=
+k8s.io/apiserver v0.17.0/go.mod h1:ABM+9x/prjINN6iiffRVNCBR2Wk7uY4z+EtEGZD48cg=
k8s.io/client-go v0.0.0-20191121015835-571c0ef67034
h1:+/ppGIi1rJThJAz/xJSSOuD82gb6E5jRv2305MSznxQ=
k8s.io/client-go v0.0.0-20191121015835-571c0ef67034/go.mod
h1:Adhj+OyDRsEXTnL9BfL7xbLWGWMCqGLWpMqGHkZI4J8=
k8s.io/code-generator v0.0.0-20191121015212-c4c8f8345c7e
h1:HB9Zu5ZUvJfNpLiTPhz+CebVKV8C39qTBMQkAgAZLNw=
k8s.io/code-generator v0.0.0-20191121015212-c4c8f8345c7e/go.mod
h1:DVmfPQgxQENqDIzVR2ddLXMH34qeszkKSdH/N+s+38s=
k8s.io/component-base v0.0.0-20191204083903-0d4d24e738e4/go.mod
h1:8VIh1jErItC4bg9hLBkPneyS77Tin8KwSzbYepHJnQI=
+k8s.io/component-base v0.0.0-20191204083906-3ac1376c73aa
h1:SenKbYwkn+6a0owQHeg565VpA0snQAYupFVHFWA2qYg=
k8s.io/component-base v0.0.0-20191204083906-3ac1376c73aa/go.mod
h1:mECWvHCPhJudDVDMtBl+AIf/YnTMp5r1F947OYFUwP0=
+k8s.io/component-base v0.17.0 h1:BnDFcmBDq+RPpxXjmuYnZXb59XNN9CaFrX8ba9+3xrA=
+k8s.io/component-base v0.17.0/go.mod
h1:rKuRAokNMY2nn2A6LP/MiwpoaMRHpfRnrPaUJJj1Yoc=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod
h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/gengo v0.0.0-20190822140433-26a664648505
h1:ZY6yclUKVbZ+SdWnkfY+Je5vrMpKOxmGeKRbsXVmqYM=
k8s.io/gengo v0.0.0-20190822140433-26a664648505/go.mod
h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
@@ -432,6 +470,7 @@ modernc.org/mathutil v1.0.0/go.mod
h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03
modernc.org/strutil v1.0.0/go.mod
h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/xc v1.0.0/go.mod h1:mRNCo0bvLjGhHO9WsyuKVU4q0ceiDDDoEeWDJHrNx8I=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod
h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI=
+sigs.k8s.io/structured-merge-diff v1.0.1-0.20191108220359-b1b620dd3f06
h1:zD2IemQ4LmOcAumeiyDWXKUI2SO0NYDe3H6QGvPOVgU=
sigs.k8s.io/structured-merge-diff v1.0.1-0.20191108220359-b1b620dd3f06/go.mod
h1:/ULNhyfzRopfcjskuui0cTITekDduZ7ycKN3oUT9R18=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
diff --git a/submarine-cloud/manifests/crd.yaml
b/submarine-cloud/manifests/crd.yaml
index d8d2760..1b62032 100644
--- a/submarine-cloud/manifests/crd.yaml
+++ b/submarine-cloud/manifests/crd.yaml
@@ -17,29 +17,53 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
- # format:<plural>.<group>
- name: submarineservers.submarine.apache.org
+ creationTimestamp: "2020-01-27T03:46:24Z"
+ generation: 1
+ name: submarineclusters.submarine.apache.org
+ resourceVersion: "172901"
+ selfLink:
/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions/submarineclusters.submarine.apache.org
+ uid: 96ee5ef5-40b7-11ea-8e11-0242ac110002
spec:
- # Groups are used in the REST API: /apis/<group>/<version>
+ additionalPrinterColumns:
+ - JSONPath: .metadata.creationTimestamp
+ description: |-
+ CreationTimestamp is a timestamp representing the server time when
this object was created. It is not guaranteed to be set in happens-before order
across separate operations. Clients may not set this value. It is represented
in RFC3339 form and is in UTC.
+
+ Populated by the system. Read-only. Null for lists. More info:
https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
+ name: Age
+ type: date
group: submarine.apache.org
- # Supported version list
+ names:
+ kind: SubmarineCluster
+ listKind: SubmarineClusterList
+ plural: submarineclusters
+ shortNames:
+ - submarine
+ singular: submarinecluster
+ scope: Namespaced
+ version: v1alpha1
versions:
- name: v1alpha1
- # Whether the switch is valid.
- # Each version can be enabled/disabled by Served flag
served: true
- # Only one version can be marked as storage
storage: true
- # Belongs to Namespaced or Cluster
- scope: Namespaced
- names:
- # Used in URLs: /apis/<group>/<version>/<plural>
- plural: submarineservers
- # as an alias on the CLI and for display, As alias in cli
- singular: submarineserver
- # Resource type, generally used in manifests
- kind: SubmarineServer
- # shortNames allow shorter string to match your resource on the CLI
- # Short name, just like service short name is svc
+status:
+ acceptedNames:
+ kind: SubmarineCluster
+ listKind: SubmarineClusterList
+ plural: submarineclusters
shortNames:
- - std
+ - submarine
+ singular: submarinecluster
+ conditions:
+ - lastTransitionTime: "2020-01-27T03:46:24Z"
+ message: no conflicts found
+ reason: NoConflicts
+ status: "True"
+ type: NamesAccepted
+ - lastTransitionTime: null
+ message: the initial names have been accepted
+ reason: InitialNamesAccepted
+ status: "True"
+ type: Established
+ storedVersions:
+ - v1alpha1
diff --git a/submarine-cloud/pkg/apis/submarine/v1alpha1/const.go
b/submarine-cloud/pkg/apis/submarine/v1alpha1/const.go
new file mode 100644
index 0000000..afa1dba
--- /dev/null
+++ b/submarine-cloud/pkg/apis/submarine/v1alpha1/const.go
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package v1alpha1
+
+const (
+ // ClusterNameLabelKey is the label key for the ClusterName.
+ ClusterNameLabelKey string = "submarine-operator.k8s.io/cluster-name"
+ // PodSpecMD5LabelKey is the label key for the PodSpec MD5 hash.
+ PodSpecMD5LabelKey string = "submarine-operator.k8s.io/podspec-md5"
+)
diff --git a/submarine-cloud/pkg/controller/actions.go
b/submarine-cloud/pkg/controller/actions.go
new file mode 100644
index 0000000..95f8009
--- /dev/null
+++ b/submarine-cloud/pkg/controller/actions.go
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package controller
+
+import (
+ rapi
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
+ "github.com/apache/submarine/submarine-cloud/pkg/controller/clustering"
+ "github.com/apache/submarine/submarine-cloud/pkg/submarine"
+ "github.com/golang/glog"
+ "time"
+)
+
+// clusterAction performs one reconciliation step that moves the Submarine
+// pods and the Submarine cluster toward the desired state.
+//
+// When needMorePods reports a shortfall it creates a single pod and returns
+// (true, nil). Otherwise it calls applyConfiguration and returns whether that
+// call reported a change. Errors from updateHandler, CreatePod or
+// applyConfiguration are returned together with false.
+//
+// NOTE(review): infos is only referenced by the commented-out sanity check
+// below, so it is currently unused.
+func (c *Controller) clusterAction(admin submarine.AdminInterface, cluster
*rapi.SubmarineCluster, infos *submarine.ClusterInfos) (bool, error) {
+ glog.Info("clusterAction()")
+ var err error
+ /* run sanity check if needed
+ needSanity, err := sanitycheck.RunSanityChecks(admin,
&c.config.submarine, c.podControl, cluster, infos, true)
+ if err != nil {
+ glog.Errorf("[clusterAction] cluster %s/%s, an error occurs
during sanitycheck: %v ", cluster.Namespace, cluster.Name, err)
+ return false, err
+ }
+ if needSanity {
+ glog.V(3).Infof("[clusterAction] run sanitycheck cluster:
%s/%s", cluster.Namespace, cluster.Name)
+ return sanitycheck.RunSanityChecks(admin, &c.config.submarine,
c.podControl, cluster, infos, false)
+ }*/
+
+ // Start more pods if needed
+ if needMorePods(cluster) {
+ // Push the status through updateHandler only when setScalingCondition
+ // returns true (presumably meaning the condition changed - confirm).
+ if setScalingCondition(&cluster.Status, true) {
+ if cluster, err = c.updateHandler(cluster); err != nil {
+ return false, err
+ }
+ }
+ // Only one pod is created per call; returning true reports a change.
+ pod, err2 := c.podControl.CreatePod(cluster)
+ if err2 != nil {
+ glog.Errorf("[clusterAction] unable to create a pod
associated to the SubmarineCluster: %s/%s, err: %v", cluster.Namespace,
cluster.Name, err2)
+ return false, err2
+ }
+
+ glog.V(3).Infof("[clusterAction]create a Pod %s/%s",
pod.Namespace, pod.Name)
+ return true, nil
+ }
+ // No pod is missing: set the scaling condition to false.
+ if setScalingCondition(&cluster.Status, false) {
+ if cluster, err = c.updateHandler(cluster); err != nil {
+ return false, err
+ }
+ }
+
+ // Reconfigure the Cluster if needed
+ hasChanged, err := c.applyConfiguration(admin, cluster)
+ if err != nil {
+ glog.Errorf("[clusterAction] cluster %s/%s, an error occurs: %v
", cluster.Namespace, cluster.Name, err)
+ return false, err
+ }
+
+ if hasChanged {
+ glog.V(6).Infof("[clusterAction] cluster has changed cluster:
%s/%s", cluster.Namespace, cluster.Name)
+ return true, nil
+ }
+
+ glog.Infof("[clusterAction] cluster hasn't changed cluster: %s/%s",
cluster.Namespace, cluster.Name)
+ return false, nil
+}
+
+// applyConfiguration applies a new configuration to the cluster if needed.
+// In order, it:
+// - delegates to manageRollingUpdate when needRollingUpdate is true;
+// - delegates to managePodScaleDown when needLessPods is true;
+// - returns early while NbPods differs from NbSubmarineRunning;
+// - otherwise selects masters, places slaves and dispatches slots.
+//
+// The returned bool is true only when the number of selected masters differs
+// from the number of current masters (or when one of the delegates says so).
+func (c *Controller) applyConfiguration(admin submarine.AdminInterface,
cluster *rapi.SubmarineCluster) (bool, error) {
+ glog.Info("applyConfiguration START")
+ defer glog.Info("applyConfiguration STOP")
+
+ asChanged := false
+
+ // expected replication factor and number of master nodes
+ // NOTE(review): both Spec fields are dereferenced without a nil check;
+ // confirm they are always defaulted before this point.
+ cReplicaFactor := *cluster.Spec.ReplicationFactor
+ cNbMaster := *cluster.Spec.NumberOfMaster
+ // Convert the custom resource into the structures of the submarine package
+ rCluster, nodes, err := newSubmarineCluster(admin, cluster)
+ if err != nil {
+ glog.Errorf("Unable to create the SubmarineCluster view,
error:%v", err)
+ return false, err
+ }
+ // PodTemplate changes require rolling updates
+ if needRollingUpdate(cluster) {
+ if setRollingUpdategCondition(&cluster.Status, true) {
+ if cluster, err = c.updateHandler(cluster); err != nil {
+ return false, err
+ }
+ }
+
+ glog.Info("applyConfiguration needRollingUpdate")
+ return c.manageRollingUpdate(admin, cluster, rCluster, nodes)
+ }
+ // No rolling update is needed: set the rolling-update condition to false
+ if setRollingUpdategCondition(&cluster.Status, false) {
+ if cluster, err = c.updateHandler(cluster); err != nil {
+ return false, err
+ }
+ }
+
+ // if the number of Pods is greater than expected
+ if needLessPods(cluster) {
+ if setRebalancingCondition(&cluster.Status, true) {
+ if cluster, err = c.updateHandler(cluster); err != nil {
+ return false, err
+ }
+ }
+ glog.Info("applyConfiguration needLessPods")
+ // Hand over to the scale-down logic
+ return c.managePodScaleDown(admin, cluster, rCluster, nodes)
+ }
+ // No scale down is needed: set the Rebalancing condition to false
+ if setRebalancingCondition(&cluster.Status, false) {
+ if cluster, err = c.updateHandler(cluster); err != nil {
+ return false, err
+ }
+ }
+
+ clusterStatus := &cluster.Status.Cluster
+ if (clusterStatus.NbPods - clusterStatus.NbSubmarineRunning) != 0 {
+ glog.V(3).Infof("All pods not ready wait to be ready, nbPods:
%d, nbPodsReady: %d", clusterStatus.NbPods, clusterStatus.NbSubmarineRunning)
+ // err is necessarily nil here (every non-nil err above has already
+ // returned), so this is effectively (false, nil).
+ return false, err
+ }
+
+ // First, select the masters: DispatchMasters returns the newly selected
+ // masters, the current masters and all master candidates.
+ newMasters, curMasters, allMaster, err :=
clustering.DispatchMasters(rCluster, nodes, cNbMaster, admin)
+ if err != nil {
+ glog.Errorf("Cannot dispatch slots to masters: %v", err)
+ rCluster.Status = rapi.ClusterStatusError
+ return false, err
+ }
+ // If the number of new and old masters is not the same
+ if len(newMasters) != len(curMasters) {
+ asChanged = true
+ }
+
+ // Second, select the nodes that are already slaves
+ currentSlaveNodes := nodes.FilterByFunc(submarine.IsSlave)
+
+ // New slaves are the nodes that are neither a selected master nor a
+ // current slave
+ newSlave := nodes.FilterByFunc(func(nodeA *submarine.Node) bool {
+ for _, nodeB := range newMasters {
+ if nodeA.ID == nodeB.ID {
+ return false
+ }
+ }
+ for _, nodeB := range currentSlaveNodes {
+ if nodeA.ID == nodeB.ID {
+ return false
+ }
+ }
+ return true
+ })
+
+ // Depending on whether we scale up or down, slaves are dispatched
+ // before or after the slots
+ if cNbMaster < int32(len(curMasters)) {
+ // this happens usually after a scale down of the cluster
+ // we should dispatch slots before dispatching slaves
+ if err := clustering.DispatchSlotToNewMasters(rCluster, admin,
newMasters, curMasters, allMaster); err != nil {
+ glog.Error("Unable to dispatch slot on new master,
err:", err)
+ return false, err
+ }
+
+ // assign master/slave roles
+ newSubmarineSlavesByMaster, bestEffort :=
clustering.PlaceSlaves(rCluster, newMasters, currentSlaveNodes, newSlave,
cReplicaFactor)
+ if bestEffort {
+ rCluster.NodesPlacement =
rapi.NodesPlacementInfoBestEffort
+ }
+
+ if err := clustering.AttachingSlavesToMaster(rCluster, admin,
newSubmarineSlavesByMaster); err != nil {
+ glog.Error("Unable to dispatch slave on new master,
err:", err)
+ return false, err
+ }
+ } else {
+ // The number of masters grows or stays the same:
+ // assign master/slave roles first, then dispatch the slots
+ newSubmarineSlavesByMaster, bestEffort :=
clustering.PlaceSlaves(rCluster, newMasters, currentSlaveNodes, newSlave,
cReplicaFactor)
+ if bestEffort {
+ rCluster.NodesPlacement =
rapi.NodesPlacementInfoBestEffort
+ }
+
+ if err := clustering.AttachingSlavesToMaster(rCluster, admin,
newSubmarineSlavesByMaster); err != nil {
+ glog.Error("Unable to dispatch slave on new master,
err:", err)
+ return false, err
+ }
+
+ if err := clustering.DispatchSlotToNewMasters(rCluster, admin,
newMasters, curMasters, allMaster); err != nil {
+ glog.Error("Unable to dispatch slot on new master,
err:", err)
+ return false, err
+ }
+ }
+
+ glog.V(4).Infof("new nodes status: \n %v", nodes)
+
+ // Set the cluster status
+ // NOTE(review): rCluster is local to this call; verify that this status
+ // is propagated somewhere.
+ rCluster.Status = rapi.ClusterStatusOK
+ // wait a bit for the cluster to propagate configuration, to reduce
+ // warning logs caused by temporary inconsistency
+ time.Sleep(1 * time.Second)
+ return asChanged, nil
+}
+
+func newSubmarineCluster(admin submarine.AdminInterface, cluster
*rapi.SubmarineCluster) (*submarine.Cluster, submarine.Nodes, error) {
+ infos, err := admin.GetClusterInfos()
+ if submarine.IsPartialError(err) {
+ glog.Errorf("Error getting consolidated view of the cluster
err: %v", err)
+ return nil, nil, err
+ }
+
+ // now we can trigger the rebalance
+ nodes := infos.GetNodes()
+
+ // build submarine cluster vision
+ rCluster := &submarine.Cluster{
+ Name: cluster.Name,
+ Namespace: cluster.Namespace,
+ Nodes: make(map[string]*submarine.Node),
+ }
+
+ for _, node := range nodes {
+ rCluster.Nodes[node.ID] = node
+ }
+
+ for _, node := range cluster.Status.Cluster.Nodes {
+ if rNode, ok := rCluster.Nodes[node.ID]; ok {
+ rNode.Pod = node.Pod
+ }
+ }
+
+ return rCluster, nodes, nil
+}
+
+// manageRollingUpdate is meant to manage a cluster rolling update when the
+// pod template spec has changed.
+// It is currently a placeholder: it ignores all of its arguments and always
+// returns (true, nil).
+func (c *Controller) manageRollingUpdate(admin submarine.AdminInterface,
cluster *rapi.SubmarineCluster, rCluster *submarine.Cluster, nodes
submarine.Nodes) (bool, error) {
+ return true, nil
+}
+
+// managePodScaleDown is meant to manage the scale down of a cluster.
+// It is currently a placeholder: it ignores all of its arguments and always
+// returns (true, nil).
+func (c *Controller) managePodScaleDown(admin submarine.AdminInterface,
cluster *rapi.SubmarineCluster, rCluster *submarine.Cluster, nodes
submarine.Nodes) (bool, error) {
+ return true, nil
+}
diff --git a/submarine-cloud/pkg/controller/checks.go
b/submarine-cloud/pkg/controller/checks.go
new file mode 100644
index 0000000..f2ddca5
--- /dev/null
+++ b/submarine-cloud/pkg/controller/checks.go
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package controller
+
+import (
+ "reflect"
+
+ rapi
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
+ podctrl "github.com/apache/submarine/submarine-cloud/pkg/controller/pod"
+ "github.com/golang/glog"
+ kapi "k8s.io/api/core/v1"
+)
+
+// filterLostNodes splits pods into two lists: ko receives every pod whose
+// Status.Reason is "NodeLost", ok receives all the others. Input order is
+// preserved within each list.
+func filterLostNodes(pods []*kapi.Pod) (ok []*kapi.Pod, ko []*kapi.Pod) {
+	for i := range pods {
+		candidate := pods[i]
+		if candidate.Status.Reason != "NodeLost" {
+			ok = append(ok, candidate)
+			continue
+		}
+		ko = append(ko, candidate)
+	}
+	return ok, ko
+}
+
+// compareStatus reports whether two cluster status snapshots differ.
+// It returns true as soon as one difference is found: scalar fields are
+// checked first (in the order listed below), then the node lists are compared
+// by node ID using compareNodes. It returns false when nothing differs.
+func compareStatus(old, new *rapi.SubmarineClusterClusterStatus) bool {
+	// The || chain short-circuits, so only the first differing field is logged.
+	if compareStringValue("ClusterStatus", string(old.Status), string(new.Status)) ||
+		compareInts("NbPods", old.NbPods, new.NbPods) ||
+		compareInts("NbPodsReady", old.NbPodsReady, new.NbPodsReady) ||
+		compareInts("NbSubmarineRunning", old.NbSubmarineRunning, new.NbSubmarineRunning) ||
+		compareInts("NumberOfMaster", old.NumberOfMaster, new.NumberOfMaster) ||
+		compareInts("MinReplicationFactor", old.MinReplicationFactor, new.MinReplicationFactor) ||
+		compareInts("MaxReplicationFactor", old.MaxReplicationFactor, new.MaxReplicationFactor) ||
+		compareStringValue("NodesPlacement", string(old.NodesPlacement), string(new.NodesPlacement)) ||
+		compareInts("len(Nodes)", int32(len(old.Nodes)), int32(len(new.Nodes))) {
+		return true
+	}
+
+	// Both lists have the same length here, so checking that every old node
+	// has an identical counterpart in the new list is sufficient.
+	for _, nodeA := range old.Nodes {
+		found := false
+		for _, nodeB := range new.Nodes {
+			if nodeA.ID != nodeB.ID {
+				continue
+			}
+			found = true
+			if compareNodes(&nodeA, &nodeB) {
+				return true
+			}
+		}
+		if !found {
+			return true
+		}
+	}
+
+	return false
+}
+
+// compareStringValue reports whether old and new differ. A difference is
+// logged at verbosity 6 together with the field name.
+func compareStringValue(name string, old, new string) bool {
+	if old == new {
+		return false
+	}
+
+	glog.V(6).Infof("compare %s: %s - %s", name, old, new)
+	return true
+}
+
+// compareInts reports whether old and new differ. A difference is logged at
+// info level together with the status field name.
+func compareInts(name string, old, new int32) bool {
+	if old == new {
+		return false
+	}
+
+	glog.Infof("compare status.%s: %d - %d", name, old, new)
+	return true
+}
+
+// compareNodes reports whether two cluster nodes differ. The IP, MasterRef,
+// PodName, Port and Role fields are checked in that order, then the slots
+// (first by count, then by content). It returns true on the first difference.
+func compareNodes(nodeA, nodeB *rapi.SubmarineClusterNode) bool {
+	textFields := []struct {
+		label string
+		left  string
+		right string
+	}{
+		{"Node.IP", nodeA.IP, nodeB.IP},
+		{"Node.MasterRef", nodeA.MasterRef, nodeB.MasterRef},
+		{"Node.PodName", nodeA.PodName, nodeB.PodName},
+		{"Node.Port", nodeA.Port, nodeB.Port},
+		{"Node.Role", string(nodeA.Role), string(nodeB.Role)},
+	}
+	for _, field := range textFields {
+		if compareStringValue(field.label, field.left, field.right) {
+			return true
+		}
+	}
+
+	// len() of a nil collection is 0, so no explicit nil check is required.
+	countA, countB := len(nodeA.Slots), len(nodeB.Slots)
+	if countA != countB {
+		glog.Infof("compare Node.Slote size: %d - %d", countA, countB)
+		return true
+	}
+
+	if countA == 0 || reflect.DeepEqual(nodeA.Slots, nodeB.Slots) {
+		return false
+	}
+
+	glog.Infof("compare Node.Slote deepEqual: %v - %v", nodeA.Slots, nodeB.Slots)
+	return true
+}
+
+// needClusterOperation is meant to report whether the cluster requires an
+// operation (rolling update, scaling, or a change of master/replication
+// settings). All of those checks are currently commented out, so it always
+// returns false.
+func needClusterOperation(cluster *rapi.SubmarineCluster) bool {
+ /*
+ if needRollingUpdate(cluster) {
+
glog.V(6).Info("needClusterOperation---needRollingUpdate")
+ return true
+ }
+
+ if needMorePods(cluster) {
+ glog.V(6).Info("needClusterOperation---needMorePods")
+ return true
+ }
+
+ if needLessPods(cluster) {
+ glog.Info("needClusterOperation---needLessPods")
+ return true
+ }
+
+ if compareIntValue("NumberOfMaster",
&cluster.Status.Cluster.NumberOfMaster, cluster.Spec.NumberOfMaster) {
+ glog.V(6).Info("needClusterOperation---NumberOfMaster")
+ return true
+ }
+
+ if compareIntValue("MinReplicationFactor",
&cluster.Status.Cluster.MinReplicationFactor, cluster.Spec.ReplicationFactor) {
+
glog.V(6).Info("needClusterOperation---MinReplicationFactor")
+ return true
+ }
+
+ if compareIntValue("MaxReplicationFactor",
&cluster.Status.Cluster.MaxReplicationFactor, cluster.Spec.ReplicationFactor) {
+
glog.V(6).Info("needClusterOperation---MaxReplicationFactor")
+ return true
+ }*/
+
+ return false
+}
+
+func needMorePods(cluster *rapi.SubmarineCluster) bool {
+ // Expected number of Pods depends on replication factor and Master
number
+ nbPodNeed := *cluster.Spec.NumberOfMaster * (1 +
*cluster.Spec.ReplicationFactor)
+ glog.Infof("nbPodNeed=%d, *cluster.Spec.NumberOfMaster=%d,
*cluster.Spec.ReplicationFactor=%d", nbPodNeed, *cluster.Spec.NumberOfMaster,
*cluster.Spec.ReplicationFactor)
+
+ // If not all Pods are ready, do nothing
+ glog.Infof("cluster.Status.Cluster.NbPods=%d",
cluster.Status.Cluster.NbPods)
+ glog.Infof("cluster.Status.Cluster.NbPodsReady=%d",
cluster.Status.Cluster.NbPodsReady)
+ if cluster.Status.Cluster.NbPods != cluster.Status.Cluster.NbPodsReady {
+ return false
+ }
+ output := false
+ if cluster.Status.Cluster.NbPods < nbPodNeed {
+ glog.V(4).Infof("Not enough Pods running to apply the cluster
[%s-%s] spec, current %d, needed %d ", cluster.Namespace, cluster.Name,
cluster.Status.Cluster.NbPodsReady, nbPodNeed)
+ output = true
+ }
+
+ return output
+}
+
+func needLessPods(cluster *rapi.SubmarineCluster) bool {
+ nbPodNeed := *cluster.Spec.NumberOfMaster * (1 +
*cluster.Spec.ReplicationFactor)
+
+ if cluster.Status.Cluster.NbPods != cluster.Status.Cluster.NbPodsReady {
+ return false
+ }
+ output := false
+ if cluster.Status.Cluster.NbPods > nbPodNeed {
+ glog.V(4).Infof("To many Pods running, needs to scale down the
cluster [%s-%s], current %d, needed %d ", cluster.Namespace, cluster.Name,
cluster.Status.Cluster.NbPods, nbPodNeed)
+ output = true
+ }
+ return output
+}
+
+// needRollingUpdate reports whether a rolling update is required, i.e.
+// whether comparePodsWithPodTemplate returns false for the cluster.
+func needRollingUpdate(cluster *rapi.SubmarineCluster) bool {
+ return !comparePodsWithPodTemplate(cluster)
+}
+
+// comparePodsWithPodTemplate reports whether every pod recorded in the cluster
+// status carries the MD5 hash of the current pod template spec. Nodes without
+// a Pod are skipped. It returns false as soon as one pod does not match.
+//
+// If the hash of the pod template cannot be computed, the error is logged and
+// true is returned: without a reference hash no difference can be
+// established, and reporting a mismatch would request a rolling update on
+// every pass.
+func comparePodsWithPodTemplate(cluster *rapi.SubmarineCluster) bool {
+	clusterPodSpecHash, err := podctrl.GenerateMD5Spec(&cluster.Spec.PodTemplate.Spec)
+	if err != nil {
+		glog.Errorf("unable to compute the PodSpec MD5 of cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
+		return true
+	}
+
+	for _, node := range cluster.Status.Cluster.Nodes {
+		if node.Pod == nil {
+			continue
+		}
+		if !comparePodSpecMD5Hash(clusterPodSpecHash, node.Pod) {
+			return false
+		}
+	}
+
+	return true
+}
+
+// comparePodSpecMD5Hash reports whether the pod carries the PodSpec MD5
+// annotation and its value equals hash. A missing annotation counts as a
+// mismatch.
+func comparePodSpecMD5Hash(hash string, pod *kapi.Pod) bool {
+	recorded, present := pod.Annotations[rapi.PodSpecMD5LabelKey]
+	return present && recorded == hash
+}
diff --git a/submarine-cloud/pkg/controller/clustering/cluster-migration.go
b/submarine-cloud/pkg/controller/clustering/cluster-migration.go
new file mode 100644
index 0000000..3b0cc64
--- /dev/null
+++ b/submarine-cloud/pkg/controller/clustering/cluster-migration.go
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package clustering
+
+import (
+ "fmt"
+ v1
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
+ "github.com/apache/submarine/submarine-cloud/pkg/submarine"
+ "github.com/golang/glog"
+)
+
+// DispatchMasters selects the nodes that take the master role.
+// It returns, in order: the selected masters sorted by ID, the current masters
+// that already own slots, and every master candidate (with or without slots).
+// The cluster status is set to "calculating rebalancing" and NodesPlacement
+// records whether the selection is optimal or best effort.
+// An error is returned when PlaceMasters cannot select nbMaster masters.
+func DispatchMasters(cluster *submarine.Cluster, nodes submarine.Nodes, nbMaster int32, admin submarine.AdminInterface) (submarine.Nodes, submarine.Nodes, submarine.Nodes, error) {
+	glog.Info("Start dispatching slots to masters nb nodes: ", len(nodes))
+
+	// Masters that already own slots come first, followed by the masters
+	// without any slot.
+	var candidates submarine.Nodes
+	mastersWithSlots := nodes.FilterByFunc(submarine.IsMasterWithSlot)
+	candidates = append(candidates, mastersWithSlots...)
+
+	mastersWithoutSlots := nodes.FilterByFunc(submarine.IsMasterWithNoSlot)
+	candidates = append(candidates, mastersWithoutSlots...)
+	glog.V(2).Info("Master with No slot:", len(mastersWithoutSlots))
+
+	selected, besteffort, err := PlaceMasters(cluster, mastersWithSlots, mastersWithoutSlots, nbMaster)
+
+	glog.V(2).Infof("Total masters: %d - target %d - selected: %d", len(candidates), nbMaster, len(selected))
+	if err != nil {
+		return submarine.Nodes{}, submarine.Nodes{}, submarine.Nodes{}, fmt.Errorf("Not Enough Master available current:%d target:%d, err:%v", len(candidates), nbMaster, err)
+	}
+
+	byID := func(a, b *submarine.Node) bool { return a.ID < b.ID }
+	selected = selected.SortByFunc(byID)
+
+	cluster.Status = v1.ClusterStatusCalculatingRebalancing
+	if !besteffort {
+		cluster.NodesPlacement = v1.NodesPlacementInfoOptimal
+	} else {
+		cluster.NodesPlacement = v1.NodesPlacementInfoBestEffort
+	}
+
+	return selected, mastersWithSlots, candidates, nil
+}
+
+// DispatchSlotToNewMasters is meant to dispatch slots to the new master nodes.
+// It is currently a placeholder: it ignores all of its arguments and always
+// returns nil.
+func DispatchSlotToNewMasters(cluster *submarine.Cluster, admin
submarine.AdminInterface, newMasterNodes, currentMasterNodes, allMasterNodes
submarine.Nodes) error {
+ return nil
+}
diff --git a/submarine-cloud/pkg/controller/clustering/cluster-placement.go
b/submarine-cloud/pkg/controller/clustering/cluster-placement.go
new file mode 100644
index 0000000..adfc2b8
--- /dev/null
+++ b/submarine-cloud/pkg/controller/clustering/cluster-placement.go
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package clustering
+
+import (
+ "fmt"
+ "github.com/apache/submarine/submarine-cloud/pkg/submarine"
+ "github.com/golang/glog"
+)
+
+// unknownVMName is the placeholder VM name used when the VM hosting a node
+// cannot be determined.
+// NOTE: a real VM named "unknown" would be mixed up with the placeholder and
+// would affect the placement algorithm of this package. A mangled or more
+// complex name would reduce that probability.
+const unknownVMName = "unknown"
+
+// PlaceMasters selects the Submarine nodes that become masters, using the VM
+// each node runs on to spread the masters over different VMs as much as
+// possible.
+//
+// The selection starts from currentMaster (truncated to nbMaster entries) and
+// is completed with nodes taken from allPossibleMasters. The returned bool is
+// true when a best-effort pass was started, i.e. when the first pass over the
+// candidate VMs did not reach nbMaster. An error is returned when fewer than
+// nbMaster nodes could be selected.
+//
+// Improvement: use Kube Node labeling instead of the "NodeName"
+// (availability zone and so on).
+func PlaceMasters(cluster *submarine.Cluster, currentMaster submarine.Nodes,
allPossibleMasters submarine.Nodes, nbMaster int32) (submarine.Nodes, bool,
error) {
+ selection := submarine.Nodes{}
+ selection = append(selection, currentMaster...)
+
+ // in case of scale down the current number of masters is superior to
+ // the number of needed masters so we limit the size of the selection.
+ if len(selection) > int(nbMaster) {
+ selection = selection[0:nbMaster]
+ }
+
+ // Candidates and current masters, both grouped by hosting VM
+ masterByVM := sortSubmarineNodeByVM(cluster, allPossibleMasters)
+ vmWithAlreadyMaster := sortSubmarineNodeByVM(cluster, currentMaster)
+
+ bestEffort := false
+ for len(selection) < int(nbMaster) {
+ isProgress := false
+ // Each pass takes at most one candidate from every VM
+ for vmName, nodes := range masterByVM {
+ if !bestEffort {
+ // discard VMs that already host a current master when we
+ // are not in best effort
+ if _, ok := vmWithAlreadyMaster[vmName]; ok {
+ continue
+ }
+ }
+ if len(nodes) == 0 {
+ continue
+ }
+ glog.Infof("- add node:%s to the master selection",
nodes[0].ID)
+ selection = append(selection, nodes[0])
+ masterByVM[vmName] = nodes[1:]
+ isProgress = true
+ if len(selection) >= int(nbMaster) {
+ return selection, bestEffort, nil
+ }
+ }
+ // A best-effort pass that selected nothing means no candidate is left
+ if bestEffort && !isProgress {
+ glog.Errorf("Nothing appends since last loop, it means
no more master available")
+ break
+ }
+ bestEffort = true
+ if glog.V(4) {
+ glog.Warning("the Pod are not spread enough on VMs to
have only one Master by VM.")
+ }
+ }
+ glog.Infof("- bestEffort %v", bestEffort)
+ for _, node := range selection {
+ glog.Infof("- Master %s, ip:%s", node.ID, node.IP)
+ }
+ if len(selection) >= int(nbMaster) {
+ return selection, bestEffort, nil
+ }
+ return selection, bestEffort, fmt.Errorf("unable to found enough node
for have the request number of master")
+}
+
+func sortSubmarineNodeByVM(cluster *submarine.Cluster, nodes submarine.Nodes)
map[string]submarine.Nodes {
+ nodesByVM := make(map[string]submarine.Nodes)
+
+ for _, rnode := range nodes {
+ cnode, err := cluster.GetNodeByID(rnode.ID)
+ if err != nil {
+ glog.Errorf("[sortSubmarineNodeByVM] unable fo found
the Cluster.Node with submarine ID:%s", rnode.ID)
+ continue // if not then next line with cnode.Pod will
cause a panic since cnode is nil
+ }
+ vmName := unknownVMName
+ if cnode.Pod != nil && cnode.Pod.Spec.NodeName != "" {
+ vmName = cnode.Pod.Spec.NodeName
+ }
+ if _, ok := nodesByVM[vmName]; !ok {
+ nodesByVM[vmName] = submarine.Nodes{}
+ }
+ nodesByVM[vmName] = append(nodesByVM[vmName], rnode)
+ }
+
+ return nodesByVM
+}
+
+// PlaceSlaves used to select Submarine Node knowing on which VM they are
running in order to spread as possible
+func PlaceSlaves(cluster *submarine.Cluster, masters, oldSlaves, newSlaves
submarine.Nodes, replicationFactor int32) (map[string]submarine.Nodes, bool) {
+ slavesByMaster := make(map[string]submarine.Nodes)
+
+ // be sure that no oldSlaves is presentin in newSlaves
+ for _, newSlave := range newSlaves {
+ for _, oldSlaves := range oldSlaves {
+ if newSlave.ID == oldSlaves.ID {
+ removeIDFunc := func(node *submarine.Node) bool
{
+ return node.ID == newSlave.ID
+ }
+ newSlaves.FilterByFunc(removeIDFunc)
+ if glog.V(4) {
+ glog.Warning("Remove oldSlave for
newSlave, id:", newSlave.ID)
+ }
+ }
+ }
+ }
+
+ newSlavesByVM := sortSubmarineNodeByVM(cluster, newSlaves)
+
+ for _, node := range masters {
+ slavesByMaster[node.ID] = submarine.Nodes{}
+ }
+
+ for _, slave := range oldSlaves {
+ for _, master := range masters {
+ if slave.MasterReferent == master.ID {
+ if len(slavesByMaster[slave.MasterReferent]) >=
int(replicationFactor) {
+ if node, err :=
cluster.GetNodeByID(slave.ID); err != nil {
+ vmName := unknownVMName
+ if node.Pod != nil &&
node.Pod.Spec.NodeName != "" {
+ vmName =
node.Pod.Spec.NodeName
+ }
+ newSlavesByVM[vmName] =
append(newSlavesByVM[vmName], slave)
+ }
+ } else {
+ //The master of this slave is among the
new master nodes
+ slavesByMaster[slave.MasterReferent] =
append(slavesByMaster[slave.MasterReferent], slave)
+ break
+ }
+ }
+ }
+ }
+
+ slavesByVMNotUsed := make(map[string]submarine.Nodes)
+ isSlaveNodeUsed := false
+
+ // we iterate on free slaves by Vms
+ for vmName, slaves := range newSlavesByVM {
+ // then for this VM "vmName" we try to attach those slaves on a
Master
+ for idPossibleSlave, possibleSlave := range slaves {
+ // Now we iterate on the Master and check if the
current VM is already used for a Slave attach
+ // to the current master "idMaster"
+ slaveUsed := false
+ for idMaster, currentSlaves := range slavesByMaster {
+ if len(currentSlaves) >= int(replicationFactor)
{
+ // already enough slaves attached to
this master
+ continue
+ }
+
+ if checkIfSameVM(cluster, idMaster, vmName) {
+ continue
+ }
+
+ // lets check if the VM already host a slave
for this master
+ vmAlreadyUsedForSlave := false
+ for _, currentSlave := range currentSlaves {
+ vmSlaveNode, err :=
cluster.GetNodeByID(currentSlave.ID)
+ if err != nil {
+ glog.Error("unable to find in
the cluster the slave with id:", currentSlave.ID)
+ continue
+ }
+ vmSlaveName := unknownVMName
+ if vmSlaveNode.Pod != nil {
+ vmSlaveName =
vmSlaveNode.Pod.Spec.NodeName
+ }
+ if vmName == vmSlaveName {
+ vmAlreadyUsedForSlave = true
+ break
+ }
+ }
+ if !vmAlreadyUsedForSlave {
+ // This vm is not already used for
hosting a slave for this master so we can attach this slave to it.
+ slavesByMaster[idMaster] =
append(slavesByMaster[idMaster], slaves[idPossibleSlave])
+ slaveUsed = true
+ break
+ }
+ }
+ if !slaveUsed {
+ isSlaveNodeUsed = true
+ // store unused slave for later dispatch
+ slavesByVMNotUsed[vmName] =
append(slavesByVMNotUsed[vmName], possibleSlave)
+ }
+ }
+ }
+
+ bestEffort := false
+ if isSlaveNodeUsed {
+ bestEffort = true
+ if glog.V(4) {
+ glog.Warning("Unable to spread properly all the Slave
on different VMs, we start best effort")
+ }
+ for _, freeSlaves := range slavesByVMNotUsed {
+ for _, freeSlave := range freeSlaves {
+ for masterID, slaves := range slavesByMaster {
+ if len(slaves) >=
int(replicationFactor) {
+ continue
+ }
+ slavesByMaster[masterID] =
append(slavesByMaster[masterID], freeSlave)
+ break
+ }
+ }
+ }
+ }
+
+ return slavesByMaster, bestEffort
+}
+
+// checkIfSameVM reports whether the node identified by submarineID runs on the
+// VM named vmName. A node that cannot be found, has no pod, or whose pod has
+// an empty NodeName is considered to run on unknownVMName, which matches the
+// grouping done by sortSubmarineNodeByVM.
+func checkIfSameVM(cluster *submarine.Cluster, submarineID, vmName string) bool {
+	nodeVMName := unknownVMName
+	if vmNode, err := cluster.GetNodeByID(submarineID); err == nil {
+		// An unscheduled pod has an empty NodeName; keep the placeholder then.
+		if vmNode.Pod != nil && vmNode.Pod.Spec.NodeName != "" {
+			nodeVMName = vmNode.Pod.Spec.NodeName
+		}
+	}
+
+	return vmName == nodeVMName
+}
diff --git a/submarine-cloud/pkg/controller/clustering/cluster-roles.go
b/submarine-cloud/pkg/controller/clustering/cluster-roles.go
new file mode 100644
index 0000000..c3be565
--- /dev/null
+++ b/submarine-cloud/pkg/controller/clustering/cluster-roles.go
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package clustering
+
+import (
+ "github.com/apache/submarine/submarine-cloud/pkg/submarine"
+ "github.com/golang/glog"
+)
+
+// AttachingSlavesToMaster attaches each slave to its master.
+// slavesByMaster maps a master ID to the slaves to attach to it. A master that
+// cannot be found in the cluster is logged and skipped. Attachment failures
+// are logged and do not stop the processing; the last failure encountered is
+// returned, or nil when every attachment succeeded.
+func AttachingSlavesToMaster(cluster *submarine.Cluster, admin submarine.AdminInterface, slavesByMaster map[string]submarine.Nodes) error {
+	var lastErr error
+	for masterID, attached := range slavesByMaster {
+		master, lookupErr := cluster.GetNodeByID(masterID)
+		if lookupErr != nil {
+			glog.Errorf("[AttachingSlavesToMaster] unable fo found the Cluster.Node with submarine ID:%s", masterID)
+			continue
+		}
+		for i := range attached {
+			candidate := attached[i]
+			glog.V(2).Infof("[AttachingSlavesToMaster] Attaching node %s to master %s", candidate.ID, masterID)
+
+			if attachErr := admin.AttachSlaveToMaster(candidate, master); attachErr != nil {
+				glog.Errorf("Error while attaching node %s to master %s: %v", candidate.ID, masterID, attachErr)
+				lastErr = attachErr
+			}
+		}
+	}
+	return lastErr
+}
diff --git a/submarine-cloud/pkg/controller/condition.go
b/submarine-cloud/pkg/controller/condition.go
new file mode 100644
index 0000000..d92b8f3
--- /dev/null
+++ b/submarine-cloud/pkg/controller/condition.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package controller
+
+import (
+ rapi
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
+ apiv1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// setRebalancingCondition stores the SubmarineClusterRebalancing condition in
+// clusterStatus as True when status is set and False otherwise, stamped with
+// the current time. It reports whether the condition list was modified.
+func setRebalancingCondition(clusterStatus *rapi.SubmarineClusterStatus, status bool) bool {
+	wanted := apiv1.ConditionTrue
+	if !status {
+		wanted = apiv1.ConditionFalse
+	}
+	const (
+		reason  = "topology as changed"
+		message = "reconfigure on-going after topology changed"
+	)
+	return setCondition(clusterStatus, rapi.SubmarineClusterRebalancing, wanted, metav1.Now(), reason, message)
+}
+
+// setCondition records status for the condition of type conditionType in
+// clusterStatus.Conditions. Every existing entry of that type whose status
+// differs is rewritten through updateCondition; when no entry of that type
+// exists, a new one is appended. It returns true when the list was modified.
+func setCondition(clusterStatus *rapi.SubmarineClusterStatus, conditionType rapi.SubmarineClusterConditionType, status apiv1.ConditionStatus, now metav1.Time, reason, message string) bool {
+	changed, known := false, false
+	for idx := range clusterStatus.Conditions {
+		current := clusterStatus.Conditions[idx]
+		if current.Type != conditionType {
+			continue
+		}
+		known = true
+		if current.Status == status {
+			// Same status: the stored entry is left untouched.
+			continue
+		}
+		clusterStatus.Conditions[idx] = updateCondition(current, status, now, reason, message)
+		changed = true
+	}
+	if known {
+		return changed
+	}
+	clusterStatus.Conditions = append(clusterStatus.Conditions, newCondition(conditionType, status, now, reason, message))
+	return true
+}
+
+// setRollingUpdategCondition stores the SubmarineClusterRollingUpdate condition
+// in clusterStatus as True when status is set and False otherwise, stamped with
+// the current time. It reports whether the condition list was modified.
+func setRollingUpdategCondition(clusterStatus *rapi.SubmarineClusterStatus, status bool) bool {
+	wanted := apiv1.ConditionTrue
+	if !status {
+		wanted = apiv1.ConditionFalse
+	}
+	const (
+		reason  = "Rolling update ongoing"
+		message = "a Rolling update is ongoing"
+	)
+	return setCondition(clusterStatus, rapi.SubmarineClusterRollingUpdate, wanted, metav1.Now(), reason, message)
+}
+
+// setScalingCondition stores the SubmarineClusterScaling condition in
+// clusterStatus as True when status is set and False otherwise, stamped with
+// the current time. It reports whether the condition list was modified.
+func setScalingCondition(clusterStatus *rapi.SubmarineClusterStatus, status bool) bool {
+	wanted := apiv1.ConditionTrue
+	if !status {
+		wanted = apiv1.ConditionFalse
+	}
+	// Reason and message deliberately carry the same text.
+	const text = "cluster needs more pods"
+	return setCondition(clusterStatus, rapi.SubmarineClusterScaling, wanted, metav1.Now(), text, text)
+}
+
+// updateCondition returns a copy of from with the probe time, reason and
+// message refreshed. The status and the transition time are only touched when
+// status differs from the one already stored in from.
+func updateCondition(from rapi.SubmarineClusterCondition, status apiv1.ConditionStatus, now metav1.Time, reason, message string) rapi.SubmarineClusterCondition {
+	refreshed := from.DeepCopy()
+	if refreshed.Status != status {
+		refreshed.Status = status
+		refreshed.LastTransitionTime = now
+	}
+	refreshed.LastProbeTime, refreshed.Reason, refreshed.Message = now, reason, message
+	return *refreshed
+}
+
+// newCondition return a new defaulted instance of a SubmarineClusterCondition
+func newCondition(conditionType rapi.SubmarineClusterConditionType, status
apiv1.ConditionStatus, now metav1.Time, reason, message string)
rapi.SubmarineClusterCondition {
+ return rapi.SubmarineClusterCondition{
+ Type: conditionType,
+ Status: status,
+ LastProbeTime: now,
+ LastTransitionTime: now,
+ Reason: reason,
+ Message: message,
+ }
+}
+
+func setClusterStatusCondition(clusterStatus *rapi.SubmarineClusterStatus,
status bool) bool {
+ statusCondition := apiv1.ConditionFalse
+ if status {
+ statusCondition = apiv1.ConditionTrue
+ }
+ return setCondition(clusterStatus, rapi.SubmarineClusterOK,
statusCondition, metav1.Now(), "submarine-cluster is correctly configure",
"submarine-cluster is correctly configure")
+}
diff --git a/submarine-cloud/pkg/controller/controller.go
b/submarine-cloud/pkg/controller/controller.go
index 7cdb0da..e08fa13 100644
--- a/submarine-cloud/pkg/controller/controller.go
+++ b/submarine-cloud/pkg/controller/controller.go
@@ -19,8 +19,11 @@ package controller
import (
"fmt"
"github.com/apache/submarine/submarine-cloud/pkg/controller/pod"
+ "github.com/apache/submarine/submarine-cloud/pkg/controller/sanitycheck"
+ "github.com/apache/submarine/submarine-cloud/pkg/submarine"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
+ policyv1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -32,6 +35,8 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
+ "math"
+ "reflect"
"time"
rapi
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
@@ -47,7 +52,7 @@ type Controller struct {
submarineClient sClient.Interface
submarineClusterLister sListers.SubmarineClusterLister
- submarineClusterSynced cache.InformerSynced
+ SubmarineClusterSynced cache.InformerSynced
podLister corev1listers.PodLister
PodSynced cache.InformerSynced
@@ -86,7 +91,7 @@ func NewController(cfg *Config, kubeClient
clientset.Interface, submarineClient
kubeClient: kubeClient,
submarineClient: submarineClient,
submarineClusterLister: submarineInformer.Lister(),
- submarineClusterSynced:
submarineInformer.Informer().HasSynced,
+ SubmarineClusterSynced:
submarineInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
PodSynced: podInformer.Informer().HasSynced,
serviceLister: serviceInformer.Lister(),
@@ -128,7 +133,7 @@ func NewController(cfg *Config, kubeClient
clientset.Interface, submarineClient
func (c *Controller) Run(stop <-chan struct{}) error {
glog.Infof("Starting SubmarineCluster controller")
- if !cache.WaitForCacheSync(stop, c.PodSynced, c.submarineClusterSynced,
c.ServiceSynced) {
+ if !cache.WaitForCacheSync(stop, c.PodSynced, c.SubmarineClusterSynced,
c.ServiceSynced) {
return fmt.Errorf("Timed out waiting for caches to sync")
}
@@ -146,6 +151,7 @@ func (c *Controller) runWorker() {
}
func (c *Controller) processNextItem() bool {
+ glog.Infof("processNextItem")
key, quit := c.queue.Get()
if quit {
return false
@@ -218,12 +224,154 @@ func (c *Controller) sync(key string) (bool, error) {
}
func (c *Controller) syncCluster(submarineCluster *rapi.SubmarineCluster)
(forceRequeue bool, err error) {
- glog.Infof("syncCluster()")
+ glog.Info("syncCluster START")
+ defer glog.Info("syncCluster STOP")
+ forceRequeue = false
+ submarineClusterService, err :=
c.getSubmarineClusterService(submarineCluster)
+ if err != nil {
+ glog.Errorf("SubmarineCluster-Operator.sync unable to retrieves
service associated to the SubmarineCluster: %s/%s", submarineCluster.Namespace,
submarineCluster.Name)
+ return forceRequeue, err
+ }
+ if submarineClusterService == nil {
+ if _, err =
c.serviceControl.CreateSubmarineClusterService(submarineCluster); err != nil {
+ glog.Errorf("SubmarineCluster-Operator.sync unable to
create service associated to the SubmarineCluster: %s/%s",
submarineCluster.Namespace, submarineCluster.Name)
+ return forceRequeue, err
+ }
+ }
+
+ submarineClusterPodDisruptionBudget, err :=
c.getSubmarineClusterPodDisruptionBudget(submarineCluster)
+ if err != nil {
+ glog.Errorf("SubmarineCluster-Operator.sync unable to retrieves
podDisruptionBudget associated to the SubmarineCluster: %s/%s",
submarineCluster.Namespace, submarineCluster.Name)
+ return forceRequeue, err
+ }
+ if submarineClusterPodDisruptionBudget == nil {
+ if _, err =
c.podDisruptionBudgetControl.CreateSubmarineClusterPodDisruptionBudget(submarineCluster);
err != nil {
+ glog.Errorf("SubmarineCluster-Operator.sync unable to
create podDisruptionBudget associated to the SubmarineCluster: %s/%s",
submarineCluster.Namespace, submarineCluster.Name)
+ return forceRequeue, err
+ }
+ }
+
+ submarineClusterPods, err :=
c.podControl.GetSubmarineClusterPods(submarineCluster)
+ if err != nil {
+ glog.Errorf("SubmarineCluster-Operator.sync unable to retrieves
pod associated to the SubmarineCluster: %s/%s", submarineCluster.Namespace,
submarineCluster.Name)
+ return forceRequeue, err
+ }
+
+ Pods, LostPods := filterLostNodes(submarineClusterPods)
+ if len(LostPods) != 0 {
+ for _, p := range LostPods {
+ err := c.podControl.DeletePodNow(submarineCluster,
p.Name)
+ glog.Errorf("Lost node with pod %s. Deleting... %v",
p.Name, err)
+ }
+ submarineClusterPods = Pods
+ }
+
+ // SubmarineAdmin is used access the Submarine process in the different
pods.
+ admin, err := NewSubmarineAdmin(submarineClusterPods,
&c.config.submarine)
+ if err != nil {
+ return forceRequeue, fmt.Errorf("unable to create the
submarine.Admin, err:%v", err)
+ }
+ defer admin.Close()
+
+ clusterInfos, errGetInfos := admin.GetClusterInfos()
+ if errGetInfos != nil {
+ glog.Errorf("Error when get cluster infos to rebuild bom : %v",
errGetInfos)
+ if clusterInfos.Status == submarine.ClusterInfosPartial {
+ return false, fmt.Errorf("partial Cluster infos")
+ }
+ }
+
+ // From the Submarine cluster nodes connections, build the cluster
status
+ // Calculate the actual cluster status through node information,
cluster Pod list, and CR
+ // The cluster status includes: whether it is normal, the number of
Ready Pods, the number of Masters,
+ // the number of Submarine instances in operation, the list of
Submarine instances, replication factors, etc.
+ clusterStatus, err := c.buildClusterStatus(clusterInfos,
submarineClusterPods)
+ if err != nil {
+ glog.Errorf("unable to build the SubmarineClusterStatus,
err:%v", err)
+ return forceRequeue, fmt.Errorf("unable to build clusterStatus,
err:%v", err)
+ }
+
+ // If the cluster status (Status.Cluster) in the CR does not match the
actual situation, update
+ updated, err := c.updateClusterIfNeed(submarineCluster, clusterStatus)
+ if err != nil {
+ return forceRequeue, err
+ }
+ if updated {
+ // If the cluster status changes requeue the key. Because we
want to apply Submarine Cluster operation only on stable cluster,
+ // already stored in the API server.
+ glog.V(3).Infof("cluster updated %s-%s",
submarineCluster.Namespace, submarineCluster.Name)
+ forceRequeue = true
+ return forceRequeue, nil
+ }
+
+ // If the CR state matches the actual state of the Submarine cluster,
then check if reconciliation is required-let the actual state match the
expected state
+ allPodsNotReady := true
+ if (clusterStatus.NbPods - clusterStatus.NbSubmarineRunning) != 0 {
+ glog.V(3).Infof("All pods not ready wait to be ready, nbPods:
%d, nbPodsReady: %d", clusterStatus.NbPods, clusterStatus.NbSubmarineRunning)
+ allPodsNotReady = false
+ }
+
+ // Now check if the Operator need to execute some operation the
submarine cluster. if yes run the clusterAction(...) method.
+ needSanitize, err := c.checkSanityCheck(submarineCluster, admin,
clusterInfos)
+ if err != nil {
+ glog.Errorf("checkSanityCheck, error happened in dryrun mode,
err:%v", err)
+ return false, err
+ }
+
+ // If all Pods are not ready and need rolling updates (Pod and
PodTemplate do not match), more or fewer Pods are needed,
+ // or the number of master nodes and replication factor are incorrect
+ // Or, need to perform "clean up"
+ // Then, perform Submarine cluster management operations to approximate
the expected state and update the status of SubmarineCluster
+ if (allPodsNotReady && needClusterOperation(submarineCluster)) ||
needSanitize {
+ var requeue bool
+ forceRequeue = false
+ // Perform cluster management operations, including creating /
deleting pods and configuring Submarine
+ requeue, err = c.clusterAction(admin, submarineCluster,
clusterInfos)
+ if err != nil {
+ glog.Errorf("error during action on cluster: %s-%s,
err: %v", submarineCluster.Namespace, submarineCluster.Name, err)
+ } else if requeue {
+ forceRequeue = true
+ }
+ _, err = c.updateSubmarineCluster(submarineCluster)
+ return forceRequeue, err
+ }
+
+ // Reset all conditions and reconcile
+ if setRebalancingCondition(&submarineCluster.Status, false) ||
+ setRollingUpdategCondition(&submarineCluster.Status, false) ||
+ setScalingCondition(&submarineCluster.Status, false) ||
+ setClusterStatusCondition(&submarineCluster.Status, true) {
+ _, err = c.updateHandler(submarineCluster)
+ return forceRequeue, err
+ }
+
return false, nil
}
func (c *Controller) onAddSubmarineCluster(obj interface{}) {
glog.Infof("onAddSubmarineCluster(%v)", obj)
+ submarineCluster, ok := obj.(*rapi.SubmarineCluster)
+ if !ok {
+ glog.Errorf("adding SubmarineCluster, expected SubmarineCluster
object. Got: %+v", obj)
+ return
+ }
+ glog.V(6).Infof("onAddSubmarineCluster %s/%s",
submarineCluster.Namespace, submarineCluster.Name)
+ if !reflect.DeepEqual(submarineCluster.Status,
rapi.SubmarineClusterStatus{}) {
+ glog.Errorf("submarinecluster %s/%s created with non empty
status. Going to be removed", submarineCluster.Namespace, submarineCluster.Name)
+
+ if _, err := cache.MetaNamespaceKeyFunc(submarineCluster); err
!= nil {
+ glog.Errorf("couldn't get key for SubmarineCluster (to
be deleted) %s/%s: %v", submarineCluster.Namespace, submarineCluster.Name, err)
+ return
+ }
+ // TODO: how to remove a submarineCluster created with an
invalid or even with a valid status. What in case of error for this delete?
+ if err := c.deleteSubmarineCluster(submarineCluster.Namespace,
submarineCluster.Name); err != nil {
+ glog.Errorf("unable to delete non empty status
SubmarineCluster %s/%s: %v. No retry will be performed.",
submarineCluster.Namespace, submarineCluster.Name, err)
+ }
+
+ return
+ }
+
+ c.enqueue(submarineCluster)
}
func (c *Controller) onDeleteSubmarineCluster(obj interface{}) {
@@ -232,18 +380,97 @@ func (c *Controller) onDeleteSubmarineCluster(obj
interface{}) {
func (c *Controller) onUpdateSubmarineCluster(oldObj, newObj interface{}) {
glog.Infof("onUpdateSubmarineCluster(%v, %v)", oldObj, newObj)
+
+ submarineCluster, ok := newObj.(*rapi.SubmarineCluster)
+ if !ok {
+ glog.Errorf("Expected SubmarineCluster object. Got: %+v",
newObj)
+ return
+ }
+ glog.V(6).Infof("onUpdateSubmarineCluster %s/%s",
submarineCluster.Namespace, submarineCluster.Name)
+ c.enqueue(submarineCluster)
}
func (c *Controller) onAddPod(obj interface{}) {
- glog.Infof("onAddPod(%v)", obj)
+ glog.Infof("onAddPod()")
+ pod, ok := obj.(*apiv1.Pod)
+ if !ok {
+ glog.Errorf("adding Pod, expected Pod object. Got: %+v", obj)
+ return
+ }
+ if _, ok := pod.GetObjectMeta().GetLabels()[rapi.ClusterNameLabelKey];
!ok {
+ return
+ }
+ submarineCluster, err := c.getSubmarineClusterFromPod(pod)
+ if err != nil {
+ glog.Errorf("unable to retrieve the associated submarinecluster
for pod %s/%s:%v", pod.Namespace, pod.Name, err)
+ return
+ }
+ if submarineCluster == nil {
+ glog.Errorf("empty submarineCluster. Unable to retrieve the
associated submarinecluster for the pod %s/%s", pod.Namespace, pod.Name)
+ return
+ }
+
+ c.enqueue(submarineCluster)
}
func (c *Controller) onUpdatePod(oldObj, newObj interface{}) {
glog.Infof("onUpdatePod()")
+ oldPod := oldObj.(*apiv1.Pod)
+ newPod := newObj.(*apiv1.Pod)
+ if oldPod.ResourceVersion == newPod.ResourceVersion { // Since periodic
resync will send update events for all known Pods.
+ return
+ }
+ if _, ok :=
newPod.GetObjectMeta().GetLabels()[rapi.ClusterNameLabelKey]; !ok {
+ return
+ }
+ glog.V(6).Infof("onUpdatePod old=%v, cur=%v ", oldPod.Name, newPod.Name)
+ submarineCluster, err := c.getSubmarineClusterFromPod(newPod)
+ if err != nil {
+ glog.Errorf("SubmarineCluster-Operator.onUpdateJob cannot get
submarineclusters for Pod %s/%s: %v", newPod.Namespace, newPod.Name, err)
+ return
+ }
+ if submarineCluster == nil {
+ glog.Errorf("empty submarineCluster .onUpdateJob cannot get
submarineclusters for Pod %s/%s", newPod.Namespace, newPod.Name)
+ return
+ }
+
+ c.enqueue(submarineCluster)
+
+ // TODO: in case of relabelling ?
+ // TODO: in case of labelSelector relabelling?
}
func (c *Controller) onDeletePod(obj interface{}) {
glog.Infof("onDeletePod()")
+ pod, ok := obj.(*apiv1.Pod)
+ if _, ok := pod.GetObjectMeta().GetLabels()[rapi.ClusterNameLabelKey];
!ok {
+ return
+ }
+ glog.V(6).Infof("onDeletePod old=%v", pod.Name)
+ if !ok {
+ tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ glog.Errorf("Couldn't get object from tombstone %+v",
obj)
+ return
+ }
+ pod, ok = tombstone.Obj.(*apiv1.Pod)
+ if !ok {
+ glog.Errorf("Tombstone contained object that is not a
pod %+v", obj)
+ return
+ }
+ }
+
+ submarineCluster, err := c.getSubmarineClusterFromPod(pod)
+ if err != nil {
+ glog.Errorf("SubmarineCluster-Operator.onDeletePod: %v", err)
+ return
+ }
+ if submarineCluster == nil {
+ glog.Errorf("empty submarineCluster .
SubmarineCluster-Operator.onDeletePod")
+ return
+ }
+
+ c.enqueue(submarineCluster)
}
func (c *Controller) updateSubmarineCluster(submarineCluster
*rapi.SubmarineCluster) (*rapi.SubmarineCluster, error) {
@@ -256,3 +483,171 @@ func (c *Controller)
updateSubmarineCluster(submarineCluster *rapi.SubmarineClus
glog.V(6).Infof("SubmarineCluster %s/%s updated",
submarineCluster.Namespace, submarineCluster.Name)
return rc, nil
}
+
+// enqueue adds key in the controller queue
+func (c *Controller) enqueue(submarinecluster *rapi.SubmarineCluster) {
+ key, err := cache.MetaNamespaceKeyFunc(submarinecluster)
+ if err != nil {
+ glog.Errorf("SubmarineCluster-Controller:enqueue: couldn't get
key for SubmarineCluster %s/%s: %v", submarinecluster.Namespace,
submarinecluster.Name, err)
+ return
+ }
+ c.queue.Add(key)
+}
+
+// getSubmarineClusterService looks up, through the service lister, the Service
+// of submarineCluster: the one in the cluster namespace that matches the
+// cluster labels and is named as returned by getServiceName. It returns nil
+// without error when no such Service is listed.
+func (c *Controller) getSubmarineClusterService(submarineCluster *rapi.SubmarineCluster) (*apiv1.Service, error) {
+	wantedName := getServiceName(submarineCluster)
+	clusterLabels, err := pod.GetLabelsSet(submarineCluster)
+	if err != nil {
+		return nil, fmt.Errorf("couldn't get cluster label, err: %v ", err)
+	}
+
+	services, err := c.serviceLister.Services(submarineCluster.Namespace).List(clusterLabels.AsSelector())
+	if err != nil {
+		return nil, fmt.Errorf("couldn't list service with label:%s, err:%v ", clusterLabels.String(), err)
+	}
+
+	var match *apiv1.Service
+	for _, candidate := range services {
+		if candidate.Name == wantedName {
+			match = candidate
+		}
+	}
+	return match, nil
+}
+
+// getSubmarineClusterPodDisruptionBudget looks up, through the lister, the
+// PodDisruptionBudget of submarineCluster: the one in the cluster namespace
+// that matches the cluster labels and carries the cluster name. It returns nil
+// without error when no such PodDisruptionBudget is listed.
+func (c *Controller) getSubmarineClusterPodDisruptionBudget(submarineCluster *rapi.SubmarineCluster) (*policyv1.PodDisruptionBudget, error) {
+	wantedName := submarineCluster.Name
+	clusterLabels, err := pod.GetLabelsSet(submarineCluster)
+	if err != nil {
+		return nil, fmt.Errorf("couldn't get cluster label, err: %v ", err)
+	}
+
+	budgets, err := c.podDisruptionBudgetLister.PodDisruptionBudgets(submarineCluster.Namespace).List(clusterLabels.AsSelector())
+	if err != nil {
+		return nil, fmt.Errorf("couldn't list PodDisruptionBudget with label:%s, err:%v ", clusterLabels.String(), err)
+	}
+
+	var match *policyv1.PodDisruptionBudget
+	for _, candidate := range budgets {
+		if candidate.Name == wantedName {
+			match = candidate
+		}
+	}
+	return match, nil
+}
+
+// buildClusterStatus computes the cluster status from the cluster pods and the
+// Submarine nodes found in clusterInfos. A pod is matched to a node by IP and
+// only counts as a running Submarine instance when exactly one node has its IP.
+// A pod whose node lookup fails is logged and left out of the node list, while
+// still being counted in NbPods. The returned error is always nil.
+func (c *Controller) buildClusterStatus(clusterInfos *submarine.ClusterInfos, pods []*apiv1.Pod) (*rapi.SubmarineClusterClusterStatus, error) {
+	status := &rapi.SubmarineClusterClusterStatus{}
+	status.NbPods = int32(len(pods))
+
+	var readyPods, runningNodes, masters int32
+	slavesPerMaster := make(map[string]int)
+
+	for _, p := range pods {
+		if ready, _ := IsPodReady(p); ready {
+			readyPods++
+		}
+
+		entry := rapi.SubmarineClusterNode{
+			PodName: p.Name,
+			IP:      p.Status.PodIP,
+			Pod:     p,
+			Slots:   []string{},
+		}
+
+		// find corresponding Submarine node
+		matches, err := clusterInfos.GetNodes().GetNodesByFunc(func(candidate *submarine.Node) bool {
+			return candidate.IP == p.Status.PodIP
+		})
+		if err != nil {
+			glog.Errorf("Unable to retrieve the associated Submarine Node with the pod: %s, ip:%s, err:%v", p.Name, p.Status.PodIP, err)
+			continue
+		}
+
+		if len(matches) == 1 {
+			node := matches[0]
+			if submarine.IsMasterWithSlot(node) {
+				// Register the master even when it has no slave yet, so that it
+				// takes part in the min/max replication factor below.
+				if _, seen := slavesPerMaster[node.ID]; !seen {
+					slavesPerMaster[node.ID] = 0
+				}
+				masters++
+			}
+
+			entry.ID = node.ID
+			entry.Role = node.GetRole()
+			entry.Port = node.Port
+			// Slot ranges are not reported: the slot list stays empty.
+			entry.Slots = []string{}
+			if submarine.IsSlave(node) && node.MasterReferent != "" {
+				slavesPerMaster[node.MasterReferent]++
+				entry.MasterRef = node.MasterReferent
+			}
+			runningNodes++
+		}
+		status.Nodes = append(status.Nodes, entry)
+	}
+
+	status.NbSubmarineRunning = runningNodes
+	status.NumberOfMaster = masters
+	status.NbPodsReady = readyPods
+	status.Status = rapi.ClusterStatusOK
+
+	// Without any master both replication factors are 0.
+	minFactor, maxFactor := 0, 0
+	if len(slavesPerMaster) > 0 {
+		minFactor = math.MaxInt32
+	}
+	for _, slaveCount := range slavesPerMaster {
+		if slaveCount > maxFactor {
+			maxFactor = slaveCount
+		}
+		if slaveCount < minFactor {
+			minFactor = slaveCount
+		}
+	}
+	status.MaxReplicationFactor = int32(maxFactor)
+	status.MinReplicationFactor = int32(minFactor)
+
+	glog.V(3).Infof("Build Bom, current Node list : %s ", status.String())
+
+	return status, nil
+}
+
+// updateClusterIfNeed persists newStatus as cluster.Status.Cluster when
+// compareStatus returns true for the stored and the new status, and then
+// returns true together with the update error. Otherwise only the in-memory
+// node list is refreshed and false is returned.
+func (c *Controller) updateClusterIfNeed(cluster *rapi.SubmarineCluster, newStatus *rapi.SubmarineClusterClusterStatus) (bool, error) {
+	if !compareStatus(&cluster.Status.Cluster, newStatus) {
+		// TODO improve this by checking properly the kapi.Pod informations inside each Node
+		cluster.Status.Cluster.Nodes = newStatus.Nodes
+		return false, nil
+	}
+
+	glog.V(3).Infof("Status changed for cluster: %s-%s", cluster.Namespace, cluster.Name)
+	// The status changed: store it in the SubmarineCluster and persist it.
+	cluster.Status.Cluster = *newStatus
+	_, updateErr := c.updateSubmarineCluster(cluster)
+	return true, updateErr
+}
+
+func (c *Controller) checkSanityCheck(cluster *rapi.SubmarineCluster, admin
submarine.AdminInterface, infos *submarine.ClusterInfos) (bool, error) {
+ return sanitycheck.RunSanityChecks(admin, &c.config.submarine,
c.podControl, cluster, infos, true)
+}
+
+// deleteSubmarineCluster is a placeholder: it performs no deletion, ignores
+// namespace and name, and always returns nil.
+// NOTE(review): onAddSubmarineCluster logs "Going to be removed" before calling
+// this, but nothing is removed yet — confirm whether a real delete is planned.
+func (c *Controller) deleteSubmarineCluster(namespace, name string) error {
+	return nil
+}
+
+func (c *Controller) getSubmarineClusterFromPod(pod *apiv1.Pod)
(*rapi.SubmarineCluster, error) {
+ if len(pod.Labels) == 0 {
+ return nil, fmt.Errorf("no submarineCluster found for pod. Pod
%s/%s has no labels", pod.Namespace, pod.Name)
+ }
+
+ clusterName, ok := pod.Labels[rapi.ClusterNameLabelKey]
+ if !ok {
+ return nil, fmt.Errorf("no submarineCluster name found for pod.
Pod %s/%s has no labels %s", pod.Namespace, pod.Name, rapi.ClusterNameLabelKey)
+ }
+ return
c.submarineClusterLister.SubmarineClusters(pod.Namespace).Get(clusterName)
+}
diff --git a/submarine-cloud/pkg/controller/pod/control.go
b/submarine-cloud/pkg/controller/pod/control.go
index a3ab1b7..3f7ea59 100644
--- a/submarine-cloud/pkg/controller/pod/control.go
+++ b/submarine-cloud/pkg/controller/pod/control.go
@@ -17,8 +17,14 @@
package pod
import (
+ "bytes"
+ "crypto/md5"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
rapi
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
"github.com/golang/glog"
+ "io"
kapiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
@@ -59,13 +65,22 @@ func NewSubmarineClusterControl(lister
corev1listers.PodLister, client clientset
// GetSubmarineClusterPods return list of Pod attached to a SubmarineCluster
func (p *SubmarineClusterControl) GetSubmarineClusterPods(submarineCluster
*rapi.SubmarineCluster) ([]*kapiv1.Pod, error) {
glog.Infof("GetSubmarineClusterPods()")
- return nil, nil
+ selector, err := CreateSubmarineClusterLabelSelector(submarineCluster)
+ if err != nil {
+ return nil, err
+ }
+ return p.PodLister.Pods(submarineCluster.Namespace).List(selector)
}
// CreatePod used to create a Pod from the SubmarineCluster pod template
func (p *SubmarineClusterControl) CreatePod(submarineCluster
*rapi.SubmarineCluster) (*kapiv1.Pod, error) {
glog.Infof("CreatePod()")
- return nil, nil
+ pod, err := initPod(submarineCluster)
+ if err != nil {
+ return pod, err
+ }
+ glog.V(6).Infof("CreatePod: %s/%s", submarineCluster.Namespace,
pod.Name)
+ return
p.KubeClient.CoreV1().Pods(submarineCluster.Namespace).Create(pod)
}
// DeletePod used to delete a pod from its name
@@ -86,3 +101,71 @@ func (p *SubmarineClusterControl)
deletePodGracefullperiode(submarineCluster *ra
glog.Infof("deletePodGracefullperiode()")
return
p.KubeClient.CoreV1().Pods(submarineCluster.Namespace).Delete(podName,
&metav1.DeleteOptions{GracePeriodSeconds: period})
}
+
+// GenerateMD5Spec returns the hex-encoded MD5 digest of the JSON encoding of
+// spec. The digest is used as a fingerprint of the pod spec, not for security.
+// An error is returned when spec cannot be marshalled or when feeding the
+// encoded bytes to the hash fails.
+func GenerateMD5Spec(spec *kapiv1.PodSpec) (string, error) {
+	b, err := json.Marshal(spec)
+	if err != nil {
+		return "", err
+	}
+	hash := md5.New()
+	// Report a copy failure instead of returning the digest of partial input.
+	if _, err := io.Copy(hash, bytes.NewReader(b)); err != nil {
+		return "", err
+	}
+	return hex.EncodeToString(hash.Sum(nil)), nil
+}
+
+// Add the necessary tags to the Pod. These tags are used to determine whether
a Pod is managed by the Operator and associated with a SubmarineCluster
+func initPod(submarineCluster *rapi.SubmarineCluster) (*kapiv1.Pod, error) {
+ if submarineCluster == nil {
+ return nil, fmt.Errorf("submarinecluster nil pointer")
+ }
+
+ desiredLabels, err := GetLabelsSet(submarineCluster)
+ if err != nil {
+ return nil, err
+ }
+ desiredAnnotations, err := GetAnnotationsSet(submarineCluster)
+ if err != nil {
+ return nil, err
+ }
+ PodName := fmt.Sprintf("submarinecluster-%s-", submarineCluster.Name)
+ pod := &kapiv1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: submarineCluster.Namespace,
+ Labels: desiredLabels,
+ Annotations: desiredAnnotations,
+ GenerateName: PodName,
+ OwnerReferences:
[]metav1.OwnerReference{BuildOwnerReference(submarineCluster)},
+ },
+ }
+
+ if submarineCluster.Spec.PodTemplate == nil {
+ return nil, fmt.Errorf("submarinecluster[%s/%s] PodTemplate
missing", submarineCluster.Namespace, submarineCluster.Name)
+ }
+ pod.Spec = *submarineCluster.Spec.PodTemplate.Spec.DeepCopy()
+
+ // Generate a MD5 representing the PodSpec send
+ hash, err := GenerateMD5Spec(&pod.Spec)
+ if err != nil {
+ return nil, err
+ }
+ pod.Annotations[rapi.PodSpecMD5LabelKey] = hash
+
+ return pod, nil
+}
+
+// BuildOwnerReference used to build the OwnerReference from a SubmarineCluster
+func BuildOwnerReference(cluster *rapi.SubmarineCluster) metav1.OwnerReference
{
+ controllerRef := metav1.OwnerReference{
+ APIVersion: rapi.SchemeGroupVersion.String(),
+ Kind: rapi.ResourceKind,
+ Name: cluster.Name,
+ UID: cluster.UID,
+ Controller: boolPtr(true),
+ }
+
+ return controllerRef
+}
+
+// boolPtr returns a pointer to a fresh bool holding value.
+func boolPtr(value bool) *bool {
+	ptr := new(bool)
+	*ptr = value
+	return ptr
+}
diff --git a/submarine-cloud/pkg/controller/pod/utils.go
b/submarine-cloud/pkg/controller/pod/utils.go
new file mode 100644
index 0000000..843ecf4
--- /dev/null
+++ b/submarine-cloud/pkg/controller/pod/utils.go
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package pod
+
+import (
+ "fmt"
+ "k8s.io/apimachinery/pkg/labels"
+
+ sapi
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
+)
+
+// GetLabelsSet returns the labels of the pods of submarineCluster: the
+// additional labels of the spec, overridden by the pod template labels, plus
+// the cluster-name label. The result is always a fresh set, so the caller may
+// modify it without touching the cluster object. A nil cluster yields an empty
+// set and an error.
+func GetLabelsSet(submarineCluster *sapi.SubmarineCluster) (labels.Set, error) {
+	desiredLabels := labels.Set{}
+	if submarineCluster == nil {
+		return desiredLabels, fmt.Errorf("submarineCluster nil pointer")
+	}
+	// Copy the entries rather than aliasing Spec.AdditionalLabels: the writes
+	// below would otherwise land in the cluster object's own map.
+	for k, v := range submarineCluster.Spec.AdditionalLabels {
+		desiredLabels[k] = v
+	}
+	if submarineCluster.Spec.PodTemplate != nil {
+		for k, v := range submarineCluster.Spec.PodTemplate.Labels {
+			desiredLabels[k] = v
+		}
+	}
+	desiredLabels[sapi.ClusterNameLabelKey] = submarineCluster.Name // add submarineCluster name to the Pod labels
+	return desiredLabels, nil
+}
+
+// CreateSubmarineClusterLabelSelector returns the selector that matches objects
+// carrying every label returned by GetLabelsSet for submarineCluster.
+func CreateSubmarineClusterLabelSelector(submarineCluster *sapi.SubmarineCluster) (labels.Selector, error) {
+	clusterLabels, labelsErr := GetLabelsSet(submarineCluster)
+	if labelsErr == nil {
+		return labels.SelectorFromSet(clusterLabels), nil
+	}
+	return nil, labelsErr
+}
+
+// GetAnnotationsSet returns a fresh labels.Set holding a copy of the
+// annotations of submarineCluster. A nil cluster yields an empty set and an
+// error, mirroring GetLabelsSet.
+func GetAnnotationsSet(submarineCluster *sapi.SubmarineCluster) (labels.Set, error) {
+	desiredAnnotations := make(labels.Set)
+	if submarineCluster == nil {
+		// Same guard as GetLabelsSet: report the nil cluster instead of panicking.
+		return desiredAnnotations, fmt.Errorf("submarineCluster nil pointer")
+	}
+	for k, v := range submarineCluster.Annotations {
+		desiredAnnotations[k] = v
+	}
+
+	// TODO: add createdByRef
+	return desiredAnnotations, nil
+}
diff --git a/submarine-cloud/pkg/controller/sanitycheck/process.go
b/submarine-cloud/pkg/controller/sanitycheck/process.go
new file mode 100644
index 0000000..7f8e721
--- /dev/null
+++ b/submarine-cloud/pkg/controller/sanitycheck/process.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sanitycheck
+
+import (
+ rapi
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
+ "github.com/apache/submarine/submarine-cloud/pkg/config"
+ "github.com/apache/submarine/submarine-cloud/pkg/controller/pod"
+ "github.com/apache/submarine/submarine-cloud/pkg/submarine"
+)
+
+// RunSanityChecks is the entry point used to run all the sanity checks on the
+// current cluster.
+// The intended contract is to return actionDone = true if a modification has
+// been made on the cluster.
+//
+// NOTE(review): every check below is currently commented out, so none of the
+// parameters is read and the function unconditionally returns (true, nil).
+// Confirm that callers really want "action done" reported on every call.
+func RunSanityChecks(admin submarine.AdminInterface, config *config.Submarine,
podControl pod.SubmarineClusterControlInteface, cluster *rapi.SubmarineCluster,
infos *submarine.ClusterInfos, dryRun bool) (actionDone bool, err error) {
+ /*
+ // * fix failed nodes: in some cases (cluster without enough master after crash or scale down), some nodes may still know about fail nodes
+ if actionDone, err = FixFailedNodes(admin, cluster, infos, dryRun); err != nil {
+ return actionDone, err
+ } else if actionDone {
+ glog.V(2).Infof("FixFailedNodes done an action on the cluster (dryRun:%v)", dryRun)
+ return actionDone, nil
+ }
+
+ // forget nodes and delete pods when a submarine node is untrusted.
+ if actionDone, err = FixUntrustedNodes(admin, podControl, cluster, infos, dryRun); err != nil {
+ return actionDone, err
+ } else if actionDone {
+ glog.V(2).Infof("FixUntrustedNodes done an action on the cluster (dryRun:%v)", dryRun)
+ return actionDone, nil
+ }
+
+ // forget nodes and delete pods when a submarine node is untrusted.
+ if actionDone, err = FixTerminatingPods(cluster, podControl, 5*time.Minute, dryRun); err != nil {
+ return actionDone, err
+ } else if actionDone {
+ glog.V(2).Infof("FixTerminatingPods done an action on the cluster (dryRun:%v)", dryRun)
+ return actionDone, nil
+ }
+
+ // forget nodes and delete pods when a submarine node is untrusted.
+ if actionDone, err = FixClusterSplit(admin, config, infos, dryRun); err != nil {
+ return actionDone, err
+ } else if actionDone {
+ glog.V(2).Infof("FixClusterSplit done an action on the cluster (dryRun:%v)", dryRun)
+ return actionDone, nil
+ }*/
+
+ // Placeholder result while the checks above are disabled; the trailing
+ // comment records the return statement meant to replace it.
+ return true, nil ///actionDone, err
+}
diff --git a/submarine-cloud/pkg/controller/services_control.go
b/submarine-cloud/pkg/controller/services_control.go
index 7137995..93780ed 100644
--- a/submarine-cloud/pkg/controller/services_control.go
+++ b/submarine-cloud/pkg/controller/services_control.go
@@ -68,3 +68,11 @@ func (s *ServicesControl)
DeleteSubmarineClusterService(submarineCluster *rapi.S
glog.Infof("DeleteSubmarineClusterService()")
return nil
}
+
+// getServiceName returns the service name of the given cluster:
+// Spec.ServiceName when it is not empty, the cluster's own name otherwise.
+func getServiceName(submarineCluster *rapi.SubmarineCluster) string {
+	if configured := submarineCluster.Spec.ServiceName; configured != "" {
+		return configured
+	}
+	return submarineCluster.Name
+}
diff --git a/submarine-cloud/pkg/controller/utils.go
b/submarine-cloud/pkg/controller/utils.go
new file mode 100644
index 0000000..8d9eaa1
--- /dev/null
+++ b/submarine-cloud/pkg/controller/utils.go
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package controller
+
+import (
+ "errors"
+ "fmt"
+ "github.com/golang/glog"
+ apiv1 "k8s.io/api/core/v1"
+ "net"
+ "time"
+
+ "github.com/apache/submarine/submarine-cloud/pkg/config"
+ "github.com/apache/submarine/submarine-cloud/pkg/submarine"
+)
+
+// NewSubmarineAdmin builds and returns a new submarine.AdminInterface from the
+// list of pods.
+//
+// One address is produced per pod by joining the pod IP with its submarine
+// port. The port is the container port named "submarine" of the container
+// named "submarine-node"; when no such port is declared,
+// submarine.DefaultSubmarinePort is used.
+// The connection timeout is cfg.DialTimeout interpreted as milliseconds.
+// The returned error is always nil at the moment.
+func NewSubmarineAdmin(pods []*apiv1.Pod, cfg *config.Submarine) (submarine.AdminInterface, error) {
+	nodesAddrs := []string{}
+	for _, pod := range pods {
+		submarinePort := submarine.DefaultSubmarinePort
+		// Infof (not Info) so that the %v verb is actually applied to pod.
+		glog.Infof("pod = %v", pod)
+		for _, container := range pod.Spec.Containers {
+			if container.Name != "submarine-node" {
+				continue
+			}
+			for _, port := range container.Ports {
+				if port.Name == "submarine" {
+					submarinePort = fmt.Sprintf("%d", port.ContainerPort)
+				}
+			}
+		}
+		nodesAddrs = append(nodesAddrs, net.JoinHostPort(pod.Status.PodIP, submarinePort))
+	}
+	adminConfig := submarine.AdminOptions{
+		ConnectionTimeout:  time.Duration(cfg.DialTimeout) * time.Millisecond,
+		RenameCommandsFile: cfg.GetRenameCommandsFile(),
+	}
+
+	return submarine.NewAdmin(nodesAddrs, &adminConfig), nil
+}
+
+// IsPodReady check if pod is in ready condition, return the error message
otherwise
+func IsPodReady(pod *apiv1.Pod) (bool, error) {
+ if pod == nil {
+ return false, errors.New("No Pod")
+ }
+
+ // get ready condition
+ var readycondition apiv1.PodCondition
+ found := false
+ for _, cond := range pod.Status.Conditions {
+ if cond.Type == apiv1.PodReady {
+ readycondition = cond
+ found = true
+ break
+ }
+ }
+
+ if !found {
+ return false, errors.New("Cound't find ready condition")
+ }
+
+ if readycondition.Status != apiv1.ConditionTrue {
+ return false, errors.New(readycondition.Message)
+ }
+
+ return true, nil
+}
diff --git a/submarine-cloud/pkg/operator/config.go
b/submarine-cloud/pkg/operator/config.go
index eaa01ce..7caec95 100644
--- a/submarine-cloud/pkg/operator/config.go
+++ b/submarine-cloud/pkg/operator/config.go
@@ -38,6 +38,6 @@ func NewSubmarineOperatorConfig() *Config {
func (c *Config) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&c.KubeConfigFile, "kubeconfig", c.KubeConfigFile,
"Location of kubecfg file for access to kubernetes master service")
fs.StringVar(&c.Master, "master", c.Master, "The address of the
Kubernetes API server. Overrides any value in kubeconfig. Only required if
out-of-cluster.")
- fs.StringVar(&c.ListenAddr, "addr", "0.0.0.0:8086", "listen address of
the http server which serves kubernetes probes and prometheus endpoints")
+ fs.StringVar(&c.ListenAddr, "addr", "0.0.0.0:8080", "listen address of
the http server which serves kubernetes probes and prometheus endpoints")
c.Submarine.AddFlags(fs)
}
diff --git a/submarine-cloud/pkg/operator/operator.go
b/submarine-cloud/pkg/operator/operator.go
index dff37cc..419f777 100644
--- a/submarine-cloud/pkg/operator/operator.go
+++ b/submarine-cloud/pkg/operator/operator.go
@@ -17,11 +17,15 @@
package operator
import (
+ "context"
+ "fmt"
"github.com/apache/submarine/submarine-cloud/pkg/client"
"github.com/apache/submarine/submarine-cloud/pkg/controller"
+ "github.com/heptiolabs/healthcheck"
apiextensionsclient
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
+ "net/http"
"time"
submarineInformers
"github.com/apache/submarine/submarine-cloud/pkg/client/informers/externalversions"
@@ -37,6 +41,9 @@ type SubmarineOperator struct {
kubeInformerFactory kubeinformers.SharedInformerFactory
submarineInformerFactory submarineInformers.SharedInformerFactory
controller *controller.Controller
+ // Kubernetes Probes handler
+ health healthcheck.Handler
+ httpServer *http.Server
}
func NewSubmarineOperator(cfg *Config) *SubmarineOperator {
@@ -72,6 +79,9 @@ func NewSubmarineOperator(cfg *Config) *SubmarineOperator {
controller:
controller.NewController(controller.NewConfig(1, cfg.Submarine), kubeClient,
submarineClient, kubeInformerFactory, submarineInformerFactory),
}
+ op.configureHealth()
+ op.httpServer = &http.Server{Addr: cfg.ListenAddr, Handler: op.health}
+
return op
}
@@ -88,8 +98,51 @@ func (op *SubmarineOperator) Run(stop <-chan struct{}) error
{
if op.controller != nil {
op.kubeInformerFactory.Start(stop)
op.submarineInformerFactory.Start(stop)
+ go op.runHTTPServer(stop)
err = op.controller.Run(stop)
}
return err
}
+
+// configureHealth creates the probes handler and registers one readiness
+// check per controller cache (SubmarineCluster, Pod, Service,
+// PodDiscruptionBudget). Each check fails until the matching cache reports
+// that it is synced.
+func (op *SubmarineOperator) configureHealth() {
+	op.health = healthcheck.NewHandler()
+
+	// The synced functions are wrapped in closures so that op.controller is
+	// only dereferenced when a probe runs, exactly as the checks require.
+	cacheChecks := []struct {
+		name      string
+		notSynced string
+		synced    func() bool
+	}{
+		{"SubmarineCluster_cache_sync", "SubmarineCluster cache not sync", func() bool { return op.controller.SubmarineClusterSynced() }},
+		{"Pod_cache_sync", "Pod cache not sync", func() bool { return op.controller.PodSynced() }},
+		{"Service_cache_sync", "Service cache not sync", func() bool { return op.controller.ServiceSynced() }},
+		{"PodDiscruptionBudget_cache_sync", "PodDiscruptionBudget cache not sync", func() bool { return op.controller.PodDiscruptionBudgetSynced() }},
+	}
+
+	for _, check := range cacheChecks {
+		check := check // capture a per-iteration copy for the closure
+		op.health.AddReadinessCheck(check.name, func() error {
+			if !check.synced() {
+				return fmt.Errorf("%s", check.notSynced)
+			}
+			return nil
+		})
+	}
+}
+
+// runHTTPServer serves the probes handler on op.httpServer until stop is
+// closed, then shuts the server down.
+//
+// The listener runs in its own goroutine; this function blocks on stop and
+// returns the result of the server Shutdown.
+func (op *SubmarineOperator) runHTTPServer(stop <-chan struct{}) error {
+	go func() {
+		glog.Infof("Listening on http://%s\n", op.httpServer.Addr)
+
+		// ListenAndServe always returns a non-nil error. After Shutdown that
+		// error is http.ErrServerClosed, which is the expected outcome of the
+		// shutdown below and must not be reported as a failure.
+		if err := op.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			glog.Error("Http server error: ", err)
+		}
+	}()
+
+	<-stop
+	glog.Info("Shutting down the http server...")
+	return op.httpServer.Shutdown(context.Background())
+}
diff --git a/submarine-cloud/pkg/submarine/admin.go
b/submarine-cloud/pkg/submarine/admin.go
new file mode 100644
index 0000000..3924c8f
--- /dev/null
+++ b/submarine-cloud/pkg/submarine/admin.go
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package submarine
+
+import (
+ "github.com/golang/glog"
+ "time"
+)
+
+// AdminInterface is the submarine cluster admin interface.
+// The commented-out methods are kept as a record of operations that are not
+// part of the interface yet.
+type AdminInterface interface {
+ // Connections returns the connection map of all clients
+ Connections() AdminConnectionsInterface
+ // Close the admin connections
+ Close()
+ // InitSubmarineCluster used to configure the first node of a cluster
+ InitSubmarineCluster(addr string) error
+ // GetClusterInfos get node infos for all nodes
+ GetClusterInfos() (*ClusterInfos, error)
+ // GetClusterInfosSelected return the Nodes infos for all nodes selected in the cluster
+ //GetClusterInfosSelected(addrs []string) (*ClusterInfos, error)
+ // AttachNodeToCluster command use to connect a Node to the cluster
+ // the connection will be done on a random node part of the connection pool
+ AttachNodeToCluster(addr string) error
+ // AttachSlaveToMaster attach a slave to a master node
+ AttachSlaveToMaster(slave *Node, master *Node) error
+ // DetachSlave detach a slave from its master
+ //DetachSlave(slave *Node) error
+ // StartFailover execute the failover of the Submarine Master corresponding to the addr
+ StartFailover(addr string) error
+ // ForgetNode execute the Submarine command to force the cluster to forget the Node
+ ForgetNode(id string) error
+ // ForgetNodeByAddr execute the Submarine command to force the cluster to forget the Node
+ // NOTE(review): the parameter is named id although the method name suggests an address — confirm.
+ ForgetNodeByAddr(id string) error
+ // SetSlots exec the submarine command to set slots in a pipeline, provide
+ // an empty nodeID if the set slots commands doesn't take a nodeID in parameter
+ //SetSlots(addr string, action string, slots []Slot, nodeID string) error
+ // AddSlots exec the submarine command to add slots in a pipeline
+ //AddSlots(addr string, slots []Slot) error
+ // DelSlots exec the submarine command to del slots in a pipeline
+ //DelSlots(addr string, slots []Slot) error
+ // GetKeysInSlot exec the submarine command to get the keys in the given slot on the node we are connected to
+ //GetKeysInSlot(addr string, slot Slot, batch int, limit bool) ([]string, error)
+ // CountKeysInSlot exec the submarine command to count the keys given slot on the node
+ //CountKeysInSlot(addr string, slot Slot) (int64, error)
+ // MigrateKeys from addr to destination node. returns number of slot migrated. If replace is true, replace key on busy error
+ //MigrateKeys(addr string, dest *Node, slots []Slot, batch, timeout int, replace bool) (int, error)
+ // FlushAndReset reset the cluster configuration of the node, the node is flushed in the same pipe to ensure reset works
+ FlushAndReset(addr string, mode string) error
+ // FlushAll flush all keys in cluster
+ FlushAll()
+ // GetHashMaxSlot get the max slot value
+ //GetHashMaxSlot() Slot
+ //RebuildConnectionMap rebuild the connection map according to the given addresses
+ //RebuildConnectionMap(addrs []string, options *AdminOptions)
+}
+
+// AdminOptions holds the optional settings for the submarine admin; it is
+// handed to NewAdmin, which forwards it to NewAdminConnections.
+type AdminOptions struct {
+ // ConnectionTimeout is a duration; presumably the dial timeout applied to node connections — TODO confirm.
+ ConnectionTimeout time.Duration
+ // ClientName: presumably a name identifying the admin client on the nodes — TODO confirm.
+ ClientName string
+ // RenameCommandsFile: presumably the path of a file describing renamed commands — TODO confirm.
+ RenameCommandsFile string
+}
+
+// Admin wraps the submarine cluster admin logic and implements the methods
+// declared by AdminInterface.
+type Admin struct {
+ ///hashMaxSlots Slot
+ // cnx is the set of connections to the cluster nodes; it is assigned by
+ // NewAdmin and exposed through Connections().
+ cnx AdminConnectionsInterface
+}
+
+// Connections returns the admin connections held by this Admin.
+func (a Admin) Connections() AdminConnectionsInterface {
+ return a.cnx
+}
+
+// Close closes the admin connections by calling Reset on them.
+func (a Admin) Close() {
+ a.Connections().Reset()
+}
+
+// InitSubmarineCluster is not implemented yet: calling it panics with
+// "implement me".
+func (a Admin) InitSubmarineCluster(addr string) error {
+ panic("implement me")
+}
+
+// GetClusterInfos collects the node infos of every connected node.
+//
+// For each connection the node infos are fetched with getInfos. A failure
+// marks the result as ClusterInfosPartial and is recorded per address in the
+// returned cluster error; infos whose node is missing or does not report the
+// queried address are logged and dropped. When no node failed, the overall
+// status is computed with ComputeStatus.
+// The infos are returned with a nil error only when their status is
+// ClusterInfosConsistent; otherwise the cluster error is returned with them.
+func (a Admin) GetClusterInfos() (*ClusterInfos, error) {
+	glog.V(1).Info("GetClusterInfos")
+
+	infos := NewClusterInfos()
+	clusterErr := NewClusterInfosError()
+
+	for addr, client := range a.Connections().GetAll() {
+		nodeInfos, err := a.getInfos(client, addr)
+		switch {
+		case err != nil:
+			infos.Status = ClusterInfosPartial
+			clusterErr.partial = true
+			clusterErr.errs[addr] = err
+		case nodeInfos.Node == nil || nodeInfos.Node.IPPort() != addr:
+			glog.Warningf("Bad node info retreived from %s", addr)
+		default:
+			infos.Infos[addr] = nodeInfos
+		}
+	}
+
+	if len(clusterErr.errs) == 0 {
+		clusterErr.inconsistent = !infos.ComputeStatus()
+	}
+	if infos.Status != ClusterInfosConsistent {
+		return infos, clusterErr
+	}
+	return infos, nil
+}
+
+// AttachNodeToCluster is not implemented yet: calling it panics with
+// "implement me".
+func (a Admin) AttachNodeToCluster(addr string) error {
+ panic("implement me")
+}
+
+// AttachSlaveToMaster is not implemented yet: calling it panics with
+// "implement me".
+func (a Admin) AttachSlaveToMaster(slave *Node, master *Node) error {
+ panic("implement me")
+}
+
+// StartFailover is not implemented yet: calling it panics with
+// "implement me".
+func (a Admin) StartFailover(addr string) error {
+ panic("implement me")
+}
+
+// ForgetNode is not implemented yet: calling it panics with "implement me".
+func (a Admin) ForgetNode(id string) error {
+ panic("implement me")
+}
+
+// ForgetNodeByAddr is not implemented yet: calling it panics with
+// "implement me".
+func (a Admin) ForgetNodeByAddr(id string) error {
+ panic("implement me")
+}
+
+// FlushAndReset is not implemented yet: calling it panics with
+// "implement me".
+func (a Admin) FlushAndReset(addr string, mode string) error {
+ panic("implement me")
+}
+
+// FlushAll is not implemented yet: calling it panics with "implement me".
+func (a Admin) FlushAll() {
+ panic("implement me")
+}
+
+// NewAdmin returns a new AdminInterface instance backed by an *Admin.
+// Its connections are created right away by NewAdminConnections from the
+// addrs list and the given options.
+func NewAdmin(addrs []string, options *AdminOptions) AdminInterface {
+	return &Admin{
+		//hashMaxSlots: defaultHashMaxSlots,
+
+		// perform initial connections
+		cnx: NewAdminConnections(addrs, options),
+	}
+}
+
+// getInfos returns the node infos for the node at addr.
+//
+// NOTE(review): this is currently a stub. The client c is not used: the
+// queries below are commented out, so an empty string is decoded with
+// DecodeNodeInfos and the returned error is always nil.
+func (a *Admin) getInfos(c ClientInterface, addr string) (*NodeInfos, error) {
+ /*
+ resp := c.Cmd("CLUSTER", "NODES")
+ if err := a.Connections().ValidateResp(resp, addr, "Unable to retrieve Node Info"); err != nil {
+ return nil, err
+ }
+
+ var raw string
+ var err error
+ raw, err = resp.Str()
+
+ if err != nil {
+ return nil, fmt.Errorf("Wrong format from CLUSTER NODES: %v", err)
+ }
+ */
+ // Placeholder input standing in for the raw command output.
+ var raw string = ""
+ nodeInfos := DecodeNodeInfos(&raw, addr)
+
+ /*
+ if glog.V(3) {
+ //Retrieve server info for debugging
+ resp = c.Cmd("INFO", "SERVER")
+ if err = a.Connections().ValidateResp(resp, addr, "Unable to retrieve Node Info"); err != nil {
+ return nil, err
+ }
+ raw, err = resp.Str()
+ if err != nil {
+ return nil, fmt.Errorf("Wrong format from INFO SERVER: %v", err)
+ }
+
+ var serverStartTime time.Time
+ serverStartTime, err = DecodeNodeStartTime(&raw)
+
+ if err != nil {
+ return nil, err
+ }
+
+ nodeInfos.Node.ServerStartTime = serverStartTime
+ }*/
+
+ return nodeInfos, nil
+}
diff --git a/submarine-cloud/pkg/submarine/client.go
b/submarine-cloud/pkg/submarine/client.go
new file mode 100644
index 0000000..f866e5c
--- /dev/null
+++ b/submarine-cloud/pkg/submarine/client.go
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package submarine
+
+import (
+ "bytes"
+ "encoding/json"
+ "github.com/golang/glog"
+ "io"
+ "net/http"
+ "time"
+)
+
+// ClientInterface is the submarine client interface.
+// Methods prefixed with "///" are commented out and are not part of the
+// interface at the moment.
+type ClientInterface interface {
+ // Close closes the connection.
+ Close() error
+
+ // Cmd calls the given Submarine command.
+ ///Cmd(cmd string, args ...interface{}) *submarine.Resp
+
+ // PipeAppend adds the given call to the pipeline queue.
+ // Use PipeResp() to read the response.
+ ///PipeAppend(cmd string, args ...interface{})
+
+ // PipeResp returns the reply for the next request in the pipeline queue. Err
+ // with ErrPipelineEmpty is returned if the pipeline queue is empty.
+ ///PipeResp() *submarine.Resp
+
+ // PipeClear clears the contents of the current pipeline queue, both commands
+ // queued by PipeAppend which have yet to be sent and responses which have yet
+ // to be retrieved through PipeResp. The first returned int will be the number
+ // of pending commands dropped, the second will be the number of pending
+ // responses dropped
+ PipeClear() (int, int)
+
+ // ReadResp will read a Resp off of the connection without sending anything
+ // first (useful after you've sent a SUBSCRIBE command). This will block until
+ // a reply is received or the timeout is reached (returning the IOErr). You can
+ // use IsTimeout to check if the Resp is due to a Timeout
+ //
+ // Note: this is a more low-level function, you really shouldn't have to
+ // actually use it unless you're writing your own pub/sub code
+ ///ReadResp() *submarine.Resp
+
+ // GetClusterAddress returns the Submarine cluster server address list
+ GetClusterAddress() ([]string, error)
+}
+
+// Client is the structure representing a client connection to submarine.
+type Client struct {
+ // commandsMapping is stored by NewClient from its argument; presumably it
+ // maps command names to their renamed form — TODO confirm.
+ commandsMapping map[string]string
+ ///client *submarine.Client
+ // client is the underlying connection; NewClient does not assign it yet.
+ client ClientInterface
+}
+
+// getClusterAddressUrl is the URL path appended to the host by
+// (*Client).GetClusterAddress.
+const getClusterAddressUrl = "/api/v1/cluster/address"
+// getClusterNodesUrl: presumably the URL path listing the cluster nodes;
+// it is not referenced by the code visible next to it — TODO confirm usage.
+const getClusterNodesUrl = "/api/v1/cluster/nodes"
+
+// NewClient builds a client connection and connects to a submarine address.
+//
+// NOTE(review): the dial is still commented out, so addr and cnxTimeout are
+// unused, c.client is never assigned and err is never set. As written the
+// function therefore returns a nil ClientInterface together with a nil error;
+// callers must not assume a usable client on success.
+func NewClient(addr string, cnxTimeout time.Duration, commandsMapping
map[string]string) (ClientInterface, error) {
+ var err error
+ c := &Client{
+ commandsMapping: commandsMapping,
+ }
+
+ // c.client, err = submarine.DialTimeout("tcp", addr, cnxTimeout)
+ // TODO error!!!!
+
+ return c.client, err
+}
+
+// GetClusterAddress fetches the Submarine cluster server address list from
+// the given host.
+//
+// It issues an HTTP GET on host + getClusterAddressUrl and decodes the body
+// as a JSON array of strings. A body that cannot be decoded is logged and the
+// decoding error is returned to the caller instead of being dropped.
+// Transport failures are not returned: httpGet panics on them.
+//
+// NOTE(review): ClientInterface declares GetClusterAddress() without a host
+// parameter, so this method does not satisfy that interface method.
+func (c *Client) GetClusterAddress(host string) ([]string, error) {
+	clusterAddrBuff := httpGet(host + getClusterAddressUrl)
+	var clusterAddress []string
+	if err := json.Unmarshal(clusterAddrBuff.Bytes(), &clusterAddress); err != nil {
+		// Errorf (not Error) so that the %s verb is actually applied.
+		glog.Errorf("Unmarshal failure: %s", clusterAddrBuff.String())
+		return nil, err
+	}
+
+	return clusterAddress, nil
+}
+
+func httpGet(url string) *bytes.Buffer {
+ client := &http.Client{Timeout: 5 * time.Second}
+ resp, err := client.Get(url)
+ if err != nil {
+ panic(err)
+ }
+ defer resp.Body.Close()
+ var buffer [4096]byte
+ result := bytes.NewBuffer(nil)
+ for {
+ n, err := resp.Body.Read(buffer[0:])
+ result.Write(buffer[0:n])
+ if err != nil && err == io.EOF {
+ break
+ } else if err != nil {
+ panic(err)
+ }
+ }
+
+ return result
+}
diff --git a/submarine-cloud/pkg/submarine/cluster.go
b/submarine-cloud/pkg/submarine/cluster.go
new file mode 100644
index 0000000..a13c97c
--- /dev/null
+++ b/submarine-cloud/pkg/submarine/cluster.go
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package submarine
+
+import v1
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
+
+// Cluster represents a Submarine Cluster
+type Cluster struct {
+ // Name and Namespace: presumably those of the matching Kubernetes object — TODO confirm.
+ Name string
+ Namespace string
+ // Nodes holds the cluster nodes indexed by node ID (see GetNodeByID).
+ Nodes map[string]*Node
+ Status v1.ClusterStatus
+ NodesPlacement v1.NodesPlacementInfo
+ // ActionsInfo stores information about the current action on the Cluster.
+ ActionsInfo ClusterActionsInfo
+}
+
+// ClusterActionsInfo is used to store information about the current action on
+// the Cluster
+type ClusterActionsInfo struct {
+ // NbslotsToMigrate: presumably the number of slots still to migrate — TODO confirm.
+ NbslotsToMigrate int32
+}
+
+// GetNodeByID looks up a Cluster Node by its ID.
+// When the ID is not present in the cluster, nil and nodeNotFoundedError are
+// returned.
+func (c *Cluster) GetNodeByID(id string) (*Node, error) {
+	node, found := c.Nodes[id]
+	if !found {
+		return nil, nodeNotFoundedError
+	}
+	return node, nil
+}
diff --git a/submarine-cloud/pkg/submarine/clusterinfo.go
b/submarine-cloud/pkg/submarine/clusterinfo.go
new file mode 100644
index 0000000..ff01d29
--- /dev/null
+++ b/submarine-cloud/pkg/submarine/clusterinfo.go
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package submarine
+
+// Possible values of ClusterInfos.Status.
+const (
+ // ClusterInfosUnset status of the cluster info: no data set
+ ClusterInfosUnset = "Unset"
+ // ClusterInfosPartial status of the cluster info: data is not complete (some nodes didn't respond)
+ ClusterInfosPartial = "Partial"
+ // ClusterInfosInconsistent status of the cluster info: nodesinfos is not consistent between nodes
+ ClusterInfosInconsistent = "Inconsistent"
+ // ClusterInfosConsistent status of the cluster info: nodeinfos is complete and consistent between nodes
+ ClusterInfosConsistent = "Consistent"
+)
+
+// ClusterInfos represents the node infos for all nodes of the cluster
+type ClusterInfos struct {
+ // Infos holds one NodeInfos per node; Admin.GetClusterInfos keys it by node address.
+ Infos map[string]*NodeInfos
+ // Status is one of the ClusterInfos* status constants.
+ Status string
+}
+
+// NodeInfos is the representation of a node info, i.e. data returned by the
+// CLUSTER NODE submarine command
+// Node is the information of the targeted node
+// Friends are the view of the other nodes from the targeted node
+type NodeInfos struct {
+ Node *Node
+ Friends Nodes
+}
+
+// GetNodes returns a Nodes view of the cluster.
+// The slice is formed from how each node sees itself: it holds the Node of
+// every entry of Infos, in map iteration order.
+// You should check the Status before calling it, to wait for a consistent view.
+func (c *ClusterInfos) GetNodes() Nodes {
+	selfViews := make(Nodes, 0, len(c.Infos))
+	for addr := range c.Infos {
+		selfViews = append(selfViews, c.Infos[addr].Node)
+	}
+	return selfViews
+}
+
+// NewNodeInfos returns a new NodeInfos whose Node is a default node and
+// whose Friends list is empty.
+func NewNodeInfos() *NodeInfos {
+	infos := new(NodeInfos)
+	infos.Node = NewDefaultNode()
+	infos.Friends = Nodes{}
+	return infos
+}
+
+// NewClusterInfos returns a new ClusterInfos with an empty Infos map and the
+// ClusterInfosUnset status.
+func NewClusterInfos() *ClusterInfos {
+	infos := new(ClusterInfos)
+	infos.Infos = map[string]*NodeInfos{}
+	infos.Status = ClusterInfosUnset
+	return infos
+}
+
+// DecodeNodeInfos decodes the Submarine nodes info from the cmd output. The
+// second argument is the node on which we are connected to request info.
+//
+// NOTE(review): the parser below is commented out, so input and addr are
+// currently ignored and the function returns the result of NewNodeInfos()
+// unchanged.
+func DecodeNodeInfos(input *string, addr string) *NodeInfos {
+ infos := NewNodeInfos()
+ /*
+ lines := strings.Split(*input, "\n")
+ for _, line := range lines {
+ values := strings.Split(line, " ")
+ if len(values) < 8 {
+ // last line is always empty
+ glog.V(7).Infof("Not enough values in line split, ignoring line: '%s'", line)
+ continue
+ } else {
+ node := NewDefaultNode()
+
+ node.ID = values[0]
+ //remove trailing port for cluster internal protocol
+ ipPort := strings.Split(values[1], "@")
+ if ip, port, err := net.SplitHostPort(ipPort[0]); err == nil {
+ node.IP = ip
+ node.Port = port
+ if ip == "" {
+ // ip of the node we are connecting to is sometime empty
+ node.IP, _, _ = net.SplitHostPort(addr)
+ }
+ } else {
+ glog.Errorf("Error while decoding node info for node '%s', cannot split ip:port ('%s'): %v", node.ID, values[1], err)
+ }
+ node.SetRole(values[2])
+ node.SetFailureStatus(values[2])
+ node.SetReferentMaster(values[3])
+ if i, err := strconv.ParseInt(values[4], 10, 64); err == nil {
+ node.PingSent = i
+ }
+ if i, err := strconv.ParseInt(values[5], 10, 64); err == nil {
+ node.PongRecv = i
+ }
+ if i, err := strconv.ParseInt(values[6], 10, 64); err == nil {
+ node.ConfigEpoch = i
+ }
+ node.SetLinkStatus(values[7])
+
+ for _, slot := range values[8:] {
+ if s, importing, migrating, err := DecodeSlotRange(slot); err == nil {
+ node.Slots = append(node.Slots, s...)
+ if importing != nil {
+ node.ImportingSlots[importing.SlotID] = importing.FromNodeID
+ }
+ if migrating != nil {
+ node.MigratingSlots[migrating.SlotID] = migrating.ToNodeID
+ }
+ }
+ }
+
+ if strings.HasPrefix(values[2], "myself") {
+ infos.Node = node
+ glog.V(7).Infof("Getting node info for node: '%s'", node)
+ } else {
+ infos.Friends = append(infos.Friends, node)
+ glog.V(7).Infof("Adding node to slice: '%s'", node)
+ }
+ }
+ }*/
+
+ return infos
+}
+
+// ComputeStatus checks the ClusterInfos status based on the current data.
+//
+// The status ClusterInfosPartial is set while building the clusterinfos: if
+// any status is already set, nothing is done and false is returned.
+// Otherwise the infos are reported as consistent: Status is set to
+// ClusterInfosConsistent and true is returned, so that the returned value and
+// the recorded Status agree (Admin.GetClusterInfos relies on Status to decide
+// whether to return an error).
+//
+// The per-node signature comparison is not enabled yet; the reference
+// implementation is kept in the comment at the end of the function.
+func (c *ClusterInfos) ComputeStatus() bool {
+	if c.Status != ClusterInfosUnset {
+		return false
+	}
+	// Record the outcome, as the reference implementation below does.
+	c.Status = ClusterInfosConsistent
+	return true
+
+	/*
+		consistencyStatus := false
+
+		consolidatedView := c.GetNodes().SortByFunc(LessByID)
+		consolidatedSignature := getConfigSignature(consolidatedView)
+		glog.V(7).Infof("Consolidated view:\n%s", consolidatedSignature)
+		for addr, nodeinfos := range c.Infos {
+			nodesView := append(nodeinfos.Friends, nodeinfos.Node).SortByFunc(LessByID)
+			nodeSignature := getConfigSignature(nodesView)
+			glog.V(7).Infof("Node view from %s (ID: %s):\n%s", addr, nodeinfos.Node.ID, nodeSignature)
+			if !reflect.DeepEqual(consolidatedSignature, nodeSignature) {
+				glog.V(4).Info("Temporary inconsistency between nodes is possible. If the following inconsistency message persists for more than 20 mins, any cluster operation (scale, rolling update) should be avoided before the message is gone")
+				glog.V(4).Infof("Inconsistency from %s: \n%s\nVS\n%s", addr, consolidatedSignature, nodeSignature)
+				c.Status = ClusterInfosInconsistent
+			}
+		}
+		if c.Status == ClusterInfosUnset {
+			c.Status = ClusterInfosConsistent
+			consistencyStatus = true
+		}
+		return consistencyStatus*/
+}
diff --git a/submarine-cloud/pkg/submarine/connections.go
b/submarine-cloud/pkg/submarine/connections.go
new file mode 100644
index 0000000..5de06bf
--- /dev/null
+++ b/submarine-cloud/pkg/submarine/connections.go
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package submarine
+
+import (
+ "bufio"
+ "errors"
+ "github.com/golang/glog"
+ "math/rand"
+ "os"
+ "strings"
+ "time"
+)
+
+const (
+ defaultClientTimeout = 2 * time.Second
+ defaultClientName = ""
+
+ // ErrNotFound cannot find a node to connect to
+ ErrNotFound = "Unable to find a node to connect"
+)
+
+// AdminConnectionsInterface interface representing the map of admin
connections to submarine cluster nodes
+type AdminConnectionsInterface interface {
+ // Add connect to the given address and
+ // register the client connection to the pool
+ Add(addr string) error
+ // Remove disconnect and remove the client connection from the map
+ Remove(addr string)
+ // Get returns a client connection for the given address,
+ // connects if the connection is not in the map yet
+ Get(addr string) (ClientInterface, error)
+ // GetRandom returns a client connection to a random node of the client
map
+ GetRandom() (ClientInterface, error)
+ // GetDifferentFrom returns a random client connection different from
given address
+ GetDifferentFrom(addr string) (ClientInterface, error)
+ // GetAll returns a map of all clients per address
+ GetAll() map[string]ClientInterface
+ //GetSelected returns a map of clients based on the input addresses
+ GetSelected(addrs []string) map[string]ClientInterface
+ // Reconnect force a reconnection on the given address
+ // if the address is not part of the map, act like Add
+ Reconnect(addr string) error
+ // AddAll connect to the given list of addresses and
+ // register them in the map
+ // fail silently
+ AddAll(addrs []string)
+ // ReplaceAll clear the map and re-populate it with new connections
+ // fail silently
+ ReplaceAll(addrs []string)
+ // ValidateResp check the submarine resp, eventually reconnect on
connection error
+ // in case of error, customize the error, log it and return it
+ ///ValidateResp(resp *submarine.Resp, addr, errMessage string) error
+ // ValidatePipeResp wait for all answers in the pipe and validate the
response
+ // in case of network issue clear the pipe and return
+ // in case of error return false
+ // ValidatePipeResp(c ClientInterface, addr, errMessage string) bool
+ // Reset close all connections and clear the connection map
+ Reset()
+}
+
+// AdminConnections connection map for submarine cluster
+// currently the admin connection is not thread-safe since it is only used in the Events thread.
+type AdminConnections struct {
+ clients map[string]ClientInterface
+ connectionTimeout time.Duration
+ commandsMapping map[string]string
+ clientName string
+}
+
+// Add connect to the given address and
+// register the client connection to the map
+func (cnx *AdminConnections) Add(addr string) error {
+ _, err := cnx.Update(addr)
+ return err
+}
+
+// Update (re)connects to the given address: any connection already stored
+// for that address is closed and dropped first, then a fresh connection is
+// created and, on success, stored in the map.
+// When the new connection cannot be established the address is left out of
+// the map, so a closed client is never handed back by later lookups.
+// It returns the new client, or the connection error.
+func (cnx *AdminConnections) Update(addr string) (ClientInterface, error) {
+	// close and forget the current connection, if any, before reconnecting
+	if c, ok := cnx.clients[addr]; ok {
+		c.Close()
+		delete(cnx.clients, addr)
+	}
+
+	c, err := cnx.connect(addr)
+	if err == nil && c != nil {
+		cnx.clients[addr] = c
+	} else {
+		glog.V(3).Infof("Cannot connect to %s ", addr)
+	}
+	return c, err
+}
+
+func (cnx *AdminConnections) connect(addr string) (ClientInterface, error) {
+ c, err := NewClient(addr, cnx.connectionTimeout, cnx.commandsMapping)
+ if err != nil {
+ return nil, err
+ }
+ if cnx.clientName != "" {
+ ///resp := c.Cmd("CLIENT", "SETNAME", cnx.clientName)
+ ///return c, cnx.ValidateResp(resp, addr, "Unable to run
command CLIENT SETNAME")
+ }
+
+ return c, nil
+}
+
+// AddAll connect to the given list of addresses and
+// register them in the map
+// fail silently
+func (cnx *AdminConnections) AddAll(addrs []string) {
+ for _, addr := range addrs {
+ cnx.Add(addr)
+ }
+}
+
+// buildCommandReplaceMapping reads the config file at filePath and builds a
+// mapping from the original command name (upper-cased) to its replacement,
+// taken from lines of the form "rename-command <original> <replacement>".
+// Lines that do not have this form are ignored silently.
+// If the file cannot be opened an empty mapping is returned; if scanning
+// fails part-way the entries read so far are returned. Both failures are
+// logged.
+func buildCommandReplaceMapping(filePath string) map[string]string {
+	mapping := make(map[string]string)
+	file, err := os.Open(filePath)
+	if err != nil {
+		glog.Errorf("Cannot open %s: %v", filePath, err)
+		return mapping
+	}
+	defer file.Close()
+
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		elems := strings.Fields(scanner.Text())
+		// the directive keyword is matched case-insensitively
+		if len(elems) == 3 && strings.ToLower(elems[0]) == "rename-command" {
+			mapping[strings.ToUpper(elems[1])] = elems[2]
+		}
+	}
+
+	// a scan error is only logged: whatever was parsed so far is still returned
+	if err := scanner.Err(); err != nil {
+		glog.Errorf("Cannot parse %s: %v", filePath, err)
+	}
+	return mapping
+}
+
+// NewAdminConnections returns an instance of AdminConnectionsInterface
+func NewAdminConnections(addrs []string, options *AdminOptions)
AdminConnectionsInterface {
+ cnx := &AdminConnections{
+ clients: make(map[string]ClientInterface),
+ connectionTimeout: defaultClientTimeout,
+ commandsMapping: make(map[string]string),
+ clientName: defaultClientName,
+ }
+ if options != nil {
+ if options.ConnectionTimeout != 0 {
+ cnx.connectionTimeout = options.ConnectionTimeout
+ }
+ if _, err := os.Stat(options.RenameCommandsFile); err == nil {
+ cnx.commandsMapping =
buildCommandReplaceMapping(options.RenameCommandsFile)
+ }
+ cnx.clientName = options.ClientName
+ }
+ cnx.AddAll(addrs)
+ return cnx
+}
+
+// ReplaceAll clear the pool and re-populate it with new connections
+// fail silently
+func (cnx *AdminConnections) ReplaceAll(addrs []string) {
+ cnx.Reset()
+ cnx.AddAll(addrs)
+}
+
+// Reset close all connections and clear the connection map
+func (cnx *AdminConnections) Reset() {
+ for _, c := range cnx.clients {
+ c.Close()
+ }
+ cnx.clients = map[string]ClientInterface{}
+}
+
+// Remove disconnect and remove the client connection from the map
+func (cnx *AdminConnections) Remove(addr string) {
+ if c, ok := cnx.clients[addr]; ok {
+ c.Close()
+ delete(cnx.clients, addr)
+ }
+}
+
+// Get returns the client connection registered for the given address.
+// When the address is not in the map yet, it connects and, on success,
+// registers the new connection before returning it.
+func (cnx *AdminConnections) Get(addr string) (ClientInterface, error) {
+	if existing, found := cnx.clients[addr]; found {
+		return existing, nil
+	}
+
+	client, err := cnx.connect(addr)
+	if err != nil || client == nil {
+		// nothing usable to register: hand back whatever connect produced
+		return client, err
+	}
+	cnx.clients[addr] = client
+	return client, err
+}
+
+// GetRandom returns a client connection to a random node of the client map
+func (cnx *AdminConnections) GetRandom() (ClientInterface, error) {
+ _, c, err := cnx.getRandomKeyClient()
+ return c, err
+}
+
+// getRandomKeyClient returns the address and the client connection of a random node of the client map
+func (cnx *AdminConnections) getRandomKeyClient() (string, ClientInterface,
error) {
+ nbClient := len(cnx.clients)
+ if nbClient == 0 {
+ return "", nil, errors.New(ErrNotFound)
+ }
+ randNumber := rand.Intn(nbClient)
+ for k, c := range cnx.clients {
+ if randNumber == 0 {
+ return k, c, nil
+ }
+ randNumber--
+ }
+
+ return "", nil, errors.New(ErrNotFound)
+}
+
+// GetDifferentFrom returns a random client connection whose address differs
+// from addr. It returns an ErrNotFound error when the map holds no such
+// connection (the map is empty, or addr is its only entry).
+func (cnx *AdminConnections) GetDifferentFrom(addr string) (ClientInterface, error) {
+	// gather the eligible clients in one pass instead of re-drawing at
+	// random until a different address happens to come up
+	candidates := make([]ClientInterface, 0, len(cnx.clients))
+	for a, c := range cnx.clients {
+		if a != addr {
+			candidates = append(candidates, c)
+		}
+	}
+	if len(candidates) == 0 {
+		return nil, errors.New(ErrNotFound)
+	}
+	return candidates[rand.Intn(len(candidates))], nil
+}
+
+// GetAll returns a map of all clients per address
+func (cnx *AdminConnections) GetAll() map[string]ClientInterface {
+ return cnx.clients
+}
+
+//GetSelected returns a map of clients based on the input addresses
+func (cnx *AdminConnections) GetSelected(addrs []string)
map[string]ClientInterface {
+ clientsSelected := make(map[string]ClientInterface)
+ for _, addr := range addrs {
+ if client, ok := cnx.clients[addr]; ok {
+ clientsSelected[addr] = client
+ }
+ }
+ return clientsSelected
+}
+
+// Reconnect force a reconnection on the given address
+// if the address is not part of the map, act like Add
+func (cnx *AdminConnections) Reconnect(addr string) error {
+ glog.Infof("Reconnecting to %s", addr)
+ cnx.Remove(addr)
+ return cnx.Add(addr)
+}
diff --git a/submarine-cloud/pkg/submarine/errors.go
b/submarine-cloud/pkg/submarine/errors.go
new file mode 100644
index 0000000..64116f3
--- /dev/null
+++ b/submarine-cloud/pkg/submarine/errors.go
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package submarine
+
+import "fmt"
+
+// Error used to represent an error
+type Error string
+
+func (e Error) Error() string { return string(e) }
+
+// ClusterInfosError error type for submarine cluster infos access
+type ClusterInfosError struct {
+ errs map[string]error
+ partial bool
+ inconsistent bool
+}
+
+// nodeNotFoundedError is returned when a node is not present in the cluster
+const nodeNotFoundedError = Error("node not founded")
+
+// Partial returns true if some nodes of the cluster didn't answer
+func (e ClusterInfosError) Partial() bool {
+ return e.partial
+}
+
+// Error returns the error message. For a partial error it lists every
+// failing address with its error, separated by ", "; otherwise it reports
+// the inconsistency, or an empty string when neither flag is set.
+func (e ClusterInfosError) Error() string {
+	s := ""
+	if e.partial {
+		s += "Cluster infos partial: "
+		sep := ""
+		for addr, err := range e.errs {
+			// separate the entries so consecutive addresses do not run together
+			s += fmt.Sprintf("%s%s: '%s'", sep, addr, err)
+			sep = ", "
+		}
+		return s
+	}
+	if e.inconsistent {
+		s += "Cluster view is inconsistent"
+	}
+	return s
+}
+
+// IsPartialError returns true if the error is due to partial data recovery
+func IsPartialError(err error) bool {
+ e, ok := err.(ClusterInfosError)
+ return ok && e.Partial()
+}
+
+// NewClusterInfosError returns an instance of cluster infos error
+func NewClusterInfosError() ClusterInfosError {
+ return ClusterInfosError{
+ errs: make(map[string]error),
+ partial: false,
+ inconsistent: false,
+ }
+}
diff --git a/submarine-cloud/pkg/submarine/node.go
b/submarine-cloud/pkg/submarine/node.go
new file mode 100644
index 0000000..2fa9a1f
--- /dev/null
+++ b/submarine-cloud/pkg/submarine/node.go
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package submarine
+
+import (
+
"github.com/apache/submarine/submarine-cloud/pkg/apis/submarine/v1alpha1"
+ kapiv1 "k8s.io/api/core/v1"
+ "net"
+ "sort"
+ "time"
+)
+
+const (
+ // DefaultSubmarinePort define the default Submarine Port
+ DefaultSubmarinePort = "8080"
+ // submarineMasterRole submarine role master
+ submarineMasterRole = "master"
+ // submarineSlaveRole submarine role slave
+ submarineSlaveRole = "slave"
+)
+
+// Node Represent a Submarine Node
+type Node struct {
+ ID string
+ IP string
+ Port string
+ Role string
+ LinkState string
+ MasterReferent string
+ FailStatus []string
+ PingSent int64
+ PongRecv int64
+ ConfigEpoch int64
+ ///Slots []Slot
+ ///MigratingSlots map[Slot]string
+ ///ImportingSlots map[Slot]string
+ ServerStartTime time.Time
+
+ Pod *kapiv1.Pod
+}
+
+// NewDefaultNode builds and returns new defaultNode instance
+func NewDefaultNode() *Node {
+ return &Node{
+ Port: DefaultSubmarinePort,
+ ///Slots: []Slot{},
+ ///MigratingSlots: map[Slot]string{},
+ ///ImportingSlots: map[Slot]string{},
+ }
+}
+
+// Nodes represent a Node slice
+type Nodes []*Node
+
+// nodeSorter joins a By function and a slice of Nodes to be sorted.
+type nodeSorter struct {
+ nodes Nodes
+ by func(p1, p2 *Node) bool // Closure used in the Less method.
+}
+
+// Len is part of sort.Interface.
+func (s *nodeSorter) Len() int {
+ return len(s.nodes)
+}
+
+// Swap is part of sort.Interface.
+func (s *nodeSorter) Swap(i, j int) {
+ s.nodes[i], s.nodes[j] = s.nodes[j], s.nodes[i]
+}
+
+// Less is part of sort.Interface. It is implemented by calling the "by"
closure in the sorter.
+func (s *nodeSorter) Less(i, j int) bool {
+ return s.by(s.nodes[i], s.nodes[j])
+}
+
+// FindNodeFunc function for finding a Node
+// it is used as input for GetNodeByFunc and GetNodesByFunc
+type FindNodeFunc func(node *Node) bool
+
+// GetNodesByFunc returns all the nodes matched by the FindNodeFunc, or an error if none matches
+func (n Nodes) GetNodesByFunc(f FindNodeFunc) (Nodes, error) {
+ nodes := Nodes{}
+ for _, node := range n {
+ if f(node) {
+ nodes = append(nodes, node)
+ }
+ }
+ if len(nodes) == 0 {
+ return nodes, nodeNotFoundedError
+ }
+ return nodes, nil
+}
+
+// IsMasterWithSlot is a node predicate: it reports whether the node has the
+// master role and a total slot count greater than zero.
+var IsMasterWithSlot = func(n *Node) bool {
+	isMaster := n.GetRole() == v1alpha1.SubmarineClusterNodeRoleMaster
+	return isMaster && n.TotalSlots() > 0
+}
+
+// GetRole return the Submarine Cluster Node GetRole
+func (n *Node) GetRole() v1alpha1.SubmarineClusterNodeRole {
+ switch n.Role {
+ case submarineMasterRole:
+ return v1alpha1.SubmarineClusterNodeRoleMaster
+ case submarineSlaveRole:
+ return v1alpha1.SubmarineClusterNodeRoleSlave
+ default:
+ if n.MasterReferent != "" {
+ return v1alpha1.SubmarineClusterNodeRoleSlave
+ }
+ ///if len(n.Slots) > 0 {
+ /// return v1alpha1.SubmarineClusterNodeRoleMaster
+ ///}
+ }
+
+ return v1alpha1.SubmarineClusterNodeRoleNone
+}
+
+// TotalSlots return the total number of slot
+func (n *Node) TotalSlots() int {
+ return 1 ///len(n.Slots)
+}
+
+// IsSlave anonymous function for searching Slave Node
+var IsSlave = func(n *Node) bool {
+ return n.GetRole() == v1alpha1.SubmarineClusterNodeRoleSlave
+}
+
+// FilterByFunc returns a new slice holding only the nodes for which fn returns true. The receiver is not modified.
+func (n Nodes) FilterByFunc(fn func(*Node) bool) Nodes {
+ newSlice := Nodes{}
+ for _, node := range n {
+ if fn(node) {
+ newSlice = append(newSlice, node)
+ }
+ }
+ return newSlice
+}
+
+// IsMasterWithNoSlot is a node predicate: it reports whether the node has the
+// master role and a total slot count of zero.
+// NOTE(review): TotalSlots currently returns the constant 1, so this
+// predicate cannot match until slot tracking is restored.
+var IsMasterWithNoSlot = func(n *Node) bool {
+	isMaster := n.GetRole() == v1alpha1.SubmarineClusterNodeRoleMaster
+	return isMaster && n.TotalSlots() == 0
+}
+
+// By is the type of a "less" function that defines the ordering of its Node
arguments.
+type by func(p1, p2 *Node) bool
+
+// Sort is a method on the function type, By, that sorts the argument slice
according to the function.
+func (b by) Sort(nodes Nodes) {
+ ps := &nodeSorter{
+ nodes: nodes,
+ by: b, // The Sort method's receiver is the function
(closure) that defines the sort order.
+ }
+ sort.Sort(ps)
+}
+
+// SortByFunc returns a new ordered NodeSlice, determined by a func defining
‘less’.
+func (n Nodes) SortByFunc(less func(*Node, *Node) bool) Nodes {
+ result := make(Nodes, len(n))
+ copy(result, n)
+ by(less).Sort(n)
+ return result
+}
+
+// IPPort returns join Ip Port string
+func (n *Node) IPPort() string {
+ return net.JoinHostPort(n.IP, n.Port)
+}
+
+// LessByID compares 2 Nodes by their ID
+func LessByID(n1, n2 *Node) bool {
+ return n1.ID < n2.ID
+}
diff --git a/submarine-cloud/pkg/utils/build.go
b/submarine-cloud/pkg/utils/build.go
index ebcb876..45aa110 100644
--- a/submarine-cloud/pkg/utils/build.go
+++ b/submarine-cloud/pkg/utils/build.go
@@ -23,25 +23,30 @@ import (
// BUILDTIME should be populated by at build time: -ldflags "-w -X
github.com/apache/submarine/submarine-cloud/pkg/utils.BUILDTIME=${DATE}
// with for example DATE=$(shell date +%Y-%m-%d/%H:%M:%S ) (pay attention
not to use space!)
-var BUILDTIME string
+var BuildTime string
-// TAG should be populated by at build time: -ldflags "-w -X
github.com/apache/submarine/submarine-cloud/pkg/utils.TAG=${TAG}
-// with for example TAG=$(shell git tag|tail -1)
-var TAG string
+// BuildGitBranch should be populated at build time: -ldflags "-w -X github.com/apache/submarine/submarine-cloud/pkg/utils.BuildGitBranch=${BuildGitBranch}
+// with for example BuildGitBranch=$(git describe --all)
+var BuildGitBranch string
-// COMMIT should be populated by at build time: -ldflags "-w -X
github.com/apache/submarine/submarine-cloud/pkg/utils.COMMIT=${COMMIT}
-// with for example COMMIT=$(shell git rev-parse HEAD)
-var COMMIT string
+// BuildGitRev should be populated at build time: -ldflags "-w -X github.com/apache/submarine/submarine-cloud/pkg/utils.BuildGitRev=${BuildGitRev}
+// with for example BuildGitRev=$(git rev-list --count HEAD)
+var BuildGitRev string
+
+// BuildGitCommit should be populated at build time: -ldflags "-w -X github.com/apache/submarine/submarine-cloud/pkg/utils.BuildGitCommit=${COMMIT}
+// with for example COMMIT=$(git rev-parse HEAD)
+var BuildGitCommit string
// VERSION should be populated by at build time: -ldflags "-w -X
github.com/apache/submarine/submarine-cloud/pkg/utils.VERSION=${VERSION}
-// with for example VERSION=$(shell git rev-parse --abbrev-ref HEAD)
+// with for example VERSION=$(git rev-parse --abbrev-ref HEAD)
var VERSION string
// BuildInfos returns builds information
func BuildInfos() {
fmt.Println("Program started at: " + time.Now().String())
- fmt.Println("BUILDTIME=" + BUILDTIME)
- fmt.Println("TAG=" + TAG)
- fmt.Println("COMMIT=" + COMMIT)
- fmt.Println("VERSION=" + VERSION)
+ fmt.Println("Build Time : " + BuildTime)
+ fmt.Println("Build Git Branch : " + BuildGitBranch)
+ fmt.Println("Build Git Rev : " + BuildGitRev)
+ fmt.Println("Build Git Commit : " + BuildGitCommit)
+ fmt.Println("Submarine Version : " + VERSION)
}
diff --git a/submarine-cloud/pom.xml b/submarine-cloud/pom.xml
index 9db0fe6..a7ebe45 100644
--- a/submarine-cloud/pom.xml
+++ b/submarine-cloud/pom.xml
@@ -60,7 +60,7 @@
</goals>
<configuration>
<targetOs>${GOOS}</targetOs> <!-- linux|darwin|windows -->
- <resultName>submarine-operator</resultName>
+ <resultName>submarine-operator_${GOOS}</resultName>
<ldFlags>
<flag>-X</flag>
<flag>'github.com/apache/submarine/submarine-cloud/pkg/utils.BUILDTIME=${maven.build.timestamp}'</flag>
diff --git a/submarine-cloud/submarine-operator.md
b/submarine-cloud/submarine-operator.md
index a2e72be..e395ce1 100644
--- a/submarine-cloud/submarine-operator.md
+++ b/submarine-cloud/submarine-operator.md
@@ -30,7 +30,9 @@ Run submarine operator
# create submarine crd
kubectl apply -f ../manifests/crd.yaml
KUBECONFIG=$(kind get kubeconfig-path --name submarine)
-./submarine-operator run config=${KUBECONFIG}
+./submarine-operator --kubeconfig=${KUBECONFIG} --alsologtostderr --v=7
+OR
+./submarine-operator --kubeconfig=$(kind get kubeconfig-path --name submarine)
--alsologtostderr --v=7
```
## Submarine operator implementation steps
@@ -159,3 +161,10 @@ kubectl apply -f test1.yaml
kubectl describe std test1
```
+### Kind
+
+```
+kind load docker-image --name=submarine busybox:1.28.4
+kind load docker-image --name=submarine apache/submarine:server-0.3.0-SNAPSHOT
+kind load docker-image --name=submarine
apache/submarine:database-0.3.0-SNAPSHOT
+```
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]