This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit c2840db015a2c449b5cfff4e1beceab1ddeb3c10 Author: nferraro <ni.ferr...@gmail.com> AuthorDate: Wed Sep 12 18:32:54 2018 +0200 Add a flag to wait until the integration is running --- cmd/kamel/kamel.go | 4 ++- pkg/client/cmd/root.go | 8 +++-- pkg/client/cmd/run.go | 66 ++++++++++++++++++++++++++++++++++++--- pkg/util/watch/watch.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 152 insertions(+), 8 deletions(-) diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go index 3328370..ff56647 100644 --- a/cmd/kamel/kamel.go +++ b/cmd/kamel/kamel.go @@ -22,10 +22,12 @@ import ( "github.com/apache/camel-k/pkg/client/cmd" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "os" + "context" ) func main() { - rootCmd, err := cmd.NewKamelCommand() + ctx := context.Background() + rootCmd, err := cmd.NewKamelCommand(ctx) exitOnError(err) err = rootCmd.Execute() diff --git a/pkg/client/cmd/root.go b/pkg/client/cmd/root.go index e0e2a08..50cfb40 100644 --- a/pkg/client/cmd/root.go +++ b/pkg/client/cmd/root.go @@ -23,15 +23,19 @@ import ( "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/pkg/errors" "github.com/spf13/cobra" + "context" ) type RootCmdOptions struct { + Context context.Context KubeConfig string Namespace string } -func NewKamelCommand() (*cobra.Command, error) { - options := RootCmdOptions{} +func NewKamelCommand(ctx context.Context) (*cobra.Command, error) { + options := RootCmdOptions{ + Context: ctx, + } var cmd = cobra.Command{ Use: "kamel", Short: "Kamel is a awesome client tool for running Apache Camel integrations natively on Kubernetes", diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go index d64a653..cee146e 100644 --- a/pkg/client/cmd/run.go +++ b/pkg/client/cmd/run.go @@ -31,6 +31,7 @@ import ( "github.com/spf13/cobra" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/apache/camel-k/pkg/util/watch" ) type RunCmdOptions struct { @@ -38,6 +39,7 @@ type RunCmdOptions struct { Language string IntegrationName string Dependencies []string + Wait bool } func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command { @@ -54,8 +56,10 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command { } cmd.Flags().StringVarP(&options.Language, "language", "l", "", "Programming Language used to write the file") - cmd.Flags().StringVarP(&options.IntegrationName, "name", "", "", "The integration name") + cmd.Flags().StringVar(&options.IntegrationName, "name", "", "The integration name") cmd.Flags().StringSliceVarP(&options.Dependencies, "dependency", "d", nil, "The integration dependency") + cmd.Flags().BoolVarP(&options.Wait, "wait", "w", false, "Waits for the integration to be running") + cmd.ParseFlags(os.Args) return &cmd } @@ -74,10 +78,62 @@ func (*RunCmdOptions) validateArgs(cmd *cobra.Command, args []string) error { } func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error { - code, err := o.loadCode(args[0]) + integration, err := o.createIntegration(cmd, args) if err != nil { return err } + if o.Wait { + err = o.waitForIntegrationReady(integration) + if err != nil { + return err + } + } + return nil +} + +func (o *RunCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integration) error { + // Block this goroutine until the integration is in a final status + changes, err := watch.WatchStateChanges(o.Context, integration) + if err != nil { + return err + } + + var lastStatusSeen *v1alpha1.IntegrationStatus + +watcher: + for { + select { + case <-o.Context.Done(): + return nil + case i, ok := <-changes: + if !ok { + break watcher + } + lastStatusSeen = &i.Status + phase := string(i.Status.Phase) + if phase != "" { + fmt.Println("integration \""+integration.Name+"\" in phase", phase) + // TODO when we add health checks, we should wait until they are passed + if i.Status.Phase == v1alpha1.IntegrationPhaseRunning || i.Status.Phase == v1alpha1.IntegrationPhaseError { + // TODO display some error info when available in the status + break watcher + } + } + } + } + + // TODO we may not be able to reach this state, since the build will be done without sources (until we add health checks) + if lastStatusSeen != nil && lastStatusSeen.Phase == v1alpha1.IntegrationPhaseError { + return errors.New("integration deployment failed") + } + return nil +} + +func (o *RunCmdOptions) createIntegration(cmd *cobra.Command, args []string) (*v1alpha1.Integration, error) { + code, err := o.loadCode(args[0]) + if err != nil { + return nil, err + } namespace := o.Namespace @@ -124,14 +180,14 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error { clone := integration.DeepCopy() err = sdk.Get(clone) if err != nil { - return err + return nil, err } integration.ResourceVersion = clone.ResourceVersion err = sdk.Update(&integration) } if err != nil { - return err + return nil, err } if !existed { @@ -139,7 +195,7 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error { } else { fmt.Printf("integration \"%s\" updated\n", name) } - return nil + return &integration, nil } func (*RunCmdOptions) loadCode(fileName string) (string, error) { diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go new file mode 100644 index 0000000..3ec0bda --- /dev/null +++ b/pkg/util/watch/watch.go @@ -0,0 +1,82 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" + "context" + "github.com/operator-framework/operator-sdk/pkg/k8sclient" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "github.com/operator-framework/operator-sdk/pkg/util/k8sutil" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "github.com/sirupsen/logrus" +) + +// Watches a integration resource and send it through a channel when its status changes +func WatchStateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-chan *v1alpha1.Integration, error) { + resourceClient, _, err := k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, integration.Namespace) + if err != nil { + return nil, err + } + watcher, err := resourceClient.Watch(metav1.ListOptions{ + FieldSelector: "metadata.name=" + integration.Name, + }) + if err != nil { + return nil, err + } + events := watcher.ResultChan() + + out := make(chan *v1alpha1.Integration) + var lastObservedState *v1alpha1.IntegrationPhase + + go func() { + defer close(out) + for { + select { + case <-ctx.Done(): + return + case e, ok := <-events: + if !ok { + return + } + + if e.Object != nil { + if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok { + unstr := unstructured.Unstructured{ + Object: runtimeUnstructured.UnstructuredContent(), + } + icopy := integration.DeepCopy() + err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy) + if err != nil { + logrus.Error("Unexpected error detected when watching resource", err) + return // closes the channel + } + + if lastObservedState == nil || *lastObservedState != icopy.Status.Phase { + lastObservedState = &icopy.Status.Phase + out <- icopy + } + } + } + } + } + }() + + return out, nil +}