This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit d99fd279eef4dee4690db73545ef48acbd170a83 Author: Nicola Ferraro <ni.ferr...@gmail.com> AuthorDate: Wed May 27 15:19:18 2020 +0200 Fix #1470: support inline YAML --- pkg/apis/camel/v1/common_types.go | 13 ++++++ pkg/apis/camel/v1/integration_types.go | 1 + pkg/apis/camel/v1/integration_types_support.go | 25 ++++++++++++ pkg/apis/camel/v1/zz_generated.deepcopy.go | 17 ++++++++ pkg/cmd/run.go | 25 ++++++++---- pkg/trait/init.go | 21 +++++++++- pkg/util/digest/digest.go | 12 ++++++ pkg/util/flows/io.go | 55 ++++++++++++++++++++++++++ pkg/util/flows/io_test.go | 45 +++++++++++++++++++++ 9 files changed, 205 insertions(+), 9 deletions(-) diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go index 6aecd5c..bbddb5b 100644 --- a/pkg/apis/camel/v1/common_types.go +++ b/pkg/apis/camel/v1/common_types.go @@ -20,6 +20,7 @@ package v1 import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ) // ConfigurationSpec -- @@ -126,3 +127,15 @@ type ResourceCondition interface { GetReason() string GetMessage() string } + +// Flow is an unstructured object representing a Camel Flow in YAML/JSON DSL +type Flow map[string]interface{} + +// DeepCopy copies the receiver, creating a new Flow. +func (in *Flow) DeepCopy() *Flow { + if in == nil { + return nil + } + out := Flow(runtime.DeepCopyJSON(*in)) + return &out +} diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go index 9d7110e..37c3aef 100644 --- a/pkg/apis/camel/v1/integration_types.go +++ b/pkg/apis/camel/v1/integration_types.go @@ -28,6 +28,7 @@ import ( type IntegrationSpec struct { Replicas *int32 `json:"replicas,omitempty"` Sources []SourceSpec `json:"sources,omitempty"` + Flows []Flow `json:"flows,omitempty"` Resources []ResourceSpec `json:"resources,omitempty"` Kit string `json:"kit,omitempty"` Dependencies []string `json:"dependencies,omitempty"` diff --git a/pkg/apis/camel/v1/integration_types_support.go b/pkg/apis/camel/v1/integration_types_support.go index d04e1bc..d16394c 100644 --- a/pkg/apis/camel/v1/integration_types_support.go +++ b/pkg/apis/camel/v1/integration_types_support.go @@ -82,6 +82,11 @@ func (in *IntegrationSpec) AddResources(resources ...ResourceSpec) { in.Resources = append(in.Resources, resources...) } +// AddFlows -- +func (in *IntegrationSpec) AddFlows(flows ...Flow) { + in.Flows = append(in.Flows, flows...) +} + // AddConfiguration -- func (in *IntegrationSpec) AddConfiguration(confType string, confValue string) { in.Configuration = append(in.Configuration, ConfigurationSpec{ @@ -129,6 +134,26 @@ func (in *IntegrationStatus) AddOrReplaceGeneratedResources(resources ...Resourc in.GeneratedResources = append(in.GeneratedResources, newResources...) } +// AddOrReplaceGeneratedSources -- +func (in *IntegrationStatus) AddOrReplaceGeneratedSources(sources ...SourceSpec) { + newSources := make([]SourceSpec, 0) + for _, source := range sources { + replaced := false + for i, r := range in.GeneratedSources { + if r.Name == source.Name { + in.GeneratedSources[i] = source + replaced = true + break + } + } + if !replaced { + newSources = append(newSources, source) + } + } + + in.GeneratedSources = append(in.GeneratedSources, newSources...) +} + // Configurations -- func (in *IntegrationSpec) Configurations() []ConfigurationSpec { if in == nil { diff --git a/pkg/apis/camel/v1/zz_generated.deepcopy.go b/pkg/apis/camel/v1/zz_generated.deepcopy.go index 6e941cd..86c4a06 100644 --- a/pkg/apis/camel/v1/zz_generated.deepcopy.go +++ b/pkg/apis/camel/v1/zz_generated.deepcopy.go @@ -615,6 +615,16 @@ func (in *FailureRecovery) DeepCopy() *FailureRecovery { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in Flow) DeepCopyInto(out *Flow) { + { + in := &in + clone := in.DeepCopy() + *out = *clone + return + } +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ImageTask) DeepCopyInto(out *ImageTask) { *out = *in in.ContainerTask.DeepCopyInto(&out.ContainerTask) @@ -1080,6 +1090,13 @@ func (in *IntegrationSpec) DeepCopyInto(out *IntegrationSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Flows != nil { + in, out := &in.Flows, &out.Flows + *out = make([]Flow, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.Resources != nil { in, out := &in.Resources, &out.Resources *out = make([]ResourceSpec, len(*in)) diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 6392f2e..31869d3 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -37,6 +37,7 @@ import ( "github.com/apache/camel-k/pkg/client" "github.com/apache/camel-k/pkg/trait" "github.com/apache/camel-k/pkg/util" + "github.com/apache/camel-k/pkg/util/flows" "github.com/apache/camel-k/pkg/util/gzip" "github.com/apache/camel-k/pkg/util/kubernetes" k8slog "github.com/apache/camel-k/pkg/util/kubernetes/log" @@ -82,6 +83,7 @@ func newCmdRun(rootCmdOptions *RootCmdOptions) (*cobra.Command, *runCmdOptions) cmd.Flags().Bool("logs", false, "Print integration logs") cmd.Flags().Bool("sync", false, "Synchronize the local source file with the cluster, republishing at each change") cmd.Flags().Bool("dev", false, "Enable Dev mode (equivalent to \"-w --logs --sync\")") + cmd.Flags().Bool("use-flows", true, "Write yaml sources as Flow objects in the integration custom resource") cmd.Flags().String("profile", "", "Trait profile used for deployment") cmd.Flags().StringArrayP("trait", "t", nil, "Configure a trait. E.g. \"-t service.enabled=false\"") cmd.Flags().StringArray("logging-level", nil, "Configure the logging level. e.g. \"--logging-level org.apache.camel=DEBUG\"") @@ -110,6 +112,7 @@ type runCmdOptions struct { Logs bool `mapstructure:"logs" yaml:",omitempty"` Sync bool `mapstructure:"sync" yaml:",omitempty"` Dev bool `mapstructure:"dev" yaml:",omitempty"` + UseFlows bool `mapstructure:"use-flows" yaml:",omitempty"` Save bool `mapstructure:"save" yaml:",omitempty" kamel:"omitsave"` IntegrationKit string `mapstructure:"kit" yaml:",omitempty"` IntegrationName string `mapstructure:"name" yaml:",omitempty"` @@ -476,13 +479,21 @@ func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string) return nil, err } - integration.Spec.AddSources(v1.SourceSpec{ - DataSpec: v1.DataSpec{ - Name: path.Base(source), - Content: data, - Compression: o.Compression, - }, - }) + if o.UseFlows && (strings.HasSuffix(source, ".yaml") || strings.HasSuffix(source, ".yml")) { + flows, err := flows.UnmarshalString(data) + if err != nil { + return nil, err + } + integration.Spec.AddFlows(flows...) + } else { + integration.Spec.AddSources(v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: path.Base(source), + Content: data, + Compression: o.Compression, + }, + }) + } } for _, resource := range o.Resources { diff --git a/pkg/trait/init.go b/pkg/trait/init.go index cf77f6b..c81cde7 100644 --- a/pkg/trait/init.go +++ b/pkg/trait/init.go @@ -22,11 +22,13 @@ import ( "sort" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" - "github.com/pkg/errors" - "github.com/apache/camel-k/pkg/util" + "github.com/apache/camel-k/pkg/util/flows" + "github.com/pkg/errors" ) +const flowsInternalSourceName = "camel-k-embedded-flow.yaml" + // Internal trait type initTrait struct { BaseTrait `property:",squash"` @@ -48,6 +50,21 @@ func (t *initTrait) Configure(e *Environment) (bool, error) { func (t *initTrait) Apply(e *Environment) error { if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) { + + // Flows need to be turned into a generated source + if len(e.Integration.Spec.Flows) > 0 { + content, err := flows.Marshal(e.Integration.Spec.Flows) + if err != nil { + return err + } + e.Integration.Status.AddOrReplaceGeneratedSources(v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: flowsInternalSourceName, + Content: string(content), + }, + }) + } + // // Dependencies need to be recomputed in case of a trait declares a capability but as // the dependencies trait runs earlier than some task such as the cron one, we need to diff --git a/pkg/util/digest/digest.go b/pkg/util/digest/digest.go index c79f045..7488c89 100644 --- a/pkg/util/digest/digest.go +++ b/pkg/util/digest/digest.go @@ -31,6 +31,7 @@ import ( v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/util" "github.com/apache/camel-k/pkg/util/defaults" + "github.com/apache/camel-k/pkg/util/flows" ) // ComputeForIntegration a digest of the fields that are relevant for the deployment @@ -66,6 +67,17 @@ func ComputeForIntegration(integration *v1.Integration) (string, error) { } } + // Integration flows + if len(integration.Spec.Flows) > 0 { + flowData, err := flows.Marshal(integration.Spec.Flows) + if err != nil { + return "", err + } + if _, err := hash.Write(flowData); err != nil { + return "", err + } + } + // Integration dependencies for _, item := range integration.Spec.Dependencies { if _, err := hash.Write([]byte(item)); err != nil { diff --git a/pkg/util/flows/io.go b/pkg/util/flows/io.go new file mode 100644 index 0000000..dc85959 --- /dev/null +++ b/pkg/util/flows/io.go @@ -0,0 +1,55 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flows + +import ( + "bytes" + "encoding/json" + "io" + "io/ioutil" + + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + yaml2 "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/util/yaml" +) + +// UnmarshalString reads flows contained in a string +func UnmarshalString(flowsString string) ([]v1.Flow, error) { + return Unmarshal(bytes.NewReader([]byte(flowsString))) +} + +// Unmarshal flows from a stream +func Unmarshal(reader io.Reader) ([]v1.Flow, error) { + buffered, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + var flows []v1.Flow + // Using the Kubernetes decoder to turn them into JSON before unmarshal. + // This avoids having map[interface{}]interface{} objects which are not JSON compatible. + jsonData, err := yaml.ToJSON(buffered) + if err = json.Unmarshal(jsonData, &flows); err != nil { + return nil, err + } + return flows, err +} + +// Marshal flows as byte array +func Marshal(flows []v1.Flow) ([]byte, error) { + return yaml2.Marshal(flows) +} diff --git a/pkg/util/flows/io_test.go b/pkg/util/flows/io_test.go new file mode 100644 index 0000000..7517a94 --- /dev/null +++ b/pkg/util/flows/io_test.go @@ -0,0 +1,45 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flows + +import ( + "bytes" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestReadWriteYaml(t *testing.T) { + // yaml in conventional form as marshalled by the go runtime + yaml := `- from: + steps: + - to: log:info + uri: timer:tick +` + yamlReader := bytes.NewReader([]byte(yaml)) + flows, err := Unmarshal(yamlReader) + assert.NoError(t, err) + assert.NotNil(t, flows) + assert.Len(t, flows, 1) + assert.NotNil(t, flows[0]["from"]) + assert.Nil(t, flows[0]["xx"]) + + clone, err := Marshal(flows) + assert.NoError(t, err) + assert.NotNil(t, clone) + assert.Equal(t, yaml, string(clone)) +}