This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 1445a6f099268a28ffa10782c98ac87e21450bef Author: nferraro <ni.ferr...@gmail.com> AuthorDate: Mon Nov 26 17:07:58 2018 +0100 Fix #218: allow to push to Knative --- pkg/trait/knative.go | 112 +++++++++++++++++---- .../camel/component/knative/KnativeEndpoint.java | 4 +- 2 files changed, 97 insertions(+), 19 deletions(-) diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index e463746..b2cd958 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -20,6 +20,8 @@ package trait import ( "encoding/json" "fmt" + "github.com/operator-framework/operator-sdk/pkg/sdk" + "github.com/pkg/errors" "strings" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" @@ -28,7 +30,6 @@ import ( knativeutil "github.com/apache/camel-k/pkg/util/knative" eventing "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" serving "github.com/knative/serving/pkg/apis/serving/v1alpha1" - "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -36,6 +37,7 @@ import ( type knativeTrait struct { BaseTrait `property:",squash"` Sources string `property:"sources"` + Sinks string `property:"sinks"` } func newKnativeTrait() *knativeTrait { @@ -50,9 +52,13 @@ func (t *knativeTrait) appliesTo(e *Environment) bool { func (t *knativeTrait) autoconfigure(e *Environment) error { if t.Sources == "" { - channels := getSourceChannels(e) + channels := t.getSourceChannels(e) t.Sources = strings.Join(channels, ",") } + if t.Sinks == "" { + channels := t.getSinkChannels(e) + t.Sinks = strings.Join(channels, ",") + } return nil } @@ -60,11 +66,15 @@ func (t *knativeTrait) apply(e *Environment) error { for _, sub := range t.getSubscriptionsFor(e) { e.Resources.Add(sub) } - e.Resources.Add(t.getServiceFor(e)) + svc, err := t.getServiceFor(e) + if err != nil { + return err + } + e.Resources.Add(svc) return nil } -func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service { +func (t *knativeTrait) getServiceFor(e *Environment) (*serving.Service, error) { // combine properties of integration with context, integration // properties have the priority properties := CombineConfigurationAsMap("property", e.Context, e.Integration) @@ -102,7 +112,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service { environment["AB_JOLOKIA_OFF"] = "true" // Knative integration - environment["CAMEL_KNATIVE_CONFIGURATION"] = t.getConfigurationSerialized(e) + conf, err := t.getConfigurationSerialized(e) + if err != nil { + return nil, err + } + environment["CAMEL_KNATIVE_CONFIGURATION"] = conf labels := map[string]string{ "camel.apache.org/integration": e.Integration.Name, @@ -135,11 +149,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service { }, } - return &svc + return &svc, nil } func (t *knativeTrait) getSubscriptionsFor(e *Environment) []*eventing.Subscription { - channels := getConfiguredSourceChannels(t.Sources) + channels := t.getConfiguredSourceChannels() subs := make([]*eventing.Subscription, 0) for _, ch := range channels { subs = append(subs, t.getSubscriptionFor(e, ch)) @@ -174,19 +188,19 @@ func (*knativeTrait) getSubscriptionFor(e *Environment, channel string) *eventin } } -func (t *knativeTrait) getConfigurationSerialized(e *Environment) string { - env := t.getConfiguration(e) +func (t *knativeTrait) getConfigurationSerialized(e *Environment) (string, error) { + env, err := t.getConfiguration(e) res, err := json.Marshal(env) if err != nil { - logrus.Warning("Unable to serialize Knative configuration", err) - return "" + return "", errors.Wrap(err, "unable to serialize Knative configuration") } - return string(res) + return string(res), nil } -func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnvironment { - sourceChannels := getConfiguredSourceChannels(t.Sources) +func (t *knativeTrait) getConfiguration(e *Environment) (knativeutil.CamelEnvironment, error) { env := knativeutil.NewCamelEnvironment() + // Sources + sourceChannels := t.getConfiguredSourceChannels() for _, ch := range sourceChannels { svc := knativeutil.CamelServiceDefinition{ Name: ch, @@ -200,6 +214,29 @@ func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnviron } env.Services = append(env.Services, svc) } + // Sinks + sinkChannels := t.getConfiguredSinkChannels() + for _, ch := range sinkChannels { + channel, err := t.retrieveChannel(e.Integration.Namespace, ch) + if err != nil { + return env, err + } + hostname := channel.Status.Address.Hostname + if hostname == "" { + return env, errors.New("cannot find address of channel " + ch) + } + svc := knativeutil.CamelServiceDefinition{ + Name: ch, + Host: hostname, + Port: 80, + Protocol: knativeutil.CamelProtocolHTTP, + ServiceType: knativeutil.CamelServiceTypeChannel, + Metadata: map[string]string{ + knativeutil.CamelMetaServicePath: "/", + }, + } + env.Services = append(env.Services, svc) + } // Adding default endpoint defSvc := knativeutil.CamelServiceDefinition{ Name: "default", @@ -212,12 +249,12 @@ func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnviron }, } env.Services = append(env.Services, defSvc) - return env + return env, nil } -func getConfiguredSourceChannels(sources string) []string { +func (t *knativeTrait) getConfiguredSourceChannels() []string { channels := make([]string, 0) - for _, ch := range strings.Split(sources, ",") { + for _, ch := range strings.Split(t.Sources, ",") { cht := strings.Trim(ch, " \t\"") if cht != "" { channels = append(channels, cht) @@ -226,7 +263,7 @@ func getConfiguredSourceChannels(sources string) []string { return channels } -func getSourceChannels(e *Environment) []string { +func (*knativeTrait) getSourceChannels(e *Environment) []string { channels := make([]string, 0) metadata.Each(e.Integration.Spec.Sources, func(_ int, meta metadata.IntegrationMetadata) bool { @@ -236,3 +273,42 @@ func getSourceChannels(e *Environment) []string { return channels } + +func (t *knativeTrait) getConfiguredSinkChannels() []string { + channels := make([]string, 0) + for _, ch := range strings.Split(t.Sinks, ",") { + cht := strings.Trim(ch, " \t\"") + if cht != "" { + channels = append(channels, cht) + } + } + return channels +} + +func (*knativeTrait) getSinkChannels(e *Environment) []string { + channels := make([]string, 0) + + metadata.Each(e.Integration.Spec.Sources, func(_ int, meta metadata.IntegrationMetadata) bool { + channels = append(channels, knativeutil.ExtractChannelNames(meta.ToURIs)...) + return true + }) + + return channels +} + +func (*knativeTrait) retrieveChannel(namespace string, name string) (*eventing.Channel, error) { + channel := eventing.Channel{ + TypeMeta: metav1.TypeMeta{ + Kind: "Channel", + APIVersion: eventing.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + } + if err := sdk.Get(&channel); err != nil { + return nil, errors.Wrap(err, "could not retrieve channel " + name + " in namespace " + namespace) + } + return &channel, nil +} diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index eb3a1fd..c1b92cd 100644 --- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -52,7 +52,6 @@ import static org.apache.camel.util.ObjectHelper.ifNotEmpty; scheme = "knative", syntax = "knative:type/target", title = "Knative", - producerOnly = true, label = "cloud,eventing") public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint { @UriPath(description = "The Knative type") @@ -118,6 +117,9 @@ public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint headers.putIfAbsent("CE-EventTime", eventTime); headers.putIfAbsent("CE-Source", getEndpointUri()); headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); + + // Always remove host so it's always computed from the URL and not inherited from the exchange + headers.remove("Host"); }, endpoint.createProducer() );