lburgazzoli closed pull request #56: Add a wait option to kamel run URL: https://github.com/apache/camel-k/pull/56
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go index 3328370..ff56647 100644 --- a/cmd/kamel/kamel.go +++ b/cmd/kamel/kamel.go @@ -22,10 +22,12 @@ import ( "github.com/apache/camel-k/pkg/client/cmd" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "os" + "context" ) func main() { - rootCmd, err := cmd.NewKamelCommand() + ctx := context.Background() + rootCmd, err := cmd.NewKamelCommand(ctx) exitOnError(err) err = rootCmd.Execute() diff --git a/deploy/operator.yaml b/deploy/operator.yaml index a2c4abc..863b21c 100644 --- a/deploy/operator.yaml +++ b/deploy/operator.yaml @@ -15,6 +15,9 @@ spec: containers: - name: camel-k-operator image: docker.io/apache/camel-k:0.0.1-SNAPSHOT + ports: + - containerPort: 60000 + name: metrics command: - camel-k-operator imagePullPolicy: Always @@ -23,3 +26,5 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: OPERATOR_NAME + value: "camel-k-operator" diff --git a/pkg/client/cmd/root.go b/pkg/client/cmd/root.go index e0e2a08..50cfb40 100644 --- a/pkg/client/cmd/root.go +++ b/pkg/client/cmd/root.go @@ -23,15 +23,19 @@ import ( "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/pkg/errors" "github.com/spf13/cobra" + "context" ) type RootCmdOptions struct { + Context context.Context KubeConfig string Namespace string } -func NewKamelCommand() (*cobra.Command, error) { - options := RootCmdOptions{} +func NewKamelCommand(ctx context.Context) (*cobra.Command, error) { + options := RootCmdOptions{ + Context: ctx, + } var cmd = cobra.Command{ Use: "kamel", Short: "Kamel is a awesome client tool for running Apache Camel integrations natively on Kubernetes", diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go index d64a653..cee146e 100644 --- a/pkg/client/cmd/run.go +++ b/pkg/client/cmd/run.go @@ -31,6 +31,7 @@ import ( "github.com/spf13/cobra" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/apache/camel-k/pkg/util/watch" ) type RunCmdOptions struct { @@ -38,6 +39,7 @@ type RunCmdOptions struct { Language string IntegrationName string Dependencies []string + Wait bool } func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command { @@ -54,8 +56,10 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command { } cmd.Flags().StringVarP(&options.Language, "language", "l", "", "Programming Language used to write the file") - cmd.Flags().StringVarP(&options.IntegrationName, "name", "", "", "The integration name") + cmd.Flags().StringVar(&options.IntegrationName, "name", "", "The integration name") cmd.Flags().StringSliceVarP(&options.Dependencies, "dependency", "d", nil, "The integration dependency") + cmd.Flags().BoolVarP(&options.Wait, "wait", "w", false, "Waits for the integration to be running") + cmd.ParseFlags(os.Args) return &cmd } @@ -74,10 +78,62 @@ func (*RunCmdOptions) validateArgs(cmd *cobra.Command, args []string) error { } func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error { - code, err := o.loadCode(args[0]) + integration, err := o.createIntegration(cmd, args) if err != nil { return err } + if o.Wait { + err = o.waitForIntegrationReady(integration) + if err != nil { + return err + } + } + return nil +} + +func (o *RunCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integration) error { + // Block this goroutine until the integration is in a final status + changes, err := watch.WatchStateChanges(o.Context, integration) + if err != nil { + return err + } + + var lastStatusSeen *v1alpha1.IntegrationStatus + +watcher: + for { + select { + case <-o.Context.Done(): + return nil + case i, ok := <-changes: + if !ok { + break watcher + } + lastStatusSeen = &i.Status + phase := string(i.Status.Phase) + if phase != "" { + fmt.Println("integration \""+integration.Name+"\" in phase", phase) + // TODO when we add health checks, we should wait until they are passed + if i.Status.Phase == v1alpha1.IntegrationPhaseRunning || i.Status.Phase == v1alpha1.IntegrationPhaseError { + // TODO display some error info when available in the status + break watcher + } + } + } + } + + // TODO we may not be able to reach this state, since the build will be done without sources (until we add health checks) + if lastStatusSeen != nil && lastStatusSeen.Phase == v1alpha1.IntegrationPhaseError { + return errors.New("integration deployment failed") + } + return nil +} + +func (o *RunCmdOptions) createIntegration(cmd *cobra.Command, args []string) (*v1alpha1.Integration, error) { + code, err := o.loadCode(args[0]) + if err != nil { + return nil, err + } namespace := o.Namespace @@ -124,14 +180,14 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error { clone := integration.DeepCopy() err = sdk.Get(clone) if err != nil { - return err + return nil, err } integration.ResourceVersion = clone.ResourceVersion err = sdk.Update(&integration) } if err != nil { - return err + return nil, err } if !existed { @@ -139,7 +195,7 @@ func (o *RunCmdOptions) run(cmd *cobra.Command, args []string) error { } else { fmt.Printf("integration \"%s\" updated\n", name) } - return nil + return &integration, nil } func (*RunCmdOptions) loadCode(fileName string) (string, error) { diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go new file mode 100644 index 0000000..3ec0bda --- /dev/null +++ b/pkg/util/watch/watch.go @@ -0,0 +1,82 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" + "context" + "github.com/operator-framework/operator-sdk/pkg/k8sclient" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "github.com/operator-framework/operator-sdk/pkg/util/k8sutil" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "github.com/sirupsen/logrus" +) + +// Watches a integration resource and send it through a channel when its status changes +func WatchStateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-chan *v1alpha1.Integration, error) { + resourceClient, _, err := k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, integration.Namespace) + if err != nil { + return nil, err + } + watcher, err := resourceClient.Watch(metav1.ListOptions{ + FieldSelector: "metadata.name=" + integration.Name, + }) + if err != nil { + return nil, err + } + events := watcher.ResultChan() + + out := make(chan *v1alpha1.Integration) + var lastObservedState *v1alpha1.IntegrationPhase + + go func() { + defer close(out) + for { + select { + case <-ctx.Done(): + return + case e, ok := <-events: + if !ok { + return + } + + if e.Object != nil { + if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok { + unstr := unstructured.Unstructured{ + Object: runtimeUnstructured.UnstructuredContent(), + } + icopy := integration.DeepCopy() + err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy) + if err != nil { + logrus.Error("Unexpected error detected when watching resource", err) + return // closes the channel + } + + if lastObservedState == nil || *lastObservedState != icopy.Status.Phase { + lastObservedState = &icopy.Status.Phase + out <- icopy + } + } + } + } + } + }() + + return out, nil +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services