lburgazzoli closed pull request #56: Add a wait option to kamel run
URL: https://github.com/apache/camel-k/pull/56
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go
index 3328370..ff56647 100644
--- a/cmd/kamel/kamel.go
+++ b/cmd/kamel/kamel.go
@@ -22,10 +22,12 @@ import (
        "github.com/apache/camel-k/pkg/client/cmd"
        _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
        "os"
+       "context"
 )
 
 func main() {
-       rootCmd, err := cmd.NewKamelCommand()
+       ctx := context.Background()
+       rootCmd, err := cmd.NewKamelCommand(ctx)
        exitOnError(err)
 
        err = rootCmd.Execute()
diff --git a/deploy/operator.yaml b/deploy/operator.yaml
index a2c4abc..863b21c 100644
--- a/deploy/operator.yaml
+++ b/deploy/operator.yaml
@@ -15,6 +15,9 @@ spec:
       containers:
         - name: camel-k-operator
           image: docker.io/apache/camel-k:0.0.1-SNAPSHOT
+          ports:
+          - containerPort: 60000
+            name: metrics
           command:
           - camel-k-operator
           imagePullPolicy: Always
@@ -23,3 +26,5 @@ spec:
               valueFrom:
                 fieldRef:
                   fieldPath: metadata.namespace
+            - name: OPERATOR_NAME
+              value: "camel-k-operator"
diff --git a/pkg/client/cmd/root.go b/pkg/client/cmd/root.go
index e0e2a08..50cfb40 100644
--- a/pkg/client/cmd/root.go
+++ b/pkg/client/cmd/root.go
@@ -23,15 +23,19 @@ import (
        "github.com/apache/camel-k/pkg/util/kubernetes"
        "github.com/pkg/errors"
        "github.com/spf13/cobra"
+       "context"
 )
 
 type RootCmdOptions struct {
+       Context    context.Context
        KubeConfig string
        Namespace  string
 }
 
-func NewKamelCommand() (*cobra.Command, error) {
-       options := RootCmdOptions{}
+func NewKamelCommand(ctx context.Context) (*cobra.Command, error) {
+       options := RootCmdOptions{
+               Context: ctx,
+       }
        var cmd = cobra.Command{
                Use:   "kamel",
                Short: "Kamel is a awesome client tool for running Apache Camel 
integrations natively on Kubernetes",
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index d64a653..cee146e 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -31,6 +31,7 @@ import (
        "github.com/spf13/cobra"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "github.com/apache/camel-k/pkg/util/watch"
 )
 
 type RunCmdOptions struct {
@@ -38,6 +39,7 @@ type RunCmdOptions struct {
        Language        string
        IntegrationName string
        Dependencies    []string
+       Wait            bool
 }
 
 func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
@@ -54,8 +56,10 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) 
*cobra.Command {
        }
 
        cmd.Flags().StringVarP(&options.Language, "language", "l", "", 
"Programming Language used to write the file")
-       cmd.Flags().StringVarP(&options.IntegrationName, "name", "", "", "The 
integration name")
+       cmd.Flags().StringVar(&options.IntegrationName, "name", "", "The 
integration name")
        cmd.Flags().StringSliceVarP(&options.Dependencies, "dependency", "d", 
nil, "The integration dependency")
+       cmd.Flags().BoolVarP(&options.Wait, "wait", "w", false, "Waits for the 
integration to be running")
+       cmd.ParseFlags(os.Args)
 
        return &cmd
 }
@@ -74,10 +78,62 @@ func (*RunCmdOptions) validateArgs(cmd *cobra.Command, args 
[]string) error {
 }
 
 func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error {
-       code, err := o.loadCode(args[0])
+       integration, err := o.createIntegration(cmd, args)
        if err != nil {
                return err
        }
+       if o.Wait {
+               err = o.waitForIntegrationReady(integration)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (o *RunCmdOptions) waitForIntegrationReady(integration 
*v1alpha1.Integration) error {
+       // Block this goroutine until the integration is in a final status
+       changes, err := watch.WatchStateChanges(o.Context, integration)
+       if err != nil {
+               return err
+       }
+
+       var lastStatusSeen *v1alpha1.IntegrationStatus
+
+watcher:
+       for {
+               select {
+               case <-o.Context.Done():
+                       return nil
+               case i, ok := <-changes:
+                       if !ok {
+                               break watcher
+                       }
+                       lastStatusSeen = &i.Status
+                       phase := string(i.Status.Phase)
+                       if phase != "" {
+                               fmt.Println("integration 
\""+integration.Name+"\" in phase", phase)
+                               // TODO when we add health checks, we should 
wait until they are passed
+                               if i.Status.Phase == 
v1alpha1.IntegrationPhaseRunning || i.Status.Phase == 
v1alpha1.IntegrationPhaseError {
+                                       // TODO display some error info when 
available in the status
+                                       break watcher
+                               }
+                       }
+               }
+       }
+
+       // TODO we may not be able to reach this state, since the build will be 
done without sources (until we add health checks)
+       if lastStatusSeen != nil && lastStatusSeen.Phase == 
v1alpha1.IntegrationPhaseError {
+               return errors.New("integration deployment failed")
+       }
+       return nil
+}
+
+func (o *RunCmdOptions) createIntegration(cmd *cobra.Command, args []string) 
(*v1alpha1.Integration, error) {
+       code, err := o.loadCode(args[0])
+       if err != nil {
+               return nil, err
+       }
 
        namespace := o.Namespace
 
@@ -124,14 +180,14 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
                clone := integration.DeepCopy()
                err = sdk.Get(clone)
                if err != nil {
-                       return err
+                       return nil, err
                }
                integration.ResourceVersion = clone.ResourceVersion
                err = sdk.Update(&integration)
        }
 
        if err != nil {
-               return err
+               return nil, err
        }
 
        if !existed {
@@ -139,7 +195,7 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
        } else {
                fmt.Printf("integration \"%s\" updated\n", name)
        }
-       return nil
+       return &integration, nil
 }
 
 func (*RunCmdOptions) loadCode(fileName string) (string, error) {
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
new file mode 100644
index 0000000..3ec0bda
--- /dev/null
+++ b/pkg/util/watch/watch.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+       "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+       "context"
+       "github.com/operator-framework/operator-sdk/pkg/k8sclient"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+       "github.com/operator-framework/operator-sdk/pkg/util/k8sutil"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "github.com/sirupsen/logrus"
+)
+
+// Watches a integration resource and send it through a channel when its 
status changes
+func WatchStateChanges(ctx context.Context, integration *v1alpha1.Integration) 
(<-chan *v1alpha1.Integration, error) {
+       resourceClient, _, err := 
k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, 
integration.Namespace)
+       if err != nil {
+               return nil, err
+       }
+       watcher, err := resourceClient.Watch(metav1.ListOptions{
+               FieldSelector: "metadata.name=" + integration.Name,
+       })
+       if err != nil {
+               return nil, err
+       }
+       events := watcher.ResultChan()
+
+       out := make(chan *v1alpha1.Integration)
+       var lastObservedState *v1alpha1.IntegrationPhase
+
+       go func() {
+               defer close(out)
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case e, ok := <-events:
+                               if !ok {
+                                       return
+                               }
+
+                               if e.Object != nil {
+                                       if runtimeUnstructured, ok := 
e.Object.(runtime.Unstructured); ok {
+                                               unstr := 
unstructured.Unstructured{
+                                                       Object: 
runtimeUnstructured.UnstructuredContent(),
+                                               }
+                                               icopy := integration.DeepCopy()
+                                               err := 
k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy)
+                                               if err != nil {
+                                                       
logrus.Error("Unexpected error detected when watching resource", err)
+                                                       return // closes the 
channel
+                                               }
+
+                                               if lastObservedState == nil || 
*lastObservedState != icopy.Status.Phase {
+                                                       lastObservedState = 
&icopy.Status.Phase
+                                                       out <- icopy
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }()
+
+       return out, nil
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to