This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit c2840db015a2c449b5cfff4e1beceab1ddeb3c10
Author: nferraro <ni.ferr...@gmail.com>
AuthorDate: Wed Sep 12 18:32:54 2018 +0200

    Add a flag to wait until the integration is running
---
 cmd/kamel/kamel.go      |  4 ++-
 pkg/client/cmd/root.go  |  8 +++--
 pkg/client/cmd/run.go   | 66 ++++++++++++++++++++++++++++++++++++---
 pkg/util/watch/watch.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 152 insertions(+), 8 deletions(-)

diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go
index 3328370..ff56647 100644
--- a/cmd/kamel/kamel.go
+++ b/cmd/kamel/kamel.go
@@ -22,10 +22,12 @@ import (
        "github.com/apache/camel-k/pkg/client/cmd"
        _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
        "os"
+       "context"
 )
 
 func main() {
-       rootCmd, err := cmd.NewKamelCommand()
+       ctx := context.Background()
+       rootCmd, err := cmd.NewKamelCommand(ctx)
        exitOnError(err)
 
        err = rootCmd.Execute()
diff --git a/pkg/client/cmd/root.go b/pkg/client/cmd/root.go
index e0e2a08..50cfb40 100644
--- a/pkg/client/cmd/root.go
+++ b/pkg/client/cmd/root.go
@@ -23,15 +23,19 @@ import (
        "github.com/apache/camel-k/pkg/util/kubernetes"
        "github.com/pkg/errors"
        "github.com/spf13/cobra"
+       "context"
 )
 
 type RootCmdOptions struct {
+       Context    context.Context
        KubeConfig string
        Namespace  string
 }
 
-func NewKamelCommand() (*cobra.Command, error) {
-       options := RootCmdOptions{}
+func NewKamelCommand(ctx context.Context) (*cobra.Command, error) {
+       options := RootCmdOptions{
+               Context: ctx,
+       }
        var cmd = cobra.Command{
                Use:   "kamel",
                Short: "Kamel is a awesome client tool for running Apache Camel 
integrations natively on Kubernetes",
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index d64a653..cee146e 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -31,6 +31,7 @@ import (
        "github.com/spf13/cobra"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "github.com/apache/camel-k/pkg/util/watch"
 )
 
 type RunCmdOptions struct {
@@ -38,6 +39,7 @@ type RunCmdOptions struct {
        Language        string
        IntegrationName string
        Dependencies    []string
+       Wait            bool
 }
 
 func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
@@ -54,8 +56,10 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) 
*cobra.Command {
        }
 
        cmd.Flags().StringVarP(&options.Language, "language", "l", "", 
"Programming Language used to write the file")
-       cmd.Flags().StringVarP(&options.IntegrationName, "name", "", "", "The 
integration name")
+       cmd.Flags().StringVar(&options.IntegrationName, "name", "", "The 
integration name")
        cmd.Flags().StringSliceVarP(&options.Dependencies, "dependency", "d", 
nil, "The integration dependency")
+       cmd.Flags().BoolVarP(&options.Wait, "wait", "w", false, "Waits for the 
integration to be running")
+       cmd.ParseFlags(os.Args)
 
        return &cmd
 }
@@ -74,10 +78,62 @@ func (*RunCmdOptions) validateArgs(cmd *cobra.Command, args 
[]string) error {
 }
 
 func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error {
-       code, err := o.loadCode(args[0])
+       integration, err := o.createIntegration(cmd, args)
        if err != nil {
                return err
        }
+       if o.Wait {
+               err = o.waitForIntegrationReady(integration)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (o *RunCmdOptions) waitForIntegrationReady(integration 
*v1alpha1.Integration) error {
+       // Block this goroutine until the integration is in a final status
+       changes, err := watch.WatchStateChanges(o.Context, integration)
+       if err != nil {
+               return err
+       }
+
+       var lastStatusSeen *v1alpha1.IntegrationStatus
+
+watcher:
+       for {
+               select {
+               case <-o.Context.Done():
+                       return nil
+               case i, ok := <-changes:
+                       if !ok {
+                               break watcher
+                       }
+                       lastStatusSeen = &i.Status
+                       phase := string(i.Status.Phase)
+                       if phase != "" {
+                               fmt.Println("integration 
\""+integration.Name+"\" in phase", phase)
+                               // TODO when we add health checks, we should 
wait until they are passed
+                               if i.Status.Phase == 
v1alpha1.IntegrationPhaseRunning || i.Status.Phase == 
v1alpha1.IntegrationPhaseError {
+                                       // TODO display some error info when 
available in the status
+                                       break watcher
+                               }
+                       }
+               }
+       }
+
+       // TODO we may not be able to reach this state, since the build will be 
done without sources (until we add health checks)
+       if lastStatusSeen != nil && lastStatusSeen.Phase == 
v1alpha1.IntegrationPhaseError {
+               return errors.New("integration deployment failed")
+       }
+       return nil
+}
+
+func (o *RunCmdOptions) createIntegration(cmd *cobra.Command, args []string) 
(*v1alpha1.Integration, error) {
+       code, err := o.loadCode(args[0])
+       if err != nil {
+               return nil, err
+       }
 
        namespace := o.Namespace
 
@@ -124,14 +180,14 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
                clone := integration.DeepCopy()
                err = sdk.Get(clone)
                if err != nil {
-                       return err
+                       return nil, err
                }
                integration.ResourceVersion = clone.ResourceVersion
                err = sdk.Update(&integration)
        }
 
        if err != nil {
-               return err
+               return nil, err
        }
 
        if !existed {
@@ -139,7 +195,7 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
        } else {
                fmt.Printf("integration \"%s\" updated\n", name)
        }
-       return nil
+       return &integration, nil
 }
 
 func (*RunCmdOptions) loadCode(fileName string) (string, error) {
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
new file mode 100644
index 0000000..3ec0bda
--- /dev/null
+++ b/pkg/util/watch/watch.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+       "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+       "context"
+       "github.com/operator-framework/operator-sdk/pkg/k8sclient"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+       "github.com/operator-framework/operator-sdk/pkg/util/k8sutil"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "github.com/sirupsen/logrus"
+)
+
+// Watches a integration resource and send it through a channel when its 
status changes
+func WatchStateChanges(ctx context.Context, integration *v1alpha1.Integration) 
(<-chan *v1alpha1.Integration, error) {
+       resourceClient, _, err := 
k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, 
integration.Namespace)
+       if err != nil {
+               return nil, err
+       }
+       watcher, err := resourceClient.Watch(metav1.ListOptions{
+               FieldSelector: "metadata.name=" + integration.Name,
+       })
+       if err != nil {
+               return nil, err
+       }
+       events := watcher.ResultChan()
+
+       out := make(chan *v1alpha1.Integration)
+       var lastObservedState *v1alpha1.IntegrationPhase
+
+       go func() {
+               defer close(out)
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case e, ok := <-events:
+                               if !ok {
+                                       return
+                               }
+
+                               if e.Object != nil {
+                                       if runtimeUnstructured, ok := 
e.Object.(runtime.Unstructured); ok {
+                                               unstr := 
unstructured.Unstructured{
+                                                       Object: 
runtimeUnstructured.UnstructuredContent(),
+                                               }
+                                               icopy := integration.DeepCopy()
+                                               err := 
k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy)
+                                               if err != nil {
+                                                       
logrus.Error("Unexpected error detected when watching resource", err)
+                                                       return // closes the 
channel
+                                               }
+
+                                               if lastObservedState == nil || 
*lastObservedState != icopy.Status.Phase {
+                                                       lastObservedState = 
&icopy.Status.Phase
+                                                       out <- icopy
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }()
+
+       return out, nil
+}

Reply via email to