This is an automated email from the ASF dual-hosted git repository.
kaihsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 4fc6299 SUBMARINE-791. Read Submaine CRD spec in controller
4fc6299 is described below
commit 4fc62996842a6ed727007b3e878e9d8d554e9991
Author: Chi-Sheng Liu <[email protected]>
AuthorDate: Fri Apr 16 16:54:03 2021 +0800
SUBMARINE-791. Read Submaine CRD spec in controller
### What is this PR for?
After defining the spec for Submarine CRD, we need to read the spec in our
controller.
### What type of PR is it?
[Feature]
### Todos
* [x] - Modify types.go to meet the CRD spec.
* [x] - Modify controller.go such that it can read the CRD spec.
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-791
### How should this be tested?
https://travis-ci.org/github/MortalHappiness/submarine/builds/766867816
### Screenshots (if appropriate)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? No
---
submarine-cloud-v2/Makefile | 11 ++
submarine-cloud-v2/controller.go | 101 +++++++++------
submarine-cloud-v2/pkg/submarine/v1alpha1/types.go | 38 +++++-
.../submarine/v1alpha1/zz_generated.deepcopy.go | 135 +++++++++++++++++++++
4 files changed, 246 insertions(+), 39 deletions(-)
diff --git a/submarine-cloud-v2/Makefile b/submarine-cloud-v2/Makefile
index 7dac348..139df75 100644
--- a/submarine-cloud-v2/Makefile
+++ b/submarine-cloud-v2/Makefile
@@ -15,6 +15,17 @@
# limitations under the License.
#
+all: # The default target
+
+.PHONY: all
+all:
+ go build -o submarine-operator
+
+.PHONY: api
+api:
+ @cd hack; echo "Generating API..."; ./update-codegen.sh; \
+ echo "Verifying API..."; ./verify-codegen.sh
+
.PHONY: image
image:
GOOS=linux go build -o submarine-operator
diff --git a/submarine-cloud-v2/controller.go b/submarine-cloud-v2/controller.go
index ed561bc..30d686a 100644
--- a/submarine-cloud-v2/controller.go
+++ b/submarine-cloud-v2/controller.go
@@ -1,40 +1,43 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package main
import (
+ "encoding/json"
"fmt"
- "time"
corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
+ appsinformers "k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
- appsinformers "k8s.io/client-go/informers/apps/v1"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
- informers
"submarine-cloud-v2/pkg/generated/informers/externalversions/submarine/v1alpha1"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
- "k8s.io/klog/v2"
- "k8s.io/client-go/util/workqueue"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
- "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/util/workqueue"
+ "k8s.io/klog/v2"
+ clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
submarinescheme
"submarine-cloud-v2/pkg/generated/clientset/versioned/scheme"
"submarine-cloud-v2/pkg/helm"
+ informers
"submarine-cloud-v2/pkg/generated/informers/externalversions/submarine/v1alpha1"
+ listers "submarine-cloud-v2/pkg/generated/listers/submarine/v1alpha1"
+ "time"
)
const controllerAgentName = "submarine-controller"
@@ -45,8 +48,10 @@ type Controller struct {
kubeclientset kubernetes.Interface
// sampleclientset is a clientset for our own API group
submarineclientset clientset.Interface
-
- submarinesSynced cache.InformerSynced
+
+ submarinesLister listers.SubmarineLister
+ submarinesSynced cache.InformerSynced
+
// workqueue is a rate limited work queue. This is used to queue work
to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
@@ -75,14 +80,14 @@ func NewController(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface:
kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme,
corev1.EventSource{Component: controllerAgentName})
-
// Initialize controller
controller := &Controller{
- kubeclientset: kubeclientset,
- submarineclientset: submarineclientset,
- submarinesSynced: submarineInformer.Informer().HasSynced,
- workqueue:
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"Submarines"),
- recorder: recorder,
+ kubeclientset: kubeclientset,
+ submarineclientset: submarineclientset,
+ submarinesLister: submarineInformer.Lister(),
+ submarinesSynced: submarineInformer.Informer().HasSynced,
+ workqueue:
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"Submarines"),
+ recorder: recorder,
}
// Setting up event handler for Submarine
@@ -117,7 +122,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan
struct{}) error {
// helm repo add k8s-as-helm
https://ameijer.github.io/k8s-as-helm/
// . helm repo update
// helm install helm-install-example-release k8s-as-helm/svc --set
ports[0].protocol=TCP,ports[0].port=80,ports[0].targetPort=9376
- // Useful Links:
+ // Useful Links:
// (1) https://github.com/PrasadG193/helm-clientgo-example
// . (2) https://github.com/ameijer/k8s-as-helm/tree/master/charts/svc
klog.Info("[Helm example] Install")
@@ -129,7 +134,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan
struct{}) error {
"default",
map[string]string {
"set":
"ports[0].protocol=TCP,ports[0].port=80,ports[0].targetPort=9376",
- },
+ },
)
klog.Info("[Helm example] Sleep 60 seconds")
@@ -138,7 +143,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan
struct{}) error {
klog.Info("[Helm example] Uninstall")
helm.HelmUninstall("helm-install-example-release", helmActionConfig)
-
+
klog.Info("Starting workers")
// Launch two workers to process Submarine resources
@@ -193,7 +198,31 @@ func (c *Controller) processNextWorkItem() bool {
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
// TODO: business logic
+
+ // Convert the namespace/name string into a distinct namespace and name
+ namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("Invalid resource key: %s",
key))
+ return nil
+ }
+
+ // Get the Submarine resource with this namespace/name
+ submarine, err := c.submarinesLister.Submarines(namespace).Get(name)
+ if err != nil {
+ // The Submarine resource may no longer exist, in which case we
stop
+ // processing
+ if errors.IsNotFound(err) {
+ utilruntime.HandleError(fmt.Errorf("submarine '%s' in
work queue no longer exists", key))
+ return nil
+ }
+ }
+
klog.Info("syncHandler: ", key)
+
+ // Print out the spec of the Submarine resource
+ b, err := json.MarshalIndent(submarine.Spec, "", " ")
+ fmt.Println(string(b))
+
return nil
}
@@ -209,6 +238,6 @@ func (c *Controller) enqueueSubmarine(obj interface{}) {
}
// key: [namespace]/[CR name]
- // Example: default/example-submarine
+ // Example: default/example-submarine
c.workqueue.Add(key)
}
diff --git a/submarine-cloud-v2/pkg/submarine/v1alpha1/types.go
b/submarine-cloud-v2/pkg/submarine/v1alpha1/types.go
index 97abe45..a73830d 100644
--- a/submarine-cloud-v2/pkg/submarine/v1alpha1/types.go
+++ b/submarine-cloud-v2/pkg/submarine/v1alpha1/types.go
@@ -32,10 +32,43 @@ type Submarine struct {
Status SubmarineStatus `json:"status"`
}
+type SubmarineServer struct {
+ Image string `json:"image"`
+ Replicas *int32 `json:"replicas"`
+}
+
+type SubmarineDatabase struct {
+ Image string `json:"image"`
+ Replicas *int32 `json:"replicas"`
+ MysqlRootPasswordSecret string `json:"mysqlRootPasswordSecret"`
+}
+
+type SubmarineTensorboard struct {
+ Enabled *bool `json:"enabled"`
+ StorageSizeGiB *int32 `json:"storageSizeGiB"`
+}
+
+type SubmarineMlflow struct {
+ Enabled *bool `json:"enabled"`
+ StorageSizeGiB *int32 `json:"storageSizeGiB"`
+}
+
+type SubmarineStorage struct {
+ StorageType string `json:"storageType"`
+ HostPath string `json:"hostPath"`
+ NfsPath string `json:"nfsPath"`
+ NfsIP string `json:"nfsIP"`
+}
+
// SubmarineSpec is the spec for a Submarine resource
type SubmarineSpec struct {
- DeploymentName string `json:"deploymentName"`
- Replicas *int32 `json:"replicas"`
+ Version string `json:"version"`
+ Replicas *int32 `json:"replicas"`
+ Server *SubmarineServer `json:"server"`
+ Database *SubmarineDatabase `json:"database"`
+ Tensorboard *SubmarineTensorboard `json:"tensorboard"`
+ Mlflow *SubmarineMlflow `json:"mlflow"`
+ Storage *SubmarineStorage `json:"storage"`
}
// SubmarineStatus is the status for a Submarine resource
@@ -52,4 +85,3 @@ type SubmarineList struct {
Items []Submarine `json:"items"`
}
-
diff --git a/submarine-cloud-v2/pkg/submarine/v1alpha1/zz_generated.deepcopy.go
b/submarine-cloud-v2/pkg/submarine/v1alpha1/zz_generated.deepcopy.go
index b68244d..8d21593 100644
--- a/submarine-cloud-v2/pkg/submarine/v1alpha1/zz_generated.deepcopy.go
+++ b/submarine-cloud-v2/pkg/submarine/v1alpha1/zz_generated.deepcopy.go
@@ -54,6 +54,27 @@ func (in *Submarine) DeepCopyObject() runtime.Object {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *SubmarineDatabase) DeepCopyInto(out *SubmarineDatabase) {
+ *out = *in
+ if in.Replicas != nil {
+ in, out := &in.Replicas, &out.Replicas
+ *out = new(int32)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new SubmarineDatabase.
+func (in *SubmarineDatabase) DeepCopy() *SubmarineDatabase {
+ if in == nil {
+ return nil
+ }
+ out := new(SubmarineDatabase)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
func (in *SubmarineList) DeepCopyInto(out *SubmarineList) {
*out = *in
out.TypeMeta = in.TypeMeta
@@ -87,6 +108,53 @@ func (in *SubmarineList) DeepCopyObject() runtime.Object {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *SubmarineMlflow) DeepCopyInto(out *SubmarineMlflow) {
+ *out = *in
+ if in.Enabled != nil {
+ in, out := &in.Enabled, &out.Enabled
+ *out = new(bool)
+ **out = **in
+ }
+ if in.StorageSizeGiB != nil {
+ in, out := &in.StorageSizeGiB, &out.StorageSizeGiB
+ *out = new(int32)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new SubmarineMlflow.
+func (in *SubmarineMlflow) DeepCopy() *SubmarineMlflow {
+ if in == nil {
+ return nil
+ }
+ out := new(SubmarineMlflow)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *SubmarineServer) DeepCopyInto(out *SubmarineServer) {
+ *out = *in
+ if in.Replicas != nil {
+ in, out := &in.Replicas, &out.Replicas
+ *out = new(int32)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new SubmarineServer.
+func (in *SubmarineServer) DeepCopy() *SubmarineServer {
+ if in == nil {
+ return nil
+ }
+ out := new(SubmarineServer)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
func (in *SubmarineSpec) DeepCopyInto(out *SubmarineSpec) {
*out = *in
if in.Replicas != nil {
@@ -94,6 +162,31 @@ func (in *SubmarineSpec) DeepCopyInto(out *SubmarineSpec) {
*out = new(int32)
**out = **in
}
+ if in.Server != nil {
+ in, out := &in.Server, &out.Server
+ *out = new(SubmarineServer)
+ (*in).DeepCopyInto(*out)
+ }
+ if in.Database != nil {
+ in, out := &in.Database, &out.Database
+ *out = new(SubmarineDatabase)
+ (*in).DeepCopyInto(*out)
+ }
+ if in.Tensorboard != nil {
+ in, out := &in.Tensorboard, &out.Tensorboard
+ *out = new(SubmarineTensorboard)
+ (*in).DeepCopyInto(*out)
+ }
+ if in.Mlflow != nil {
+ in, out := &in.Mlflow, &out.Mlflow
+ *out = new(SubmarineMlflow)
+ (*in).DeepCopyInto(*out)
+ }
+ if in.Storage != nil {
+ in, out := &in.Storage, &out.Storage
+ *out = new(SubmarineStorage)
+ **out = **in
+ }
return
}
@@ -122,3 +215,45 @@ func (in *SubmarineStatus) DeepCopy() *SubmarineStatus {
in.DeepCopyInto(out)
return out
}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *SubmarineStorage) DeepCopyInto(out *SubmarineStorage) {
+ *out = *in
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new SubmarineStorage.
+func (in *SubmarineStorage) DeepCopy() *SubmarineStorage {
+ if in == nil {
+ return nil
+ }
+ out := new(SubmarineStorage)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *SubmarineTensorboard) DeepCopyInto(out *SubmarineTensorboard) {
+ *out = *in
+ if in.Enabled != nil {
+ in, out := &in.Enabled, &out.Enabled
+ *out = new(bool)
+ **out = **in
+ }
+ if in.StorageSizeGiB != nil {
+ in, out := &in.StorageSizeGiB, &out.StorageSizeGiB
+ *out = new(int32)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new SubmarineTensorboard.
+func (in *SubmarineTensorboard) DeepCopy() *SubmarineTensorboard {
+ if in == nil {
+ return nil
+ }
+ out := new(SubmarineTensorboard)
+ in.DeepCopyInto(out)
+ return out
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]