This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit ab773ded9e75f5794ef02a6c1e676de6c7963ff4 Author: nferraro <ni.ferr...@gmail.com> AuthorDate: Wed May 22 13:05:28 2019 +0200 Fix #668: allow to subscribe to multiple channels --- deploy/resources.go | 46 ++++++++++++++++---------------- pkg/apis/camel/v1alpha1/knative/types.go | 16 ++++++----- pkg/trait/knative.go | 36 ++++++++++++++++++------- 3 files changed, 58 insertions(+), 40 deletions(-) diff --git a/deploy/resources.go b/deploy/resources.go index 882550a..c6878fa 100644 --- a/deploy/resources.go +++ b/deploy/resources.go @@ -8352,29 +8352,6 @@ spec: passive: false ` - Resources["cr-example.yaml"] = - ` -apiVersion: camel.apache.org/v1alpha1 -kind: Integration -metadata: - name: example -spec: - source: - content: |- - // This is Camel K Groovy example route - - rnd = new Random() - - from('timer:groovy?period=1s') - .routeId('groovy') - .setBody() - .constant('Hello Camel K!') - .process { - it.in.headers['RandomValue'] = rnd.nextInt() - } - .to('log:info?showHeaders=true') - name: routes.groovy -` Resources["crd-build.yaml"] = ` apiVersion: apiextensions.k8s.io/v1beta1 @@ -8541,6 +8518,29 @@ spec: JSONPath: .status.context ` + Resources["cr-example.yaml"] = + ` +apiVersion: camel.apache.org/v1alpha1 +kind: Integration +metadata: + name: example +spec: + source: + content: |- + // This is Camel K Groovy example route + + rnd = new Random() + + from('timer:groovy?period=1s') + .routeId('groovy') + .setBody() + .constant('Hello Camel K!') + .process { + it.in.headers['RandomValue'] = rnd.nextInt() + } + .to('log:info?showHeaders=true') + name: routes.groovy +` Resources["operator-deployment.yaml"] = ` apiVersion: apps/v1 diff --git a/pkg/apis/camel/v1alpha1/knative/types.go b/pkg/apis/camel/v1alpha1/knative/types.go index 5c80e43..833931b 100644 --- a/pkg/apis/camel/v1alpha1/knative/types.go +++ b/pkg/apis/camel/v1alpha1/knative/types.go @@ -60,11 +60,13 @@ const ( // Meta Options const ( - CamelMetaServicePath = "service.path" - CamelMetaServiceID = "service.id" - CamelMetaServiceName = "service.name" - CamelMetaServiceHost = "service.host" - CamelMetaServicePort = "service.port" - CamelMetaServiceZone = "service.zone" - CamelMetaServiceProtocol = "service.protocol" + CamelMetaServicePath = "service.path" + CamelMetaServiceID = "service.id" + CamelMetaServiceName = "service.name" + CamelMetaServiceHost = "service.host" + CamelMetaServicePort = "service.port" + CamelMetaServiceZone = "service.zone" + CamelMetaServiceProtocol = "service.protocol" + CamelMetaFilterHeaderName = "filter.header.name" + CamelMetaFilterHeaderValue = "filter.header.value" ) diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index 7436f67..edad18f 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -34,15 +34,20 @@ import ( ) type knativeTrait struct { - BaseTrait `property:",squash"` - Configuration string `property:"configuration"` - ChannelSources string `property:"channel-sources"` - ChannelSinks string `property:"channel-sinks"` - EndpointSources string `property:"endpoint-sources"` - EndpointSinks string `property:"endpoint-sinks"` - Auto *bool `property:"auto"` + BaseTrait `property:",squash"` + Configuration string `property:"configuration"` + ChannelSources string `property:"channel-sources"` + ChannelSinks string `property:"channel-sinks"` + EndpointSources string `property:"endpoint-sources"` + EndpointSinks string `property:"endpoint-sinks"` + FilterSourceChannels *bool `property:"filter-source-channels"` + Auto *bool `property:"auto"` } +const ( + knativeHistoryHeader = "ce-knativehistory" +) + func newKnativeTrait() *knativeTrait { t := &knativeTrait{ BaseTrait: newBaseTrait("knative"), @@ -101,6 +106,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) { t.EndpointSinks = strings.Join(items, ",") } + if t.FilterSourceChannels == nil && len(strings.Split(t.ChannelSources, ",")) > 1 { + // Filter channels when the integration subscribes to more than one + filter := true + t.FilterSourceChannels = &filter + } } return true, nil @@ -169,15 +179,21 @@ func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEn if env.ContainsService(ch, knativeapi.CamelServiceTypeChannel) { continue } + meta := map[string]string{ + knativeapi.CamelMetaServicePath: "/", + } + if t.FilterSourceChannels != nil && *t.FilterSourceChannels { + fullName := ch + "." + e.Integration.Namespace + ".channels.cluster.local" + meta[knativeapi.CamelMetaFilterHeaderName] = knativeHistoryHeader + meta[knativeapi.CamelMetaFilterHeaderValue] = fullName + } svc := knativeapi.CamelServiceDefinition{ Name: ch, Host: "0.0.0.0", Port: 8080, Protocol: knativeapi.CamelProtocolHTTP, ServiceType: knativeapi.CamelServiceTypeChannel, - Metadata: map[string]string{ - knativeapi.CamelMetaServicePath: "/", - }, + Metadata: meta, } env.Services = append(env.Services, svc) }