This is an automated email from the ASF dual-hosted git repository. cdeppisch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push: new d2605bdd1 fix: Improve Knative trigger filter d2605bdd1 is described below commit d2605bdd153ab89ae594029bbcb6b9112a2146a0 Author: Christoph Deppisch <cdeppi...@redhat.com> AuthorDate: Thu Jun 6 10:26:04 2024 +0200 fix: Improve Knative trigger filter - Fixes #5537: Support filter attributes other than event type (e.g. source, subject, extensions) - Fixes #5529: Allows empty filter to consume the full event stream - Fixes #5446: Knative Trigger creation is only based on event type attribute - Fixes #5577: Consistently support "cloudEventsType" property in Pipes source/sink - Update documentation and improve Knative Kamelet/Pipe user guide --- .github/actions/e2e-knative-yaks/action.yml | 2 +- .github/actions/kamel-cleanup/cleanup.sh | 2 +- .github/actions/kamel-install-yaks/action.yml | 2 +- .github/workflows/knative.yml | 2 +- .../modules/ROOT/pages/kamelets/kamelets-user.adoc | 185 +++++- .../modules/ROOT/pages/running/camel-runtimes.adoc | 4 +- docs/modules/ROOT/partials/apis/camel-k-crds.adoc | 18 + docs/modules/traits/pages/knative.adoc | 12 + e2e/knative/kamelet_test.go | 6 +- e2e/support/util/dump.go | 38 ++ .../{yaks-config.yaml => event-source-pipe.yaml} | 35 +- .../common/knative-broker/knative-pipe.feature | 34 + .../{yaks-config.yaml => log-sink-pipe.yaml} | 29 +- .../{yaks-config.yaml => log-sink.kamelet.yaml} | 37 +- .../{yaks-config.yaml => no-filter-pipe.yaml} | 27 +- .../{yaks-config.yaml => source-filter-pipe.yaml} | 29 +- ...{yaks-config.yaml => timer-source.kamelet.yaml} | 46 +- e2e/yaks/common/knative-broker/yaks-config.yaml | 37 +- .../sinkbinding-http.feature | 2 +- helm/camel-k/crds/crd-integration-platform.yaml | 34 + helm/camel-k/crds/crd-integration-profile.yaml | 34 + helm/camel-k/crds/crd-integration.yaml | 17 + helm/camel-k/crds/crd-kamelet-binding.yaml | 17 + helm/camel-k/crds/crd-pipe.yaml | 17 + pkg/apis/camel/v1/trait/knative.go | 8 + 
pkg/apis/camel/v1/trait/zz_generated.deepcopy.go | 10 + .../camel.apache.org_integrationplatforms.yaml | 34 + .../camel.apache.org_integrationprofiles.yaml | 34 + .../crd/bases/camel.apache.org_integrations.yaml | 17 + .../bases/camel.apache.org_kameletbindings.yaml | 17 + .../config/crd/bases/camel.apache.org_pipes.yaml | 17 + pkg/trait/jvm_test.go | 2 +- pkg/trait/knative.go | 63 +- pkg/trait/knative_test.go | 714 ++++++++++++++++++++- pkg/util/bindings/bindings_test.go | 6 + pkg/util/bindings/knative_ref.go | 71 +- pkg/util/bindings/knative_ref_test.go | 182 +++++- pkg/util/knative/knative.go | 46 +- pkg/util/knative/uri.go | 2 +- script/Makefile | 2 +- 40 files changed, 1728 insertions(+), 163 deletions(-) diff --git a/.github/actions/e2e-knative-yaks/action.yml b/.github/actions/e2e-knative-yaks/action.yml index 83becbcad..50281fe48 100644 --- a/.github/actions/e2e-knative-yaks/action.yml +++ b/.github/actions/e2e-knative-yaks/action.yml @@ -106,7 +106,7 @@ runs: uses: ./.github/actions/kamel-install-yaks with: image-name: "docker.io/citrusframework/yaks" - version: 0.14.3 + version: 0.19.1 - id: report-problematic name: List Tests Marked As Problematic diff --git a/.github/actions/kamel-cleanup/cleanup.sh b/.github/actions/kamel-cleanup/cleanup.sh index f2a9c62e5..d38479423 100755 --- a/.github/actions/kamel-cleanup/cleanup.sh +++ b/.github/actions/kamel-cleanup/cleanup.sh @@ -103,6 +103,6 @@ fi kubectl get crds | grep camel | awk '{print $1}' | xargs kubectl delete crd &> /dev/null # -# Remove KNative resources +# Remove Knative resources # ./.github/actions/kamel-cleanup/cleanup-knative.sh diff --git a/.github/actions/kamel-install-yaks/action.yml b/.github/actions/kamel-install-yaks/action.yml index 641411511..a6babcea3 100644 --- a/.github/actions/kamel-install-yaks/action.yml +++ b/.github/actions/kamel-install-yaks/action.yml @@ -21,7 +21,7 @@ description: 'Install YAKS artifacts' inputs: version: description: "The YAKS version" - default: 0.14.3 + 
default: 0.19.1 required: false image-name: description: "The YAKS operator image name" diff --git a/.github/workflows/knative.yml b/.github/workflows/knative.yml index 5900fbff7..f70f203e6 100644 --- a/.github/workflows/knative.yml +++ b/.github/workflows/knative.yml @@ -88,7 +88,7 @@ jobs: -q "${{ github.event.inputs.log-level }}" \ -t "${{ github.event.inputs.test-filters }}" - - name: KNative Tests + - name: Knative Tests uses: ./.github/actions/e2e-knative with: cluster-config-data: ${{ secrets.E2E_CLUSTER_CONFIG }} diff --git a/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc b/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc index 6011b4877..ae27db023 100644 --- a/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc +++ b/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc @@ -219,27 +219,123 @@ You can run this integration without specifying other parameters, the Kamelet en automatically mount the secret into the integration Pod. [[kamelets-usage-binding]] -== Binding Kamelets +== Binding Kamelets in Pipes In some contexts (for example **"serverless"**) users often want to leverage the power of Apache Camel to be able to connect to various sources/sinks, without -doing additional processing (such as tranformations or other enterprise integration patterns). +doing additional processing (such as transformations or other enterprise integration patterns). -A common use case is that of **Knative Sources**, for which the Apache Camel developers maintain the https://knative.dev/docs/eventing/samples/apache-camel-source/[Knative CamelSources]. -Kamelets represent an **evolution** of the model proposed in CamelSources, but they allow using the same declarative style of binding, via a resource named **Pipe**. +A common use case is that of **Knative Event Sources**, for which the Apache Camel developers provide the concept of Kamelets and Pipes. 
+Kamelets represent an **evolution** of the Camel route templates to provide an opinionated and easy to use connector to various components and services. +Kamelets allow a declarative style of binding sources and sinks, where data is produced by a source, transformed by action steps and pushed to a given sink, via a resource named **Pipe**. -=== Binding to a Knative Destination +=== Binding to Knative -A Pipe allows to declaratively move data from a system described by a Kamelet towards a Knative destination (or other kind of destinations, in the future), or from -a Knative channel/broker to another external system described by a Kamelet. +A Pipe allows you to move data from a system described by a Kamelet towards a Knative destination, or from a Knative channel/broker to another external system described by a Kamelet. +This means Pipes may act as event sources and sinks for the Knative eventing broker in a declarative way. -For example, here's an example of binding: +For example, here is a Pipe that connects a Kamelet Telegram source to the Knative broker: [source,yaml] ---- apiVersion: camel.apache.org/v1 kind: Pipe metadata: - name: telegram-text-source-to-channel + name: telegram-to-knative +spec: + source: # <1> + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: # <2> + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default +---- +<1> Reference to the source that provides data +<2> Reference to the sink where data should be sent to + +This binding takes the `telegram-text-source` Kamelet, configures it using specific properties ("botToken") and +makes sure that messages produced by the Kamelet are forwarded to the Knative **Broker** named "default". + +Note that source and sink are specified as standard **Kubernetes object references** in a declarative way. + +Knative eventing uses the CloudEvents data format by default. 
+You may want to set some properties that specify the event attributes such as the event type. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-to-knative +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.telegram.events # <1> +---- +<1> Sets the event type attribute of the CloudEvent produced by this Pipe + +This way you may specify event attributes before publishing to the Knative broker. +Note that Camel uses a default CloudEvents event type `org.apache.camel.event` for events produced by Camel. + +You can override CloudEvent event attributes on the sink using the `ce.override.` prefix when setting a property. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-to-knative +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.telegram.events + ce.override.ce-source: my-source # <1> +---- +<1> Use "ce.override.ce-source" to explicitly set the CloudEvents source attribute. + +The example shows how we can reference the "telegram-text-source" resource in a Pipe. +It's contained in the `source` section because it's a Kamelet of type "source". +A Kamelet of type "sink", by contrast, can only be used in the `sink` section of a `Pipe`. + +**Under the covers, a Pipe creates an Integration** resource that implements the binding, but all details of how to connect with +Telegram and forward the data to the Knative broker are fully transparent to the end user. 
For instance the Integration uses a `SinkBinding` concept +under the covers in order to retrieve the Knative broker endpoint URL. + +In the same way you can also connect a Kamelet source to a Knative channel. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-to-knative-channel spec: source: # <1> ref: @@ -255,18 +351,75 @@ spec: name: messages ---- <1> Reference to the source that provides data +<2> Reference to the Knative channel that acts as the sink where data should be sent to + +When reading data from Knative you just need to specify for instance the Knative broker as a source in the Pipe. +Events consumed from Knative event stream will be pushed to the given sink of the Pipe. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: knative-to-slack +spec: + source: # <1> + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event.messages + sink: # <2> + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: slack-sink + properties: + channel: "#my-channel" + webhookUrl: the-webhook-url +---- +<1> Reference to the Knative broker source that provides data <2> Reference to the sink where data should be sent to -This binding takes the `telegram-text-source` Kamelet, configures it using specific properties ("botToken") and -makes sure that messages produced by the Kamelet are forwarded to the Knative **InMemoryChannel** named "messages". +Once again, the Pipe provides a declarative way of creating event sources and sinks for Knative eventing. +In the example, all events of type `org.apache.camel.event.messages` get forwarded to the given Slack channel using the Webhook API. -Note that source and sink are specified declaratively as standard **Kubernetes object references**. +When consuming events from the Knative broker you most likely need to filter and select the events to process. 
+You can do that with the properties set on the Knative broker source reference, for instance filtering by the event type as shown in the example. +The filter possibilities include CloudEvent attributes such as event type, source, subject and extensions. -The example shows how we can reference the "telegram-text-source" resource in a Pipe. It's contained in the `source` section -because it's a Kamelet of type "source". -A Kamelet of type "sink", by contrast, can only be used in the `sink` section of a `Pipe`. +In the background Camel K will automatically create a Knative Trigger resource for the Pipe that uses the filter attributes accordingly. + +.Sample trigger created by Camel K +[source,yaml] +---- +apiVersion: eventing.knative.dev/v1 +kind: Trigger +metadata: + name: camel-event-messages +spec: + broker: default # <1> + filter: + attributes: + type: org.apache.camel.event.messages + myextension: my-extension-value + subscriber: + ref: + apiVersion: serving.knative.dev/v1 # <2> + kind: Service + name: camel-service + uri: /events/camel.event.messages +---- +<1> Reference to the Knative broker source that provides data +<2> Reference to the Camel K integration/pipe service + +The trigger calls the Camel K integration service endpoint URL and pushes events with the given filter attributes to the Pipe. +All properties that you have set on the Knative broker source reference will be set as a filter attribute on the trigger resource (except for reserved properties such as `name` and `cloudEventsType`). -**Under the covers, a Pipe creates an Integration** resource that implements the binding, but this is transparent to the end user. +Note that Camel K creates the trigger resource only for Knative broker type event sources. +In case you reference a Knative channel as a source in a Pipe, Camel K assumes that the channel and the trigger are already present. +Camel K will only create the subscription for the integration service on the channel. 
=== Binding to a Kafka Topic diff --git a/docs/modules/ROOT/pages/running/camel-runtimes.adoc b/docs/modules/ROOT/pages/running/camel-runtimes.adoc index c479bfa31..3eacd7987 100644 --- a/docs/modules/ROOT/pages/running/camel-runtimes.adoc +++ b/docs/modules/ROOT/pages/running/camel-runtimes.adoc @@ -143,13 +143,13 @@ NOTE: this is a best effort analysis taking as reference the work available in v |v |x -|KNative Service +|Knative Service |x |x |x |v -|KNative +|Knative |x |v |x diff --git a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc index abe5aee8c..89c648300 100644 --- a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc +++ b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc @@ -7737,6 +7737,24 @@ Enables the camel-k-operator to set the "bindings.knative.dev/include=true" labe As Knative requires this label to perform injection of K_SINK URL into the service. If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. (default: true) +|`filters` + +[]string +| + + +Sets filter attributes on the event stream (such as event type, source, subject and so on). +A list of key-value pairs that represent filter attributes and its values. +The syntax is KEY=VALUE, e.g., `source="my.source"`. +Filter attributes get set on the Knative trigger that is being created as part of this integration. + +|`filterEventType` + +bool +| + + +Enables the default filtering for the Knative trigger using the event type +If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. 
(default: true) + |=== diff --git a/docs/modules/traits/pages/knative.adoc b/docs/modules/traits/pages/knative.adoc index d22f19f07..49b764ccb 100755 --- a/docs/modules/traits/pages/knative.adoc +++ b/docs/modules/traits/pages/knative.adoc @@ -87,6 +87,18 @@ It's enabled by default when the integration targets a single sink As Knative requires this label to perform injection of K_SINK URL into the service. If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. (default: true) +| knative.filters +| []string +| Sets filter attributes on the event stream (such as event type, source, subject and so on). +A list of key-value pairs that represent filter attributes and its values. +The syntax is KEY=VALUE, e.g., `source="my.source"`. +Filter attributes get set on the Knative trigger that is being created as part of this integration. + +| knative.filter-event-type +| bool +| Enables the default filtering for the Knative trigger using the event type +If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true) + |=== // End of autogenerated code - DO NOT EDIT! 
(configuration) diff --git a/e2e/knative/kamelet_test.go b/e2e/knative/kamelet_test.go index 6d4830638..1ae16ffe2 100644 --- a/e2e/knative/kamelet_test.go +++ b/e2e/knative/kamelet_test.go @@ -47,16 +47,16 @@ func TestKameletChange(t *testing.T) { timerSource := "my-timer-source" g.Expect(CreateTimerKamelet(t, ctx, operatorID, ns, timerSource)()).To(Succeed()) g.Expect(CreateKnativeChannel(t, ctx, ns, knChannel)()).To(Succeed()) - // Consumer route that will read from the KNative channel + // Consumer route that will read from the Knative channel g.Expect(KamelRunWithID(t, ctx, operatorID, ns, "files/test-kamelet-display.groovy", "-w").Execute()).To(Succeed()) g.Eventually(IntegrationPodPhase(t, ctx, ns, "test-kamelet-display")).Should(Equal(corev1.PodRunning)) // Create the Pipe - g.Expect(KamelBindWithID(t, ctx, operatorID, ns, timerSource, knChannelConf, "-p", "source.message=HelloKNative!", "--annotation", "trait.camel.apache.org/health.enabled=true", "--annotation", "trait.camel.apache.org/health.readiness-initial-delay=10", "--name", timerPipe).Execute()).To(Succeed()) + g.Expect(KamelBindWithID(t, ctx, operatorID, ns, timerSource, knChannelConf, "-p", "source.message=HelloKnative!", "--annotation", "trait.camel.apache.org/health.enabled=true", "--annotation", "trait.camel.apache.org/health.readiness-initial-delay=10", "--name", timerPipe).Execute()).To(Succeed()) g.Eventually(IntegrationPodPhase(t, ctx, ns, timerPipe)).Should(Equal(corev1.PodRunning)) g.Eventually(IntegrationConditionStatus(t, ctx, ns, timerPipe, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) // Consume the message - g.Eventually(IntegrationLogs(t, ctx, ns, "test-kamelet-display"), TestTimeoutShort).Should(ContainSubstring("HelloKNative!")) + g.Eventually(IntegrationLogs(t, ctx, ns, "test-kamelet-display"), TestTimeoutShort).Should(ContainSubstring("HelloKnative!")) g.Eventually(PipeCondition(t, ctx, ns, timerPipe, v1.PipeConditionReady), 
TestTimeoutMedium).Should(And( WithTransform(PipeConditionStatusExtract, Equal(corev1.ConditionTrue)), diff --git a/e2e/support/util/dump.go b/e2e/support/util/dump.go index a5741e697..75b144db2 100644 --- a/e2e/support/util/dump.go +++ b/e2e/support/util/dump.go @@ -35,9 +35,12 @@ import ( routev1 "github.com/openshift/api/route/v1" olm "github.com/operator-framework/api/pkg/operators/v1alpha1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/client/camel/clientset/versioned" + "github.com/apache/camel-k/v2/pkg/util/knative" "github.com/apache/camel-k/v2/pkg/util/kubernetes" "github.com/apache/camel-k/v2/pkg/util/openshift" ) @@ -202,6 +205,41 @@ func Dump(ctx context.Context, c client.Client, ns string, t *testing.T) error { t.Logf("---\n%s\n---\n", string(pdata)) } + // Knative resources + if installed, _ := knative.IsEventingInstalled(c); installed { + var trgs eventingv1.TriggerList + err = c.List(ctx, &trgs) + if err != nil { + return err + } + t.Logf("Found %d Knative trigger:\n", len(trgs.Items)) + for _, p := range trgs.Items { + ref := p + pdata, err := kubernetes.ToYAMLNoManagedFields(&ref) + if err != nil { + return err + } + t.Logf("---\n%s\n---\n", string(pdata)) + } + } + + if installed, _ := knative.IsServingInstalled(c); installed { + var ksrvs servingv1.ServiceList + err = c.List(ctx, &ksrvs) + if err != nil { + return err + } + t.Logf("Found %d Knative services:\n", len(ksrvs.Items)) + for _, p := range ksrvs.Items { + ref := p + pdata, err := kubernetes.ToYAMLNoManagedFields(&ref) + if err != nil { + return err + } + t.Logf("---\n%s\n---\n", string(pdata)) + } + } + // CamelCatalogs cats, err := camelClient.CamelV1().CamelCatalogs(ns).List(ctx, metav1.ListOptions{}) if err != nil { diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml b/e2e/yaks/common/knative-broker/event-source-pipe.yaml similarity 
index 60% copy from e2e/yaks/common/knative-broker/yaks-config.yaml copy to e2e/yaks/common/knative-broker/event-source-pipe.yaml index cd754db64..16d821c3a 100644 --- a/e2e/yaks/common/knative-broker/yaks-config.yaml +++ b/e2e/yaks/common/knative-broker/event-source-pipe.yaml @@ -15,10 +15,31 @@ # limitations under the License. # --------------------------------------------------------------------------- -config: - namespace: - temporary: true -post: - - name: print dump - if: env:CI=true && failure() - run: yaks dump --includes app=camel-k +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: event-source-pipe +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + period: 5000 + message: "Hello this is event-1!" + steps: + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-action + properties: + showHeaders: true + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event.messages + ce.override.ce-source: org.apache.camel diff --git a/e2e/yaks/common/knative-broker/knative-pipe.feature b/e2e/yaks/common/knative-broker/knative-pipe.feature new file mode 100644 index 000000000..426b7830d --- /dev/null +++ b/e2e/yaks/common/knative-broker/knative-pipe.feature @@ -0,0 +1,34 @@ +Feature: Pipes connecting with Knative broker + + Background: + Given create Knative broker default + Given Knative broker default is running + Given Camel K resource polling configuration + | maxAttempts | 60 | + | delayBetweenAttempts | 3000 | + + Scenario: Pipes exchanging events with the broker + # Pipe pushing events to the broker + Given load Pipe event-source-pipe.yaml + Then Camel K integration event-source-pipe should be running + + # Pipe receives given event type from the broker + Given load Pipe log-sink-pipe.yaml + Then Camel K integration log-sink-pipe should be running + And Camel K integration log-sink-pipe should print Hello 
this is event-1! + + # Pipe receives all events from the broker + Given load Pipe no-filter-pipe.yaml + Then Camel K integration no-filter-pipe should be running + And Camel K integration no-filter-pipe should print Hello this is event-1! + + # Pipe receives events with source filter + Given load Pipe source-filter-pipe.yaml + Then Camel K integration source-filter-pipe should be running + And Camel K integration source-filter-pipe should print Hello this is event-1! + + Scenario: Remove resources + Given delete Camel K integration event-source-pipe + Given delete Camel K integration log-sink-pipe + Given delete Camel K integration source-filter-pipe + Given delete Camel K integration no-filter-pipe diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml b/e2e/yaks/common/knative-broker/log-sink-pipe.yaml similarity index 67% copy from e2e/yaks/common/knative-broker/yaks-config.yaml copy to e2e/yaks/common/knative-broker/log-sink-pipe.yaml index cd754db64..dba1cf682 100644 --- a/e2e/yaks/common/knative-broker/yaks-config.yaml +++ b/e2e/yaks/common/knative-broker/log-sink-pipe.yaml @@ -15,10 +15,25 @@ # limitations under the License. 
# --------------------------------------------------------------------------- -config: - namespace: - temporary: true -post: - - name: print dump - if: env:CI=true && failure() - run: yaks dump --includes app=camel-k +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: log-sink-pipe +spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event.messages + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + multiline: true + showHeaders: true + showBodyType: false + showExchangePattern: false diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml b/e2e/yaks/common/knative-broker/log-sink.kamelet.yaml similarity index 59% copy from e2e/yaks/common/knative-broker/yaks-config.yaml copy to e2e/yaks/common/knative-broker/log-sink.kamelet.yaml index cd754db64..94a61c8ae 100644 --- a/e2e/yaks/common/knative-broker/yaks-config.yaml +++ b/e2e/yaks/common/knative-broker/log-sink.kamelet.yaml @@ -15,10 +15,33 @@ # limitations under the License. 
# --------------------------------------------------------------------------- -config: - namespace: - temporary: true -post: - - name: print dump - if: env:CI=true && failure() - run: yaks dump --includes app=camel-k +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: log-sink + labels: + camel.apache.org/kamelet.type: "sink" +spec: + definition: + title: "Logger" + description: "Logs the received payload of each incoming event" + properties: + prefix: + title: Prefix + description: The prefix to prepend to the logged message + type: string + default: "message: " + dataTypes: + in: + types: + text: + mediaType: text/plain + out: + types: + text: + mediaType: text/plain + template: + from: + uri: "kamelet:source" + steps: + - log: "{{prefix}}${body}" diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml b/e2e/yaks/common/knative-broker/no-filter-pipe.yaml similarity index 70% copy from e2e/yaks/common/knative-broker/yaks-config.yaml copy to e2e/yaks/common/knative-broker/no-filter-pipe.yaml index cd754db64..3638543e7 100644 --- a/e2e/yaks/common/knative-broker/yaks-config.yaml +++ b/e2e/yaks/common/knative-broker/no-filter-pipe.yaml @@ -15,10 +15,23 @@ # limitations under the License. 
# --------------------------------------------------------------------------- -config: - namespace: - temporary: true -post: - - name: print dump - if: env:CI=true && failure() - run: yaks dump --includes app=camel-k +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: no-filter-pipe +spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + multiline: true + showHeaders: true + showBodyType: false + showExchangePattern: false diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml b/e2e/yaks/common/knative-broker/source-filter-pipe.yaml similarity index 67% copy from e2e/yaks/common/knative-broker/yaks-config.yaml copy to e2e/yaks/common/knative-broker/source-filter-pipe.yaml index cd754db64..12a78a6fb 100644 --- a/e2e/yaks/common/knative-broker/yaks-config.yaml +++ b/e2e/yaks/common/knative-broker/source-filter-pipe.yaml @@ -15,10 +15,25 @@ # limitations under the License. 
# --------------------------------------------------------------------------- -config: - namespace: - temporary: true -post: - - name: print dump - if: env:CI=true && failure() - run: yaks dump --includes app=camel-k +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: source-filter-pipe +spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + source: org.apache.camel + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + multiline: true + showHeaders: true + showBodyType: false + showExchangePattern: false diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml b/e2e/yaks/common/knative-broker/timer-source.kamelet.yaml similarity index 51% copy from e2e/yaks/common/knative-broker/yaks-config.yaml copy to e2e/yaks/common/knative-broker/timer-source.kamelet.yaml index cd754db64..e6a9922a7 100644 --- a/e2e/yaks/common/knative-broker/yaks-config.yaml +++ b/e2e/yaks/common/knative-broker/timer-source.kamelet.yaml @@ -15,10 +15,42 @@ # limitations under the License. 
# --------------------------------------------------------------------------- -config: - namespace: - temporary: true -post: - - name: print dump - if: env:CI=true && failure() - run: yaks dump --includes app=camel-k +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: timer-source + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + title: "Timer" + description: "Produces periodic events with a custom payload" + required: + - message + properties: + period: + title: Period + description: The time interval between two events + type: integer + default: 1000 + message: + title: Message + description: The message to generate + type: string + dataTypes: + out: + types: + json: + mediaType: application/json + schema: + id: text.camel.apache.org + type: string + template: + from: + uri: timer:tick + parameters: + period: "{{period}}" + steps: + - setBody: + constant: "{{message}}" + - to: "kamelet:sink" diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml b/e2e/yaks/common/knative-broker/yaks-config.yaml index cd754db64..6c5c2ebb4 100644 --- a/e2e/yaks/common/knative-broker/yaks-config.yaml +++ b/e2e/yaks/common/knative-broker/yaks-config.yaml @@ -18,7 +18,36 @@ config: namespace: temporary: true -post: - - name: print dump - if: env:CI=true && failure() - run: yaks dump --includes app=camel-k + runtime: + env: + - name: YAKS_CAMEL_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KAMELETS_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KNATIVE_AUTO_REMOVE_RESOURCES + value: false + resources: + - event-source-pipe.yaml + - log-sink-pipe.yaml + - source-filter-pipe.yaml + - no-filter-pipe.yaml + settings: + loggers: + - name: INTEGRATION_STATUS + level: INFO + - name: INTEGRATION_LOGS + level: INFO + dump: + enabled: true + failedOnly: true + includes: + - app=camel-k +pre: + - name: installation + 
run: | + kubectl apply -f timer-source.kamelet.yaml -n $YAKS_NAMESPACE + kubectl apply -f log-sink.kamelet.yaml -n $YAKS_NAMESPACE diff --git a/e2e/yaks/common/knative-sinkbinding-http/sinkbinding-http.feature b/e2e/yaks/common/knative-sinkbinding-http/sinkbinding-http.feature index 73e58336d..f2709fd9d 100644 --- a/e2e/yaks/common/knative-sinkbinding-http/sinkbinding-http.feature +++ b/e2e/yaks/common/knative-sinkbinding-http/sinkbinding-http.feature @@ -6,4 +6,4 @@ Feature: Camel K can run source HTTP endpoint in sinkbinding mode | delayBetweenAttempts | 3000 | Scenario: Integration knative-service starts with no errors - Given wait for condition=Ready on Kubernetes custom resource integration/rest2channel in integration.camel.apache.org/v1 + Given Camel K integration rest2channel should be running diff --git a/helm/camel-k/crds/crd-integration-platform.yaml b/helm/camel-k/crds/crd-integration-platform.yaml index eeeec4364..3107e80cb 100644 --- a/helm/camel-k/crds/crd-integration-platform.yaml +++ b/helm/camel-k/crds/crd-integration-platform.yaml @@ -1395,11 +1395,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. 
+ items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to @@ -3392,11 +3409,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/helm/camel-k/crds/crd-integration-profile.yaml b/helm/camel-k/crds/crd-integration-profile.yaml index c827dbd52..2c24920a1 100644 --- a/helm/camel-k/crds/crd-integration-profile.yaml +++ b/helm/camel-k/crds/crd-integration-profile.yaml @@ -1272,11 +1272,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. 
(default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to @@ -3152,11 +3169,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. 
+ items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/helm/camel-k/crds/crd-integration.yaml b/helm/camel-k/crds/crd-integration.yaml index 40b06eff2..26fe73641 100644 --- a/helm/camel-k/crds/crd-integration.yaml +++ b/helm/camel-k/crds/crd-integration.yaml @@ -7336,11 +7336,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. 
+ items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/helm/camel-k/crds/crd-kamelet-binding.yaml b/helm/camel-k/crds/crd-kamelet-binding.yaml index fbfba55ca..202a701ed 100644 --- a/helm/camel-k/crds/crd-kamelet-binding.yaml +++ b/helm/camel-k/crds/crd-kamelet-binding.yaml @@ -7624,12 +7624,29 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the + event stream when no other filter criteria is given. + (default: true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream + (such as event type, source, subject and so on). A list + of key-value pairs that represent filter attributes + and its values. The syntax is KEY=VALUE, e.g., `source="my.source"`. + Filter attributes get set on the Knative trigger that + is being created as part of this integration. 
+ items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace diff --git a/helm/camel-k/crds/crd-pipe.yaml b/helm/camel-k/crds/crd-pipe.yaml index 29962f268..c2648f8c8 100644 --- a/helm/camel-k/crds/crd-pipe.yaml +++ b/helm/camel-k/crds/crd-pipe.yaml @@ -7622,12 +7622,29 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the + event stream when no other filter criteria is given. + (default: true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream + (such as event type, source, subject and so on). A list + of key-value pairs that represent filter attributes + and its values. The syntax is KEY=VALUE, e.g., `source="my.source"`. + Filter attributes get set on the Knative trigger that + is being created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace diff --git a/pkg/apis/camel/v1/trait/knative.go b/pkg/apis/camel/v1/trait/knative.go index b8ce7fe46..2681ac982 100644 --- a/pkg/apis/camel/v1/trait/knative.go +++ b/pkg/apis/camel/v1/trait/knative.go @@ -60,4 +60,12 @@ type KnativeTrait struct { // As Knative requires this label to perform injection of K_SINK URL into the service. // If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. 
(default: true) NamespaceLabel *bool `property:"namespace-label" json:"namespaceLabel,omitempty"` + // Sets filter attributes on the event stream (such as event type, source, subject and so on). + // A list of key-value pairs that represent filter attributes and its values. + // The syntax is KEY=VALUE, e.g., `source="my.source"`. + // Filter attributes get set on the Knative trigger that is being created as part of this integration. + Filters []string `property:"filters" json:"filters,omitempty"` + // Enables the default filtering for the Knative trigger using the event type + // If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true) + FilterEventType *bool `property:"filter-event-type" json:"filterEventType,omitempty"` } diff --git a/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go b/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go index c8c03773e..a8db3093a 100644 --- a/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go +++ b/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go @@ -706,6 +706,16 @@ func (in *KnativeTrait) DeepCopyInto(out *KnativeTrait) { *out = new(bool) **out = **in } + if in.Filters != nil { + in, out := &in.Filters, &out.Filters + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.FilterEventType != nil { + in, out := &in.FilterEventType, &out.FilterEventType + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KnativeTrait. 
diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml b/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml index eeeec4364..3107e80cb 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml @@ -1395,11 +1395,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to @@ -3392,11 +3409,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". 
Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml b/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml index c827dbd52..2c24920a1 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml @@ -1272,11 +1272,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. 
+ items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to @@ -3152,11 +3169,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml b/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml index 40b06eff2..26fe73641 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml @@ -7336,11 +7336,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. 
(default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml b/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml index fbfba55ca..202a701ed 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml @@ -7624,12 +7624,29 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the + event stream when no other filter criteria is given. + (default: true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream + (such as event type, source, subject and so on). A list + of key-value pairs that represent filter attributes + and its values. The syntax is KEY=VALUE, e.g., `source="my.source"`. 
+ Filter attributes get set on the Knative trigger that + is being created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace diff --git a/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml b/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml index 29962f268..c2648f8c8 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml @@ -7622,12 +7622,29 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the + event stream when no other filter criteria is given. + (default: true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream + (such as event type, source, subject and so on). A list + of key-value pairs that represent filter attributes + and its values. The syntax is KEY=VALUE, e.g., `source="my.source"`. + Filter attributes get set on the Knative trigger that + is being created as part of this integration. 
+ items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace diff --git a/pkg/trait/jvm_test.go b/pkg/trait/jvm_test.go index ff5fae279..9206e4514 100644 --- a/pkg/trait/jvm_test.go +++ b/pkg/trait/jvm_test.go @@ -309,7 +309,7 @@ func TestApplyJvmTraitWithDeploymentResource(t *testing.T) { }, d.Spec.Template.Spec.Containers[0].Args) } -func TestApplyJvmTraitWithKNativeResource(t *testing.T) { +func TestApplyJvmTraitWithKnativeResource(t *testing.T) { trait, environment := createNominalJvmTest(v1.IntegrationKitTypePlatform) s := serving.Service{} diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index db87ad504..c8aa3fc87 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/apache/camel-k/v2/pkg/util/boolean" + "github.com/apache/camel-k/v2/pkg/util/property" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -501,41 +502,53 @@ func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.Came } func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string, path string) error { - // TODO extend to additional filters too, to filter them at source and not at destination found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) bool { return trigger.Spec.Broker == ref.Name && - trigger.Spec.Filter != nil && - trigger.Spec.Filter.Attributes["type"] == eventType // can be also missing + trigger.Name == knativeutil.GetTriggerName(ref.Name, e.Integration.Name, eventType) }) - if !found { - if ref.Namespace == "" { - ref.Namespace = e.Integration.Namespace - } - controllerStrategy, err := e.DetermineControllerStrategy() + if found { + return nil + } + + if ref.Namespace == "" { + ref.Namespace = e.Integration.Namespace + } + + controllerStrategy, err := e.DetermineControllerStrategy() + if err != nil { + return err + 
} + + var attributes = make(map[string]string) + for _, filterExpression := range t.Filters { + key, value := property.SplitPropertyFileEntry(filterExpression) + attributes[key] = value + } + + if _, eventTypeSpecified := attributes["type"]; !eventTypeSpecified && pointer.BoolDeref(t.FilterEventType, true) && eventType != "" { + // Apply default trigger filter attribute for the event type + attributes["type"] = eventType + } + + var trigger *eventing.Trigger + switch controllerStrategy { + case ControllerStrategyKnativeService: + trigger, err = knativeutil.CreateKnativeServiceTrigger(*ref, e.Integration.Name, eventType, path, attributes) if err != nil { return err } - - var trigger *eventing.Trigger - switch controllerStrategy { - case ControllerStrategyKnativeService: - trigger, err = knativeutil.CreateKnativeServiceTrigger(*ref, e.Integration.Name, eventType, path) - if err != nil { - return err - } - case ControllerStrategyDeployment: - trigger, err = knativeutil.CreateServiceTrigger(*ref, e.Integration.Name, eventType, path) - if err != nil { - return err - } - default: - return fmt.Errorf("failed to create Knative trigger: unsupported controller strategy %s", controllerStrategy) + case ControllerStrategyDeployment: + trigger, err = knativeutil.CreateServiceTrigger(*ref, e.Integration.Name, eventType, path, attributes) + if err != nil { + return err } - - e.Resources.Add(trigger) + default: + return fmt.Errorf("failed to create Knative trigger: unsupported controller strategy %s", controllerStrategy) } + e.Resources.Add(trigger) + return nil } diff --git a/pkg/trait/knative_test.go b/pkg/trait/knative_test.go index d580a3f24..bcecc5581 100644 --- a/pkg/trait/knative_test.go +++ b/pkg/trait/knative_test.go @@ -43,6 +43,7 @@ import ( "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/util/boolean" "github.com/apache/camel-k/v2/pkg/util/camel" + "github.com/apache/camel-k/v2/pkg/util/knative" k8sutils 
"github.com/apache/camel-k/v2/pkg/util/kubernetes" "github.com/apache/camel-k/v2/pkg/util/test" ) @@ -272,7 +273,7 @@ func TestKnativeEnvConfigurationFromSource(t *testing.T) { })) } -func TestKnativeTriggerConfiguration(t *testing.T) { +func TestKnativeTriggerExplicitFilterConfig(t *testing.T) { catalog, err := camel.DefaultCatalog() require.NoError(t, err) @@ -316,6 +317,7 @@ func TestKnativeTriggerConfiguration(t *testing.T) { Trait: traitv1.Trait{ Enabled: pointer.Bool(true), }, + Filters: []string{"source=my-source"}, }, }, }, @@ -352,20 +354,688 @@ func TestKnativeTriggerConfiguration(t *testing.T) { assert.NotEmpty(t, environment.ExecutedTraits) assert.NotNil(t, environment.GetTrait("knative")) - assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { - matching := true + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) - matching = matching && assert.Equal(t, "default", trigger.Spec.Broker) - matching = matching && assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) - matching = matching && assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) - matching = matching && assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) - matching = matching && assert.Equal(t, "default-test-evttype", trigger.Name) + assert.NotNil(t, trigger) - return matching - })) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 2) + assert.Equal(t, trigger.Spec.Filter.Attributes["type"], 
"evt.type") + assert.Equal(t, trigger.Spec.Filter.Attributes["source"], "my-source") +} + +func TestKnativeTriggerExplicitFilterConfigNoEventTypeFilter(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + Filters: []string{"source=my-source"}, + FilterEventType: pointer.Bool(false), + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = 
traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) + + assert.NotNil(t, trigger) + + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, trigger.Spec.Filter.Attributes["source"], "my-source") +} + +func TestKnativeTriggerDefaultEventTypeFilter(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + 
Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) + + assert.NotNil(t, trigger) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, "evt.type", trigger.Spec.Filter.Attributes["type"]) +} + +func TestKnativeTriggerDefaultEventTypeFilterDisabled(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + 
Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + FilterEventType: pointer.Bool(false), + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) + + assert.NotNil(t, trigger) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + 
assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.Nil(t, trigger.Spec.Filter) } -func TestKnativeTriggerConfigurationNoServingAvailable(t *testing.T) { +func TestKnativeMultipleTrigger(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type.1") + .log("${body}"); + + from("knative:event/evt.type.2") + .log("${body}"); + + from("knative:event") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: 
k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + triggerNames := make([]string, 0) + environment.Resources.VisitKnativeTrigger(func(trigger *eventing.Trigger) { + triggerNames = append(triggerNames, trigger.Name) + }) + + assert.Len(t, triggerNames, 3) + + trigger1 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type.1") + }) + + assert.NotNil(t, trigger1) + assert.Equal(t, "default", trigger1.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger1.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger1.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type.1", trigger1.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype1", trigger1.Name) + + assert.NotNil(t, trigger1.Spec.Filter) + assert.Len(t, trigger1.Spec.Filter.Attributes, 1) + assert.Equal(t, "evt.type.1", trigger1.Spec.Filter.Attributes["type"]) + + trigger2 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type.2") + }) + + assert.NotNil(t, trigger2) + assert.Equal(t, "default", trigger2.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger2.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger2.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type.2", trigger2.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype2", trigger2.Name) + + assert.NotNil(t, trigger2.Spec.Filter) + assert.Len(t, trigger2.Spec.Filter.Attributes, 1) + assert.Equal(t, "evt.type.2", trigger2.Spec.Filter.Attributes["type"]) + + trigger3 := 
environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "") + }) + + assert.NotNil(t, trigger3) + assert.Equal(t, "default", trigger3.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger3.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger3.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/", trigger3.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test", trigger3.Name) + + assert.Nil(t, trigger3.Spec.Filter) +} + +func TestKnativeMultipleTriggerAdditionalFilterConfig(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type.1") + .log("${body}"); + + from("knative:event/evt.type.2") + .log("${body}"); + + from("knative:event") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + Filters: []string{"subject=Hello"}, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + 
PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + triggerNames := make([]string, 0) + environment.Resources.VisitKnativeTrigger(func(trigger *eventing.Trigger) { + triggerNames = append(triggerNames, trigger.Name) + }) + + assert.Len(t, triggerNames, 3) + + trigger1 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type.1") + }) + + assert.NotNil(t, trigger1) + assert.Equal(t, "default", trigger1.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger1.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger1.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type.1", trigger1.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype1", trigger1.Name) + + assert.NotNil(t, trigger1.Spec.Filter) + assert.Len(t, trigger1.Spec.Filter.Attributes, 2) + assert.Equal(t, "evt.type.1", trigger1.Spec.Filter.Attributes["type"]) + assert.Equal(t, "Hello", trigger1.Spec.Filter.Attributes["subject"]) + + trigger2 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type.2") + }) + + assert.NotNil(t, trigger2) + assert.Equal(t, "default", trigger2.Spec.Broker) + assert.Equal(t, 
serving.SchemeGroupVersion.String(), trigger2.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger2.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type.2", trigger2.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype2", trigger2.Name) + + assert.NotNil(t, trigger2.Spec.Filter) + assert.Len(t, trigger2.Spec.Filter.Attributes, 2) + assert.Equal(t, "evt.type.2", trigger2.Spec.Filter.Attributes["type"]) + assert.Equal(t, "Hello", trigger2.Spec.Filter.Attributes["subject"]) + + trigger3 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "") + }) + + assert.NotNil(t, trigger3) + assert.Equal(t, "default", trigger3.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger3.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger3.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/", trigger3.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test", trigger3.Name) + + assert.NotNil(t, trigger3.Spec.Filter) + assert.Len(t, trigger3.Spec.Filter.Attributes, 1) + assert.Equal(t, "Hello", trigger3.Spec.Filter.Attributes["subject"]) +} + +func TestKnativeTriggerNoEventType(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event") + .log("${body}"); + } + } + `, 
+ }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "") + }) + + assert.NotNil(t, trigger) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test", trigger.Name) + + assert.Nil(t, trigger.Spec.Filter) +} + +func TestKnativeTriggerNoServingAvailable(t *testing.T) { catalog, err := camel.DefaultCatalog() require.NoError(t, err) @@ -448,17 +1118,21 @@ func TestKnativeTriggerConfigurationNoServingAvailable(t *testing.T) { assert.NotEmpty(t, environment.ExecutedTraits) 
assert.NotNil(t, environment.GetTrait("knative")) - assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { - matching := true + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) - matching = matching && assert.Equal(t, "default", trigger.Spec.Broker) - matching = matching && assert.Equal(t, "v1", trigger.Spec.Subscriber.Ref.APIVersion) - matching = matching && assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) - matching = matching && assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) - matching = matching && assert.Equal(t, "default-test-evttype", trigger.Name) + assert.NotNil(t, trigger) - return matching - })) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, "v1", trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, "evt.type", trigger.Spec.Filter.Attributes["type"]) } func TestKnativePlatformHttpConfig(t *testing.T) { diff --git a/pkg/util/bindings/bindings_test.go b/pkg/util/bindings/bindings_test.go index 7cfa78c86..0cd7e3516 100644 --- a/pkg/util/bindings/bindings_test.go +++ b/pkg/util/bindings/bindings_test.go @@ -125,6 +125,12 @@ func TestBindings(t *testing.T) { }), }, uri: "knative:event/myeventtype?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Filters: []string{"type=myeventtype"}, + FilterEventType: pointer.Bool(true), + }, + }, }, { endpointType: v1.EndpointTypeSource, diff --git a/pkg/util/bindings/knative_ref.go b/pkg/util/bindings/knative_ref.go index 301e62099..cfd48fa0c 100644 
--- a/pkg/util/bindings/knative_ref.go +++ b/pkg/util/bindings/knative_ref.go @@ -24,7 +24,9 @@ import ( v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" knativeapis "github.com/apache/camel-k/v2/pkg/apis/camel/v1/knative" - v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1" + "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" + "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1" + "github.com/apache/camel-k/v2/pkg/util/property" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -42,8 +44,6 @@ func (k KnativeRefBindingProvider) ID() string { } // Translate --. -// -//nolint:dupl func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointCtx EndpointContext, e v1.Endpoint) (*Binding, error) { if e.Ref == nil { // works only on refs @@ -78,13 +78,9 @@ func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointCtx End if props == nil { props = make(map[string]string) } - if props["apiVersion"] == "" { - props["apiVersion"] = e.Ref.APIVersion - } - if props["kind"] == "" { - props["kind"] = e.Ref.Kind - } + var filterEventType = true + var filterExpressions = make([]string, 0) var serviceURI string // TODO: refactor @@ -93,26 +89,63 @@ func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointCtx End if props["name"] == "" { props["name"] = e.Ref.Name } + + if endpointCtx.Type == v1.EndpointTypeSource { + // Configure trigger filter attributes for the Knative event source + for key, value := range props { + if key == "cloudEventsType" { + // cloudEventsType is a synonym for type filter attribute + filterExpressions = append(filterExpressions, fmt.Sprintf("type=%s", value)) + } else if key != "name" { + filterExpressions = append(filterExpressions, fmt.Sprintf("%s=%s", key, value)) + } + } + } + if eventType, ok := props["type"]; ok { - // consume prop + // consume the type property and set it as URI path parameter delete(props, "type") serviceURI = fmt.Sprintf("knative:%s/%s", 
*serviceType, eventType) + } else if cloudEventsType, found := props["cloudEventsType"]; found && endpointCtx.Type == v1.EndpointTypeSource { + // set the cloud events type as URI path parameter, but keep it also as URI query param + serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, cloudEventsType) } else { - if endpointCtx.Type == v1.EndpointTypeSink || endpointCtx.Type == v1.EndpointTypeAction { - // Allowing no event type, but it can fail. See https://github.com/apache/camel-k/v2-runtime/issues/536 - serviceURI = fmt.Sprintf("knative:%s", *serviceType) - } else { - return nil, errors.New(`property "type" must be provided when reading from the Broker`) - } + filterEventType = false + serviceURI = fmt.Sprintf("knative:%s", *serviceType) } } else { serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, url.PathEscape(e.Ref.Name)) } + // Remove filter attributes from props to avoid adding them to the service URI query params + for _, exp := range filterExpressions { + key, _ := property.SplitPropertyFileEntry(exp) + delete(props, key) + } + + // Enrich service URI query params if not set + if props["apiVersion"] == "" { + props["apiVersion"] = e.Ref.APIVersion + } + if props["kind"] == "" { + props["kind"] = e.Ref.Kind + } + serviceURI = uri.AppendParameters(serviceURI, props) - return &Binding{ + var binding = Binding{ URI: serviceURI, - }, nil + } + + if endpointCtx.Type == v1.EndpointTypeSource && (len(filterExpressions) > 0 || !filterEventType) { + binding.Traits = v1.Traits{ + Knative: &trait.KnativeTrait{ + Filters: filterExpressions, + FilterEventType: &filterEventType, + }, + } + } + + return &binding, nil } func isKnownKnativeResource(ref *corev1.ObjectReference) (bool, error) { @@ -149,8 +182,6 @@ func (k V1alpha1KnativeRefBindingProvider) ID() string { // Translate --. // Deprecated. 
-// -//nolint:dupl func (k V1alpha1KnativeRefBindingProvider) Translate(ctx V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, e v1alpha1.Endpoint) (*Binding, error) { if e.Ref == nil { // works only on refs diff --git a/pkg/util/bindings/knative_ref_test.go b/pkg/util/bindings/knative_ref_test.go index 6ddadad49..18e482090 100644 --- a/pkg/util/bindings/knative_ref_test.go +++ b/pkg/util/bindings/knative_ref_test.go @@ -22,20 +22,170 @@ import ( "fmt" "testing" - "github.com/apache/camel-k/v2/pkg/util/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" + "k8s.io/utils/pointer" camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/util/test" ) -func TestKnativeRefBinding(t *testing.T) { +func TestKnativeRefAsSource(t *testing.T) { testcases := []struct { + name string + endpoint camelv1.Endpoint + uri string + filters []string + filterEventType *bool + }{ + { + name: "broker", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + }, + uri: "knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + filterEventType: pointer.Bool(false), + }, + { + name: "broker-name-property", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"name": "my-broker"}), + }, + uri: "knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=my-broker", + filterEventType: pointer.Bool(false), + }, + { + name: "event-type-filter", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"type": "org.apache.camel.myevent"}), + }, + uri: 
"knative:event/org.apache.camel.myevent?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + filters: []string{"type=org.apache.camel.myevent"}, + }, + { + name: "cloud-events-type-filter", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"cloudEventsType": "org.apache.camel.cloudevent"}), + }, + uri: "knative:event/org.apache.camel.cloudevent?apiVersion=eventing.knative.dev%2Fv1&cloudEventsType=org.apache.camel.cloudevent&kind=Broker&name=default", + filters: []string{"type=org.apache.camel.cloudevent"}, + }, + { + name: "event-filters", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"source": "my-source", "subject": "mySubject"}), + }, + uri: "knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + filters: []string{"source=my-source", "subject=mySubject"}, + filterEventType: pointer.Bool(false), + }, + { + name: "event-extension-filters", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"myextension": "foo"}), + }, + uri: "knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + filters: []string{"myextension=foo"}, + filterEventType: pointer.Bool(false), + }, + { + name: "channel", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Channel", + Name: "mychannel", + APIVersion: "messaging.knative.dev/v1", + }, + }, + uri: "knative:channel/mychannel?apiVersion=messaging.knative.dev%2Fv1&kind=Channel", + }, + { + name: "service", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Service", + Name: "myservice", + APIVersion: 
"serving.knative.dev/v1", + }, + }, + uri: "knative:endpoint/myservice?apiVersion=serving.knative.dev%2Fv1&kind=Service", + }, + } + + for i, tc := range testcases { + t.Run(fmt.Sprintf("test-%d-%s", i, tc.name), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := test.NewFakeClient() + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + Profile: camelv1.TraitProfileKnative, + } + + binding, err := KnativeRefBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSource, + }, tc.endpoint) + require.NoError(t, err) + assert.NotNil(t, binding) + assert.Equal(t, tc.uri, binding.URI) + + if tc.filters != nil || !pointer.BoolDeref(tc.filterEventType, true) { + assert.NotNil(t, binding.Traits.Knative) + assert.Len(t, binding.Traits.Knative.Filters, len(tc.filters)) + + for _, filterExpression := range tc.filters { + assert.Contains(t, binding.Traits.Knative.Filters, filterExpression) + } + + assert.Equal(t, pointer.BoolDeref(binding.Traits.Knative.FilterEventType, true), pointer.BoolDeref(tc.filterEventType, true)) + } + }) + } +} + +func TestKnativeRefAsSink(t *testing.T) { + testcases := []struct { + name string endpoint camelv1.Endpoint uri string }{ { + name: "broker", endpoint: camelv1.Endpoint{ Ref: &v1.ObjectReference{ Kind: "Broker", @@ -46,6 +196,31 @@ func TestKnativeRefBinding(t *testing.T) { uri: "knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", }, { + name: "broker-name-property", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"name": "my-broker"}), + }, + uri: "knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=my-broker", + }, + { + name: "event-type", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + 
Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"type": "org.apache.camel.myevent"}), + }, + uri: "knative:event/org.apache.camel.myevent?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + }, + { + name: "channel", endpoint: camelv1.Endpoint{ Ref: &v1.ObjectReference{ Kind: "Channel", @@ -56,6 +231,7 @@ func TestKnativeRefBinding(t *testing.T) { uri: "knative:channel/mychannel?apiVersion=messaging.knative.dev%2Fv1&kind=Channel", }, { + name: "service", endpoint: camelv1.Endpoint{ Ref: &v1.ObjectReference{ Kind: "Service", @@ -68,7 +244,7 @@ func TestKnativeRefBinding(t *testing.T) { } for i, tc := range testcases { - t.Run(fmt.Sprintf("test-%d-%s", i, tc.endpoint.Ref.Kind), func(t *testing.T) { + t.Run(fmt.Sprintf("test-%d-%s", i, tc.name), func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/util/knative/knative.go b/pkg/util/knative/knative.go index b72ae1024..e6623ea1f 100644 --- a/pkg/util/knative/knative.go +++ b/pkg/util/knative/knative.go @@ -76,47 +76,36 @@ func CreateSubscription(channelReference corev1.ObjectReference, serviceName str } // CreateServiceTrigger create Knative trigger with arbitrary Kubernetes Service as a subscriber - usually used when no Knative Serving is available on the cluster. 
-func CreateServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string) (*eventing.Trigger, error) { +func CreateServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string, attributes map[string]string) (*eventing.Trigger, error) { subscriberRef := duckv1.KReference{ APIVersion: "v1", Kind: "Service", Name: serviceName, } - return CreateTrigger(brokerReference, subscriberRef, eventType, path) + return CreateTrigger(brokerReference, subscriberRef, eventType, path, attributes) } // CreateKnativeServiceTrigger create Knative trigger with Knative Serving Service as a subscriber - default option when Knative Serving is available on the cluster. -func CreateKnativeServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string) (*eventing.Trigger, error) { +func CreateKnativeServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string, attributes map[string]string) (*eventing.Trigger, error) { subscriberRef := duckv1.KReference{ APIVersion: serving.SchemeGroupVersion.String(), Kind: "Service", Name: serviceName, } - return CreateTrigger(brokerReference, subscriberRef, eventType, path) + return CreateTrigger(brokerReference, subscriberRef, eventType, path, attributes) } -func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef duckv1.KReference, eventType string, path string) (*eventing.Trigger, error) { - nameSuffix := "" - var attributes map[string]string - if eventType != "" { - nameSuffix = fmt.Sprintf("-%s", util.SanitizeLabel(eventType)) - attributes = map[string]string{ - "type": eventType, - } - } - return &eventing.Trigger{ +func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef duckv1.KReference, eventType string, path string, attributes map[string]string) (*eventing.Trigger, error) { + trigger := eventing.Trigger{ TypeMeta: metav1.TypeMeta{ APIVersion: 
eventing.SchemeGroupVersion.String(), Kind: "Trigger", }, ObjectMeta: metav1.ObjectMeta{ Namespace: brokerReference.Namespace, - Name: brokerReference.Name + "-" + subscriberRef.Name + nameSuffix, + Name: GetTriggerName(brokerReference.Name, subscriberRef.Name, eventType), }, Spec: eventing.TriggerSpec{ - Filter: &eventing.TriggerFilter{ - Attributes: attributes, - }, Broker: brokerReference.Name, Subscriber: duckv1.Destination{ Ref: &subscriberRef, @@ -125,7 +114,24 @@ func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef duckv1. }, }, }, - }, nil + } + + if len(attributes) > 0 { + trigger.Spec.Filter = &eventing.TriggerFilter{ + Attributes: attributes, + } + } + + return &trigger, nil +} + +func GetTriggerName(brokerName string, subscriberName string, eventType string) string { + nameSuffix := "" + if eventType != "" { + nameSuffix = fmt.Sprintf("-%s", util.SanitizeLabel(eventType)) + } + + return brokerName + "-" + subscriberName + nameSuffix } func CreateSinkBinding(source corev1.ObjectReference, target corev1.ObjectReference) *sources.SinkBinding { @@ -203,7 +209,7 @@ func getSinkURI(ctx context.Context, c client.Client, sink *corev1.ObjectReferen } objIdentifier := fmt.Sprintf("\"%s/%s\" (%s)", u.GetNamespace(), u.GetName(), u.GroupVersionKind()) - // Special case v1/Service to allow it be addressable + // Special case v1/Service allowing it to be addressable if u.GroupVersionKind().Kind == "Service" && u.GroupVersionKind().Group == "" && u.GroupVersionKind().Version == "v1" { return fmt.Sprintf("http://%s.%s.svc/", u.GetName(), u.GetNamespace()), nil } diff --git a/pkg/util/knative/uri.go b/pkg/util/knative/uri.go index 73f4fb2b2..f128d5e54 100644 --- a/pkg/util/knative/uri.go +++ b/pkg/util/knative/uri.go @@ -84,7 +84,7 @@ func ExtractObjectReference(uri string) (v1.ObjectReference, error) { }, nil } -// ExtractEventType extract the eventType from a event URI. +// ExtractEventType extract the eventType from an event URI. 
func ExtractEventType(uri string) string { return matchOrEmpty(uriRegexp, 2, uri) } diff --git a/script/Makefile b/script/Makefile index 215df7dd3..d8290f535 100644 --- a/script/Makefile +++ b/script/Makefile @@ -340,7 +340,7 @@ test-install-upgrade: do-build go test -timeout 30m -v ./e2e/install/upgrade -tags=integration $(TEST_INSTALL_RUN) $(GOTESTFMT) # -# Knative tests that require the presence of KNative configuration +# Knative tests that require the presence of Knative configuration # test-knative: do-build STAGING_RUNTIME_REPO="$(STAGING_RUNTIME_REPO)"; \